当前位置: 首页 > article >正文

Java 线程池如何实现 -- 解读 ThreadPoolExecutor

线程池如何实现 – 解读 ThreadPoolExecutor

线程池的职责就是用一组线程去执行一堆任务。
即线程池会包含 一组线程,一堆任务。

所以设计线程池一般包含三个部分:

  • 管理线程池的线程管理器 - threadPool manager
  • 存放任务堆的队列 - task queque
  • 执行任务的线程组 - worker list

职责分离

关键要将线程的管理和任务执行分开,这样职责就会比较清晰

线程管理器的职责(主线程执行)

主线程构建 worker 线程池,保存任务,启动worker循环获取任务并执行

  1. 初始化,根据配置
  2. 提交任务
  3. 保存任务到task queue
  4. 创建worker 线程
  5. 启动worker 线程
  6. 保存worker 线程列表以形成线程池
  7. 统计维护worker线程的数量
  8. 管理 线程管理器 本身的状态,如RUNNING/SHUTDOWN/STOP
    例如SHUTDOWN了就不再接受新的task

worker线程的职责 (worker 线程执行)

  1. 获取任务
    创建worker 时传递的firstTask,或者是循环从task queue 中获取的task。

  2. 执行任务

  3. 任务完成线程退出
    如果从task queue 中获取不到task 了,就退出 step1 step2 循环,这个线程也结束了。
    这里有些tricky 的是 通过获取任务(getTask())来控制线程是否退出的,如果没有任务返回,就表示线程要退出了。并且当线程退出时会修改保存当前线程数量的变量来减少worker线程的数量。
    但是没有任务返回的条件有很多,大体是为

    • 线程池SHUTDOWN
    • task queue 为空,且 worker线程数量 > corePoolSize

    注意,默认情况下,corePoolSize 大小的worker线程是不会退出的,会一直阻塞在从 task queue 中获取task,直到获取到任务来执行。

    具体实现可以查看 ThreadPoolExecutor.getTask() method

  4. 将当前worker,即自身从worker列表中移除。即worker不再有引用,之后会被GC销毁回收。

Q&A

ThreadPoolExecutor 的关键方法是哪些?

execute: 实现了主流程,即task 优先使用核心线程,再队列,队列满后增加最大线程。 而且也实现了如果线程数量达到最大,或线程池已经SHUTDOWN,则reject 新task。

addWorker: 添加新的worker线程,将保存线程数的变量加一,且将worker保存到队列中,最后启动线程。

runWorker: 这个就是worker线程启动后执行的方法,是worker线程主要做的实现,参考章节 worker线程的职责。

getTask: 被 runWorker 方法调用去获取task。 通过返回null 来控制线程是否退出。 也通过这个方法来实现 keepAliveTime timeout。参考下面的问题。

如何实现让大于corePoolSize的线程在超过keepAliveTime的idle时间后退出呢?

allowCoreThreadTimeOut 表示是否应该让corePoolSize的threads在空闲的时候,经过keepAliveTime之后 timeout 退出,默认是false

如下代码,当 线程数 wc > corePoolSize 的时候,就会执行 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 去取task,如果超时了就会重新循环执行,这个时候就会进入 compareAndDecrementWorkerCount 分支,减少线程数一,然后返回null。 返回null 之后,线程就会结束退出了。

因为 poll 当没有task 时会阻塞 keepAliveTime 这么久,所以就相当于线程在idle 的时候 keep alive 了 keepAliveTime 这么久。


private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {

        ......

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
        
        ......
    }
}

如何保存和统计线程数量的?

在 ThreadPoolExecutor 类中有下面这些变量,用来标记当前的 threadPool 是什么状态。

final int COUNT_BITS = Integer.SIZE - 3; // 29

final int RUNNING    = -1 << COUNT_BITS; // 1110 0000 0000 0000 0000 0000 0000 0000
final int SHUTDOWN   =  0 << COUNT_BITS; // 0000 0000 0000 0000 0000 0000 0000 0000
final int STOP       =  1 << COUNT_BITS; // 0010 0000 0000 0000 0000 0000 0000 0000


ThreadPoolExecutor 状态变量RUNNING/SHUTDOWN 等用 高位的 最后三位来表示状态。 然后用低位来保存这个状态下的线程数量。

例如下面的例子,RUNNING 状态下的一条active线程的表示方式是 1110 0000 0000 0000 0000 0000 0000 0001

final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 0001 1111 1111 1111 1111 1111 1111
int workerCountOf(int c)  { return c & COUNT_MASK; }
int ctlOf(int rs, int wc) { return rs | wc; }


boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

int c = ctl.get();
compareAndIncrementWorkerCount(c); // ctl will because 1110 0000 0000 0000 0000 0000 0000 0001

int count = workerCountOf(c); // worker thread count is 1

线程的异常管理

非线程池中的线程,如下面代码中t, 异常是被UncaughtExceptionHandler 处理的,可以查看这个接口的说明。

When a thread is about to terminate due to an uncaught exception the Java Virtual Machine will query the thread for its UncaughtExceptionHandler using getUncaughtExceptionHandler and will invoke the handler’s uncaughtException method, passing the thread and the exception as arguments.

因为t 没有设置UncaughtExceptionHandler, 所以会用到默认的UncaughtExceptionHandler去处理这个异常。 就是打印exception stack 出来。

但是对于在threadPool 中的线程,可以通过下面的代码看到,其实exception 是没有打印出来的。
这个是因为submit()方法提交的task被会封装成futureTask object, 这个类的run()方法会将task返回的结果保存起来,或者catch住task抛出来的异常,然后保存起来。
等调用submit()方法返回的futureget()方法时返回结果或者异常。

所以线程池中的异常是不会被抛到主线程的,且也不会teminate线程池中执行task的线程。

    var t = new Thread(() -> {
      System.out.println("Exception is printed for thread not in pool as below:");
      System.out.println(1/0);
    });
    t.start();
    t.join(); // wait the thread to exit

    System.out.println("\n");

    ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
        1, 1,
        60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(10));

    var resultFuture = threadPoolExecutor.submit(() -> {
      System.out.println("No exception printed in log for thread in pool\n");
      throw new RuntimeException("Exception in threadPool For test");
    });
    threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);

    System.out.println("Get the result of the task, here is exception");
    resultFuture.get(); 


// The output looks like below:
Exception is printed for thread not in pool as below:
Exception in thread "Thread-3" java.lang.ArithmeticException: / by zero
    at org.predisw.learning.netty.Test.lambda$threadTest$0(Test.java:43)
    at java.base/java.lang.Thread.run(Thread.java:842)


No exception printed in log for thread in pool

Get the result of the task, here is exception

java.lang.RuntimeException: Exception in threadPool For test
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Exception in threadPool For test

FutureTask.Java 的 run() 方法部分摘抄

    public void run() {
        ......
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
        ......        
}

http://www.kler.cn/a/464305.html

相关文章:

  • 【MongoDB详解】
  • 从摩托罗拉手机打印短信的简单方法
  • 金融租赁系统的创新与发展推动行业效率提升
  • ceph文件系统
  • 【VBA】EXCEL - VBA 遍历工作表的 5 种方法,以及注意事项
  • 丢弃法hhhh
  • 【LeetCode】827、最大人工岛
  • OpenCV计算机视觉 03 椒盐噪声的添加与常见的平滑处理方式(均值、方框、高斯、中值)
  • 学成在线:前端开发工程师区域(其他区域类似) ,版权区域
  • 《一文读懂PyTorch核心模块:开启深度学习之旅》
  • 通过 4 种方式快速将音乐从 iPod 传输到 Android
  • SpringAOP之日志和身份验证
  • salesforce addmonth()
  • 5G+工业互联网”迎来新机遇,CES Asia 2025见证产业腾飞
  • 操作014:惰性队列
  • 【PCIe 总线及设备入门学习专栏 4.1 -- PCI 总线的地址空间分配】
  • 福建科立讯通信有限公司指挥调度send_fax.php存在任意文件上传漏洞
  • Fabric环境部-Git和Node安装
  • 《计算机网络》(B)复习
  • MB31零收货处理批次物料:M7425 不能设置货物移动的最后交货标志
  • 【第二部分--Python之基础】03 容器类型的数据
  • 计算机的错误计算(一百九十九)
  • 腾讯视频Python爬虫项目实战
  • Dubbo 核心知识全解析:原理、流程与关键机制
  • leetcode hot 小偷
  • 汽车基础软件AutoSAR自学攻略(二)-AutoSAR CP分层架构(1)