停止基于线程的Service--JCIP7.2读书笔记
[本文是我对Java Concurrency In Practice 7.2的归纳和总结. ?转载请注明作者和出处, ?如有谬误, 欢迎在评论中指正. ]
以ExecutorService为例, 该类内部封装有多个线程, 类外部无法直接停止这些线程. 相反, 外部调用Service的shutDown和shutDownNow方法关闭Service, 而Service负责停止其拥有的线程.
大多数server应用会使用到log, 下例中的LogWriter是一个使用生产者消费者模式构建的log service, 需要打印log的线程将待打印的内容加入到阻塞队列中, 而logger线程则不断的从阻塞队列中取出数据输出:
public class LogWriter {private final BlockingQueue<String> queue;private final LoggerThread logger;public LogWriter(Writer writer) {this.queue = new LinkedBlockingQueue<String>(CAPACITY);this.logger = new LoggerThread(writer);}public void start() {logger.start();}/** * 需要打印数据的线程调用该方法, 将待打印数据加入阻塞队列 */public void log(String msg) throws InterruptedException {queue.put(msg);}/** * 负责从阻塞队列中取出数据输出的线程 */private class LoggerThread extends Thread {private final PrintWriter writer;// ...public void run() {try {while (true)writer.println(queue.take());} catch (InterruptedException ignored) {} finally {writer.close();}}}}
LogWriter内部封装有LoggerThread线程, 所以LogWriter是一个基于线程构建的Service. 根据ExecutorService的经验, 我们需要在LogWriter中提供停止LoggerThread线程的方法. 看起来这并不是很难, 我们只需在LogWriter中添加shutDown方法:
/** * 该方法用于停止LoggerThread线程 */public void shutDown() {logger.interrupt();}
当LogWriter.shutDown方法被调用时, LoggerThread线程的中断标记被设置为true, 之后LoggerThread线程执行queue.take()方法时会抛出InterruptedException异常, 从而使得LoggerThread线程结束.
然而这样的shutDown方法并不是很恰当:?
1. 丢弃了队列中尚未来得及输出的数据.
2. 更严重的是, 假如线程A对LogWriter.log方法的调用因为队列已满而阻塞, 此时停止LoggerThread线程将导致线程A永远阻塞在queue.put方法上.
对上例的改进:
public class LogWriter {private final BlockingQueue<String> queue;private final LoggerThread loggerThread;private final PrintWriter writer;/** * 表示是否关闭Service */private boolean isShutdown;/** * 队列中待处理数据的数量 */private int reservations;public void start() {loggerThread.start();}public void shutDown() {synchronized (this) {isShutdown = true;}loggerThread.interrupt();}public void log(String msg) throws InterruptedException {synchronized (this) {// service已关闭后调用log方法直接抛出异常if (isShutdown)throw new IllegalStateException("Service has been shut down");++reservations;}// BlockingQueue本身就是线程安全的, put方法的调用不在同步代码块中// 我们只需要保证isShutdown和reservations是线程安全的即可queue.put(msg);}private class LoggerThread extends Thread {public void run() {try {while (true) {try {synchronized (this) {// 当service已关闭且处理完队列中的所有数据时才跳出while循环if (isShutdown && reservations == 0)break;}String msg = queue.take();synchronized (this) {--reservations;}writer.println(msg);} catch (InterruptedException e) {// 发生InterruptedException异常时不应该立刻跳出while循环// 而应该继续输出log, 直到处理完队列中的所有数据}}} finally {writer.close();}}}}
上面的处理显得过于复杂, 利用ExecutorService可以编写出相对更简洁的程序:
public class LogService {/** * 创建只包含单个线程的线程池, 提交给该线程池的任务将以串行的方式逐个运行 * 本例中, 此线程用于执行打印log的任务 */private final ExecutorService exec = Executors.newSingleThreadExecutor();private final PrintWriter writer;public void start() {}public void shutdown() throws InterruptedException {try {// 关闭ExecutorService后再调用其awaitTermination将导致当前线程阻塞, 直到所有已提交的任务执行完毕, 或者发生超时exec.shutdown();exec.awaitTermination(TIMEOUT, UNIT);} finally {writer.close();}}public void log(String msg) {try {// 线程池关闭后再调用其execute方法将抛出RejectedExecutionException异常exec.execute(new WriteTask(msg));} catch (RejectedExecutionException ignored) {}}private final class WriteTask implements Runnable {private String msg;public WriteTask(String msg) {this.msg = msg;}@Overridepublic void run() {writer.println(msg);}}}