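Building a Microservice Project from Scratch, Pro (Chapter 1-1: Implementing a Scheduled-Task Module with Quartz)
Preface:
Enterprise projects often need scheduled tasks: backing up the day's data at 9 p.m. every day, for example, or emailing last month's sales data to the responsible staff on the first of each month. Nobody is going to call an API by hand at those times, so a dedicated module has to handle the scheduling.
Quartz is a powerful Java job-scheduling library. It supports persistence and clustering, which suits applications that need high availability and load balancing. It is also fairly simple to use: implement the Job interface to define the work, pass the data the job needs through a JobDetail, define a trigger, and hand both to a Scheduler. Each scheduled task corresponds to one Trigger and JobDetail pair. The example below calls the execute method of DateJob every 5 seconds, 6 times in total (withRepeatCount(5) means 5 repeats after the first firing).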
public class DateJob implements Job {
@Override
public void execute(JobExecutionContext arg0) throws JobExecutionException {
System.out.println(new Date());
}
}
// Get the default scheduler from the standard SchedulerFactory
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
JobDetail job = JobBuilder.newJob(DateJob.class).withIdentity("job1", "group1").build();
Trigger trigger1 = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInSeconds(5)
.withRepeatCount(5))
.build();
scheduler.scheduleJob(job, trigger1);
scheduler.start();
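The count of six follows from how a simple schedule treats the repeat count: the trigger fires once at its start time and then repeats withRepeatCount more times. The sketch below reproduces that arithmetic with plain java.time types. SimpleScheduleMath and fireTimes are names made up for this illustration and are not part of Quartz.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper (not a Quartz class): lists when a simple schedule would fire. */
final class SimpleScheduleMath {
    private SimpleScheduleMath() {}

    /** One firing at start, followed by repeatCount further firings spaced interval apart. */
    static List<Instant> fireTimes(Instant start, Duration interval, int repeatCount) {
        List<Instant> times = new ArrayList<>();
        for (int i = 0; i <= repeatCount; i++) {
            times.add(start.plus(interval.multipliedBy(i)));
        }
        return times;
    }

    public static void main(String[] args) {
        // Same numbers as the demo: every 5 seconds, repeat count 5.
        List<Instant> times = fireTimes(Instant.parse("2025-01-01T00:00:00Z"), Duration.ofSeconds(5), 5);
        System.out.println(times.size() + " firings, last at " + times.get(times.size() - 1));
    }
}
```

With an interval of 5 seconds and a repeat count of 5, the list holds six instants, the last one 25 seconds after the start.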
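With this approach, however, every scheduled task needs its own implementation class, and task state cannot be maintained in a database. To address both problems, this article builds a Quartz-based scheduled-task module: the parameters of each task are stored in a database, and the module creates the corresponding scheduled tasks from those rows and maintains them. We build the complete module step by step, from easy to hard, verifying each part of the code as we go. The class diagram of the implementation is shown below; the parts in dashed boxes are classes that ship with Quartz.
The class diagram above is a simplified version of the module. It could be refined further with the decorator pattern, event publishing and subscription, AOP-based exception handling, and so on. This chapter explains in detail how to build a Quartz module from scratch; one or two later chapters will add those features and integrate the module into the earlier microservice project Base, linked below.
Building a Microservice Project from Scratch, Base (Chapter 0: Setting Up the Microservice Project Structure), CSDN blog: https://blog.csdn.net/wlf2030/article/details/145206361
The source code for this chapter's project:
wlf728050719/SpringCloudPro1-1: https://github.com/wlf728050719/SpringCloudPro1-1
This column will keep updating the microservice project, and each chapter's project builds on the previous chapter's. Feel free to follow along! If you are only interested in a single chapter, there is no need to start from the beginning: just download the previous chapter's project. Every chapter has a section on preparing the prerequisite project, and following it gets you to the end state of the previous chapter; if you have been following along all the way, you can skip that section. The Base series covers basic microservice setup, and the Pro series covers the implementation of more complex modules.
I. Project Creation
1. Create a Spring Boot project
2. Add the Quartz dependency (optional; you can instead replace the pom file later to import the dependencies)
3. Delete the unneeded files and application.properties under resources, then create application.yml with the following content:
(Remember to substitute your own database and check that the connection works; otherwise the service fails at startup because it cannot find the data source.)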
server:
  port: 6699
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/db_quartz?useSSL=false
    username: root
    password: your_password # replace with your own database password
    driver-class-name: com.mysql.jdbc.Driver
  application:
    name: quartz
mybatis:
  mapper-locations: classpath:mapper/*Mapper.xml
  type-aliases-package: cn.bit.pro1_1.core.mapper
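4. Replace the contents of the pom with the following (try to keep the dependency versions the same, in particular a MyBatis starter version that is compatible with the Spring Boot version):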
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.bit</groupId>
<artifactId>Pro1_1</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>Pro1_1</name>
<description>Pro1_1</description>
<dependencies>
<!-- Web dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.9.RELEASE</version>
</dependency>
<!-- Quartz -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<!-- MySQL driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!--Mybatis-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
<version>1.18.36</version>
<optional>true</optional>
</dependency>
<!--test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
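Create the package structure shown below. At the top level, controller holds the external interface of the quartz module, core holds the core of the module, and service holds the business classes that the scheduled tasks actually execute. Inside core, enums defines the enums for task execution result, task type, and task status; exception defines the exception class for task execution; handler defines the task handlers for Java classes and Spring beans, together with the handler interface and the factory class; mapper and service handle the task data in the database; util defines utility classes.
II. Scheduler Starter Demo
Create the DateService class in the service package and the QuartzController class in the controller package.
DateService implements the Job interface; its execute method prints the current time. QuartzController exposes one endpoint for trying out basic Quartz usage.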
package cn.bit.pro1_1.service;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import java.util.Date;
public class DateService implements Job {
@Override
public void execute(JobExecutionContext arg0) throws JobExecutionException {
System.out.println(new Date());
}
}
package cn.bit.pro1_1.controller;
import cn.bit.pro1_1.service.DateService;
import lombok.AllArgsConstructor;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/quartz")
@AllArgsConstructor
public class QuartzController {
@GetMapping("/test")
public String test() throws SchedulerException {
// Get the default scheduler from the standard SchedulerFactory
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
JobDetail job = JobBuilder.newJob(DateService.class).withIdentity("job1", "group1").build();
Trigger trigger1 = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInSeconds(5)
.withRepeatCount(5))
.build();
scheduler.scheduleJob(job, trigger1);
scheduler.start();
return "ok";
}
}
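Start the service:
After the /quartz/test endpoint is called, the console prints the current time every five seconds, six times in total. If the browser shows an error page, the port may already be in use; you can configure a different port in application.yml.
III. Task-Related Class Definitions
Create the Task class along with the corresponding enums and exception class, as shown in the figure below:
The content of Task is as follows: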
package cn.bit.pro1_1.core;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class Task {
private Integer id;
private String taskName;
private String taskGroup;
private Integer type;//1: Java class, 2: Spring bean, 3: HTTP request
private String beanName;//bean name
private String className;//fully qualified Java class name
private String path;//REST request path
private String methodName;//method name
private String params;//method argument
private String cronExpression;//cron expression
private String description;//description
private Integer status;//current task status
private Integer result;//task execution result
}
The enums are as follows:
package cn.bit.pro1_1.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum Result {
FAIL(0,"fail"),
SUCCESS(1,"success")
;
private final Integer code;
private final String desc;
}
package cn.bit.pro1_1.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum TaskStatus {
PAUSE(0, "paused"),
RUNNING(1, "running");
private final Integer code;
private final String desc;
}
package cn.bit.pro1_1.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum TaskType {
JAVA_CLASS(1,"java class"),
SPRING_BEAN(2,"spring bean"),
HTTP(3,"http");
private final Integer code;
private final String desc;
}
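Task stores type, status, and result as plain Integer codes, so code that receives a Task often needs to turn a code back into an enum constant. Here is a minimal sketch of such a lookup. TaskTypeSketch re-declares the article's codes so that the example compiles on its own, and fromCode is a hypothetical helper that the module itself does not define.

```java
import java.util.Arrays;
import java.util.Optional;

/** Same codes as TaskType above, re-declared so this sketch is self-contained. */
enum TaskTypeSketch {
    JAVA_CLASS(1), SPRING_BEAN(2), HTTP(3);

    private final Integer code;

    TaskTypeSketch(Integer code) {
        this.code = code;
    }

    Integer getCode() {
        return code;
    }

    /** Hypothetical helper: the constant for a stored code, or empty if the code is unknown or null. */
    static Optional<TaskTypeSketch> fromCode(Integer code) {
        return Arrays.stream(values())
                .filter(t -> t.code.equals(code)) // equals, not ==, because the codes are boxed Integers
                .findFirst();
    }
}

final class TaskTypeDemo {
    public static void main(String[] args) {
        System.out.println(TaskTypeSketch.fromCode(2).isPresent()); // a known code
        System.out.println(TaskTypeSketch.fromCode(9).isPresent()); // an unknown code
    }
}
```

Returning an Optional makes the caller decide what an unknown code means, instead of letting a null travel further into the scheduling code.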
Exception class:
package cn.bit.pro1_1.core.exception;
import cn.bit.pro1_1.core.Task;
import lombok.Getter;
@Getter
public class TaskException extends Exception {
private final Task task;
public TaskException(String message, Task task) {
super(message);
this.task = task;
}
}
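IV. Handler Interface, Implementations, and Factory Class
In util, create the SpringContextHolder utility class so that other classes can easily get hold of the ApplicationContext.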
package cn.bit.pro1_1.core.util;
import lombok.Getter;
import lombok.NonNull;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
@Service
public class SpringContextHolder implements ApplicationContextAware {
@Getter
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
SpringContextHolder.applicationContext = applicationContext;
}
}
In handler, create the following classes and interface.
ITaskHandler:
package cn.bit.pro1_1.core.handler;
import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.exception.TaskException;
public interface ITaskHandler {
void invoke(Task task) throws TaskException;
}
JavaClassTaskHandler, which invokes the method via the fully qualified class name:
package cn.bit.pro1_1.core.handler.impl;
import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.enums.Result;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@Slf4j
@Component
public class JavaClassTaskHandler implements ITaskHandler {
@Override
public void invoke(Task task) throws TaskException {
try {
Object target;
Class<?> clazz;
Method method;
Result returnValue;
clazz = Class.forName(task.getClassName());
target = clazz.newInstance();
if (task.getParams() == null || task.getParams().isEmpty()) {
method = target.getClass().getDeclaredMethod(task.getMethodName());
ReflectionUtils.makeAccessible(method);
returnValue = (Result) method.invoke(target);
} else {
method = target.getClass().getDeclaredMethod(task.getMethodName(), String.class);
ReflectionUtils.makeAccessible(method);
returnValue = (Result) method.invoke(target, task.getParams());
}
//Check whether the business method succeeded
if (returnValue == null || Result.FAIL.equals(returnValue))
throw new TaskException("JavaClassTaskHandler: method execution failed", task);
} catch (NoSuchMethodException e) {
throw new TaskException("JavaClassTaskHandler: method not found", task);
} catch (InvocationTargetException | IllegalAccessException e) {
throw new TaskException("JavaClassTaskHandler: reflective invocation failed", task);
} catch (ClassCastException e) {
throw new TaskException("JavaClassTaskHandler: method has the wrong return type", task);
} catch (ClassNotFoundException e) {
throw new TaskException("JavaClassTaskHandler: class not found", task);
} catch (InstantiationException e) {
throw new TaskException("JavaClassTaskHandler: instantiation failed", task);
}
}
}
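Stripped of the Task and Result types, the reflective call in JavaClassTaskHandler comes down to the steps sketched below: load the class by name, create an instance, pick the no-argument or the single-String overload, and invoke it. GreetingService and ReflectiveInvoker are hypothetical stand-ins, not classes from the project. As a variation on the handler above, the sketch uses getDeclaredConstructor().newInstance() instead of Class.newInstance(), which has been deprecated since Java 9, and it keeps the original exception as the cause so that the stack trace is not lost.

```java
import java.lang.reflect.Method;

/** Stand-in for a business class such as TestJavaService. */
class GreetingService {
    public String hello() {
        return "hello";
    }

    public String hello(String name) {
        return "hello " + name;
    }
}

final class ReflectiveInvoker {
    private ReflectiveInvoker() {}

    /** Calls methodName on a fresh instance of className, with or without one String argument. */
    static Object invoke(String className, String methodName, String params) {
        try {
            Class<?> clazz = Class.forName(className);
            Object target = clazz.getDeclaredConstructor().newInstance();
            boolean noArgs = params == null || params.isEmpty();
            Method method = noArgs
                    ? clazz.getDeclaredMethod(methodName)
                    : clazz.getDeclaredMethod(methodName, String.class);
            method.setAccessible(true);
            return noArgs ? method.invoke(target) : method.invoke(target, params);
        } catch (ReflectiveOperationException e) {
            // Wrap every reflection failure, but keep it as the cause.
            throw new IllegalStateException("Cannot invoke " + className + "#" + methodName, e);
        }
    }

    public static void main(String[] args) {
        String className = GreetingService.class.getName();
        System.out.println(invoke(className, "hello", null));
        System.out.println(invoke(className, "hello", "quartz"));
    }
}
```

The same overload selection (no parameter versus one String parameter) is why every task method in this module takes either nothing or a single String.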
SpringBeanTaskHandler, which looks up the bean by its Spring bean name and invokes the method on it:
package cn.bit.pro1_1.core.handler.impl;
import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.enums.Result;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.util.SpringContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@Slf4j
@Component
public class SpringBeanTaskHandler implements ITaskHandler {
@Override
public void invoke(Task task) throws TaskException {
try {
Object target;
Method method;
Result returnValue;
//Look up the bean in the application context
target = SpringContextHolder.getApplicationContext().getBean(task.getBeanName());
//Find the matching method
if(task.getParams()==null|| task.getParams().isEmpty())
{
method = target.getClass().getDeclaredMethod(task.getMethodName());
ReflectionUtils.makeAccessible(method);
returnValue = (Result) method.invoke(target);
}
else{
method = target.getClass().getDeclaredMethod(task.getMethodName(), String.class);
ReflectionUtils.makeAccessible(method);
returnValue = (Result) method.invoke(target, task.getParams());
}
//Check whether the business method succeeded
if(returnValue==null || Result.FAIL.equals(returnValue))
throw new TaskException("SpringBeanTaskHandler: method execution failed", task);
}catch (NoSuchBeanDefinitionException e){
throw new TaskException("SpringBeanTaskHandler: bean not found", task);
} catch (NoSuchMethodException e) {
throw new TaskException("SpringBeanTaskHandler: method not found", task);
} catch (InvocationTargetException | IllegalAccessException e) {
throw new TaskException("SpringBeanTaskHandler: reflective invocation failed", task);
} catch (ClassCastException e) {
throw new TaskException("SpringBeanTaskHandler: method has the wrong return type", task);
}
}
}
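One thing to keep in mind with target.getClass().getDeclaredMethod(...), which both handlers use: it only sees methods declared directly on the runtime class, so a method that a bean inherits from a parent class is reported as missing. The demonstration below assumes two hypothetical classes, BaseReportService and MonthlyReportService. getMethod is shown as one possible alternative for public methods, not as what the module does.

```java
import java.lang.reflect.Method;

class BaseReportService {
    public String run() {
        return "done";
    }
}

/** Declares nothing itself; run() is inherited from BaseReportService. */
class MonthlyReportService extends BaseReportService {
}

final class MethodLookupDemo {
    private MethodLookupDemo() {}

    /** True if a no-arg method with this name is declared directly on the object's runtime class. */
    static boolean declaredOnRuntimeClass(Object target, String name) {
        try {
            target.getClass().getDeclaredMethod(name);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Looks the method up among all public methods, inherited ones included, and calls it. */
    static Object invokePublic(Object target, String name) {
        try {
            Method method = target.getClass().getMethod(name);
            return method.invoke(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + name, e);
        }
    }

    public static void main(String[] args) {
        Object service = new MonthlyReportService();
        System.out.println("getDeclaredMethod finds run(): " + declaredOnRuntimeClass(service, "run"));
        System.out.println("getMethod result: " + invokePublic(service, "run"));
    }
}
```

The trade-off is that getMethod only returns public methods, whereas getDeclaredMethod combined with makeAccessible can also reach private ones declared on the class itself.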
TaskHandlerFactory, which picks the handler according to the task type:
package cn.bit.pro1_1.core.handler;
import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.enums.TaskType;
import cn.bit.pro1_1.core.handler.impl.JavaClassTaskHandler;
import cn.bit.pro1_1.core.handler.impl.SpringBeanTaskHandler;
import cn.bit.pro1_1.core.util.SpringContextHolder;
import org.springframework.stereotype.Component;
@Component
public class TaskHandlerFactory {
public static ITaskHandler getTaskHandler(Task task) {
ITaskHandler taskHandler = null;
if(TaskType.SPRING_BEAN.getCode().equals(task.getType())) {
taskHandler = SpringContextHolder.getApplicationContext().getBean(SpringBeanTaskHandler.class);
}
if(TaskType.JAVA_CLASS.getCode().equals(task.getType())) {
taskHandler = SpringContextHolder.getApplicationContext().getBean(JavaClassTaskHandler.class);
}
return taskHandler;
}
}
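As written, getTaskHandler returns null for any type it does not recognise (TaskType.HTTP, for instance, has no handler yet), and the caller then fails with a NullPointerException when it calls invoke. One alternative, sketched here with plain Java and no Spring, is a registry that fails with a descriptive exception instead. HandlerSketch and HandlerRegistry are illustrative names and not part of the module.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for ITaskHandler. */
interface HandlerSketch {
    String handle(String payload);
}

final class HandlerRegistry {
    private final Map<Integer, HandlerSketch> handlers = new HashMap<>();

    void register(int typeCode, HandlerSketch handler) {
        handlers.put(typeCode, handler);
    }

    /** Returns the handler for a type code, or throws if none is registered. */
    HandlerSketch get(Integer typeCode) {
        HandlerSketch handler = handlers.get(typeCode);
        if (handler == null) {
            throw new IllegalArgumentException("No handler registered for task type " + typeCode);
        }
        return handler;
    }

    public static void main(String[] args) {
        HandlerRegistry registry = new HandlerRegistry();
        registry.register(1, payload -> "java class: " + payload);
        registry.register(2, payload -> "spring bean: " + payload);
        System.out.println(registry.get(2).handle("test"));
        try {
            registry.get(3); // type 3 (HTTP) has no handler in this chapter
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing at lookup time points straight at the unsupported type, which is easier to diagnose than a NullPointerException inside the job.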
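V. Business Definitions
In the top-level service package, create two more test services, one for invocation by bean name and one for invocation by fully qualified Java class name.
They are as follows: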
package cn.bit.pro1_1.service;
import cn.bit.pro1_1.core.enums.Result;
import org.springframework.stereotype.Service;
@Service("test")
public class TestBeanService {
public Result test(String param){
System.out.println(param);
return Result.SUCCESS;
}
}
package cn.bit.pro1_1.service;
import cn.bit.pro1_1.core.enums.Result;
public class TestJavaService {
public Result test(String param){
System.out.println(param);
return Result.SUCCESS;
}
}
Add the test endpoints test1 and test2 to the controller:
package cn.bit.pro1_1.controller;
import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.handler.TaskHandlerFactory;
import cn.bit.pro1_1.service.DateService;
import lombok.AllArgsConstructor;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/quartz")
@AllArgsConstructor
public class QuartzController {
@GetMapping("/test")
public String test() throws SchedulerException {
// Get the default scheduler from the standard SchedulerFactory
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
JobDetail job = JobBuilder.newJob(DateService.class).withIdentity("job1", "group1").build();
Trigger trigger1 = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInSeconds(5)
.withRepeatCount(5))
.build();
scheduler.scheduleJob(job, trigger1);
scheduler.start();
return "ok";
}
@GetMapping("/test1")
public String test1()
{
Task task = new Task();
task.setType(2);
task.setBeanName("test");
task.setMethodName("test");
task.setParams("test");
ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);
try {
taskHandler.invoke(task);
} catch (TaskException e) {
e.printStackTrace();
return "error";
}
return "success";
}
@GetMapping("/test2")
public String test2()
{
Task task = new Task();
task.setType(1);
task.setClassName("cn.bit.pro1_1.service.TestJavaService");
task.setMethodName("test");
task.setParams("test");
ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);
try {
taskHandler.invoke(task);
} catch (TaskException e) {
e.printStackTrace();
return "error";
}
return "success";
}
}
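Call test1:
Then call test2:
Both the bean-name invocation and the class-name invocation succeed.
VI. Task Scheduler
In core, define the SysJob and TaskManager classes.
SysJob implements the Job interface and wraps the reflective invocation built earlier. Its content is as follows: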
package cn.bit.pro1_1.core;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.handler.TaskHandlerFactory;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
@Slf4j
public class SysJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext){
Task task = (Task) jobExecutionContext.getJobDetail().getJobDataMap().get("task");
ITaskHandler handler = TaskHandlerFactory.getTaskHandler(task);
try {
handler.invoke(task);
} catch (TaskException e) {
log.error("{},Task:{}", e.getMessage(), e.getTask().toString());
}
}
}
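TaskManager exposes the operations for managing the state of tasks in the scheduler. Its content is as follows (the commented-out parts are the persistence logic):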
package cn.bit.pro1_1.core;
import cn.bit.pro1_1.core.enums.TaskStatus;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.List;
@Slf4j
@Service
@AllArgsConstructor
public class TaskManager {
private Scheduler scheduler;
// private TaskService taskService;
@PostConstruct
public void init() throws SchedulerException {
log.info("TaskManager initialization started...");
// List<Task> tasks = taskService.selectAllTask();
// if(tasks != null && !tasks.isEmpty()) {
// for (Task task : tasks)
// {
// addOrUpdateTask(task);
// }
// log.info("Loaded {} task(s)", tasks.size());
// }
log.info("TaskManager initialization finished...");
}
public static JobKey getJobKey(@NonNull Task task) {
return JobKey.jobKey(task.getTaskName(),task.getTaskGroup());
}
public static TriggerKey getTriggerKey(@NonNull Task task) {
return TriggerKey.triggerKey(task.getTaskName(),task.getTaskGroup());
}
public void addOrUpdateTask(@NonNull Task task) throws SchedulerException {
CronTrigger trigger = null;
JobKey jobKey = getJobKey(task);
TriggerKey triggerKey = getTriggerKey(task);
trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// Check whether the trigger exists (if it does, the task has been scheduled before and may currently be paused; if not, it has never been scheduled)
if (trigger == null) {
// Create a new JobDetail backed by SysJob (SysJob carries no @DisallowConcurrentExecution, so Quartz does not force its runs to be serial)
JobDetail jobDetail = JobBuilder.newJob(SysJob.class).withIdentity(jobKey).build();
// Put the task into the JobDetail's JobDataMap
jobDetail.getJobDataMap().put("task",task);
// Parse the cron expression into a schedule builder
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());
// Create the trigger with the cron schedule
trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder)
.build();
// Register the job and the trigger with the scheduler
scheduler.scheduleJob(jobDetail, trigger);
// if(taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup()) == null) {
// taskService.insertTask(task);
// }else
// taskService.updateTask(task);
}
else {
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());
// Rebuild the trigger with the new schedule
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(cronScheduleBuilder)
.build();
// Store the updated task in the trigger's JobDataMap
trigger.getJobDataMap().put("task", task);
// Reschedule
scheduler.rescheduleJob(triggerKey, trigger);
// taskService.updateTask(task);
}
// If the task status is paused (null-safe comparison)
if (TaskStatus.PAUSE.getCode().equals(task.getStatus())) {
this.pauseJob(task);
}
}
public void pauseJob(@NonNull Task task) throws SchedulerException {
scheduler.pauseJob(getJobKey(task));
task.setStatus(TaskStatus.PAUSE.getCode());
// taskService.updateTask(task);
}
}
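A detail worth noticing in the update branch: the new Task is stored in the trigger's JobDataMap, while SysJob reads it from getJobDetail().getJobDataMap(), so after an update the job still sees the Task that was stored when it was first scheduled. Quartz also offers JobExecutionContext.getMergedJobDataMap(), which combines both maps and lets trigger entries override JobDetail entries with the same key. The sketch below models that merge rule with plain maps so that it runs without Quartz; MergedDataMapSketch is a made-up name, and the string values stand in for Task objects.

```java
import java.util.HashMap;
import java.util.Map;

final class MergedDataMapSketch {
    private MergedDataMapSketch() {}

    /** Models the merge rule: start from the job's data, then let the trigger's data override it. */
    static Map<String, Object> merged(Map<String, Object> jobData, Map<String, Object> triggerData) {
        Map<String, Object> result = new HashMap<>(jobData);
        result.putAll(triggerData);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> jobData = new HashMap<>();
        jobData.put("task", "task as first scheduled");

        Map<String, Object> triggerData = new HashMap<>();
        triggerData.put("task", "task after update");

        // What a job sees if it only reads the JobDetail's map.
        System.out.println("JobDetail map: " + jobData.get("task"));
        // What a job sees if it reads the merged view.
        System.out.println("Merged view:   " + merged(jobData, triggerData).get("task"));
    }
}
```

Whether to read the merged map or to replace the JobDetail on update is a design decision for the module; the sketch only illustrates why the two reads differ.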
Add start and pause methods to the controller to verify that scheduling works:
package cn.bit.pro1_1.controller;
import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.TaskManager;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.handler.TaskHandlerFactory;
import cn.bit.pro1_1.service.DateService;
import lombok.AllArgsConstructor;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/quartz")
@AllArgsConstructor
public class QuartzController {
private TaskManager taskManager;
@GetMapping("/test")
public String test() throws SchedulerException {
// Get the default scheduler from the standard SchedulerFactory
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
JobDetail job = JobBuilder.newJob(DateService.class).withIdentity("job1", "group1").build();
Trigger trigger1 = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInSeconds(5)
.withRepeatCount(5))
.build();
scheduler.scheduleJob(job, trigger1);
scheduler.start();
return "ok";
}
@GetMapping("/test1")
public String test1()
{
Task task = new Task();
task.setType(2);
task.setBeanName("test");
task.setMethodName("test");
task.setParams("test");
ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);
try {
taskHandler.invoke(task);
} catch (TaskException e) {
e.printStackTrace();
return "error";
}
return "success";
}
@GetMapping("/test2")
public String test2()
{
Task task = new Task();
task.setType(1);
task.setClassName("cn.bit.pro1_1.service.TestJavaService");
task.setMethodName("test");
task.setParams("test");
ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);
try {
taskHandler.invoke(task);
} catch (TaskException e) {
e.printStackTrace();
return "error";
}
return "success";
}
@GetMapping("/start")
public String start() throws SchedulerException {
Task task = new Task();
task.setTaskName("task1");
task.setTaskGroup("group1");
task.setType(2);
task.setBeanName("test");
task.setMethodName("test");
task.setParams("test");
task.setCronExpression("*/5 * * * * ?");
task.setStatus(1);
taskManager.addOrUpdateTask(task);
return "ok";
}
@GetMapping("/pause")
public String pause() throws SchedulerException {
Task task = new Task();
task.setTaskName("task1");
task.setTaskGroup("group1");
task.setType(2);
task.setBeanName("test");
task.setMethodName("test");
task.setParams("test");
task.setCronExpression("*/5 * * * * ?");
task.setStatus(1);
taskManager.pauseJob(task);
return "ok";
}
}
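Start the service and call the endpoints.
You can see the task being invoked repeatedly. After the pause endpoint is called, the task no longer fires.
VII. Persisting Scheduled-Task State
Create the table in the corresponding database with the following SQL: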
CREATE TABLE tb_task (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
task_name VARCHAR(255) NOT NULL,
task_group VARCHAR(255) NOT NULL,
type INT NOT NULL,
bean_name VARCHAR(255) NULL,
class_name VARCHAR(255) NULL,
path VARCHAR(255) NULL,
method_name VARCHAR(255) NULL,
params VARCHAR(255) NULL,
cron_expression VARCHAR(255) NOT NULL,
description TEXT NULL,
status INT NOT NULL DEFAULT 0,
result INT NULL
);
Create the corresponding mapper, service, serviceImpl, and mapper.xml.
They are as follows:
package cn.bit.pro1_1.core.mapper;
import cn.bit.pro1_1.core.Task;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
public interface TaskMapper {
List<Task> selectAllTask();
int updateTask(@Param("task") Task task);
int insertTask(@Param("task") Task task);
Task selectTaskByNameAndGroup(@Param("name") String name, @Param("group") String group );
}
package cn.bit.pro1_1.core.service;
import cn.bit.pro1_1.core.Task;
import java.util.List;
public interface TaskService {
List<Task> selectAllTask();
int updateTask(Task task);
int insertTask(Task task);
Task selectTaskByNameAndGroup(String taskName, String groupName);
}
package cn.bit.pro1_1.core.service.impl;
import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.mapper.TaskMapper;
import cn.bit.pro1_1.core.service.TaskService;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@AllArgsConstructor
public class TaskServiceImpl implements TaskService {
private TaskMapper taskMapper;
@Override
public List<Task> selectAllTask() {
return taskMapper.selectAllTask();
}
@Override
public int updateTask(Task task) {
System.out.println(task.toString());
return taskMapper.updateTask(task);
}
@Override
public int insertTask(Task task) {
return taskMapper.insertTask(task);
}
@Override
public Task selectTaskByNameAndGroup(String taskName, String groupName) {
return taskMapper.selectTaskByNameAndGroup(taskName, groupName);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.bit.pro1_1.core.mapper.TaskMapper">
<resultMap id="TaskResultMap" type="cn.bit.pro1_1.core.Task">
<result property="id" column="id"/>
<result property="taskName" column="task_name"/>
<result property="taskGroup" column="task_group"/>
<result property="type" column="type"/>
<result property="beanName" column="bean_name"/>
<result property="className" column="class_name"/>
<result property="path" column="path"/>
<result property="methodName" column="method_name"/>
<result property="params" column="params"/>
<result property="cronExpression" column="cron_expression"/>
<result property="description" column="description"/>
<result property="status" column="status"/>
<result property="result" column="result"/>
</resultMap>
<insert id="insertTask">
INSERT INTO tb_task (
task_name,
task_group,
type,
bean_name,
class_name,
path,
method_name,
params,
cron_expression,
description,
status,
result
) VALUES (
#{task.taskName},
#{task.taskGroup},
#{task.type},
#{task.beanName},
#{task.className},
#{task.path},
#{task.methodName},
#{task.params},
#{task.cronExpression},
#{task.description},
#{task.status},
#{task.result}
)
</insert>
<select id="selectAllTask" resultMap="TaskResultMap">
SELECT
id,
task_name,
task_group,
type,
bean_name,
class_name,
path,
method_name,
params,
cron_expression,
description,
status,
result
FROM tb_task;
</select>
<select id="selectTaskByNameAndGroup" resultMap="TaskResultMap">
select * from tb_task
where
task_name = #{name} and
task_group = #{group}
</select>
<update id="updateTask">
UPDATE tb_task
SET
type = #{task.type},
bean_name = #{task.beanName},
class_name = #{task.className},
path = #{task.path},
method_name = #{task.methodName},
params = #{task.params},
cron_expression = #{task.cronExpression},
description = #{task.description},
status = #{task.status},
result = #{task.result}
WHERE
task_name = #{task.taskName} and
task_group = #{task.taskGroup};
</update>
</mapper>
Then restore the persistence-related code that was commented out in TaskManager:
package cn.bit.pro1_1.core;
import cn.bit.pro1_1.core.enums.TaskStatus;
import cn.bit.pro1_1.core.service.TaskService;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.List;
@Slf4j
@Service
@AllArgsConstructor
public class TaskManager {
private Scheduler scheduler;
private TaskService taskService;
@PostConstruct
public void init() throws SchedulerException {
log.info("TaskManager initialization started...");
List<Task> tasks = taskService.selectAllTask();
if(tasks != null && !tasks.isEmpty()) {
for (Task task : tasks)
{
addOrUpdateTask(task);
}
log.info("Loaded {} task(s)", tasks.size());
}
log.info("TaskManager initialization finished...");
}
public static JobKey getJobKey(@NonNull Task task) {
return JobKey.jobKey(task.getTaskName(),task.getTaskGroup());
}
public static TriggerKey getTriggerKey(@NonNull Task task) {
return TriggerKey.triggerKey(task.getTaskName(),task.getTaskGroup());
}
public void addOrUpdateTask(@NonNull Task task) throws SchedulerException {
CronTrigger trigger = null;
JobKey jobKey = getJobKey(task);
TriggerKey triggerKey = getTriggerKey(task);
trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// Check whether the trigger exists (if it does, the task has been scheduled before and may currently be paused; if not, it has never been scheduled)
if (trigger == null) {
// Create a new JobDetail backed by SysJob (SysJob carries no @DisallowConcurrentExecution, so Quartz does not force its runs to be serial)
JobDetail jobDetail = JobBuilder.newJob(SysJob.class).withIdentity(jobKey).build();
// Put the task into the JobDetail's JobDataMap
jobDetail.getJobDataMap().put("task",task);
// Parse the cron expression into a schedule builder
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());
// Create the trigger with the cron schedule
trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder)
.build();
// Register the job and the trigger with the scheduler
scheduler.scheduleJob(jobDetail, trigger);
if(taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup()) == null) {
taskService.insertTask(task);
}else
taskService.updateTask(task);
}
else {
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());
// Rebuild the trigger with the new schedule
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(cronScheduleBuilder)
.build();
// Store the updated task in the trigger's JobDataMap
trigger.getJobDataMap().put("task", task);
// Reschedule
scheduler.rescheduleJob(triggerKey, trigger);
taskService.updateTask(task);
}
// If the task status is paused (null-safe comparison)
if (TaskStatus.PAUSE.getCode().equals(task.getStatus())) {
this.pauseJob(task);
}
}
public void pauseJob(@NonNull Task task) throws SchedulerException {
scheduler.pauseJob(getJobKey(task));
task.setStatus(TaskStatus.PAUSE.getCode());
taskService.updateTask(task);
}
}
Then define an endpoint in the controller that reads the task from the request body:
package cn.bit.pro1_1.controller;
import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.TaskManager;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.handler.TaskHandlerFactory;
import cn.bit.pro1_1.service.DateService;
import lombok.AllArgsConstructor;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/quartz")
@AllArgsConstructor
public class QuartzController {
private TaskManager taskManager;
@GetMapping("/test")
public String test() throws SchedulerException {
// Get the default scheduler from the standard SchedulerFactory
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
JobDetail job = JobBuilder.newJob(DateService.class).withIdentity("job1", "group1").build();
Trigger trigger1 = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInSeconds(5)
.withRepeatCount(5))
.build();
scheduler.scheduleJob(job, trigger1);
scheduler.start();
return "ok";
}
@GetMapping("/test1")
public String test1()
{
Task task = new Task();
task.setType(2);
task.setBeanName("test");
task.setMethodName("test");
task.setParams("test");
ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);
try {
taskHandler.invoke(task);
} catch (TaskException e) {
e.printStackTrace();
return "error";
}
return "success";
}
@GetMapping("/test2")
public String test2()
{
Task task = new Task();
task.setType(1);
task.setClassName("cn.bit.pro1_1.service.TestJavaService");
task.setMethodName("test");
task.setParams("test");
ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);
try {
taskHandler.invoke(task);
} catch (TaskException e) {
e.printStackTrace();
return "error";
}
return "success";
}
@GetMapping("/start")
public String start() throws SchedulerException {
Task task = new Task();
task.setTaskName("task1");
task.setTaskGroup("group1");
task.setType(2);
task.setBeanName("test");
task.setMethodName("test");
task.setParams("test");
task.setCronExpression("*/5 * * * * ?");
task.setStatus(1);
taskManager.addOrUpdateTask(task);
return "ok";
}
@GetMapping("/pause")
public String pause() throws SchedulerException {
Task task = new Task();
task.setTaskName("task1");
task.setTaskGroup("group1");
task.setType(2);
task.setBeanName("test");
task.setMethodName("test");
task.setParams("test");
task.setCronExpression("*/5 * * * * ?");
task.setStatus(1);
taskManager.pauseJob(task);
return "ok";
}
@PostMapping("/task")
public String addOrUpdateTask(@RequestBody Task task) throws SchedulerException {
taskManager.addOrUpdateTask(task);
return "ok";
}
}
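To make the invocation times easier to observe, also add the current time to the existing test output.
Start the service.
Send the request with Postman. Note that the task is sent with its status set to paused.
A new task row appears in the database.
Change status to 1.
The scheduled task is now invoked. At the moment every update of a task immediately executes it once; this is a bug that will be fixed later, so stay tuned.
The status in the database changes as well.
Change the schedule from once every 10 seconds to once every 5 seconds.
Shut down the server and restart the service: the scheduled tasks are loaded from the database and executed.
Finally:
The basic requirements of a scheduling module are now met. The module is modeled on one from the company where I interned, and the persistence part still has some small bugs, so I am publishing the basic idea and implementation first as a starting point for discussion. The bugs are concentrated in the persistence layer; for example, the method a task invokes cannot be changed while the Spring application is running. The company's approach is simply to forbid such changes in the front end. The fix is actually simple: in addOrUpdateTask, remove an existing task and add it again. Personally, though, I think putting add and update into one function is the root cause of this bug, and this two-in-one function is also why I rewrote the persistence code four or five times. Chapter Pro1-2 will fix this problem and add features such as logging and asynchronous execution; Chapter Pro1-3 is planned to integrate the quartz module into the earlier microservice project mentioned in the preface. Follows and bookmarks are welcome; your support is my biggest motivation to keep updating.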