记一次ThreadPoolTaskExecutor的坑
起因:
开发环境一切正常
部署到UAT环境后,项目中使用@Async修饰的方法没有执行。
临时解决方法:
先去掉该注解改成同步执行。
问题排查过程:
1.创建一个测试controller,用于观察线程池情况
package org.example.controller;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.example.service.MyTestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
 * Diagnostic endpoints: one dumps the state of every {@link ThreadPoolTaskExecutor} bean,
 * the other triggers the {@code @Async} method {@link MyTestService#test1()}.
 */
@RestController
@RequestMapping(value = "/test")
public class MyTestController {

    @Autowired
    private MyTestService myTestService;

    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Returns every {@link ThreadPoolTaskExecutor} bean in the context, keyed by bean name.
     *
     * <p>The bean map is serialized with fastjson and parsed back into a {@link JSONObject}.
     * The response therefore exposes the executors' getter-derived properties (pool size,
     * active count, queue contents, ...).
     *
     * @return bean name mapped to the serialized executor state
     */
    @GetMapping({"/threadPools"})
    public JSONObject threadPools() {
        return JSON.parseObject(
                JSON.toJSONString(applicationContext.getBeansOfType(ThreadPoolTaskExecutor.class)));
    }

    /**
     * Invokes {@link MyTestService#test1()} and returns without waiting for it.
     *
     * @return the literal {@code "ok"}
     */
    @GetMapping(value = "/test1")
    public String test1() {
        myTestService.test1();
        return "ok";
    }
}
package org.example.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
 * Demo service with a single {@code @Async} method.
 *
 * <p>No executor name is given on the annotation, so calls go to whatever Spring resolves as
 * its default async executor.
 */
@Service
@Slf4j
public class MyTestService {

    /**
     * Logs {@code "test1"} on the default async executor.
     */
    @Async // a qualifier such as @Async("taskExecutor") would pin a specific executor
    public void test1() {
        log.info("test1");
    }
}
在浏览器中访问该接口:http://localhost:8080/test/threadPools
{
"taskExecutor": {
"activeCount": 3,
"threadNamePrefix": "taskExecutor-",
"poolSize": 3,
"threadPoolExecutor": {
"activeCount": 3,
"threadFactory": {
"$ref": "$.taskExecutor"
},
"largestPoolSize": 3,
"poolSize": 3,
"taskCount": 3,
"rejectedExecutionHandler": {},
"corePoolSize": 3,
"completedTaskCount": 0,
"terminating": false,
"maximumPoolSize": 5,
"queue": [],
"shutdown": false,
"terminated": false
},
"corePoolSize": 3,
"threadPriority": 5,
"maxPoolSize": 5,
"keepAliveSeconds": 60,
"daemon": false
}
}
再调用test1接口(这里调用了两次),把普通任务提交到该线程池:http://localhost:8080/test/test1
再观察线程池情况:
{
"taskExecutor": {
"activeCount": 3,
"threadNamePrefix": "taskExecutor-",
"poolSize": 3,
"threadPoolExecutor": {
"activeCount": 3,
"threadFactory": {
"$ref": "$.taskExecutor"
},
"largestPoolSize": 3,
"poolSize": 3,
"taskCount": 5,
"rejectedExecutionHandler": {},
"corePoolSize": 3,
"completedTaskCount": 0,
"terminating": false,
"maximumPoolSize": 5,
"queue": [
{
"cancelled": false,
"done": false
},
{
"cancelled": false,
"done": false
}
],
"shutdown": false,
"terminated": false
},
"corePoolSize": 3,
"threadPriority": 5,
"maxPoolSize": 5,
"keepAliveSeconds": 60,
"daemon": false
}
}
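发现等待队列queue中多了2个任务(taskCount从3变为5,completedTaskCount仍为0),而activeCount一直保持为3,corePoolSize刚好也是3。
此时有理由怀疑:有3个任务一直没有结束,占满了全部3个核心线程,导致新任务只能放进等待队列,因此一直得不到执行。另外,ThreadPoolExecutor只有在等待队列已满时,才会创建超出核心线程数的线程。该线程池的队列容量为1000(见下文配置类),所以maxPoolSize=5实际上不会生效。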
通过本地debug发现,系统启动后,公司的框架代码会往默认线程池里提交3个任务,而这3个任务都是while(true)死循环。下面是模拟该场景的代码:
package org.example.event;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.context.ApplicationEvent;
/**
 * Custom application event consumed by {@code MyListener}.
 *
 * <p>{@code callSuper = true} matters here. This class declares no fields, so a plain Lombok
 * {@code @ToString} would render every instance as {@code "MyEvent()"}. That would replace the
 * inherited {@code EventObject#toString()}, which includes the event source. The listener logs
 * the event, so without the source the three published events could not be told apart.
 */
@Getter
@Setter
@ToString(callSuper = true)
public class MyEvent extends ApplicationEvent {

    /**
     * Creates the event.
     *
     * @param source the object the event originates from (a descriptive string in this demo)
     */
    public MyEvent(Object source) {
        super(source);
    }
}
package org.example.init;
import org.example.event.MyEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * Publishes three {@link MyEvent}s once dependency injection for this bean has completed.
 */
@Component
public class Initor {

    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Publishes the events in order. Each one is delivered to the registered
     * {@code ApplicationListener<MyEvent>} beans (e.g. {@code MyListener}).
     */
    @PostConstruct
    private void init() {
        String[] eventSources = {"这是自定义事件1", "这是自定义事件2", "这是自定义事件3"};
        for (String eventSource : eventSources) {
            applicationContext.publishEvent(new MyEvent(eventSource));
        }
    }
}
package org.example.listener;
import lombok.extern.slf4j.Slf4j;
import org.example.event.MyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
 * Listener that, for every {@link MyEvent}, occupies one thread of the default {@code @Async}
 * executor with a never-ending polling loop.
 *
 * <p>This reproduces the incident. Three events are published at startup, so three pool threads
 * stay busy. With a core pool size of 3 and a 1000-slot queue, every later {@code @Async} task
 * just waits in the queue.
 */
@Slf4j
@Service
public class MyListener implements ApplicationListener<MyEvent> {

    /**
     * Logs the event, then logs {@code "myEvent"} every 10 seconds until the executing thread
     * is interrupted.
     *
     * @param myEvent the received event
     */
    @Async // no qualifier: runs on Spring's default async executor (cf. @Async("taskExecutor"))
    @Override
    public void onApplicationEvent(MyEvent myEvent) {
        log.info("myEvent:{}", myEvent);
        log.info("开始死循环");
        // Three of these loops land in the default pool, whose core size is 3. Every task
        // submitted afterwards sits in the queue and never gets a chance to run.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(10000);
                log.info("myEvent");
            } catch (InterruptedException e) {
                // Restore the interrupt status (sleep() clears it) and let the loop condition end
                // the task. Wrapping it in a RuntimeException would silently discard the interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }
}
通过该线程池名称,搜索到该线程池的配置类:
package org.example.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Declares the application's custom thread pool.
 *
 * <p>The {@code @Bean} has no explicit name, so the bean is named after the method:
 * {@code "taskExecutor"}. That is also the name Spring falls back to when resolving the default
 * executor for {@code @Async} methods that do not name one.
 */
@Slf4j
@Configuration
public class ThreadPoolConfig {

    /**
     * Builds a pool with 3 core threads, at most 5 threads and a queue of 1000 tasks.
     *
     * @return the configured executor; Spring initializes it after creation
     */
    @Bean // bean name defaults to the method name, i.e. equivalent to @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Threads are named taskExecutor-1, taskExecutor-2, ...
        pool.setThreadNamePrefix("taskExecutor-");

        // Sizing. ThreadPoolExecutor only grows beyond the core size once the queue is full,
        // so with a 1000-slot queue the max of 5 is effectively unreachable.
        pool.setCorePoolSize(3);
        // The original project used the CPU count instead of a fixed 3:
        // pool.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        pool.setMaxPoolSize(5);
        pool.setQueueCapacity(1000);
        // Idle threads above the core size are reclaimed after 60 seconds.
        pool.setKeepAliveSeconds(60);

        // Saturation policy: when both pool and queue are full, the submitting thread runs the task.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // On shutdown, let submitted tasks finish instead of interrupting them.
        pool.setWaitForTasksToCompleteOnShutdown(true);

        // No explicit pool.initialize(): ThreadPoolTaskExecutor implements InitializingBean,
        // and its afterPropertiesSet() calls initialize() once Spring has created the bean.
        return pool;
    }
}
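原项目中,线程池的核心线程数设置为系统CPU核心数,即上面被注释掉的Runtime.getRuntime().availableProcessors();本示例为了复现问题,直接写成了3。UAT环境的CPU核心数刚好是3,被3个死循环任务全部占满;而其他环境的核心数大于3,还有空闲的核心线程可以执行新任务。这就是到了UAT环境才突然出问题的原因。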
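通过深入debug发现,Spring为@Async查找默认线程池时,会先按类型查找唯一的TaskExecutor bean;找不到唯一的一个时,再按beanName“taskExecutor”查找。上面的自定义线程池,其@Bean注解没有指定名称,因此bean名称取方法名“taskExecutor”,恰好与Spring默认线程池的名称相同。以Spring Boot 2.x为例,自动配置的默认线程池applicationTaskExecutor同样以“taskExecutor”为别名,而且容器中已存在自定义Executor时就不会再创建。结果是该自定义线程池取代了Spring的默认线程池,所有未指定名称的@Async都使用了它。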
最终解决方案可以有2个:
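1.把该自定义线程池换个名字,不要跟Spring默认线程池同名。注意:如果改名后它仍是容器中唯一的TaskExecutor,Spring按类型查找时依然会选中它。所以更稳妥的做法是为死循环这类常驻任务单独建一个线程池,并在@Async中显式指定名称,例如@Async("xxxExecutor")。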
2.把该自定义线程池的核心线程数改大点,至少要大于3。这只能缓解问题:只要常驻任务的数量达到核心线程数,问题就会重现。