本文共 17442 字,大约阅读时间需要 58 分钟。
我们这篇分为2部分来:
1)首先说说ThreadPoolExecutor的成员,解释下他们的作用 2)带着1的基础,去看看源码的实现接下来我们一起去学习一下线程池执行过程的源码。
说源码之前,我们先看一下ThreadPoolExecutor类的一些重要成员; 他们在接下来的源码分析中需要用到。先来直接看ThreadPoolExecutor成员源码:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; }
首先看看一行代码:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
关于这个变量的解释,源码注释这里写到:
这断注释的意思就是:这是主池的控制状态,ctl静态变量他是一个原子性的整数包装类。ctl包含有2个概念性的字段: 1:workerCount:有效的线程数; 2:runState:表示线程池的运行状态,是否正在运行,正在关闭等等; workCount限制在最大值(2^29)-1≈5亿个; workCount:ctl的低29位存储有效线程数,runState:ctl的高3位存储线程池运行状态;private static final int COUNT_BITS = Integer.SIZE - 3;
COUNT_BITS 就是上面ctl对应的workCount中的29次方,他是代表有效线程的二进制最大位数,COUNT_BITS = Integer.SIZE - 3 = 32 - 3 = 29;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
这一行代码:这里稍微计算一下,CAPACITY = 2的29次方-1,刚好和上面ctl说的那样,其实这就是workCount的最大值;
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
这一行代码:注释写到,runState被存储在高字节为,也就是ctl的高3位;这里简单了解一下就是:
如果runState<0,代表线程处于可用状态; 如果runState>=0,代表已经被关闭或正在关闭; 1)RUNNABLE:可处理新任务并执行队列中的任务; 2)SHUTDOWN:线程池关闭,不接收新任务,但会处理队列中的阻塞任务; 3)STOP:线程池关闭,不接受新任务,也不处理队列中的阻塞任务; 4)TIDYING:线程池整理,队列中的所有任务执行结束,即将结束; 5)TERMINATED :线程池结束态,已经执行了terminated()方法;private static int runStateOf(int c) { return c & ~CAPACITY; }
这一行代码把ctl和和低29位的非做一个与运算,得出的结果为:线程池的运行状态;
private static int workerCountOf(int c) { return c & CAPACITY; }
这一行代码前面说过了,就是计算把ctl和29的非做与运算,得到ctl的低29位,代表线程池中当前的有效线程数;
1.首先进入execute()方法:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
为了方便说明,我一段段的来说:
if (command == null) throw new NullPointerException(); 说明:如果传进来的Runnable为null,则直接抛出空指针异常;
int c = ctl.get();说明:获取线程池的ctl变量,变量的意思在第一部分已经讲过;高3位代表线程池的运行状态,低29位存储有效线程数;
if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get();=}说明:1:workerCountOf(c):上面也说过了,他是得到线程池中的工作线程数;2:corePoolSize:核心线程数,如果不懂的,请参考我另一篇,《线程池参数解析》;3:addWorker(command, true):添加一个worker执行子任务,参数为子任务和一个true,这个后面专门说明;因为在这里直接说这个方法,你会出不来了,继续看主线逻辑;3:如果线程池中的工作线程数小于核心线程数,那么就添加一个worker去执行子任务;4:如果添加worker执行成功,那么就直接返回;5:如果添加worker执行失败,那么再次获取最新的ctl;
if (isRunning(c) && workQueue.offer(command)) { ...}说明:能够执行到这一行,说明线程池的workCountOf数量大于了核心线程数,或者添加worker失败;1:先看主线判断,这里的意思是:如果线程池是RUNNING状态,并且该任务能够成功添加到阻塞队列中;2:再看else if:
else if (!addWorker(command, false)) reject(command);说明:1:如果线程池的状态不是RUNNING状态或者子任务添加阻塞队列失败,那么就拒绝该任务;
回到第一个if来,如果线程池的状态是RUNNING状态,并且任务添加到阻塞队列成功:
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false);}说明:1:int recheck = ctl.get();//重新获取ctl变量2:if (! isRunning(recheck) && remove(command)) reject(command); 2.1:如果线程池此时不是RUNNGIN状态并且从队列中删除任务成功,那么执行拒绝策略; 这里为什么要再次检查线程池状态?因为有可能会被其他线程给关闭; 3:else if (workerCountOf(recheck) == 0) addWorker(null, false);// 3.1:如果线程池还是RUNNGIN状态(注意此时任务已经被成功添加到阻塞队列中了的) 3.2:workerCountOf(recheck) == 0,如果当前可用工作线程数等于0(意思是线程池状态是对的, 但是我现在没有线程给你来执行任务),那么就添加一个worker执行任务, *这里特别注意:addWorker(null, false);为什么这里传的不是commond而是一个null, 因为在此之前,commond已经被添加到阻塞队列当中了,这里只是没有可用线程来执行它,所以这里 仅仅添加一个worker去创建线程来执行队列中的任务(worker后续会详解)*
到目前为止:execute的主体方法的逻辑已经结束;
总结一下: 1:添加一个新任务,如果当前线程池中的有效线程数没有超过核心线程数,那么就添加一个worker来执行任务;如果执行成功,就直接返回,流程结束; 2:如果添加失败或者核心线程数已满,那么就接着走,根据线程池的状态和添加任务到阻塞队列成功与否来判断; 3:如果线程池状态为RUNNING并且添加任务到队列成功,那么就再次检查线程池状态,如果线程池状态此时被shutdown并且任务删除成功,那么直接就拒绝该任务(任务已经从阻塞队列被删除了); 4:如果状态为RUNNING,那么接着判断是否有可用线程,如果没有,那么就添加一个worker创建线程来执行队列中的任务; 5:如果线程池的状态不是RUNNING状态或者子任务添加阻塞队列失败,那么就拒绝该任务;接着,我们进入addWorker()方法,为了方便,我直接在代码中写分析注释了。
private boolean addWorker(Runnable firstTask, boolean core) { //循环跳出标识位,一般用于跳出2层 retry: //死循环,类似while(true) for (;;) { //获取ctl变量 int c = ctl.get(); //获取高3位线程池运行状态 int rs = runStateOf(c); /** *这个if判断比较复杂,我们来分析一下: 1:rs>=SHUTDOWN:带包线程池状态不为RUNNING 2:firstTask:这是从execute方法传进来的子任务参数,可能为commond,可能为null 3:workQueue.isEmty():判断阻塞队列是否为空 我们看到这个if如果满足条件了,就会最后返回false,也就是添加worker失败, 满足if的条件: 1)rs >= SHUTDOWN:线程池状态不为RUNNING 2)! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())): 线程池状态已关闭 且 新任务为null 且 阻塞队列不为空 前面有个非!,那么意思就是:线程池状态为RUNNING,或子任务不为空,或队列为空 满足这1.2条件,就返回false,不会添加子任务,添加worker失败; 也许你还不是明白,那么我们反过来考虑一下什么时候返回true? 1)rs < SHUTDOWN,即为RUNNING状态 2)(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) = true 满足这2个条件之一就会返回true: 1)线程池为RUNNING状态,返回true,允许添加新任务 2)线程池状态为SHUTDOWN,并且新任务为空,并且阻塞队列非空,允许添加新任务; 这里分别对应上文的源码: if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); 1:如果线程池状态为RUNNING,并且有可用线程,那么就直接跳过这个if,继续执行后续; :2:如果池状态已关闭或移除任务失败,那么要确保这个任务在队列中被执行, 如果没有可用线程所以传参数null:addWorker(null, false); 添加一个新的worker来执行该任务; */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //循环尝试去增加ctl有效线程数+1,cas保证多线程之间的并发安全性 for (;;) { //获取线程池的有效线程数 int wc = workerCountOf(c); //如果有效线程数超过了最大值2的29次方减1,或者超过了核心线程数或最大线程数, //返回false,因为已经无法再创建新的线程了 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //如果增加有效线程数成功 if (compareAndIncrementWorkerCount(c)) //终止二层for循环,执行到下一个代码块 break retry; //增加有效线程数失败,重新获取有效线程数 c = ctl.get(); //如果发现此时线程池状态发生了变化 if (runStateOf(c) != rs) //那么跳到retry,继续开始循环重试 continue retry; } } //标识worker是否启动成功 boolean workerStarted = false; //标识worker是否添加成功 boolean workerAdded = false; //定义一个worker,初始为null Worker w = null; try { //静态可重入锁 final ReentrantLock mainLock = this.mainLock; //初始化worker绑定子任务,注意:这里的子任务可能为null //(worker的结构后续会有详解,这里把他理解成用来执行任务的一个线程) w = new Worker(firstTask); //取出worker中新建的线程t final Thread t = w.thread; if (t != null) { //加锁 mainLock.lock(); try { //获取ctl变量 int c = ctl.get(); //获取线程池的运行状态 int rs = runStateOf(c); //如果线程池处于RUNNING状态,或者处于(已关闭状态且新任务为null): //这里就是对应execute()中的这段代码逻辑: /** else if (workerCountOf(recheck) == 0) addWorker(null, false); */ if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //检查线程是否已经被启动 //(这就是为什么同一个Thread不能重复调用start两次的原因, //如果已经启动,再次启动进入到该if判断,发现他已经处于运行中, //那么直接抛出异常) if (t.isAlive()) throw new IllegalThreadStateException(); //线程如果还没有启动,那么将worker添加到workers当中 workers.add(w); //判断workers的大小 int s = workers.size(); //如果大于了最大值 if (s > largestPoolSize) //更新workers数组的最大值为目前workers的最大值 largestPoolSize = s; //添加worker是否成功置为true workerAdded = true; } } finally { //释放锁 mainLock.unlock(); } //worker添加成功,则启动线程 if (workerAdded) { t.start(); //启动成功,置位true workerStarted = true; } } } finally { //启动失败 if (! workerStarted) //从workers中移除掉当前这个worker,并且工作线程数-1, //并调用terminated方法终止线程池 addWorkerFailed(w); } return workerStarted; }
addWorker()方法结束了,这里来总结一下:
1:addWorker(Runnable,true)时,代表当前工作线程数小于核心线程数,并且添加任务执行; 2:addWorker(Runnable,false)时,代表功当前工作线程数已经大于核心线程数,并且阻塞队列已满; 3:addWorker(null,false)时,代表当前工作线程数大于核心线程数,任务添加阻塞队列成功,但是没有可用线程来执行该任务; 4:addWorker(null,true)时,代表当前工作线程数小于核心线程数,没有可用线程执行任务;addWorker判断的逻辑:
1:线程池为RUNNING状态时,创建一个worker来创建线程,并且将工作线程数+1,同时启动线程; 2:线程池不为RUNNING状态时,新任务被添加到阻塞队列中,重新创建一个worker来创建线程,同时启动线程;会把所有阻塞队列中的任务执行完,线程才会终止;worker才是执行线程的核心,接下来看看worker的结构和工作原理:
首先看worker的定义:private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; //静态线程成员 final Thread thread; //Runnable成员 Runnable firstTask; //可见性的已完成的任务数 volatile long completedTasks; //构造方法,参数为Runnable Worker(Runnable firstTask) { //设置为-1防止线程中断;因为中断的前提是要大于等于0(后面会详细讲到) setState(-1); //成员变量指向新任务对象,有可能新任务为空 this.firstTask = firstTask; //从线程工厂中创建一个线程,参数为当前的Runnable this.thread = getThreadFactory().newThread(this); } //执行run方法 public void run() { //执行入口 runWorker(this); }}
**setState(-1)**为什么是防止线程中断?
首先来看线程池的shutdown方法:public ListshutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); //重点看这个方法,意思是尝试中断workers中的worker对象的成员Thread interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks;} 进入interruptWorkers()方法:private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) //重点看这里,循环每个worker,调用interruptIfStarted()方法 w.interruptIfStarted(); } finally { mainLock.unlock(); }}进入interruptIfStarted()方法: void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }这段代码我相信大家都能看得懂:当state>=0并且线程不为null,且线程中断标识=false的时候,调用线程的interrupt方法设置线程中断标识;所以我们在分析worker创建的构造函数中,加上setState(-1)防止线程中断;是不是豁然开朗?....
好了,我们接着来看runWorker(this);
先看源码:final void runWorker(Worker w) { //获取当前线程 Thread wt = Thread.currentThread(); //获取传进来的新任务,并且用一个新的Runnable重新指向新任务 //新任务有可能为null(这里哪种为null你应该懂了吧,前面反复强调了好多次了呢) Runnable task = w.firstTask; //将新任务交给上面一行task之后,自己重新设置为null w.firstTask = null; //解锁,将state设置为0,允许线程中断 w.unlock(); //异常结束标识 boolean completedAbruptly = true; try { //1:如果当前任务不为空,准备执行当前任务,并且接着循环执行完队列中的所有子任务 //2:如果当前任务为空,即属于那种核心线程数已满,任务添加到阻塞队列中,但是没有可用线程, //3:所以new了一个worker来执行队列中的任务,这时候task就是等于null //4:该方法会一直反复利用在当前线程执行完所有的子任务,这就是线程池的线程复用---重点 while (task != null || (task = getTask()) != null) { //上锁,用来保证及时线程中断也能完成后续所有的任务 w.lock(); //1:如果线程池状态大于等于STOP状态(调用了shutdown()); //2: 检查线程是否中断,并且线程池状态为STOP,并且线程还未中断 //满足1或2,都执行wt.interrupt();给线程设置中断标识; //(确保调用了shutdown之后成功设置中断标识) if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //设置中断标识 wt.interrupt(); try { //在线程wt上执行任务task beforeExecute(wt, task); Throwable thrown = null; try { //执行线程的run()方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { //执行完之后task置空 task = null; //统计已完成的任务数 w.completedTasks++; //允许中断 w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
其中循环遍历阻塞队列getTask()方法也很重要,我们看一下:
private Runnable getTask() { //从队列中取任务是否超时 boolean timedOut = false; //循环标识位置 retry: //死循环 for (;;) { //获取ctl变量 int c = ctl.get(); //获取线程池运行状态 int rs = runStateOf(c); //如果线程池调用了shutdown或阻塞队列执行完成,已经为空,那么就减少ctl线程数 //如果线程池调用了shutdownNow,队列中的任务还没执行完,就直接丢弃执行, //减少线程数,返回空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //线程是否超过空闲存活,即是否超过KeepAliveTime boolean timed; //死循环 for (;;) { //获取工作线程数 int wc = workerCountOf(c); //allowCoreThreadTimeOut=true时,核心线程超时会关闭 //工作线程数超过核心线程数,timed=true timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果工作线程数小于最大线程数且未超时未超过,则跳出循环不处理 if (wc <= maximumPoolSize && ! (timedOut && timed)) break; //如果工作线程数超过了最大线程数或且有线程在取任务超时,那么就关闭一个线程放弃执行, //ctl减1 if (compareAndDecrementWorkerCount(c)) return null; //重新读取ctl c = ctl.get(); //如果线程池状态发生了变化,重新遍历取任务 if (runStateOf(c) != rs) continue retry; } try { /** 三元运算符: 1:允许核心线程数超时或当前工作线程超过核心线程数,任务从队列中超时时间内取; 2: 如果工作线程数不超过核心线程数且核心线程不超时,则从队列中阻塞取任务; */ Runnable r = timed ? //线程在超时时间内取队列中第一个任务, //如果超时,还没有可用任务,则返回null workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //没有超过核心线程数,则使核心线程一直挂起,阻塞,等待任务的到来 workQueue.take(); if (r != null) return r; //如果没有任务,则timeOut设置成true,回到上面的判断,就会关闭掉线程 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
总结一下getTask方法:
1:在一个worker内部,同一个Thread里面一直循环遍历取出阻塞队列中的所有任务并执行; 2:如果线程池调用了shutdown,阻塞队列不为空,那么会继续执行完剩下的任务; 3:如果工作线程数大于核心线程数或者超时线程从队列中取不到任务,那么就会关闭当前线程;源码分析就到此为止了,整体来总结一下:
1:新任务被请求执行时,如果工作线程不超过核心线程数,那么新建一个线程来执行; 2:如果工作线程超过了核心线程,则将新任务尝试添加到阻塞队列并且去执行; 3:如果核心线程数已满,阻塞队列已满,那么就尝试新建一个线程去执行;如果执行失败,代表工作线程已经大于等于最大线程数,那么就会执行拒绝策略; 4:如果工作线程大于核心线程,线程取任务超时,那么该线程放弃执行任务并关闭当前线程; 5:如果工作线程小于核心线程,阻塞度列为空,那么该线程阻塞挂起,直到有任务进来; 4:如果线程池调用了shutdown,阻塞队列不为空,那么就继续执行队列中的任务,此时拒绝新任务; 5:如果调用了shutdownNow,则在getTask()时直接返回null,放弃执行,关闭当前线程;好了,多线程篇先到这里为止。
转载地址:http://vikmi.baihongyu.com/