当前位置: 首页 > article >正文

kafka 消息位移提交几种方式:消息重复消息、消息丢失的关键

消费位移

Kafka 中的位移(offset)是用来记录消息在分区中的位置的标志,简单说就是记录消费者的消费进度,每次消息消费后需要更新消费进度,也就是位移提交

由此可见一旦位移提交发生异常,会导致消费进度不正确,就必然发生消息丢失或者重复消费

消息位移存储内部主题__consumer_offsets消息消费后需要执行位移的提交

消息位移提交几种方式

自动提交

enable.auto.commit 配置为true 默认每5s 提交一次 (auto.commit.interval.ms)拉取消息之前也会检查 是否可以进行位移提交

消息重复消费例子

消费者拉取了一批消息,消费后,消费位移自动提交前应用崩溃了,下次应用恢复,又从上次位移提交的地方消费

通过减小位移提交的时间间隔,能减少消息重复消费的可能,但会使消息位移提交频繁

消息丢失例子

消费者拉取到消息,此时消息位移刚好自动提交,但消息还没来及处理,然后应用崩溃了,下次应该恢复了,由于位移已经提交, 未处理的几条消息,就丢失了。

除了极端情况下消息可能存在丢失或重复消费,重复消息业务可以通过幂等性保证, 但消息丢失是可怕的,我们甚至都不知道

手动提交

对于业务来说消息拉取后,正确处理完才算消费了,自动提交可以更加灵活精准控制消息位移的提交

使用方式 设置enable.auto.commit 配置为false

同步提交

它会阻塞当前线程,直到提交成功或发生错误。同步提交确保位移提交的可靠性,但会增加延迟。

ConsumerRecords<String,String> records =    kafkaConsumer.poll(Duration.ofMillis(1000));

for(ConsumerRecord<String,String> record:records){
// 消息处理逻辑
}

kafkaConsumer.commitSync();

异步提交

它不会阻塞当前线程,提交过程在后台进行。异步提交提高了性能,但需要处理可能的提交失败情况。

kafkaConsumer.commitASync()

KafkaConsumer API 还为手动提交提供了带参数的方法

commitSync(Map<TopicPartition, OffsetAndMetadata>;

commitAsync(Map<TopicPartition, OffsetAndMetadata>)

总结

一般情况我们消息位移自动提交就可以满足我们大部分场景,当然也有场景需要控制消息位移提交,需要我们在可靠性与性能之间做取舍,自动位移提交代码稍微复杂点,需要处理好位移提交失败的情况。


http://www.kler.cn/a/315501.html

相关文章:

  • 论文翻译 | The Capacity for Moral Self-Correction in Large Language Models
  • 【自用】0-1背包问题与完全背包问题的Java实现
  • Jetpack 之 Ink API初探
  • 豆瓣均分9:不容错过的9本大模型入门宝藏书籍,非常详细收藏我这一篇就够了
  • 树形dp总结
  • Python数据类型(一):bool布尔类型
  • Docker_基础初识
  • 新能源汽车知识点集萃
  • Python办公自动化教程(003):PDF的加密
  • HarmonyOS Next开发----使用XComponent自定义绘制
  • 【乐企-工具篇】有关乐企发票文件生成- OFD和PDF文件生成
  • 四、JVM原理-4.1、JVM介绍
  • vue中 <template> 与 <template lang=“jade“>的对比,哪个性能好
  • 数据结构之希尔排序
  • 轻代码的概念学习笔记
  • http和https的区别及get和post请求的区别
  • Vue3新组件transition(动画过渡)
  • Java API 之集合框架进阶
  • 软件测试面试题(5)——二面(游戏测试)
  • 【PLW003】设备器材云端管理平台v1.0(SpringBoot+Mybatis+NodeJS+MySQL前后端分离)
  • LeetCode题练习与总结:回文链表--234
  • [JavaEE]———进程、进程的数据结构、进程的调度
  • 【优选算法之二分查找】No.5--- 经典二分查找算法
  • Linux之实战命令03:stat应用实例(三十七)
  • 如何使用 maxwell 同步到 redis?
  • 如何在 CentOS 中管理用户、组和服务状态