当前位置: 首页 > article >正文

【kafka实践】11|消费位移提交

消费者位移

消费者位移这一节介绍了消费者位移的基本概念和消息格式,本节我们来聊聊消费位移的提交。

Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。

提交位移主要是为了记录Consumer 的消费进度,这样当 Consumer 发生重启之后,就能够从 Kafka 中读取之前提交的位移,从而继续消费,避免以避免重复消费,或消息丢失等。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。

因为位移提交非常灵活,你完全可以提交任何位移值。假设你的 Consumer 消费了 10 条消息,你提交的位移值却是 20,那么从理论上讲就丢失了10条数据;相反地,如果你提交的位移值是 5,那么就重复消费5条数据。所以你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。

位移提交

从使用角度来说位移提交分为自动提交和手动提交;从 Consumer 的角度来说,位移提交分为同步提交和异步提交。

自动提交

默认情况下就是自动提交,你根本无需关心位移提交的事情,Consumer 端有个参数 enable.auto.commit默认值是 true,即 Consumer 默认自动提交位移的。还有个参数auto.commit.interval.ms,默认值是 5 秒,即每 5 秒会为你自动提交一次位移。

这里我们用一段简单的代码来看看这两个参数怎么使用

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "kafka_test");
     // 自动提交
     props.put("enable.auto.commit", "true");
     // 间隔2秒  
     props.put("auto.commit.interval.ms", "2000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("topic"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
            // process
         }
             
     }
手动提交

设置 enable.auto.commit 为 false,还需要调用相应的 API 手动提交位移,KafkaConsumer.commitSync()。

// props.put("enable.auto.commit", "false");
while (true) {
            ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofSeconds(1));
            // 处理消息
            process(records); 
            try {
                        // 同步提交
                        consumer.commitSync();
            } catch (CommitFailedException e) {
                        handle(e); // 处理提交失败异常
            }
}

commitSync()有一个缺陷,提交时Consumer 程序会处于阻塞状态,在生产系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。虽然也可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。鉴于这个问题,Kafka 提供了另一个 异步API 方法:KafkaConsumer.commitAsync()。

不过commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

我们可以将 commitSync 和 commitAsync 组合使用以规避这样的问题:

   try {
           while(true) {
                        ConsumerRecords<String, String> records = 
                                    consumer.poll(Duration.ofSeconds(1));
                        process(records); 
                        // 异步提交规避阻塞
                        commitAysnc(); 
            }
} catch(Exception e) {
            
} finally {
            try {
                        // 使用同步阻塞式提交兜底
                        consumer.commitSync(); 
  } finally {
       consumer.close();
}
}

同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性,如果你自行编写代码开发一套 Kafka Consumer 应用,可以尝试使用上面的代码范例来实现手动的位移提交。

其实还有一种更高级的提交方式,就是分批量提交,就不再这里展开,留给大家查资料学习,也欢迎各位同学在评论区交流讨论!


http://www.kler.cn/a/154493.html

相关文章:

  • WebRTC视频 03 - 视频采集类 VideoCaptureDS 上篇
  • 以太坊基础知识结构详解
  • day-83 最少翻转次数使二进制矩阵回文 II
  • 深入理解Flutter生命周期函数之StatefulWidget(一)
  • 丑数动态规划
  • 【链路层】空口数据包详解(4):数据物理通道协议数据单元(PDU)
  • vue之mixin混入
  • 005、简单页面-容器组件
  • IDC MarketScape2023年分布式数据库报告:OceanBase位列“领导者”类别,产品能力突出
  • MySQL- CRUD-单表查询
  • Redis集群详解
  • gRPC Java、Go、PHP使用例子
  • 我爱上这38个酷炫的数据大屏(附 Python 源码)
  • ⭐ Unity 里让 Shader 动画在 Scene 面板被持续刷新
  • spring boot定时器实现定时同步数据
  • 深入理解Java中继承的高级使用方案
  • 不可抗力因素包括什么内容
  • 编译ubuntu kernel
  • 软件工程 - 第8章 面向对象建模 - 4 - 物理体系结构建模
  • 【Android知识笔记】性能优化专题(五)
  • php 中生成订单号
  • Adobe Bridge——牵线搭桥
  • 【ArcGIS Pro微课1000例】0040:ArcGIS Pro创建北极点、南极点
  • pandas基础操作2
  • Jmeter分布式压测
  • 基于PAM自定义ssh登陆认证