ThreadPool线程池源码解析
ThreadPool线程池源码解析
文章目录
- 前言
- 一、基本使用
- 二、执行流程
- 三、源码分析
- ThreadPoolExecutor 中重要属性
- ThreadPoolExecutor 内部类Worker
- execute()方法
- addWorker(command, true)方法
- runWorker(worker )方法
- getTask()方法
- shutdown和shutdownNow
- 四、线程池异常处理
- 总结
前言
- 如何实现线程复用的?
- 先提交的任务一定会先执行吗?
- 线程池中的线程如何做到空闲一定时间退出的?
- 没有任务时候超过最大存活时间被销毁的是非核心线程?
- 在调用shutdown方法关闭线程池的时候,如何判断线程有没有在执行任务?
- shutdown和shutdownNow两个方法区别?
- 如何处理执行失败的任务?
本觉得自己已经懂线程池了,看到以上几个问题能否都能答上来,还是又觉得自己不懂了,一脸懵逼?如果你能回答上来那么你可以绕过了。
上面前几个问题都是原理概念问题,最后一个是实际使用中必须要面对的问题,当然你可以不面对,我相信这也是觉大多数人的做法,直接忽略,才不管有没有执行成功,我把任务提交到线程池有么有最终执行成功关我毛事。本篇中先对线程池源码进行分析,逐个解答如上问题,并以实际工作案例给出常用的任务失败处理方案。
一、基本使用
如上是我们再熟悉不过的代码了。
二、执行流程
如上执行流程我们已经背诵的滚瓜烂熟了。
三、源码分析
首先看下最基本的类继承关系,我们本篇以方法execute(Runnable)
作为切入分析,submit(Runnable)
是对这个方法做了下封装,所以我们还是从最基本的开始。
在开始分析前execute(Runnable)
方法之前先看下ThreadPoolExecutor相关重要的属性,有个大概印象就行了,遇到不清楚的再回过头来看。
ThreadPoolExecutor 中重要属性
public class ThreadPoolExecutor extends AbstractExecutorService {
... ...
//高3位:表示当前线程池运行状态 除去高3位之后的低位:表示当前线程池所拥有的线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示在ctl中,低COUNT_BITS位 用于存放当前线程数量的位
private static final int COUNT_BITS = Integer.SIZE - 3;
//低COUNT_BITS位 所能表达的最大数值
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//线程池状态:
//表示可接受新任务,且可执行队列中的任务;
private static final int RUNNING = -1 << COUNT_BITS;
//表示不接受新任务,但可执行队列中的任务;
private static final int SHUTDOWN = 0 << COUNT_BITS;
//表示不接受新任务,且不再执行队列中的任务,且中断正在执行的任务;
private static final int STOP = 1 << COUNT_BITS;
// 所有任务已经中止,且工作线程数量为0,最后变迁到这个状态的线程将要执行terminated()钩子方法,只会有一个线程执行这个方法;
private static final int TIDYING = 2 << COUNT_BITS;
//中止状态,已经执行完terminated()钩子方法;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//任务队列,当线程池中的线程达到核心线程数量时,再提交任务 就会直接提交到 workQueue
private final BlockingQueue<Runnable> workQueue;
//线程池全局锁,增加worker 减少 worker 时需要持有mainLock , 修改线程池运行状态时,也需要。
private final ReentrantLock mainLock = new ReentrantLock();
//线程池中真正存放 worker->thread 的地方。
private final HashSet<Worker> workers = new HashSet<Worker>();
// 记录线程池生命周期内 线程数最大值
private int largestPoolSize;
// 记录线程池所完成任务总数
private long completedTaskCount;
// 创建线程会使用线程工厂
private volatile ThreadFactory threadFactory;
//拒绝策略
private volatile RejectedExecutionHandler handler;
//空闲线程存活时间,当allowCoreThreadTimeOut == false 时,会维护核心线程数量内的线程存活,超出部分会被超时。
private volatile long keepAliveTime;
//控制核心线程数量内的线程 是否可以被回收。true 可以,false不可以。
private volatile boolean allowCoreThreadTimeOut;
// 核心线程池数量
private volatile int corePoolSize;
// 线程池最大数量
private volatile int maximumPoolSize;
// 默认拒绝策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
... ...
}
关键代码解析:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
线程池状态具体是存在ctl
成员变量中,ctl
中不仅存储了线程池的状态还存储了当前线程池中线程数的大小。
//表示可接受新任务,且可执行队列中的任务;
private static final int RUNNING = -1 << COUNT_BITS;
//表示不接受新任务,但可执行队列中的任务;
private static final int SHUTDOWN = 0 << COUNT_BITS;
//表示不接受新任务,且不再执行队列中的任务,且中断正在执行的任务;
private static final int STOP = 1 << COUNT_BITS;
// 所有任务已经中止,且工作线程数量为0,最后变迁到这个状态的线程将要执行terminated()钩子方法,只会有一个线程执行这个方法;
private static final int TIDYING = 2 << COUNT_BITS;
//中止状态,已经执行完terminated()钩子方法;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池内部有5个常量来代表线程池的五种状态:
- RUNNING:线程池创建时就是这个状态,能够接收新任务,以及对已添加的任务进行处理。
- SHUTDOWN:调用shutdown方法线程池就会转换成SHUTDOWN状态,此时线程池不再接收新任务,但能继续处理已添加的任务到队列中任务。
- STOP:调用shutdownNow方法线程池就会转换成STOP状态,不接收新任务,也不能继续处理已添加的任务到队列中任务,并且会尝试中断正在处理的任务的线程。
- TIDYING:SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态。线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池会变为 TIDYING 状态。线程池在 STOP 状态,线程池中执行中任务为空时,线程池会变为 TIDYING 状态。
- TERMINATED:线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会转变为 TERMINATED 状态。
ThreadPoolExecutor 内部类Worker
线程池中的工作线程以Worker作为体现,Worker也就是线程池中的线程,只是给线程包装了下,Worker中包含工作线程和要执行的任务。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
// worker内部封装的工作线程
final Thread thread;
//当worker启动后thread线程会优先执行firstTask,当执行完firstTask后,会到queue中去获取下一个任务。
Runnable firstTask;
// 记录当前worker所完成的任务数量
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
其中构造函数很重要,firstTask即为我们提交的任务,newThread(this)将自身传入,我们从上面的代码中可以看到Worker implements Runnable,即线程会执行Worker类的run()方法,
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
run()中会执行外部类的runWorker方法。
public void run() {
runWorker(this);
}
execute()方法
public class ThreadPoolExecutor extends AbstractExecutorService {
... ...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取ctl的值
int c = ctl.get();
// 当前线程数小于核心线程池数量,此次提交任务,直接创建一个新的worker
if (workerCountOf(c) < corePoolSize) {
// addWorker 即为创建线程的过程,会创建worker对象,并且将command作为firstTask
// core==true 表示创建的线程为核心线程,false非核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 执行到这里有几种情况?
// 1.当前线程池数量已经达到corePoolSize
// 2. addWorker失败
// 当前线程池处于running状态,尝试将task放入到workQueue中
if (isRunning(c) && workQueue.offer(command)) {
// 获取当前ctl
int recheck = ctl.get();
// !isRunning()成功,代表当你提交到任务队列后,线程池状态被外部线程给修改,例如调用了shutDown(),shutDownNow()
// remove成功,提交之后,线程池中的线程还没消费
// remove 失败,说明在shutDown或者shutDown之前,就被线程池的线程给处理了
if (! isRunning(recheck) && remove(command))
reject(command);
// 当前线程池是running状态,
else if (workerCountOf(recheck) == 0)
// 如果当前没有线程,就添加一个线程保证当前至少有一个线程存在
addWorker(null, false);
}
//执行到这里,有几种情况?
//1.offer失败:offer失败,需要做什么? 说明当前queue 满了!这个时候如果当前线程数量尚未达到maximumPoolSize的话,会创建新的worker直接执行command
//2.当前线程池是非running状态:线程池状态为非running状态,这个时候因为 command != null addWorker 一定是返回false。
else if (!addWorker(command, false))
reject(command);
}
... ...
}
execute方法的执行流程大致可以分为以下几步:
- 工作线程数量小于核心数量,创建核心线程;
- 达到核心数量,进入任务队列;
- 任务队列满了,创建非核心线程;
- 达到最大数量,执行拒绝策略;
execute方法中逻辑还是很清晰的。
首先我们看第一个场景,当前线程数小于核心线程数,直接创建一个新的worker。
也就是execute()
方法中的这块逻辑。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
addWorker(command, true)方法
//firstTask 可以为null,如线程池刚启动初始化核心线程,worker自动到queue中获取任务,如果不是null,则worker优先执行firstTask
//core 如果为true表示创建的为核心线程 false表示为非核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
// 自旋:判断当前线程池状态是否允许创建线程的事情
retry:
for (;;) {
// 获取当前ctl值
int c = ctl.get();
// 获取当前线程池运行状态
int rs = runStateOf(c);
// 判断当前线程池是否允许添加线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内部自旋:获取创建线程令牌的过程
for (;;) {
//判断当前线程是否超过限制,超过限制就无法创建线程
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 通过cas将线程数量加1,能够成功加1相当于申请到创建线程的令牌
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// 判断当前线程状态是否发生变化
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建work
w = new Worker(firstTask);
//将新创建的work节点的线程 赋值给t
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//持有全局锁,可能会阻塞,直到获取成功为止,同一时刻操纵 线程池内部相关的操作,都必须持锁。
mainLock.lock();
try {
//获取最新线程池运行状态保存到rs中
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将创建的work添加到线程池中
workers.add(w);
// 获取最新当前线程池线程数量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
if (workerAdded) {
// 添加work成功后,将创建的线程启动
t.start();
workerStarted = true;
}
}
} finally {
// 启动失败
if (! workerStarted)
// 释放令牌
// 将当前worker清理出workers集合
addWorkerFailed(w);
}
return workerStarted;
}
runWorker(worker )方法
从上面的addWorker方法可以看到创建了一个work添加到线程池中,然后调用start()方法,也就是调用Worker中的run方法,run方法中调用runWorker方法。
final void runWorker(Worker w) {
// 工作线程
Thread wt = Thread.currentThread();
// 任务
Runnable task = w.firstTask;
w.firstTask = null;
// 释放锁
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 取任务,如果有第一个任务,这里先执行第一个任务
// getTask:取任务
while (task != null || (task = getTask()) != null) {
// 加锁,是因为当调用shutDown方法它会判断当前是否加锁,加锁就会跳过它接着执行下一个任务
w.lock();
// 检查线程池状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 钩子方法,方便子类在任务执行前做一些处理
beforeExecute(wt, task);
Throwable thrown = null;
try {
//真正任务执行的地方,task可能是FutureTask 也可能是 普通的Runnable接口实现类。
//如果前面是通过submit()提交的 runnable/callable 会被封装成 FutureTask。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子方法,方便子类在任务执行后做一些处理
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
关键代码解析:
while (task != null || (task = getTask()) != null)
在我们线程池刚启动还没有创建核心线程时候,提交任务到进来就会创建核心线程,创建完核心线程,调用run方法,执行提交进来的任务,这时候task也就是我们提交过来的任务,所以不为空进入循环体内,执行任务。执行完成之后又会进入上面的判断逻辑,这时候task==null了,所以进入getTask()方法从任务队列中获取任务来执行,此处也就是实现了线程复用。
w.lock();
在执行任务前要加锁,此处要结合shutDown方法来分析了,在shutDown中尝试w.tryLock()获取锁来判断线程是否在执行任务,
getTask()方法
private Runnable getTask() {
// 是否超时
boolean timedOut = false;
// 自旋
for (;;) {
int c = ctl.get();
// 线程池状态
int rs = runStateOf(c);
//当前程池状态是SHUTDOWN的时候会把队列中的任务执行完直到队列为空
// 线程池状态是stop时退出
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 获取工作线程数量
int wc = workerCountOf(c);
// 是否允许超时,有两种情况:
// 1. 是允许核心线程数超时,这种就是说所有的线程都可能超时
// 2. 是工作线程数大于了核心数量,这种肯定是允许超时的
// 注意,非核心线程是一定允许超时的,这里的超时其实是指取任务超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 真正取任务的地方
// 默认情况,只有当工作线程数量大于核心线程数量时,才会调用poll方法触发超时调用
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
// 取到任务就返回
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
关键代码解析:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
BlockingQueue:
1)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null
2)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
当允许核心线程超时,或者当前线程数超过最大线程数时采用poll(time)方法获取任务,获取不到timedOut = true,在下一个for循环中就将此线程杀死了,这么来看,被杀死的线程不一定是非核心线程。
当当前线程小于核心线程数使用take()方法会一直阻塞直到取到任务或抛出中断异常。
上面我们就分析完成了第一场景:当前线程数未达到核心线程数时候创建核心线程,创建完成核心线程后就处理我们提交的任务,处理完成后这个线程就去从阻塞队列阻塞获取任务去了。
下面我们通过一组图展现这个流程:
线程池刚创建出来是什么样子呢(未启动创建核心线程),如下图:
刚创建出来的线程池中只有一个构造时传入的阻塞队列而已,此时里面并没有的任何线程。
当有线程通过execute方法提交了一个任务,会发生什么呢?
提交任务的时候,其实会去进行任务的处理首先会去判断当前线程池的线程数是否小于核心线程数,也就是线程池构造时传入的参数corePoolSize。如果小于,那么就直接通过ThreadFactory创建一个线程来执行这个任务,如下图:
当任务执行完之后,线程不会退出,而是会去从阻塞队列中获取任务,如下图:
如上就是我们的第一个业务场景,核心线程为满提交任务时候的处理流程。从execute()
方法中如下代码可以看到,即使线程池中有核心线程没有“闲着”,没有在处理任务,在没有达到核心线程数时提交过来的任务都是新建线程来处理的。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
首先我们看第二个场景,当前线程数等于核心线程数,不再创建线程,而是将提交的任务提交到任务队列,也就是execute()
方法中的这块逻辑。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
代码很简单,其实当我们分析完第一个场景后线程池的提交任务源码核心主逻辑基本就已经分析完了,还是以图分析。
如果线程池里的线程数不再小于核心线程数呢?那么此时就会尝试将任务放入阻塞队列中,入队成功之后,如下图:
这样在阻塞的线程就可以获取到任务了。
但是,随着任务越来越多,队列已经满了,任务放入失败了,那怎么办呢?
接下来就到我们的第三个场景了,当前线程达到核心线程数,并且任务队列已经满,那么就要创建非核心线程了,也就是如下代码:
else if (!addWorker(command, false))
从这里可以发现,就算队列中有任务,新创建的线程还是优先处理这个提交的任务,而不是从队列中获取已有的任务执行,从这可以看出,先提交的任务不一定先执行。
第三个场景也就分析完了。
但是不幸的事发生了,线程数已经达到了最大线程数量,那么此时会怎么办呢?
下面分析第四个场景,队列已经满,并且已经达到最大线程数。
RejectedExecutionHandler的实现JDK自带的默认有4种AbortPolicy:
- 丢弃任务,抛出运行时异常CallerRunsPolicy:
- 由提交任务的线程来执行任务DiscardPolicy:
- 丢弃这个任务,但是不抛异常DiscardOldestPolicy:
- 从队列中剔除最先进入队列的任务,然后再次提交任务
shutdown和shutdownNow
线程池提供了shutdown和shutdownNow两个方法来关闭线程池。
shutdown方法:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
就是将线程池的状态修改为SHUTDOWN,然后尝试打断空闲的线程(如何判断空闲,上面在说Worker时候说过),也就是在阻塞等待任务的线程。
shutdownNow方法:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
就是将线程池的状态修改为STOP,然后尝试打断所有的线程,从阻塞队列中移除剩余的任务,这也是为什么shutdownNow不能执行剩余任务的原因。
四、线程池异常处理
这段使用方式我相信好多人觉得是没问题的。我们特意在run()方法中构造一个异常,然鹅控制台并没有输出异常,这下知道问题了吧。
我们经常在系统优化时候使用线程池,将不重要的逻辑拆出来使用线程池异步执行,以降低业务接口响应时间,在task线程run()方法中一顿操作猛如虎,总之宗旨也是提交到了线程池就行了,我也不关心执行结果,不出问题还好,出了问题找都找不到,我们采用submit提交的任务,run()中如果不把我们处理业务逻辑try…catch了出问题都没痕迹。
如上是在工作中遇到的实际生产问题,为了提交接口响应将非核心逻辑采用异步执行,构建了个task编写了一坨处理逻辑放入run()方法中,也没做任何异常逻辑处理,然后run()方法中在操作数据库入表时候,由于数据库字段长度限制导致入表失败,查询日志么有任何报错异常,这让人怎么找问题?本来很简单的一个问题,结果找了很久,程序员就是互相挖坑的职业。
提交到线程池的任务,在执行编写的run()方法失败的时候怎么处理?对于多数人来说真的是不管…
那么如何处理呢?
(1)在提交的任务中将异常捕获并处理,不抛给线程池。
(2)异常抛给线程池,但是我们要及时处理抛出的异常。
第一种思路把业务逻辑都trycatch起来,缺点:所有的不同任务类型都要trycatch,增加了代码量,代码丑陋。
第二种思路又有多种实现方式
-
自定义线程池:自定义线程池,继承ThreadPoolExecutor并复写其afterExecute(Runnable r, Throwable t)方法。
-
实现Thread.UncaughtExceptionHandler接口:实现Thread.UncaughtExceptionHandler接口,实现void uncaughtException(Thread t, Throwable e);方法,并将该handler传递给线程池的ThreadFactory。
上面方式针对的都是通过execute(xx)的方式提交任务,如果你提交任务用的是submit()方法,应该使用下面的方式
- 采用Future模式:如果提交任务的时候使用的方法是submit,那么该方法将返回一个Future对象,所有的异常以及处理结果都可以通过future对象获取。 采用Future模式,将返回结果以及异常放到Future中,在Future中处理。
总结
拙技蒙斧正,不胜雀跃。