当前位置: 首页 > article >正文

@EnableScheduling 和 @Scheduled 实现定时任务的任务延期问题

前言

在复盘 ieg 一面看到定时任务阻塞的问题时,研究了下 @EnableScheduling 的源码,觉得可以单开一篇文章讲一讲

本文主要讲述了使用 @EnableScheduling 可能出现的线程阻塞导致定时任务延期的问题,也顺便解释了动态定时任务源码上的实现

引用文章:

@Schedule定时任务+分布式环境:@Schedule定时任务+分布式环境,这些坑你一定得注意!!! (qq.com)

java 中的线程池参数:java中四种线程池及poolSize、corePoolSize、maximumPoolSize_maximum-pool-size-CSDN博客

线程池的拒绝策略:线程池的RejectedExecutionHandler(拒绝策略)-CSDN博客

Java 中实现定时任务:Java中实现定时任务,有多少种解决方案?好久没更新博客了,最近上班做了点小东西,总结复盘一下。主要介绍了定时任务的三种 - 掘金 (juejin.cn)

线程阻塞问题

问题根源

Java中 使用 Springboot 自带的定时任务 @EnableScheduling 和 @Scheduled 注解,会装配一个 SchedulingConfiguration 的类

@Target(ElementType.TYPE)  
@Retention(RetentionPolicy.RUNTIME)  
@Import(SchedulingConfiguration.class)  
@Documented  
public @interface EnableScheduling {  
  
}
@Configuration(proxyBeanMethods = false)  
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)  
public class SchedulingConfiguration {  
    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)  
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)  
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {  
        return new ScheduledAnnotationBeanPostProcessor();  
    }  
}

这个配置类又会创建一个 ScheduledAnnotationBeanPostProcessor 的 Bean

在这个类的无参构造中又初始化了一个 ScheduledTaskRegistrar 的对象

public ScheduledAnnotationBeanPostProcessor() {  
    this.registrar = new ScheduledTaskRegistrar();  
}

在 创建单例或刷新上下文之后,会执行 finishRegistration 方法,最后执行 registrar 的 afterPropertiesSet 方法:

@Override  
public void afterSingletonsInstantiated() {  
    // Remove resolved singleton classes from cache  
    this.nonAnnotatedClasses.clear();  

    if (this.applicationContext == null) {  
        // Not running in an ApplicationContext -> register tasks early...  
        finishRegistration();  
    }  
}  
  
@Override  
public void onApplicationEvent(ContextRefreshedEvent event) {  
    if (event.getApplicationContext() == this.applicationContext) {  
        // Running in an ApplicationContext -> register tasks this late...  
        // giving other ContextRefreshedEvent listeners a chance to perform  
        // their work at the same time (e.g. Spring Batch's job registration).  
        finishRegistration();  
    }  
}  
  
private void finishRegistration() {  
    if (this.scheduler != null) {  
        this.registrar.setScheduler(this.scheduler);  
    }  
    
    // ...
    
    this.registrar.afterPropertiesSet();  
}

ScheduledTaskRegistrar 的成员变量包括任务的执行器以及几种类型的定时任务列表

@Nullable  
private TaskScheduler taskScheduler;  
  
@Nullable  
private ScheduledExecutorService localExecutor;  
  
@Nullable  
private List<TriggerTask> triggerTasks;  
  
@Nullable  
private List<CronTask> cronTasks;

afterPropertiesSet 方法会获取一个执行器

@Override  
public void afterPropertiesSet() {  
    scheduleTasks();  
}  
  
/**  
* Schedule all registered tasks against the underlying  
* {@linkplain #setTaskScheduler(TaskScheduler) task scheduler}.  
*/  
@SuppressWarnings("deprecation")  
protected void scheduleTasks() {  
    if (this.taskScheduler == null) {  
        this.localExecutor = Executors.newSingleThreadScheduledExecutor();  
        this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);  
    }  
    if (this.triggerTasks != null) {  
        for (TriggerTask task : this.triggerTasks) {  
            addScheduledTask(scheduleTriggerTask(task));  
        }  
    }  
    if (this.cronTasks != null) {  
        for (CronTask task : this.cronTasks) {  
            addScheduledTask(scheduleCronTask(task));  
        }  
    }  
    if (this.fixedRateTasks != null) {  
        for (IntervalTask task : this.fixedRateTasks) {  
            addScheduledTask(scheduleFixedRateTask(task));  
        }  
    }  
    if (this.fixedDelayTasks != null) {  
        for (IntervalTask task : this.fixedDelayTasks) {  
            addScheduledTask(scheduleFixedDelayTask(task));  
        }  
    }  
}

进入 newSingleThreadScheduledExecutor 可以看到,默认使用了一个 corePoolSize 为 1, maximumPoolSize 为 Integer.MAX_VALUE 的线程池

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {  
    return new DelegatedScheduledExecutorService  
        (new ScheduledThreadPoolExecutor(1));  
}
public ScheduledThreadPoolExecutor(int corePoolSize) {  
    super(corePoolSize, Integer.MAX_VALUE,  
        DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,  
        new DelayedWorkQueue());  
}
public ThreadPoolExecutor(int corePoolSize,  
                        int maximumPoolSize,  
                        long keepAliveTime,  
                        TimeUnit unit,  
                        BlockingQueue<Runnable> workQueue) {  
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
    Executors.defaultThreadFactory(), defaultHandler);  
}

而线程池主要有几个重要的参数分别是:

  1. corePoolSize:线程池的基本大小。
  2. maximumPoolSize:线程池中允许的最大线程数。
  3. poolSize:线程池中当前线程的数量。

当提交一个新任务时,若

  1. poolSize < corePoolSize : 创建新线程处理该任务
  2. poolSize = corePoolSize : 将任务置于阻塞队列中
  3. 阻塞队列的容量达到上限,且这时 poolSize < maximumPoolSize :
  4. 阻塞队列满了,且 poolSize = maximumPoolSize : 那么线程池已经达到极限,会根据饱和策略 RejectedExecutionHandler 拒绝新的任务,默认是 AbortPolicy 会丢掉任务并抛出异常

解决方案

注入自己编写的线程池,自行设置参数:

@Configuration  
 public class MyTheadPoolConfig {  
   
     @Bean  
     public TaskExecutor taskExecutor() {  
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
         //设置核心线程数  
         executor.setCorePoolSize(10);  
         //设置最大线程数  
         executor.setMaxPoolSize(20);  
         //缓冲队列200:用来缓冲执行任务的队列  
         executor.setQueueCapacity(200);  
         //线程活路时间 60 秒  
         executor.setKeepAliveSeconds(60);  
         //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池  
         // 这里我继续沿用 scheduling 默认的线程名前缀  
         executor.setThreadNamePrefix("nzc-create-scheduling-");  
         //设置拒绝策略  
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
         executor.setWaitForTasksToCompleteOnShutdown(true);  
         return executor;  
     }  
 }

在定时任务的类上再加一个 @EnableAsync 注解,给方法添加一个 @Async 即可

@Slf4j  
@Component  
@EnableAsync  
@EnableScheduling  
public class ScheduleService {  

    @Autowired  
    TaskExecutor taskExecutor;  

    @Async(value = "taskExecutor")  
    @Scheduled(cron = "0/5 * * * * ? ")  
    public void testSchedule() {  
         try {  
             Thread.sleep(10000);  
             log.info("当前执行任务的线程号ID===>{}", Thread.currentThread().getId());  
         } catch (Exception e) {  
             e.printStackTrace();  
         }   
    }  
}

动态定时任务

上面提到了

@EnableScheduling 导入了 SchedulingConfiguration,SchedulingConfiguration 又创建了 ScheduledAnnotationBeanPostProcessor 的Bean,ScheduledAnnotationBeanPostProcessor 又实例化了 ScheduledTaskRegistrar 对象,即

@EnableScheduling -> SchedulingConfiguration -> ScheduledAnnotationBeanPostProcessor -> ScheduledTaskRegistrar

实际上,在 ScheduledAnnotationBeanPostProcessor 的 finishRegistration 方法中,会先获取所有实现了 SchedulingConfigurer 接口的 Bean,并执行他们的 configureTasks 方法


private void finishRegistration() {  
    if (this.scheduler != null) {  
        this.registrar.setScheduler(this.scheduler);  
    }  

    if (this.beanFactory instanceof ListableBeanFactory) {  
        Map<String, SchedulingConfigurer> beans =  
        ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);  
        List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());  
        AnnotationAwareOrderComparator.sort(configurers);  
        for (SchedulingConfigurer configurer : configurers) {  
            configurer.configureTasks(this.registrar);  
        }  
    }
    // ...
}

我们可以通过配置一个实现了 SchedulingConfigurer 接口的 Bean,实现动态加载定时任务的执行时间

@Data  
@Slf4j  
@Component  
@RequiredArgsConstructor  
@PropertySource("classpath:task-config.ini")  
public class ScheduleTask implements SchedulingConfigurer {  
  
    // private Long timer = 100 * 1000L;  

    @Value("${printTime.cron}")  
    private String cron;  

    @Override  
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {  
        // 间隔触发的任务  
        taskRegistrar.addTriggerTask(new Runnable() {  
        @Override  
        public void run(){  
           // ...
        }
        }, new Trigger() {  
        @Override  
        public Date nextExecutionTime(TriggerContext triggerContext) {  
            // 使用CronTrigger触发器,可动态修改cron表达式来操作循环规则  
            CronTrigger cronTrigger = new CronTrigger(cron);  
            Date nextExecutionTime = cronTrigger.nextExecutionTime(triggerContext);  
            return nextExecutionTime;  
            
            // 使用PerodicTrigger触发器,修改timer变量指定操作间隔,单位为毫秒
            // PeriodicTrigger periodicTrigger = new PeriodicTrigger(timer);  
            // Date nextExecutionTime = periodicTrigger.nextExecutionTime(triggerContext);  
            // return nextExecutionTime;  
        }  
        });  
    }  
}


http://www.kler.cn/a/314244.html

相关文章:

  • 【入门级】计算机网络学习
  • 认识机器学习中的经验风险最小化准则
  • MySQL素材怎么导入Navicat???
  • Kutools for Excel 简体中文版 - 官方正版授权
  • sql模糊关联匹配
  • pyqt鸟瞰
  • Linux:login shell和non-login shell以及其配置文件
  • MQ入门(4)
  • kubernetes基础命令
  • 论文阅读--Planning-oriented Autonomous Driving(一)
  • “华为杯”第十二届中国研究生数学建模竞赛-D题:面向节能的单/多列车优化决策问题研究(续)(附MATLAB代码实现)
  • IPsec-VPN中文解释
  • 数据结构:二叉树(一)
  • 【计算机网络】详解UDP套接字网络字节序IP地址端口号
  • 2025年最新大数据毕业设计选题-Hadoop综合项目
  • TCP客户端编码和解码处理:发送和接收指定编码消息
  • 深度学习——基础知识
  • 初识zookeeper
  • 计组(蒋)期末速成笔记1
  • 基于MATLAB的运动模糊图像处理
  • LVGL学习
  • ES6 -- 2015
  • 肺结节检测系统源码分享
  • CLion/Git版本控制
  • 使用Rust直接编译单个的Solidity合约
  • VCG 顶点区域生长