当前位置: 首页 > article >正文

kafka 一步步探究消费者组与分区分配策略

本期主要聊聊kafka消费者组与分区

消费者组 & 消费者

图片

每个消费者都需要归属每个消费者组,每个分区只能被消费者组中一个消费者消费

上面这段话还不够直观,我们举个例子来说明。

  • 订单系统 订单消息通过 order_topic 发送,该topic 有 5个分区

  • 结算系统 订阅订单变更消息订阅订单的消息,进行相关结算业务,结算系统的消费者组是order_consumer_settlement

    • 结算系统有三台服务器,所有消费者组是order_consumer_settlement有三个消费者,由于每个分区被不同消费者(服务器)处理,所以消息不会被不同结算服务节点重复消费,且每条消息都有一个唯一的机器处理结算

  • 审计系统 也订阅订单变更消息,它的消费者组是order_consumer_audit ,审计系统部署2台,它的消息处理与结算系统之间毫不相干。

如果订单系统发送一条订单变更消息,该消息所在分区是确定的,结算系统的一个服务节点会消费到这条消息,审计系统的一个服务节点也会消费到该消息。

通常情况下我们一个 应用 对应某个Topic 的 消费者组,该应用的每个服务节点都对应一个消费者。

按照应用对应消费者组方式,一条消息有且只有一个服务节点处理,在排除配置和一些特殊情况,消息不会重复消息,也不会丢失

消费者分区与主题数量关系

可以在日志中grep Adding newly assigned partition关键字查看当前服务节点分配的分区

图片

不同数量消费者分区情况

  • 分区数量为10 ,消费者组有2个节点

    • 消费者分配到的分区是 5、5

  • 分区数量为10 ,消费者组有3个节点

    • 消费者分配到的分区是 4、3、3

  • 分区数量为10 ,消费者组有4个节点

    • 消费者分配到的分区是 3、2、2、3

  • 分区数量为10 ,消费者组有5个节点

    • 消费者分配到的分区是 2、2、2、2、2

从上面数据看每个消费者分配的分区数量是均匀的

举一反三

如果 分区数量 小于 消费者数量,猜测会有消费者一个分区也分配不到测试分区数量为3 , 消费者数量为4 时。下面日志证明结论正确的

图片

刨根到底,分区分配策略如何调整

前面都是默认情况下,分区配置策略,不同版本默认策略是不同的,通过partition.assignment.strategy 配置,常见分配策略有

RangeAssignor

  • RangeAssignor策略对每个Topic进行独立的分区分配

  • 分区按照分区ID进行排序,然后订阅这个Topic的消费组的消费者再进行排序

  • 分配时尽量均衡地将分区分配给消费者

缺点只保证单个主题情况下均匀分配,对于消费者同时订阅多个主题情况下可能会造成 总体分区分配不合理。

RoundRobinAssignor

  • 所有消费者和所有分区都进行排序,然后按照轮询的方式将分区分配给消费者

该策略在RangeAssignor进行了优化,当然如果消费者订阅的主题列表不同情况下分配结果也是不均匀的。

StickyAssignor

它在RangeAssignor的基础上引入了“粘性”的限制。当消费者组中消费者离开或加入,尽量保留现有的分配结果,并使新的分配结果均衡。

CooperativeStickyAssignor

它是Kafka 2.4.0 引入的一种新的分配策略。它将原来的一次全局分区重平衡改为多次小规模分区重平衡。这种策略能够更平滑地处理消费者加入或离开的情况,减少因全局重平衡带来的性能开销。

How 修改默认策略 ?

参考下面代码设置ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG 甚至可以自定义策略

   private Properties getConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "xxxx:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,consumerGroup);
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,Arrays.asList(RangeAssignor.class.getName()))
        return properties;
    }


http://www.kler.cn/news/311390.html

相关文章:

  • Reactor介绍,如何从简易版本的epoll修改成Reactor模型(demo版本代码+详细介绍)
  • YOLOv5/v8 + 双目相机测距
  • 学习大数据DAY58 增量抽取数据表
  • JavaWeb项目打包、部署至Tomcat并启动的全程指南(图文详解)
  • saltstack远程执行
  • 基于SpringBoot+Vue+MySQL的热门网络游戏推荐系统
  • 【网站架构部署与优化】web服务与http协议
  • 十大排序算法的特点及应用场景
  • 英飞凌最新AURIX™TC4x芯片介绍
  • kafka原理剖析及实战演练
  • MySQL-binlog、redolog和undolog的区别
  • android BLE 蓝牙的连接(二)
  • AI生成内容:优点与缺点
  • Docker实操:安装MySQL5.7详解(保姆级教程)
  • 【软考】数据字典(DD)
  • 游戏、网关等服务借助Docker容器化并使用Kubernetes部署、更新等
  • MySQL 中的 EXPLAIN 命令:洞察查询性能的利器
  • MySQL 中的索引覆盖扫描:加速查询的秘密武器
  • 【Linux】Ubuntu 22.04 shell实现MySQL5.7 tar 一键安装
  • 独立站技能树之建站33项自检清单 1.0丨出海笔记
  • STM32 HAL freertos零基础(十一)中断管理
  • Linux技术04-IPVS
  • 游戏如何对抗定制挂
  • Linux线程基础
  • Java-测试-Mockito 入门篇
  • FTP、SFTP安装,整合Springboot教程
  • 基于剪切板的高速翻译工具
  • 【Qt | QAction】Qt 的 QAction 类介绍
  • 电脑键盘功能基础知识汇总
  • Leetcode面试经典150题-130.被围绕的区域