并发基础之线程池(Thread Pool)
目录
- 前言
- 何为线程池
- 线程池优势
- 创建线程池方式
- 直接实例化ThreadPoolExecutor类
- JUC Executors 创建线程池
- 线程池挖掘
- Executors简单介绍
- ThreadPoolExecutor核心类
- ThreadPoolExecutor 类构造参数含义
- 线程池运行规则
- 线程设置数量
- 结语
前言
相信大家都知道当前的很多系统架构都要求高并发,所谓高并发(High Concurrency)就是系统通过设计满足多个请求并行的能力,如果非要通俗一点就是系统在单位时间要满足较高的QPS\TPS。那么,如何让系统满足这些高并发能力呢?满足高并发能力不仅仅是分布式解耦、读写分离、限流削峰、缓存、队列,当前还有我们代码编写层面的多线程运用,让单位时间尽可能快的完成业务功能以提升系统吞吐量,吞吐量上来了QPS/TPS自然会提升。所以,今天我们主要对并发基础之线程池简要说明。
何为线程池
线程池英文 Thread Pool,是一种线程处理形式,望文生义就是一个装满线程的池子。当我们需要处理任务时直接从线程池中抓取线程执行,从而减少创建线程开销,避免创建过多线程影响系统开销,也为了尽可能压榨资源提升系统运行效率。
线程池优势
使用线程池的优点我们可以总结为以下几点:
1、重复使用线程,避免频繁创建线程开销,提升系统性能;
2、提供定时调度、单线程、并发数量控制功能,方便实现具体业务场景;
3、灵活的并发线程数量控制,尽可能多的压榨资源提升系统效率,避免过多线程阻塞系统;
4、提供了线程监控功能,可以监控系统运行资源情况
创建线程池方式
直接实例化ThreadPoolExecutor类
直接实例化ThreadPoolExecutor类,传入自定义构造参数
private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
// 线程池核心池的大小
1,
// 线程池的最大线程数
2,
// 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间
1,
// 等待的时间单位
TimeUnit.SECONDS,
// 用来储存等待执行任务的队列
new ArrayBlockingQueue<Runnable>(10),
//线程工厂
new ThreadPoolExecutor.DiscardOldestPolicy());
JUC Executors 创建线程池
Executors 类有很多创建线程池的构造方法,如:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
其本质上还是调用的ThreadPoolExecutor 线程执行类的构造方法。
线程池挖掘
Executors简单介绍
Java JUC 包下Executors类提供了多种创建线程池的方法:
总的来说我们可以分为如下几种线程池类型:
1、Executors.newCachedThreadPool:创建一个可缓存的线程池,如果线程池的大小超过了需要,可以灵活回收空闲线程,如果没有可回收线程,则新建线程
2、Executors.newFixedThreadPool:创建一个定长的线程池,可以控制线程的最大并发数,超出的线程会在队列中等待
3、Executors.newScheduledThreadPool:创建一个定长的线程池,支持定时、周期性的任务执行
4、Executors.newSingleThreadExecutor:创建一个单线程化的线程池,使用一个唯一的工作线程执行任务,保证所有任务按照指定顺序(先入先出或者优先级)执行
5、Executors.newSingleThreadScheduledExecutor:创建一个单线程化的线程池,支持定时、周期性的任务执行
6、Executors.newWorkStealingPool:创建一个具有并行级别的work-stealing线程池
ThreadPoolExecutor核心类
上文已经讲述了我们创建线程池常见的几种方式,这些方式JUC下Executors都已经提供。那么,这些常用的方法是如何创建线程池的呢?我们先查看选择一个创建方式查看源码:
//Executors.newFixedThreadPool 创建定长线程池方式
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
查看源码可知,创建一个定长线程池的静态方法内部实例化了一个线程池执行类ThreadPoolExecutor,再次进入ThreadPoolExecutor类查看源码:
//ThreadPoolExecutor 线程池执行类的一个有参数构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// ThreadPoolExecutor 线程池执行类内部构造方法,
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
查看源码可知,创建线程池是本质上是实例化了一个ThreadPoolExecutor 线程执行类,且传入了多个构造参数。ThreadPoolExecutor 线程执行类内部此时仅仅是将这些配置参数赋值给这些变量,已备后续线程池执行时候对线程的创建、销毁、调用等操作。
根据线程池的使用场景,我选用excute() 执行方法进行源码解读:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
如源码所示,可知excute()方法主要做了如下三件事:
1、如果运行线程少于corePoolSize 核心线程会尝试启动一个新线程执行任务,并在addWorker方法中检验runState和workerCount以防止报警。
2、如果任务可以成功排队,也要检查是否应该新建一个线程。因为可能在上次检查后已有线程死亡或者线程池关闭,这个时候就需要回滚排队重新创建一个线程执行任务。
3、如果任务不能排队,我们应该尝试新增一个线程。如果新增线程失败我们应该知道已经关闭或者饱和,此时就会用拒绝策略提示用户
当然还有其他的一些方法如:submit()提交任务、shutdown()关闭线程池、shutdownNow()立即关闭线程池。这些源码也较为简单,可以自行阅读。
ThreadPoolExecutor 类构造参数含义
corePoolSize:核心线程数量
maximumPoolSize: 最大线程数量
keepAliveTime:空闲线程存活时间,当线程数量大于corePoolSize核心线程数,且这些线程处于空闲状态,你们超过这个存活时间的线程将被销毁
TimeUnit:线程时间单位
BlockingQueue:线程阻塞队列,我们可以根据自身情况传入喜欢的阻塞队列
ThreadFactory: 线程创建工厂
RejectedExecutionHandler:线程池拒绝策略,线程池根据传入的配置参数在特定情况下会触发拒绝策略,目前常用的拒绝策略有:
1、直接抛出异常,这也是默认的策略,实现类为AbortPolicy;
2、用调用者所在的线程来执行任务,实现类为CallerRunsPolicy;
3、丢弃队列中最靠前的任务并执行当前任务,实现类为DiscardOldestPolicy;
4、直接丢弃当前任务,实现类为DiscardPolicy。
线程池运行规则
线程池执行任务运行规则如下:
1、如果运行线程数小于 corePoolSize 核心线程数,无论核心线程是否空闲都会新建一个线程执行
2、如果运行线程数大于、等于 corePoolSize 核心线程数,小于 maximumPoolSize 最大线程数,如果BlockingQueue 阻塞队列已满则新建一个线程执行,如果没有满则放入阻塞队列等待空闲线程执行
3、如果运行线程数大于maximumPoolSize,且BlockingQueue 阻塞队列已满则会执行RejectedExecutionHandler 异常策略,默认是直接抛出异常
4、如果运行线程数据大于 corePoolSize 核心线程数量,且存在空闲线程的情况,空闲线程会在 keepAliveTime 存活时间超时被踢掉,直至线程数等于 corePoolSize 核心线程数量
线程设置数量
1、对于CPU密集型任务,需要尽量的压榨CPU,一般建议线程数量为 nCPU + 1
2、对于IO密集型任务,一般建议线程数量为 2nCPU
结语
线程池的灵活运用是多线程开发以满足高并发场景的一大利器,在开发高并发功能业务时候,应当合理使用多线程。特别是应当理解 ThreadPoolExecutor 核心类源码设计,以便于我们创建出适宜的线程池。水能载舟亦能覆舟,良好多线程运用可以提升系统吞吐量,滥用多线程也会导致异常情况的发生。