Spring的任务调度
Spring的任务调度
1.概述
Spring框架为任务调度提供了专门的解决方案。在Spring框架的org.springframework.scheduling包中,通过对JDK 的ScheduledExecutorService接口的实例进行封装,对外提供了一些注解和接口,为开发者处理定时任务提供了统一的配置方式。
Spring框架借助了JDK的能力实现了任务调度,但是开发者通过Spring来实现定时任务调度是非常简单和便捷的。如果需要单机环境下要实现任务调度的功能,使用Spring 的任务调度方式无疑是首选。
2.Spring任务调度的使用
我们使用Spring来进行定时任务调度的话,主要是两种方式。1. @Scheduled注解的方式。2. 基于SchedulingConfigurer接口的方式。这两种方式都需要使用@EnableScheduling注解来开启Spring框架的定时任务调度功能。
2.1 @Scheduled注解的方式
@Scheduled注解的方式比较简单,我们只要在需要调度方法上加上@Scheduled注解,然后通过配置@Scheduled注解的元素值,来控制任务调度的频率。这种配置任务调度的频率方式有点硬编码的味道。若后期若需要调整任务调度的频率,则需要修改代码后重启服务才能生效。
2.1.1 @Scheduled注解
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;
String cron() default "";
String zone() default "";
long fixedDelay() default -1;
String fixedDelayString() default "";
long fixedRate() default -1;
String fixedRateString() default "";
long initialDelay() default -1;
String initialDelayString() default "";
}
@Scheduled注解的元素值说明:
cron | 一个类似cron的任务执行表达式 |
zone | cron表达时解析使用的时区,默认为服务器的本地时区。 |
fixedDelay | 上一次任务执行结束到下一次任务执行开始的固定时间延迟,单位为ms。 |
fixedDelayString | 同fixedDelay,返回的是延迟的字符串形式 |
fixedRate | 以固定的时间间隔来执行Scheduled注释的任务,单位为ms。使用fixedRate执行任务调度时,若上一次任务还未执行完毕,则将下一次任务加入worker队列,等上一次任务执行完成后,才能执行下一次任务。 |
fixedRateString | 同fixedRate,返回的是时间间隔的字符串形式。 |
initialDelay | 首次执行Scheduled注释任务的延迟时间,单位ms。 |
initialDelayString | 同initialDelay,返回的是延迟的字符串形式。 |
TimeUnit | 执行任务的时间单位,默认是ms。 |
要使用@Scheduled注解方式进行任务调度的话,我们只要在需要进行调度的方法上加上@Scheduled注解,再通过注解的属性值对任务调度的频率进行设置。最后增加一个@EnableScheduling注解来启用Spring定时任务的执行功能。在Spring中只需要通过以上的几步设置,就可以完成一个任务调度的功能。
2.1.2 示例代码
@Component
@EnableScheduling
public class MyTask {
@Scheduled(cron = "0/5 * * * * ?")
public void cronFun(){
System.out.println("Thread "+Thread.currentThread().getName()+" execute cronFun! "+DateUtil.DateToString(new Date()));
}
@Scheduled(fixedDelay = 1000 * 3,initialDelay = 1000 * 5)
public void fixedDelayFun(){
System.out.println("Thread "+Thread.currentThread().getName()+" execute fixedDelayFun! "+DateUtil.DateToString(new Date()));
}
@Scheduled(fixedRate = 1000 * 6, initialDelay = 1000 * 5)
public void fixedRateFun(){
System.out.println("Thread "+Thread.currentThread().getName()+" execute fixedRateFun! "+DateUtil.DateToString(new Date()));
}
}
在以上的示例代码中,分别使用了三种方式来模拟进行了任务的调度。
- cron任务执行表达式的方式:我们定义了一个每5秒钟一次频率的cron表达式。cron表达式是由若干数字、空格、符号按一定的规则,组成的一组字符串,用来表达时间的信息。该字符串由6个空格分为7个域,每一个域代表一个时间含义。
- fixedDelay是以固定的延迟来执行下一个任务。在示例代码中设置了每延迟3秒钟来执行下一个任务。初次执行调度任务会延迟5秒。在每一个任务执行完成后,都会停顿3秒钟,才会再执行下一个任务。
- fixedRate以固定的时间间隔来执行调度任务。在示例代码中设置了每6秒钟的时间间隔频率来执行下一个任务。初次执行调度任务会延迟5秒。
- 使用fixedRate进行任务调度时需要注意的是,若任务的执行时间超过了时间间隔,也就是上一次任务还未执行完毕,而又有执行下一次任务的调度。此时程序会将下一次任务暂时加入worker队列进行等待,直到上一次任务执行完成后,再执行队列中任务。在示例代码中,我们使用线程休眠的方式来模拟任务的执行时间,我们设置任务的执行时间为10秒,超过了调度的时间间隔6秒。那么此时任务调度就会变成每次会过10秒后,才会再执行下次任务。
2.2 SchedulingConfigurer接口方式
在Spring中,虽然使用@Scheduled注解的方式来进行任务调度确实简单易用,但是这种相当于硬编码的方式,一旦设定了任务的执行频率,在任务的执行过程中就无法改变。
Spring还提供了通过实现SchedulingConfigurer接口的方式来配置定时任务。SchedulingConfigurer是一个函数式接口,通过实现SchedulingConfigurer接口来配置任务调度,我们可以在任务运行过程中动态的调整任务的执行时间或频率,而无需修改代码和重启服务。这种方式特别适合于需要根据不同环境和需求,来实现定时任务执行频率差异化的场景。
SchedulingConfigurer接口
@FunctionalInterface
public interface SchedulingConfigurer {
void configureTasks(ScheduledTaskRegistrar taskRegistrar);
}
示例代码:
@Component
public class CronTriggerSchedulingConfigurer implements SchedulingConfigurer {
@Value("${schedul.config.trigger.cron}")
private String cron;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.addTriggerTask(() ->{
System.out.println("this is cron triggerTask! "+DateUtil.DateToString(new Date()));
},
trigger ->{
return new CronTrigger(cron).nextExecutionTime(trigger);
});
}
}
上述的示例代码中CronTriggerSchedulingConfigurer类实现了SchedulingConfigurer接口,在重写接口的configureTasks方法时,我们向ScheduledTaskRegistrar实例注册了一个TriggerTask类型的任务。TriggerTask类的构造函数需要两个参数,一个是代表任务业务逻辑的Runnable实例。另一个是触发任务执行频率的Trigger实例。这里Trigger我们使用了CronTrigger,这就意味着任务调度将以cron表达式定义的方式来执行。
代码中使用了@Value注解来注入cron表达式,在实际开发中,我们就可以使用Apollo配置或者从数据库读取的方式来获取cron表达式。这样当我们修改了cron表达式,程序在下次执行任务时,就读到新的cron表达式,那么任务调度就会以新的频率来执行任务。在示例中cron表达式配置为schedul.config.trigger.cron = 0/6 * * * * ?,表示定时任务会以6秒钟一次的频率来执行。
在configureTasks方法中,如果触发任务执行频率使用了PeriodicTrigger的实例,那么任务调度将会以周期形式来触发执行。PeriodicTrigger是可以用周期时间间隔fixedRate或者周期时间延迟fixedDelay这两种具体形式来触发任务。我们可以通过设置PeriodicTrigger实例的属性fixedRate来具体决定使用哪种方式。
period | long类型 | 表示执行任务的周期时长,fixedRate模式下表示执行任务的时间间隔,fixedDelay模式下表示延迟多长时间后才执行下一个任务。 |
timeUnit | TimeUnit类型 | 表示period周期的时间单位,默认是毫秒。 |
initialDelay | long类型 | 表示延迟多长时间后,才开始执行第一次任务。 |
fixedRate | boolean类型 | 表示是以fixedRate固定时间间隔模式,还是fixedDelay固定延迟模式来执行任务,默认为false也就是fixedDelay模式。 |
@Component
public class PeriodicTriggerSchedulingConfigurer implements SchedulingConfigurer {
@Value("${schedul.config.trigger.periodic}")
private long periodic;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
System.out.println("Periodic triggerTask start at "+DateUtil.DateToString(new Date()));
taskRegistrar.addTriggerTask(() ->{
ConcurrentUtil.Sleep(5);
System.out.println("Thread "+Thread.currentThread().getName()+" execute periodic triggerTask! "+DateUtil.DateToString(new Date()));
},
trigger ->{
PeriodicTrigger periodicTrigger = new PeriodicTrigger(periodic);
periodicTrigger.setInitialDelay(6000);
return periodicTrigger.nextExecutionTime(trigger);
});
}
}
上述代码中的PeriodicTriggerSchedulingConfigurer使用了PeriodicTrigger这个周期性任务的执行触发器。触发器的initialDelay属性设置为6秒,表示第一次任务会延迟6秒后才执行。由于fixedRate属性默认为false, 那么任务将以fixedDelay固定延迟的模式来执行。schedul.config.trigger.periodic设置是的3000毫秒,所以任务在执行完成后,会延迟个3秒钟才能执行下一个的任务。任务执行时间是5秒加上延迟的3秒,所以每执行一次任务的周期是8秒钟。
当我们把PeriodicTrigger触发器的fixedRate属性默认为true时, 那么任务调度就会以fixedRate固定时间间隔的模式来执行。由于任务的周期periodic设置是的3秒,而任务的执行时间是5秒,任务执行时间比周期时间长,最终每次执行任务的时间周期就是5秒钟。
@Component
public class PeriodicTriggerSchedulingConfigurer implements SchedulingConfigurer {
@Value("${schedul.config.trigger.periodic}")
private long periodic;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
System.out.println("Periodic triggerTask start at "+DateUtil.DateToString(new Date()));
taskRegistrar.addTriggerTask(() ->{
System.out.println("Thread "+Thread.currentThread().getName()+" execute periodic triggerTask! "+DateUtil.DateToString(new Date()));
ConcurrentUtil.Sleep(5);
},
trigger ->{
PeriodicTrigger periodicTrigger = new PeriodicTrigger(periodic);
periodicTrigger.setInitialDelay(6000);
periodicTrigger.setFixedRate(true);
return periodicTrigger.nextExecutionTime(trigger);
});
}
}
3.Spring任务调度的原理
3.1 @EnableScheduling注解
我们在使用Spring进行定时任务调度的时候,需要使用@EnableScheduling注解来开启任务调度的功能。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
在@EnableScheduling注解的源码中,我们发现@EnableScheduling注解又通过@Import注解引入了SchedulingConfiguration这个配置类。
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
在SchedulingConfiguration配置类中会使用Bean注解的方式来注入一个ScheduledAnnotationBeanPostProcessor类型的实例。ScheduledAnnotationBeanPostProcessor是一个Spring中典型的后置处理器,它是任务调度的核心类。Spring定时任务调度的功能主要是在这个类中实现的。
其实Spring中有许多类似的@EnableXXX+@Import的组合,这种组合可以实现以可插拔的方式来开启和关闭某项功能,可拓展性极强。
3.2 ScheduledAnnotationBeanPostProcessor
ScheduledAnnotationBeanPostProcessor类间接继承了BeanPostProcessor接口,这个后置处理器实现了BeanPostProcessor接口的postProcessAfterInitialization方法。postProcessAfterInitialization方法是在Bean实例化以后会执行的回调方法。
在ScheduledAnnotationBeanPostProcessor类的postProcessAfterInitialization方法中,Spring会找出所有带有@Scheduled注解的方法,然后再根据@Scheduled注解中配置的任务调度信息,将这些方法注册成一个个的定时任务。
3.2.1调度任务的注册
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
//对一些特殊类型的Bean不做处理,直接返回。
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
//获取bean的最终目标类
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
//判断目标类中是否含有@Scheduled注解
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
//获取目标类标记了Scheduled注解方法
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
//如果目标类中没有@Scheduled注解方法
//就把这个目标类缓存到nonAnnotatedClasses的Set集合中
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// Non-empty set of methods
//遍历所有Scheduled注解方法,通过processScheduled方法把@Scheduled注解
//方法注册成定时任务
annotatedMethods.forEach((method, scheduledMethods) ->
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
在postProcessAfterInitialization方法中,通过反射的方式获取到所有@Scheduled注解的方法,然后再遍历这些方法,把这些注解和方法作为参数来调用processScheduled方法,processScheduled方法中会根据注解信息的不同,把方法注册成不同类型的定时任务。
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
//通过参数bean和method来创建一个Runnable的实例
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
//从Scheduled注解解析获取注解中initialDelay属性值
//initialDelay有数值和字符串两种配置方式,这两方式只能取其一
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
//从Scheduled注解中获取cron的任务执行表达式
//如果存在cron表达式,则向ScheduledTaskRegistrar注册一个CronTask类型的任务
//注意Scheduled注解中使用了cron表达式,就不能使用initialDelay
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {
initialDelay = 0;
}
//从Scheduled注解中获取fixedDelay属性值
//如果配置了fixedDelay属性值,
//则向ScheduledTaskRegistrar注册一个FixedDelayTask类型的任务
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
//从Scheduled注解中获取fixedRate属性值
//如果配置了fixedRate属性值,
//则向ScheduledTaskRegistrar注册一个FixedRateTask类型的任务
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// Check whether we had any attribute set
Assert.isTrue(processedSchedule, errorMessage);
// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}
processScheduled方法共有三个参数:1. Scheduled注解。2.带有Scheduled注解method方法。3.包含了调度任务method方法Bean。
processScheduled方法中会对Scheduled注解中的属性值进行解析,根据注解中属性值的不同,则向ScheduledTaskRegistrar实例注册不同类型的任务。
- 若Scheduled注解中配置了cron的任务执行表达式,则向ScheduledTaskRegistrar注册一个CronTask类型的任务。
- 若Scheduled注解配置了fixedDelay属性,则向ScheduledTaskRegistrar注册一个FixedDelayTask类型的任务。
- 若Scheduled注解配置了fixedRate属性值,则向ScheduledTaskRegistrar注册一个FixedRateTask类型的任务
- 若Scheduled注解中配置了initialDelay属性,那么FixedDelayTask和FixedRateTask的任务首次执行会进行延迟。
在配置Scheduled注解的属性值时,需要注意的是:
- cron表达式和initialDelay属性不能同时配置。
- cron表达式,fixedDelay,fixedRate这三种任务类型每次只能选择其中的一种类型进行配置。
3.2.2调度任务的执行
程序中把Scheduled注解的方法注册成调度任务后,还需要对这些注册的任务进行执行的操作。
ScheduledAnnotationBeanPostProcessor类继承了ApplicationListener<ContextRefreshed-
Event>接口,实现了接口的onApplicationEvent(E event)方法。所以这个后置处理器类也是一个事件监听器,该监听器主要用于监听ContextRefreshedEvent类型的事件。ContextRefreshedEvent事件类是Spring内部为我们提供的一个事件。该事件类的一个参数就是ApplicationContext对象,即当前的容器。Spring是通过容器的refresh()刷新方法来创建一个IOC容器时,容器创建过程的最后一步finishRefresh()方法会调用publishEvent(new ContextRefreshedEvent(this))方法发布了ContextRefreshedEvent事件。此时Spring容器中的bean都已经创建完成,容器也已经初始化完成了。publishEvent方法会触发ScheduledAnnotationBeanPostProcessor监听器所实现的onApplicationEvent方法。在onApplicationEvent方法中又调用了finishRegistration()方法。在finishRegistration()方法中,就会把已经解析注册任务给调度执行起来。
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
finishRegistration();
}
}
private void finishRegistration() {
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
//这里主要是对SchedulingConfigurer接口方式配置的调度任务进行处理
if (this.beanFactory instanceof ListableBeanFactory) {
//获取容器中所有SchedulingConfigurer类型的实例
Map<String, SchedulingConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
//遍历容器中的每个SchedulingConfigurer实例,依次调用实例的configureTasks方法
//通过configureTasks方法,就把用户自定义的任务注册到了ScheduledTaskRegistrar实例中
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
//当ScheduledTaskRegistrar中没有任务调度的线程池时,
//就会在容器中去查找线程池,如果容器中存在线程池的话,就通过setTaskScheduler方法
//把这个线程池用于执行要进行调度的任务
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
// Search for TaskScheduler bean...
//通过resolveSchedulerBean方法按类型在容器中查找TaskScheduler类型的Bean实例
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
catch (NoUniqueBeanDefinitionException ex) {
logger.trace("Could not find unique TaskScheduler bean", ex);
//如果容器中存在多个TaskScheduler类型的Bean实例
//就再通过resolveSchedulerBean方法按名称的方式
//来确定一个TaskScheduler类型的Bean实例
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskScheduler bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
logger.trace("Could not find default TaskScheduler bean", ex);
// Search for ScheduledExecutorService bean next...
//若容器中不存在TaskScheduler类型的实例
//就再去查找ScheduledExecutorService类型的实例。
//也是先按类型再容器中查找ScheduledExecutorService类型的实例
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
}
catch (NoUniqueBeanDefinitionException ex2) {
logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
//同样当容器中存在多个ScheduledExecutorService类型的实例的话
//就会再按名称的方式来确定一个
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
}
catch (NoSuchBeanDefinitionException ex3) {
if (logger.isInfoEnabled()) {
logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex2.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex2) {
logger.trace("Could not find default ScheduledExecutorService bean", ex2);
// Giving up -> falling back to default scheduler within the registrar...
logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
}
}
}
//最后调用ScheduledTaskRegistrar实例的afterPropertiesSet方法
//在该方法中,程序会通过线程池把注册的调度任务给执行起来
this.registrar.afterPropertiesSet();
}
通过分析以上finishRegistration()方法的源码,发现finishRegistration()方法主要做了三件事情:
- 把SchedulingConfigurer接口类型调度任务注册到ScheduledTaskRegistrar实例。
- 给ScheduledTaskRegistrar实例设置一个用于执行调度任务的线程池。
- 调用ScheduledTaskRegistrar实例的afterPropertiesSet()方法,把注册的调度任务给执行起来。
3.2.1.1 SchedulingConfigurer接口调度任务的注册
SchedulingConfigurer接口中就声明了一个方法configureTasks(ScheduledTaskRegistrar taskRegistrar),方法中需要传入一个ScheduledTaskRegistrar类型的实例参数。
从代码中可以看出,Spring会首先从容器中查找所有SchedulingConfigurer类型的Bean实例,对这些Bean实例进行排序,然后依次遍历每个实例的并调用其configureTasks方法。在configureTasks方法中就可以把用户自定义的任务,注册到ScheduledTaskRegistrar实例中。
3.2.1.2 设置线程池
Spring首先将@Scheduled注解的调度任务解析出来后,就会注册到ScheduledTaskRegistrar的实例。接下来就是对这些调度的任务的执行,对于任务的执行,Spring会把这些任务交给线程池来执行,所以到目前为止我们就需要一个线程池。那么执行任务的调度线程池从何而来,Spring是做了如下的处理:
1. Spring会首先在容器中取查找TaskScheduler类型的实例,如果容器中有且只存在一个TaskScheduler类型的实例,就把这个TaskScheduler类型实例的线程池设置到ScheduledTaskRegistrar实例中,以后调度任务的执行就使用这个线程池。
2. 如果容器中存在多个TaskScheduler类型的实例,程序就会抛出NoUniqueBeanDefinitionException异常。程序会捕捉NoUniqueBeanDefinitionException异常,在对NoUniqueBeanDefinitionException异常进行处理时,会再按名称来确定一个TaskScheduler实例。Bean的名称会默认为taskScheduler。
3.如果容器中不存在TaskScheduler类型的实例,程序会抛出NoSuchBeanDefinitionException异常。程序在捕捉到NoSuchBeanDefinitionException异常进行处理的时候,又会在容器中取查找ScheduledExecutorService类型的线程池。
4. 程序查找ScheduledExecutorService类型线程池的步骤和以上查询TaskScheduler线程池步骤大致相似。如果容器中有且只存在一个ScheduledExecutorService实例,就直接返回这个实例。如果存在多个也是抛出NoUniqueBeanDefinitionException异常,在处理NoUniqueBeanDefinitionException这个异常时,同样是再按名称taskScheduler确定一个。若容器不存在ScheduledExecutorService类型的实例,在处理NoSuchBeanDefinitionException异常的时候,只是记录一下日志不做其他任务的处理,表明此时容器中用户没有配置线程池。
在以上对线程池查找的每个步骤中,其实都调用了resolveSchedulerBean的方法。resolveSchedulerBean的方法是通过布尔参数的byName,来确定是按类型查找还是按名称来查找。
private <T> T resolveSchedulerBean(BeanFactory beanFactory, Class<T> schedulerType, boolean byName) {
if (byName) {
T scheduler = beanFactory.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, schedulerType);
if (this.beanName != null && this.beanFactory instanceof ConfigurableBeanFactory) {
((ConfigurableBeanFactory) this.beanFactory).registerDependentBean(
DEFAULT_TASK_SCHEDULER_BEAN_NAME, this.beanName);
}
return scheduler;
}
else if (beanFactory instanceof AutowireCapableBeanFactory) {
NamedBeanHolder<T> holder = ((AutowireCapableBeanFactory) beanFactory).resolveNamedBean(schedulerType);
if (this.beanName != null && beanFactory instanceof ConfigurableBeanFactory) {
((ConfigurableBeanFactory) beanFactory).registerDependentBean(holder.getBeanName(), this.beanName);
}
return holder.getBeanInstance();
}
else {
return beanFactory.getBean(schedulerType);
}
}
线程池设置的流程图:
3.2.1.3 执行任务
现在是Spring已经将@Scheduled注解的调度任务解析出来并注册了,执行调度任务线程池也有了(如果用户没有配置线程池,Spring 接下来会自己默认创建一个),接下来就是任务的执行了。
调度任务的执行Spring是调用了ScheduledTaskRegistrar类的afterPropertiesSet方法。在afterPropertiesSet方法中又调用了scheduleTasks方法进行任务的执行。
this.registrar.afterPropertiesSet();
public void afterPropertiesSet() {
scheduleTasks();
}
protected void scheduleTasks() {
//如果程序中没有用户自定义配置线程池,
//程序就会默认创建一个单线程的线程池
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
//对TriggerTask 触发类型的任务进行执行
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
//对CronTask Cron表达式类型的任务进行执行
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
//对固定时间间隔的任务进行执行
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
//对固定延迟任务进行执行
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
从scheduleTasks方法的源码可以看出,Spring会先判断用户有没有配置自定义的线程池,如果没有的话,就会默认创建一个单线程的线程池来执行调度任务。
Spring在解析调度任务的时候,是按照cron表达式,fixedDelay,fixedRate,TriggerTask这几种不同的任务类型进行注册。所以在调度任务执行的时,也是分别遍历各种类型任务的集合,依次来执行各种的调度任务。
3.2.1.3.1 CronTask和TriggerTask任务执行
从scheduleTasks方法的代码可以看出,Spring是通过调用scheduleTriggerTask方法和scheduleCronTask方法来分别执行CronTask和TriggerTask类型的任务。
public ScheduledTask scheduleCronTask(CronTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
if (this.taskScheduler != null) {
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
}
else {
addCronTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
public ScheduledTask scheduleTriggerTask(TriggerTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
if (this.taskScheduler != null) {
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
}
else {
addTriggerTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
TaskScheduler线程池实例调用了schedule方法来执行CronTask和TriggerTask任务。在Spring的TaskScheduler线程池类中组合封装了JDK的ScheduledExecutorService实例。TaskScheduler线程池对任务的执行最终都是委托给了ScheduledExecutorService实例来执行的。
我们知道ScheduledExecutorService实例执行schedule方法是一次性的,执行一次后就不会再次执行了,但是实际中的CronTask和TriggerTask任务都是周期循环执行的。这里面其实是Spring自己做了一些工作来支持循环周期执行。原理很简单,简单概括就是Spring在进行schedule任务调度时,每次会根据当前调度的时间信息,计算出下一次任务的执行时间,然后不断的重复进行任务调度。下面我们通过源码来具体分析实现的细节。
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
//获取JDK调度线程池的实例用于执行任务
ScheduledExecutorService executor = getScheduledExecutor();
try {
ErrorHandler errorHandler = this.errorHandler;
if (errorHandler == null) {
errorHandler = TaskUtils.getDefaultErrorHandler(true);
}
//实例化一个ReschedulingRunnable的实例,并调用实例schedule方法来调度任务
return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
我们以TaskScheduler接口其中的一个实现类ThreadPoolTaskScheduler为例来进行分析。在schedule方法中,有一行非常关键的代码是用任务,触发器,线程池等参数构建了一个ReschedulingRunnable的对象,然后再调用这个对象的schedule方法。
public ScheduledFuture<?> schedule() {
synchronized (this.triggerContextMonitor) {
//计算出下一次任务的执行时间
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
if (this.scheduledExecutionTime == null) {
return null;
}
//现在下次任务的执行时间减去当前的时间
//计算出距离下次任务的时间间隔
long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
//将ReschedulingRunnable自身对象作为任务提交给线程池执行
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}
在ReschedulingRunnable类中的schedule方法中,Spring会根据CronTask或者TriggerTask任务的触发机制计算出下一次任务的执行时间。然后用计算出的下一次任务的执行时间减去系统当前时间得到距离下一次任务的时间间隔。源码中用这个时间间隔作为任务延迟的参数,把ReschedulingRunnable对象本身this作为任务,调用了ScheduledExecutorService实例的schedule方法来提交任务进行执行。ReschedulingRunnable之所以能够作为任务,是因为这个类实现了Runnable接口。
Runnable接口中定义了一个run方法,当Runnable接口的实例提交到线程池后,其定义的run方法就会被执行。ReschedulingRunnable作为Runnable接口的实现类,自然也给run方法给出了自己的实现。
public void run() {
//记录调度任务实际开始执行的时间
Date actualExecutionTime = new Date();
//这里是执行用户自定义的调度任务
super.run();
//记录调度任务实际执行结束的时间
Date completionTime = new Date();
synchronized (this.triggerContextMonitor) {
Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
//在TriggerContext的上下文中更新任务执行的时间信息
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
if (!obtainCurrentFuture().isCancelled()) {
//再一次调用schedule方法,触发下一次的任务调度
schedule();
}
}
}
在run方法中,首先调用父类的run方法,这里通过super.run()方法来执行用户自定义的调度任务,也就是@Scheduled注解的方法,并会记录下执行这些任务实际开始和结束时间。然后用任务的调度时间,执行任务实际开始和结束时间,在TriggerContext的上下文中更新任务执行的时间信息。在做完这一切以后,Spring 会再次调用schedule方法触发下一次的任务调度。而这个schedule方法又是把ReschedulingRunnable对象本身作为任务提交,那么ReschedulingRunnable对象作为任务时,其自身的run()方法又会被调用,这样ReschedulingRunnable类的schedule()方法会触发run()方法,而run()方法又会schedule()方法,这样就达到了循环调用的目的。
3.2.1.3.2 FixedRate任务执行
在ScheduledTaskRegistrar实例中,通过调用scheduleFixedRateTask方法来执行FixedRate类型的调度任务。
public ScheduledTask scheduleFixedRateTask(FixedRateTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
if (this.taskScheduler != null) {
if (task.getInitialDelay() > 0) {
Date startTime = new Date(System.currentTimeMillis() + task.getInitialDelay());
scheduledTask.future =
this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval());
}
else {
scheduledTask.future =
this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());
}
}
else {
addFixedRateTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
在scheduleFixedRateTask方法中,对FixedRate类型调度任务的执行其实又是调用了TaskScheduler线程池实例的scheduleWithFixedDelay方法。
本例中我们以TaskScheduler线程池实例之一的ThreadPoolTaskScheduler类来进行分析。ThreadPoolTaskScheduler类的scheduleWithFixedDelay方法的源码如下。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {
//获取JDK的ScheduledExecutorService实例线程池
ScheduledExecutorService executor = getScheduledExecutor();
long initialDelay = startTime.getTime() - System.currentTimeMillis();
try {
//调用JDK线程池的scheduleAtFixedRate方法来执行固定间隔的任务
return executor.scheduleAtFixedRate(errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
从以上源码中可以出,FixedRate类型的任务执行比较简单,由于JDK中的ScheduledExecutorService线程池本身就支持执行FixedRate这种固定时间间隔的任务。所以ThreadPoolTaskScheduler类中会先实例化一个ScheduledExecutorService类型的线程池,把任务封装成了一个DelegatingErrorHandlingRunnable类型的任务,然后程序就直接调用ScheduledExecutorService线程池实例的scheduleAtFixedRate方法来执行调度任务。
3.2.1.3.2 FixedDelay任务执行
FixedDelay和FixedRate任务的实现原理十分相似,由于JDK中的ScheduledExecutorService线程池也支持执行FixedDelay这种固定时间延迟的任务。所以程序中对FixedDelay任务,最终是委托给了ScheduledExecutorService线程池实例来执行。