Spring扩展点实战-动态线程池
Spring扩展点实战-动态线程池
- 功能需求
- 代码实现
功能需求
1. 根据配置的参数,动态创建线程池
2. 将动态线程池bean交给Spring管理
3. 后续使用线程池可以从Spring容器获取线程bean使用
4. 后续修改可以直接对线程池的bean修改
5. 监控达到阈值告警
代码实现
利用Spring的BeanDefinition实现系统自动读取配置,注册线程池Bean。这里选择实现BeanDefinitionRegistryPostProcessor接口(也可是选择实现ImportBeanDefinitionRegistrar接口)。
系统的配置信息会被读取到environment中,因此通过实现EnvironmentAware来获取配置信息。
配置信息如下:
spring:
dtp:
executors:
#线程池1
- poolName: dtpExecutor1
corePoolSize: 5
maximumPoolSize: 10
#线程池2
- poolName: dtpExecutor2
corePoolSize: 2
maximumPoolSize: 15
定义接收线程池参数的对象
@Data
public class ThreadPoolProperties {
private String poolName;
private int corePoolSize;
private int maximumPoolSize;
private long keepAliveTime = 10;
private int queueSize = 5;
}
@Data
public class DtpProperties {
private List<ThreadPoolProperties> executors;
}
自定义线程池类
public class DtpThreadPoolExecutor extends ThreadPoolExecutor {
public DtpThreadPoolExecutor(ThreadPoolProperties properties) {
super(properties.getCorePoolSize(), properties.getMaximumPoolSize(), properties.getKeepAliveTime(), TimeUnit.SECONDS, new ArrayBlockingQueue<>(properties.getQueueSize()));
}
}
根据配置信息注册BeanDefinition
public class DtpBeanDefinitionRegistrar implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {
private Environment environment;
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
BindResult<DtpProperties> bindResult = Binder.get(environment).bind("spring.dtp", DtpProperties.class);
DtpProperties dtpProperties = bindResult.get();
List<ThreadPoolProperties> executors = dtpProperties.getExecutors();
if(Objects.isNull(executors)) {
System.out.println("无动态线程池配置文件");
return;
}
for(ThreadPoolProperties properties : executors) {
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(DtpThreadPoolExecutor.class);
beanDefinitionBuilder.addConstructorArgValue(properties);
registry.registerBeanDefinition(properties.getPoolName(), beanDefinitionBuilder.getBeanDefinition());
}
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
}
现在可以使用就可以使用自定义的线程池
//使用内部线程池
@GetMapping("/placeOrder")
public String placeOrder() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("内部线程池下单...");
}
});
return "success";
}
@Autowired
private DtpThreadPoolExecutor dtpExecutor1;
//使用自定义线程池
@GetMapping("/placeOrder1")
public String placeOrder1() {
dtpExecutor1.execute(new Runnable() {
@Override
public void run() {
System.out.println("动态线程池下单...");
}
});
return "success";
}
我们需要对线程池的线程有一些管理,所以建一个线程池的工具类,管理所有线程。那么就需要在线程池Bean创建完成后进行处理。这里选择实现BeanPostProcessor。
线程池工具类:
public class DtpRegistrarUtil {
private static final Map<String, DtpThreadPoolExecutor> dtpExecutorMap = new ConcurrentHashMap<>();
/**
* 添加线程
* @param key
* @param value
*/
public static void registerExecutor(String key, DtpThreadPoolExecutor value) {
dtpExecutorMap.put(key, value);
}
/**
* 获取线程
* @param name
* @return
*/
public static DtpThreadPoolExecutor getExecutor(String name) {
return dtpExecutorMap.get(name);
}
/**
* 修改线程参数
* @param name
* @param properties
*/
public static void refresh(String name, ThreadPoolProperties properties) {
DtpThreadPoolExecutor dtpThreadPoolExecutor = dtpExecutorMap.get(name);
dtpThreadPoolExecutor.setCorePoolSize(properties.getCorePoolSize());
dtpThreadPoolExecutor.setMaximumPoolSize(properties.getMaximumPoolSize());
// ...
}
/**
* 获取所有线程池的名称
* @return
*/
public static List<String> getAllExecutorNames() {
return dtpExecutorMap.keySet().stream().toList();
}
}
线程池设置到工具类
@Component
public class DtpThreadRegistrarProcessor implements BeanPostProcessor {
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if(bean instanceof DtpThreadPoolExecutor) {
DtpRegistrarUtil.registerExecutor(beanName, (DtpThreadPoolExecutor)bean);
}
return bean;
}
}
利用工具类对线程池参数进行实时修改。这里通过事件发布来实现解耦。
自定义事件
public class DtpEvent extends ApplicationEvent {
private ThreadPoolProperties properties;
public DtpEvent(ThreadPoolProperties properties) {
super(properties);
this.properties = properties;
}
public ThreadPoolProperties getProperties() {
return properties;
}
事件监听器
@Component
public class DtpEventListener implements ApplicationListener<DtpEvent> {
@Override
public void onApplicationEvent(DtpEvent event) {
System.out.println(event);
ThreadPoolProperties properties = event.getProperties();
DtpRegistrarUtil.refresh(properties.getPoolName(), properties);
}
}
我们想要对线程池的实时状态有感知,使用这个工具类对线程池状态进行查看和预警。定时输出线程池状态需要用到定时任务。定时任务可以在容器启动时启动,容器关闭时关闭。
线程池监控和预警定时任务
@Component
public class DtpMonitor implements SmartLifecycle {
private ScheduledFuture<?> scheduledFuture;
private boolean isRunning;
@Override
public void start() {
scheduledFuture = Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
monitor();
alarm();
}
}, 5, 5, TimeUnit.SECONDS);
isRunning = true;
}
@Override
public void stop() {
scheduledFuture.cancel(false);
isRunning = false;
System.out.println("线程池定时任务停止");
}
@Override
public boolean isRunning() {
return isRunning;
}
private void monitor() {
List<String> allExecutorNames = DtpRegistrarUtil.getAllExecutorNames();
for(String name : allExecutorNames) {
DtpThreadPoolExecutor executor = DtpRegistrarUtil.getExecutor(name);
System.out.println("线程池名称:"+name);
System.out.println("线程池核心线程数:"+executor.getCorePoolSize());
System.out.println("线程池最大线程数:"+executor.getMaximumPoolSize());
}
}
private void alarm() {
// 配置预警值
int max = 1;
List<String> allExecutorNames = DtpRegistrarUtil.getAllExecutorNames();
for(String name : allExecutorNames) {
DtpThreadPoolExecutor executor = DtpRegistrarUtil.getExecutor(name);
if(executor.getActiveCount() > max) {
System.out.println("线程池"+name+"告警!");
}
}
}
}
现在动态线程池的基本功能已经完成。我们想要在其他项目中更方便的接入这个功能,比如通过一个注解。再利用@Import的导入功能,将这些Bean导入,无需在Bean加@Component。
定义注解以及导入配置类
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(DtpImportSelector.class)
public @interface EnableDtpThreadPoolExecutor {
}
public class DtpImportSelector implements ImportSelector {
@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
return new String[] {DtpBeanDefinitionRegistrar.class.getName(),
DtpMonitor.class.getName(),
DtpThreadRegistrarProcessor.class.getName(),
DtpEventListener.class.getName()};
}
}
只需在启动类上加这个注解即可接入功能
@SpringBootApplication
@EnableDtpThreadPoolExecutor
public class DynamicThreadPoolApplication {
public static void main(String[] args) {
SpringApplication.run(DynamicThreadPoolApplication.class, args);
}
}
以上,功能全部完成。