当前位置: 首页 > article >正文

TraceId在线程池及@Async异步线程中如何传递

何时使用线程池

提起线程池相信大家都不陌生,什么情况下会考虑使用线程池呢?我总结了一下大概是这么几种情况

第一种情况:程序中有任务需要异步执行。这样不可避免的要分配线程去执行,如果这个异步任务执行的频次很低,那就不用做额外处理,每次新建线程也是可以的;如果这个异步任务执行的很频繁,那么每次都新建线程就会有问题,问题一是线程的频繁创建与销毁会浪费系统资源,问题二是对系统资源的消耗不可控,无限制的创建线程会导致服务崩溃。

第二种情况:异步执行任务可以提高程序运行效率。主逻辑执行到某一阶段后,有几段程序可以并行执行,这几段程序结束后再将结果汇总到一起,继续执行主逻辑,类似于总分总的步骤。那么这几个可以并行执行的逻辑,就可以分几个线程去异步执行,这样就节省了主线程的运行时间。这种情况下,因为都考虑使用异步执行来提升效率了,那这个业务必然也是个比较关键且执行频繁的业务,自然也得使用线程池将线程资源控制在可控范围内。

使用线程池是管理线程的一个很好实践,它可以将线程的数量控制在一个合理的范围内,不至于耗尽系统资源;它还可以复用线程,避免线程的重复创建与销毁;在这个池子满足不了突发流量时也可以设定兜底策略。

池化思想

其实线程池就是池化思想的一个落地场景,其他的像数据库连接池,Redis连接池,对象池等都是其落地场景。

池化思想是一种优化资源管理的技术,它主要用于解决频繁创建和销毁资源带来的性能开销和系统负担问题。池化技术通过重用已有的资源实例,而不是每次都创建新的资源,从而提高系统的效率和稳定性。总结下来就是两个优点

  1. 性能提升。通过复用资源,减少了创建和销毁资源的时间消耗,减少系统资源浪费,提高系统性能。
  2. 资源控制。池化机制能够控制资源的使用上限,防止系统因为资源过度分配而崩溃。

如何跟踪处理线程池执行任务时产生的异常

接下来回到本章的主线上,如果我们在项目中使用了线程池,那么该怎么跟踪和处理线程池执行任务时产生的异常呢?

线程提交任务的两种方式及其抛异常的策略

在提出解决方案前,先来重申下线程提交任务的两种方式,及其抛异常的策略

1. execute() 方式提交任务,这种方式主要用于不需要接收返回结果的情况(Runnable 接口),当某个任务发生异常后,线程池会将这个异常打印出来,日志记录中也能观测到这个异常

举例说明

private void methodExecute() {
    // 创建一个容量为1的线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
    // 循环提交任务,在i=3时提交的任务会抛出异常
    for (int i = 0; i < 10; i++) {
        int finalI = i;
        threadPoolExecutor.execute(() -> {
            if( finalI == 3) {
                int val = 10/0;
            }
            System.out.println("任务执行完成 编号-" + finalI+", 执行线程:"+Thread.currentThread().getName());
        });
        // 为了更好的观测现象这里将每次提交任务的间隔设为1秒
        ThreadUtil.sleep(1,TimeUnit.SECONDS);
    }

    ThreadUtil.sleep(10,TimeUnit.SECONDS);
    System.out.println("main - finish");
}

执行日志

任务执行完成 编号-0, 执行线程:pool-1-thread-1
任务执行完成 编号-1, 执行线程:pool-1-thread-1
任务执行完成 编号-2, 执行线程:pool-1-thread-1
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
	at com.hml.threadtest.dealexception.DealException.lambda$methodExecute$0(DealException.java:47)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
任务执行完成 编号-4, 执行线程:pool-1-thread-2
任务执行完成 编号-5, 执行线程:pool-1-thread-2
任务执行完成 编号-6, 执行线程:pool-1-thread-2
任务执行完成 编号-7, 执行线程:pool-1-thread-2
任务执行完成 编号-8, 执行线程:pool-1-thread-2
任务执行完成 编号-9, 执行线程:pool-1-thread-2
main - finish

为什么异常日志会被打印出来呢?这是因为异常的任务会走到这个方法里 java.lang.ThreadGroup#uncaughtException

另外,可以看到,线程池中当某个任务执行发生异常且未被捕获时,该任务的执行线程也会被销毁,线程池会再创建一个新的线程来补充到核心线程中。这样的话,在极端情况下,如果提交到该线程池的任务都会抛出异常的话,那么线程池的线程就会频繁的销毁重建,失去了池化的意义。

2.submit() 方式提交任务,使用这个方式多是在需要接收返回结果的情况下(Callable 接口),当然 Runnable 接口定义的线程 submit() 方法也能接。当某个任务发生异常后,线程池不再打印异常,而是将异常包装起来,包装成 ExecutionException ,只有在 futrue.get() 方法调用时才会触发此异常。

举例说明

private void methodSubmit02() {
    // 创建一个线程数为1的线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

    // 循环执行任务,i=3时会提交一个有异常的任务
    List<Future<Integer>> futureList = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        int finalI = i;
        Future<Integer> future = threadPoolExecutor.submit(() -> {
            if (finalI == 3) {
                int val = 10 / 0;
            }
            System.out.println("任务执行完成 编号-" + finalI+", 执行线程:"+Thread.currentThread().getName());
            return finalI;
        });
        futureList.add(future);
    }

    // 等待5秒,等上面10个任务都处理完
    ThreadUtil.sleep(5,TimeUnit.SECONDS);
    System.out.println("开始获取结果");
    for (int i = 0; i < futureList.size(); i++) {
        try {
            Integer result = futureList.get(i).get();
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("编号"+i+"发生异常:"+ExceptionUtil.stacktraceToString(e));
        }
    }

    System.out.println("main - finish");
}

执行日志

任务执行完成 编号-0, 执行线程:pool-1-thread-1
任务执行完成 编号-1, 执行线程:pool-1-thread-1
任务执行完成 编号-2, 执行线程:pool-1-thread-1
任务执行完成 编号-4, 执行线程:pool-1-thread-1
任务执行完成 编号-5, 执行线程:pool-1-thread-1
任务执行完成 编号-6, 执行线程:pool-1-thread-1
任务执行完成 编号-7, 执行线程:pool-1-thread-1
任务执行完成 编号-8, 执行线程:pool-1-thread-1
任务执行完成 编号-9, 执行线程:pool-1-thread-1
开始获取结果
编号3发生异常:java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.hml.threadtest.dealexception.DealException.methodSubmit02(DealException.java:107)
	at com.hml.threadtest.dealexception.DealException.main(DealException.java:32)
Caused by: java.lang.ArithmeticException: / by zero
	at com.hml.threadtest.dealexception.DealException.lambda$methodSubmit02$2(DealException.java:95)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

main - finish

通过日志可以看出来,通过 submit 方法提交的任务在发生异常后,线程池会把异常吃掉,执行线程也不会被销毁,单从日志看不到任何异常,但是当调用 future.get() 方法时,异常会抛出来。

通过上面的两个案例,可以看到,excute 与 submit 两种方式提交的任务,线程池对它们产生异常的处理方式是不一样的,那怎么能将异常处理的方式统一起来呢?很简单粗暴的方式就是在每个任务的最外层加上 try catch 自行编码处理异常。

当然你可能说 还有 UncaughtExceptionHandler 和 afterExecute 的方式来处理异常,但是 UncaughtExceptionHandler 只能处理 excute 方式的产生的异常,afterExecute 方式倒是可以编码处理两种异常,但是在配合MDC使用时会存在 traceId 混乱的情况,不知道是不是我的程序问题。这里我采用的方式是自定义线程池+包装任务。

日志跟踪

当项目出现线上问题的时候,我们通常的做法就是

  1. 先定位异常日志
  2. 然后根据traceId跟踪执行链路判断问题原因

在使用SpringBoot的前提下,通常我们跟踪一个程序执行链路的方法就是使用MDC在执行线程中加入traceId, 然后在日志中打印出 traceId 。这种方式在大多数情况下没有问题,但是遇到异步逻辑时就会遇到 traceId传递不到异步线程中的情况。 网上的解决方法也有很多,大概有以下这么几种

1. 自定义 MDCAdapter,将 copyOnThreadLocal 这个属性 由 ThreadLocal 换为 TransmittableThreadLocal(阿里提供的包),并要配置MDC的监听器使这个 MDCAdapter 生效。 在创建线程池时要用 TransmittableThreadLocal 包提供的 TtlExecutors 包一下。

2. 使用MDC自带的方法,MDC.getCopyOfContextMap() 复制主线程的 MDC上下文,MDC.setContextMap() 将复制的主线程上下文赋值到到异步线程相关的MDC中。

这里我采用第二种方法,它不用引入额外的包。

自定义线程池+包装任务

采用第二种方法,在每次向线程池提交任务时都需要将 MDC的上下文复制一下并赋值给线程池中的执行线程,这里我们可以将这个逻辑提炼出来,封装一下,我采用的是自定义线程池的方法。

首先配置一下日志格式,加上traceId,简单点的话,可以直接在配置文件中配置

# 控制台日志输出格式
logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss} [%thread] [traceId=%X{traceId}] %-5level %logger{36} - %msg%n

# 文件日志输出格式
logging.pattern.file=%d{yyyy-MM-dd HH:mm:ss} [%thread] [traceId=%X{traceId}] %-5level %logger{36} - %msg%n

然后自定义一个类 CustomThreadPoolExecutor 继承了 ThreadPoolTaskExecutor。重写了execute 和 submit 方法,在执行父类的原逻辑前,自己包装了一下提交的任务(wrap 方法),加入了复制 MDC上下文的逻辑。

这里简单说下 ThreadPoolTaskExecutor ,这是Spring 提供的一个创建线程池的类,内部封装了 JUC包里ThreadPoolExecutor ,用起来的感觉和 ThreadPoolExecutor 差不多,但是它有一个独特的方法就是  submitListenable ,返回类型是 ListenableFuture ,它是 Future 的扩展,可以注册回调函数来监听任务的执行结果。

比如,可以这样用

ListenableFuture<Integer> listenableFuture = taskExecutor.submitListenable(() -> {
    Thread.sleep(1000);
    return 42;
});

listenableFuture.addCallback(result -> {
    System.out.println("Task completed successfully with result: " + result);
}, ex -> {
    System.err.println("Task failed with exception: " + ex.getMessage());
});

这种不用阻塞等待返回结果的用法可能在一些场景下会有用。

我这里自定义线程池是继承了 ThreadPoolTaskExecutor,如果继承JUC里的ThreadPoolExecutor也是没有任何问题的。

回到我们的 CustomThreadPoolExecutor 类上,下面是代码

import cn.hutool.core.exceptions.ExceptionUtil;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;


@Slf4j
public class CustomThreadPoolExecutor extends ThreadPoolTaskExecutor {


    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task));
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        super.execute(wrap(task), startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task));
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(wrap(task));
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        return super.submitListenable(wrap(task));
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        return super.submitListenable(wrap(task));
    }

    @Override
    protected void cancelRemainingTask(Runnable task) {
        super.cancelRemainingTask(task);
    }

    private Runnable wrap(Runnable runnable) {
        Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
        return () -> {
            MDC.setContextMap(copyOfContextMap);
            try {
                runnable.run();
            } catch (Exception e) {
                log.error("have error: " + ExceptionUtil.stacktraceToString(e));
            } finally {
                MDC.clear();
            }
        };
    }

    private <T> Callable<T> wrap(Callable<T> callable) {
        Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
        return () -> {
            MDC.setContextMap(copyOfContextMap);
            try {
                return callable.call();
            } catch (Exception e) {
                log.error("have error: " + ExceptionUtil.stacktraceToString(e));
                // 后续的 future.get() 我们仍然期望能获取到异常,所以这里将异常抛出,并记录下这个异常,方便traceId跟踪日志
                throw e;
            } finally {
                MDC.clear();
            }
        };
    }


}

定义好了这个线程池类,接下来项目中直接用这个类来创建线程池就好,其他的逻辑不需要变动。

来看两个代码示例

第一个示例,使用 excute 方式提交任务
@SpringBootTest
@Slf4j
public class ThreadTests {
    @BeforeEach
    void initCustomThreadPool() {
        // 初始化线程池
        customThreadPoolExecutor = new CustomThreadPoolExecutor();
        customThreadPoolExecutor.setCorePoolSize(3);
        customThreadPoolExecutor.setMaxPoolSize(3);
        customThreadPoolExecutor.setKeepAliveSeconds(60);
        customThreadPoolExecutor.setQueueCapacity(100);
        customThreadPoolExecutor.setAllowCoreThreadTimeOut(false);
        customThreadPoolExecutor.setThreadNamePrefix("custom-thread-");
        customThreadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        customThreadPoolExecutor.setWaitForTasksToCompleteOnShutdown(true);
        customThreadPoolExecutor.initialize();
    }

    @Test
    void testCustomSpringThreadPollExecute() {

        // 创建 n 个任务,每个任务都有自己的traceId
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            String uuid = UUID.randomUUID().toString(true);
            MDC.put("traceId", uuid);
            log.info("准备执行任务,编号-" + finalI);

            customThreadPoolExecutor.execute(() -> {
                if (finalI == 3) {
                    int val = 10 / 0;
                }
                log.info("任务执行完成 编号-" + finalI);
            });

            MDC.clear();
        }

        // 阻塞主线程,不让它结束
        try {
            System.in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

    }

}

执行日志

2024-09-23 18:08:02 [main] [traceId=e2decaf36cf44541993b0fb32504e296] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-0
2024-09-23 18:08:02 [main] [traceId=ddb2895605254f20b66a57ed4ec6655b] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-1
2024-09-23 18:08:02 [main] [traceId=180d676f893f42409ed0b452e3afdbfa] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-2
2024-09-23 18:08:02 [custom-thread-1] [traceId=e2decaf36cf44541993b0fb32504e296] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-0
2024-09-23 18:08:02 [main] [traceId=80f97f6cfee049deba6548c6d40fe755] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-3
2024-09-23 18:08:02 [main] [traceId=c13df8bf1405416199664beaadfd9446] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-4
2024-09-23 18:08:02 [custom-thread-2] [traceId=ddb2895605254f20b66a57ed4ec6655b] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-1
2024-09-23 18:08:02 [custom-thread-3] [traceId=180d676f893f42409ed0b452e3afdbfa] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-2
2024-09-23 18:08:02 [custom-thread-2] [traceId=c13df8bf1405416199664beaadfd9446] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-4
2024-09-23 18:08:02 [custom-thread-1] [traceId=80f97f6cfee049deba6548c6d40fe755] ERROR c.h.t.c.CustomThreadPoolExecutor - have error: java.lang.ArithmeticException: / by zero
	at com.hml.threadtest.ThreadTestApplicationTests.lambda$testCustomSpringThreadPollExecute$2(ThreadTestApplicationTests.java:177)
	at com.hml.threadtest.customthreadpool.CustomThreadPoolExecutor.lambda$wrap$0(CustomThreadPoolExecutor.java:62)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

可以看到线程池内部的线程也能正确打印出执行任务的traceId了,并且异常的任务也被捕获到了,日志中也带着traceId, 这下就很容易根据 traceId 跟踪问题了。

第二个示例,使用 submit 方式提交任务
@Test
void testCustomSpringThreadPollSubmitResult() {

    List<Future<Integer>> futureList = new ArrayList<>();

    // 创建 n 个任务,每个任务都有自己的traceId
    for (int i = 0; i < 5; i++) {
        int finalI = i;
        String uuid = UUID.randomUUID().toString(true);
        MDC.put("traceId", uuid);
        log.info("准备执行任务,编号-" + finalI);

        Future<Integer> future = customThreadPoolExecutor.submit(() -> {
            if (finalI == 3) {
                int val = 10 / 0;
            }
            log.info("任务执行完成 编号-" + finalI);
            return finalI;
        });
        futureList.add(future);

        MDC.clear();
    }

    // 等任务执行完,方便看日志
    ThreadUtil.sleep(5,TimeUnit.SECONDS);
    for (Future<Integer> future : futureList) {
        try {
            Integer result = future.get();
            log.info("receive result -> " + result);
        } catch (Exception e) {
            log.error("future.get() get error -> " + ExceptionUtil.stacktraceToString(e));
        }
    }

}

执行日志

2024-09-23 18:16:27 [main] [traceId=157907dd4bb1413f98b7781e29b12d9b] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-0
2024-09-23 18:16:27 [custom-thread-1] [traceId=157907dd4bb1413f98b7781e29b12d9b] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-0
2024-09-23 18:16:27 [main] [traceId=bc732a731db148f6b65e2759c42bc828] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-1
2024-09-23 18:16:27 [main] [traceId=72c57baf07a24a1083f8fbbbf282814a] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-2
2024-09-23 18:16:27 [custom-thread-2] [traceId=bc732a731db148f6b65e2759c42bc828] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-1
2024-09-23 18:16:27 [custom-thread-3] [traceId=72c57baf07a24a1083f8fbbbf282814a] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-2
2024-09-23 18:16:27 [main] [traceId=436fa1b93aa84fd996ffc41e461144a3] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-3
2024-09-23 18:16:27 [main] [traceId=0bf0e109e35a4fa887522737e0377051] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-4
2024-09-23 18:16:27 [custom-thread-2] [traceId=0bf0e109e35a4fa887522737e0377051] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-4
2024-09-23 18:16:27 [custom-thread-1] [traceId=436fa1b93aa84fd996ffc41e461144a3] ERROR c.h.t.c.CustomThreadPoolExecutor - have error: java.lang.ArithmeticException: / by zero
	at com.hml.threadtest.ThreadTestApplicationTests.lambda$testCustomSpringThreadPollSubmitResult$4(ThreadTestApplicationTests.java:237)
	at com.hml.threadtest.customthreadpool.CustomThreadPoolExecutor.lambda$wrap$1(CustomThreadPoolExecutor.java:76)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

2024-09-23 18:16:32 [main] [traceId=] INFO  c.h.t.ThreadTestApplicationTests - receive result -> 0
2024-09-23 18:16:32 [main] [traceId=] INFO  c.h.t.ThreadTestApplicationTests - receive result -> 1
2024-09-23 18:16:32 [main] [traceId=] INFO  c.h.t.ThreadTestApplicationTests - receive result -> 2
2024-09-23 18:16:32 [main] [traceId=] ERROR c.h.t.ThreadTestApplicationTests - future.get() get error -> java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.hml.threadtest.ThreadTestApplicationTests.testCustomSpringThreadPollSubmitResult(ThreadTestApplicationTests.java:251)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(
2024-09-23 18:16:32 [main] [traceId=] INFO  c.h.t.ThreadTestApplicationTests - receive result -> 4

可以看到 submit 提交的任务,如果发生异常,也会将异常日志打印出来了,并且还附带了 traceId,由于 CustomThreadPoolExecutor 中并没有吃掉这个异常,仍然抛了出来,所以 futrue.get() 方法仍然能获取到异常,以便主线程中进行进一步的处理。

看,在不引入额外jar包的情况下,我们仅仅通过自定义了一个线程池类,就将 traceId 在线程池中的传递问题解决了。

traceId在@Async异步线程中的传递

@Async 是 Spring 的一个注解,如果将它标注在一个方法上,代表这个方法会异步执行。默认情况下 @Async 搭配的异步执行器是 SimpleAsyncTaskExecutor,它持有一个 ThreadPoolTaskExecutor 线程池对象, 该线程池的配置信息在TaskExecutionProperties类中, 如下图所示(Pool是TaskExecutionProperties的内部类),核心线程数是8,最大线程数是int 最大值。

如果你想用该默认线程池,但是想调整配置的话,可以在项目的配置文件中设置,如下所示

spring.task.execution.pool.core-size=8
spring.task.execution.pool.max-size=10
spring.task.execution.pool.keep-alive=60s
spring.task.execution.pool.allow-core-thread-timeout=false
spring.task.execution.pool.queue-capacity=100

这是最普通的 ThreadPoolTaskExecutor 线程池,traceId 是传递不进来的,要想使 @Async 也能传递 traceId, 那就得用我们自定义的线程池 CustomThreadPoolExecutor, 下面给出配置类来替换线程池

新建配置类AsyncThreadPollConfig,实现AsyncConfigurer接口,重写getAsyncExecutor() 方法来替换 @Async 默认对应的线程池,另外也提供了一个自定义的可选备用线程池 customThreadPoll。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class AsyncThreadPollConfig implements AsyncConfigurer {


    @Bean
    public CustomThreadPoolExecutor defaultThreadPoll(){
        CustomThreadPoolExecutor customThreadPoolExecutor = new CustomThreadPoolExecutor();
        customThreadPoolExecutor.setCorePoolSize(10);
        customThreadPoolExecutor.setMaxPoolSize(15);
        customThreadPoolExecutor.setKeepAliveSeconds(60);
        customThreadPoolExecutor.setQueueCapacity(100);
        customThreadPoolExecutor.setAllowCoreThreadTimeOut(false);
        customThreadPoolExecutor.setThreadNamePrefix("default-async-custom-thread-");
        customThreadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        customThreadPoolExecutor.setWaitForTasksToCompleteOnShutdown(true);
        customThreadPoolExecutor.initialize();
        return customThreadPoolExecutor;
    }

    @Bean
    public CustomThreadPoolExecutor customThreadPoll(){
        CustomThreadPoolExecutor customThreadPoolExecutor = new CustomThreadPoolExecutor();
        customThreadPoolExecutor.setCorePoolSize(20);
        customThreadPoolExecutor.setMaxPoolSize(40);
        customThreadPoolExecutor.setKeepAliveSeconds(60);
        customThreadPoolExecutor.setQueueCapacity(100);
        customThreadPoolExecutor.setAllowCoreThreadTimeOut(false);
        customThreadPoolExecutor.setThreadNamePrefix("custom-async-custom-thread-");
        customThreadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        customThreadPoolExecutor.setWaitForTasksToCompleteOnShutdown(true);
        customThreadPoolExecutor.initialize();
        return customThreadPoolExecutor;
    }

    /**
     *  @Async 注解默认使用的线程池
     */
    @Override
    public Executor getAsyncExecutor() {
        return defaultThreadPoll();
    }
}

来个测试案例,看看这个配置类生效没

定义一个service类,包含一个异步方法,这里我们使用 @Async默认的线程池

@Service
@Slf4j
public class AsyncService {

    @Async
    public void exe(Integer num) {
        log.info("get num -> "+num);
    }
}

测试方法

@SpringBootTest
@Slf4j
public class ThreadTest {

    @Autowired
    private AsyncService asyncService;

    @Test
    void testAsync() {

        for (int i = 0; i < 3; i++) {
            MDC.put(traceId, UUID.randomUUID().toString(true));
            log.info("start exe num -> " + i);
            asyncService.exe(i);
            MDC.clear();
        }

        try {
            System.in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        
    }
}

执行结果

2024-09-24 10:59:52 [main] [traceId=ac6897c2d6fe4fcba381b7abb482eaeb] INFO  c.h.t.ThreadTestApplicationTests - start exe num -> 0
2024-09-24 10:59:52 [main] [traceId=cb072d9147604481b70d629e164181da] INFO  c.h.t.ThreadTestApplicationTests - start exe num -> 1
2024-09-24 10:59:52 [main] [traceId=0bc2ce62854b47689f59f6c77f8e465c] INFO  c.h.t.ThreadTestApplicationTests - start exe num -> 2
2024-09-24 10:59:52 [default-async-custom-thread-1] [traceId=ac6897c2d6fe4fcba381b7abb482eaeb] INFO  com.hml.threadtest.AsyncService - get num -> 0
2024-09-24 10:59:52 [default-async-custom-thread-3] [traceId=0bc2ce62854b47689f59f6c77f8e465c] INFO  com.hml.threadtest.AsyncService - get num -> 2
2024-09-24 10:59:52 [default-async-custom-thread-2] [traceId=cb072d9147604481b70d629e164181da] INFO  com.hml.threadtest.AsyncService - get num -> 1

可以看到我们配置的默认线程池生效了,traceId 也正常传递了。

再来个测试案例,这次我们指定一下 @Async 要使用的线程池,给 @Async 加上 value 值即可,customThreadPoll 就是我们上面在 AsyncThreadPollConfig 配置类中提前定义好的 备选线程池。

@Service
@Slf4j
public class AsyncService {

    @Async("customThreadPoll")
    public void exe(Integer num) {
        log.info("get num -> "+num);
    }

}

testAsync测试方法不用动,再次执行以下,执行结果如下

2024-09-24 11:58:01 [main] [traceId=1e78ddaa36164e0ca0a92ffd803deea6] INFO  c.h.t.ThreadTestApplicationTests - start exe num -> 0
2024-09-24 11:58:01 [main] [traceId=4e75eb99fd174562bfe6ea4ec0e2d754] INFO  c.h.t.ThreadTestApplicationTests - start exe num -> 1
2024-09-24 11:58:01 [main] [traceId=24a3b77c2ae346d18b5bbb91b97919cf] INFO  c.h.t.ThreadTestApplicationTests - start exe num -> 2
2024-09-24 11:58:01 [custom-async-custom-thread-1] [traceId=1e78ddaa36164e0ca0a92ffd803deea6] INFO  com.hml.threadtest.AsyncService - get num -> 0
2024-09-24 11:58:01 [custom-async-custom-thread-3] [traceId=24a3b77c2ae346d18b5bbb91b97919cf] INFO  com.hml.threadtest.AsyncService - get num -> 2
2024-09-24 11:58:01 [custom-async-custom-thread-2] [traceId=4e75eb99fd174562bfe6ea4ec0e2d754] INFO  com.hml.threadtest.AsyncService - get num -> 1

可以看到,@Async 对应的线程池切换成功。

TransmittableThreadLocal

抛开日志不谈,如果我们有其他需求需要在 ThreadLocal 中存放一些内容,并且有可能会往子线程或者线程池中传递下去的,那么采用 TransmittableThreadLocal 是一个不错的选择,TransmittableThreadLocal 类继承了 InheritableThreadLocal ,它继承了 InheritableThreadLocal 可以往子线程中传递的特性,同时也扩展了往线程池中传递这个特性,不过线程池需要用 TtlExecutors 修饰一下。

下面举例说明一下,创建一个测试方法,测试子线程传递与线程池传递两种情况

@Test
void testThread() throws IOException {
    // 创建一个TransmittableThreadLocal
    TransmittableThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>();

    // 1.现在测试 TransmittableThreadLocal 在子线程中传递的情况
    threadLocal.set(456123);
    log.info("main thread threadLocal value -> {}", threadLocal.get());
    new Thread(() -> {
        log.info("son thread threadLocal value -> {}", threadLocal.get());
    }).start();
    threadLocal.remove();

    // 2.现在测试 TransmittableThreadLocal 在线程池中传递的情况
    // 创建一个容量为1的线程池
    ExecutorService threadPoolExecutor = new ThreadPoolExecutor(1,
            1,
            60,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100));

    // 用TtlExecutors修饰线程池
    threadPoolExecutor = TtlExecutors.getTtlExecutorService(threadPoolExecutor);

    // 创建10个任务,让线程池去执行,看看该线程池中的这个线程是否能正常切换 threadLocal 上下文
    for (int i = 0; i < 10; i++) {
        threadLocal.set(i);
        threadPoolExecutor.execute(() -> {
            log.info(" thread pool threadLocal value -> {} ", threadLocal.get());
            threadLocal.remove();
        });
        threadLocal.remove();
    }

    System.in.read();
}

执行日志

2024-09-24 16:44:28 [main] [traceId=] INFO  c.h.t.ThreadTestApplicationTests - main thread threadLocal value -> 456123
2024-09-24 16:44:28 [Thread-3] [traceId=] INFO  c.h.t.ThreadTestApplicationTests - son thread threadLocal value -> 456123
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 0 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 1 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 2 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 3 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 4 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 5 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 6 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 7 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 8 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 9 

可以看到,两种情况都OK。

文末提醒一下大家,无论是直接使用 ThreadLocal ,还是使用它的衍生产品如 MDC,TransmittableThreadLocal 等,在使用结束后都记得清理一下,如 MDC.clear();  threadLocal.remove(); 等方法, 否则的话很容易会出现内存泄漏的情况。


http://www.kler.cn/a/318745.html

相关文章:

  • git没有识别出大写字母改成小写重命名的文件目录
  • Matplotlib库中show()函数的用法
  • 穿越数据迷宫:C++哈希表的奇幻旅程
  • [Linux] Linux信号捕捉
  • git下载慢下载不了?Git国内国外下载地址镜像,git安装视频教程
  • C++ 并发专题 - 自旋锁的实现(Spinlock)
  • 低代码门户技术:构建高效应用的全新方式
  • Linux之实战命令10:htop应用实例(四十四)
  • 【中台设计】数字中台,大数据中台解决方案,中台建设指南(资料Word分享)
  • 聊天组件 Vue3-beautiful-chat 插槽
  • Golang | Leetcode Golang题解之第424题替换后的最长重复字符
  • 网安面试题1
  • Pygame中Sprite实现逃亡游戏2
  • 基础容器.
  • ECMAScript与JavaScript的区别
  • MicroPython 怎么搭建工程代码
  • 面试场景题
  • Vue3 中集成海康 H5 监控视频播放功能
  • centos安装python3.10教程
  • Unity DOTS系列之Aspect核心机制分析
  • FileLink跨网文件传输 | 跨越网络边界的利器,文件传输不再受限
  • mysqli_fetch_object() 和 mysqli_fetch_array() 函数的区别
  • 《解锁高效流程设计:深度剖析责任链模式与实战应用》
  • MySQL 的认证插件
  • android 15 Adapter TextView中英文差异 高度不一致
  • 2024云手机推荐与排行:怎样选择最适合的云手机?