JAVA 线程池,及7大参数,4大拒绝策略详解
为什么要使用线程池
线程的生命周期:运行、就绪、运行、阻塞、死亡
下面是一个简单的创建多线程的方法。注意:工作中不可取。
创建线程的时候,我们避不开线程的生命周期。上面的方法虽然可以创建多线程,但是创建完成后,我们可能还需要进行销毁,如果中间出现异常就可能会导致回收不了,或者在线程里面又创建一个线程,而线程切换也需要消耗时间和空间,就会导致线程管理起来很困难。
为了解决找个问题,我们参考一下阿里的做法:通过线程池的方式来管理线程。当然如果你有其他更好的管理线程的方式也可以。
JDK 常用的线程池
线程池作为一种池化技术,实现起来比较困难,但是 JDK下面的 java.util.concurrent
包提供了几种类型的线程池,主要通过 Executors
类中的静态工厂方法来创建。
下面简单列出两种线程池的使用示例
public static void main(String[] args) {
System.out.println("--- 创建唯一的线程 ---");
ExecutorService executorService01 = Executors.newSingleThreadExecutor();
executorService01.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + ":" + i);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
System.out.println("--- Fixed 线程数5---");
ExecutorService executorService02 = Executors.newFixedThreadPool(5);
for (int t = 0; t < 10; t++) {
executorService02.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + ":" + i);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
}
上面代码运行,我们发现,线程创建后不会自动被销毁,而且线程可以被复用,可以减少每次创建线程消耗的时间和资源的浪费。
但是我们看阿里的开发手册上不允许使用 Executors
来创建,而需要使用 ThreadPoolExecutor
类来创建,如下图:
我们看一下,刚刚使用的两种线程池的源码,发现其实都是由 ThreadPoolExecutor
类创建的线程。
所以后续我们就可以使用 ThreadPoolExecutor
类来自定义线程池。
为什么阿里不允许使用 Executors
来创建线程池呢?我们可以在文档上看到
我们按照手册上写的看一下源码。
我们看到它是使用阻塞式队列来存线程,以链表的方式。链表理论上是没有容量限制的,但是其实是有限制的,最大的容量就是 Integer.MAX_VALUE(23亿左右),理论上这个queue 可以放 23亿左右,因为没有限制长度,所以可能造成这个 queue 里面存放很多线程,从而造成 OOM 错误。
为了避免上面的问题,我们一般自定义线程池。
自定义线程池
线程池结构图
如上图所示,执行顺序为:当提交任务数大于核心线程数时,会优先将任务放到阻塞队列中。当阻塞队列饱和时,会扩充线程池中的线程数,直到达到最大线程数。当任务数超出最大线程数时,就会触发线程池的拒绝策略。
核心参数
序列 | 参数名 | 含义 |
---|---|---|
1 | corePoolSize | 核心线程数 |
2 | maximumPoolSize | 最大线程数(必须大于核心线程数) |
3 | keepAliveTime | 空闲线程的存活时间 |
4 | Unit | 时间单位 |
5 | workQueue | 用于存放任务的队列 |
6 | threadFactory | 线程工厂、用来创建新线程 |
7 | handler | 处理被拒绝的任务 |
-
核心线程数(corePoolSize):指线程池中保持活动状态的最少线程数,即使在空闲时也不会被回收。当有新的任务提交时,如果当前活动线程数小于核心线程数,则会创建新的线程来处理任务。
-
最大线程数(maximumPoolSize):线程池中允许存在的最大线程数,当任务队列已满且当前活动线程数小于最大线程数时,线程池会创建新的线程,直到达到最大线程数。
-
空闲线程存活时间(keepAliveTime):指空闲线程在被回收之前可以等待新任务的最长时间。当线程池中的线程数量超过核心线程数,并且处于空闲状态时,这些多余的线程在超过指定时间后会被回收销毁。
-
时间单位(unit):空闲线程存活时间的单位,可以是秒、毫秒、分钟等。
-
任务队列(workQueue):用于存放等待执行任务的阻塞队列。可以选择不同的队列类型来实现不同的调度策略。当线程池中的线程都在工作且任务队列已满时,新的任务会被拒绝执行。
-
线程工厂(threadFactory):用于创建新线程的工厂,可以自定义线程的名称、优先级等属性。
-
拒绝策略(rejectedExecutionHandler):当线程池已满并且任务无法执行时的处理策略,如何处理新提交的任务。
拒绝策略
拒绝策略在线程池已满且无法接受新的任务时会被触发:
- 线程池的线程数量已经达到了最大线程数,无法再创建新的线程来执行任务。
- 线程池中的任务队列已满,无法接受新的任务。
当同时满足上面两个条件时,拒绝策略会被触发,根据所选的拒绝策略进行相应的处理。
需要注意的是,拒绝策略的触发并不代表任务一定会被丢弃或忽视,而是指当线程池已达到最大容量且任务队列已满时,新提交的任务无法被正常处理,因此需要通过拒绝策略来决定如何处理这些无法接受的任务。
-
AbortPolicy (中止策略)
默认的拒绝策略,当线程池已满且任务队列也已满时,会抛出 RejectedExecutionException 异常来拒绝新提交的任务。
-
CallerRunsPolicy (调用者运行策略)
当线程池已满且任务队列也已满时,新提交的任务会由提交任务的线程来执行(调用线程自己执行),而不会开启新的线程。 -
DiscardPolicy (丢弃策略)
当线程池已满且任务队列也已满时,直接丢弃新提交的任务,不做任何处理。 -
DiscardOldestPolicy (丢弃最老策略)
当线程池已满且任务队列也已满时,丢弃最早提交的任务,然后尝试执行新提交的任务。
自定义线程池示例代码
import java.util.concurrent.*;
public class CustomerThreadPool {
static class MyAbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public MyAbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("任务 " + r.toString() +
" 拒绝 from " +
e.toString());
}
}
public static void main(String[] args){
int corePoolSize = 4;
int maximumPoolSize = 10;
long keepAliveTime = 500;
TimeUnit unit = TimeUnit.MILLISECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
RejectedExecutionHandler handler = null;
ThreadPoolExecutor myPool = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
r->{
System.out.println("创建线程:"+r);
return new Thread(r);
},
new MyAbortPolicy()
);
for (int t=0;t<20;t++) {
myPool.execute(()->{
for (int i=0;i<10;i++) {
System.out.println(Thread.currentThread().getName()+":"+i);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}