Spring线程池优雅关闭
前言
线程池大家一定不陌生,常被用来异步执行一些耗时的任务。但是线程池如何优雅的关闭,却少有人关注。
当 JVM 进程关闭时,你提交到线程池的任务会被如何处理?如何保证任务不丢?
ThreadPoolExecutor
Java 对线程池的封装是 ThreadPoolExecutor,它的三个核心方法,和线程的优雅关闭息息相关。
1、shutdown
关闭线程池,拒绝接收新任务,已经提交的任务会继续执行。方法会立即返回,并不会阻塞线程直到任务执行完毕。
2、shutdownNow
立即关闭线程池,拒绝接收新任务,尝试中断正在执行的任务,并且返回所有未执行的任务,方法也会立即返回。注意,如果执行中的任务不响应中断,任务可能永远无法停止。
3、awaitTermination
等待线程池终止,直到所有任务执行完毕或者超时。如果线程池终止(关闭且任务执行完)返回 true,超时则返回 false。
要想让线程池优雅关闭,需要解决两个问题:
- 知道何时关闭线程池
- 知道如何正确关闭线程池
第一个问题,一般是在程序退出时关闭线程池,最常见的做法,就是运行时注册钩子函数,当 JVM 进程退出时,会自动触发我们自定义的钩子函数,在函数里面正确关闭线程池即可。
第二个问题,如何正确关闭线程池呢?毫无疑问,首先是让线程池不再接收新的任务。其次,针对已经提交到线程池里的任务,有两种处理方式:
- 把这些已提交的任务执行完,这必然会延长程序关闭的时间
- 取消执行队列中的任务,中断正在执行的任务
可以发现,这两种处理方式其实就分别对应了 shutdown() 和 shutdownNow() 方法。
第2种方式会丢任务,这通常不是我们想要的结果。第1种方式也有它的问题,如果任务非常多或者执行非常耗时,程序就会一直关不掉,这也不是我们想要的结果。此时就需要一个平衡,一般的做法是:先拒绝接收新的任务,然后给定一个超时时间,在这个时间内尽可能的把任务执行完,超时以后如果还有没执行的任务,就先把任务相关的数据保存下来,后面可以重新执行,尽量保证不丢任务。
下面是一个简单示例:
public class ThreadPoolGracefulShutdown {
final static ExecutorService executorService = Executors.newSingleThreadExecutor();
@RequiredArgsConstructor
public static class Task implements Runnable {
final Integer bizId;
@Override
public void run() {
Threads.sleep(1000);
System.out.println("task" + bizId + " completed");
}
}
static void registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@SneakyThrows
@Override
public void run() {
System.err.println("shutdown hook...");
executorService.shutdown();//不再接收新的任务,等待已提交的任务执行完
if (executorService.awaitTermination(5L, TimeUnit.SECONDS)) {
System.err.println("任务执行完,正常关闭");
} else {
List<Integer> bizIds = executorService.shutdownNow().stream().map(i -> ((Task) i).bizId).collect(Collectors.toList());
System.err.println("任务保存,后续执行:" + bizIds);
}
}
});
}
public static void main(String[] args) {
// 注册钩子函数
registerShutdownHook();
// 提交任务
for (int i = 1; i <= 10; i++) {
executorService.execute(new Task(i));
}
// 2秒后kill掉当前进程
new Timer(true).schedule(new TimerTask() {
@Override
public void run() {
Systems.killSelf();
}
}, 2000L);
}
}
代码解释,一个单线程的线程池,每个任务执行1秒,一次性注册10个任务。2秒后会自动kill掉当前进程,钩子函数开始执行,关闭线程池并等待5秒让任务继续执行,超时后把没执行的任务数据保存下来,以便后续执行。最终程序运行输出结果:
task1 completed
task2 completed
shutdown hook...
task3 completed
task4 completed
task5 completed
task6 completed
task7 completed
任务保存,后续执行:[8, 9, 10]
Spring线程池优雅关闭
在 Spring 环境下,线程池通常不需要我们手动关闭,Spring 对 ThreadPoolExecutor 做了二次封装,把线程池转化成 bean 进行全生命周期的管理。那么,在 Spring 环境下,线程池又该如何优雅关闭呢?
Spring 对线程池的封装类是 ThreadPoolTaskExecutor,一般用法是定义一个线程池的配置类,然后声明线程池的 bean 交给 Spring 管理。如下示例,声明了一个单线程的线程池,Spring 环境启动时会自动创建线程池,并在程序退出时自动关闭线程池。
@Configuration
public class ThreadPoolConfig {
@Bean("taskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("TASK-");
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return executor;
}
}
线程池的自动创建和关闭,得益于 Spring Bean 的生命周期管理。ThreadPoolTaskExecutor 实现了 InitializingBean 和 DisposableBean 接口,线程池会在 Bean 初始化时创建,在 Bean 销毁时关闭。
Bean 初始化时,最终会调用ThreadPoolTaskExecutor#initializeExecutor
,Spring 会根据配置创建 ThreadPoolExecutor
Bean 销毁时,最终会调用ExecutorConfigurationSupport#shutdown
,Spring 会根据配置判断是等待任务执行完还是取消执行。
Spring 线程池优雅关闭的两个重要配置:
- waitForTasksToCompleteOnShutdown 是否等待任务执行
- awaitTerminationSeconds 等待线程池终止的秒数
如果 waitForTasksToCompleteOnShutdown = false,Spring 会调用 shutdownNow() 立即关闭线程池,取消任务执行。如果 waitForTasksToCompleteOnShutdown = true,Spring 会调用 shutdown() 继续执行任务,并等待 awaitTerminationMillis 秒数,超时以后,任务还是会丢掉。
看一个示例,配置一个单线程的线程池,关闭时最多等待3秒执行剩下的任务
@Configuration
public class ThreadPoolConfig {
@Bean("taskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("TASK-");
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(3);
return executor;
}
}
Spring 环境启动后,立即提交10个任务,每个任务执行1秒钟
@Component
@RequiredArgsConstructor
public class TaskRunner implements CommandLineRunner {
private final Log log = LogFactory.getLog("TaskRunner");
private final ThreadPoolTaskExecutor taskExecutor;
@Override
public void run(String... args) throws Exception {
for (int i = 0; i < 10; i++) {
final int index = i + 1;
taskExecutor.submit(() -> {
try {
Threads.sleep(1000L);
log.info("task" + index + " completed");
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
启动 Spring 应用,并在2秒后kill掉进程。经过我们分析,最终应该只会有5个任务被执行。运行,发现确实如此。
2025-01-03 16:15:01 , TASK-1 , TaskRunner , task1 completed
killSelf...
2025-01-03 16:15:02 , TASK-1 , TaskRunner , task2 completed
2025-01-03 16:15:03 , TASK-1 , TaskRunner , task3 completed
2025-01-03 16:15:04 , TASK-1 , TaskRunner , task4 completed
2025-01-03 16:15:05 , TASK-1 , TaskRunner , task5 completed
2025-01-03 16:15:05 , SpringApplicationShutdownHook , o.s.s.c.ThreadPoolTaskExecutor , Timed out while waiting for executor 'taskExecutor' to terminate
需要注意的是,awaitTerminationSeconds 必须设置,不然 Spring 只调用 shutdown() 是非阻塞的,没有阻塞当前线程,程序会立即退出,线程池里的任务来不及执行,程序就已经退出了。awaitTerminationSeconds 代表程序最多等待任务执行的时间,在这之后哪怕任务没执行完,也会退出程序。
总结,Spring 环境下的线程池优雅关闭,需要设置 waitForTasksToCompleteOnShutdown = true,再设置一个充分的 awaitTerminationSeconds 让线程池有足够多的时间去执行剩下的任务。万一还是有任务超时了,就得通过改造把这些任务数据保存下来,下次继续执行。
任务依赖其它资源的问题
在 Spring 环境下,简单的异步任务通过设置一个足够长的 awaitTerminationSeconds 就可解决问题。一旦异步任务的执行依赖其它资源,但是 Spring 恰好又把这些资源释放了,任务的执行还是会失败。
举个例子,异步任务需要往 Redis 里面写数据,Spring 在收到程序退出的信号时,就会关闭上下文环境,销毁 IOC 容器释放资源。结果就是,线程池里的任务还在跑,但是 Redis 连接已经关闭了,可想而知,任务一定会跑失败。这也不是我们想要的结果。
下面这段示例代码,任务执行耗时1秒,最后会往 Redis 写数据,一次性注册5个任务,2秒后kill掉进程,给线程池预留3秒的执行时间,理论上5个任务都会执行完。
@Component
@RequiredArgsConstructor
public class TaskRunner implements CommandLineRunner {
private final Log log = LogFactory.getLog("TaskRunner");
private final ThreadPoolTaskExecutor taskExecutor;
private final RedisClient redisClient;
@Override
public void run(String... args) throws Exception {
for (int i = 0; i < 5; i++) {
final int index = i + 1;
taskExecutor.submit(() -> {
try {
Threads.sleep(1000L);
redisClient.write("key", String.valueOf(index));
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
运行的结果是,5个任务都执行了,但是后面3个都失败了,因为 Redis 连接已经被关闭。
要解决这个问题,得从 Spring 销毁 Bean 的顺序上入手,因为线程池依赖 RedisClient,所以 RedisClient 必须在线程池销毁之后再销毁,好在 Spring 提供了相应的注解,只需加上@DependsOn
即可。
@Configuration
public class ThreadPoolConfig {
@Bean("taskExecutor")
@DependsOn("redisClient")// 确保在redisClient之前销毁
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("TASK-");
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(3);
return executor;
}
}
再跑一遍,5个任务都执行成功了。
最后还有一个问题,异步任务很复杂,具体我也不知道依赖哪些外部资源,没办法针对性的配置,怎么办?
这种场景下,我建议直接监听 ContextClosedEvent。因为 ContextClosedEvent 是早于 Bean 销毁前发出的,在监听器里面等到线程池全部销毁完毕,再让 Spring 销毁其它 bean。也就是说,线程池的销毁必须先于其它所有 bean 之前,简单粗暴。
下面是一个简单示例:
@Component
public class ApplicationCloseListener implements ApplicationListener<ContextClosedEvent>, ApplicationContextAware {
ApplicationContext applicationContext;
@Override
public void onApplicationEvent(ContextClosedEvent event) {
// 取出所有ThreadPoolTaskExecutor,等待销毁
applicationContext.getBeansOfType(ThreadPoolTaskExecutor.class).values().forEach(ExecutorConfigurationSupport::destroy);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
尾巴
线程池的优雅关闭,关键在于给它一个充分的超时时间,确保剩余任务在这个时间内可以执行完毕。万一还有没执行的任务,也可以先把数据保存下来,后续重新执行。在 Spring 环境下针对线程池做了二次封装,通过配置即可实现优雅关闭,但是需要注意异步任务依赖其它 bean 的问题,依赖的 bean 可能早于线程池被销毁,任务执行就会报错,可以通过配置和监听 ContextClosedEvent 两种方式延迟销毁依赖的 bean。