当前位置: 首页 > article >正文

Spring扩展点实战-动态线程池

Spring扩展点实战-动态线程池

  • 功能需求
  • 代码实现

功能需求

1. 根据配置的参数,动态创建线程池
2. 将动态线程池bean交给Spring管理
3. 后续使用线程池可以从Spring容器获取线程bean使用
4. 后续修改可以直接对线程池的bean修改
5. 监控达到阈值告警

代码实现

利用Spring的BeanDefinition实现系统自动读取配置,注册线程池Bean。这里选择实现BeanDefinitionRegistryPostProcessor接口(也可是选择实现ImportBeanDefinitionRegistrar接口)。
系统的配置信息会被读取到environment中,因此通过实现EnvironmentAware来获取配置信息。

配置信息如下:

spring:
  dtp:
    executors:
      #线程池1
      - poolName: dtpExecutor1
        corePoolSize: 5
        maximumPoolSize: 10
      #线程池2
      - poolName: dtpExecutor2
        corePoolSize: 2
        maximumPoolSize: 15

定义接收线程池参数的对象

@Data
public class ThreadPoolProperties {

    private String poolName;
    private int corePoolSize;
    private int maximumPoolSize;
    private long keepAliveTime = 10;
    private int queueSize = 5;
}

@Data
public class DtpProperties {
    private List<ThreadPoolProperties> executors;
}

自定义线程池类

public class DtpThreadPoolExecutor extends ThreadPoolExecutor {
    public DtpThreadPoolExecutor(ThreadPoolProperties properties) {
        super(properties.getCorePoolSize(), properties.getMaximumPoolSize(), properties.getKeepAliveTime(), TimeUnit.SECONDS, new ArrayBlockingQueue<>(properties.getQueueSize()));
    }
}

根据配置信息注册BeanDefinition

public class DtpBeanDefinitionRegistrar implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {
    private Environment environment;

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        BindResult<DtpProperties> bindResult = Binder.get(environment).bind("spring.dtp", DtpProperties.class);
        DtpProperties dtpProperties = bindResult.get();
        List<ThreadPoolProperties> executors = dtpProperties.getExecutors();
        if(Objects.isNull(executors)) {
            System.out.println("无动态线程池配置文件");
            return;
        }
        for(ThreadPoolProperties properties : executors) {
            BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(DtpThreadPoolExecutor.class);
            beanDefinitionBuilder.addConstructorArgValue(properties);
            registry.registerBeanDefinition(properties.getPoolName(), beanDefinitionBuilder.getBeanDefinition());
        }

    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }
}

现在可以使用就可以使用自定义的线程池

//使用内部线程池
@GetMapping("/placeOrder")
    public String placeOrder() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("内部线程池下单...");
            }
        });

        return "success";
    }

@Autowired
    private DtpThreadPoolExecutor dtpExecutor1;
//使用自定义线程池
@GetMapping("/placeOrder1")
    public String placeOrder1() {
        dtpExecutor1.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("动态线程池下单...");
            }
        });
        return "success";
    }

我们需要对线程池的线程有一些管理,所以建一个线程池的工具类,管理所有线程。那么就需要在线程池Bean创建完成后进行处理。这里选择实现BeanPostProcessor。

线程池工具类:

public class DtpRegistrarUtil {
    private static final Map<String, DtpThreadPoolExecutor> dtpExecutorMap = new ConcurrentHashMap<>();

    /**
     * 添加线程
     * @param key
     * @param value
     */
    public static void registerExecutor(String key, DtpThreadPoolExecutor value) {
        dtpExecutorMap.put(key, value);
    }

    /**
     * 获取线程
     * @param name
     * @return
     */
    public static DtpThreadPoolExecutor getExecutor(String name) {
        return dtpExecutorMap.get(name);
    }

    /**
     * 修改线程参数
     * @param name
     * @param properties
     */
    public static void refresh(String name, ThreadPoolProperties properties) {
        DtpThreadPoolExecutor dtpThreadPoolExecutor = dtpExecutorMap.get(name);
        dtpThreadPoolExecutor.setCorePoolSize(properties.getCorePoolSize());
        dtpThreadPoolExecutor.setMaximumPoolSize(properties.getMaximumPoolSize());
        // ...

    }

    /**
     * 获取所有线程池的名称
     * @return
     */
    public static List<String> getAllExecutorNames() {
         return dtpExecutorMap.keySet().stream().toList();
    }
}

线程池设置到工具类

@Component
public class DtpThreadRegistrarProcessor implements BeanPostProcessor {

    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if(bean instanceof DtpThreadPoolExecutor) {
            DtpRegistrarUtil.registerExecutor(beanName, (DtpThreadPoolExecutor)bean);
        }
        return bean;
    }

}

利用工具类对线程池参数进行实时修改。这里通过事件发布来实现解耦。

自定义事件

public class DtpEvent extends ApplicationEvent {
    private ThreadPoolProperties properties;

    public DtpEvent(ThreadPoolProperties properties) {
        super(properties);
        this.properties = properties;
    }

    public ThreadPoolProperties getProperties() {
        return properties;
    }

事件监听器

@Component
public class DtpEventListener implements ApplicationListener<DtpEvent> {
    @Override
    public void onApplicationEvent(DtpEvent event) {
        System.out.println(event);
        ThreadPoolProperties properties = event.getProperties();
        DtpRegistrarUtil.refresh(properties.getPoolName(), properties);
    }
}

我们想要对线程池的实时状态有感知,使用这个工具类对线程池状态进行查看和预警。定时输出线程池状态需要用到定时任务。定时任务可以在容器启动时启动,容器关闭时关闭。

线程池监控和预警定时任务

@Component
public class DtpMonitor implements SmartLifecycle {

    private ScheduledFuture<?> scheduledFuture;

    private boolean isRunning;

    @Override
    public void start() {
        scheduledFuture = Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                monitor();
                alarm();
            }
        }, 5, 5, TimeUnit.SECONDS);
        isRunning = true;
    }

    @Override
    public void stop() {
        scheduledFuture.cancel(false);
        isRunning = false;
        System.out.println("线程池定时任务停止");
    }

    @Override
    public boolean isRunning() {
        return isRunning;
    }

    private void monitor() {
        List<String> allExecutorNames = DtpRegistrarUtil.getAllExecutorNames();
        for(String name : allExecutorNames) {
            DtpThreadPoolExecutor executor = DtpRegistrarUtil.getExecutor(name);
            System.out.println("线程池名称:"+name);
            System.out.println("线程池核心线程数:"+executor.getCorePoolSize());
            System.out.println("线程池最大线程数:"+executor.getMaximumPoolSize());
        }

    }

    private void alarm() {
        // 配置预警值
        int max = 1;
        List<String> allExecutorNames = DtpRegistrarUtil.getAllExecutorNames();
        for(String name : allExecutorNames) {
            DtpThreadPoolExecutor executor = DtpRegistrarUtil.getExecutor(name);
            if(executor.getActiveCount() > max) {
                System.out.println("线程池"+name+"告警!");
            }
        }
    }
}

现在动态线程池的基本功能已经完成。我们想要在其他项目中更方便的接入这个功能,比如通过一个注解。再利用@Import的导入功能,将这些Bean导入,无需在Bean加@Component。

定义注解以及导入配置类

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(DtpImportSelector.class)
public @interface EnableDtpThreadPoolExecutor {
}

public class DtpImportSelector implements ImportSelector {
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        return new String[] {DtpBeanDefinitionRegistrar.class.getName(),
                DtpMonitor.class.getName(),
                DtpThreadRegistrarProcessor.class.getName(),
                DtpEventListener.class.getName()};
    }
}

只需在启动类上加这个注解即可接入功能

@SpringBootApplication
@EnableDtpThreadPoolExecutor
public class DynamicThreadPoolApplication {
    public static void main(String[] args) {
        SpringApplication.run(DynamicThreadPoolApplication.class, args);
    }
}

以上,功能全部完成。


http://www.kler.cn/a/554991.html

相关文章:

  • Rust编程语言入门教程 (七)函数与控制流
  • 在 Centos7 上部署 ASP.NET 8.0 + YOLOv11 的踩坑实录
  • Web 后端 请求与响应
  • Spring Boot 集成 RabbitMQ 并实现消息确认机制
  • 3月营销日历:开启春日盛宴,绽放生活魅力
  • 浅谈网络 | 容器网络之Cilium
  • 设计模式之适配模式是什么?以及在Spring AOP中的拦截器链的使用源码解析。
  • C# 十六进制字符串转换为十进制
  • 解决 `pip is configured with locations that require TLS/SSL` 错误
  • 10-R数组
  • 如何调用 DeepSeek API:详细教程与示例
  • 人工智能 - AIGC 和AGI 有什么区别
  • 【十一】Golang 指针
  • Redis7——基础篇(六)
  • MAC-OS低版本升级到高版本——亲测有效
  • 后端开发:开启技术世界的新大门
  • 【CI/CD】持续集成及 Jenkins
  • AI前端开发:跨领域合作的新机遇与挑战
  • C++17 中的 std::to_chars 和 std::from_chars:高效且安全的字符串转换工具
  • scratch猜年龄互动小游戏 2024年12月scratch四级真题 中国电子学会 图形化编程 scratch四级真题和答案解析