Java 线程池如何实现 -- 解读 ThreadPoolExecutor
线程池如何实现 – 解读 ThreadPoolExecutor
线程池的职责就是用一组线程去执行一堆任务。
即线程池会包含 一组线程,一堆任务。
所以设计线程池一般包含三个部分:
- 管理线程池的线程管理器 - threadPool manager
- 存放任务堆的队列 - task queque
- 执行任务的线程组 - worker list
职责分离
关键要将线程的管理和任务执行分开,这样职责就会比较清晰
线程管理器的职责(主线程执行)
主线程构建 worker 线程池,保存任务,启动worker循环获取任务并执行
- 初始化,根据配置
- 提交任务
- 保存任务到task queue
- 创建worker 线程
- 启动worker 线程
- 保存worker 线程列表以形成线程池
- 统计维护worker线程的数量
- 管理 线程管理器 本身的状态,如RUNNING/SHUTDOWN/STOP
例如SHUTDOWN了就不再接受新的task
worker线程的职责 (worker 线程执行)
-
获取任务
创建worker 时传递的firstTask,或者是循环从task queue 中获取的task。 -
执行任务
-
任务完成线程退出
如果从task queue 中获取不到task 了,就退出 step1 step2 循环,这个线程也结束了。
这里有些tricky 的是 通过获取任务(getTask())来控制线程是否退出的,如果没有任务返回,就表示线程要退出了。并且当线程退出时会修改保存当前线程数量的变量来减少worker线程的数量。
但是没有任务返回的条件有很多,大体是为- 线程池SHUTDOWN
- task queue 为空,且 worker线程数量 > corePoolSize
注意,默认情况下,corePoolSize 大小的worker线程是不会退出的,会一直阻塞在从 task queue 中获取task,直到获取到任务来执行。
具体实现可以查看 ThreadPoolExecutor.getTask() method
-
将当前worker,即自身从worker列表中移除。即worker不再有引用,之后会被GC销毁回收。
Q&A
ThreadPoolExecutor 的关键方法是哪些?
execute
: 实现了主流程,即task 优先使用核心线程,再队列,队列满后增加最大线程。 而且也实现了如果线程数量达到最大,或线程池已经SHUTDOWN,则reject 新task。
addWorker
: 添加新的worker线程,将保存线程数的变量加一,且将worker保存到队列中,最后启动线程。
runWorker
: 这个就是worker线程启动后执行的方法,是worker线程主要做的实现,参考章节 worker线程的职责。
getTask
: 被 runWorker
方法调用去获取task。 通过返回null 来控制线程是否退出。 也通过这个方法来实现 keepAliveTime
timeout。参考下面的问题。
如何实现让大于corePoolSize的线程在超过keepAliveTime的idle时间后退出呢?
allowCoreThreadTimeOut
表示是否应该让corePoolSize的threads在空闲的时候,经过keepAliveTime
之后 timeout 退出,默认是false
。
如下代码,当 线程数 wc > corePoolSize
的时候,就会执行 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
去取task,如果超时了就会重新循环执行,这个时候就会进入 compareAndDecrementWorkerCount
分支,减少线程数一,然后返回null。 返回null 之后,线程就会结束退出了。
因为 poll 当没有task 时会阻塞 keepAliveTime
这么久,所以就相当于线程在idle 的时候 keep alive 了 keepAliveTime
这么久。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
......
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
......
}
}
如何保存和统计线程数量的?
在 ThreadPoolExecutor 类中有下面这些变量,用来标记当前的 threadPool 是什么状态。
final int COUNT_BITS = Integer.SIZE - 3; // 29
final int RUNNING = -1 << COUNT_BITS; // 1110 0000 0000 0000 0000 0000 0000 0000
final int SHUTDOWN = 0 << COUNT_BITS; // 0000 0000 0000 0000 0000 0000 0000 0000
final int STOP = 1 << COUNT_BITS; // 0010 0000 0000 0000 0000 0000 0000 0000
ThreadPoolExecutor 状态变量RUNNING/SHUTDOWN 等用 高位的 最后三位来表示状态。 然后用低位来保存这个状态下的线程数量。
例如下面的例子,RUNNING 状态下的一条active线程的表示方式是 1110 0000 0000 0000 0000 0000 0000 0001
final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 0001 1111 1111 1111 1111 1111 1111
int workerCountOf(int c) { return c & COUNT_MASK; }
int ctlOf(int rs, int wc) { return rs | wc; }
boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
int c = ctl.get();
compareAndIncrementWorkerCount(c); // ctl will because 1110 0000 0000 0000 0000 0000 0000 0001
int count = workerCountOf(c); // worker thread count is 1
线程的异常管理
非线程池中的线程,如下面代码中t
, 异常是被UncaughtExceptionHandler
处理的,可以查看这个接口的说明。
When a thread is about to terminate due to an uncaught exception the
Java Virtual Machine
will query the thread for itsUncaughtExceptionHandler
usinggetUncaughtExceptionHandler
and will invoke the handler’suncaughtException
method, passing the thread and the exception as arguments.
因为t
没有设置UncaughtExceptionHandler
, 所以会用到默认的UncaughtExceptionHandler
去处理这个异常。 就是打印exception stack 出来。
但是对于在threadPool 中的线程,可以通过下面的代码看到,其实exception 是没有打印出来的。
这个是因为submit()
方法提交的task
被会封装成futureTask
object, 这个类的run()
方法会将task
返回的结果保存起来,或者catch住task
抛出来的异常,然后保存起来。
等调用submit()
方法返回的future
的get()
方法时返回结果或者异常。
所以线程池中的异常是不会被抛到主线程的,且也不会teminate线程池中执行task的线程。
var t = new Thread(() -> {
System.out.println("Exception is printed for thread not in pool as below:");
System.out.println(1/0);
});
t.start();
t.join(); // wait the thread to exit
System.out.println("\n");
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
1, 1,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10));
var resultFuture = threadPoolExecutor.submit(() -> {
System.out.println("No exception printed in log for thread in pool\n");
throw new RuntimeException("Exception in threadPool For test");
});
threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
System.out.println("Get the result of the task, here is exception");
resultFuture.get();
// The output looks like below:
Exception is printed for thread not in pool as below:
Exception in thread "Thread-3" java.lang.ArithmeticException: / by zero
at org.predisw.learning.netty.Test.lambda$threadTest$0(Test.java:43)
at java.base/java.lang.Thread.run(Thread.java:842)
No exception printed in log for thread in pool
Get the result of the task, here is exception
java.lang.RuntimeException: Exception in threadPool For test
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Exception in threadPool For test
FutureTask.Java 的 run() 方法部分摘抄
public void run() {
......
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
......
}