JAVA线程池源码解析

JAVA线程池源码解析

前言

本文将只针对1.8中的ThreadPoolExecutor做解析,主要解析execute方法、线程在池中的声明周期,从提交到Worker结束的整个过程。

线程池构建

直接看源码:

Alt text


我们依次看一下线程池的所有入参:

  1. corePoolSize核心线程数
  2. maximumPoolSize最大线程数
  3. keepAliveTime空闲线程维持活跃的等待时间
  4. unit等待时间的单位
  5. workQueue等待队列
  6. threadFactory线程工厂
  7. handler拒绝策略

后面我们会在源码中依次看到这几个参数的作用。

execute

直接看一下execute的代码:

Alt text


先看ctl,private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));这个原子型int的作用非常强大,利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:

  1. RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务
  2. SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务
  3. STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务
  4. TIDYING : 2 << COUNT_BITS,即高3位为010,terminated方法执行之后会把线程池置为TIDYING
  5. TERMINATED: 3 << COUNT_BITS,即高3位为011

workerCount方法即为从ctl中取worker的数量,如果小于核心线程数,则会尝试进行addWorker操作,command即为我们要运行的任务,addWorker方法下面会慢慢解析。如果添加成功返回;不成功则更新c的值。

如果worker数量大于等于核心线程数,根据cc就是ctl的值)判断线程池是否还在运行,并且尝试添加任务到队列中,如果添加成功,这里需要一个double check,因为如果正在入队列的过程中,线程池停止,则需要一个回滚操作(回滚操作即为从队列中remove这条任务,并且用拒绝策略处理这条任务)。如果double check校验通过,但是这时候worker数量为0,则添加worker

那么如果把任务放入阻塞队列失败,即队列已满呢(workQueue.offer(command)返回false),这时候,直接尝试增加一个worker,如果失败,则执行拒绝策略处理该条任务。

那么,从execute方法我们可以看出来,execute的流程:

  1. 如果worker的数量没有达到核心线程数,则添加worker执行这条任务
  2. 如果worker的数量超过核心线程数,则放入到阻塞队列中等待执行
  3. 如果阻塞队列也放满了,则直接尝试新增一个worker执行任务,尝试失败则执行拒绝策略。

Worker

上面一直提到的Worker是线程池中的一个类:

Alt text


可以看到,这类是继承自AQS的,也是各种锁的父类,这是为了方便的进行并发的控制,并且是实现了run方法,run的方式是把自己作为一个task,丢到runWorker方法中。

大概看了Worker,再看addWorker方法:

Alt text


这只是前半部分,添加时,还是获取一下c,并且获取了线程池的状态,如果状态大于等于SHUTDOWN,则不提交,返回false。

下面是一个死循环,内部主要在根据core(是一个boolean,意思是,是否是核心线程)入参来处理,如果core为true,则比较worker count是否大于等于corePoolSize,否则判断是否大于等于maximumPoolSize。如果大于等于则返回false。否则通过CAS增加worker的数量,增加成功,则跳出循环下面开始创建新线程;失败,则说明出在竞争态中,则再次获取新的c,然后再循环处理上述过程。

这里我们能看出corePoolSizemaximumPoolSize的作用:线程池维护了一批叫做核心线程的线程,如果已经创建了足够的核心线程,我们可以再去创建线程,但是不能超过maximumPoolSize个。(如果二者值一样,则只会有corePoolSize个线程)

再看后半部分:

Alt text


首先创建Worker,在有可重入锁的锁定情况下,把Worker放到一个workers(这个workers是一个hashset)中。worker的构建方法入参firsttask就是我们execute提交的task。后面会提到为什么要交firsttask。如果加入到workers,会执行worker中的thread,这个thread就是代表worker的线程,那么,这个worker run的时候执行什么,上文已经说过了,执行runWorker(this)。现在具体看runWorker方法:

Alt text


这是整个线程池的核心

  1. worker释放锁。
  2. 获取任务firsttask,这里为什么叫第一个任务呢,因为一个worker创建处来,一开始会有个任务,这个任务执行完了呢,就会执行getTask方法,从队列中去获取任务。
  3. 在执行任务的前后,可以根据业务场景自定义beforeExecuteafterExecute方法。
  4. 如果队列中没有任务,getTask会等待。
  5. getTask出来的任务,会执行taskrun方法。注意:这里不是执行start方法!,要时刻记住,这是在执行worker!而worker本身创建出来就已经自身绑定了一个线程,所以我们是把任务取出来在worker的这个线程同步的跑,Worker的线程怎么创建的,可以看上文的Worker类。

总结一下,addWorkerrunWorker的整个流程:

  1. 检查线程池状态和核心线程数,然后创建Worker,把Worker放到workers中。
  2. 执行workerworker先执行创建时就分配给他的任务,执行之后再去阻塞队列中取任务执行。

worker的生命周期

最后谈谈worker的声明周期,这是很重要的地方,但是网上大多博文都忽略了,我们细看一下。

创建出来的流程我们已经说过了,不赘述,只看worker生命周期的终结。回忆上文,我们说到,worker先执行一开始分配给他的firsttask,执行完再从队列中取任务执行。我们看这个循环的代码:

1
2
3
while (task != null || (task = getTask()) != null) {
//....
}

task一开始是firsttask,执行完firsttask,则task = getTask()。我们看getTask

Alt text


还是老讨论,从队列中获取任务的时候,校验线程池状态是否关闭。我们直接看获取任务的地方:

Alt text
红框中我们看到了keepAliveTime,我们结合这个字段的含义,就知道,一个Worker从阻塞队列中取任务时,只会等待keepAliveTime这么长时间,如果取不到,则返回的task就是null,结合while (task != null || (task = getTask()) != null)这句,如果tasknull,则跳出循环,那么跳出循环干嘛了,再看循环结束后的代码:

Alt text


这是整个while循环的代码,为了清晰,再贴一次,我们可以看到,整个while循环在try块中,我们看一下有一个叫做completedAbruptly的参数,这个只有在while循环结束的时候会被设置为false!如果while循环中,执行task发生异常,会执行抛出操作,则这个参数不会被设置为false,而是初始化时候的true

不管while正常结束,还是异常中断,都会进入finally中,执行processWorkerExit方法:

Alt text


这个completedAbruptly的含义是:如果为true,则说明worker的结束是因为用户的task中有异常结束的

看代码这个退出worker干了啥:

  1. 如果是是用户问题导致worker结束,则减少worker数量。
  2. 全局加锁,把线程池执行完成了的task数量,加上这个worker执行了的task,相当于一个绩效考核。
  3. workers里移除这个worker
  4. 尝试终止线程池。
  5. 线程池状态小于STOP,则如果线程池是由于用户问题中断,直接补充一个worker(就是下面那句addWorker(null, false))。如果是由于用户问题中断,则判断核心线程数的数量和worker count,如果需要补充,则同样补充一个worker。注意:这里补充的worker是没有分配firsttask的,意味着这个补充的Worker会直接从阻塞队列中获取任务。

至此,我们总结一下worker的生命周期中的结束:

  1. worker在一个while循环中,不停地从阻塞队列中获取任务。
  2. keepAliveTime时间内获取不到任务,或者获取任务成功但是执行任务失败时,退出while循环,进入processWorkerExit
  3. 如果结束该Worker之后,需要补充worker,则补充一个,否则,一个worker的生命结束,等待被GC(从workers被移除之后,这个worker再没有任何引用)。

总结

至此,整个线程池的源码分析结束。我们可以看到,线程池中最主要的是Worker的整个生命周期,整个生命周期和线程池的状态流转关联的非常紧密,整个结构非常严谨。最后膜拜一下Doug Lea大神。