java并发学习之二:线程池(五)
之前的线程池已实现了基本的功能:运行每一个线程,而且测试了一下,大约速度是ThreadPoolExecutor的1.5倍(当然,这是有充分的理由的,后文会提到)
之后的版本将准备是实现“优雅退出”和优化(非阻塞)空闲线程队列了,这个步骤想了很久,发现了很多的问题(包括准备的实现方法也在这里列一下):
1.初步构思了几个方法
void shutDown():该方法将让池不再接受任务,但会将现有的任务全部运行结束后停止List<Runnable> shutDownNow():该方法会interrupt线程,然后收集未调用的任务,并进一步回收已调用的任务(“已调用”不包括运行中的,因为运行中无法人为停止,只包括已下发到线程,但线程尚未执行的),然后将这些任务返回List<Runnable> shutDownAndWait():跟shutDownNow类似,区别是会等待至线程池完全关闭
2.线程及池的状态,一开始很傻气地用了boolean去实现,比如:isShutDown,isRunning等,而这些变量又无法避免地被声明为volatile,我们知道,对volatile的写和读都会比对线程的内部变量的访问占用更多的消耗,与其用多个boolean,不如用一个runState,只需要判断大小就可以了,但对volatile变量的访问只有一次,而判断大小所占用的cpu时间几乎可以忽略不计
这也是一个原则性的问题了,这里记录下来,算是积累把:
表示一个东西的状态(多个状态),尽量还是使用int,如果有同步问题,则用volatile,CAS对于单个变量的控制还是很有效的
3.怎么回收已经下发,但未执行的任务?这就牵扯到另一件事了,需要获取线程的状态(也可以是任务的执行状态,因为这里目标是回收未执行的任务),这就不得不引入了更多的volatile变量,然后这个状态变量,还必须在每次执行任务的时候进行修改,就变成了一个很大的消耗了(这是不能容忍的)
注:这里不能用Thread.getState()来获得线程的状态,该状态是用以系统监控的,并不是用来控制逻辑的(也就是说它不准确)
4.空闲线程队列如果打算重新实现,这又不可避免地需要加入更多的volatile和与程序逻辑相关的happen-before关系了,这会让本来的程序逻辑变得模糊,而且出现问题的几率变得非常大
5.因为要对每个线程进行interrupt,所以又不得不再维护一个线程队列,所以又不得不
……
发现实现当初的目标越来越麻烦了
带着这些问题,最终还是去看了ThreadPoolExecutor的实现,看看大师是怎么做的
发现ThreadPoolExecutor的性能优化只做在了减少锁的范围上,使用的是ReentrantLock,而大部分的参数都使用了volatile,目的只是为了保证可见性,几乎没有出现利用happen-before规则的代码,也就是说,在ThreadPoolExecutor的实现上,优化程度只是在锁的级别上,并没有考虑进一步的优化。而查看类的作者,发现是Doug Lea
回头想想,的确,线程池是不同于AQS的框架的,因为该线程池的实现更注重的是稳定,可复用性,灵活性,安全性。一般说来,线程池调用时间与真正任务的执行时间不是一个数量级的,所以又何必去太计较那点性能的消耗呢?
因此之前所说的“运行速度是ThreadPoolExecutor的1.5倍”的原因也就出来了:任务太简单了,所以大大提高了任务调用时间所占的比例,在现实生活中应该是很少出现这样的情况的。而又说明了另一点,每样功能的实现都是要消耗性能的(特指在并发中),所以在写要求高性能的相关的实现时,应该尽量将需求给落实了,减少功能的需求,才能得到更好的性能
这里再写一下看ThreadPoolExecutor的收获
1.它可以让你注册一个RejectedExecutionHandler,来控制你的下发被拒绝的任务
2.它允许你注册一个threadFactory,来创建线程(这样可以自由地订制自己所需要的线程)
3.里面有一个方法很值得学习的:
我们在正常逻辑中,会有一些常见场景和例外场景(像可能常见场景不需要加锁,但例外场景需要),例外场景应该用尽量少的变量来判断是否进入,而常见场景应该与例外场景完全分离(即使有方法可以重用)
举个例子
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) {//常见场景 if (runState != RUNNING || poolSize == 0)//判断是否进入例外场景 ensureQueuedTaskHandled(command);//例外场景,有加锁动作 } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }