当前位置: 首页 > article >正文

从零搭建微服务项目Pro(第1-1章——Quartz实现定时任务模块)

前言:

在企业项目中,往往有定时任务发布的需求,比如每天晚9点将今日数据备份一次,或每月一号将上月的销售数据邮件发送给对应的工作人员。显然这些操作不可能是人工到时间点调用一次接口,需要编写专门的模块完成任务的调度。

Quartz 是一个强大的 Java 作业调度库,支持持久化和集群模式,适合需要高可用性和负载均衡的应用场景。其使用方法相对简单。只需要实现Job接口定义调度任务,通过JobDetail传入调度所需数据,并定义触发器,即可通过Scheduler类完成任务调度,其中每个任务对应一组Trigger和JobDetail类。下为每5秒调用一次DateJob类的execute方法,共调用6次的示例。

public class DateJob implements Job {
    @Override
    public void execute(JobExecutionContext arg0) throws JobExecutionException {
       System.out.println(new Date());
    }

}
//从标准SchedulerFactory获取默认scheduler
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
JobDetail job = JobBuilder.newJob(DateJob.class).withIdentity("job1", "group1").build();
Trigger trigger1 = TriggerBuilder.newTrigger()
                   .withIdentity("trigger1", "group1").startNow()
                   .withSchedule(SimpleScheduleBuilder
                   .simpleSchedule()
                   .withIntervalInSeconds(5)
                   .withRepeatCount(5))
                   .build();
 scheduler.scheduleJob(job, trigger1);
 scheduler.start();

但如果有多个定时任务时就需要定义多个实现类,且无法将任务状态维护在数据库中,针对上述问题,本文实现一种基于Quartz的定时任务模块,只需要将定时任务所需要的参数定义在数据库中,模块可从数据库中生成对应的定时任务并进行维护。本文将从易到难逐步实践搭建完整模块并验证各部分代码内容。具体实现类图如下,其中虚线框部分为Quartz自带库。

上述类图为模块简化版本,实际还能引入装饰器模式、事件订阅和发布、AOP处理异常等进一步优化,本章将详细讲述如何从零搭建一个Quartz模块,并在后续通过一或两章的内容补充上述功能,并将模块整合至之前的微服务项目Base中,链接如下

从零搭建微服务项目Base(第0章——微服务项目结构搭建)_从0创建微服务项目-CSDN博客https://blog.csdn.net/wlf2030/article/details/145206361本章项目源码如下:

wlf728050719/SpringCloudPro1-1https://github.com/wlf728050719/SpringCloudPro1-1以及本专栏会持续更新微服务项目,每一章的项目都会基于前一章项目进行功能的完善,欢迎小伙伴们关注!同时如果只是对单章感兴趣也不用从头看,只需下载前一章项目即可,每一章都会有前置项目准备部分,跟着操作就能实现上一章的最终效果,当然如果是一直跟着做可以直接跳过这一部分。其中Base篇为基础微服务搭建,Pro篇为复杂模块实现。


一、项目创建

1.Spring Boot项目

2.添加Quartz依赖(也可不加,后续可直接替换pom文件导入依赖)

3.删去无用文件以及resources下application.properties,创建application.yml内容如下:

记得替换成自己的数据库,并测试连接成功,否则启动服务时会报找不到数据源

server:
  port: 6699
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/db_quartz?useSSL=false
    username: root
    password: 15947035212
    driver-class-name: com.mysql.jdbc.Driver
  application:
    name: quartz
mybatis:
  mapper-locations: classpath:mapper/*Mapper.xml
  type-aliases-package: cn.bit.pro1_1.core.mapper

4.pom内容替换为下面内容:(尽量保证依赖版本相同,特别是mybatis版本与spring版本一致)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>cn.bit</groupId>
    <artifactId>Pro1_1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>Pro1_1</name>
    <description>Pro1_1</description>

    <dependencies>
        <!-- Web依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.9.RELEASE</version>
        </dependency>
        <!-- Quartz -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
        <!-- Mysql驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!--Mybatis-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
            <version>1.18.36</version>
            <optional>true</optional>
        </dependency>
        <!--test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

创建包结构如下,最外层中controller为quartz模块对外接口,core为核心,service中定义类为定时任务真正执行的业务类,core中enums定义任务执行的结果、任务类型、任务状态枚举,exception定义任务执行的异常类,handler定义java,springbean的任务处理类以及接口和工厂类。mapper和service主要服务操纵数据库任务数据,util定义工具类。


二、Scheduler入门demo

service包下创建DateService类,controller创建QuartzController类。

DateService实现job接口,执行方法为输出当前时间。QuartzController定义一个对外接口,测试quratz的简单使用。

package cn.bit.pro1_1.service;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

import java.util.Date;

public class DateService implements Job {
    @Override
    public void execute(JobExecutionContext arg0) throws JobExecutionException {
        System.out.println(new Date());
    }

}
package cn.bit.pro1_1.controller;

import cn.bit.pro1_1.service.DateService;
import lombok.AllArgsConstructor;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/quartz")
@AllArgsConstructor
public class QuartzController {
    @GetMapping("/test")
    public String test() throws SchedulerException {
        //从标准SchedulerFactory获取默认scheduler
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        JobDetail job = JobBuilder.newJob(DateService.class).withIdentity("job1", "group1").build();

        Trigger trigger1 = TriggerBuilder.newTrigger()
                .withIdentity("trigger1", "group1").startNow()
                .withSchedule(SimpleScheduleBuilder
                        .simpleSchedule()
                        .withIntervalInSeconds(5)
                        .withRepeatCount(5))
                .build();

        scheduler.scheduleJob(job, trigger1);
        scheduler.start();
        return "ok";
    }
}

启动服务:

能够看到控制台每五秒输出一次当前时间并执行6次。如果显示网页错误可能端口被占用,可以自行在application.xml文件中配置端口。


三、任务相关类定义

创建Task类以及对应枚举以及异常类,具体如下图:

Task内容如下:

package cn.bit.pro1_1.core;

import lombok.Data;
import lombok.ToString;

@Data
@ToString
public class Task {
    private Integer id;
    private String taskName;
    private String taskGroup;
    private Integer type;//1、java类 2、Spring Bean 3、http请求
    private String beanName;//bean名称
    private String className;//java类名
    private String path;//rest请求路径
    private String methodName;//方法名
    private String params;//方法参数
    private String cronExpression;//cron表达式
    private String description;//描述
    private Integer status;//任务当前状态
    private Integer result;//任务执行结果
}

枚举内容分别如下:

package cn.bit.pro1_1.core.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public enum Result {
    FAIL(0,"失败"),
    SUCCESS(1,"成功")
    ;
    private final Integer code;
    private final String desc;
}
package cn.bit.pro1_1.core.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public enum TaskStatus {
    PAUSE(0, "暂停中"),
    RUNNING(1, "运行中");
    private final Integer code;
    private final String desc;
}
package cn.bit.pro1_1.core.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public enum TaskType {
    JAVA_CLASS(1,"java类"),
    SPRING_BEAN(2,"spring bean"),
    HTTP(3,"http");
    private final Integer code;
    private final String desc;
}

异常类

package cn.bit.pro1_1.core.exception;

import cn.bit.pro1_1.core.Task;
import lombok.Getter;

@Getter
public class TaskException extends Exception {
    private final Task task;
    public TaskException(String message, Task task) {
        super(message);
        this.task = task;
    }
}

四、handler接口、实现类以及工厂类

在util中创建SpringContextHolder工具类方便其他类拿取ApplicationContext。

package cn.bit.pro1_1.core.util;

import lombok.Getter;
import lombok.NonNull;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;

@Service
public class SpringContextHolder implements ApplicationContextAware {
    @Getter
    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
        SpringContextHolder.applicationContext = applicationContext;
    }
}

在handler中创建如下类和接口。

ITaskHandler内容

package cn.bit.pro1_1.core.handler;


import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.exception.TaskException;

public interface ITaskHandler {
    void invoke(Task task) throws TaskException;
}

JavaClassTaskHandler内容,使用全类名调用方法

package cn.bit.pro1_1.core.handler.impl;


import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.enums.Result;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

@Slf4j
@Component
public class JavaClassTaskHandler implements ITaskHandler {
    @Override
    public void invoke(Task task) throws TaskException {
        try {
            Object target;
            Class<?> clazz;
            Method method;
            Result returnValue;
            clazz = Class.forName(task.getClassName());
            target = clazz.newInstance();
            if (task.getParams() == null || task.getParams().isEmpty()) {
                method = target.getClass().getDeclaredMethod(task.getMethodName());
                ReflectionUtils.makeAccessible(method);
                returnValue = (Result) method.invoke(target);
            } else {
                method = target.getClass().getDeclaredMethod(task.getMethodName(), String.class);
                ReflectionUtils.makeAccessible(method);
                returnValue = (Result) method.invoke(target, task.getParams());
            }
            //判断业务是否执行成功
            if (returnValue == null || Result.FAIL.equals(returnValue))
                throw new TaskException("JavaClassTaskHandler方法执行失败", task);
        } catch (NoSuchMethodException e) {
            throw new TaskException("JavaClassTaskHandler找不到对应方法", task);
        } catch (InvocationTargetException | IllegalAccessException e) {
            throw new TaskException("JavaClassTaskHandler执行反射方法异常", task);
        } catch (ClassCastException e) {
            throw new TaskException("JavaClassTaskHandler方法返回值定义错误", task);
        } catch (ClassNotFoundException e) {
            throw new TaskException("JavaClassTaskHandler找不到对应类", task);
        } catch (InstantiationException e) {
            throw new TaskException("JavaClassTaskHandler实例化错误", task);
        }
    }
}

SpringBeanTaskHandler内容,通过Spring Bean的名称获取对应类以及方法。

package cn.bit.pro1_1.core.handler.impl;


import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.enums.Result;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.util.SpringContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

@Slf4j
@Component
public class SpringBeanTaskHandler implements ITaskHandler {
    @Override
    public void invoke(Task task) throws TaskException {
        try {
            Object target;
            Method method;
            Result returnValue;
            //上下文寻找对应bean
            target = SpringContextHolder.getApplicationContext().getBean(task.getBeanName());
            //寻找对应方法
            if(task.getParams()==null|| task.getParams().isEmpty())
            {
                method = target.getClass().getDeclaredMethod(task.getMethodName());
                ReflectionUtils.makeAccessible(method);
                returnValue = (Result) method.invoke(target);
            }
            else{
                method = target.getClass().getDeclaredMethod(task.getMethodName(), String.class);
                ReflectionUtils.makeAccessible(method);
                returnValue = (Result) method.invoke(target, task.getParams());
            }
            //判断业务是否执行成功
            if(returnValue==null || Result.FAIL.equals(returnValue))
                throw new TaskException("SpringBeanTaskHandler方法执行失败", task);
        }catch (NoSuchBeanDefinitionException e){
            throw new TaskException("SpringBeanTaskHandler找不到对应bean", task);
        } catch (NoSuchMethodException e) {
            throw new TaskException("SpringBeanTaskHandler找不到对应方法", task);
        } catch (InvocationTargetException | IllegalAccessException e) {
            throw new TaskException("SpringBeanTaskHandler执行反射方法异常", task);
        } catch (ClassCastException e) {
            throw new TaskException("SpringBeanTaskHandler方法返回值定义错误", task);
        }
    }
}

TaskHandlerFactory内容,根据Task类型判断使用哪种handler处理

package cn.bit.pro1_1.core.handler;

import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.enums.TaskType;
import cn.bit.pro1_1.core.handler.impl.JavaClassTaskHandler;
import cn.bit.pro1_1.core.handler.impl.SpringBeanTaskHandler;
import cn.bit.pro1_1.core.util.SpringContextHolder;
import org.springframework.stereotype.Component;

@Component
public class TaskHandlerFactory {
    public static ITaskHandler getTaskHandler(Task task) {
        ITaskHandler taskHandler = null;
        if(TaskType.SPRING_BEAN.getCode().equals(task.getType())) {
            taskHandler = SpringContextHolder.getApplicationContext().getBean(SpringBeanTaskHandler.class);
        }
        if(TaskType.JAVA_CLASS.getCode().equals(task.getType())) {
            taskHandler = SpringContextHolder.getApplicationContext().getBean(JavaClassTaskHandler.class);
        }
        return taskHandler;
    }
}

五、业务定义

最外层的service层中再创建两个测试业务分别测试bean名称调用和java全类名调用两种业务。

内容如下:

package cn.bit.pro1_1.service;


import cn.bit.pro1_1.core.enums.Result;
import org.springframework.stereotype.Service;

@Service("test")
public class TestBeanService {

    public Result test(String param){
        System.out.println(param);
        return Result.SUCCESS;
    }
}
package cn.bit.pro1_1.service;


import cn.bit.pro1_1.core.enums.Result;

public class TestJavaService {
    public Result test(String param){
        System.out.println(param);
        return Result.SUCCESS;
    }
}

controller中增加测试接口test1和test2

package cn.bit.pro1_1.controller;

import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.handler.TaskHandlerFactory;
import cn.bit.pro1_1.service.DateService;
import lombok.AllArgsConstructor;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/quartz")
@AllArgsConstructor
public class QuartzController {
    @GetMapping("/test")
    public String test() throws SchedulerException {
        //从标准SchedulerFactory获取默认scheduler
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        JobDetail job = JobBuilder.newJob(DateService.class).withIdentity("job1", "group1").build();

        Trigger trigger1 = TriggerBuilder.newTrigger()
                .withIdentity("trigger1", "group1").startNow()
                .withSchedule(SimpleScheduleBuilder
                        .simpleSchedule()
                        .withIntervalInSeconds(5)
                        .withRepeatCount(5))
                .build();

        scheduler.scheduleJob(job, trigger1);
        scheduler.start();
        return "ok";
    }

    @GetMapping("/test1")
    public String test1()
    {
        Task task = new Task();
        task.setType(2);
        task.setBeanName("test");
        task.setMethodName("test");
        task.setParams("test");
        ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);
        try {
            taskHandler.invoke(task);
        } catch (TaskException e) {
            e.printStackTrace();
            return "error";
        }
        return "success";
    }
    @GetMapping("/test2")
    public String test2()
    {
        Task task = new Task();
        task.setType(1);
        task.setClassName("cn.bit.pro1_1.service.TestJavaService");
        task.setMethodName("test");
        task.setParams("test");
        ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);
        try {
            taskHandler.invoke(task);
        } catch (TaskException e) {
            e.printStackTrace();
            return "error";
        }
        return "success";
    }
}

调用test1

再调用test2

无论是bean调用还是类名调用均成功。


六、任务调度器

core中定义SysJob以及TaskManager类

SysJob实现job接口,完成之前反射调用的封装。内容如下:

package cn.bit.pro1_1.core;

import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.handler.TaskHandlerFactory;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobExecutionContext;

@Slf4j
public class SysJob implements Job {
    @Override
    public void execute(JobExecutionContext jobExecutionContext){
        Task task = (Task) jobExecutionContext.getJobDetail().getJobDataMap().get("task");
        ITaskHandler handler = TaskHandlerFactory.getTaskHandler(task);
        try {
            handler.invoke(task);
        } catch (TaskException e) {
            log.error("{},Task:{}", e.getMessage(), e.getTask().toString());
        }
    }
}

TaskManger则对外提供服务管理调度器中任务状态。内容如下:(其中注释部分为持久化部分)

package cn.bit.pro1_1.core;


import cn.bit.pro1_1.core.enums.TaskStatus;


import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.List;


@Slf4j
@Service
@AllArgsConstructor
public class TaskManager {
    private Scheduler scheduler;
//    private TaskService taskService;
    @PostConstruct
    public void init() throws SchedulerException {
        log.info("TaskManager初始化开始...");
//        List<Task> tasks = taskService.selectAllTask();
//        if(tasks != null && !tasks.isEmpty()) {
//            for (Task task : tasks)
//            {
//                addOrUpdateTask(task);
//            }
//            log.info("共加载{}项任务", tasks.size());
//        }
        log.info("TaskManager初始化结束...");
    }
    public static JobKey getJobKey(@NonNull Task task) {
        return JobKey.jobKey(task.getTaskName(),task.getTaskGroup());
    }
    public static TriggerKey getTriggerKey(@NonNull Task task) {
        return TriggerKey.triggerKey(task.getTaskName(),task.getTaskGroup());
    }
    public void addOrUpdateTask(@NonNull Task task) throws SchedulerException {
        CronTrigger trigger = null;

        JobKey jobKey = getJobKey(task);
        TriggerKey triggerKey = getTriggerKey(task);
        trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
        // 判断触发器是否存在(如果存在说明之前运行过但是在当前被禁用了,如果不存在说明一次都没运行过)
        if (trigger == null) {
            // 新建一个工作任务 指定任务类型为串接进行的
            JobDetail jobDetail = JobBuilder.newJob(SysJob.class).withIdentity(jobKey).build();
            // 将任务信息添加到任务信息中
            jobDetail.getJobDataMap().put("task",task);
            // 将cron表达式进行转换
            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());
            // 创建触发器并将cron表达式对象给塞入
            trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder)
                    .build();
            // 在调度器中将触发器和任务进行组合
            scheduler.scheduleJob(jobDetail, trigger);

//            if(taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup()) == null) {
//                taskService.insertTask(task);
//            }else
//                taskService.updateTask(task);
        }
        else {
            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());
            // 按照新的规则进行
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(cronScheduleBuilder)
                    .build();
            // 将任务信息更新到任务信息中
            trigger.getJobDataMap().put("task", task);
            // 重启
            scheduler.rescheduleJob(triggerKey, trigger);
//            taskService.updateTask(task);
        }
        // 如任务状态为暂停
        if (task.getStatus().equals(TaskStatus.PAUSE.getCode())) {
            this.pauseJob(task);
        }
    }

    public void pauseJob(@NonNull Task task) throws SchedulerException {
        scheduler.pauseJob(getJobKey(task));
        task.setStatus(TaskStatus.PAUSE.getCode());
//        taskService.updateTask(task);
    }
}

controller中添加start和pause方法,验证调度是否成功。

package cn.bit.pro1_1.controller;

import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.TaskManager;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.handler.TaskHandlerFactory;
import cn.bit.pro1_1.service.DateService;
import lombok.AllArgsConstructor;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/quartz")
@AllArgsConstructor
public class QuartzController {

    private TaskManager taskManager;
    @GetMapping("/test")
    public String test() throws SchedulerException {
        //从标准SchedulerFactory获取默认scheduler
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        JobDetail job = JobBuilder.newJob(DateService.class).withIdentity("job1", "group1").build();

        Trigger trigger1 = TriggerBuilder.newTrigger()
                .withIdentity("trigger1", "group1").startNow()
                .withSchedule(SimpleScheduleBuilder
                        .simpleSchedule()
                        .withIntervalInSeconds(5)
                        .withRepeatCount(5))
                .build();

        scheduler.scheduleJob(job, trigger1);
        scheduler.start();
        return "ok";
    }

    @GetMapping("/test1")
    public String test1()
    {
        Task task = new Task();
        task.setType(2);
        task.setBeanName("test");
        task.setMethodName("test");
        task.setParams("test");
        ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);
        try {
            taskHandler.invoke(task);
        } catch (TaskException e) {
            e.printStackTrace();
            return "error";
        }
        return "success";
    }
    @GetMapping("/test2")
    public String test2()
    {
        Task task = new Task();
        task.setType(1);
        task.setClassName("cn.bit.pro1_1.service.TestJavaService");
        task.setMethodName("test");
        task.setParams("test");
        ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);
        try {
            taskHandler.invoke(task);
        } catch (TaskException e) {
            e.printStackTrace();
            return "error";
        }
        return "success";
    }

    @GetMapping("/start")
    public String start() throws SchedulerException {
        Task task = new Task();
        task.setTaskName("task1");
        task.setTaskGroup("group1");
        task.setType(2);
        task.setBeanName("test");
        task.setMethodName("test");
        task.setParams("test");
        task.setCronExpression("*/5 * * * * ?");
        task.setStatus(1);
        taskManager.addOrUpdateTask(task);
        return "ok";
    }
    @GetMapping("/pause")
    public String pause() throws SchedulerException {
        Task task = new Task();
        task.setTaskName("task1");
        task.setTaskGroup("group1");
        task.setType(2);
        task.setBeanName("test");
        task.setMethodName("test");
        task.setParams("test");
        task.setCronExpression("*/5 * * * * ?");
        task.setStatus(1);
        taskManager.pauseJob(task);
        return "ok";
    }
}

启动服务,调用接口

能够查看到任务正不断被调用。调用暂停接口后任务不再触发。


七、持久化定时任务状态

在对应数据库创建表,sql如下:

CREATE TABLE tb_task (
    id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
    task_name VARCHAR(255) NOT NULL,
    task_group VARCHAR(255) NOT NULL,
    type INT NOT NULL,
    bean_name VARCHAR(255) NULL,
    class_name VARCHAR(255) NULL,
    path VARCHAR(255) NULL,
    method_name VARCHAR(255) NULL,
    params VARCHAR(255) NULL,
    cron_expression VARCHAR(255) NOT NULL,
    description TEXT NULL,
    status INT NOT NULL DEFAULT 0,
    result INT NULL
);

创建对应mapper,service,serviceImpl以及mapper.xml

内容如下:

package cn.bit.pro1_1.core.mapper;


import cn.bit.pro1_1.core.Task;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

@Mapper
public interface TaskMapper {
    List<Task> selectAllTask();
    int updateTask(@Param("task") Task task);
    int insertTask(@Param("task") Task task);
    Task selectTaskByNameAndGroup(@Param("name") String name, @Param("group") String group );
}
package cn.bit.pro1_1.core.service;

import cn.bit.pro1_1.core.Task;

import java.util.List;


public interface TaskService {
    List<Task> selectAllTask();
    int updateTask(Task task);
    int insertTask(Task task);
    Task selectTaskByNameAndGroup(String taskName, String groupName);
}
package cn.bit.pro1_1.core.service.impl;

import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.mapper.TaskMapper;
import cn.bit.pro1_1.core.service.TaskService;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
@AllArgsConstructor
public class TaskServiceImpl implements TaskService {
    private TaskMapper taskMapper;
    @Override
    public List<Task> selectAllTask() {
        return taskMapper.selectAllTask();
    }

    @Override
    public int updateTask(Task task) {
        System.out.println(task.toString());
        return taskMapper.updateTask(task);
    }

    @Override
    public int insertTask(Task task) {
        return taskMapper.insertTask(task);
    }

    @Override
    public Task selectTaskByNameAndGroup(String taskName, String groupName) {
        return taskMapper.selectTaskByNameAndGroup(taskName, groupName);
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.bit.pro1_1.core.mapper.TaskMapper">
    <resultMap id="TaskResultMap" type="cn.bit.pro1_1.core.Task">
        <result property="id" column="id"/>
        <result property="taskName" column="task_name"/>
        <result property="taskGroup" column="task_group"/>
        <result property="type" column="type"/>
        <result property="beanName" column="bean_name"/>
        <result property="className" column="class_name"/>
        <result property="path" column="path"/>
        <result property="methodName" column="method_name"/>
        <result property="params" column="params"/>
        <result property="cronExpression" column="cron_expression"/>
        <result property="description" column="description"/>
        <result property="status" column="status"/>
        <result property="result" column="result"/>
    </resultMap>
    <insert id="insertTask">
        INSERT INTO tb_task (
            task_name,
            task_group,
            type,
            bean_name,
            class_name,
            path,
            method_name,
            params,
            cron_expression,
            description,
            status,
            result
        ) VALUES (
                     #{task.taskName},
                     #{task.taskGroup},
                     #{task.type},
                     #{task.beanName},
                     #{task.className},
                     #{task.path},
                     #{task.methodName},
                     #{task.params},
                     #{task.cronExpression},
                     #{task.description},
                     #{task.status},
                     #{task.result}
                 )
    </insert>


    <select id="selectAllTask" resultMap="TaskResultMap">
        SELECT
            id,
            task_name,
            task_group,
            type,
            bean_name,
            class_name,
            path,
            method_name,
            params,
            cron_expression,
            description,
            status,
            result
        FROM tb_task;
    </select>
    <select id="selectTaskByNameAndGroup" resultType="cn.bit.pro1_1.core.Task">
        select * from tb_task
        where
            task_name = #{name} and
            task_group = #{group}
    </select>

    <update id="updateTask">
        UPDATE tb_task
        SET
            type = #{task.type},
            bean_name = #{task.beanName},
            class_name = #{task.className},
            path = #{task.path},
            method_name = #{task.methodName},
            params = #{task.params},
            cron_expression = #{task.cronExpression},
            description = #{task.description},
            status = #{task.status},
            result = #{task.result}
        WHERE
            task_name = #{task.taskName} and
            task_group = #{task.taskGroup};
    </update>
</mapper>

并将之前TaskManager中与持久层相关注释还原

package cn.bit.pro1_1.core;


import cn.bit.pro1_1.core.enums.TaskStatus;
import cn.bit.pro1_1.core.service.TaskService;

import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.List;


@Slf4j
@Service
@AllArgsConstructor
public class TaskManager {
    private Scheduler scheduler;
    private TaskService taskService;
    @PostConstruct
    public void init() throws SchedulerException {
        log.info("TaskManager初始化开始...");
        List<Task> tasks = taskService.selectAllTask();
        if(tasks != null && !tasks.isEmpty()) {
            for (Task task : tasks)
            {
                addOrUpdateTask(task);
            }
            log.info("共加载{}项任务", tasks.size());
        }
        log.info("TaskManager初始化结束...");
    }
    public static JobKey getJobKey(@NonNull Task task) {
        return JobKey.jobKey(task.getTaskName(),task.getTaskGroup());
    }
    public static TriggerKey getTriggerKey(@NonNull Task task) {
        return TriggerKey.triggerKey(task.getTaskName(),task.getTaskGroup());
    }
    public void addOrUpdateTask(@NonNull Task task) throws SchedulerException {
        CronTrigger trigger = null;

        JobKey jobKey = getJobKey(task);
        TriggerKey triggerKey = getTriggerKey(task);
        trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
        // 判断触发器是否存在(如果存在说明之前运行过但是在当前被禁用了,如果不存在说明一次都没运行过)
        if (trigger == null) {
            // 新建一个工作任务 指定任务类型为串接进行的
            JobDetail jobDetail = JobBuilder.newJob(SysJob.class).withIdentity(jobKey).build();
            // 将任务信息添加到任务信息中
            jobDetail.getJobDataMap().put("task",task);
            // 将cron表达式进行转换
            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());
            // 创建触发器并将cron表达式对象给塞入
            trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder)
                    .build();
            // 在调度器中将触发器和任务进行组合
            scheduler.scheduleJob(jobDetail, trigger);

            if(taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup()) == null) {
                taskService.insertTask(task);
            }else
                taskService.updateTask(task);
        }
        else {
            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());
            // 按照新的规则进行
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(cronScheduleBuilder)
                    .build();
            // 将任务信息更新到任务信息中
            trigger.getJobDataMap().put("task", task);
            // 重启
            scheduler.rescheduleJob(triggerKey, trigger);
            taskService.updateTask(task);
        }
        // 如任务状态为暂停
        if (task.getStatus().equals(TaskStatus.PAUSE.getCode())) {
            this.pauseJob(task);
        }
    }

    public void pauseJob(@NonNull Task task) throws SchedulerException {
        scheduler.pauseJob(getJobKey(task));
        task.setStatus(TaskStatus.PAUSE.getCode());
        taskService.updateTask(task);
    }
}

并在controller定义接口,通过请求体获取task

package cn.bit.pro1_1.controller;

import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.TaskManager;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.handler.TaskHandlerFactory;
import cn.bit.pro1_1.service.DateService;
import lombok.AllArgsConstructor;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/quartz")
@AllArgsConstructor
public class QuartzController {

    private TaskManager taskManager;
    @GetMapping("/test")
    public String test() throws SchedulerException {
        //从标准SchedulerFactory获取默认scheduler
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        JobDetail job = JobBuilder.newJob(DateService.class).withIdentity("job1", "group1").build();

        Trigger trigger1 = TriggerBuilder.newTrigger()
                .withIdentity("trigger1", "group1").startNow()
                .withSchedule(SimpleScheduleBuilder
                        .simpleSchedule()
                        .withIntervalInSeconds(5)
                        .withRepeatCount(5))
                .build();

        scheduler.scheduleJob(job, trigger1);
        scheduler.start();
        return "ok";
    }

    @GetMapping("/test1")
    public String test1()
    {
        Task task = new Task();
        task.setType(2);
        task.setBeanName("test");
        task.setMethodName("test");
        task.setParams("test");
        ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);
        try {
            taskHandler.invoke(task);
        } catch (TaskException e) {
            e.printStackTrace();
            return "error";
        }
        return "success";
    }
    @GetMapping("/test2")
    public String test2()
    {
        Task task = new Task();
        task.setType(1);
        task.setClassName("cn.bit.pro1_1.service.TestJavaService");
        task.setMethodName("test");
        task.setParams("test");
        ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);
        try {
            taskHandler.invoke(task);
        } catch (TaskException e) {
            e.printStackTrace();
            return "error";
        }
        return "success";
    }

    @GetMapping("/start")
    public String start() throws SchedulerException {
        Task task = new Task();
        task.setTaskName("task1");
        task.setTaskGroup("group1");
        task.setType(2);
        task.setBeanName("test");
        task.setMethodName("test");
        task.setParams("test");
        task.setCronExpression("*/5 * * * * ?");
        task.setStatus(1);
        taskManager.addOrUpdateTask(task);
        return "ok";
    }
    @GetMapping("/pause")
    public String pause() throws SchedulerException {
        Task task = new Task();
        task.setTaskName("task1");
        task.setTaskGroup("group1");
        task.setType(2);
        task.setBeanName("test");
        task.setMethodName("test");
        task.setParams("test");
        task.setCronExpression("*/5 * * * * ?");
        task.setStatus(1);
        taskManager.pauseJob(task);
        return "ok";
    }

    @PostMapping("/task")
    public String addOrUpdateTask(@RequestBody Task task) throws SchedulerException {
        taskManager.addOrUpdateTask(task);
        return "ok";
    }
}

同时为方便观察调用时间将原有输出test添加上时间

启动服务

使用postman发请求,注意此时发送的任务状态为暂停

同时数据库中增加了一条任务

将status改为1

定时任务被调用,目前是每次update任务是立刻执行一次,为bug后续会修复,可关注。

同时数据库状态改变

改变调用时间从10秒一次变成5秒一次

关掉服务器,再重启服务,从数据库中加载定时任务并执行。


最后:

目前定时模块的基本需求已经能够满足,由于模块借鉴实习公司的模块,持久化部分还存在小bug,先把基础思路和实现发出来作为一个抛砖引玉的作用。bug主要集中在持久层部分,例如在spring项目运行时无法修改调用的方法,公司的处理方法是直接暴力在前端杜绝更改,不过修改其实也很简单在addOrupdate部分针对已有任务移除并重新添加一次即可,但个人认为将add和update操作放在一个函数里导致这个bug的根本原因,同时也因为这个二合一的函数导致我持久层的代码改了四五遍,后续更新Pro1-2章会修复这个问题,同时添加日志异步等功能,Pro1-3章计划是把quartz模块添加到前言中提到的之前的微服务项目中。欢迎大家关注和收藏,你们的支持是我更新的最大动力。


http://www.kler.cn/a/557639.html

相关文章:

  • 实现 INFINI Console 与 GitHub 的单点登录集成:一站式身份验证解决方案
  • 国产编辑器EverEdit - 洞察秋毫!内置文件比较功能!
  • 正确清理C盘空间
  • 【AI】常见的AI工具地址和学习资料链接
  • INDEMIND:AI视觉赋能服务机器人,“零”碰撞避障技术实现全天候安全
  • picgo-plugin-huawei插件发布
  • github配置sshkey
  • Apipost 与 Postman 工具实践指南:WebSocket调试与动态参数测试
  • springboot单机支持1w并发,需要做哪些优化
  • Mac m1 连接公司内网
  • 期权帮|股指期货中的套期保值如何操作?
  • 智慧校园导航系统路径规划实战(附Python源码):用A*算法实现教学楼最优路径搜索
  • 听懂 弦外之音
  • 模电之深度负反馈详解
  • JavaScript函数-函数的参数
  • 【AI】详解从数学到物理再到工程应用,人类研究新理论 新方法的研究范式 (deepseek chatgpt Gemini等)...
  • Android studio如何把新项目上传到svn仓库
  • 实现一个简单的拉取网络todo app
  • 使用Ubuntu搭建Java部署环境
  • Andorid 学习 Compose UI(1):Box