当前位置: 首页 > article >正文

分布式任务调度

今天我们讲讲分布式定时任务调度—ElasticJob。

一、概述

1、什么是分布式任务调度

我们可以思考⼀下下⾯业务场景的解决⽅案:

  • 某电商平台需要每天上午10点,下午3点,晚上8点发放⼀批优惠券

  • 某银⾏系统需要在信⽤卡到期还款⽇的前三天进⾏短信提醒

  • 某财务系统需要在每天凌晨0:10分结算前⼀天的财务数据,统计汇总

以上场景就是任务调度所需要解决的问题

任务调度是为了⾃动完成特定任务,在约定的特定时刻去执⾏任务的过程

2、为什么需要分布式调度

我们在之前使⽤过Spring中提供的定时任务注解@Scheduled,在业务类中⽅法中贴上这个注解然后在启动类上贴上 @EnableScheduling 注解

那为什么又需要分布式调度?

感觉Spring给我们提供的这个注解可以完成任务调度的功能,好像已经完美解决问题了,为什么还需要分布式呢?

主要有如下这⼏点原因:

1.单机处理极限:原本1分钟内需要处理1万个订单,但是现在需要1分钟内处理10万个订单;原来⼀个统计需要1⼩时,现在业务⽅需要10分钟就统计出来。你也许会说,你也可以多线程、单机多进程处理。的确,多线程并⾏处理可以提⾼单位时间的处理效率,但是单机能⼒毕竟有限(主要是CPU、内存和磁盘),始终会有单机处理不过来的情况。

2.⾼可⽤:单机版的定式任务调度只能在⼀台机器上运⾏,如果程序或者系统出现异常就会导致功能不可⽤。虽然可以在单机程序实现的⾜够稳定,但始终有机会遇到⾮程序引起的故障,⽽这个对于⼀个系统的核⼼功能来说是不可接受的。

3.防⽌重复执⾏: 在单机模式下,定时任务是没什么问题的。但当我们部署了多台服务,同时⼜每台服务⼜有定时任务时,可能会出现任务重复执行

这个时候就需要分布式的任务调度来实现了。

3、Elastic-Job介绍

Elastic-Job是⼀个分布式调度的解决⽅案,由当当⽹开源,它由两个相互独⽴的⼦项⽬Elastic-job-Lite和

Elastic-Job-Cloud组成,使⽤Elastic-Job可以快速实现分布式任务调度。

Elastic-Job的地址: ElasticJob - Distributed scheduled job solution

功能列表:

  • 分布式调度协调

在分布式环境中,任务能够按照指定的调度策略执⾏,并且能够避免同⼀任务多实例重复执⾏。

  • 丰富的调度策略:

基于成熟的定时任务作业框架Quartz cron表达式执⾏定时任务。

  • 弹性拓容缩容

当集群中增加⼀个实例,它应当能够被选举被执⾏任务;当集群减少⼀个实例时,他所执⾏的任务能被转移到别的示例中执⾏。

  • 失效转移

某示例在任务执⾏失败后,会被转移到其他实例执⾏。

  • 错过执⾏任务重触发

若因某种原因导致作业错过执⾏,⾃动记录错误执⾏的作业,并在下次次作业完成后⾃动触发。

  • ⽀持并⾏调度

⽀持任务分⽚,任务分⽚是指将⼀个任务分成多个⼩任务在多个实例同时执⾏。

  • 作业分⽚⼀致性

当任务被分⽚后,保证同⼀分⽚在分布式环境中仅⼀个执⾏实例。

  • ⽀持作业⽣命周期操作

可以动态对任务进⾏开启及停⽌操作。

  • 丰富的作业类型

⽀持Simple、DataFlow、Script三种作业类型

系统运行架构图

二、Zookeeper下载

建议:zookeeper3.4.6以上版本,JDK1.7以上,maven在3.0.4以上

我这里将Zookeeper安装到了Linux系统上,以我自己安装为例,可自行选择。

1.上传,将zookeeper-3.4.11.tar.gz上传到/usr/local/src/soft/zookeeper目录下

2.解压文件到指定目录

tar -zxvf /usr/local/src/soft/zookeeper-3.4.11.tar.gz -C /usr/local/src/soft/zookeeper

3.拷贝配置文件

cp /usr/local/src/soft/zookeeper/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/src/soft/zookeeper/zookeeper-3.4.11/conf/zoo.cfg

4.启动

/usr/local/src/soft/zookeeper/apache-zookeeper-3.5.6-bin/bin/zkServer.sh start

5.检查进程是否开启

这个命令不一定有效

jps

可以试试这个命令,查看状态

/usr/local/src/soft/zookeeper/apache-zookeeper-3.5.6-bin/bin/zkServer.sh status

三、任务分片参数

1、分片的概念

作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或者几个分布项。

例如:Elastic-job快速入门中文件备份的案例,现有两台服务器,每台服务器分别跑一个应用实例。为了快速执行作业,那么可以讲任务分成4片,每个应用实例都执行两片。作业遍历数据逻辑应为:实例1查找text和image类型文件执行备份,实例2査找radio和vedio类型文件执行备份。如果由于服务器拓容应用实例数量增加为4,则作业谝历数据的逻辑应为: 4个实例分别处理text,image,radio,video类型的文件。

可以看到,通过对任务的合理分片化,从而达到任务并行处理的效果.

  • 当只有一台机器时,给定时任务分片四个,在机器中启动四个线程,分别处理四个分片的内容

  • 当只有两台机器时,分片由两台机器进行分配,A负责索引为0,1的内容,B负责索引为2,3的内容

  • 三台机器,如图

  • 四台机器,平均分摊

集群之后,可以分摊CPU的处理压力,提高数据处理的速度,那到底几台机器好呢?

分片数建议是机器个数的倍数

在秒杀项目中,我们将秒杀商品的场次分成了10、12、14三个场次。在这里我们就根据场次将其分成三片

2、分片分配机制

ElasticJob的分片分配机制

  • Zookeeper 协调:ElasticJob 通过 Zookeeper 协调任务实例的分片分配。每个任务实例启动时,会向 Zookeeper 注册自己,并获取分配给自己的分片信息。

  • 动态分片分配:如果任务实例的数量发生变化(比如新增或减少实例),Zookeeper 会重新分配分片,确保每个分片都有任务实例处理。

  • 分片参数传递:分片参数通过 ShardingContext 传递给任务实例,任务实例根据分片参数执行对应的逻辑。

ElasticJob 通过 Zookeeper 协调,将分片分配给不同的任务实例。每个任务实例启动时,会从 Zookeeper 获取分配给自己的分片信息(分片编号和分片参数)。

三、项目集成

1、依赖添加

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>
<!--zookeeper客户端-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.10.0</version>
</dependency>

2、分布式调度配置

1)注册中心配置

获取zk的地址和任务名称,将任务注册到zk注册中心中

@Configuration
public class RegistryCenterConfig {
    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter createRegistryCenter(@Value("${elasticjob.zookeeper-url}") String zookeeperUrl, @Value("${elasticjob.group-name}") String groupName) {
        //zk的配置
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperUrl,groupName);
        //设置zk超时时间
        zookeeperConfiguration.setSessionTimeoutMilliseconds(100);
        //创建注册中心
        CoordinatorRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        return zookeeperRegistryCenter;
    }
}
2)分布式调度参数配置
#分布式定时任务配置
elasticjob:
  zookeeper-url: 192.168.88.130:2181
  group-name: shop-job-group
  jobCron:
    #3分钟执行一次
    seckillProduct: 0 0/3 * * * ?
3)定时任务配置

需要使用定时任务的服务可能不止一个,不同的定时任务,表达式和分片参数、个数都不一样

将不同的定时任务创建不同的LiteJobConfiguration对象,指定不同的参数类型,将其创建为一个Bean,交给spring容器管理

@Configuration
public class BusinessJobConfig {
@Bean(initMethod = "init")
    public SpringJobScheduler initSPJob(CoordinatorRegistryCenter registryCenter, SeckillProductJob seckillProductJob){
        LiteJobConfiguration jobConfiguration = ElasticJobUtil.createJobConfiguration(
                seckillProductJob.getClass(),
                seckillProductJob.getCron(),//任务类的cron表达式
                seckillProductJob.getShardingTotalCount(), //分片个数
                seckillProductJob.getShardingParameters(), //分片参数
                seckillProductJob.isDataflowType());//不是dataflow类型
        SpringJobScheduler springJobScheduler = new SpringJobScheduler(seckillProductJob, registryCenter,jobConfiguration );
        return springJobScheduler;
    }
}    
4)分布式调度工具类

主要是用于创建LiteJobConfiguration对象,为不同的定时任务定义不同的配置类型

  • 指定定时任务类

  • 任务类的cron表达式

  • 分片个数

  • 分片参数

  • 是否为dataflow类型

public class ElasticJobUtil {
    public static LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,final String cron,final int shardingTotalCount,
final String shardingItemParameters,boolean dataflowType) {
        // 定义作业核心配置
        JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount);
        if(!StringUtils.isEmpty(shardingItemParameters)){
            jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
        }
        JobTypeConfiguration jobConfig = null;
        if(dataflowType){
            jobConfig = new DataflowJobConfiguration(jobCoreConfigurationBuilder.build(),jobClass.getCanonicalName(),true);
        }else {
            // 定义SIMPLE类型配置
            jobConfig = new SimpleJobConfiguration(jobCoreConfigurationBuilder.build(), jobClass.getCanonicalName());
        }
        // 定义Lite作业根配置
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobConfig).overwrite(true).build();
        return simpleJobRootConfig;
    }
    public static LiteJobConfiguration createDefaultSimpleJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron) {
        // 创建默认的SIMPLE类型作业配置
        return createJobConfiguration(jobClass,cron,1,null,false);
    }
    public static LiteJobConfiguration createDefaultDataFlowJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron) {
        // 创建默认的DataFlow类型作业配置
        return createJobConfiguration(jobClass,cron,1,null,true);
    }
}
5)任务分片参数配置
  1. 删除之前的数据

  2. 查询当天的数据同步Redis,定时任务每天执行一次

  3. 给定时任务分片处理,分三片

给定时任务做分片处理:0=10,1=12,2=14(第10场秒杀任务,第12场.........)

jobSharding:
  seckillProduct:
    shardingParameters: 0=10,1=12,2=14
    shardingTotalCount: 3
    dataflowType: false

3、定时任务类

初始化秒杀商品定时任务

@Data
@RefreshScope
@Component
public class SeckillProductJob implements SimpleJob {

    //表达式
    @Value("${elasticjob.jobCron.seckillProduct}")
    private String cron;
    //分片参数
    @Value("${jobSharding.seckillProduct.shardingParameters}")
    private String shardingParameters;
    //分片个数
    @Value("${jobSharding.seckillProduct.shardingTotalCount}")
    private int shardingTotalCount;

    @Value("${jobSharding.seckillProduct.dataflowType}")
    private boolean dataflowType;

    @Resource
    private SeckillProductFeignApi seckillProductFeignApi;

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public void execute(ShardingContext shardingContext) {
        String time = shardingContext.getShardingParameter();
        //远程调用商品服务获取秒杀列表集合
        Result<List<SeckillProductVo>> result = seckillProductFeignApi.queryByTime(Integer.valueOf(time));
        if (result==null||result.hasError()) {
            //通知管理员
            return;
        }
        List<SeckillProductVo> seckillProductVoList = result.getData();
        //获取秒杀商品key-seckillProductHash:10
        String key = JobRedisKey.SECKILL_PRODUCT_HASH.getRealKey(time);
       
        //删除之前的缓存
        redisTemplate.delete(key);

        HashMap<String, String> seckillProductMap = new HashMap<>();
        //存储集合数据到Redis中
        for (SeckillProductVo vo : seckillProductVoList) {
            seckillProductMap.put(vo.getId().toString(), JSON.toJSONString(vo));
        }
        redisTemplate.opsForHash().putAll(key, seckillProductMap);
        System.out.println("分布式商品秒杀任务执行...............");
    }
}

秒杀列表缓存成功

4、任务分片处理逻辑

分片参数设置为 0=10,1=12,2=14,表示分片0处理时间参数为10的任务,分片1处理时间参数为12的任务,分片2处理时间参数为14的任务。ElasticJob 会根据分片参数将任务分片,并将每个分片分配给不同的任务实例执行。

  • 分片总数(ShardingTotalCount):表示任务的总分片数。比如你有3场秒杀活动,可以将分片总数设置为3,每个分片处理一场秒杀活动。

  • 分片参数(ShardingParameters):可以为每个分片指定参数,比如分片0处理第一场秒杀,分片1处理第二场秒杀,分片2处理第三场秒杀。

ElasticJob 通过 Zookeeper 协调,将分片分配给不同的任务实例。每个任务实例启动时,会从 Zookeeper 获取分配给自己的分片信息(分片编号和分片参数)。

今天的分享结束,感兴趣的兄弟请点赞、收藏,关注我不迷路!下期再见!

原文地址:https://blog.csdn.net/weixin_65935065/article/details/146406965
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/594873.html

相关文章:

  • 【MySQL速成指南】数据库定义语言(DDL)详解:从建库到改表!
  • c++简单实现redis
  • 华为参访预约,团队考察体验黑科技之旅
  • PostgreSQL 存储过程
  • 如何在云端平台上建立 30,000 名用户的网页 MMO游戏环境-2 (服务器)
  • 基于javaweb的SpringBoot成绩管理系统设计与实现(源码+文档+部署讲解)
  • 大数据学习(77)-Hive详解
  • C#/.NET/.NET Core技术前沿周刊 | 第 30 期(2025年3.10-3.16)
  • 实时监控、数据分析!Web-Check构建你的网站健康检测系统实操方案
  • 如何将外网 Git 仓库完整迁移到本地仓库并保留提交历史(附原理详解)
  • 群体智能优化算法-斑马优化算法 (Zebra Optimization Algorithm, ZOA,含Matlab源代码)
  • 嵌入式硬件篇---WIFI模块
  • 汇编代码中嵌入回调函数的优化说明
  • ZMC600E,多核异构如何成就机器人精准控制?
  • 【数学建模】模糊综合评价模型详解、模糊集合论简介
  • 10-- 网络攻击防御原理全景解析 | 从单包攻防到DDoS军团作战(包你看一遍全记住)
  • DeDeCMS靶场
  • Github 2025-03-20 Go开源项目日报Top10
  • 【RHCE实验】搭建主从DNS、WEB等服务器
  • 快速入手-基于Django的mysql配置(三)