当前位置: 首页 > article >正文

基于 Spring Boot + Quartz 实现定时任务持久化配置

前言

关于使用 Spring 实现定时任务的配置,可以参考官方文档:

https://docs.spring.io/spring-framework/reference/integration/scheduling.html#scheduling-enable-annotation-support

https://docs.spring.io/spring-boot/reference/io/quartz.html

https://docs.spring.io/spring-boot/reference/features/task-execution-and-scheduling.html#page-title

如果使用 SpringBoot + Quartz 方式实现定时任务的配置并将任务配置持久化到数据库中则需要我们手动配置,下面是具体的配置流程:

配置流程

1、引入 Quartz 依赖
在 pom.xml 中添加 Quartz 依赖,确保项目具备定时任务调度的功能:

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.2</version>
</dependency>

2、配置 ScheduleConfig 类
新建一个 ScheduleConfig 配置类,用于配置 Quartz 的调度器和其他必要属性。如下是配置类的代码:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
 
import javax.sql.DataSource;
import java.util.Properties;
 
@Configuration
public class ScheduleConfig {
 
    @Bean
    public SchedulerFactoryBean incrementIndexSchedulerFactory(DataSource dataSource) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        // 设置数据源
        factory.setDataSource(dataSource);
        // quartz参数
        Properties prop = new Properties();
        // 定义 Quartz 调度器的实例名称
        prop.put("org.quartz.scheduler.instanceName", "IncrementIndexScheduler");
        /*
         *  定义当前 Quartz 实例的 ID。instanceId 在 Quartz 集群环境中用于唯一标识每个 Scheduler 实例
         *  "AUTO" 表示 Quartz 会自动生成一个唯一的实例 ID,这通常用于集群模式下
         */
        prop.put("org.quartz.scheduler.instanceId", "AUTO");
 
        // 线程池配置
        prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        // 定义线程池中的线程数量
        prop.put("org.quartz.threadPool.threadCount", "20");
        // 定义线程的优先级
        prop.put("org.quartz.threadPool.threadPriority", "5");
 
        // JobStore 配置
        // 指定 Quartz 使用的任务存储机制。在这里,JobStoreTX 是 Quartz 的一种基于数据库的任务存储方式
        prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        //prop.put("org.quartz.jobStore.class", "org.springframework.scheduling.quartz.LocalDataSourceJobStore");
 
        // 集群配置
        /*
         * 启用 Quartz 的集群模式。设置为 true 时,表示 Quartz 实例将运行在集群环境中,多个实例可以协同工作,共同管理调度任务。
         * 当集群模式启用时,多个 Quartz 实例可以同时连接到同一个数据库,数据库中的任务会由这些实例协调调度。
         * 在这种模式下,某个任务只能由一个实例执行,避免重复执行。
         */
        prop.put("org.quartz.jobStore.isClustered", "true");
        /*
         * 配置 Quartz 集群中的实例之间的检查间隔时间,单位是毫秒
         * 15000 表示每 15 秒,Quartz 实例会向数据库报告它仍在运行(即“心跳”)
         * Quartz 集群中的每个节点都会定期通过数据库“打卡”来表明自己还活跃。
         * 如果某个实例超过了该时间间隔没有打卡,其他实例会认为它已经宕机,接管它负责的任务。
         */
        prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000");
        /*
         * 设置 Quartz 同时处理“错过触发”(misfire)的任务数。
         * 1 表示每次最多处理 1 个错过触发的任务。
         * 当任务到达计划的执行时间,但由于某些原因(如系统负载或故障)未能及时触发执行时,就会发生“misfire”。
         * Quartz 允许在后续调度时重新处理这些错过的任务。
         */
        prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
        /*
         * 指定数据库事务隔离级别为 Serializable,即最严格的隔离级别,以避免并发操作导致的数据问题
         * Serializable 是 SQL 标准中的最高事务隔离级别,能够防止脏读、不可重复读和幻读问题。
         * 在 Quartz 集群环境中,这种设置确保调度任务的执行顺序不会因为并发问题而被破坏,从而保证调度的准确性。
         */
        prop.put("org.quartz.jobStore.txIsolationLevelSerializable", "true");
        /*
         * misfireThreshold 用于指定 Quartz 检测“任务错过触发”的时间阈值(单位为毫秒)。
         * 当任务触发时间超过这个阈值时,Quartz 会将这个任务标记为“misfire”(错过触发)。
         * 12000 表示 Quartz 会将超过预定触发时间 12 秒的任务视为错过触发(misfire)。
         */
        prop.put("org.quartz.jobStore.misfireThreshold", "12000");
 
        prop.put("org.quartz.jobStore.tablePrefix", "increment_index_qrtz_");
        factory.setQuartzProperties(prop);
 
        factory.setSchedulerName("IncrementIndexScheduler");
        // 延时启动
        factory.setStartupDelay(1);
        /*
         * 将 Spring 的 ApplicationContext 注入到 Quartz 的 SchedulerContext,
         * 允许 Quartz 的任务在执行时能够访问 Spring 的应用上下文,从而获取 Spring 管理的 Bean。
         */
        factory.setApplicationContextSchedulerContextKey("applicationContextKey");
 
        factory.setOverwriteExistingJobs(true);
        // 设置自动启动,默认为 true
        factory.setAutoStartup(true);
        return factory;
    }
}

3、创建 Quartz 数据表
Quartz 需要一些特定的数据库表来存储调度任务相关的数据。具体的 sql 可以访问 quartz 的 github 项目中,进行下载:

https://github.com/quartz-scheduler/quartz/tree/main/quartz/src/main/resources/org/quartz/impl/jdbcjobstore

本例中使用的是 mysql 数据库,需要注意的是,表的前缀为 “increment_index_qrtz_”,因此 SQL 脚本如下:

#
# Quartz seems to work best with the driver mm.mysql-2.0.7-bin.jar
#
# PLEASE consider using mysql with innodb tables to avoid locking issues
#
# In your Quartz properties file, you'll need to set
# org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#
DROP TABLE IF EXISTS increment_index_qrtz_fired_triggers;
DROP TABLE IF EXISTS increment_index_qrtz_paused_trigger_grps;
DROP TABLE IF EXISTS increment_index_qrtz_scheduler_state;
DROP TABLE IF EXISTS increment_index_qrtz_locks;
DROP TABLE IF EXISTS increment_index_qrtz_simple_triggers;
DROP TABLE IF EXISTS increment_index_qrtz_simprop_triggers;
DROP TABLE IF EXISTS increment_index_qrtz_cron_triggers;
DROP TABLE IF EXISTS increment_index_qrtz_blob_triggers;
DROP TABLE IF EXISTS increment_index_qrtz_triggers;
DROP TABLE IF EXISTS increment_index_qrtz_job_details;
DROP TABLE IF EXISTS increment_index_qrtz_calendars;
 
CREATE TABLE increment_index_qrtz_job_details
(
    sched_name        VARCHAR(120) NOT NULL,
    job_name          VARCHAR(200) NOT NULL,
    job_group         VARCHAR(200) NOT NULL,
    description       VARCHAR(250) NULL,
    job_class_name    VARCHAR(250) NOT NULL,
    is_durable        VARCHAR(1)   NOT NULL,
    is_nonconcurrent  VARCHAR(1)   NOT NULL,
    is_update_data    VARCHAR(1)   NOT NULL,
    requests_recovery VARCHAR(1)   NOT NULL,
    job_data          BLOB         NULL,
    PRIMARY KEY (sched_name, job_name, job_group)
);
 
CREATE TABLE increment_index_qrtz_triggers
(
    sched_name     VARCHAR(120) NOT NULL,
    trigger_name   VARCHAR(200) NOT NULL,
    trigger_group  VARCHAR(200) NOT NULL,
    job_name       VARCHAR(200) NOT NULL,
    job_group      VARCHAR(200) NOT NULL,
    description    VARCHAR(250) NULL,
    next_fire_time BIGINT(13)   NULL,
    prev_fire_time BIGINT(13)   NULL,
    priority       INTEGER      NULL,
    trigger_state  VARCHAR(16)  NOT NULL,
    trigger_type   VARCHAR(8)   NOT NULL,
    start_time     BIGINT(13)   NOT NULL,
    end_time       BIGINT(13)   NULL,
    calendar_name  VARCHAR(200) NULL,
    misfire_instr  SMALLINT(2)  NULL,
    job_data       BLOB         NULL,
    PRIMARY KEY (sched_name, trigger_name, trigger_group),
    FOREIGN KEY (sched_name, job_name, job_group)
        REFERENCES increment_index_qrtz_job_details (sched_name, job_name, job_group)
);
 
CREATE TABLE increment_index_qrtz_simple_triggers
(
    sched_name      VARCHAR(120) NOT NULL,
    trigger_name    VARCHAR(200) NOT NULL,
    trigger_group   VARCHAR(200) NOT NULL,
    repeat_count    BIGINT(7)    NOT NULL,
    repeat_interval BIGINT(12)   NOT NULL,
    times_triggered BIGINT(10)   NOT NULL,
    PRIMARY KEY (sched_name, trigger_name, trigger_group),
    FOREIGN KEY (sched_name, trigger_name, trigger_group)
        REFERENCES increment_index_qrtz_triggers (sched_name, trigger_name, trigger_group)
);
 
CREATE TABLE increment_index_qrtz_cron_triggers
(
    sched_name      VARCHAR(120) NOT NULL,
    trigger_name    VARCHAR(200) NOT NULL,
    trigger_group   VARCHAR(200) NOT NULL,
    cron_expression VARCHAR(200) NOT NULL,
    time_zone_id    VARCHAR(80),
    PRIMARY KEY (sched_name, trigger_name, trigger_group),
    FOREIGN KEY (sched_name, trigger_name, trigger_group)
        REFERENCES increment_index_qrtz_triggers (sched_name, trigger_name, trigger_group)
);
 
CREATE TABLE increment_index_qrtz_simprop_triggers
(
    sched_name    VARCHAR(120)   NOT NULL,
    trigger_name  VARCHAR(200)   NOT NULL,
    trigger_group VARCHAR(200)   NOT NULL,
    str_prop_1    VARCHAR(512)   NULL,
    str_prop_2    VARCHAR(512)   NULL,
    str_prop_3    VARCHAR(512)   NULL,
    int_prop_1    INT            NULL,
    int_prop_2    INT            NULL,
    long_prop_1   BIGINT         NULL,
    long_prop_2   BIGINT         NULL,
    dec_prop_1    NUMERIC(13, 4) NULL,
    dec_prop_2    NUMERIC(13, 4) NULL,
    bool_prop_1   VARCHAR(1)     NULL,
    bool_prop_2   VARCHAR(1)     NULL,
    PRIMARY KEY (sched_name, trigger_name, trigger_group),
    FOREIGN KEY (sched_name, trigger_name, trigger_group)
        REFERENCES increment_index_qrtz_triggers (sched_name, trigger_name, trigger_group)
);
 
CREATE TABLE increment_index_qrtz_blob_triggers
(
    sched_name    VARCHAR(120) NOT NULL,
    trigger_name  VARCHAR(200) NOT NULL,
    trigger_group VARCHAR(200) NOT NULL,
    blob_data     BLOB         NULL,
    PRIMARY KEY (sched_name, trigger_name, trigger_group),
    FOREIGN KEY (sched_name, trigger_name, trigger_group)
        REFERENCES increment_index_qrtz_triggers (sched_name, trigger_name, trigger_group)
);
 
CREATE TABLE increment_index_qrtz_calendars
(
    sched_name    VARCHAR(120) NOT NULL,
    calendar_name VARCHAR(200) NOT NULL,
    calendar      BLOB         NOT NULL,
    PRIMARY KEY (sched_name, calendar_name)
);
 
CREATE TABLE increment_index_qrtz_paused_trigger_grps
(
    sched_name    VARCHAR(120) NOT NULL,
    trigger_group VARCHAR(200) NOT NULL,
    PRIMARY KEY (sched_name, trigger_group)
);
 
CREATE TABLE increment_index_qrtz_fired_triggers
(
    sched_name        VARCHAR(120) NOT NULL,
    entry_id          VARCHAR(95)  NOT NULL,
    trigger_name      VARCHAR(200) NOT NULL,
    trigger_group     VARCHAR(200) NOT NULL,
    instance_name     VARCHAR(200) NOT NULL,
    fired_time        BIGINT(13)   NOT NULL,
    sched_time        BIGINT(13)   NOT NULL,
    priority          INTEGER      NOT NULL,
    state             VARCHAR(16)  NOT NULL,
    job_name          VARCHAR(200) NULL,
    job_group         VARCHAR(200) NULL,
    is_nonconcurrent  VARCHAR(1)   NULL,
    requests_recovery VARCHAR(1)   NULL,
    PRIMARY KEY (sched_name, entry_id)
);
 
CREATE TABLE increment_index_qrtz_scheduler_state
(
    sched_name        VARCHAR(120) NOT NULL,
    instance_name     VARCHAR(200) NOT NULL,
    last_checkin_time BIGINT(13)   NOT NULL,
    checkin_interval  BIGINT(13)   NOT NULL,
    PRIMARY KEY (sched_name, instance_name)
);
 
CREATE TABLE increment_index_qrtz_locks
(
    sched_name VARCHAR(120) NOT NULL,
    lock_name  VARCHAR(40)  NOT NULL,
    PRIMARY KEY (sched_name, lock_name)
);
 
COMMIT;

4、初始化定时任务
接下来,定义一个 IncrementIndexTaskConfig 类(名字自己定义),确保项目启动时定时任务会根据数据库的配置自动加载和初始化。

@Configuration
public class IncrementIndexTaskConfig {
 
    private final IndexUpdatePipelineService indexUpdatePipelineService;
 
    private final Scheduler scheduler;
 
    public IncrementIndexTaskConfig(IndexUpdatePipelineService indexUpdatePipelineService,
                                    @Qualifier("incrementIndexSchedulerFactory") Scheduler scheduler) {
        this.indexUpdatePipelineService = indexUpdatePipelineService;
        this.scheduler = scheduler;
    }
 
    @PostConstruct
    public void init() throws SchedulerException {
        scheduler.clear();
        List<SolrIndexUpdatePipeline> indexUpdatePipelines = indexUpdatePipelineService.listAll();
        for (SolrIndexUpdatePipeline indexUpdatePipeline : indexUpdatePipelines) {
             ScheduleUtils.createIncrementIndexScheduleJob(scheduler, indexUpdatePipeline);
        }
    }
}

可以看到这个类注入了一个 Service (自己根据自身需要选择性注入你所需要的)以及 Scheduler 对象,在 Bean 加载时会执行 init() 方法,方法里先执行 scheduler.clear(); 方法目的是先清空之前保存的定时任务配置,重新注册。

此时会读取数据库通过工具类 ScheduleUtils 进行创建定时任务。

5、定义 ScheduleUtils 工具类
ScheduleUtils 类用于创建定时任务,它可以将定时任务的具体执行逻辑和 Quartz 的配置进行绑定:

public class ScheduleUtils {
 
    public static void createIncrementIndexScheduleJob(Scheduler scheduler, SolrIndexUpdatePipeline indexUpdatePipeline) throws SchedulerException {
        Class<? extends Job> jobClass = QuartzDisallowConcurrentExecution.class;
        // 构建job信息
        Long jobId = indexUpdatePipeline.getId();
        String jobGroup = INCREMENT_INDEX_JOB_GROUP;
        JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(getJobKey(jobId, jobGroup)).build();
 
        // 表达式调度构建器
        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(indexUpdatePipeline.getCron());
 
        // 按新的cronExpression表达式构建一个新的trigger
        CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(jobId, jobGroup))
                .withSchedule(cronScheduleBuilder).build();
 
        // 放入参数,运行时的方法可以获取
        jobDetail.getJobDataMap().put(ScheduleConstants.TASK_PROPERTIES, indexUpdatePipeline);
        // 判断是否存在
        if (scheduler.checkExists(getJobKey(jobId, jobGroup))) {
            // 防止创建时存在数据问题 先移除,然后在执行创建操作
            scheduler.deleteJob(getJobKey(jobId, jobGroup));
        }
        scheduler.scheduleJob(jobDetail, trigger);
        // 暂停任务
        if (Boolean.FALSE.equals(indexUpdatePipeline.getStatus())) {
            scheduler.pauseJob(ScheduleUtils.getJobKey(jobId, jobGroup));
        }
    }
 
    /**
     * 构建任务键对象
     */
    public static JobKey getJobKey(Long jobId, String jobGroup) {
        return JobKey.jobKey(ScheduleConstants.TASK_JOB + jobId, jobGroup);
    }
 
    /**
     * 构建任务触发对象
     */
    public static TriggerKey getTriggerKey(Long jobId, String jobGroup) {
        return TriggerKey.triggerKey(ScheduleConstants.TASK_TRIGGER + jobId, jobGroup);
    }
}

6、定时任务执行类
上面的代码可以看到,定时任务实际执行的类是 QuartzDisallowConcurrentExecution 该类定义如下:

@DisallowConcurrentExecution
public class QuartzDisallowConcurrentExecution extends AbstractQuartzJob<JSONObject>{
    @Override
    protected JSONObject doExecute(JobExecutionContext context, SolrIndexUpdatePipeline indexUpdatePipeline) {
        return SpringUtil.getBean(IncrementIndexTask.class).doExecute(context, indexUpdatePipeline);
    }
}

在 Quartz 中,@DisallowConcurrentExecution 是一个用于防止并发执行同一个 Job 类的注解。它的作用是确保同一个 Job 的多个实例不会在同一时间并发运行,即使调度触发的频率很高,也只会串行地执行任务。

上面代码的父类 AbstractQuartzJob 内容如下:

@Slf4j
public abstract class AbstractQuartzJob<T> implements Job {
 
    /**
     * 线程本地变量
     */
    private static final ThreadLocal<LocalDateTime> THREAD_LOCAL = new ThreadLocal<>();
 
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        SolrIndexUpdatePipeline solrIndexUpdatePipeline = new SolrIndexUpdatePipeline();
        // 从任务参数中获取数据, 并拷贝到实体类中
        BeanUtils.copyProperties(context.getMergedJobDataMap().get(TASK_PROPERTIES), solrIndexUpdatePipeline);
 
        try {
            before(context, solrIndexUpdatePipeline);
            T result = doExecute(context, solrIndexUpdatePipeline);
            after(context, solrIndexUpdatePipeline, result,null);
        } catch (Exception e) {
            log.error("任务执行异常  - :", e);
            after(context, solrIndexUpdatePipeline, null, e);
        }
    }
 
    /**
     * 执行前
     *
     * @param context             工作执行上下文对象
     * @param indexUpdatePipeline 索引更新流水线
     */
    protected void before(JobExecutionContext context, SolrIndexUpdatePipeline indexUpdatePipeline) {
        THREAD_LOCAL.set(LocalDateTime.now());
    }
 
    /**
     * 执行后
     *
     * @param context             工作执行上下文对象
     * @param indexUpdatePipeline 索引更新流水线
     */
    protected void after(JobExecutionContext context, SolrIndexUpdatePipeline indexUpdatePipeline, T result, Exception e) {
        LocalDateTime startTime = THREAD_LOCAL.get();
        THREAD_LOCAL.remove();
 
        // TODO 日志记录
    }
 
    /**
     * 执行方法,由子类重载
     *
     * @param context             工作执行上下文对象
     * @param indexUpdatePipeline 索引更新流水线
     * @throws Exception 执行过程中的异常
     */
    protected abstract T doExecute(JobExecutionContext context, SolrIndexUpdatePipeline indexUpdatePipeline) throws Exception;
}

至于 SpringUtil.getBean(IncrementIndexTask.class).doExecute(context, indexUpdatePipeline); 中的 IncrementIndexTask是最终执行任务的类,它负责处理业务逻辑:

@Slf4j
@Component
public class IncrementIndexTask {
 
    public JSONObject doExecute(JobExecutionContext context, SolrIndexUpdatePipeline indexUpdatePipeline) {
        Trigger trigger = context.getTrigger();
        // 获取下次执行时间
        Date nextFireTime = trigger.getNextFireTime();
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.info("IncrementIndexTask doExecute, id:{}, next execution time: {}", indexUpdatePipeline.getId(), format.format(nextFireTime));
        return null;
    }
}

以上就是主要的配置内容。

遇到的问题:

1、Caused by: org.quartz.SchedulerConfigException: DataSource name not set.
在这里插入图片描述
项目启动后报了上面的问题, 该原因是 spring-boot-starter-parent 的版本问题导致的,spring-boot-starter-parent 版本不一样引用的 quartz的默认数据源的路径也发生了改变。 解决办法如下:

如果 spring-boot-starter-parent 的版本为 2.5.6 之后的版本 在 定时任务JDBC配置文件中应该修改如下操作:

prop.put("org.quartz.jobStore.class", "org.springframework.scheduling.quartz.LocalDataSourceJobStore");

如果 spring-boot-starter-parent 的版本为 2.5.6 之前的版本 在 定时任务JDBC配置文件中应该修改如下操作:

prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");

参考博文
https://blog.csdn.net/lml21lml/article/details/129204453


http://www.kler.cn/news/340926.html

相关文章:

  • 水下图像增强(论文复现)
  • Anaconda创建环境
  • 深度学习:循环神经网络—RNN的原理
  • Python知识点:利用Python工具,如何使用OpenCV进行图像处理
  • Tailwind Css的使用
  • java的LinkedList
  • 搜维尔科技:使用 SenseGlove Nova 2 远程操作机械手,实现了对鸡蛋的精细操控
  • elasticsearch ES DBA常用语句
  • 手写mybatis之细化XML语句构建器,完善静态SQL解析
  • 从0到1:如何利用MemFire Cloud快速上线你的应用?
  • TCP协议的三次握手与四次挥手的过程
  • 机器学习可视化教程——混淆矩阵与回归图
  • 递归基本操作总结
  • 数据中心运维挑战:性能监控的困境与智能化解决方案的探寻
  • 快餐食品检测系统源码分享[一条龙教学YOLOV8标注好的数据集一键训练_70+全套改进创新点发刊_Web前端展示]
  • 超轻巧modbus调试助手使用说明
  • recyclerView(kotlin)
  • 非结构化数据管理中的元数据应用与实践
  • ctf.bugku-eval
  • 髓质脊髓三叉神经核文献阅读笔记