Quartz任务调度框架实现任务动态执行
说明:之前使用Quartz,都是写好Job,指定一个时间点,到点执行。最近有个需求,需要根据前端用户设置的时间点去执行,也就是说任务执行的时间点是动态变化的。本文介绍如何用Quartz任务调度框架实现任务动态执行。
基本实现
(0)思路
基本思路是,根据用户传入的时间点,转为cron表达式,定时去刷新容器内的JOB,更新任务的cron表达式。
(1)搭建项目
创建一个项目,pom文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<relativePath/>
</parent>
<groupId>com.hezy</groupId>
<artifactId>auto_quartz</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.3</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.6</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.8</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
我项目的是postgresql,application.yml如下
server:
port: 8988
spring:
datasource:
url: jdbc:postgresql://localhost:5432/demo
username: postgres
password: 123456
driver-class-name: org.postgresql.Driver
创建一张任务调度表,包含了任务名(对应Java类中JOB的Bean名称),任务的cron表达式;
-- 创建任务调度表
create table public.t_task
(
id varchar(32) not null
constraint pc_task_id
primary key,
task_name varchar(50) not null,
cron varchar(50) not null,
is_deleted integer default 0 not null
);
comment on table public.t_task is '任务调度表';
comment on column public.t_task.task_name is '任务名';
comment on column public.t_task.cron is 'cron表达式';
comment on column public.t_task.is_deleted is '是否删除,1是,0否,默认0';
(2)引入Quartz
创建两个关于Quartz的配置类
(MyJobFactory)
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;
/**
* 任务工厂
*/
@Component
public class MyJobFactory extends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
/**
* 创建JOB实例
*/
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object jobInstance = super.createJobInstance(bundle);
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
(TaskConfig)
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
/**
* 任务配置类
*/
@Configuration
@DependsOn("myJobFactory")
public class TaskConfig {
@Autowired
private MyJobFactory myJobFactory;
/**
* 加载任务工厂
*/
@Bean
public SchedulerFactoryBean schedulerFactoryBean() {
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
schedulerFactoryBean.setJobFactory(myJobFactory);
return schedulerFactoryBean;
}
@Bean
public Scheduler scheduler() {
return schedulerFactoryBean().getScheduler();
}
}
(3) 数据库操作实现
创建一个TaskDTO
import lombok.Data;
@Data
public class TaskDTO {
private String id;
/**
* 任务名
*/
private String taskName;
/**
* CRON表达式
*/
private String cron;
}
写一个对应的Mapper,里面创建一个查询方法
import com.hezy.pojo.TaskDTO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.util.List;
@Mapper
public interface TaskMapper {
@Select("select id, task_name taskName, cron from t_task where is_deleted = 0")
List<TaskDTO> getAllTask();
}
插入两条数据,如下:
insert into public.t_task (id, task_name, cron, is_deleted)
values ('1', 'SimpleTask1', '0/5 * * * * ?', 0);
insert into public.t_task (id, task_name, cron, is_deleted)
values ('2', 'SimpleTask2', '0/30 * * * * ?', 0);
测试,没得问题
import com.hezy.mapper.TaskMapper;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class TaskMapperTest {
@Autowired
private TaskMapper taskMapper;
@Test
public void testInsert() {
System.out.println(taskMapper.getAllTask());
}
}
(4)创建任务容器
创建一个任务容器,每10秒去刷新容器内的所有Job的cron表达式
import com.hezy.constant.TaskConst;
import com.hezy.mapper.TaskMapper;
import com.hezy.pojo.TaskDTO;
import lombok.extern.log4j.Log4j2;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 任务容器
*/
@Component
@DependsOn(value = {"quartzManager"})
@Log4j2
public class TaskContext {
@Autowired
private QuartzManager quartzManager;
@Autowired
private TaskMapper taskMapper;
/**
* 更新任务
* 刷新频率:10秒
*/
@Scheduled(fixedRate = 10000)
public void update() {
try {
// 查出所有任务
List<TaskDTO> taskDTOS = taskMapper.getAllTask();
// 遍历操作
for (TaskDTO taskDto : taskDTOS) {
this.quartzManager.startJobTask(taskDto.getTaskName(), TaskConst.GroupEnum.SYSTEM.name(), taskDto.getCron());
}
} catch (SchedulerException e) {
log.error("初始化定时任务异常", e);
}
}
}
需要注意,@Scheduled注解,需要在启动类上加@EnableScheduling注解才能生效
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class Start {
public static void main(String[] args) {
SpringApplication.run(Start.class, args);
}
}
创建一个常量,表示任务的组名
(TaskConst)
/**
* 任务常量
*/
public class TaskConst {
/**
* 任务分组
*/
public enum GroupEnum {
SYSTEM;
}
}
任务管理器,对比传入的任务名、任务cron,更新或创建对应的任务;
import lombok.extern.log4j.Log4j2;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 定时任务管理器
* @author hezhongying
* @create 2024/12/11
*/
@Configuration
@Log4j2
public class QuartzManager {
@Autowired
private Scheduler scheduler;
@Autowired
private List<Job> taskList;
private Map<String, Job> taskMap;
@PostConstruct
public void init() {
this.taskMap = taskList.stream().collect(Collectors.toMap(t -> t.getClass().getSimpleName(), t -> t));
}
/**
* 启动任务
* @param jobName 任务名
* @param group 组名
* @param cron cron表达式
* @throws SchedulerException
*/
public void startJobTask(String jobName, String group, String cron) throws SchedulerException {
// 创建Job
JobKey jobKey = new JobKey(jobName, group);
// 判断任务是否已存在,不存在新增,存在更新
if (scheduler.checkExists(jobKey)) {
if (modifyJob(jobName, group, cron)) {
log.info("任务:{} 修改成功", jobName);
}
} else {
if (createJob(jobName, group, cron)) {
log.info("任务:{} 新增成功", jobName);
}
}
}
/**
* 新增任务
* @param jobName 任务名称
* @param group 分组
* @param cron CRON表达式
* @throws SchedulerException
*/
private boolean createJob(String jobName, String group, String cron) throws SchedulerException {
// 任务新增
if (taskMap.containsKey(jobName)) {
// 执行任务
Class<? extends Job> taskClazz = taskMap.get(jobName).getClass();
JobDetail jobDetail = JobBuilder.newJob(taskClazz).withIdentity(jobName, group).build();
// 执行时间正则
CronScheduleBuilder cronBuilder = CronScheduleBuilder.cronSchedule(cron);
CronTrigger cronTrigger = TriggerBuilder
.newTrigger()
.withIdentity(jobName, group)
.withSchedule(cronBuilder)
.build();
scheduler.scheduleJob(jobDetail, cronTrigger);
return true;
} else {
log.debug("任务没有配置执行配置{}", jobName);
}
return false;
}
/**
* 修改
*
* @param jobName 任务名
* @param group 组名
* @param cron cron表达式
* @return true,修改成功;false,修改失败
* @throws SchedulerException
*/
public boolean modifyJob(String jobName, String group, String cron) throws SchedulerException {
TriggerKey triggerKey = new TriggerKey(jobName, group);
Trigger trigger = scheduler.getTrigger(triggerKey);
if (trigger == null) {
log.info("未存在的触发器[{}-{}]", jobName, group);
return false;
}
String oldCron = ((CronTrigger) trigger).getCronExpression();
// 判断cron是否相等,相等就不用改
if (!cron.equals(oldCron)) {
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cron);
CronTrigger newTrigger = TriggerBuilder
.newTrigger()
.withIdentity(jobName, group)
.withSchedule(cronScheduleBuilder)
.build();
Date date = scheduler.rescheduleJob(triggerKey, newTrigger);
return date != null;
} else {
log.info("任务:{} CRON表达式没有变化", jobName);
}
return false;
}
/**
* 暂停所有任务
*
* @throws SchedulerException
*/
public void pauseAll() throws SchedulerException {
this.scheduler.pauseAll();
}
/**
* 指定任务暂停
*
* @param jobName 任务名
* @param group 组名
* @throws SchedulerException
*/
public void pause(String jobName, String group) throws SchedulerException {
JobKey jobKey = new JobKey(jobName, group);
JobDetail jobDetail = this.scheduler.getJobDetail(jobKey);
if (jobDetail == null) {
return;
}
this.scheduler.pauseJob(jobKey);
}
/**
* 恢复所有任务
*
* @throws SchedulerException
*/
public void resumeAllJob() throws SchedulerException {
this.scheduler.resumeAll();
}
/**
* 删除指定任务
*
* @param jobName 任务名
* @param group 组名
* @throws SchedulerException
*/
public void delete(String jobName, String group) throws SchedulerException {
JobKey jobKey = new JobKey(jobName, group);
JobDetail jobDetail = this.scheduler.getJobDetail(jobKey);
if (jobDetail == null) {
return;
}
this.scheduler.deleteJob(jobKey);
}
}
注意这行代码,可自动注入容器内所有的Job,也就是所有实现了Quartz框架Job接口的类;
@Autowired
private List<Job> taskList;
(5)创建两个Job
创建两个简单的Job
(SimpleTask1)
import lombok.extern.log4j.Log4j2;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 简单任务1
*/
@Component
@Log4j2
public class SimpleTask1 implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) {
log.info("{} 定时任务执行时间:{}", "SimpleTask1", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}
}
(SimpleTask2)
import lombok.extern.log4j.Log4j2;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 简单任务2
*/
@Component
@Log4j2
public class SimpleTask2 implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) {
log.info("{} 定时任务执行时间:{}", "SimpleTask2", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}
}
(6)启动测试
启动项目,查看控制台
此时,项目不停,修改数据对应任务的cron表达式,
查看控制台
到这,动态调度任务就实现了。
更进一步
回到我最开始提到的需求,再来考虑一些细节。
-
前端传递的时间如何转为cron表达式;
-
如何实现不同任务对同一个Job的动态执行;
第一个问题,前端传递的任务数据,可能是这样的,任务“每天、每周、每月”,然后设置一个时间点,要求任务按照这个周期来执行,如果将这些信息转为对应的cron表达式,需要有一段代码来实现这些映射关系。
如下:
/**
* 生成CRON表达式
* @param type 类型:每天、每周、每月
* @param hour 时
* @param minute 分
* @param dayOfWeek 星期
* @param dayOfMonth 日期
* @return CRON表达式或null
*/
public static String generateCronExpression(int type, int hour, int minute, int dayOfWeek, int dayOfMonth) {
switch (type) {
case 1:
return String.format("0 %d %d * * ?", minute, hour);
case 2:
return String.format("0 %d %d ? * %s", minute, hour, TaskConst.DayOfWeekEnum.getByCode(dayOfWeek).getName());
case 3:
return String.format("0 %d %d %d * ?", minute, hour, dayOfMonth);
default:
return null;
}
}
(星期映射)
cron表达式末尾不能填对应星期的数字,而需要填星期几的英文简称
/**
* 星期枚举
*/
public enum DayOfWeekEnum {
SUNDAY(1, "SUN"),
MONDAY(2, "MON"),
TUESDAY(3, "TUE"),
WEDNESDAY(4, "WED"),
THURSDAY(5, "THU"),
FRIDAY(6, "FRI"),
SATURDAY(7, "SAT");
private final Integer code;
private final String name;
DayOfWeekEnum(Integer code, String name) {
this.code = code;
this.name = name;
}
public Integer getCode() {
return code;
}
public String getName() {
return name;
}
/**
* 根据code查询枚举
*/
public static DayOfWeekEnum getByCode(Integer code) {
for (DayOfWeekEnum value : DayOfWeekEnum.values()) {
if (value.getCode().equals(code)) {
return value;
}
}
return null;
}
/**
* 根据name查询枚举
*/
public static DayOfWeekEnum getByName(String name) {
for (DayOfWeekEnum value : DayOfWeekEnum.values()) {
if (value.getName().equals(name)) {
return value;
}
}
return null;
}
}
但这种方式局限于我上面说的,只能“每天、每周、每月”的某时、某分执行,而不能更精准到秒、或者每月的最后一周,这样更灵活的设置。(思考)或许可以考虑直接将cron表达式的设置,开放给前端,由用户直接来设置,后端只需校验cron的合法性即可。
第二个问题,有这样的情况:我的任务调度表中的多条记录,执行的是同一个Job,我的Job中会根据记录的唯一ID去查找这个记录对应的数据来处理。这种情况要怎样,还能实现动态调度。
如下,两条记录,对应同一个Job,但是cron不同,执行周期不同;
首先,在定时刷新这里,就不能只传入taskName,再传入一个唯一标识,我这里就传入ID;
/**
* 更新任务
* 刷新频率:10秒
*/
@Scheduled(fixedRate = 10000)
public void update() {
try {
// 查出所有任务
List<TaskDTO> taskDTOS = taskMapper.taskInfo();
// 遍历操作
for (TaskDTO taskDto : taskDTOS) {
// 带上唯一标识
this.quartzManager.startJobTask(taskDto.getTaskName(), taskDto.getId(), TaskConst.GroupEnum.SYSTEM.name(), taskDto.getCron());
}
} catch (SchedulerException e) {
log.error("初始化定时任务异常", e);
}
}
任务管理器这里,判断是否有当前任务,就用任务名_唯一标识
判断,绑定Job的时候,就用Job名,即数据库记录的taskName,如下:
/**
* 启动任务
* @param name Job名,即Job类名
* @param id 唯一标识
* @param group 组名
* @param cron cron表达式
* @throws SchedulerException
*/
public void startJobTask(String name, String id, String group, String cron) throws SchedulerException {
// 任务名,用Job名和唯一标识拼接
String jobName = String.format("%s_%s", name, id);
// 创建Job
JobKey jobKey = new JobKey(jobName, group);
// 判断任务是否已存在,不存在新增,存在更新
if (scheduler.checkExists(jobKey)) {
if (modifyJob(jobName, id, group, cron)) {
log.info("任务:{} 修改成功", jobName);
}
} else {
if (createJob(name, jobName, id, group, cron)) {
log.info("任务:{} 新增成功", jobName);
}
}
}
新增Job
/**
* 新增任务
* @param name JOB名
* @param jobName 任务名称
* @param id 唯一标识
* @param group 分组
* @param cron CRON表达式
* @throws SchedulerException
*/
private boolean createJob(String name, String jobName, String id, String group, String cron) throws SchedulerException {
// 任务新增,用Job名判断当前容器中是否存在这个Job
if (taskMap.containsKey(name)) {
// 执行任务:获取Job类时,也用Job名
Class<? extends Job> taskClazz = taskMap.get(name).getClass();
// 创建任务式,用Job和唯一标识拼接作为任务名
JobDetail jobDetail = JobBuilder.newJob(taskClazz).withIdentity(jobName, group).build();
// 创建 JobDataMap 并放入唯一标识 id
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("id", id);
// 执行时间正则
CronScheduleBuilder cronBuilder = CronScheduleBuilder.cronSchedule(cron);
CronTrigger cronTrigger = TriggerBuilder
.newTrigger()
.withIdentity(jobName, group)
.usingJobData(jobDataMap)
.withSchedule(cronBuilder)
.build();
scheduler.scheduleJob(jobDetail, cronTrigger);
return true;
} else {
log.debug("任务没有配置执行配置{}", jobName);
}
return false;
}
修改Job
/**
* 修改
*
* @param jobName 任务名
* @param group 组名
* @param cron cron表达式
* @return true,修改成功;false,修改失败
* @throws SchedulerException
*/
public boolean modifyJob(String jobName, String id, String group, String cron) throws SchedulerException {
TriggerKey triggerKey = new TriggerKey(jobName, group);
Trigger trigger = scheduler.getTrigger(triggerKey);
if (trigger == null) {
log.info("未存在的触发器[{}-{}]", jobName, group);
return false;
}
// 创建 JobDataMap 并放入唯一标识 id
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("id", id);
String oldCron = ((CronTrigger) trigger).getCronExpression();
// 判断cron是否相等,相等就不用改
if (!cron.equals(oldCron)) {
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cron);
CronTrigger newTrigger = TriggerBuilder
.newTrigger()
.withIdentity(jobName, group)
.usingJobData(jobDataMap)
.withSchedule(cronScheduleBuilder)
.build();
Date date = scheduler.rescheduleJob(triggerKey, newTrigger);
return date != null;
} else {
log.info("任务:{} CRON表达式没有变化", jobName);
}
return false;
}
Job实现里,打印唯一标识,用于区分。如果你有业务需求,完全可以用这个唯一标识,去查询数据库,找出这个唯一标识所拥有的数据,然后做一系列操作。
/**
* 简单任务1
*/
@Component
@Log4j2
public class SimpleTask1 implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) {
log.info("{} 定时任务执行时间:{}", jobExecutionContext.getMergedJobDataMap().get("id"), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}
}
启动程序,可见两条记录各自的执行周期。
一个Job,各自运行,让博主想起黑塞 《悉达多》中的一句话:可是也有另一些人,一些为数不多的人,像沿着一条固定轨道运行的星星,没有风吹得到它们;它们有自身的规律和轨道。
总结
本文介绍了如何使用Quartz任务调度框架实现任务动态执行,参考下面这篇文章:
- SpringBoot整合quartz完成定时任务执行配置热修改