当前位置: 首页 > article >正文

kafka入门(二): 位移提交

位移提交:

Kafka的每条消息都有唯一的 offset, 用来表示消息在分区中对应的位置。有的也称之为 “偏移量”。

消费者每次在 poll() 拉取消息,它要返回的是还没有消费过的消息集,

因此,需要记录上一次消费时的消费位移,并且持久化。

消费者在消费完消息之后,需要执行消费位移的提交。

自动位移提交:

Kafka默认的消费位移的提交方式是 自动提交。

自动提交,由消费者客户端参数 enable.auto.commit 配置,默认值是 true。

默认的自动提交,是定期提交,提交的周期由 auto.commit.interval.ms 配置,默认是 5s。

自动位移提交,有可能会重复消费和消息丢失。

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那又得从上一次位移提交的地方重新开始消费,这样就会重复消费。

手动位移提交:

手动位移提交,由消费者客户端参数 enable.auto.commit 配置, 设置为 false 就是手动位移提交。

手动位移提交,可以分为 同步提交、异步提交。

commitSync() 同步提交

同步提交,会阻塞消费者线程直到位移提交完成。

示例代码:

public class OffsetCommitSync {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "myTopic1";
    public static final String GROUP_ID = "group.demo";

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //消费者订阅主题
        consumer.subscribe(Collections.singletonList(TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                //do something
            }
            //手动提交位移
            consumer.commitSync();
            System.out.println("手动提交位移成功.");
        }

    }


    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //不自动提交,采用手动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }


}

commitAsync() 异步提交 :

异步提交,在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交,可以使消费者的性能得到一定的增强。

异步提交,将 consumer.commitSync(); 换成 commitAsync。

如果还需要回调,就用 OffsetCommitCallback对象作为参数。

示例如下:

public class OffsetCommitAsyncCallback {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "myTopic1";
    public static final String GROUP_ID = "group.demo";


    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));


        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                //do something
            }
            //异步回调,如果不需要回调,就采用无参的方法
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                       Exception exception) {
                    if (exception == null) {
                        System.out.println(offsets);
                    } else {
                        log.error("fail to commit offsets {}", offsets, exception);
                    }
                }
            });
        }


    }


    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

}

参考资料:

《深入理解kafka:核心设计与实践原理》


http://www.kler.cn/a/147084.html

相关文章:

  • 【分布式技术】分布式缓存技术-旁路缓存模式(Cache Aside Pattern)
  • Vue3 -- 项目配置之eslint【企业级项目配置保姆级教程1】
  • 深度学习神经网络创新点方向
  • 代码段数据段的划分
  • 蓝桥杯介绍
  • ASP.NET 部署到IIS,访问其它服务器的共享文件 密码设定
  • 3D点云目标检测:VoxelNex解读(带源码/未完)
  • 微软 Edge 浏览器目前无法支持 avif 格式
  • gitt开源项目的意义,公司为什么会对在gitt上有开源项目的人更大机会
  • 7 最大的以1为边界的正方形
  • Idea空白目录自动折叠的问题
  • 淘宝平台商品详情平台订单接入说明
  • Python实现FA萤火虫优化算法优化循环神经网络分类模型(LSTM分类算法)项目实战
  • 智慧工厂人员定位系统源码,融合位置物联网、GIS可视化等技术,实现对人员、物资精确定位管理
  • 【网络安全】-常见的网站攻击方式详解
  • Mysql的分库分表
  • 诺威信,浪潮云,微众区块链
  • 单片机学习4——中断的概念
  • 使用 Java 来读取 Excel 文件,检查每一行中的 URL,并将不符合条件的行标记为红色
  • 数据结构 / 顺序表操作 / 顺序表尾部删除
  • C语言进阶之笔试题详解(1)
  • 基于python的NBA球员数据可视化分析的设计与实现
  • EFCore乐观并发
  • uniapp高德、百度、腾讯地图配置 SHA1
  • C语言第三十四弹--矩形逆置
  • 小航助学题库蓝桥杯题库stem选拔赛(21年3月)(含题库教师学生账号)