当前位置: 首页 > article >正文

kafka 超详细的消息订阅与消息消费几种方式

kafka 消息订阅与消息消费几种方式

本文主要内容

  • 消费者订阅几种方式

    • 订阅多个主题

    • 按正则表达式订阅

  • 消息消费几种方式

    • 按分区消费

    • 按主题消费

    • 不区分

笔者建议一开始学习Kafka最好不要用SpringBoot 集成方式,因为SpringBoot推崇用注解方式,比如@KafkaListener 等,就可以直接消费,这样不能直接接触kafka-client一些api, 且SpringBoot 给我们提供了很多默认配置,我们几乎零配置也可以使用,实际上kafka很多配置很重要的,不容忽视。

消费者订阅几种方式

KafkaConsumer 给我们提供了几种订阅消息方式,我们可以订阅多个消息。示例代码如下

kafkaConsumer.subscribe(Arrays.asList("topicA","topicB"));
        
kafkaConsumer.subscribe(Pattern.compile("topic-*"));

kafkaConsumer.assign(Arrays.asList(new TopicPartition("topicA",0)));

订阅多个主题

void subscribe(Collection<String> topics) 对应上面第一行代码,这是最常见的订阅方式

按正则表达式订阅

void subscribe(Pattern pattern) 符合正则的主题都会被消费

有人创建了新的主题,并且与正则匹配,消费者也可以消费到

这种方式需要能对多种消息处理,对于一些能通用处理,不感知具体业务数据的场景比较合适。比如B系统需要同步A系统数据,我们按正则订阅,当A系统有新的数据需要同步,这是只需要A发满足条件正则的消息,B系统无需任何改动。

订阅指定分区

void assign(Collection<TopicPartition> partitions);正常业务不会使用,如果订阅的分区不存在,会报错。一些特殊场景,比如需要精确控制消费者消费消息,自定义分区分配策略时 可能会用到assign 方法



消息消费

kafka 采用客户端 拉取模式进行消息消费

poll() 返回所订阅的主题上一组消息ConsumerRecords ,我们可以对消息进行按主题、按分区进行处理,当然可以统一处理,不分主题和分区

ConsumerRecords<String,String> records =    kafkaConsumer.poll(Duration.ofMillis(1000));

不区分主题、分区

for(ConsumerRecord<String,String> record : records){
                        // 处理消息
}

按partition 处理

Set<TopicPartition> topicPartitions = records.partitions();
for(TopicPartition topicPartition : topicPartitions){
    List<ConsumerRecord<String, String>> tpRecords = records.records(topicPartition);
}

按主题

  Iterable<ConsumerRecord<String,String>> iter = records.records(topic);

思考

kafka 给我们提供了灵活的消息订阅以及消息消费方式,我们需要根据实际业务场景选择。无论哪种场景都离不开 主题分区 ,最主要的是分区,当我们选择了某种订阅方式如果主题分区  发生了变化 ,消息还能正常消费吗

选择了按正则订阅消息方式, 后面创建了新的主题,该消息能被正常消费吗

选择了指定分区订阅, 如果后面扩容了新的分区,新分区消息能消费吗?

List<PartitionInfo> partitionsFor(String topic) 能获取分区情况,如果需要按分区订阅,该方法一定用的上


按分区维度消费消息,对于手动提交消息位移场景非常有用

主题分类处理消息也很常见,因为不同主题消息格式可能是不一样的,根据主题区分,很容易将不同的消息分类处理。


http://www.kler.cn/news/311554.html

相关文章:

  • 【运维】自定义exporter
  • Redis——笔记01
  • 【PyQt5】object属性
  • Java中的异步编程模式:CompletableFuture与Reactive Programming的实战
  • 性格类型识别系统源码分享
  • DTD 实体
  • 【HTTP】HTTP报文格式和抓包
  • C++初阶:STL详解(五)——vector的模拟实现
  • 【JOIN 详解】SQL连接全面解析:从基础到实战
  • PostgreSQL主从切换测试
  • 使用BGP及静态路由方式实现链路冗余和ByPass
  • C:字符串函数(完)-学习笔记
  • 北斗盒子TD20——水上作业的安全防线,落水报警守护生命
  • React 中的延迟加载
  • 音视频入门基础:AAC专题(10)——FFmpeg源码中计算AAC裸流每个packet的pts、dts、pts_time、dts_time的实现
  • AUTOSAR_EXP_ARAComAPI的5章笔记(6)
  • 高级java每日一道面试题-2024年9月18日-设计模式篇-JDK动态代理,CGLIB代理,AspectJ区别?
  • 组件封装有哪些注意事项—面试常问优美回答
  • 2024网站建设比较好的公司都有哪些
  • re题(35)BUUCTF-[FlareOn4]IgniteMe
  • Docker Redis 7.2.3 部署
  • Spark实操学习
  • 集合框架底层使用了什么数据结构
  • 关于 Goroutines 和并发控制的 Golang 难题
  • 【网络安全的神秘世界】目录遍历漏洞
  • AJAX Jquery $.get $.post $.getJSON
  • STP生成树
  • css 中 em 单位怎么用
  • 医疗数据分析师
  • Uniapp的alertDialog返回值+async/await处理确定/取消问题