ThreadPoolExecutor 源码分析
理解 ThreadPoolExecutor
的源码是掌握 Java 线程池工作原理的关键。以下是其核心源码逻辑的深入分析,结合设计思想和关键代码片段。
一、核心设计思想
-
状态与线程数合并存储
使用AtomicInteger
类型的ctl
变量,高 3 位存储线程池状态,低 29 位存储工作线程数(workerCount
),通过位运算高效管理。private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 29 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 最大线程数 (2^29-1) // 状态定义(高3位) private static final int RUNNING = -1 << COUNT_BITS; // 111 private static final int SHUTDOWN = 0 << COUNT_BITS; // 000 private static final int STOP = 1 << COUNT_BITS; // 001 private static final int TIDYING = 2 << COUNT_BITS; // 010 private static final int TERMINATED = 3 << COUNT_BITS; // 011 // 获取状态和线程数 private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; }
-
任务队列与工作线程解耦
- 任务队列 (
workQueue
):存储待执行的Runnable
任务,实现生产者-消费者模式。 - 工作线程 (
Worker
):封装Thread
和任务执行逻辑,通过AQS
实现简单锁。
- 任务队列 (
二、核心源码分析
1. 任务提交:execute()
方法
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
// 1. 当前线程数 < corePoolSize,创建新线程(核心线程)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // true表示使用corePoolSize限制
return;
c = ctl.get(); // 创建失败后重新获取状态
}
// 2. 任务入队(线程池处于RUNNING状态)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 双重检查:若线程池已关闭,则回滚任务并拒绝
if (!isRunning(recheck) && remove(command))
reject(command);
// 若线程数为0(如corePoolSize=0),则创建非核心线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 队列已满,尝试创建非核心线程(不超过maximumPoolSize)
else if (!addWorker(command, false))
reject(command); // 触发拒绝策略
}
2. 工作线程管理:Worker
类
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread; // 实际执行任务的线程
Runnable firstTask; // 初始任务(可能为null)
Worker(Runnable firstTask) {
setState(-1); // 防止线程未启动时被中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 通过ThreadFactory创建线程
}
public void run() {
runWorker(this); // 核心执行逻辑
}
// ... 其他方法(如锁操作)
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断
boolean completedAbruptly = true;
try {
// 循环从队列中获取任务(getTask()方法)
while (task != null || (task = getTask()) != null) {
w.lock();
// 检查线程池状态,若处于STOP则中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 钩子方法(子类可扩展)
try {
task.run(); // 执行任务
afterExecute(task, null); // 钩子方法
} catch (Throwable ex) {
afterExecute(task, ex); // 处理异常
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 线程退出处理(回收或替换)
}
}
3. 任务获取:getTask()
方法
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 线程池已关闭且队列为空,返回null(线程退出)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 2. 判断是否允许超时回收线程:
// - 允许核心线程超时(allowCoreThreadTimeOut为true)
// - 当前线程数超过corePoolSize
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 3. 线程数超过maximumPoolSize或需要回收线程时,减少线程数
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 4. 从队列中获取任务(支持超时)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true; // 超时标记
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
三、关键机制解析
1. 线程池状态转换
- RUNNING → SHUTDOWN:调用
shutdown()
,停止接受新任务,继续处理队列任务。 - RUNNING/SHUTDOWN → STOP:调用
shutdownNow()
,中断所有线程并清空队列。 - SHUTDOWN/STOP → TIDYING:当所有线程终止且队列为空时自动转换。
- TIDYING → TERMINATED:调用
terminated()
钩子方法后完成终止。
2. 线程回收逻辑
- 核心线程:默认不回收(
allowCoreThreadTimeOut=false
),除非显式设置允许。 - 非核心线程:空闲超过
keepAliveTime
后被回收。
3. 拒绝策略触发条件
- 线程池非RUNNING状态。
- 队列已满且线程数达到
maximumPoolSize
。
四、设计精髓总结
- 状态压缩与原子操作:通过
ctl
变量高效管理状态和线程数,减少锁竞争。 - Worker与AQS:利用
AQS
实现简单锁,控制任务执行期间的中断。 - 任务获取策略:
getTask()
方法通过poll
/take
实现灵活的任务获取和超时控制。 - 动态参数调整:支持运行时修改
corePoolSize
和maximumPoolSize
(通过setCorePoolSize()
等方法)。
五、使用注意事项
- 合理配置参数:根据任务类型(CPU密集型 vs IO密集型)调整
corePoolSize
和队列类型。 - 避免任务堆积:使用有界队列防止内存溢出,并设置合理的拒绝策略。
- 异常处理:通过
afterExecute()
或UncaughtExceptionHandler
捕获任务异常,避免线程泄漏。