【开源项目】Dynamic-Tp核心流程源码解读
序.介绍
dynamic-tp 是一款动态线程池组件,可以实现线程池的实时动态调参及监控报警,线程池配置放在配置中心统一管理,达成业务代码零侵入,支持多配置中心的选择和常见的第三方组件的线程池的集成管理。
官网
: https://dynamictp.top/
Gitee
: https://gitee.com/dromara/dynamic-tp
Github
: https://github.com/dromara/dynamic-tp
详细介绍及组件的基本使用,可以访问 dynamic-tp 官网。
快速入门
引入依赖
<dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId>
<version>1.1.0</version>
</dependency>
本地bootstrap.xml
server:
port: 8099
spring:
application:
name: dynamic-tp-example
profiles:
active: dev
cloud:
nacos:
discovery:
server-addr: localhost:8848
config:
server-addr: ${spring.cloud.nacos.discovery.server-addr}
file-extension: yml
远程配置
# 动态线程池配置文件,建议单独开一个文件放到配置中心,字段详解看readme介绍
spring:
dynamic:
tp:
enabled: true
enabledBanner: true # 是否开启banner打印,默认true
enabledCollect: true # 是否开启监控指标采集,默认false
collectorTypes: micrometer,logging # 监控数据采集器类型(logging | micrometer | internal_logging),默认micrometer
logPath: /home/logs # 监控日志数据路径,默认 ${user.home}/logs
monitorInterval: 5 # 监控时间间隔(报警判断、指标采集),默认5s
nacos: # nacos配置,不配置有默认值(规则name-dev.yml这样),cloud应用不需要配置
dataId: dynamic-tp-nacos-demo-dtp-dev.yml
group: DEFAULT_GROUP
configType: yml # 配置文件类型
platforms: # 通知报警平台配置
- platform: wechat
urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c # 替换
receivers: test1,test2 # 接受人企微名称
- platform: ding
urlKey: f80dad441fcd655438f4a08dcd6a # 替换
secret: SECb5441fa6f375d5b9d21 # 替换,非sign模式可以没有此值
receivers: 15810119805 # 钉钉账号手机号
- platform: lark
urlKey: 0d944ae7-b24a-40 # 替换
receivers: test1,test2 # 接受人飞书名称/openid
tomcatTp: # tomcat web server线程池配置
corePoolSize: 100
maximumPoolSize: 401
keepAliveTime: 60
jettyTp: # jetty web server线程池配置
corePoolSize: 100
maximumPoolSize: 400
undertowTp: # undertow web server线程池配置
corePoolSize: 100
maximumPoolSize: 400
keepAliveTime: 60
hystrixTp: # hystrix 线程池配置
- threadPoolName: hystrix1
corePoolSize: 100
maximumPoolSize: 400
keepAliveTime: 60
dubboTp: # dubbo 线程池配置
- threadPoolName: dubboTp#20880
corePoolSize: 100
maximumPoolSize: 400
keepAliveTime: 60
rocketMqTp: # rocketmq 线程池配置
- threadPoolName: group1#topic1
corePoolSize: 200
maximumPoolSize: 400
keepAliveTime: 60
executors: # 动态线程池配置,都有默认值,采用默认值的可以不配置该项,减少配置量
- threadPoolName: dtpExecutor1
executorType: common # 线程池类型common、eager:适用于io密集型
corePoolSize: 6
maximumPoolSize: 8
queueCapacity: 202
queueType: VariableLinkedBlockingQueue # 任务队列,查看源码QueueTypeEnum枚举类
rejectedHandlerType: CallerRunsPolicy # 拒绝策略,查看RejectedTypeEnum枚举类
keepAliveTime: 50
allowCoreThreadTimeOut: false # 是否允许核心线程池超时
threadNamePrefix: test # 线程名前缀
waitForTasksToCompleteOnShutdown: false # 参考spring线程池设计,优雅关闭线程池
awaitTerminationSeconds: 5 # 单位(s)
preStartAllCoreThreads: false # 是否预热所有核心线程,默认false
runTimeout: 200 # 任务执行超时阈值,目前只做告警用,单位(ms)
queueTimeout: 100 # 任务在队列等待超时阈值,目前只做告警用,单位(ms)
taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口
notifyItems: # 报警项,不配置自动会按默认值配置(变更通知、容量报警、活性报警、拒绝报警、任务超时报警)
- type: capacity # 报警项类型,查看源码 NotifyTypeEnum枚举类
enabled: true
threshold: 80 # 报警阈值
platforms: [ding,wechat] # 可选配置,不配置默认拿上层platforms配置的所以平台
interval: 120 # 报警间隔(单位:s)
- type: change
enabled: true
- type: liveness
enabled: true
threshold: 80
- type: reject
enabled: true
threshold: 1
- type: run_timeout
enabled: true
threshold: 1
- type: queue_timeout
enabled: true
threshold: 1
启动类加 @EnableDynamicTp
注解
测试效果
@Slf4j
@RestController
@SuppressWarnings("all")
public class TestController {
@Resource
private ThreadPoolExecutor dtpExecutor1;
@GetMapping("/test")
public String test() throws InterruptedException {
dtpExecutor1.execute(() -> {
log.info("i am dynamic-tp-test-1 task");
});
return "success";
}
}
源码分析
设计思路
- 提供一个功能入口可以将
配置
构造成一个线程池对象,内部维护一个线程池注册表,将配置
对应的线程池添加至注册表中。 - 实例化线程池对象,Spring 环境则注入依赖 Bean,以供 IOC 容器使用。
- 如果
配置
指向的是远端配置中心,则注册监听器,当远端注册配置中心刷新时回调,当前系统监听到回调刷新配置
,刷新线程池(动态调参),刷新本地线程池注册表。 - 线程池调参监控,异常报警
DtpBeanDefinitionRegistrar
添加EnableDynamicTp
,引入DtpBeanDefinitionRegistrar
。
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({DtpBeanDefinitionRegistrar.class})
public @interface EnableDynamicTp {
}
DtpBeanDefinitionRegistrar
- 从 Environment 读取配置信息绑定到
DtpProperties
- 获取配置文件中配置的线程池,如果没有则结束
- 遍历线程池,绑定配置构造线程池所需要的属性,根据配置中的
executorType
注册不同类型的线程池 Bean BeanUtil#registerIfAbsent()
注册 Bean
public class DtpBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {
private static final Logger log = LoggerFactory.getLogger(DtpBeanDefinitionRegistrar.class);
private Environment environment;
public DtpBeanDefinitionRegistrar() {
}
public void setEnvironment(Environment environment) {
this.environment = environment;
}
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
DtpProperties dtpProperties = new DtpProperties();
PropertiesBinder.bindDtpProperties(this.environment, dtpProperties);
List<ThreadPoolProperties> executors = dtpProperties.getExecutors();
if (CollectionUtils.isEmpty(executors)) {
log.warn("DynamicTp registrar, no executors are configured.");
} else {
executors.forEach((x) -> {
Class<?> executorTypeClass = ExecutorType.getClass(x.getExecutorType());
Map<String, Object> properties = this.buildPropertyValues(x);
Object[] args = this.buildConstructorArgs(executorTypeClass, x);
BeanUtil.registerIfAbsent(registry, x.getThreadPoolName(), executorTypeClass, properties, args);
});
}
}
private Map<String, Object> buildPropertyValues(ThreadPoolProperties tpp) {
Map<String, Object> properties = Maps.newHashMap();
properties.put("threadPoolName", tpp.getThreadPoolName());
properties.put("threadPoolAliasName", tpp.getThreadPoolAliasName());
properties.put("allowCoreThreadTimeOut", tpp.isAllowCoreThreadTimeOut());
properties.put("waitForTasksToCompleteOnShutdown", tpp.isWaitForTasksToCompleteOnShutdown());
properties.put("awaitTerminationSeconds", tpp.getAwaitTerminationSeconds());
properties.put("preStartAllCoreThreads", tpp.isPreStartAllCoreThreads());
properties.put("runTimeout", tpp.getRunTimeout());
properties.put("queueTimeout", tpp.getQueueTimeout());
List<NotifyItem> notifyItems = NotifyItem.mergeAllNotifyItems(tpp.getNotifyItems());
properties.put("notifyItems", notifyItems);
properties.put("notifyEnabled", tpp.isNotifyEnabled());
List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(tpp.getTaskWrapperNames());
properties.put("taskWrappers", taskWrappers);
return properties;
}
private Object[] buildConstructorArgs(Class<?> clazz, ThreadPoolProperties tpp) {
Object taskQueue;
if (clazz.equals(EagerDtpExecutor.class)) {
taskQueue = new TaskQueue(tpp.getQueueCapacity());
} else {
taskQueue = QueueTypeEnum.buildLbq(tpp.getQueueType(), tpp.getQueueCapacity(), tpp.isFair(), tpp.getMaxFreeMemory());
}
return new Object[]{tpp.getCorePoolSize(), tpp.getMaximumPoolSize(), tpp.getKeepAliveTime(), tpp.getUnit(), taskQueue, new NamedThreadFactory(tpp.getThreadNamePrefix()), RejectHandlerGetter.buildRejectedHandler(tpp.getRejectedHandlerType())};
}
}
DtpPostProcessor
DtpPostProcessor
利用了 Spring 容器启动 BeanPostProcessor
机制增强机制,在 bean 初始化的时候调用 postProcessAfterInitialization
,它实现了获取被 IOC 容器托管的线程池 bean 然后注册到本地的注册表中。
public class DtpPostProcessor implements BeanPostProcessor {
private static final Logger log = LoggerFactory.getLogger(DtpPostProcessor.class);
public DtpPostProcessor() {
}
public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull String beanName) throws BeansException {
if (!(bean instanceof ThreadPoolExecutor) && !(bean instanceof ThreadPoolTaskExecutor)) {
return bean;
} else if (bean instanceof DtpExecutor) {
DtpExecutor dtpExecutor = (DtpExecutor)bean;
if (bean instanceof EagerDtpExecutor) {
((TaskQueue)dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor)dtpExecutor);
}
this.registerDtp(dtpExecutor);
return dtpExecutor;
} else {
ApplicationContext applicationContext = ApplicationContextHolder.getInstance();
String dtpAnnotationVal;
try {
DynamicTp dynamicTp = (DynamicTp)applicationContext.findAnnotationOnBean(beanName, DynamicTp.class);
if (Objects.nonNull(dynamicTp)) {
dtpAnnotationVal = dynamicTp.value();
} else {
BeanDefinitionRegistry registry = (BeanDefinitionRegistry)applicationContext;
AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition)registry.getBeanDefinition(beanName);
MethodMetadata methodMetadata = (MethodMetadata)annotatedBeanDefinition.getSource();
if (Objects.isNull(methodMetadata) || !methodMetadata.isAnnotated(DynamicTp.class.getName())) {
return bean;
}
dtpAnnotationVal = ((Map)Optional.ofNullable(methodMetadata.getAnnotationAttributes(DynamicTp.class.getName())).orElse(Collections.emptyMap())).getOrDefault("value", "").toString();
}
} catch (NoSuchBeanDefinitionException var9) {
log.error("There is no bean with the given name {}", beanName, var9);
return bean;
}
String poolName = StringUtils.isNotBlank(dtpAnnotationVal) ? dtpAnnotationVal : beanName;
if (bean instanceof ThreadPoolTaskExecutor) {
ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor)bean;
this.registerCommon(poolName, taskExecutor.getThreadPoolExecutor());
} else {
this.registerCommon(poolName, (ThreadPoolExecutor)bean);
}
return bean;
}
}
private void registerDtp(DtpExecutor executor) {
DtpRegistry.registerDtp(executor, "beanPostProcessor");
}
private void registerCommon(String poolName, ThreadPoolExecutor executor) {
ExecutorWrapper wrapper = new ExecutorWrapper(poolName, executor);
DtpRegistry.registerCommon(wrapper, "beanPostProcessor");
}
}
- 获取到 bean 后,如果是非线程池类型则结束。
- 如果是
DtpExecutor
则注册到 DTP_REGISTRY 注册表中 - 如果是 非动态线程池且标注了
@DynamicTp
注解则注册到 COMMON_REGISTRY 注册表中 - 如果是 非动态线程池且未标注
@DynamicTp
注解则结束不做增强
CloudNacosRefresher
CloudNacosRefresher
实现了AbstractRefresher
,当接收到RefreshScopeRefreshedEvent
,进行刷新
public class CloudNacosRefresher extends AbstractRefresher implements SmartApplicationListener {
private static final Logger log = LoggerFactory.getLogger(CloudNacosRefresher.class);
public CloudNacosRefresher() {
}
public boolean supportsEventType(@NonNull Class<? extends ApplicationEvent> eventType) {
return RefreshScopeRefreshedEvent.class.isAssignableFrom(eventType);
}
public void onApplicationEvent(@NonNull ApplicationEvent event) {
if (event instanceof RefreshScopeRefreshedEvent) {
this.doRefresh(this.dtpProperties);
}
}
}
AbstractRefresher#doRefresh()
,刷新DtpRegistry
protected void doRefresh(DtpProperties dtpProperties) {
DtpRegistry.refresh(dtpProperties);
this.publishEvent(dtpProperties);
}
private void publishEvent(DtpProperties dtpProperties) {
RefreshEvent event = new RefreshEvent(this, dtpProperties);
ApplicationContextHolder.publishEvent(event);
}
DtpRegistry#refresh()
,根据配置刷新DtpExecutor
,执行消息变更通知, NoticeManager.doNoticeAsync(new ExecutorWrapper(executor), oldProp, diffKeys);
。
public static void refresh(DtpProperties properties) {
if (!Objects.isNull(properties) && !CollectionUtils.isEmpty(properties.getExecutors())) {
properties.getExecutors().forEach((x) -> {
if (StringUtils.isBlank(x.getThreadPoolName())) {
log.warn("DynamicTp refresh, threadPoolName must not be empty.");
} else {
DtpExecutor dtpExecutor = (DtpExecutor)DTP_REGISTRY.get(x.getThreadPoolName());
if (Objects.isNull(dtpExecutor)) {
log.warn("DynamicTp refresh, cannot find specified dtpExecutor, name: {}.", x.getThreadPoolName());
} else {
refresh(dtpExecutor, x);
}
}
});
} else {
log.warn("DynamicTp refresh, empty threadPoolProperties.");
}
}
private static void refresh(DtpExecutor executor, ThreadPoolProperties properties) {
if (properties.getCorePoolSize() >= 0 && properties.getMaximumPoolSize() > 0 && properties.getMaximumPoolSize() >= properties.getCorePoolSize() && properties.getKeepAliveTime() >= 0L) {
DtpMainProp oldProp = ExecutorConverter.convert(executor);
doRefresh(executor, properties);
DtpMainProp newProp = ExecutorConverter.convert(executor);
if (oldProp.equals(newProp)) {
log.warn("DynamicTp refresh, main properties of [{}] have not changed.", executor.getThreadPoolName());
} else {
List<FieldInfo> diffFields = EQUATOR.getDiffFields(oldProp, newProp);
List<String> diffKeys = (List)diffFields.stream().map(FieldInfo::getFieldName).collect(Collectors.toList());
NoticeManager.doNoticeAsync(new ExecutorWrapper(executor), oldProp, diffKeys);
log.info("DynamicTp refresh, name: [{}], changed keys: {}, corePoolSize: [{}], maxPoolSize: [{}], queueType: [{}], queueCapacity: [{}], keepAliveTime: [{}], rejectedType: [{}], allowsCoreThreadTimeOut: [{}]", new Object[]{executor.getThreadPoolName(), diffKeys, String.format("%s => %s", oldProp.getCorePoolSize(), newProp.getCorePoolSize()), String.format("%s => %s", oldProp.getMaxPoolSize(), newProp.getMaxPoolSize()), String.format("%s => %s", oldProp.getQueueType(), newProp.getQueueType()), String.format("%s => %s", oldProp.getQueueCapacity(), newProp.getQueueCapacity()), String.format("%ss => %ss", oldProp.getKeepAliveTime(), newProp.getKeepAliveTime()), String.format("%s => %s", oldProp.getRejectType(), newProp.getRejectType()), String.format("%s => %s", oldProp.isAllowCoreThreadTimeOut(), newProp.isAllowCoreThreadTimeOut())});
}
} else {
log.error("DynamicTp refresh, invalid parameters exist, properties: {}", properties);
}
}
DtpAdapterListener
DtpAdapterListener
处于 adapter 模块,该模块主要是对些三方组件中的线程池进行管理(例如 Tomcat,Jetty 等),通过 spring 的事件发布监听机制来实现与核心流程解耦。
如果需要对Tomcat进行管理,需要加入依赖
<dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-boot-starter-adapter-webserver</artifactId>
<version>1.0.8</version>
</dependency>
该jar包会注入WebServerTpAutoConfiguration
@Configuration
@ConditionalOnWebApplication
@AutoConfigureAfter({BaseBeanAutoConfiguration.class})
public class WebServerTpAutoConfiguration {
public WebServerTpAutoConfiguration() {
}
@Bean
@ConditionalOnTomcatWebServer
public TomcatDtpAdapter tomcatTpHandler() {
return new TomcatDtpAdapter();
}
@Bean
@ConditionalOnJettyWebServer
public JettyDtpAdapter jettyTpHandler() {
return new JettyDtpAdapter();
}
@Bean
@ConditionalOnUndertowWebServer
public UndertowDtpAdapter undertowTpHandler() {
return new UndertowDtpAdapter();
}
}
TomcatDtpAdapter
继承了AbstractWebServerDtpAdapter
,而AbstractWebServerDtpAdapter
实现了ApplicationListener
,在容器启动的时候会重写onApplicationEvent
。
public void onApplicationEvent(WebServerInitializedEvent event) {
try {
DtpProperties dtpProperties = (DtpProperties)ApplicationContextHolder.getBean(DtpProperties.class);
this.initialize();
this.refresh(dtpProperties);
} catch (Exception var3) {
log.error("Init web server thread pool failed.", var3);
}
}
protected void initialize() {
if (this.executorWrapper == null) {
ApplicationContext applicationContext = ApplicationContextHolder.getInstance();
WebServer webServer = ((WebServerApplicationContext)applicationContext).getWebServer();
this.executorWrapper = this.doGetExecutorWrapper(webServer);
log.info("DynamicTp adapter, web server executor init end, executor: {}", this.executorWrapper.getExecutor());
}
}
TomcatDtpAdapter#refresh
,会根据配置文件中的数据设置线程池大小。
public class TomcatDtpAdapter extends AbstractWebServerDtpAdapter {
private static final Logger log = LoggerFactory.getLogger(TomcatDtpAdapter.class);
private static final String POOL_NAME = "tomcatTp";
public TomcatDtpAdapter() {
}
public ExecutorWrapper doGetExecutorWrapper(WebServer webServer) {
TomcatWebServer tomcatWebServer = (TomcatWebServer)webServer;
return new ExecutorWrapper("tomcatTp", tomcatWebServer.getTomcat().getConnector().getProtocolHandler().getExecutor());
}
public ThreadPoolStats getPoolStats() {
ThreadPoolExecutor executor = (ThreadPoolExecutor)this.getWrapper().getExecutor();
return ThreadPoolStats.builder().corePoolSize(executor.getCorePoolSize()).maximumPoolSize(executor.getMaximumPoolSize()).queueType(executor.getQueue().getClass().getSimpleName()).queueCapacity(executor.getQueue().size() + executor.getQueue().remainingCapacity()).queueSize(executor.getQueue().size()).queueRemainingCapacity(executor.getQueue().remainingCapacity()).activeCount(executor.getActiveCount()).taskCount(executor.getTaskCount()).completedTaskCount(executor.getCompletedTaskCount()).largestPoolSize(executor.getLargestPoolSize()).poolSize(executor.getPoolSize()).waitTaskCount(executor.getQueue().size()).poolName("tomcatTp").build();
}
public void refresh(DtpProperties dtpProperties) {
SimpleTpProperties properties = dtpProperties.getTomcatTp();
if (!Objects.isNull(properties)) {
ExecutorWrapper executorWrapper = this.getWrapper();
ThreadPoolExecutor executor = (ThreadPoolExecutor)executorWrapper.getExecutor();
this.checkParams(executor.getMaximumPoolSize(), properties);
DtpMainProp oldProp = ExecutorConverter.ofSimple("tomcatTp", executor.getCorePoolSize(), executor.getMaximumPoolSize(), executor.getKeepAliveTime(properties.getUnit()));
this.doRefresh(executor, properties);
DtpMainProp newProp = ExecutorConverter.ofSimple("tomcatTp", executor.getCorePoolSize(), executor.getMaximumPoolSize(), executor.getKeepAliveTime(properties.getUnit()));
if (oldProp.equals(newProp)) {
log.warn("DynamicTp adapter refresh, main properties of [{}] have not changed.", "tomcatTp");
} else {
log.info("DynamicTp adapter [{}] refreshed end, corePoolSize: [{}], maxPoolSize: [{}], keepAliveTime: [{}]", new Object[]{"tomcatTp", String.format("%s => %s", oldProp.getCorePoolSize(), newProp.getCorePoolSize()), String.format("%s => %s", oldProp.getMaxPoolSize(), newProp.getMaxPoolSize()), String.format("%s => %s", oldProp.getKeepAliveTime(), newProp.getKeepAliveTime())});
}
}
}
private void doRefresh(ThreadPoolExecutor executor, SimpleTpProperties properties) {
if (!Objects.equals(executor.getKeepAliveTime(properties.getUnit()), properties.getKeepAliveTime())) {
executor.setKeepAliveTime((long)properties.getKeepAliveTime(), properties.getUnit());
}
if (!Objects.equals(executor.getCorePoolSize(), properties.getCorePoolSize())) {
executor.setCorePoolSize(properties.getCorePoolSize());
}
if (!Objects.equals(executor.getMaximumPoolSize(), properties.getMaximumPoolSize())) {
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
}
}
private ExecutorWrapper getWrapper() {
ExecutorWrapper executorWrapper = this.getExecutorWrapper();
if (!Objects.isNull(executorWrapper) && !Objects.isNull(executorWrapper.getExecutor())) {
return executorWrapper;
} else {
log.warn("Tomcat web server threadPool is null.");
throw new DtpException("Tomcat web server threadPool is null.");
}
}
}
当配置修改的时候,DtpAdapterListener
会监听到对应的修改事件。
@Slf4j
public class DtpAdapterListener implements GenericApplicationListener {
@Override
public boolean supportsEventType(ResolvableType resolvableType) {
Class<?> type = resolvableType.getRawClass();
if (type != null) {
return RefreshEvent.class.isAssignableFrom(type)
|| CollectEvent.class.isAssignableFrom(type)
|| AlarmCheckEvent.class.isAssignableFrom(type);
}
return false;
}
@Override
public void onApplicationEvent(@NonNull ApplicationEvent event) {
try {
if (event instanceof RefreshEvent) {
doRefresh(((RefreshEvent) event).getDtpProperties());
} else if (event instanceof CollectEvent) {
doCollect(((CollectEvent) event).getDtpProperties());
} else if (event instanceof AlarmCheckEvent) {
doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());
}
} catch (Exception e) {
log.error("DynamicTp adapter, event handle failed.", e);
}
}
protected void doRefresh(DtpProperties dtpProperties) {
val handlerMap = ApplicationContextHolder.getBeansOfType(DtpAdapter.class);
if (CollectionUtils.isEmpty(handlerMap)) {
return;
}
handlerMap.forEach((k, v) -> v.refresh(dtpProperties));
}
}
AlarmManager
AlarmManager
静态方法块中构造了责任链,在需要报警通知的时候,调用责任链执行
@Slf4j
public class AlarmManager {
private static final ExecutorService ALARM_EXECUTOR = ThreadPoolBuilder.newBuilder()
.threadPoolName("dtp-alarm")
.threadFactory("dtp-alarm")
.corePoolSize(1)
.maximumPoolSize(2)
.workQueue(LINKED_BLOCKING_QUEUE.getName(), 2000, false, null)
.rejectedExecutionHandler(RejectedTypeEnum.DISCARD_OLDEST_POLICY.getName())
.taskWrappers(TaskWrappers.getInstance().getByNames(Sets.newHashSet("mdc")))
.buildDynamic();
private static final InvokerChain<BaseNotifyCtx> ALARM_INVOKER_CHAIN;
static {
ALARM_INVOKER_CHAIN = NotifyFilterBuilder.getAlarmInvokerChain();
}
private AlarmManager() { }
}
获取容器中NotifyFilter
的Bean。
public class NotifyFilterBuilder {
private NotifyFilterBuilder() { }
public static InvokerChain<BaseNotifyCtx> getAlarmInvokerChain() {
val filters = ApplicationContextHolder.getBeansOfType(NotifyFilter.class);
Collection<NotifyFilter> alarmFilters = Lists.newArrayList(filters.values());
alarmFilters.add(new AlarmBaseFilter());
alarmFilters = alarmFilters.stream()
.filter(x -> x.supports(NotifyTypeEnum.ALARM))
.sorted(Comparator.comparing(Filter::getOrder))
.collect(Collectors.toList());
return InvokerChainFactory.buildInvokerChain(new AlarmInvoker(), alarmFilters.toArray(new NotifyFilter[0]));
}
}
AlarmManager#doAlarm
,根据告警类型获取告警项配置,一个线程池可以配置多个NotifyItem,执行责任链。
public static void doAlarm(ExecutorWrapper executorWrapper, NotifyItemEnum notifyItemEnum) {
NotifyHelper.getNotifyItem(executorWrapper, notifyItemEnum).ifPresent(notifyItem -> {
val alarmCtx = new AlarmCtx(executorWrapper, notifyItem);
ALARM_INVOKER_CHAIN.proceed(alarmCtx);
});
}
执行责任链
public class AlarmInvoker implements Invoker<BaseNotifyCtx> {
@Override
public void invoke(BaseNotifyCtx context) {
val alarmCtx = (AlarmCtx) context;
val executorWrapper = alarmCtx.getExecutorWrapper();
val notifyItem = alarmCtx.getNotifyItem();
val alarmInfo = AlarmCounter.getAlarmInfo(executorWrapper.getThreadPoolName(), notifyItem.getType());
alarmCtx.setAlarmInfo(alarmInfo);
try {
DtpNotifyCtxHolder.set(context);
NotifierHandler.getInstance().sendAlarm(NotifyItemEnum.of(notifyItem.getType()));
AlarmCounter.reset(executorWrapper.getThreadPoolName(), notifyItem.getType());
} finally {
DtpNotifyCtxHolder.remove();
}
}
}
NotifierHandler#sendAlarm
,获取不同的DtpNotifier
,发送告警消息
@Slf4j
public final class NotifierHandler {
private static final Map<String, DtpNotifier> NOTIFIERS = new HashMap<>();
private NotifierHandler() {
// 适配SPI
ServiceLoader<DtpNotifier> loader = ServiceLoader.load(DtpNotifier.class);
for (DtpNotifier notifier : loader) {
NOTIFIERS.put(notifier.platform(), notifier);
}
DtpNotifier dingNotifier = new DtpDingNotifier(new DingNotifier());
DtpNotifier wechatNotifier = new DtpWechatNotifier(new WechatNotifier());
DtpNotifier larkNotifier = new DtpLarkNotifier(new LarkNotifier());
NOTIFIERS.put(dingNotifier.platform(), dingNotifier);
NOTIFIERS.put(wechatNotifier.platform(), wechatNotifier);
NOTIFIERS.put(larkNotifier.platform(), larkNotifier);
}
public void sendAlarm(NotifyItemEnum notifyItemEnum) {
NotifyItem notifyItem = DtpNotifyCtxHolder.get().getNotifyItem();
for (String platformId : notifyItem.getPlatformIds()) {
NotifyHelper.getPlatform(platformId).ifPresent(p -> {
DtpNotifier notifier = NOTIFIERS.get(p.getPlatform().toLowerCase());
if (notifier != null) {
notifier.sendAlarmMsg(p, notifyItemEnum);
}
});
}
}
}
AbstractDtpNotifier#sendAlarmMsg
,发送消息。Notifier
有DingNotifier
,EmailNotifier
,WechatNotifier
等实现类
@Override
public void sendAlarmMsg(NotifyPlatform notifyPlatform, NotifyItemEnum notifyItemEnum) {
String content = buildAlarmContent(notifyPlatform, notifyItemEnum);
if (StringUtils.isBlank(content)) {
log.debug("Alarm content is empty, ignore send alarm message.");
return;
}
notifier.send(notifyPlatform, content);
}
DtpMonitor
对系统进行监控
@Slf4j
public class DtpMonitor implements ApplicationRunner, Ordered {
private static final ScheduledExecutorService MONITOR_EXECUTOR = new ScheduledThreadPoolExecutor(
1, new NamedThreadFactory("dtp-monitor", true));
@Resource
private DtpProperties dtpProperties;
@Override
public void run(ApplicationArguments args) {
MONITOR_EXECUTOR.scheduleWithFixedDelay(this::run,
0, dtpProperties.getMonitorInterval(), TimeUnit.SECONDS);
}
private void run() {
List<String> dtpNames = DtpRegistry.listAllDtpNames();
List<String> commonNames = DtpRegistry.listAllCommonNames();
checkAlarm(dtpNames);
collect(dtpNames, commonNames);
}
private void collect(List<String> dtpNames, List<String> commonNames) {
if (!dtpProperties.isEnabledCollect()) {
return;
}
dtpNames.forEach(x -> {
DtpExecutor executor = DtpRegistry.getDtpExecutor(x);
ThreadPoolStats poolStats = MetricsConverter.convert(executor);
doCollect(poolStats);
});
commonNames.forEach(x -> {
ExecutorWrapper wrapper = DtpRegistry.getCommonExecutor(x);
ThreadPoolStats poolStats = MetricsConverter.convert(wrapper);
doCollect(poolStats);
});
publishCollectEvent();
}
private void checkAlarm(List<String> dtpNames) {
dtpNames.forEach(x -> {
DtpExecutor executor = DtpRegistry.getDtpExecutor(x);
AlarmManager.doAlarmAsync(executor, SCHEDULE_NOTIFY_ITEMS);
});
publishAlarmCheckEvent();
}
private void doCollect(ThreadPoolStats threadPoolStats) {
try {
CollectorHandler.getInstance().collect(threadPoolStats, dtpProperties.getCollectorTypes());
} catch (Exception e) {
log.error("DynamicTp monitor, metrics collect error.", e);
}
}
private void publishCollectEvent() {
CollectEvent event = new CollectEvent(this, dtpProperties);
ApplicationContextHolder.publishEvent(event);
}
private void publishAlarmCheckEvent() {
AlarmCheckEvent event = new AlarmCheckEvent(this, dtpProperties);
ApplicationContextHolder.publishEvent(event);
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE + 2;
}
}