Xxl-Job学习笔记
目录
概述
核心架构
核心特点
应用场景
什么是任务调度
快速入门
获取源码
初始化调度数据库
基本配置
数据源datasource
邮箱email(可选)
会话令牌access token
启动调度中心
启动执行器
依赖
yaml基本配置
XxlJobConfig类配置
定义执行任务
添加执行任务
初级阶段
时间转为Cron表达式工具类
XxlJobRemoteApiUtils工具类
引入远程发送请求依赖
允许远程调用
概述
XXL-Job 是一个轻量级、分布式任务调度平台,由国内技术团队开发并开源。它旨在解决分布式系统中的定时任务调度问题,提供了一整套简单、高效且可靠的解决方案
核心架构
XXL-Job 的架构主要由以下几部分组成:
- 调度中心(Admin):负责任务的管理、调度策略、触发时机以及调度请求的发起。它提供了可视化的 Web 管理界面,方便用户进行任务的增删改查和调度监控。
-
执行器(Executor):部署在业务服务环境中,用于接收调度中心的请求并执行具体的任务逻辑
-
任务代码:由开发者编写的业务逻辑代码,注册到执行器中,由调度中心触发执行
核心特点
-
轻量级设计:核心代码简洁高效,易于集成和部署
-
分布式调度:支持多机分布式部署,可水平扩展,提高系统可用性和负载能力
-
简单易用:提供简洁的 API 和可视化界面,便于任务的创建、管理和监控
-
功能丰富:支持多种任务类型(如定时任务、周期任务、一次性任务),并提供任务分片、失败重试、任务依赖等功能
-
弹性扩缩容:支持动态添加或移除执行器节点,无需停止服务
-
高可用性:通过多节点部署和故障转移机制,确保任务的不中断执行
应用场景
XXL-Job 广泛应用于以下场景:
-
定时任务:如数据备份、报表生成、系统维护等
-
分布式任务处理:支持任务分片并行执行,提高任务处理效率
-
弹性扩缩容:根据业务量动态调整执行器数量,应对业务波动
-
业务流程自动化:实现复杂业务流程的自动化调度
什么是任务调度
我们可以思考一下下面业务场景的解决方案:
- 某电商平台需要每天上午10点,下午3点,晚上8点发放一批优惠券
- 某银行系统需要在信用卡到期还款日的前三天进行短信提醒
- 某财务系统需要在每天凌晨0:10分结算前一天的财务数据,统计汇总
以上场景就是任务调度所需要解决的问题。
任务调度是为了自动完成特定任务,在约定的特定时刻去执行任务的过程。
快速入门
官网: 分布式任务调度平台XXL-JOB
获取源码
源码仓库地址 |
https://github.com/xuxueli/xxl-job |
xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。 |
GitCode - 全球开发者的开源社区,开源代码托管平台 |
获取源码解压即可
初始化调度数据库
打开项目我们可以获取到 调度数据库 ,路径为: xxl-job-master/doc/db/tables_xxl_job.sql
使用 数据库连接工具初始化运行即可。
基本配置
数据源datasource
随后打开使用 xxl-job-admin 模块,这个模块就是用于管理我们的调度。并修改我们数据相关配置:
邮箱email(可选)
email 的相关配置,就是 当我们调度执行失败的时候,可以通过 email 进行通知。具体email 的配置,通过个人的邮箱平台相关配置即可。
会话令牌access token
执行器 连接 调度中心 所需要的令牌。
启动调度中心
启动 admin 模块
调度中心访问地址: http://localhost:8080/xxl-job-admin
默认登录账号“admin/123456”,登录后运行界面如下图所示
启动执行器
打开你自己的项目,并进行相关配置。
依赖
<!-- xxl-job -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.1</version>
</dependency>
yaml基本配置
dev:
xxl:
job:
admin:
### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
addresses: http://localhost:8080/xxl-job-admin
### 调度中心通讯TOKEN [选填]:非空时启用;
accessToken: default
### 调度中心通讯超时时间[选填],单位秒;默认3s;
executor:
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
appname: xxl-job-executor-sample
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
address:
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯使用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
ip: localhost
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
port: 9999
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
logpath: /data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
logretentiondays: 30
XxlJobConfig类配置
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.admin.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init.");
log.info("adminAddress:{}", adminAddresses);
log.info("appname:{}", appname);
log.info("ip:{}", ip);
log.info("port:{}", port);
log.info("accessToken:{}", accessToken);
log.info("logPath:{}", logPath);
log.info("logRetentionDays:{}", logRetentionDays);
log.info(">>>>>>>>>>> xxl-job config init finish.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
随后启动我们自己的项目。
同时查看执行器管理器
可以发现远程注册端口节点成功。
定义执行任务
添加执行任务
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class SimpleXxlJob {
@XxlJob("simpleJobHandler") // 注解内的参数为我们运行模式为 Bean 类型对应的 JobHandler
public void simpleJobHandler() throws Exception {
System.out.println("执行定时任务,执行时间>>>>>>>>>>> xxl-job, Hello World." + new Date());
}
}
尝试执行一次。
通过我们执行一次成功后并调用对应方法,即可。可以根据我们自己需求进行启动配置对应的方法了。
初级阶段
有时候我们需要的是,用户使用自己的前端去设置触发的时间。并不是我们去xxl-job-admin 的管理端进行添加定时任务的。
时间转为Cron表达式工具类
以下是我收集的所用到的工具类。可以参考一下。
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* <p>
* 将时间转为Cron表达式
* </p>
*
* @author Angindem
* @since 2025-03-08
*
*/
public class CronUtils {
private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("ss mm HH dd MM ? yyyy");
public enum TimeCycle {
YEAR, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND
}
/**
* 将LocalDateTime转换为cron表达式的字符串。
* @param dateTime 要转换的时间字符串
* @param format 要转换的时间格式
* @return cron表达式
*/
public static String toCronExpression(String dateTime, String format) {
LocalDateTime localDate = LocalDateTime.parse(dateTime, DateTimeFormatter.ofPattern(format));
String date = localDate.format(FORMAT);
return date;
}
/**
* 将LocalDateTime转换为cron表达式的字符串。
* @param dateTime 要转换的LocalDateTime
* @return cron表达式
*/
public static String toCronExpression(LocalDateTime dateTime) {
String date = dateTime.format(FORMAT);
return date;
}
/**
* 将多个 LocalDateTime 对象转换为一个 cron 表达式字符串
* @param times LocalDateTime 对象列表
* @return cron 表达式字符串
*/
public static String convertToCron(List<LocalDateTime> times) {
// 提取秒、分、时、日、月、周几
int second = times.get(0).getSecond();
int minute = times.get(0).getMinute();
List<Integer> hours = new ArrayList<>();
List<Integer> daysOfMonth = new ArrayList<>();
List<Integer> months = new ArrayList<>();
List<Integer> daysOfWeek = new ArrayList<>();
for (LocalDateTime time : times) {
hours.add(time.getHour());
daysOfMonth.add(time.getDayOfMonth());
months.add(time.getMonthValue());
daysOfWeek.add(time.getDayOfWeek().getValue());
}
// 构造Cron表达式
StringBuilder cron = new StringBuilder();
cron.append(second).append(" ");
cron.append(minute).append(" ");
cron.append(String.join(",", hours.stream().map(Object::toString).collect(Collectors.toList()))).append(" ");
cron.append(String.join(",", daysOfMonth.stream().map(Object::toString).collect(Collectors.toList()))).append(" ");
cron.append(String.join(",", months.stream().map(Object::toString).collect(Collectors.toList()))).append(" ");
cron.append(String.join(",", daysOfWeek.stream().map(Object::toString).collect(Collectors.toList())));
return cron.toString();
}
/**
* 将指定的 LocalDateTime 对象转换为 指定周期的 cron 表达式字符串
* @param dateTime LocalDateTime 对象
* @param timeCycle 时间周期枚举值
* @return cron 表达式字符串
*/
public static String toCronExpression(LocalDateTime dateTime, TimeCycle timeCycle) {
String cron = null;
switch (timeCycle) {
case YEAR:
cron = String.format("%d %d %d %d %d ? *", dateTime.getSecond(),
dateTime.getMinute(), dateTime.getHour(), dateTime.getDayOfMonth(),
dateTime.getMonthValue());
break;
case MONTH:
cron = String.format("%d %d %d %d * ? *", dateTime.getSecond(),
dateTime.getMinute(), dateTime.getHour(), dateTime.getDayOfMonth());
break;
case WEEK:
cron = String.format("%d %d %d ? * %d *", dateTime.getSecond(),
dateTime.getMinute(), dateTime.getHour(), dateTime.getDayOfWeek().getValue() % 7);
break;
case DAY:
cron = String.format("%d %d %d * * ? *", dateTime.getSecond(),
dateTime.getMinute(), dateTime.getHour());
break;
case HOUR:
cron = String.format("%d %d * * * ? *", dateTime.getSecond(),
dateTime.getMinute());
break;
case MINUTE:
cron = String.format("%d * * * * ? *", dateTime.getSecond());
break;
case SECOND:
cron = "0/1 * * * * ? *";
break;
default:
throw new IllegalArgumentException("Unknown time cycle: " + timeCycle);
}
return cron;
}
}
XxlJobRemoteApiUtils工具类
转完 Cron 表达式后,我们可以通过远程调用 xxl-job-admin 的对应接口进行操作添加。
引入远程发送请求依赖
<!--httpclient的坐标用于在java中发起请求-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<!--使用fastjson解析json数据 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.42</version>
</dependency>
以下是我收集并使用的工具类,可以做一下参考。
import com.fasterxml.jackson.databind.ObjectMapper;
import com.pea.mic.domain.po.XxlJobInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Component
public class XxlJobRemoteApiUtils {
private static String adminAddresses;
private static String appname;
private static String accessToken;
private final static RestTemplate restTemplate = new RestTemplate();
private static final String ADD_URL = "/jobinfo/add";
private static final String UPDATE_URL = "/jobinfo/update";
private static final String REMOVE_URL = "/jobinfo/remove";
private static final String PAUSE_URL = "/jobinfo/pause";
private static final String START_URL = "/jobinfo/start";
@Autowired
public void init(Environment env) {
adminAddresses = env.getProperty("xxl.job.admin.addresses");
appname = env.getProperty("xxl.job.executor.appname");
accessToken = env.getProperty("xxl.job.admin.accessToken");
log.info("xxl.job.admin.addresses:{}", adminAddresses);
log.info("xxl.job.executor.appname:{}", appname);
log.info("xxl.job.accessToken:{}", accessToken);
}
public static Map getJobInfoByLocalDateTime(LocalDateTime times,
String author,
String JobDesc,
String taskHandler){
String cron = CronUtils.toCronExpression(times);
XxlJobInfo jobInfo = new XxlJobInfo();
jobInfo.setJobGroup(2);
jobInfo.setJobDesc(JobDesc);
jobInfo.setAuthor(author);
jobInfo.setScheduleType("CRON");
jobInfo.setMisfireStrategy("DO_NOTHING");
//执行时间
jobInfo.setScheduleConf(cron);
jobInfo.setGlueType("BEAN");
jobInfo.setExecutorHandler(taskHandler);
jobInfo.setExecutorRouteStrategy("FIRST");
jobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION");
jobInfo.setExecutorTimeout(0);
jobInfo.setExecutorFailRetryCount(0);
jobInfo.setGlueType("BEAN");
jobInfo.setGlueRemark("GLUE代码初始化");
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
ObjectMapper objectMapper = new ObjectMapper();
Map map = objectMapper.convertValue(jobInfo, Map.class);
return map;
}
public static String add(Map param){
return doPost(adminAddresses + ADD_URL, param);
}
public static String update(String id, String cron){
Map param = new HashMap<>();
param.put("id", id);
param.put("jobCron", cron);
return doPost(adminAddresses + UPDATE_URL, param);
}
public static String remove(String id){
Map param = new HashMap<>();
param.put("id", id);
return doGet(adminAddresses + REMOVE_URL, param);
}
public static String pause(String id){
Map param = new HashMap<>();
param.put("id", id);
return doGet(adminAddresses + PAUSE_URL, param);
}
public static String start(String id){
Map param = new HashMap<>();
param.put("id", id);
return doGet(adminAddresses + START_URL, param);
}
public static String doPost(String url, Map fromData){
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.add("accessToken", accessToken);
HttpEntity<Map> entity = new HttpEntity<>(fromData ,headers);
log.info(entity.toString());
ResponseEntity<String> stringResponseEntity = restTemplate.postForEntity(url, entity, String.class);
return stringResponseEntity.getBody().toString();
}
public static String doGet(String url, Map<String, String> params) {
// 创建可关闭的HttpClient,使用try-with-resources确保资源自动关闭
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
// 使用URIBuilder来构建带参数的URL
URIBuilder uriBuilder = new URIBuilder(url);
// 将Map中的参数添加到URL中
if (params != null) {
for (Map.Entry<String, String> entry : params.entrySet()) {
uriBuilder.setParameter(entry.getKey(), entry.getValue());
}
}
// 创建GET请求对象
HttpGet httpGet = new HttpGet(uriBuilder.build());
httpGet.setHeader("accessToken", accessToken);
// 设置请求配置(超时等)
// httpGet.setConfig(buildRequestConfig());
// 执行请求并获取响应
try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
// 判断响应状态码是否为200(成功)
if (response.getStatusLine().getStatusCode() == 200) {
// 获取响应内容并转换为字符串
String result = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
log.info("GET Response body: {}", result);
return result;
}
}
} catch (Exception e) {
log.error("发送GET请求出错: ", e);
}
return null;
}
}
允许远程调用
通过二次开发xxl-job-admin,允许远程调用,修改方法如下:
通过发送请求并,走的流程,可以直到调用接口的时候是通过请求参数中的XXL_JOB_LOGIN_IDENTITY 进行校验,我们可以通过当我们发送的请求参数,直接获取即可cookieToken,即可。
PS:由于博主已经实验成功过了,具体方法,大家可以参考参考即可。博主就不走结果啦。
----------------------------持续更新中----------------------------