当前位置: 首页 > article >正文

Spring线程池优雅关闭

前言

线程池大家一定不陌生,常被用来异步执行一些耗时的任务。但是线程池如何优雅的关闭,却少有人关注。

当 JVM 进程关闭时,你提交到线程池的任务会被如何处理?如何保证任务不丢?

ThreadPoolExecutor

Java 对线程池的封装是 ThreadPoolExecutor,它的三个核心方法,和线程的优雅关闭息息相关。

1、shutdown

关闭线程池,拒绝接收新任务,已经提交的任务会继续执行。方法会立即返回,并不会阻塞线程直到任务执行完毕。

2、shutdownNow

立即关闭线程池,拒绝接收新任务,尝试中断正在执行的任务,并且返回所有未执行的任务,方法也会立即返回。注意,如果执行中的任务不响应中断,任务可能永远无法停止。

3、awaitTermination

等待线程池终止,直到所有任务执行完毕或者超时。如果线程池终止(关闭且任务执行完)返回 true,超时则返回 false。

要想让线程池优雅关闭,需要解决两个问题:

  • 知道何时关闭线程池
  • 知道如何正确关闭线程池

第一个问题,一般是在程序退出时关闭线程池,最常见的做法,就是运行时注册钩子函数,当 JVM 进程退出时,会自动触发我们自定义的钩子函数,在函数里面正确关闭线程池即可。

第二个问题,如何正确关闭线程池呢?毫无疑问,首先是让线程池不再接收新的任务。其次,针对已经提交到线程池里的任务,有两种处理方式:

  1. 把这些已提交的任务执行完,这必然会延长程序关闭的时间
  2. 取消执行队列中的任务,中断正在执行的任务

可以发现,这两种处理方式其实就分别对应了 shutdown() 和 shutdownNow() 方法。

第2种方式会丢任务,这通常不是我们想要的结果。第1种方式也有它的问题,如果任务非常多或者执行非常耗时,程序就会一直关不掉,这也不是我们想要的结果。此时就需要一个平衡,一般的做法是:先拒绝接收新的任务,然后给定一个超时时间,在这个时间内尽可能的把任务执行完,超时以后如果还有没执行的任务,就先把任务相关的数据保存下来,后面可以重新执行,尽量保证不丢任务。

下面是一个简单示例:

public class ThreadPoolGracefulShutdown {
    final static ExecutorService executorService = Executors.newSingleThreadExecutor();

    @RequiredArgsConstructor
    public static class Task implements Runnable {
        final Integer bizId;

        @Override
        public void run() {
            Threads.sleep(1000);
            System.out.println("task" + bizId + " completed");
        }
    }

    static void registerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @SneakyThrows
            @Override
            public void run() {
                System.err.println("shutdown hook...");
                executorService.shutdown();//不再接收新的任务,等待已提交的任务执行完
                if (executorService.awaitTermination(5L, TimeUnit.SECONDS)) {
                    System.err.println("任务执行完,正常关闭");
                } else {
                    List<Integer> bizIds = executorService.shutdownNow().stream().map(i -> ((Task) i).bizId).collect(Collectors.toList());
                    System.err.println("任务保存,后续执行:" + bizIds);
                }
            }
        });
    }

    public static void main(String[] args) {
        // 注册钩子函数
        registerShutdownHook();
        // 提交任务
        for (int i = 1; i <= 10; i++) {
            executorService.execute(new Task(i));
        }

        // 2秒后kill掉当前进程
        new Timer(true).schedule(new TimerTask() {
            @Override
            public void run() {
                Systems.killSelf();
            }
        }, 2000L);
    }
}

代码解释,一个单线程的线程池,每个任务执行1秒,一次性注册10个任务。2秒后会自动kill掉当前进程,钩子函数开始执行,关闭线程池并等待5秒让任务继续执行,超时后把没执行的任务数据保存下来,以便后续执行。最终程序运行输出结果:

task1 completed
task2 completed
shutdown hook...
task3 completed
task4 completed
task5 completed
task6 completed
task7 completed
任务保存,后续执行:[8, 9, 10]

Spring线程池优雅关闭

在 Spring 环境下,线程池通常不需要我们手动关闭,Spring 对 ThreadPoolExecutor 做了二次封装,把线程池转化成 bean 进行全生命周期的管理。那么,在 Spring 环境下,线程池又该如何优雅关闭呢?

Spring 对线程池的封装类是 ThreadPoolTaskExecutor,一般用法是定义一个线程池的配置类,然后声明线程池的 bean 交给 Spring 管理。如下示例,声明了一个单线程的线程池,Spring 环境启动时会自动创建线程池,并在程序退出时自动关闭线程池。

@Configuration
public class ThreadPoolConfig {

    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("TASK-");
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        return executor;
    }
}

线程池的自动创建和关闭,得益于 Spring Bean 的生命周期管理。ThreadPoolTaskExecutor 实现了 InitializingBean 和 DisposableBean 接口,线程池会在 Bean 初始化时创建,在 Bean 销毁时关闭。

Bean 初始化时,最终会调用ThreadPoolTaskExecutor#initializeExecutor,Spring 会根据配置创建 ThreadPoolExecutor

Bean 销毁时,最终会调用ExecutorConfigurationSupport#shutdown,Spring 会根据配置判断是等待任务执行完还是取消执行。

Spring 线程池优雅关闭的两个重要配置:

  • waitForTasksToCompleteOnShutdown 是否等待任务执行
  • awaitTerminationSeconds 等待线程池终止的秒数

如果 waitForTasksToCompleteOnShutdown = false,Spring 会调用 shutdownNow() 立即关闭线程池,取消任务执行。如果 waitForTasksToCompleteOnShutdown = true,Spring 会调用 shutdown() 继续执行任务,并等待 awaitTerminationMillis 秒数,超时以后,任务还是会丢掉。

看一个示例,配置一个单线程的线程池,关闭时最多等待3秒执行剩下的任务

@Configuration
public class ThreadPoolConfig {
    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("TASK-");
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(3);
        return executor;
    }
}

Spring 环境启动后,立即提交10个任务,每个任务执行1秒钟

@Component
@RequiredArgsConstructor
public class TaskRunner implements CommandLineRunner {
    private final Log log = LogFactory.getLog("TaskRunner");
    private final ThreadPoolTaskExecutor taskExecutor;

    @Override
    public void run(String... args) throws Exception {
        for (int i = 0; i < 10; i++) {
            final int index = i + 1;
            taskExecutor.submit(() -> {
                try {
                    Threads.sleep(1000L);
                    log.info("task" + index + " completed");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

启动 Spring 应用,并在2秒后kill掉进程。经过我们分析,最终应该只会有5个任务被执行。运行,发现确实如此。

2025-01-03 16:15:01 , TASK-1 , TaskRunner , task1 completed
killSelf...
2025-01-03 16:15:02 , TASK-1 , TaskRunner , task2 completed
2025-01-03 16:15:03 , TASK-1 , TaskRunner , task3 completed
2025-01-03 16:15:04 , TASK-1 , TaskRunner , task4 completed
2025-01-03 16:15:05 , TASK-1 , TaskRunner , task5 completed
2025-01-03 16:15:05 , SpringApplicationShutdownHook , o.s.s.c.ThreadPoolTaskExecutor , Timed out while waiting for executor 'taskExecutor' to terminate

需要注意的是,awaitTerminationSeconds 必须设置,不然 Spring 只调用 shutdown() 是非阻塞的,没有阻塞当前线程,程序会立即退出,线程池里的任务来不及执行,程序就已经退出了。awaitTerminationSeconds 代表程序最多等待任务执行的时间,在这之后哪怕任务没执行完,也会退出程序。

总结,Spring 环境下的线程池优雅关闭,需要设置 waitForTasksToCompleteOnShutdown = true,再设置一个充分的 awaitTerminationSeconds 让线程池有足够多的时间去执行剩下的任务。万一还是有任务超时了,就得通过改造把这些任务数据保存下来,下次继续执行。

任务依赖其它资源的问题

在 Spring 环境下,简单的异步任务通过设置一个足够长的 awaitTerminationSeconds 就可解决问题。一旦异步任务的执行依赖其它资源,但是 Spring 恰好又把这些资源释放了,任务的执行还是会失败。

举个例子,异步任务需要往 Redis 里面写数据,Spring 在收到程序退出的信号时,就会关闭上下文环境,销毁 IOC 容器释放资源。结果就是,线程池里的任务还在跑,但是 Redis 连接已经关闭了,可想而知,任务一定会跑失败。这也不是我们想要的结果。

下面这段示例代码,任务执行耗时1秒,最后会往 Redis 写数据,一次性注册5个任务,2秒后kill掉进程,给线程池预留3秒的执行时间,理论上5个任务都会执行完。

@Component
@RequiredArgsConstructor
public class TaskRunner implements CommandLineRunner {
    private final Log log = LogFactory.getLog("TaskRunner");
    private final ThreadPoolTaskExecutor taskExecutor;
    private final RedisClient redisClient;

    @Override
    public void run(String... args) throws Exception {
        for (int i = 0; i < 5; i++) {
            final int index = i + 1;
            taskExecutor.submit(() -> {
                try {
                    Threads.sleep(1000L);
                    redisClient.write("key", String.valueOf(index));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

运行的结果是,5个任务都执行了,但是后面3个都失败了,因为 Redis 连接已经被关闭。

要解决这个问题,得从 Spring 销毁 Bean 的顺序上入手,因为线程池依赖 RedisClient,所以 RedisClient 必须在线程池销毁之后再销毁,好在 Spring 提供了相应的注解,只需加上@DependsOn即可。

@Configuration
public class ThreadPoolConfig {
    @Bean("taskExecutor")
    @DependsOn("redisClient")// 确保在redisClient之前销毁
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("TASK-");
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(3);
        return executor;
    }
}

再跑一遍,5个任务都执行成功了。

最后还有一个问题,异步任务很复杂,具体我也不知道依赖哪些外部资源,没办法针对性的配置,怎么办?

这种场景下,我建议直接监听 ContextClosedEvent。因为 ContextClosedEvent 是早于 Bean 销毁前发出的,在监听器里面等到线程池全部销毁完毕,再让 Spring 销毁其它 bean。也就是说,线程池的销毁必须先于其它所有 bean 之前,简单粗暴。

下面是一个简单示例:

@Component
public class ApplicationCloseListener implements ApplicationListener<ContextClosedEvent>, ApplicationContextAware {
    ApplicationContext applicationContext;

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        // 取出所有ThreadPoolTaskExecutor,等待销毁
        applicationContext.getBeansOfType(ThreadPoolTaskExecutor.class).values().forEach(ExecutorConfigurationSupport::destroy);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

尾巴

线程池的优雅关闭,关键在于给它一个充分的超时时间,确保剩余任务在这个时间内可以执行完毕。万一还有没执行的任务,也可以先把数据保存下来,后续重新执行。在 Spring 环境下针对线程池做了二次封装,通过配置即可实现优雅关闭,但是需要注意异步任务依赖其它 bean 的问题,依赖的 bean 可能早于线程池被销毁,任务执行就会报错,可以通过配置和监听 ContextClosedEvent 两种方式延迟销毁依赖的 bean。


http://www.kler.cn/a/466589.html

相关文章:

  • 【强化学习】演员评论家Actor-Critic算法(万字长文、附代码)
  • linux-centos-安装miniconda3
  • Fabric部署-docker安装
  • PHP7和PHP8的最佳实践
  • 基于YOLO11的道路缺陷检测系统
  • ScheduledExecutorService详解
  • YOLOv8/YOLOv11改进 添加CBAM、GAM、SimAM、EMA、CAA、ECA、CA等多种注意力机制
  • GWAS数据和软件下载
  • JeeSite 快速开发平台:全能企业级快速开发解决方案|GitCode 光引计划征文展示
  • 【深度学习进阶】基于CNN的猫狗图片分类项目
  • pycharm 命令行下的链接,不自动形成链接和定位了。
  • 深入解析-正则表达式
  • github加速源配置
  • RK3588+FPGA全国产异步LED显示屏控制卡/屏幕拼接解决方案
  • HTML——61. 单行文本框和密码输入框(主讲input元素的type属性)
  • DC-2 靶场渗透
  • 深度解析 LDA 与聚类结合的文本主题分析实战
  • Flutter踩坑记-第三方SDK不兼容Gradle 8.0,需适配namespace
  • 【Java回顾】Day2 正则表达式----异常处理
  • 曲速引擎前端代码生成器 6.6.0 介绍
  • LLM - 使用 LLaMA-Factory 部署大模型 HTTP 多模态服务 (4)
  • 小程序26-事件绑定和事件对象
  • c#中集中常见的集合去重方式
  • 智能型企业的发展与开源AI智能名片S2B2C商城小程序的应用
  • docker 安装与配置 gitlab
  • 为什么 Netflix 部分功能放弃React,选择“原生”JavaScript?