当前位置: 首页 > article >正文

Kafka消息积压的典型场景及解决方案

Kafka消息积压的典型场景:

1.实时/消费任务挂掉

比如,我们写的实时应用因为某种原因挂掉了,并且这个任务没有被监控程序监控发现通知相关负责人,负责人又没有写自动拉起任务的脚本进行重启。
那么在我们重新启动这个实时应用进行消费之前,这段时间的消息就会被滞后处理,如果数据量很大,可就不是简单重启应用直接消费就能解决的。

>>>>  解决方案1: 

方案简述: 任务重新启动后直接消费最新的消息,对于"滞后"的历史数据采用离线程序进行"补漏"。

>>>>   具体操作1 :  消费者从最新的地方进行消费

  • 1、偏移量设置  默认位置是在 zookeeper 中设置的 
  • 2、设置消费者参数  auto.offset.reset  = latest 从最新消息开始消费
  • 3、给启动的消费组设置 动态的组ID=> 每次启动消费者重新生成一个随机的组ID保证从最新消息开始消费
  • 4、任务重启手动设置偏移量到最新的位置  。这可以通过查询Kafka的消费者API来完成。


>>>>  具体操作2 :   针对滞后的数据如何查漏补缺


Apache Kafka Connect:通过Kafka Connect可以快速实现大量数据进出Kafka,从而与其他源数据源或目标数据源进行交互。


Kafka ETL:这是一个开源的ETL工具,专门为处理Kafka数据而设计。它提供了一个简单易用的界面,使数据抽取、转换和加载变得更加容易。


Talend:这是一款功能强大的ETL工具,支持处理Kafka数据。它提供了可视化的界面,使您可以轻松地设计、部署和管理ETL作业。


Apache NiFi:这是一个用于数据集成和处理的开源软件框架,它提供了一个可视化的界面来设计ETL流程,并支持Kafka作为数据源和目标。


Logstash:这是一个开源的数据收集引擎,可以用来处理和转换Kafka数据。它提供了强大的过滤器和转换器,使您可以轻松地实现数据的抽取、转换和加载。

>>>>  解决方案2:(常用) 


方案简述: 创建新的topic并配置更多数量的分区,将积压消息的topic消费者逻辑改为直接把消息打入新的topic,将消费逻辑写在新的topic的消费者中。

-- 第一步 迁移topic数据  --> 历史数据迁移到新的Topic上
-- 第二部 使用新的消费者消费新的Topic 数据  

此方案不会缺失数据(新的消费者消费能力可以与原消费者保持一致)前提是允许延时处理一段时间才可以应用此方案


1. 创建新topic:首先,您需要创建一个新的topic。您可以使用Kafka提供的命令行工具或Kafka管理界面来创建新topic。确保新topic的partition数量与旧topic相同或更多。
2. 调整消费者配置:创建一个新的消费者,用于消费旧topic的数据。您需要设置消费者的配置,以便将数据从旧topic消费到新topic。这可以通过设置auto.offset.reset和topic.config参数来实现。
o auto.offset.reset参数用于指定消费者从哪个offset开始消费数据。您可以将其设置为earliest,以便从旧topic的开始处消费数据。
o topic.config参数可以用于设置新topic的配置。例如,您可以设置retention.ms参数来控制新topic的日志保留时间。
3. 复制数据:使用新消费者消费旧topic的数据,并将其写入新topic。确保在写入新topic时,消息的key和value与旧topic中的消息一致。这样,消息的顺序和偏移量将保持不变。
4. 验证数据:完成数据转移后,验证新topic中的数据是否与旧topic中的数据一致。您可以使用Kafka提供的工具或编写自定义脚本进行验证。
5. 删除旧topic:一旦您验证了新topic中的数据,并且确认可以删除旧topic,请将其删除以释放存储空间。请注意,在删除旧topic之前,确保所有数据都已成功迁移到新topic中。

2.Kafka分区数设置的不合理(太少)和消费者"消费能力"不足

Kafka单分区生产消息的速度qps通常很高,如果消费者因为某些原因(比如受业务逻辑复杂度影响,消费时间会有所不同),就会出现消费滞后的情况。
此外,Kafka分区数是Kafka并行度调优的最小单元,如果Kafka分区数设置的太少,会影响Kafka consumer消费的吞吐量。

>>>>   不同的情况分析及方案


1、kafka 消费能力不足: 
如果是Kafka消费能力不足,则可以考虑增加 topic 的 partition 的个数,同时提升消费者组的消费者数量,消费数 = 分区数 (二者缺一不可)
2、程序消费能力不足:
(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。则提高每批次拉取的数量。
如果数据量很大,合理的增加Kafka分区数是关键。如果利用的是Spark流和Kafka direct approach方式,也可以对KafkaRDD进行repartition重分区,增加并行度处理。
3.Kafka消息的key不均匀,导致分区间数据不均衡

在使用Kafka producer消息时,可以为消息指定key,但是要求key要均匀,否则会出现Kafka分区间数据不均衡。可以在Kafka producer处,给key加随机后缀,使其均衡。

4、kafka数据有过期时间,一些数据就丢失了,主要是消费不及时 (消费不及时)

产生消息堆积,消费不及时,kafka数据有过期时间,一些数据就丢失了,主要是消费不及时
 
>>> 实战经验总结

  • 1、消费kafka消息时,应该尽量减少每次消费时间,可通过减少调用三方接口、读库等操作, 从而减少消息堆积的可能性。
  • 2、如果消息来不及消费,可以先存在数据库中,然后逐条消费  (还可以保存消费记录,方便定位问题)
  • 3、每次接受kafka消息时,先打印出日志,包括消息产生的时间戳。
  • 4、kafka消息保留时间(修改kafka配置文件, 默认一周)
  • 5、任务启动从上次提交offset处开始消费处理


 


http://www.kler.cn/a/283468.html

相关文章:

  • 火车车厢重排问题,C++详解
  • 微信小程序的主体文件和页面文件介绍
  • LeetCode 86.分隔链表
  • 《深度解析 C++中的弱引用(weak reference):打破循环依赖的利器》
  • 【真题笔记】21年系统架构设计师案例理论点总结
  • qt QVideoWidget详解
  • SSRF以及CSRF
  • Vue3.0项目实战(二)——大事件管理系统登录注册功能实现
  • 快讯 | Midjourney开拓硬件领域:苹果前经理加盟助力发展
  • 防御Nginx负载均衡中的拒绝服务攻击:策略与实践
  • OpenCV详细介绍
  • Eureka的生命周期管理:服务注册、续约与下线的完整流程解析
  • uniapp,uview:inputnumber或者input,当type为number的时候,在ios里输入不了小数的问题
  • 本地部署aniportrait
  • 【Redis】Redis 的消息队列 List、Streams—(六)
  • Golang测试func TestXX(t *testing.T)的使用
  • 【GPT】Coze使用开放平台接口-【2】创建工作流-语音伪造检测工作流
  • Golang | Leetcode Golang题解之第375题猜数字大小II
  • XSS漏洞
  • 数组、向量与矩阵
  • 【Unity优化】优化Android平台拖动地图表现
  • zabbix安装过程中仓库不可用问题解决
  • uniapp中H5网页怎么实现自动点击事件
  • 自动化测试经典面试题
  • 拿到一个新项目,如何开展测试?
  • 基于状态机实现WIFI模组物联网