【分布式定时任务】XXL-JOB_2.4.1部署与实战
部署
分布式任务调度平台XXL-JOBXXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。https://www.xuxueli.com/xxl-job/
一 数据库配置
- 执行初始化数据库SQL
xxl-job/doc/db/tables_xxl_job.sql at 2.4.1 · xuxueli/xxl-job · GitHubA distributed task scheduling framework.(分布式任务调度平台XXL-JOB) - xxl-job/doc/db/tables_xxl_job.sql at 2.4.1 · xuxueli/xxl-jobhttps://github.com/xuxueli/xxl-job/blob/2.4.1/doc/db/tables_xxl_job.sql
#
# XXL-JOB v2.4.1
# Copyright (c) 2015-present, xuxueli.
CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
use `xxl_job`;
SET NAMES utf8mb4;
CREATE TABLE `xxl_job_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '执行器主键ID',
`job_desc` varchar(255) NOT NULL,
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`author` varchar(64) DEFAULT NULL COMMENT '作者',
`alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件',
`schedule_type` varchar(50) NOT NULL DEFAULT 'NONE' COMMENT '调度类型',
`schedule_conf` varchar(128) DEFAULT NULL COMMENT '调度配置,值含义取决于调度类型',
`misfire_strategy` varchar(50) NOT NULL DEFAULT 'DO_NOTHING' COMMENT '调度过期策略',
`executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
`executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略',
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
`glue_type` varchar(50) NOT NULL COMMENT 'GLUE类型',
`glue_source` mediumtext COMMENT 'GLUE源代码',
`glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注',
`glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',
`child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',
`trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行',
`trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间',
`trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '执行器主键ID',
`job_id` int(11) NOT NULL COMMENT '任务,主键ID',
`executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
`executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
`trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',
`trigger_code` int(11) NOT NULL COMMENT '调度-结果',
`trigger_msg` text COMMENT '调度-日志',
`handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
`handle_code` int(11) NOT NULL COMMENT '执行-状态',
`handle_msg` text COMMENT '执行-日志',
`alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败',
PRIMARY KEY (`id`),
KEY `I_trigger_time` (`trigger_time`),
KEY `I_handle_code` (`handle_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_log_report` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`trigger_day` datetime DEFAULT NULL COMMENT '调度-时间',
`running_count` int(11) NOT NULL DEFAULT '0' COMMENT '运行中-日志数量',
`suc_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量',
`fail_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_logglue` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_id` int(11) NOT NULL COMMENT '任务,主键ID',
`glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型',
`glue_source` mediumtext COMMENT 'GLUE源代码',
`glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注',
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_registry` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`registry_group` varchar(50) NOT NULL,
`registry_key` varchar(255) NOT NULL,
`registry_value` varchar(255) NOT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_group` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`app_name` varchar(64) NOT NULL COMMENT '执行器AppName',
`title` varchar(12) NOT NULL COMMENT '执行器名称',
`address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '执行器地址类型:0=自动注册、1=手动录入',
`address_list` text COMMENT '执行器地址列表,多地址逗号分隔',
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`username` varchar(50) NOT NULL COMMENT '账号',
`password` varchar(50) NOT NULL COMMENT '密码',
`role` tinyint(4) NOT NULL COMMENT '角色:0-普通用户、1-管理员',
`permission` varchar(255) DEFAULT NULL COMMENT '权限:执行器ID列表,多个逗号分割',
PRIMARY KEY (`id`),
UNIQUE KEY `i_username` (`username`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_lock` (
`lock_name` varchar(50) NOT NULL COMMENT '锁名称',
PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31' );
INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', '');
INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');
commit;
- 创建job_user用户作为数据库管理账号
CREATE USER 'job_user'@'%' IDENTIFIED BY 'xxljob0819';
GRANT ALL PRIVILEGES ON xxl_job.* TO 'job_user'@'%';
FLUSH PRIVILEGES;
二 DOCKER部署
# 拉取镜像
docker pull xuxueli/xxl-job-admin:2.4.1
# 启动
docker run -d \
-e PARAMS="--spring.datasource.url=jdbc:mysql://ip:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai --spring.datasource.username=job_user --spring.datasource.password=xxljob0819" \
-p 7080:8080 \
-v /opt/docker/xxljob/tmp:/data/applogs \
--name xxl-job \
xuxueli/xxl-job-admin:2.4.1
三 验证
- 访问:http://ip:port/xxl-job-admin/toLogin
- 默认用户:admin/123456
- 重置密码
代码
一 JAVA配置
1 依赖
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.4.1</version>
</dependency>
2 配置
### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://ip:port/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=default_token
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=${spring.application.name}
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
xxl.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=8800
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=30
3 自动装配
package com.xx.core.basic.xxljob.configurations;
import com.xx.core.basic.xxljob.properties.XxlJobProperty;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
/**
* XXL-JOB自动装配配置类
*
* @author wuzijing
* @since 2024-08-19
*/
@Slf4j
@AutoConfiguration
@EnableConfigurationProperties({
XxlJobProperty.class
})
public class XxlJobAutoConfiguration {
@Bean
public XxlJobSpringExecutor xxlJobExecutor(XxlJobProperty xxlJobProperty) {
log.info("xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(xxlJobProperty.getAdmin().getAddresses());
xxlJobSpringExecutor.setAppname(xxlJobProperty.getExecutor().getAppname());
xxlJobSpringExecutor.setIp(xxlJobProperty.getExecutor().getIp());
xxlJobSpringExecutor.setPort(xxlJobProperty.getExecutor().getPort());
xxlJobSpringExecutor.setAccessToken(xxlJobProperty.getAccessToken());
xxlJobSpringExecutor.setLogPath(xxlJobProperty.getExecutor().getLogpath());
xxlJobSpringExecutor.setLogRetentionDays(xxlJobProperty.getExecutor().getLogretentiondays());
return xxlJobSpringExecutor;
}
}
package com.xx.core.basic.xxljob.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* XXL-JOB配置参数类
*
* @author wuzijing
* @since 2024-08-19
*/
@Getter
@Setter
@ConfigurationProperties(prefix = "xxl.job")
public class XxlJobProperty {
/**
* 执行器通讯TOKEN [选填]:非空时启用;
*/
private String accessToken;
/**
* 调度中心配置
*/
private JobAdminProperty admin;
/**
* 执行器配置
*/
private JobExecutorProperty executor;
}
package com.xx.core.basic.xxljob.properties;
import lombok.Getter;
import lombok.Setter;
/**
* XXL-JOB执行期配置参数类
*
* @author wuzijing
* @since 2024-08-19
*/
@Getter
@Setter
public class JobExecutorProperty {
/**
* 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
*/
private String appname;
/**
* 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
*/
private String address;
/**
* 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
*/
private String ip;
/**
* 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
*/
private Integer port;
/**
* 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
*/
private String logpath;
/**
* 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
*/
private Integer logretentiondays;
}
package com.xx.core.basic.xxljob.properties;
import lombok.Getter;
import lombok.Setter;
/**
* XXL-JOB调度中心配置参数类
*
* @author wuzijing
* @since 2024-08-19
*/
@Getter
@Setter
public class JobAdminProperty {
/**
* 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
*/
private String addresses;
}
二 任务实现
1 代码
package com.xx.storage.module.task;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.dynamic.datasource.annotation.DSTransactional;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.xx.core.basic.constant.IntegerConstant;
import com.xx.storage.module.bean.StrgConfigDO;
import com.xx.storage.module.bean.StrgFileDO;
import com.xx.storage.module.constants.RedissonLockKeyConstant;
import com.xx.storage.module.dao.StrgConfigDao;
import com.xx.storage.module.dao.StrgFileDao;
import com.xx.storage.module.utils.AmazonS3Util;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 存储文件信息表清理定时任务
*
* @author wuzijing
* @since 2024-08-20
*/
@Slf4j
@Component
@DSTransactional
public class StrgFileCleanTask {
@Resource
private StrgConfigDao strgConfigDao;
@Resource
private StrgFileDao strgFileDao;
@Resource
private RedissonClient redissonClient;
private record FileGroup(String endpointUrl, String bucketName, String keyId, String keySecret,
List<String> objectNameList) {
}
/**
* 存储文件信息表清理定时任务
* <p>
* 入参设定:yyyy-MM-dd 清除指定日期(00:00:00)前的文件
* 无参设定:清除一个月(00:00:00)前的文件
*/
@XxlJob("strgFileCleanTask")
public void strgFileCleanTask() {
// 设置清除时间
LocalDateTime clearTime;
String param = XxlJobHelper.getJobParam();
if (!StrUtil.isBlankOrUndefined(param)) {
clearTime = LocalDateTimeUtil.beginOfDay(LocalDateTimeUtil.parse(param, DatePattern.NORM_DATE_PATTERN));
} else {
clearTime = LocalDateTimeUtil.beginOfDay(LocalDateTimeUtil.now().minusMonths(IntegerConstant.ONE));
}
if (ObjectUtil.isNull(clearTime)) {
XxlJobHelper.handleFail("存储文件信息表清理定时任务执行失败,入参解析异常:param=" + param);
return;
}
RLock rLock = redissonClient.getLock(RedissonLockKeyConstant.FILE_CLEAN_TASK);
if (!rLock.tryLock()) {
XxlJobHelper.handleFail("存储文件信息表清理定时任务执行失败,当前任务执行中");
return;
}
try {
// 获取对应配置
LambdaQueryWrapper<StrgConfigDO> configWrapper = new LambdaQueryWrapper<>();
configWrapper.eq(StrgConfigDO::getIsClear, IntegerConstant.ONE);
List<StrgConfigDO> configDOList = strgConfigDao.selectList(configWrapper);
if (ObjectUtil.isEmpty(configDOList)) {
XxlJobHelper.handleSuccess(this.successResult(clearTime, IntegerConstant.ZERO));
return;
}
// 获取待清理文件列表
LambdaQueryWrapper<StrgFileDO> fileWrapper = new LambdaQueryWrapper<>();
fileWrapper.in(StrgFileDO::getConfigCode, configDOList.stream().map(StrgConfigDO::getCode));
fileWrapper.and(and -> {
and.isNull(StrgFileDO::getBizGuid).or();
and.eq(StrgFileDO::getBizGuid, StrUtil.EMPTY);
});
fileWrapper.le(StrgFileDO::getModifyTime, clearTime);
List<StrgFileDO> fileDOList = strgFileDao.selectList(fileWrapper);
if (ObjectUtil.isEmpty(fileDOList)) {
XxlJobHelper.handleSuccess(this.successResult(clearTime, IntegerConstant.ZERO));
return;
}
// 文件分组
Map<String, FileGroup> fileGroupMap = new HashMap<>();
for (StrgFileDO fileDO : fileDOList) {
FileGroup fileGroup = fileGroupMap.get(fileDO.getEndpointUrl() + fileDO.getBucketName() + fileDO.getKeyId() + fileDO.getKeySecret());
if (ObjectUtil.isNull(fileGroup)) {
fileGroup = new FileGroup(fileDO.getEndpointUrl(), fileDO.getBucketName(), fileDO.getKeyId(), fileDO.getKeySecret(), List.of(fileDO.getPath()));
} else {
fileGroup.objectNameList().add(fileDO.getPath());
fileGroup = new FileGroup(fileDO.getEndpointUrl(), fileDO.getBucketName(), fileDO.getKeyId(), fileDO.getKeySecret(), fileGroup.objectNameList());
}
fileGroupMap.put(fileDO.getEndpointUrl() + fileDO.getBucketName() + fileDO.getKeyId() + fileDO.getKeySecret(), fileGroup);
}
// 开始清理文件
fileGroupMap.forEach((key, fileGroup) -> {
AmazonS3Util.deleteFile(fileGroup.endpointUrl(), fileGroup.bucketName(), fileGroup.keyId(), fileGroup.keySecret(), fileGroup.objectNameList());
});
XxlJobHelper.handleSuccess(this.successResult(clearTime, fileDOList.size()));
} catch (Exception error) {
log.error("存储文件信息表清理定时任务执行异常,错误信息:{}", error.getMessage());
XxlJobHelper.handleFail("存储文件信息表清理定时任务执行异常,错误信息:" + error.getMessage());
} finally {
rLock.unlock();
}
}
/**
* 成功结果
*
* @param time 清理时间
* @param num 清理成功数量
* @return 成功结果消息
*/
private String successResult(LocalDateTime time, Integer num) {
return "存储文件信息表" + LocalDateTimeUtil.format(time, DatePattern.NORM_DATE_PATTERN) + "前数据清理定时任务执行成功,清理文件数:" + num;
}
}
2 配置
(1)执行器配置
(2)任务配置