博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
五:(精华)线程池源码深度解析
阅读量:4212 次
发布时间:2019-05-26

本文共 17442 字,大约阅读时间需要 58 分钟。

线程池源码深度解析

我们这篇分为2部分来:

1)首先说说ThreadPoolExecutor的成员,解释下他们的作用
2)带着1的基础,去看看源码的实现

ThreadPoolExecutor的重要成员

        接下来我们一起去学习一下线程池执行过程的源码。

说源码之前,我们先看一下ThreadPoolExecutor类的一些重要成员;
他们在接下来的源码分析中需要用到。先来直接看ThreadPoolExecutor成员源码:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    private static final int COUNT_BITS = Integer.SIZE - 3;    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    // runState is stored in the high-order bits    private static final int RUNNING    = -1 << COUNT_BITS;    private static final int SHUTDOWN   =  0 << COUNT_BITS;    private static final int STOP       =  1 << COUNT_BITS;    private static final int TIDYING    =  2 << COUNT_BITS;    private static final int TERMINATED =  3 << COUNT_BITS;    // Packing and unpacking ctl    private static int runStateOf(int c)     {
return c & ~CAPACITY; } private static int workerCountOf(int c) {
return c & CAPACITY; }

首先看看一行代码:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

关于这个变量的解释,源码注释这里写到:

在这里插入图片描述
这断注释的意思就是:这是主池的控制状态,ctl静态变量他是一个原子性的整数包装类。ctl包含有2个概念性的字段:
1:workerCount:有效的线程数;
2:runState:表示线程池的运行状态,是否正在运行,正在关闭等等;
workCount限制在最大值(2^29)-1≈5亿个;
workCount:ctl的低29位存储有效线程数,runState:ctl的高3位存储线程池运行状态;


private static final int COUNT_BITS = Integer.SIZE - 3;

COUNT_BITS 就是上面ctl对应的workCount中的29次方,他是代表有效线程的二进制最大位数,COUNT_BITS = Integer.SIZE - 3 = 32 - 3 = 29;


private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

这一行代码:这里稍微计算一下,CAPACITY = 2的29次方-1,刚好和上面ctl说的那样,其实这就是workCount的最大值;


// runState is stored in the high-order bits    private static final int RUNNING    = -1 << COUNT_BITS;    private static final int SHUTDOWN   =  0 << COUNT_BITS;    private static final int STOP       =  1 << COUNT_BITS;    private static final int TIDYING    =  2 << COUNT_BITS;    private static final int TERMINATED =  3 << COUNT_BITS;

这一行代码:注释写到,runState被存储在高字节为,也就是ctl的高3位;这里简单了解一下就是:

如果runState<0,代表线程处于可用状态;
如果runState>=0,代表已经被关闭或正在关闭;
1)RUNNABLE:可处理新任务并执行队列中的任务;
2)SHUTDOWN:线程池关闭,不接收新任务,但会处理队列中的阻塞任务;
3)STOP:线程池关闭,不接受新任务,也不处理队列中的阻塞任务;
4)TIDYING:线程池整理,队列中的所有任务执行结束,即将结束;
5)TERMINATED :线程池结束态,已经执行了terminated()方法;


private static int runStateOf(int c)     {
return c & ~CAPACITY; }

这一行代码把ctl和和低29位的非做一个与运算,得出的结果为:线程池的运行状态;


private static int workerCountOf(int c)  {
return c & CAPACITY; }

这一行代码前面说过了,就是计算把ctl和29的非做与运算,得到ctl的低29位,代表线程池中当前的有效线程数;


源码分析

1.首先进入execute()方法:

public void execute(Runnable command) {
if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }

为了方便说明,我一段段的来说:

if (command == null)            throw new NullPointerException();            说明:如果传进来的Runnable为null,则直接抛出空指针异常;
int c = ctl.get();说明:获取线程池的ctl变量,变量的意思在第一部分已经讲过;高3位代表线程池的运行状态,低29位存储有效线程数;
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return; c = ctl.get();=}说明:1:workerCountOf(c):上面也说过了,他是得到线程池中的工作线程数;2:corePoolSize:核心线程数,如果不懂的,请参考我另一篇,《线程池参数解析》;3:addWorker(command, true):添加一个worker执行子任务,参数为子任务和一个true,这个后面专门说明;因为在这里直接说这个方法,你会出不来了,继续看主线逻辑;3:如果线程池中的工作线程数小于核心线程数,那么就添加一个worker去执行子任务;4:如果添加worker执行成功,那么就直接返回;5:如果添加worker执行失败,那么再次获取最新的ctl;
if (isRunning(c) && workQueue.offer(command)) {
...}说明:能够执行到这一行,说明线程池的workCountOf数量大于了核心线程数,或者添加worker失败;1:先看主线判断,这里的意思是:如果线程池是RUNNING状态,并且该任务能够成功添加到阻塞队列中;2:再看else if:
else if (!addWorker(command, false))            reject(command);说明:1:如果线程池的状态不是RUNNING状态或者子任务添加阻塞队列失败,那么就拒绝该任务;

回到第一个if来,如果线程池的状态是RUNNING状态,并且任务添加到阻塞队列成功:

if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false);}说明:1:int recheck = ctl.get();//重新获取ctl变量2:if (! isRunning(recheck) && remove(command)) reject(command); 2.1:如果线程池此时不是RUNNGIN状态并且从队列中删除任务成功,那么执行拒绝策略; 这里为什么要再次检查线程池状态?因为有可能会被其他线程给关闭; 3:else if (workerCountOf(recheck) == 0) addWorker(null, false);// 3.1:如果线程池还是RUNNGIN状态(注意此时任务已经被成功添加到阻塞队列中了的) 3.2:workerCountOf(recheck) == 0,如果当前可用工作线程数等于0(意思是线程池状态是对的, 但是我现在没有线程给你来执行任务),那么就添加一个worker执行任务, *这里特别注意:addWorker(null, false);为什么这里传的不是commond而是一个null, 因为在此之前,commond已经被添加到阻塞队列当中了,这里只是没有可用线程来执行它,所以这里 仅仅添加一个worker去创建线程来执行队列中的任务(worker后续会详解)*

到目前为止:execute的主体方法的逻辑已经结束;

总结一下:
1:添加一个新任务,如果当前线程池中的有效线程数没有超过核心线程数,那么就添加一个worker来执行任务;如果执行成功,就直接返回,流程结束;
2:如果添加失败或者核心线程数已满,那么就接着走,根据线程池的状态和添加任务到阻塞队列成功与否来判断;
3:如果线程池状态为RUNNING并且添加任务到队列成功,那么就再次检查线程池状态,如果线程池状态此时被shutdown并且任务删除成功,那么直接就拒绝该任务(任务已经从阻塞队列被删除了);
4:如果状态为RUNNING,那么接着判断是否有可用线程,如果没有,那么就添加一个worker创建线程来执行队列中的任务;
5:如果线程池的状态不是RUNNING状态或者子任务添加阻塞队列失败,那么就拒绝该任务;


接着,我们进入addWorker()方法,为了方便,我直接在代码中写分析注释了。

private boolean addWorker(Runnable firstTask, boolean core) {
//循环跳出标识位,一般用于跳出2层 retry: //死循环,类似while(true) for (;;) {
//获取ctl变量 int c = ctl.get(); //获取高3位线程池运行状态 int rs = runStateOf(c); /** *这个if判断比较复杂,我们来分析一下: 1:rs>=SHUTDOWN:带包线程池状态不为RUNNING 2:firstTask:这是从execute方法传进来的子任务参数,可能为commond,可能为null 3:workQueue.isEmty():判断阻塞队列是否为空 我们看到这个if如果满足条件了,就会最后返回false,也就是添加worker失败, 满足if的条件: 1)rs >= SHUTDOWN:线程池状态不为RUNNING 2)! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())): 线程池状态已关闭 且 新任务为null 且 阻塞队列不为空 前面有个非!,那么意思就是:线程池状态为RUNNING,或子任务不为空,或队列为空 满足这1.2条件,就返回false,不会添加子任务,添加worker失败; 也许你还不是明白,那么我们反过来考虑一下什么时候返回true? 1)rs < SHUTDOWN,即为RUNNING状态 2)(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) = true 满足这2个条件之一就会返回true: 1)线程池为RUNNING状态,返回true,允许添加新任务 2)线程池状态为SHUTDOWN,并且新任务为空,并且阻塞队列非空,允许添加新任务; 这里分别对应上文的源码: if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); 1:如果线程池状态为RUNNING,并且有可用线程,那么就直接跳过这个if,继续执行后续; :2:如果池状态已关闭或移除任务失败,那么要确保这个任务在队列中被执行, 如果没有可用线程所以传参数null:addWorker(null, false); 添加一个新的worker来执行该任务; */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //循环尝试去增加ctl有效线程数+1,cas保证多线程之间的并发安全性 for (;;) {
//获取线程池的有效线程数 int wc = workerCountOf(c); //如果有效线程数超过了最大值2的29次方减1,或者超过了核心线程数或最大线程数, //返回false,因为已经无法再创建新的线程了 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //如果增加有效线程数成功 if (compareAndIncrementWorkerCount(c)) //终止二层for循环,执行到下一个代码块 break retry; //增加有效线程数失败,重新获取有效线程数 c = ctl.get(); //如果发现此时线程池状态发生了变化 if (runStateOf(c) != rs) //那么跳到retry,继续开始循环重试 continue retry; } } //标识worker是否启动成功 boolean workerStarted = false; //标识worker是否添加成功 boolean workerAdded = false; //定义一个worker,初始为null Worker w = null; try {
//静态可重入锁 final ReentrantLock mainLock = this.mainLock; //初始化worker绑定子任务,注意:这里的子任务可能为null //(worker的结构后续会有详解,这里把他理解成用来执行任务的一个线程) w = new Worker(firstTask); //取出worker中新建的线程t final Thread t = w.thread; if (t != null) {
//加锁 mainLock.lock(); try {
//获取ctl变量 int c = ctl.get(); //获取线程池的运行状态 int rs = runStateOf(c); //如果线程池处于RUNNING状态,或者处于(已关闭状态且新任务为null): //这里就是对应execute()中的这段代码逻辑: /** else if (workerCountOf(recheck) == 0) addWorker(null, false); */ if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
//检查线程是否已经被启动 //(这就是为什么同一个Thread不能重复调用start两次的原因, //如果已经启动,再次启动进入到该if判断,发现他已经处于运行中, //那么直接抛出异常) if (t.isAlive()) throw new IllegalThreadStateException(); //线程如果还没有启动,那么将worker添加到workers当中 workers.add(w); //判断workers的大小 int s = workers.size(); //如果大于了最大值 if (s > largestPoolSize) //更新workers数组的最大值为目前workers的最大值 largestPoolSize = s; //添加worker是否成功置为true workerAdded = true; } } finally {
//释放锁 mainLock.unlock(); } //worker添加成功,则启动线程 if (workerAdded) {
t.start(); //启动成功,置位true workerStarted = true; } } } finally {
//启动失败 if (! workerStarted) //从workers中移除掉当前这个worker,并且工作线程数-1, //并调用terminated方法终止线程池 addWorkerFailed(w); } return workerStarted; }

addWorker()方法结束了,这里来总结一下:

1:addWorker(Runnable,true)时,代表当前工作线程数小于核心线程数,并且添加任务执行;
2:addWorker(Runnable,false)时,代表功当前工作线程数已经大于核心线程数,并且阻塞队列已满;
3:addWorker(null,false)时,代表当前工作线程数大于核心线程数,任务添加阻塞队列成功,但是没有可用线程来执行该任务;
4:addWorker(null,true)时,代表当前工作线程数小于核心线程数,没有可用线程执行任务;


addWorker判断的逻辑:

1:线程池为RUNNING状态时,创建一个worker来创建线程,并且将工作线程数+1,同时启动线程;
2:线程池不为RUNNING状态时,新任务被添加到阻塞队列中,重新创建一个worker来创建线程,同时启动线程;会把所有阻塞队列中的任务执行完,线程才会终止;


worker才是执行线程的核心,接下来看看worker的结构和工作原理:

首先看worker的定义:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L; //静态线程成员 final Thread thread; //Runnable成员 Runnable firstTask; //可见性的已完成的任务数 volatile long completedTasks; //构造方法,参数为Runnable Worker(Runnable firstTask) {
//设置为-1防止线程中断;因为中断的前提是要大于等于0(后面会详细讲到) setState(-1); //成员变量指向新任务对象,有可能新任务为空 this.firstTask = firstTask; //从线程工厂中创建一个线程,参数为当前的Runnable this.thread = getThreadFactory().newThread(this); } //执行run方法 public void run() {
//执行入口 runWorker(this); }}

**setState(-1)**为什么是防止线程中断?

首先来看线程池的shutdown方法:

public List
shutdownNow() {
List
tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {
checkShutdownAccess(); advanceRunState(STOP); //重点看这个方法,意思是尝试中断workers中的worker对象的成员Thread interruptWorkers(); tasks = drainQueue(); } finally {
mainLock.unlock(); } tryTerminate(); return tasks;} 进入interruptWorkers()方法:private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {
for (Worker w : workers) //重点看这里,循环每个worker,调用interruptIfStarted()方法 w.interruptIfStarted(); } finally {
mainLock.unlock(); }}进入interruptIfStarted()方法: void interruptIfStarted() {
Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt(); } catch (SecurityException ignore) {
} } }这段代码我相信大家都能看得懂:当state>=0并且线程不为null,且线程中断标识=false的时候,调用线程的interrupt方法设置线程中断标识;所以我们在分析worker创建的构造函数中,加上setState(-1)防止线程中断;是不是豁然开朗?....

好了,我们接着来看runWorker(this);

先看源码:

final void runWorker(Worker w) {
//获取当前线程 Thread wt = Thread.currentThread(); //获取传进来的新任务,并且用一个新的Runnable重新指向新任务 //新任务有可能为null(这里哪种为null你应该懂了吧,前面反复强调了好多次了呢) Runnable task = w.firstTask; //将新任务交给上面一行task之后,自己重新设置为null w.firstTask = null; //解锁,将state设置为0,允许线程中断 w.unlock(); //异常结束标识 boolean completedAbruptly = true; try {
//1:如果当前任务不为空,准备执行当前任务,并且接着循环执行完队列中的所有子任务 //2:如果当前任务为空,即属于那种核心线程数已满,任务添加到阻塞队列中,但是没有可用线程, //3:所以new了一个worker来执行队列中的任务,这时候task就是等于null //4:该方法会一直反复利用在当前线程执行完所有的子任务,这就是线程池的线程复用---重点 while (task != null || (task = getTask()) != null) {
//上锁,用来保证及时线程中断也能完成后续所有的任务 w.lock(); //1:如果线程池状态大于等于STOP状态(调用了shutdown()); //2: 检查线程是否中断,并且线程池状态为STOP,并且线程还未中断 //满足1或2,都执行wt.interrupt();给线程设置中断标识; //(确保调用了shutdown之后成功设置中断标识) if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //设置中断标识 wt.interrupt(); try {
//在线程wt上执行任务task beforeExecute(wt, task); Throwable thrown = null; try {
//执行线程的run()方法 task.run(); } catch (RuntimeException x) {
thrown = x; throw x; } catch (Error x) {
thrown = x; throw x; } catch (Throwable x) {
thrown = x; throw new Error(x); } finally {
afterExecute(task, thrown); } } finally {
//执行完之后task置空 task = null; //统计已完成的任务数 w.completedTasks++; //允许中断 w.unlock(); } } completedAbruptly = false; } finally {
processWorkerExit(w, completedAbruptly); } }

其中循环遍历阻塞队列getTask()方法也很重要,我们看一下:

private Runnable getTask() {
//从队列中取任务是否超时 boolean timedOut = false; //循环标识位置 retry: //死循环 for (;;) {
//获取ctl变量 int c = ctl.get(); //获取线程池运行状态 int rs = runStateOf(c); //如果线程池调用了shutdown或阻塞队列执行完成,已经为空,那么就减少ctl线程数 //如果线程池调用了shutdownNow,队列中的任务还没执行完,就直接丢弃执行, //减少线程数,返回空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); return null; } //线程是否超过空闲存活,即是否超过KeepAliveTime boolean timed; //死循环 for (;;) {
//获取工作线程数 int wc = workerCountOf(c); //allowCoreThreadTimeOut=true时,核心线程超时会关闭 //工作线程数超过核心线程数,timed=true timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果工作线程数小于最大线程数且未超时未超过,则跳出循环不处理 if (wc <= maximumPoolSize && ! (timedOut && timed)) break; //如果工作线程数超过了最大线程数或且有线程在取任务超时,那么就关闭一个线程放弃执行, //ctl减1 if (compareAndDecrementWorkerCount(c)) return null; //重新读取ctl c = ctl.get(); //如果线程池状态发生了变化,重新遍历取任务 if (runStateOf(c) != rs) continue retry; } try {
/** 三元运算符: 1:允许核心线程数超时或当前工作线程超过核心线程数,任务从队列中超时时间内取; 2: 如果工作线程数不超过核心线程数且核心线程不超时,则从队列中阻塞取任务; */ Runnable r = timed ? //线程在超时时间内取队列中第一个任务, //如果超时,还没有可用任务,则返回null workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //没有超过核心线程数,则使核心线程一直挂起,阻塞,等待任务的到来 workQueue.take(); if (r != null) return r; //如果没有任务,则timeOut设置成true,回到上面的判断,就会关闭掉线程 timedOut = true; } catch (InterruptedException retry) {
timedOut = false; } } }

总结一下getTask方法:

1:在一个worker内部,同一个Thread里面一直循环遍历取出阻塞队列中的所有任务并执行;
2:如果线程池调用了shutdown,阻塞队列不为空,那么会继续执行完剩下的任务;
3:如果工作线程数大于核心线程数或者超时线程从队列中取不到任务,那么就会关闭当前线程;

源码分析就到此为止了,整体来总结一下:

1:新任务被请求执行时,如果工作线程不超过核心线程数,那么新建一个线程来执行;
2:如果工作线程超过了核心线程,则将新任务尝试添加到阻塞队列并且去执行;
3:如果核心线程数已满,阻塞队列已满,那么就尝试新建一个线程去执行;如果执行失败,代表工作线程已经大于等于最大线程数,那么就会执行拒绝策略;
4:如果工作线程大于核心线程,线程取任务超时,那么该线程放弃执行任务并关闭当前线程;
5:如果工作线程小于核心线程,阻塞度列为空,那么该线程阻塞挂起,直到有任务进来;
4:如果线程池调用了shutdown,阻塞队列不为空,那么就继续执行队列中的任务,此时拒绝新任务;
5:如果调用了shutdownNow,则在getTask()时直接返回null,放弃执行,关闭当前线程;

好了,多线程篇先到这里为止。

转载地址:http://vikmi.baihongyu.com/

你可能感兴趣的文章
Windows下单机安装Spark开发环境
查看>>
tomcat如何配置环境变量
查看>>
Maven实战(三)Eclipse构建Maven项目
查看>>
Tomcat安装及配置教程
查看>>
The superclass "javax.servlet.http.HttpServlet" was not found on the Java Build Path
查看>>
JDK、JRE、JVM三者间的关系
查看>>
为什么 Chrome 开启 QUIC 之后能够快速顺畅访问 Google 和 Gmail?
查看>>
用 Tomcat 和 Eclipse 开发 Web 应用程序
查看>>
60款顶级大数据开源工具
查看>>
eclipse 配置scala问题-More than one scala library found in the build path
查看>>
IIS 承载的服务失败
查看>>
写连接代码时需要注意2000和2005的不同:
查看>>
五种开源协议的比较(BSD,Apache,GPL,LGPL,MIT) – 整理
查看>>
程序员公司任职软件开发著作权该归谁呢
查看>>
OLTP报表和OLAP报表
查看>>
Hbase案例:浏览器用户行为分析
查看>>
SQL Server 2008 Reporting Services报表中文乱码问题
查看>>
report builder 报表表头固定
查看>>
DNS中的正向解析与反向解析
查看>>
忘记oracle的sys用户密码怎么修改
查看>>