【Java并发编程】线程池详解
一、简介
随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流。使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器。J.U.C提供的线程池:ThreadPoolExecutor 类,帮助开发人员管理线程并方便地执行并行任务。了解并合理使用线程池,是一个开发人员必修的基本功。
本文开篇简述线程池概念和用途,接着结合线程池的源码,帮助读者领略线程池的设计思路,最后回归实践,通过案例讲述使用线程池遇到的问题,并给出了一种动态化线程池解决方案。
1.1 线程池是什么
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
当然,使用线程池可以带来一系列好处:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池 ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
1.2 线程池解决的问题是什么
线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
- 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
- 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
- 系统无法合理管理内部的资源分布,会降低系统的稳定性。
为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。“池化”思想不仅仅能应用在计算机领域,在金融、设备、人员管理、工作管理等领域也有相关的应用。
在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:
- 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。
- 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。
- 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。(如 Spring的BeanFactory)
二、线程池核心设计与实现
在前文中,我们了解到:线程池是一种通过“池化”思想,帮助我们管理线程而获取并发性的工具,在Java中的体现是 ThreadPoolExecutor 类。那么它的的详细设计与实现是什么样的呢?我们会在本章进行详细介绍。
2.1 线程池整体设计
Java中的线程池核心实现类是 ThreadPoolExecutor,本章基于JDK 1.8的源码来分析Java线程池的核心设计与实现。我们首先来看一下ThreadPoolExecutor 的UML类图,了解下ThreadPoolExecutor的继承关系。
ThreadPoolExecutor 实现的顶层接口是 Executor,顶层接口 Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由 Executor 框架完成线程的调配和任务的执行部分。
顶层接口 Executor 的子接口 ExecutorService 接口增加了一些能力:
- 扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
- 提供了管控线程池的方法,比如停止线程池的运行。
ExecutorService 的实现类 AbstractExecutorService 则是 ThreadPoolExecutor 上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。
最下层的实现类 ThreadPoolExecutor 实现最复杂的运行部分,ThreadPoolExecutor 将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。ThreadPoolExecutor 是如何运行,如何同时维护线程和执行任务的呢?其运行机制如下图所示:
线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。
线程池的运行主要分成两部分:
- 任务管理 :充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:
- 直接申请线程执行该任务;
- 缓冲到队列中等待线程执行;
- 拒绝该任务。
- 线程管理 :线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。
接下来,我们会按照以下三个部分去详细讲解线程池运行机制:
- 线程池如何维护自身状态。
- 线程池如何管理任务。
- 线程池如何管理线程。
2.1.1 ThreadPoolExecutor 关键属性
public class ThreadPoolExecutor extends AbstractExecutorService {
// 控制变量-存放状态和线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 任务队列,必须是阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
// 全局锁
private final ReentrantLock mainLock = new ReentrantLock();
// awaitTermination方法使用的等待条件变量
private final Condition termination = mainLock.newCondition();
// 线程工厂,用于创建新的线程实例
private volatile ThreadFactory threadFactory;
// 拒绝执行处理器,对应不同的拒绝策略
private volatile RejectedExecutionHandler handler;
// 空闲线程等待任务的时间周期,单位是纳秒
private volatile long keepAliveTime;
// 是否允许核心线程超时,如果为true则keepAliveTime对核心线程也生效
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数
private volatile int corePoolSize;
// 线程池容量
private volatile int maximumPoolSize;
//...... 省略其他代码
}
2.1.2 ThreadPoolExcutor构造方法
构造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:核心线程数量,会一直存在,除非allowCoreThreadTimeOut设置为true;刚刚创建ThreadPoolExecutor的时候,线程并不会立即创建,而是要等到有任务提交时才会创建,除非调用了prestartCoreThread/prestartAllCoreThreads事先创建核心线程。
- maximumPoolSize:线程池允许的最大线程池数量。
- keepAliveTime:线程数量超过corePoolSize,空闲线程的最大超时时间。
- unit:超时时间的单位。
- workQueue:工作队列(阻塞队列BlockingQueue),保存未执行的Runnable 任务。推荐使用有界队列,有界队列有助于避免资源耗尽的情况发生。
- threadFactory:创建线程的工厂类。可使用Executors.defaultThreadFactory();返回的是DefaultThreadFactory对象,DefaultThreadFactory是Executors的静态内部类。
- handler:拒绝策略;当线程已满,工作队列也满了的时候,会被调用。
- ThreadPoolExecutor.AbortPolicy :丢弃任务并抛出RejectedExecutionException异常。【默认】
- ThreadPoolExecutor.DiscardPolicy :丢弃任务,但是不抛出异常。如果线程队列已满,则后续提交的任务都会被丢弃,且是静默丢弃。
- ThreadPoolExecutor.DiscardOldestPolicy :丢弃队列最前面的任务,然后重新提交被拒绝的任务。
- ThreadPoolExecutor.CallerRunsPolicy :由调用线程处理该任务【一般为主线程】。此时主线程将在一段时间内不能提交任何任务,从而使工作线程处理正在执行的任务。此时提交的线程将被保存在TCP队列中,TCP队列满将会影响客户端,这是一种平缓的性能降低
- 自定义拒绝策略 :实现 RejectedExecutionHandler 接口。
2.2 生命周期管理
线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,内部使用一个 AtomicInteger 类型的变量 ctl 维护两个值:
- 运行状态(runState) :线程池的运行状态,高3位保存。
- 线程数量 (workerCount) :线程池内有效线程的数量,低29位保存。
用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算方式,相比于基本运算,速度也会快很多。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;//29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
- 高位111为 RUNNING (注:RUNNING为负数,最小;负数在计算机中以二进制补码表示)
- 高位000为SHUTDOWN
- 高位001表示STOP
- 高位010表示TIDYING
- 高位100表示TERMINATED
- workerCountOf() 方法用来取低29位的数值,返回线程池的线程数。
- runStateOf() 方法则用来取高3位的数值,返回当前线程池的状态。
ThreadPoolExecutor的运行状态有5种,分别为:
其生命周期转换如下入所示:
2.3 任务执行机制
2.3.1 任务调度——execute方法
任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。
首先,所有任务的调度都是由 execute() 方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。源码如下:
#ThreadPoolExecutor
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//当前线程数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//执行addWork,提交为核心线程,提交成功return。提交失败重新获取ctl
//注意此处addWorker(command, true)返回false的情况:
// 1.线程池状态判断
// 2.线程数判断:wc >= (core ? corePoolSize : maximumPoolSize))
// 即当线程数>=核心线程数,返回false
if (addWorker(command, true))
return;
c = ctl.get();
}
// 上面是当线程数量小于corePoolSize的时候,下面是针对线程数量大于等于corePoolSize
// 线程池处于运行状态 && 任务添加到队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次check一下,当前线程池是否是运行状态,如果不是运行时状态,则把刚刚添加到workQueue中的command移除掉
if (!isRunning(recheck) && remove(command))
reject(command); //调用拒绝策略
else if (workerCountOf(recheck) == 0) // 如果【当前活动的线程数】为0,则执行addWork方法创建
addWorker(null, false);
}
//线程池关闭或者往workQueue提交任务失败,则尝试创建非核心线程,若创建失败则reject任务
//注意此处addWorker(command, false)返回false的情况:
// 1.线程池状态判断
// 2.线程数判断:wc >= (core ? corePoolSize : maximumPoolSize))
// 即当线程数>=最大线程数,返回false
else if (!addWorker(command, false))
reject(command);
}
其执行过程如下:
- 如果 workerCount < corePoolSize,则调用 addWorker(command, true) 方法创建并启动一个核心线程来执行新提交的任务:
- 创建成功,返回true,直接return ;
- 创建失败,返回false,继续往下执行(当workerCount >= corePoolSize,addWorker(firstTask, true) 方法返回false)。addWorker创建失败有多种情况:
- 线程池状态判断;
- 线程数判断:wc >= (core ? corePoolSize : maximumPoolSize)) ,即当workerCount >= corePoolSize,返回false
- 如果 workerCount >= corePoolSize :
- 若线程池是否处于 RUNNING 状态,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。加入阻塞队列后会再次检查线程池运行状态:
- 如果不是运行状态,则把刚刚添加到 workQueue 中的 command 移除掉并调用拒绝策略;
- 如果是运行状态,则判断当前活动线程数是否为0,为0则创建并启动一个非核心线程并且传入的任务对象为null。
- 若线程池关闭或者往 workQueue 提交任务失败,则尝试调用 addWorker(command, false) 创建非核心线程,若创建失败则reject任务。addWorker创建失败有多种情况:
- 线程池状态判断;
- 线程数判断:wc >= (core ? corePoolSize : maximumPoolSize)) ,即当workerCount >= maximumPoolSize,返回false
- 若线程池是否处于 RUNNING 状态,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。加入阻塞队列后会再次检查线程池运行状态:
2.3.2 任务缓冲——阻塞队列BlockingQueue
2.3.2.1 简单介绍 BlockingQueue
1)Queue
对于Queue我们比较熟悉,队列是一个先进先出的数据结构,可以往队尾加入元素,从队头获取或弹出一个元素,常见的方法有:
- boolean add(E element) :往队尾插入元素,如果超出capacity,则抛出异常
- boolean offer(E element) :往队尾插入元素,成功返回true,失败(capacity restrictions)返回false。
- E pool() :从对头弹出一个元素,如果队列为空,则返回null。
- E peek() :获取队头元素,但不弹出,如果队列为空,则返回null。
2)BlockingQueue
对于阻塞队列 BlockingQueue,因为继承了Queue接口,所以具备Queue的先进先出属性,但是也对一系列方法进行了增加了重写,常见方法有:
- boolan add(E element) :往队尾插入元素,如果超出capacity,则抛出异常。注意,此方法不会阻塞。 【ps:和Queue方法含义一致】
- boolean offer(E element) :往队尾插入元素,成功返回true,失败(capacity restrictions)返回false 注意,此方法不会阻塞。【ps:和Queue方法含义一致】
- void put(E element) :往队尾插入元素,阻塞到直至加入成功为止,因为阻塞的语义,此方法可能抛出InterruptedException。
- boolean offer(E element,long timeout,TimeUnit unit) :类似于put方法,但是只会阻塞指定时间,到时候没插入成功,则返回false。
- E take() :获取并弹出队头元素,阻塞直到成功为止
- E poll(long timeout,TimeUnit uint) :获取并弹出队头元素,只会阻塞指定时间,如果时间到,则返回null。
2.3.2.2 任务缓冲
任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
- 在队列为空时,获取元素的线程会等待队列变为非空。
- 当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
下图中展示了线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素:
使用不同的队列可以实现不一样的任务存取策略。在这里,我们可以再介绍下阻塞队列的成员:
2.3.3 任务申请——getTask方法
由上文的任务分配部分可知,任务的执行有两种可能:
- 任务直接由新创建的线程执行(execute(command)方法内):仅出现在线程初始创建的时候。
- 线程从阻塞队列中获取任务然后执行(runWorker()方法内的while循环里),执行完任务的空闲线程会再次去从队列中申请任务再去执行:是线程获取任务绝大多数的情况。
线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由 getTask() 方法实现,源码如下:
private Runnable getTask() {
// 表示上次从阻塞队列中获取任务是否超时
boolean timedOut = false;
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/**
* 同时满足如下两点,则线程池中工作线程数减1,并返回null
*
* 1> rs >= SHUTDOWN,表示线程池不是RUNNING状态
* 2> rs >= STOP 表示STOP、TIDYING和TERMINATED这三个状态,它们共同点就是【不接收新任务】也【不处理workQueue里的线程任务】or 阻塞队列workQueue为空
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // 线程池中工作线程数减1
return null;
}
int wc = workerCountOf(c);
// timed用于判断是否需要进行超时控制,当allowCoreThreadTimeOut被设置为ture或者活跃线程数大于核心线程数,则需要进行超时控制
// allowCoreThreadTimeOut默认为false,则表明核心线程不允许超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 同时满足以下两种情况,则线程池中工作线程数减1并返回null:
* case1:当前活动线程数workCount大于最大线程数,或者需要超时控制(timed=true)并且上次从阻塞队列中获取任务发生了超时(timedOut=true)
* case2:如果有效线程数大于1,或者阻塞队列为空。
*/
if ((wc > maximumPoolSize // 因为在执行该方法的同时被执行了setMaximumPoolSize,导致最大线程数被缩小
|| (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) { //
if (compareAndDecrementWorkerCount(c)) { // 线程池中工作线程数减1
return null;
}
// 如果减1失败,则循环重试
continue;
}
try {
// 如果需要超时控制,则通过阻塞队列的poll方法进行超时控制,
// 否则,直接获取,如果队列为空,task方法会阻塞直到队列不为空
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // poll -->若队列为空,返回null
workQueue.take(); // take --> 若队列为空,发生阻塞,等待元素
if (r != null) {
return r;
}
// 如果r=null,表示超时了,则timeOut设置为true,标记为上一次超时状态
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
其执行流程如下图所示:
getTask() 这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程 Worker 会不断接收新任务去执行,而当工作线程 Worker 接收不到任务的时候,就会开始被回收。
2.3.4 任务拒绝
任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到 maximumPoolSize 时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
拒绝策略是一个接口,其设计如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
用户可以通过实现这个接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略,其特点如下:
2.4 Worker线程管理
2.4.1 Worker线程
线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。
首先在 ThreadPoolExecutor 中定义了一个 workers 集合去管理工作线程Worker ,代码如下:
private final HashSet<Worker> workers = new HashSet<Worker>();
我们来看一下工作线程Worker的部分代码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程,由ThreadFactory创建
Runnable firstTask;//初始化的任务,可以为null
// 记录每个线程完成的任务总数
volatile long completedTasks;
// 唯一的构造函数,传入任务实例firstTask,注意可以为null
Worker(Runnable firstTask) {
// 禁止线程中断,直到runWorker()方法执行
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 通过ThreadFactory创建线程实例,注意一下Worker实例自身作为Runnable用于创建新的线程实例
this.thread = getThreadFactory().newThread(this);
}
// 委托到外部的runWorker()方法,注意runWorker()方法是线程池的方法,而不是Worker的方法
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
// 是否持有独占锁,state值为1的时候表示持有锁,state值为0的时候表示已经释放锁
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 独占模式下尝试获取资源,这里没有判断传入的变量,直接CAS判断0更新为1是否成功,成功则设置独占线程为当前线程
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 独占模式下尝试释放资源,这里没有判断传入的变量,直接把state设置为0
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 加锁
public void lock() { acquire(1); }
// 尝试加锁
public boolean tryLock() { return tryAcquire(1); }
// 解锁
public void unlock() { release(1); }
// 是否锁定
public boolean isLocked() { return isHeldExclusively(); }
// 启动后进行线程中断,注意这里会判断线程实例的中断标志位是否为false,只有中断标志位为false才会中断
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker 这个工作线程,实现了 Runnable 接口,并持有一个线程 thread 和一个初始化的任务 firstTask :
- thread 是在调用构造方法时通过ThreadFactory 来创建的Worker所持有的线程(一个thread),可以用来执行任务【在addWorker()会调用thread.start()】;
- firstTask 用它来保存传入的第一个任务(一个 Runable),这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。
Worker执行任务的模型如下图所示:
线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。
- lock() 方法一旦获取了独占锁,表示当前线程正在执行任务中。
- 如果正在执行任务,则不应该中断线程。
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
- 线程池在执行 shutdown() 方法或 tryTerminate() 方法时会调用 interruptIdleWorkers() 方法来中断空闲的线程,interruptIdleWorkers() 方法会使用 tryLock() 方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。
在线程回收过程中就使用到了这种特性,回收过程如下图所示:
2.4.2 Worker线程增加——addWorker方法
增加线程是通过线程池中的 ThreadPoolExecutor#addWorker() 方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。
addWorker(firstTask, core) 方法有两个参数:
- firstTask 参数 :用于指定新增的线程执行的第一个任务,该参数可以为空;
- core 参数 :为true表示在新增线程时会判断当前活动线程数是否小于 corePoolSize,false表示新增线程前需要判断当前活动线程数是否小于maximumPoolSize。
我们看看 addWorker() 方法的源码(提交任务时触发):
/**
* @param firstTask 需要执行的Runnable线程
* @param core true:新增线程时,【当前活动的线程数】是否 < corePoolSize
* false:新增线程时,【当前活动的线程数】是否 < maximumPoolSize
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
/**
* 步骤一:试图将workerCount+1
*/
for (;;) {
int c = ctl.get();
// 获得运行状态runState
int rs = runStateOf(c);
/**
* 只有如下两种情况可以新增worker,继续执行下去:
* case one: rs == RUNNING
* case two: rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()
*/
if (rs >= SHUTDOWN && // 即:非RUNNING状态(请查看isRunning()方法)。线程池异常,表示不再去接收新的线程任务了,返回false
/**
* 当线程池是SHUTDOWN状态时,表示不再接收新的任务了,所以:
* case1:如果firstTask!=null,表示要添加新任务,则:新增worker失败,返回false。
* case2:如果firstTask==null并且workQueue为空,表示队列中的任务已经处理完毕,不需要添加新任务了。则:新增worker失败,返回false
*/
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
return false;
}
for (; ; ) {
// 获得当前线程池里的线程数
int wc = workerCountOf(c);
/**
* 满足如下任意情况,则新增worker失败,返回false
* case1:大于等于最大线程容量,即:int CAPACITY = 00011111111111111111111111111111 = 536870911(十进制)
* case2:当core是true时:>= 核心线程数
* 当core是false时:>= 最大线程数
*/
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
// 当前工作线程数加1
if (compareAndIncrementWorkerCount(c)) {
break retry; // 成功加1,则跳出retry标识的这两层for循环
}
// 如果线程数加1操作失败,则获取当前最新的线程池运行状态
c = ctl.get();
// 判断线程池运行状态(rs)是否改变;如果不同,则说明方法处理期间线程池运行状态发生了变化,重新获取最新runState
if (runStateOf(c) != rs) {
continue retry; // 跳出内层for循环,继续从第一个for循环执行
}
}
}
/**
* 步骤二:workerCount成功+1后,创建Worker,加入集合workers中,并启动Worker线程
*/
boolean workerStarted = false; // 用于判断新的worker实例是否已经开始执行Thread.start()
boolean workerAdded = false; // 用于判断新的worker实例是否已经被添加到线程池的workers队列中
Worker w = null; // AQS.Worker
try {
// 创建Worker实例,每个Worker对象都会针对入参firstTask来创建一个线程。
w = new Worker(firstTask);
// 从Worker中获得新建的线程t
final Thread t = w.thread;
if (t != null) {
// 重入锁
final ReentrantLock mainLock = this.mainLock;
/** ----------lock() 尝试加锁操作!!获得锁后继续执行,没获得则等待直到获得锁为止---------- */
mainLock.lock();
try {
// 获得线程池当前的运行状态runStatus
int rs = runStateOf(ctl.get());
/**
* 满足如下任意条件,即可向线程池中添加线程:
* case1:线程池状态为RUNNING。(请查看isRunning()方法)
* case2:线程池状态为SHUTDOWN并且firstTask为null。
*/
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
// 因为t是新构建的线程,还没有启动。所以,如果是alive状态,说明已经被启动了,则抛出异常。
if (t.isAlive()) {
throw new IllegalThreadStateException();
}
// workers中保存线程池中存在的所有work实例集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize) { // largestPoolSize用于记录线程池中曾经存在的最大的线程数量
largestPoolSize = s;
}
workerAdded = true;
}
} finally {
/** ----------unlock 解锁操作!!---------- */
mainLock.unlock();
}
if (workerAdded) {
/** 开启线程,执行Worker.run() */
t.start();
workerStarted = true;
}
}
} finally {
// 如果没有开启线程
if (!workerStarted) {
addWorkerFailed(w); // 往线程池中添加worker失败了
}
}
return workerStarted;
}
主要步骤如下:
- 试图将workerCount+1 ;
- workerCount成功+1后,创建Worker,加入集合workers中,并启动Worker线程【在addWorker()会调用thread.start()】 。
其执行流程如下图所示:
2.4.3 Worker线程执行任务——runWorker方法
在 Worker#run() 方法调用了 ThreadPoolExecutor#runWorker() 方法来执行任务,代码如下:
//ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {
//......
//ThreadPoolExecutor#Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
final void runWorker(Worker w) {
// 获取当前线程,实际上和Worker持有的线程实例是相同的
Thread wt = Thread.currentThread();
// 获取Worker中持有的初始化时传入的任务对象,这里注意存放在临时变量task中
Runnable task = w.firstTask;
// 设置Worker中持有的初始化时传入的任务对象为null
w.firstTask = null;
// 由于Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为0,允许线程中断
w.unlock(); // allow interrupts
// 记录线程是否因为用户异常终结,默认是true
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { //循环获取任务
w.lock();
/**
* 如果线程池正在停止,请确保线程被中断;否则,请确保线程不被中断。
* 这需要在第二种情况下重新检查以处理shutdownNow竞赛,同时清除中断
*
* 同时满足如下两个条件,则执行wt.interrupt()
* 1.线程状态为STOP、TIDYING、TERMINATED 或者 (当前线程被中断(清除中断标记)并且线程状态为STOP、TIDYING、TERMINATED)
* 2.当前线程wt没有被标记中断
*/
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted()) {
wt.interrupt();
}
try {
// 钩子方法,任务执行前
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 执行任务,真正做事儿的地方
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// 钩子方法,任务执行后
afterExecute(task, thrown);
}
} finally {
task = null;
// 执行完成后自增当前工作线程执行的任务数量
w.completedTasks++;
// Worker解锁,本质是AQS释放资源,设置state为0
w.unlock();
}
}
// 如果线程能够执行到最后一行代表线程执行过程中没有由于发生异常导致跳出循环,将 突然结束 标志改为false
completedAbruptly = false;
} finally {
// 处理线程退出,completedAbruptly为true说明由于用户异常导致线程非正常退出
// 线程回收:消除自身在线程池内的引用
processWorkerExit(w, completedAbruptly);
}
}
//......
}
runWorker() 方法的执行过程如下:
- while 循环不断地通过 getTask() 方法获取任务。 (getTask()方法从阻塞队列中取任务)
- 如果线程池正在停止,请确保线程是中断状态;否则,请确保线程不是中断状态。
- 执行任务。 task.run()
- 如果 getTask() 结果为null则跳出循环,执行 processWorkerExit() 方法,销毁线程(线程引用移出线程池)。
执行流程如下图所示:
2.4.4 Worker线程回收——processWorkerExit方法
线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当 Worker 无法获取到任务,也就是获取的任务为空时,循环会结束,Worker 会主动消除自身在线程池内的引用。
我们看 Worker 的部分源码:
//ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {
//......
//ThreadPoolExecutor#Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}
final void runWorker(Worker w) {
//......
try {
while (task != null || (task = getTask()) != null) { //循环获取任务,获取不到跳出循环
//......
task.run(); // 执行任务,真正做事儿的地方
//......
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); //线程回收:消除自身在线程池内的引用
}
}
//消除自身在线程池内的引用
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly=false,说明是由getTask返回null导致的,WorkerCount递减的操作已经执行
// 如果completedAbruptly=true,说明是由执行任务的过程中发生异常导致,需要进行WorkerCount递减的操作
if (completedAbruptly) {
decrementWorkerCount(); // 将workerCount减1
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 计算任务的完成数量: 全局的已完成任务记录数加上此将要终结的Worker中的已完成任务数
completedTaskCount += w.completedTasks;
// 将worker从线程池中移除
workers.remove(w);
} finally {
mainLock.unlock();
}
//根据线程池状态判断是否结束线程池
tryTerminate();
int c = ctl.get();
// runState是RUNNING或SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 1.如果线程不是由于抛出用户异常终结,如果允许核心线程超时,则保持线程池中至少存在一个工作线程
// 2.如果线程由于抛出用户异常终结,或者当前工作线程数,那么直接添加一个新的非核心线程
if (!completedAbruptly) {
// 如果允许核心线程超时,最小值为0,否则为corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果最小值为0,同时任务队列不空,则更新最小值为1
if (min == 0 && !workQueue.isEmpty()) {
min = 1;
}
// 工作线程数大于等于最小值,直接返回不新增非核心线程
if (workerCountOf(c) >= min) {
return; // replacement not needed
}
}
addWorker(null, false);
}
}
//......
}
线程回收的工作是在 processWorkerExit() 方法完成的:
事实上,在这个方法中,将线程引用移出线程池【workers.remove(w)】就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。
processWorkerExit()执行完毕之后,意味着该工作线程的生命周期已经完结:
2.4.4.1 tryTerminate方法——是否结束线程池
在上文 ThreadPoolExecutor#processWorkerExit() 方法中,调用了 tryTerminate() 方法:
//ThreadPoolExecutor
//消除自身在线程池内的引用
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//将worker从线程池中移除......
//根据线程池状态判断是否结束线程池
tryTerminate();
//根据新状态,重新分配线程......
}
//根据线程池状态判断是否结束线程池
final void tryTerminate() {
for (;;) {
int c = ctl.get(); // 获取ctl
// 如果线程池运行状态是RUNNING,或者大于等于TIDYING,或者运行状态为SHUTDOWN且队列为非空,则直接return返回
// 满足此条件直接return,不终结线程池
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果满足终结线程池的条件,且工作线程数不为0,则中断一个空闲的工作线程去确保线程池关闭的信号得以传播并return
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 尝试将线程池状态设置为TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//如果CAS成功,执行terminated()钩子方法
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
2.4.4.2 interruptIdleWorkers方法——中断空闲工作线程
源码如下:
//ThreadPoolExecutor
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//若线程没有被中断且tryLock成功(空闲),则中断该线程
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
- interruptIdleWorkers(onlyOne) :若传入 onlyOne 为true,则在中断一个工作线程后跳出循环。若传入 onlyOne 为false,则遍历中断 workers 中所有的工作线程。
- interruptIdleWorkers() :内部调用 interruptIdleWorkers(false) 。
2.5 线程池关闭——shutdown()方法
设置线程池的状态为 SHUTDOWN,并停止接收新任务。线程池中的工作线程会继续执行已有的任务,直到这些任务执行完毕。当所有任务都执行完毕后,线程池中的线程才会被关闭。
源码如下:
//ThreadPoolExecutor
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限校验,安全策略相关判断
checkShutdownAccess();
// 设置SHUTDOWN状态
advanceRunState(SHUTDOWN);
// 中断所有的空闲的工作线程
interruptIdleWorkers();
// 钩子方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 调用上面分析果敢的尝试terminate方法,使状态更变为TIDYING,执行钩子方法terminated()后,最终状态更新为TERMINATED
tryTerminate();
}
三、JDK提供的原生线程池
在 Java 中,JDK通过 Executors 类为我们提供了四种封装好的线程池类型,如下所示:
//Executors
//创建一个定长的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
//创建一个单线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
//创建一个可缓存支持灵活回收的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
//创建一个支持周期执行任务的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
四、线程池在业务中的实践
4.1 业务背景
在当今的互联网业界,为了最大程度利用CPU的多核性能,并行运算的能力是不可或缺的。通过线程池管理线程获取并发性是一个非常基础的操作,让我们来看两个典型的使用线程池获取并发性的场景。
场景1:快速响应用户请求
描述:用户发起的实时请求,服务追求响应时间。比如说用户要查看一个商品的信息,那么我们需要将商品维度的一系列信息如商品的价格、优惠、库存、图片等等聚合起来,展示给用户。
分析:从用户体验角度看,这个结果响应的越快越好,如果一个页面半天都刷不出,用户可能就放弃查看这个商品了。而面向用户的功能聚合通常非常复杂,伴随着调用与调用之间的级联、多级级联等情况,业务开发同学往往会选择使用线程池这种简单的方式,将调用封装成任务并行的执行,缩短总体响应时间。另外,使用线程池也是有考量的,这种场景最重要的就是获取最大的响应速度去满足用户,所以应该不设置队列去缓冲并发任务,调高corePoolSize和maxPoolSize去尽可能创造多的线程快速执行任务。
场景2:快速处理批量任务
描述:离线的大量计算任务,需要快速执行。比如说,统计某个报表,需要计算出全国各个门店中有哪些商品有某种属性,用于后续营销策略的分析,那么我们需要查询全国所有门店中的所有商品,并且记录具有某属性的商品,然后快速生成报表。
分析:这种场景需要执行大量的任务,我们也会希望任务执行的越快越好。这种情况下,也应该使用多线程策略,并行计算。但与响应速度优先的场景区别在于,这类场景任务量巨大,并不需要瞬时的完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量。
4.2 实际问题及方案思考——参数如何配置
线程池使用面临的核心的问题在于:线程池的参数并不好配置。一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,IO密集型和CPU密集型的任务运行起来的情况差异非常大,这导致业界并没有一些成熟的经验策略帮助开发人员参考。
关于线程池配置不合理引发的故障,公司内部有较多记录,下面举一些例子:
Case1:2018年XX页面展示接口大量调用降级:
事故描述:XX页面展示接口产生大量调用降级,数量级在几十到上百。
事故原因:该服务展示接口内部逻辑使用线程池做并行计算,由于没有预估好调用的流量,导致最大线程数和阻塞队列设置偏小,大量抛出RejectedExecutionException,触发接口降级条件,示意图如下:
Case2:2018年XX业务服务不可用S2级故障
事故描述:XX业务提供的服务执行时间过长,作为上游服务整体超时,大量下游服务调用失败。
事故原因:该服务处理请求内部逻辑使用线程池做资源隔离,由于队列设置过长,最大线程数设置失效,导致请求数量增加时,大量任务堆积在队列中,任务执行时间过长,最终导致下游服务的大量调用超时失败。示意图如下:
业务中要使用线程池,而使用不当又会导致故障,那么我们怎样才能更好地使用线程池呢?针对这个问题,我们下面延展几个方向:
1. 能否不用线程池?
回到最初的问题,业务使用线程池是为了获取并发性,对于获取并发性,是否可以有什么其他的方案呢替代?我们尝试进行了一些其他方案的调研:
综合考虑,这些新的方案都能在某种情况下提升并行任务的性能,然而本次重点解决的问题是如何更简易、更安全地获得的并发性。另外,Actor模型的应用实际上甚少,只在Scala中使用广泛,协程框架在Java中维护的也不成熟。这三者现阶段都不是足够的易用,也并不能解决业务上现阶段的问题。
2. 追求参数设置合理性?
有没有一种计算公式,能够让开发同学很简易地计算出某种场景中的线程池应该是什么参数呢?
带着这样的疑问,我们调研了业界的一些线程池参数配置方案:
调研了以上业界方案后,我们并没有得出通用的线程池计算方式。并发任务的执行情况和任务类型相关,IO密集型和CPU密集型的任务运行起来的情况差异非常大,但这种占比是较难合理预估的,这导致很难有一个简单有效的通用公式帮我们直接计算出结果。
3. 线程池参数动态化?
尽管经过谨慎的评估,仍然不能够保证一次计算出来合适的参数,那么我们是否可以将修改线程池参数的成本降下来,这样至少可以发生故障的时候可以快速调整从而缩短故障恢复的时间呢?基于这个思考,我们是否可以将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程对比如下:
基于以上三个方向对比,我们可以看出参数动态化方向简单有效。
4.3 动态化线程池
4.3.1 整体设计
动态化线程池的核心设计包括以下三个方面:
- 简化线程池配置:线程池构造参数有8个,但是最核心的是3个:corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。考虑到在实际应用中我们获取并发性的场景主要是两种:
- 并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。
- 并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求,Less is More。
- 参数可动态修改:为了解决参数不好配,修改参数成本高等问题。在Java线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。将线程池的配置放置在平台侧,允许开发同学简单的查看、修改线程池配置。
- 增加线程池监控:对某事物缺乏状态的观测,就对其改进无从下手。在线程池执行任务的生命周期添加监控能力,帮助开发了解线程池状态。
4.3.2 功能架构
动态化线程池提供如下功能:
- 动态调参:支持线程池参数动态调整、界面化操作;包括修改线程池核心大小、最大核心大小、队列长度等;参数修改后及时生效。
- 任务监控:支持应用粒度、线程池粒度、任务粒度的Transaction监控;可以看到线程池的任务执行情况、最大任务执行时间、平均任务执行时间、95/99线等。
- 负载告警:线程池队列任务积压到一定值的时候会通过大象(美团内部通讯工具)告知应用开发负责人;当线程池负载数达到一定阈值的时候会通过大象告知应用开发负责人。
- 操作监控:创建/修改和删除线程池都会通知到应用的开发负责人。
- 操作日志:可以查看线程池参数的修改记录,谁在什么时候修改了线程池参数、修改前的参数值是什么。
- 权限校验:只有应用开发负责人才能够修改应用的线程池参数。
4.3.2.1 参数动态化
JDK原生线程池ThreadPoolExecutor提供了如下几个public的setter方法,如下图所示:
JDK允许线程池使用方通过 ThreadPoolExecutor 的实例来动态设置线程池的核心策略,以setCorePoolSize() 方法为例,在运行期线程池使用方调用此方法设置 corePoolSize 之后,线程池会直接覆盖原来的 corePoolSize 值,并且基于当前值和原始值的比较结果采取不同的处理策略。对于当前值小于当前工作线程数的情况,说明有多余的worker线程,此时会向当前idle(空闲的)的worker线程发起中断请求以实现回收,多余的worker在下次idel的时候也会被回收;对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务,setCorePoolSize() 具体流程如下:
线程池内部会处理好当前状态做到平滑修改,其他几个方法限于篇幅,这里不一一介绍。重点是基于这几个public方法,我们只需要维护 ThreadPoolExecutor 的实例,并且在需要修改的时候拿到实例修改其参数即可。基于以上的思路,我们实现了线程池参数的动态化、线程池参数在管理平台可配置可修改,其效果图如下图所示:
用户可以在管理平台上通过线程池的名字找到指定的线程池,然后对其参数进行修改,保存后会实时生效。目前支持的动态参数包括核心数、最大值、队列长度等。除此之外,在界面中,我们还能看到用户可以配置是否开启告警、队列等待任务告警阈值、活跃度告警等等。关于监控和告警,我们下面一节会对齐进行介绍。
4.3.2.2 线程池监控
除了参数动态化之外,为了更好地使用线程池,我们需要对线程池的运行状况有感知,比如当前线程池的负载是怎么样的?分配的资源够不够用?任务的执行情况是怎么样的?是长任务还是短任务?基于对这些问题的思考,动态化线程池提供了多个维度的监控和告警能力,包括:线程池活跃度、任务的执行Transaction(频率、耗时)、Reject异常、线程池内部统计信息等等,既能帮助用户从多个维度分析线程池的使用情况,又能在出现问题第一时间通知到用户,从而避免故障或加速故障恢复。
1. 负载监控和告警
线程池负载关注的核心问题是:基于当前线程池参数分配的资源够不够。对于这个问题,我们可以从事前和事中两个角度来看。事前,线程池定义了“活跃度”这个概念,来让用户在发生Reject异常之前能够感知线程池负载问题,线程池活跃度计算公式为:线程池活跃度 = activeCount/maximumPoolSize。这个公式代表当活跃线程数趋向于maximumPoolSize的时候,代表线程负载趋高。事中,也可以从两方面来看线程池的过载判定条件,一个是发生了Reject异常,一个是队列中有等待任务(支持定制阈值)。以上两种情况发生了都会触发告警,告警信息会通过大象推送给服务所关联的负责人。
2. 任务级精细化监控
在传统的线程池应用场景中,线程池中的任务执行情况对于用户来说是透明的。比如在一个具体的业务场景中,业务开发申请了一个线程池同时用于执行两种任务,一个是发消息任务、一个是发短信任务,这两类任务实际执行的频率和时长对于用户来说没有一个直观的感受,很可能这两类任务不适合共享一个线程池,但是由于用户无法感知,因此也无从优化。动态化线程池内部实现了任务级别的埋点,且允许为不同的业务任务指定具有业务含义的名称,线程池内部基于这个名称做Transaction打点,基于这个功能,用户可以看到线程池内部任务级别的执行情况,且区分业务,任务监控示意图如下图所示:
3. 运行时状态实时查看
用户基于JDK原生线程池 ThreadPoolExecutor 提供的几个public的getter方法,可以读取到当前线程池的运行状态以及参数,如下图所示:
动态化线程池基于这几个接口封装了运行时状态实时查看的功能,用户基于这个功能可以了解线程池的实时状态,比如当前有多少个工作线程,执行了多少个任务,队列中等待的任务数等等。效果如下图所示: