当前位置: 首页 > article >正文

ThreadPoolExecutor 源码分析

理解 ThreadPoolExecutor 的源码是掌握 Java 线程池工作原理的关键。以下是其核心源码逻辑的深入分析,结合设计思想和关键代码片段。


一、核心设计思想

  1. 状态与线程数合并存储
    使用 AtomicInteger 类型的 ctl 变量,高 3 位存储线程池状态低 29 位存储工作线程数workerCount),通过位运算高效管理。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3; // 29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 最大线程数 (2^29-1)
    
    // 状态定义(高3位)
    private static final int RUNNING    = -1 << COUNT_BITS; // 111
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000
    private static final int STOP       =  1 << COUNT_BITS; // 001
    private static final int TIDYING    =  2 << COUNT_BITS; // 010
    private static final int TERMINATED =  3 << COUNT_BITS; // 011
    
    // 获取状态和线程数
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    
  2. 任务队列与工作线程解耦

    • 任务队列 (workQueue):存储待执行的 Runnable 任务,实现生产者-消费者模式。
    • 工作线程 (Worker):封装 Thread 和任务执行逻辑,通过 AQS 实现简单锁。

二、核心源码分析

1. 任务提交:execute() 方法
public void execute(Runnable command) {
    if (command == null) throw new NullPointerException();
    int c = ctl.get();
    
    // 1. 当前线程数 < corePoolSize,创建新线程(核心线程)
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) // true表示使用corePoolSize限制
            return;
        c = ctl.get(); // 创建失败后重新获取状态
    }
    
    // 2. 任务入队(线程池处于RUNNING状态)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 双重检查:若线程池已关闭,则回滚任务并拒绝
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 若线程数为0(如corePoolSize=0),则创建非核心线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 队列已满,尝试创建非核心线程(不超过maximumPoolSize)
    else if (!addWorker(command, false))
        reject(command); // 触发拒绝策略
}
2. 工作线程管理:Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread; // 实际执行任务的线程
    Runnable firstTask;  // 初始任务(可能为null)

    Worker(Runnable firstTask) {
        setState(-1); // 防止线程未启动时被中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); // 通过ThreadFactory创建线程
    }

    public void run() {
        runWorker(this); // 核心执行逻辑
    }

    // ... 其他方法(如锁操作)
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    boolean completedAbruptly = true;
    try {
        // 循环从队列中获取任务(getTask()方法)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 检查线程池状态,若处于STOP则中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // 钩子方法(子类可扩展)
                try {
                    task.run(); // 执行任务
                    afterExecute(task, null); // 钩子方法
                } catch (Throwable ex) {
                    afterExecute(task, ex); // 处理异常
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly); // 线程退出处理(回收或替换)
    }
}
3. 任务获取:getTask() 方法
private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 1. 线程池已关闭且队列为空,返回null(线程退出)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        // 2. 判断是否允许超时回收线程:
        //    - 允许核心线程超时(allowCoreThreadTimeOut为true)
        //    - 当前线程数超过corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 3. 线程数超过maximumPoolSize或需要回收线程时,减少线程数
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 4. 从队列中获取任务(支持超时)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true; // 超时标记
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

三、关键机制解析

1. 线程池状态转换
  • RUNNING → SHUTDOWN:调用 shutdown(),停止接受新任务,继续处理队列任务。
  • RUNNING/SHUTDOWN → STOP:调用 shutdownNow(),中断所有线程并清空队列。
  • SHUTDOWN/STOP → TIDYING:当所有线程终止且队列为空时自动转换。
  • TIDYING → TERMINATED:调用 terminated() 钩子方法后完成终止。
2. 线程回收逻辑
  • 核心线程:默认不回收(allowCoreThreadTimeOut=false),除非显式设置允许。
  • 非核心线程:空闲超过 keepAliveTime 后被回收。
3. 拒绝策略触发条件
  • 线程池非RUNNING状态。
  • 队列已满且线程数达到 maximumPoolSize

四、设计精髓总结

  1. 状态压缩与原子操作:通过 ctl 变量高效管理状态和线程数,减少锁竞争。
  2. Worker与AQS:利用 AQS 实现简单锁,控制任务执行期间的中断。
  3. 任务获取策略getTask() 方法通过 poll/take 实现灵活的任务获取和超时控制。
  4. 动态参数调整:支持运行时修改 corePoolSizemaximumPoolSize(通过 setCorePoolSize() 等方法)。

五、使用注意事项

  1. 合理配置参数:根据任务类型(CPU密集型 vs IO密集型)调整 corePoolSize 和队列类型。
  2. 避免任务堆积:使用有界队列防止内存溢出,并设置合理的拒绝策略。
  3. 异常处理:通过 afterExecute()UncaughtExceptionHandler 捕获任务异常,避免线程泄漏。

http://www.kler.cn/a/590771.html

相关文章:

  • 【设计模式】3W 学习法深入剖析创建型模式:原理、实战与开源框架应用(含 Java 代码)
  • 电动车出入库管理软件,电动车维修保养售后服务管理系统,佳易王电动车店管理系统操作教程
  • OpenResty/Lua 编码指南/指南
  • 7、vue3做了什么
  • 3.17学习总结
  • vue3 引入element-plus组件后,发现输入的时候没有提示,而且鼠标移到el-button显示unknown的简单解决方法
  • 4.angular 服务
  • SpringBoot第三站:配置嵌入式服务器使用外置的Servlet容器
  • 【软件系统架构】单体架构
  • Android Fresco 框架兼容模块源码深度剖析(六)
  • Pygame实现记忆拼图游戏10
  • 【Deepseek进阶篇】--4.科研运用
  • Softmax 函数简介及其Python实现
  • 【Matlab GUI】封装matlab GUI为exe文件
  • Nuxt2 vue 给特定的页面 body 设置 background 不影响其他页面
  • vue3中如何实现路由导航跳转
  • 蓝桥杯 矩形拼接 10分题
  • R语言——变量
  • ESP32中的内存架构
  • 【TVM教程】使用自定义调度规则(Sketch Rule)在 CPU 上自动调度稀疏矩阵乘法