当前位置: 首页 > article >正文

Xxl-Job学习笔记

目录

概述

核心架构

核心特点

应用场景

什么是任务调度

快速入门

获取源码

初始化调度数据库

基本配置

数据源datasource

邮箱email(可选)

会话令牌access token

启动调度中心

启动执行器

依赖

yaml基本配置

XxlJobConfig类配置

定义执行任务

添加执行任务

初级阶段

时间转为Cron表达式工具类

XxlJobRemoteApiUtils工具类

引入远程发送请求依赖

允许远程调用

概述

        XXL-Job 是一个轻量级、分布式任务调度平台,由国内技术团队开发并开源。它旨在解决分布式系统中的定时任务调度问题,提供了一整套简单、高效且可靠的解决方案

核心架构

XXL-Job 的架构主要由以下几部分组成:

  • 调度中心(Admin):负责任务的管理、调度策略、触发时机以及调度请求的发起。它提供了可视化的 Web 管理界面,方便用户进行任务的增删改查和调度监控。
  • 执行器(Executor):部署在业务服务环境中,用于接收调度中心的请求并执行具体的任务逻辑

  • 任务代码:由开发者编写的业务逻辑代码,注册到执行器中,由调度中心触发执行

核心特点

  • 轻量级设计:核心代码简洁高效,易于集成和部署

  • 分布式调度:支持多机分布式部署,可水平扩展,提高系统可用性和负载能力

  • 简单易用:提供简洁的 API 和可视化界面,便于任务的创建、管理和监控

  • 功能丰富:支持多种任务类型(如定时任务、周期任务、一次性任务),并提供任务分片、失败重试、任务依赖等功能

  • 弹性扩缩容:支持动态添加或移除执行器节点,无需停止服务

  • 高可用性:通过多节点部署和故障转移机制,确保任务的不中断执行

应用场景

XXL-Job 广泛应用于以下场景:

  • 定时任务:如数据备份、报表生成、系统维护等

  • 分布式任务处理:支持任务分片并行执行,提高任务处理效率

  • 弹性扩缩容:根据业务量动态调整执行器数量,应对业务波动

  • 业务流程自动化:实现复杂业务流程的自动化调度

什么是任务调度

我们可以思考一下下面业务场景的解决方案:

  • 某电商平台需要每天上午10点,下午3点,晚上8点发放一批优惠券
  • 某银行系统需要在信用卡到期还款日的前三天进行短信提醒
  • 某财务系统需要在每天凌晨0:10分结算前一天的财务数据,统计汇总

以上场景就是任务调度所需要解决的问题。

任务调度是为了自动完成特定任务,在约定的特定时刻去执行任务的过程。

快速入门

官网: 分布式任务调度平台XXL-JOB

获取源码

源码仓库地址
https://github.com/xuxueli/xxl-job
xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
GitCode - 全球开发者的开源社区,开源代码托管平台

获取源码解压即可

初始化调度数据库

打开项目我们可以获取到 调度数据库 ,路径为: xxl-job-master/doc/db/tables_xxl_job.sql

使用 数据库连接工具初始化运行即可。

基本配置

数据源datasource

随后打开使用 xxl-job-admin 模块,这个模块就是用于管理我们的调度。并修改我们数据相关配置:

邮箱email(可选)

email 的相关配置,就是 当我们调度执行失败的时候,可以通过 email 进行通知。具体email 的配置,通过个人的邮箱平台相关配置即可。

会话令牌access token

执行器 连接 调度中心 所需要的令牌。

启动调度中心

启动 admin 模块

调度中心访问地址: http://localhost:8080/xxl-job-admin

默认登录账号“admin/123456”,登录后运行界面如下图所示

启动执行器

打开你自己的项目,并进行相关配置。

依赖
<!--       xxl-job -->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.3.1</version>
        </dependency>
yaml基本配置
dev:
  xxl:
    job:
      admin:
        ### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
        addresses: http://localhost:8080/xxl-job-admin
        ### 调度中心通讯TOKEN [选填]:非空时启用;
        accessToken: default
        ### 调度中心通讯超时时间[选填],单位秒;默认3s;
      executor:
        ### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
        appname: xxl-job-executor-sample
        ### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
        address:
        ### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯使用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
        ip: localhost
        ### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
        port: 9999
        ### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
        logpath: /data/applogs/xxl-job/jobhandler
        ### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
        logretentiondays: 30
XxlJobConfig类配置
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class XxlJobConfig {

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    @Value("${xxl.job.admin.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        log.info("adminAddress:{}", adminAddresses);
        log.info("appname:{}", appname);
        log.info("ip:{}", ip);
        log.info("port:{}", port);
        log.info("accessToken:{}", accessToken);
        log.info("logPath:{}", logPath);
        log.info("logRetentionDays:{}", logRetentionDays);
        log.info(">>>>>>>>>>> xxl-job config init finish.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}

随后启动我们自己的项目。

同时查看执行器管理器

可以发现远程注册端口节点成功。

定义执行任务

添加执行任务

import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class SimpleXxlJob {

    @XxlJob("simpleJobHandler")   // 注解内的参数为我们运行模式为 Bean 类型对应的 JobHandler
    public void simpleJobHandler() throws Exception {
        System.out.println("执行定时任务,执行时间>>>>>>>>>>> xxl-job, Hello World." + new Date());
    }
}

尝试执行一次。

通过我们执行一次成功后并调用对应方法,即可。可以根据我们自己需求进行启动配置对应的方法了。

初级阶段

有时候我们需要的是,用户使用自己的前端去设置触发的时间。并不是我们去xxl-job-admin 的管理端进行添加定时任务的。

时间转为Cron表达式工具类

以下是我收集的所用到的工具类。可以参考一下。


import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 *
 * <p>
 * 将时间转为Cron表达式
 * </p> 
 *
 * @author Angindem
 * @since 2025-03-08
 *
 */
public class CronUtils {

    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("ss mm HH dd MM ? yyyy");

    public enum TimeCycle {
        YEAR, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND
    }

    /**
     * 将LocalDateTime转换为cron表达式的字符串。
     * @param dateTime 要转换的时间字符串
     * @param format 要转换的时间格式
     * @return cron表达式
     */
    public static String toCronExpression(String dateTime, String format) {
        LocalDateTime localDate = LocalDateTime.parse(dateTime, DateTimeFormatter.ofPattern(format));
        String date = localDate.format(FORMAT);
        return date;
    }

    /**
     * 将LocalDateTime转换为cron表达式的字符串。
     * @param dateTime 要转换的LocalDateTime
     * @return cron表达式
     */

    public static String toCronExpression(LocalDateTime dateTime) {
        String date = dateTime.format(FORMAT);
        return date;
    }

    /**
     * 将多个 LocalDateTime 对象转换为一个 cron 表达式字符串
     * @param times LocalDateTime 对象列表
     * @return cron 表达式字符串
     */
    public static String convertToCron(List<LocalDateTime> times) {
        // 提取秒、分、时、日、月、周几
        int second = times.get(0).getSecond();
        int minute = times.get(0).getMinute();
        List<Integer> hours = new ArrayList<>();
        List<Integer> daysOfMonth = new ArrayList<>();
        List<Integer> months = new ArrayList<>();
        List<Integer> daysOfWeek = new ArrayList<>();

        for (LocalDateTime time : times) {
            hours.add(time.getHour());
            daysOfMonth.add(time.getDayOfMonth());
            months.add(time.getMonthValue());
            daysOfWeek.add(time.getDayOfWeek().getValue());
        }

        // 构造Cron表达式
        StringBuilder cron = new StringBuilder();
        cron.append(second).append(" ");
        cron.append(minute).append(" ");
        cron.append(String.join(",", hours.stream().map(Object::toString).collect(Collectors.toList()))).append(" ");
        cron.append(String.join(",", daysOfMonth.stream().map(Object::toString).collect(Collectors.toList()))).append(" ");
        cron.append(String.join(",", months.stream().map(Object::toString).collect(Collectors.toList()))).append(" ");
        cron.append(String.join(",", daysOfWeek.stream().map(Object::toString).collect(Collectors.toList())));
        return cron.toString();
    }

    /**
     * 将指定的 LocalDateTime 对象转换为 指定周期的 cron 表达式字符串
     * @param dateTime LocalDateTime 对象
     * @param timeCycle 时间周期枚举值
     * @return cron 表达式字符串
     */
    public static String toCronExpression(LocalDateTime dateTime, TimeCycle timeCycle) {
        String cron = null;
        switch (timeCycle) {
            case YEAR:
                cron = String.format("%d %d %d %d %d ? *", dateTime.getSecond(),
                        dateTime.getMinute(), dateTime.getHour(), dateTime.getDayOfMonth(),
                        dateTime.getMonthValue());
                break;
            case MONTH:
                cron = String.format("%d %d %d %d * ? *", dateTime.getSecond(),
                        dateTime.getMinute(), dateTime.getHour(), dateTime.getDayOfMonth());
                break;
            case WEEK:
                cron = String.format("%d %d %d ? * %d *", dateTime.getSecond(),
                        dateTime.getMinute(), dateTime.getHour(), dateTime.getDayOfWeek().getValue() % 7);
                break;
            case DAY:
                cron = String.format("%d %d %d * * ? *", dateTime.getSecond(),
                        dateTime.getMinute(), dateTime.getHour());
                break;
            case HOUR:
                cron = String.format("%d %d * * * ? *", dateTime.getSecond(),
                        dateTime.getMinute());
                break;
            case MINUTE:
                cron = String.format("%d * * * * ? *", dateTime.getSecond());
                break;
            case SECOND:
                cron = "0/1 * * * * ? *";
                break;
            default:
                throw new IllegalArgumentException("Unknown time cycle: " + timeCycle);
        }
        return cron;
    }
}

XxlJobRemoteApiUtils工具类

转完 Cron 表达式后,我们可以通过远程调用 xxl-job-admin 的对应接口进行操作添加。

引入远程发送请求依赖
        <!--httpclient的坐标用于在java中发起请求-->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
        </dependency>

        <!--使用fastjson解析json数据 -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.42</version>
        </dependency>

以下是我收集并使用的工具类,可以做一下参考。


import com.fasterxml.jackson.databind.ObjectMapper;
import com.pea.mic.domain.po.XxlJobInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@Component
public class XxlJobRemoteApiUtils {

    private static String adminAddresses;

    private static String appname;

    private static String accessToken;

    private final static RestTemplate restTemplate = new RestTemplate();

    private static final String ADD_URL = "/jobinfo/add";
    private static final String UPDATE_URL = "/jobinfo/update";
    private static final String REMOVE_URL = "/jobinfo/remove";
    private static final String PAUSE_URL = "/jobinfo/pause";
    private static final String START_URL = "/jobinfo/start";

    @Autowired
    public void init(Environment env) {
        adminAddresses = env.getProperty("xxl.job.admin.addresses");
        appname = env.getProperty("xxl.job.executor.appname");
        accessToken = env.getProperty("xxl.job.admin.accessToken");
        log.info("xxl.job.admin.addresses:{}", adminAddresses);
        log.info("xxl.job.executor.appname:{}", appname);
        log.info("xxl.job.accessToken:{}", accessToken);
    }


    public static Map getJobInfoByLocalDateTime(LocalDateTime times,
                                                String author,
                                                String JobDesc,
                                                String taskHandler){
        String cron = CronUtils.toCronExpression(times);
        XxlJobInfo jobInfo = new XxlJobInfo();
        jobInfo.setJobGroup(2);
        jobInfo.setJobDesc(JobDesc);
        jobInfo.setAuthor(author);
        jobInfo.setScheduleType("CRON");
        jobInfo.setMisfireStrategy("DO_NOTHING");
        //执行时间
        jobInfo.setScheduleConf(cron);
        jobInfo.setGlueType("BEAN");
        jobInfo.setExecutorHandler(taskHandler);
        jobInfo.setExecutorRouteStrategy("FIRST");
        jobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION");
        jobInfo.setExecutorTimeout(0);
        jobInfo.setExecutorFailRetryCount(0);
        jobInfo.setGlueType("BEAN");
        jobInfo.setGlueRemark("GLUE代码初始化");
        jobInfo.setTriggerStatus(0);
        jobInfo.setTriggerLastTime(0);
        jobInfo.setTriggerNextTime(0);
        ObjectMapper objectMapper = new ObjectMapper();
        Map map = objectMapper.convertValue(jobInfo, Map.class);
        return map;
    }

    public static String add(Map param){
        return doPost(adminAddresses + ADD_URL, param);
    }

    public static String update(String id, String cron){
        Map param = new HashMap<>();
        param.put("id", id);
        param.put("jobCron", cron);
        return doPost(adminAddresses + UPDATE_URL, param);
    }

    public static String remove(String id){
        Map param = new HashMap<>();
        param.put("id", id);
        return doGet(adminAddresses + REMOVE_URL, param);
    }

    public static String pause(String id){
        Map param = new HashMap<>();
        param.put("id", id);
        return doGet(adminAddresses + PAUSE_URL, param);
    }

    public static String start(String id){
        Map param = new HashMap<>();
        param.put("id", id);
        return doGet(adminAddresses + START_URL, param);
    }

    public static String doPost(String url, Map fromData){
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        headers.add("accessToken", accessToken);
        HttpEntity<Map> entity = new HttpEntity<>(fromData ,headers);
        log.info(entity.toString());
        ResponseEntity<String> stringResponseEntity = restTemplate.postForEntity(url, entity, String.class);
        return stringResponseEntity.getBody().toString();
    }

    public static String doGet(String url, Map<String, String> params) {
        // 创建可关闭的HttpClient,使用try-with-resources确保资源自动关闭
        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
            // 使用URIBuilder来构建带参数的URL
            URIBuilder uriBuilder = new URIBuilder(url);

            // 将Map中的参数添加到URL中
            if (params != null) {
                for (Map.Entry<String, String> entry : params.entrySet()) {
                    uriBuilder.setParameter(entry.getKey(), entry.getValue());
                }
            }

            // 创建GET请求对象
            HttpGet httpGet = new HttpGet(uriBuilder.build());
            httpGet.setHeader("accessToken", accessToken);
            // 设置请求配置(超时等)
//            httpGet.setConfig(buildRequestConfig());

            // 执行请求并获取响应
            try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
                // 判断响应状态码是否为200(成功)
                if (response.getStatusLine().getStatusCode() == 200) {
                    // 获取响应内容并转换为字符串
                    String result = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
                    log.info("GET Response body: {}", result);
                    return result;
                }
            }
        } catch (Exception e) {
            log.error("发送GET请求出错: ", e);
        }
        return null;
    }
}

允许远程调用

通过二次开发xxl-job-admin,允许远程调用,修改方法如下:

通过发送请求并,走的流程,可以直到调用接口的时候是通过请求参数中的XXL_JOB_LOGIN_IDENTITY 进行校验,我们可以通过当我们发送的请求参数,直接获取即可cookieToken,即可。

PS:由于博主已经实验成功过了,具体方法,大家可以参考参考即可。博主就不走结果啦。

----------------------------持续更新中----------------------------


http://www.kler.cn/a/585002.html

相关文章:

  • Vue系统学习day01
  • 258.反转字符串中的单词
  • 【每日学点HarmonyOS Next知识】span问题、组件标识属性、属性动画回调、图文混排、相对布局问题
  • Linux 部署Java应用程序
  • OpenCV实现图像分割与无缝合并
  • FORTRAN语言的数据结构
  • GStreamer —— 2.17、Windows下Qt加载GStreamer库后运行 - “播放教程 5:色彩平衡“(附:完整源码)
  • FastJSON与Java序列化:数据处理与转换的关键技术
  • Python爬虫实战:基于 Scrapy 框架的腾讯视频数据采集研究
  • 『Rust』Rust运行环境搭建
  • Linux笔记之通配符和正则表达式的区别
  • cocos creator 3.8如何在代码中打印drawcall,fps
  • Matlab 灰度质心法提取条纹中心线
  • Git的详细使用方法
  • 基于stm32的视觉物流机器人
  • 智慧城市新基建!图扑智慧路灯,点亮未来城市生活!
  • AWS云编排详解-Cloud Formation
  • 一文讲清楚CUDA与PyTorch、GPU之间的关系
  • Gemini Robotics:Google DeepMind 让 AI 机器人真正“动”起来!
  • 深度学习——Diffusion Model学习,扩散模型