当前位置: 首页 > article >正文

如何进行千万级别数据跑批优化

目录

  • 背景
  • 问题
  • 分析
  • 解决方案
    • 数据库问题
    • 分片广播
    • 分批获取
    • 事务控制
    • 充分利用服务器资源
    • MQ消费任务并行
    • 动态调整并发度
    • 失败任务如何继续
    • 下游接口时间
    • 线程安全
    • 异常 & 监控
  • 总结

在这里插入图片描述

背景

定义:跑批是指在特定日期对大量数据进行定时处理的过程。在金融领域,常见的跑批场景包括分户日结、账务计提、账单逾期处理和不良资产处理等。跑批具有高度的连贯性,通常在执行完跑批后,还需要对数据进行进一步处理,如发送消息队列(MQ)给下游系统,或由数据仓库进行分析等。

问题

在处理大数据量的跑批任务时,会遇到许多挑战,包括:

  • OOM(内存溢出):如果在查询跑批数据时未进行分片处理,随着业务的发展,数据量增大,很容易导致内存溢出.
  • 未进行批量处理:在业务处理过程中,如果没有采用批量处理的方式,会导致处理时间过长,并且频繁的IO操作也会成为问题.
  • 避免大事务:直接使用@Transactional注解来覆盖所有业务是不可取的,这会导致问题定位困难,并且会延长方法的处理时间.
  • 下游接口的承受能力:在设计跑批任务时,还需要考虑下游系统的承载能力。如果大量数据分批发送,而下游系统没有足够的能力进行处理,可能会引发灾难性的问题.
  • 任务时间上的隔离:通常在大数据量跑批任务之后,还会有其他业务处理任务。因此,需要严格控制跑批任务的时间和健壮性,以避免对后续业务产生影响.
  • 失败任务补偿:在分布式任务调度中,创建跑批任务后,会将任务拆分为多个子任务并发地发送到消息队列中,然后由线程池执行任务并调用远程接口。在这个过程中,任何步骤都可能出现问题,导致任务失败.

分析

通过对上述问题的总结,我们可以得出,在进行大数据量跑批任务时,代码设计需要具备以下素质:

  • 健壮性:跑批任务需要定时处理数据,不能因为其中一条数据出现异常就导致整批数据无法继续操作,因此必须具备健壮性.
  • 可靠性:针对异常数据,后续可以进行补偿处理,因此必须具备可靠性.
  • 隔离性:要避免干扰其他应用程序的正常运行.
  • 高性能:由于跑批任务通常需要处理大量数据,不能让处理时间过长,否则会挤压后续其他连贯性业务处理的时间,因此必须考虑其性能.

解决方案

数据库问题

  • 使用数据库扫表问题:遍历数据会对数据库产生很大的压力,处理速度也会越来越慢.
    • 解决方法:在每次查询时,携带上一次查询的极值,使分页查找的偏移量始终控制在0.

分片广播

  • 分片:在生产环境中,通常采用集群部署。如果一个跑批任务只在一个机器上运行,效率会很低。可以利用xxl-job的“分片广播”和“动态分片”功能.
    • 分布式调度幂等:分布式任务调度只能保证准时调用一个节点,而且通常具有失败重试功能。因此,任务幂等性是必要的,可以通过分布式锁来实现。简单起见,可以使用数据库,通过在任务表中插入一条唯一的任务记录,利用唯一键来防止重复调度.
      • 除了使用唯一键,还可以在记录中增加一个状态字段,使用乐观锁来更新状态。例如,初始状态为“初始化”,更新为“正在运行”的状态,如果更新失败,则说明其他节点已经在执行该任务.当然,分布式锁的实现方案有很多,如Redis、ZooKeeper等.
    • 集群分布式任务调度xxl-job:在执行器集群部署时,“分片广播”以执行器为维度进行分片。当任务路由策略选择“分片广播”时,一次任务调度将会广播触发对应集群中所有执行器执行一次任务,同时系统自动传递分片参数;可以根据分片参数开发分片任务.
      • 分片任务场景:例如,一个拥有10个执行器的集群来处理10万条数据,每台机器只需要处理1万条数据,耗时降低10倍.
      • 广播任务场景:广播执行器机器运行shell脚本、广播集群节点进行缓存更新等.

分批获取

  • 设置步长:分派到一个Pod负责的数据量也是庞大的,如果一次性查询出来,耗时过长容易导致超时。通常会引入步长的概念,例如,分派给Pod 1万条数据,可以将其划分为10次查询,每次查询1千条数据,从而避免数据库查询数据耗时过长.
  • 空间换时间:跑批可能会涉及到数据准备的过程,边循环跑批数据边查找所需的数据,涉及多个for嵌套的循环处理时,可以采用空间换时间的思想,将数据加载到内存中进行筛选查找,但要做好OOM防范措施,例如使用包装类来接收查找出来的数据等,毕竟内存不是无限大的.
  • 深分页:分批查询时,limit的偏移量越大,执行时间越长。例如,limit a, b会查询前a + b条数据,然后丢弃前a条数据,select *会查询所有的列,也会有回表操作。可以使用子查询优化SQL,先查出id后分页,尽量使用覆盖索引来优化.

事务控制

  • 这些操作自身是无法回滚的,这会导致数据不一致。可能RPC调用成功了,但本地事务回滚了,而RPC调用无法回滚.
  • 在事务中有远程调用,会拉长整个事务,导致事务的数据库连接一直被占用,从而可能导致数据库连接池耗尽或者单个连接超时。因此,要熟悉调用链路,将事务粒度控制在最小范围内.

充分利用服务器资源

  • 需要充分利用服务器资源,采用多线程,MySQL的CPU在罚息期间也是低于50%、IOPS使用率低于50%.
  • 其实跑数据是IO密集型的,不需要非得压榨服务器资源.

MQ消费任务并行

  • MQ消费消息队列的消息时,要在每个节点上同时运行多个子任务,才能最大化资源利用。那么就使用到线程池了,如果选择的是Kafka或者RocketMQ,他们的客户端本来就是线程池消费的,只需要合理调整客户端参数就可以了。如果使用的是Redis,那就需要自己创建一个线程池,然后让一个EventLoop线程从Redis队列中取任务,放入线程池中运行,因为我们已经使用Redis队列做缓冲,所以线程池的队列长度设为0,这里直接使用JDK提供的SynchronousQueue。(这里以Java为例)

动态调整并发度

  • 跑批任务中能动态调整速度是很重要的,有两个地方可以进行操作:
    • 任务中调用远程接口:这个速度控制其实用Thread.sleep()就好了.
    • 控制任务并发度:就是有多少个线程同时运行任务。这个控制可以通过调整线程池的线程数来实现,但线程池动态调整线程数比较麻烦。动态调整可以通过开源的限流组件来实现,比如Guava的RateLimiter。可以在每次调用远程接口前调用限流组件来控制并发速度.

失败任务如何继续

  • 一般分布式调度路径:
    • 分布式任务调度创建跑批任务;
    • 拆分子任务多线程并发地发送到消息队列;
    • 线程池执行任务调用远程接口;
  • 在这个链条中,可能导致任务失败或者中止的原因无非下面几个:
    • 服务器Pod因为其他业务影响重启导致任务中止;
    • 任务消费过程中失败,达到最大的重试次数;
    • 业务逻辑不合理或者数据膨胀导致OOM;
    • 消费时调用远程接口超时(这个很多人专注自己的业务逻辑从而忽略第三方接口的调用).
  • 其实解决起来也简单,因为其他因素导致失败,你需要记录下任务的进度,然后在失败的点去再次重试.
    • 记录进度:我们需要知道这个任务执行到哪里了,同时也要记录更新的时间,这样才知道补偿哪里,例如进行跑批捞取时,要记录我们捞取的数据区间.
    • 任务重试:编写一个补偿式的任务(比如FixJob),定时地去扫描处在中间态的任务,如果扫描到就触发补偿机制,将这个任务改成待执行状态投入消息队列.

下游接口时间

  • 跑批最怕的就是上来就干,从不考虑涉及到第三方接口时的响应时间。如果不考虑第三方接口调用时间,那么在测试时会发现频繁的YGC,这是很致命的问题,属于你设计之外的事件,但也是你必须要考虑的.
  • 解决起来也简单,在业务可以容忍的情况下,可以将调用接口的业务逻辑设计一个中间态,然后挂起我们的这个业务,随后用定时任务去查询我们的业务结果,在收到信息后继续我们的业务逻辑,避免它一直在内存中堆积.

线程安全

  • 在进行跑批时,一般会采用多线程的方式进行处理,因此要考虑线程安全的问题,比如使用线程安全的容器,使用JUC包下的工具类.

异常 & 监控

  • 异常:要保证程序的健壮性,做好异常处理,不能因为一处报错,导致整个任务执行失败,对于异常的数据可以跳过,不影响其他数据的正常执行.
  • 监控:一般大数据量跑批是业务核心中的核心,一次异常就是很大的灾难,对业务的损伤不可预估,因此要配置相应的监控措施,在发生异常前及时察觉,进而做补偿措施.

总结

在处理大数据量的跑批任务时,需要综合考虑多个方面,包括数据库优化、分片广播、分批获取、事务控制、资源利用、MQ消费、并发度调整、失败任务处理、下游接口时间、线程安全以及异常和监控等。通过合理的设计和优化,可以提高跑批任务的健壮性、可靠性和性能,从而确保业务的顺利进行.


http://www.kler.cn/a/469779.html

相关文章:

  • javaEE-网络原理-1初识
  • [Linux]Mysql9.0.1服务端脱机安装配置教程(redhat)
  • css中的部分文字特性
  • opencv CV_TM_SQDIFF未定义标识符
  • axios和fetch的实现原理以及区别,与XMLHttpRequest的关系,并结合react封装统一请求示例
  • ESP32-C3 AT WiFi AP 启 TCP Server 被动接收模式 + BLE 共存
  • GitHub 基础使用指南
  • Go语言的 的数据封装(Data Encapsulation)核心知识
  • 深入了解 ES6 Map:用法与实践
  • Ruby语言的字符串处理
  • matlab中几个取整函数的区别
  • Django AWS负载均衡器管理系统
  • XXL-CRAWLER v1.4.0 | Java爬虫框架
  • stm32的掉电检测机制——PVD
  • spring mvc源码学习笔记之六
  • Git 部署
  • 【代码随想录】刷题记录(89)-分发糖果
  • 消防设施操作员考试题库及答案
  • springboot整合dubbo和nacos进行远程rpc调用案例
  • 密码学文献引用:CryptoBib + DBLP
  • Matlab仿真径向受压圆盘光弹图像
  • MES管理系统如何解决企业制造瓶颈
  • MYSQL----------------sql 优化
  • SpringBoot环境和Maven配置
  • 使用Oracle的Debian软件包在Linux上安装MySQL
  • Java AOP 请求拦截校验Token