当前位置: 首页 > article >正文

Spring Boot 线程池自定义拒绝策略:解决任务堆积与丢失问题

如何通过自定义线程池提升系统稳定性

背景

在高并发系统中,线程池管理至关重要。默认线程池可能导致:

  1. 资源浪费(创建过多线程导致 OOM)
  2. 任务堆积(队列满后任务被拒绝)
  3. 任务丢失(默认拒绝策略丢弃任务
    为了防止这些问题,我们使用 Spring Boot 自定义线程池,并优化 异常处理 和 拒绝策略。

线程池方案设计

在 ExecutorConfig 类中,我们定义了两个线程池:

  1. myExecutor:用于普通任务,采用CallerRunsPolicy 避免任务丢失。
  2. oneExecutor:用于信号计算任务(单线程模式),具有 自定义异常处理 和 阻塞式拒绝策略。

代码解析

线程池 myExecutor(通用任务池)

@Bean(name = "myExecutor")
public Executor myExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(threadProperties.getCorePoolSize());
    executor.setMaxPoolSize(threadProperties.getMaxPoolSize());
    executor.setQueueCapacity(threadProperties.getQueueCapacity());
    executor.setThreadNamePrefix("signal-executor-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    return executor;
}

设计要点:
CallerRunsPolicy:线程池满了,主线程执行任务,防止丢失但可能影响性能。

线程池 oneExecutor(单线程计算池)

@Bean(name = "oneExecutor")
public Executor oneExecutor() {
    ThreadFactory threadFactory = new BasicThreadFactory.Builder()
            .uncaughtExceptionHandler(new MyThreadException())
            .namingPattern("one-thread-%s")
            .build();
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(1);
    executor.setMaxPoolSize(1);
    executor.setQueueCapacity(1);
    executor.setThreadFactory(threadFactory);
    executor.setThreadGroup(new ThreadGroup("1"));
    executor.setRejectedExecutionHandler(new CustomRejectedExecutionHandler());
    executor.initialize();
    return executor;
}

设计要点:
单线程池(保证任务顺序执行),如果无须,那就按照当前的服务节点配置来设置参数
自定义异常处理(防止线程因异常崩溃)
自定义拒绝策略(任务队列满时阻塞等待)

自定义异常处理

class MyThreadException implements Thread.UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        log.error("异常: {},线程: {}", ExceptionUtils.getStackTrace(e), t.getName());
    }
}

作用:防止线程因未捕获异常直接终止,提升系统稳定性。当然这个是处理线程池中子任务处理业务逻辑的时候发生业务异常的处理方式,除此之外还有其他的解决方案

异常处理
  • afterExecute() 处理异常(可扩展) :用于处理执行过程中抛出的异常
  • uncaughtExceptionHandler 处理未捕获异常(默认 JVM 打印堆栈): 用于处理线程未捕获的异常;
  • RejectedExecutionHandler 处理任务拒绝:处理任务被拒绝的情况。

处理顺序:

  1. 当任务执行时,如果任务抛出异常,它会首先被 afterExecute() 捕获,并且你可以在这里进行进一步的处理。
  2. 如果任务中的异常没有被 afterExecute() 捕获或处理,且是未捕获异常,它会交由 uncaughtExceptionHandler 进行处理。
  3. RejectedExecutionHandler 是处理线程池拒绝接受新任务的情况,这通常和任务执行过程中的异常无关,主要处理线程池饱和时的情况。
    注意:beforeExecute() 在任务开始执行前调用,通常用于准备工作;
    异常处理上,beforeExecute() 不会直接处理任务执行过程中的异常,但可以捕获并处理自己内部的异常;

相关源码分析:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1️⃣ 线程池当前线程数 < corePoolSize,则尝试新增核心线程执行任务
if (workerCountOf© < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2️⃣ 线程池已满,尝试加入工作队列
if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command); // 任务队列中的任务被拒绝
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 防止线程池为空,确保有线程执行任务
}
// 3️⃣ 线程池满且队列满,尝试新增非核心线程
else if (!addWorker(command, false))
reject(command); // 线程池已满,拒绝任务
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 1️⃣ 执行任务
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // ⚠ 任务执行点
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown); // 2️⃣ 任务执行后的扩展方法
}
task = null;
w.completedTasks++;
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 3️⃣ 任务异常退出,删除该线程
}
}

自定义拒绝策略-重新放回队列中
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            if (!executor.isShutdown()) {
                log.info("队列已满,阻塞等待...");
                executor.getQueue().put(r);
                log.info("任务已加入队列");
            }
        } catch (Exception e) {
            log.error("拒绝策略异常", e);
        }
    }
}

作用:
默认拒绝策略丢弃任务,而此策略会阻塞等待,确保任务不丢失。
适用于任务量较大,但不能丢失任务的场景(如消息队列处理)

自定义拒绝策略-主线程执行
    /**
     * 自定义线程池,防止使用默认线程池导致内存溢出
     *
     * @param
     * @return
     * @author bu.junjie
     * @date 2021/11/10 10:00
     */
    @Bean(name = "myExecutor")
    public Executor myExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(threadProperties.getCorePoolSize());
        executor.setMaxPoolSize(threadProperties.getMaxPoolSize());
        executor.setQueueCapacity(threadProperties.getQueueCapacity());
        executor.setThreadNamePrefix("signal-executor-");
        // 使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行,阻塞主线程
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

适用场景

✅ 高并发请求(如 HTTP 任务)
✅ 后台数据处理(如日志分析、批量计算)
✅ 长时间任务(如大文件处理、消息队列消费)

总结

  • 自定义线程池 防止资源浪费,提升吞吐量。
  • 异常处理 避免线程因未捕获异常而终止。
  • 优化拒绝策略 防止任务丢失,提高系统可靠性。

线程池优化是高并发系统的关键,希望本篇博客能帮助你更好地理解和应用线程池! 🚀🚀🚀

完整代码示例

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.Resource;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程池配置参数
 *
 * @version 1.0.0
 * @createTime 2025-11-09 14:01
 */
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {


    @Resource
    private ThreadProperties threadProperties;


    /**
     * 自定义线程池,防止使用默认线程池导致内存溢出
     *
     * @param
     * @return
     * @author bu.junjie
     * @date 2021/11/10 10:00
     */
    @Bean(name = "myExecutor")
    public Executor myExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(threadProperties.getCorePoolSize());
        executor.setMaxPoolSize(threadProperties.getMaxPoolSize());
        executor.setQueueCapacity(threadProperties.getQueueCapacity());
        executor.setThreadNamePrefix("signal-executor-");
        // 使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行,阻塞主线程
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }


    /**
     * 信号计算时的线程池(1号线程池)
     *
     * @param
     * @return
     * @author bu.junjie
     * @date 2022/1/5 13:01
     */
    @Bean(name = "oneExecutor")
    public Executor oneExecutor() {
        ThreadFactory threadFactory = new BasicThreadFactory.Builder()
                .uncaughtExceptionHandler(new MyThreadException())
                .namingPattern("one-thread-%s")
                .build();
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setThreadFactory(threadFactory);
        executor.setQueueCapacity(1);
        executor.setThreadGroup(new ThreadGroup("1"));
        executor.setRejectedExecutionHandler(new CustomRejectedExecutionHandler());
        executor.initialize();
        return executor;
    }

    class MyThreadException implements Thread.UncaughtExceptionHandler {

        /**
         * Method invoked when the given thread terminates due to the
         * given uncaught exception.
         * <p>Any exception thrown by this method will be ignored by the
         * Java Virtual Machine.
         *
         * @param t the thread
         * @param e the exception
         */
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.error("MyThreadException is   exception=【{}】,Thread = 【{}】", ExceptionUtils.getStackTrace(e), t.getName());
        }
    }

    /**
     * 拒绝策略优化
     *
     * @param
     * @author bu.junjie
     * @date 2022/1/8 14:06
     * @return
     */
    public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // 核心改造点,由blockingqueue的offer改成put阻塞方法
                if (!executor.isShutdown()) {
                    long start = System.currentTimeMillis();
                    log.info("当前阻塞队列已满开始请求存放队列束!!!");
                    executor.getQueue().put(r);
                    log.info("存放阻塞队列成功,阻塞时间time = 【{}】", System.currentTimeMillis() - start);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

思考

为什么拒绝策略要重新抛出异常?

我们会发现默认的四种拒绝策略在处理完业务逻辑之后还会重新抛出异常,就算你是自定义的拒绝策略也需要重新抛出异常,为什么呢?不抛出会怎么样?

如果不抛出异常,调用方(业务代码)无法感知任务被拒绝,可能导致任务丢失或业务逻辑异常。

场景分析

当线程池队列满了时,会触发 rejectedExecution 方法。如果我们只是记录日志,而不抛出异常:

  • 主线程会继续执行,但任务并未真正执行,业务方无法感知到这个问题。
  • 可能导致数据丢失,尤其是在关键业务(如支付、订单、消息处理)场景中。
重新抛出异常的好处

✅ 保证调用方可以感知任务拒绝,决定是否降级处理、重试或报警。
✅ 防止静默丢失任务,保证业务的可靠性。
✅ 与 Spring 线程池默认行为保持一致,防止意外吞掉异常。

代码示例

❌ 错误示例(未抛出异常,可能导致任务丢失)

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            if (!executor.isShutdown()) {
                log.warn("队列已满,任务阻塞等待...");
                executor.getQueue().put(r); // 可能抛出异常
                log.info("任务已放入队列");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // 仅恢复中断状态,但未通知调用方
        }
    }
}

问题:
调用方不会收到异常,以为任务已经成功执行,但其实可能丢失了。
例如,在支付系统中,如果订单更新任务丢失,可能导致订单状态未更新。

✅ 正确示例(重新抛出异常,保证调用方可感知)

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            if (!executor.isShutdown()) {
                log.warn("队列已满,阻塞等待...");
                executor.getQueue().put(r);
                log.info("任务成功进入队列");
                return; // 任务成功加入队列后不需要抛异常
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // 恢复线程中断状态
            throw new RejectedExecutionException("任务提交被中断", e);
        } catch (Exception e) {
            log.error("任务拒绝策略发生异常", e);
            throw new RejectedExecutionException("自定义拒绝策略异常", e);
        }
    }
}

改进点:
任务成功放入队列时不会抛异常,避免不必要的错误。
如果 put() 失败,抛出 RejectedExecutionException,让业务方感知。
捕获 InterruptedException 并恢复中断状态,避免影响后续任务。
其实这个原因和为什么需要恢复线程中断一样的逻辑,也是为了让调用方感知到

业务方如何处理异常?

如果 rejectedExecution 抛出 RejectedExecutionException,业务代码可以捕获异常并进行降级,例如:

try {
    executor.execute(task);
} catch (RejectedExecutionException e) {
    log.error("线程池已满,任务执行失败,进行降级处理", e);
    // 业务降级策略,例如:
    saveToDatabaseForLaterProcessing(task);
}

降级方案:如果线程池拒绝任务,可以存入 数据库、MQ 或 重试队列,避免任务丢失。

结论

🚀 必须重新抛出异常,否则:

  • 任务可能悄悄丢失,业务方无法感知。
  • 可能影响数据一致性(如支付、订单、日志处理)。
  • 业务代码无法主动补救(重试、降级等)。

最佳实践:

  • 成功放入队列 → 不抛异常
  • 任务无法处理 → 抛出 RejectedExecutionException,让调用方感知
    这样可以既保证任务不丢失,又确保调用方有能力处理拒绝任务!🔥

自定义拒绝策略put()方法?

其实默认拒绝策略是offer()方法是非阻塞的,也就是只要队列中的任务只要有,那就去创建子线程,直至触发拒绝策略
✅ 正确示例

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            System.out.println("队列已满,阻塞等待...");
            executor.getQueue().put(r);  // 阻塞等待队列有空位
            System.out.println("任务重新加入队列:" + r.toString());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("任务提交失败,线程被中断", e);
        }
    }
}

http://www.kler.cn/a/540073.html

相关文章:

  • 使用Qt+opencv实现游戏辅助点击工具-以阴阳师为例
  • [创业之路-289]:《产品开发管理-方法.流程.工具 》-15- 需求管理 - 第1步:原始需求收集
  • 【设计模式】【行为型模式】职责链模式(Chain of Responsibility)
  • PySide(PyQT)的 QGraphicsScene 中检测回车键
  • QT修仙之路1-1--遇见QT
  • 前端开发架构师Prompt指令的最佳实践
  • 2025年度Python最新整理的免费股票数据API接口
  • 『Apisix进阶篇』结合Consul作服务发现实战演练
  • Golang Gin框架mqtt消费者
  • 【分布式理论7】分布式调用之:服务间的(RPC)远程调用
  • ffmpeg 常用命令
  • modbus tcp,modbus,tcp几种通信方式的区别
  • 子集II(力扣90)
  • 【Linux网络编程】之守护进程
  • 2025年面试运维经验分享
  • Elasticsearch操作--笔记
  • 安宝特方案 | AR眼镜:远程医疗的“时空折叠者”,如何为生命争夺每一分钟?
  • AJAX项目——数据管理平台
  • java-初识List
  • 如何通过PHP接入DeepSeek的API
  • DevOps 所需的行为
  • 速通DeepSeek 安装部署文档
  • MYSQL关联关系查询
  • STM32+Proteus+DS18B20数码管仿真实验
  • w200基于spring boot的个人博客系统的设计与实现
  • Logo语言的学习路线