当前位置: 首页 > article >正文

Kafka技术详解[6]: 创建主题

目录

 创建主题

 相关概念

 主题:Topic

 分区:Partition

副本:Replication

 副本类型:Leader & Follower

 日志:Log

 创建第一个主题

执行指令

 ZooKeeper节点变化

 数据存储位置

 创建第二个主题

 执行指令

ZooKeeper节点变化

 数据存储位置

 创建第三个主题

 执行指令

 ZooKeeper节点变化

 数据存储位置

 创建主题流程

 命令行提交创建指令

 Controller接收创建主题请求

 创建主题


 创建主题

 相关概念

 主题:Topic

Kafka是一个分布式消息传输系统,采用发布/订阅模式。主题(Topic)是对消息进行逻辑分类的一种手段,由外部业务场景定义(除了两个用于记录消费者偏移量和事务处理的固定主题)。消息的生产者必须将消息发送到特定的主题,而消费者则从特定的主题中获取消息,并且可以同时消费一个或多个主题的数据。

 分区:Partition

分区(Partition)是主题的物理分割,用于解决单个Broker节点上的负载和吞吐量问题。每个主题至少包含一个分区,默认情况下分区数量为1。每个分区都是一个有序的消息队列,并且分区内的每条消息都有一个唯一的偏移量(Offset)。

副本:Replication

为了提高系统的容错性和可靠性,Kafka允许为每个分区创建多个副本(Replication)。副本不能放在同一个Broker上,以避免单点故障导致的数据丢失。通常,分区的一个副本作为Leader副本,负责所有读写操作,其他副本作为Follower副本,用于数据备份。

 副本类型:Leader & Follower

Leader副本处理所有的读写请求,而Follower副本则保持与Leader副本的数据同步。只有Leader副本可以接受读写操作。

 日志:Log

Kafka接收的消息数据最终存储在Log日志文件中。每个主题创建后,都会为其创建对应的分区数据Log文件,并准备好写入数据。

 创建第一个主题

执行指令
[lzl@kafka-broker1 ~]$ cd /opt/module/kafka
[lzl@kafka-broker1 kafka]$ bin/kafka-topics.sh --bootstrap-server kafka-broker1:9092 --create --topic first-topic
 ZooKeeper节点变化

创建后,ZooKeeper中的/config/topics/brokers/topics节点会新增与新主题相关的子节点。

 数据存储位置

主题创建后,数据存储在分区Leader副本所在的Broker节点上。

 创建第二个主题

 执行指令
[lzl@kafka-broker1 ~]$ cd /opt/module/kafka
[lzl@kafka-broker1 kafka]$ bin/kafka-topics.sh --bootstrap-server kafka-broker1:9092 --create --topic second-topic --partitions 3
ZooKeeper节点变化

创建带有3个分区的主题后,ZooKeeper会记录每个分区的配置信息。

 数据存储位置

分区Leader副本分别位于不同的Broker节点上。

 创建第三个主题

 执行指令
[lzl@kafka-broker1 ~]$ cd /opt/module/kafka
[lzl@kafka-broker1 kafka]$ bin/kafka-topics.sh --bootstrap-server kafka-broker1:9092 --create --topic third-topic --partitions 3 --replication-factor 3
 ZooKeeper节点变化

创建带有3个分区和3个副本的主题后,ZooKeeper记录每个分区的配置和其副本分布。

 数据存储位置

分区的多个副本分布在不同的Broker节点上,以提供高可用性和容错能力。

以上步骤展示了如何通过命令行工具创建具有不同分区和副本数目的主题,并简述了创建主题时涉及到的关键概念和ZooKeeper中的节点变化。

 创建主题流程

 命令行提交创建指令
  1. 通过命令行提交指令
    • 指令中包含操作类型(--create)、主题名称(--topic)、主题分区数量(--partitions)、主题分区副本数量(--replication-factor)、副本分配策略(--replica-assignment)等参数。
  2. 指令处理
    • 提交至客户端进行处理。
    • 客户端获取指令后,对指令参数进行校验:
      • 操作类型取值:createlistalterdescribedelete,只能存在一个。
      • 分区数量为大于1的整数。
      • 主题是否已存在。
      • 分区副本数量大于1且小于Short.MaxValue,一般取值小于等于Broker数量。
  3. 封装主题对象
    • 将参数封装为主题对象(NewTopic)。
  4. 创建通信对象
    • 设定请求标记(CREATE_TOPICS),查找Controller,通过通信对象向Controller发起创建主题的网络请求。
 Controller接收创建主题请求
  1. 接收网络请求
    • Controller节点接收到网络请求(Acceptor),并将请求数据封装成请求对象放置在队列(requestQueue)中。
  2. 处理请求
    • 请求控制器(KafkaRequestHandler)周期性从队列中获取请求对象(BaseRequest)。
    • 将请求对象转发给请求处理器(KafkaApis),根据请求对象的类型调用创建主题的方法。
 创建主题
  1. 请求处理器校验主题参数

    • 如果分区数量没有设置,则采用Kafka启动时加载的配置项:num.partitions(默认值为1)。
    • 如果副本数量没有设置,则采用Kafka启动时加载的配置项:default.replication.factor(默认值为1)。
  2. 分区副本分配

    • 使用replica-assignment参数指定的方案创建分区副本。
    • 如果未指定replica-assignment参数,则按照Kafka内部逻辑分配,当前采用的是未指定机架信息的副本分配策略。
      • 分区起始索引设置为0。
      • 计算每一个分区的所有副本位置:
        • 副本起始索引 = (分区编号 + 随机值)% BrokerID列表长度
        • 其他副本索引 = (第一个副本索引 + (1 +(副本分配间隔 + n)% (BrokerID列表长度 - 1))) % BrokerID列表长度
  3. 示例计算

    • 假设当前分区编号:0。

    • BrokerID列表:【1,2,3,4】。

    • 副本数量:4。

    • 随机值(BrokerID列表长度):2。

    • 副本分配间隔随机值(BrokerID列表长度):2。

      • 第一个副本索引:(0 + 2)% 4 = 2。

      • 第一个副本所在BrokerID:3。

      • 第二个副本索引:(2 +(1+(2+0)%3))% 4 = 1。

      • 第二个副本所在BrokerID:2。

      • 第三个副本索引:(2 +(1+(2+1)%3))% 4 = 3。

      • 第三个副本所在BrokerID:4。

      • 第四个副本索引:(2 +(1+(2+2)%3))% 4 = 0。

      • 第四个副本所在BrokerID:1。

    • 最终分区0的副本所在的Broker节点列表为【3,2,4,1】,其他分区采用同样算法。

  4. 保存分区副本ID列表

    • 通过索引位置获取副本节点ID。
    • 保存分区以及对应的副本ID列表。
  5. ZK端创建节点

    • /config/topics节点下,增加当前主题节点,节点类型为持久类型。
    • /brokers/topics节点下,增加当前主题及相关节点,节点类型为持久类型。
  6. Controller节点处理

    • 启动后,在/brokers/topics节点增加监听器,一旦节点发生变化,会触发相应功能:
      • 获取需要新增的主题信息。
      • 更新当前Controller节点保存的主题状态数据。
      • 更新分区状态机的状态为:NewPartition
      • 更新副本状态机的状态:NewReplica
      • 更新分区状态机的状态为:OnlinePartition,从正常的副本列表中获取第一个作为分区的Leader副本,所有副本作为分区的同步副本列表(ISR)。
      • 在ZK路径/brokers/topics/主题名上增加分区节点/partitions,及状态/state节点。
      • 更新副本状态机的状态:OnlineReplica
  7. 发送请求

    • Controller节点向主题的各个分区副本所属Broker节点发送LeaderAndIsrRequest请求。
    • 向所有Broker发送UPDATE_METADATA请求,更新自身缓存。
      • Controller向分区所属的Broker发送请求。
      • Broker节点接收到请求后,根据分区状态信息,设定当前的副本为Leader或Follower,并创建底层的数据存储文件目录和空的数据文件。
        • 文件目录名:主题名 + 分区编号。
        • 文件名:
          • 0000000000000000.log:数据文件,用于存储消息。
          • 0000000000000000.index:索引文件,用于定位数据。
          • 0000000000000000.timeindex:时间索引文件,用于定位数据。

 


http://www.kler.cn/news/321591.html

相关文章:

  • css div多边框斜角边框
  • 配置virtualbox,在windows中与ubuntu共享文件夹
  • Halcon基础系列1-基础算子
  • uni-app canvas文本自动换行
  • 探索 Snowflake 与 Databend 的云原生数仓技术与应用实践 | Data Infra NO.21 回顾
  • 基于matlab语音滤波系统
  • 排序--希尔排序
  • C#入门教程
  • python 实现knn sklearn K近邻分类算法
  • 最新的iOS 18版本和Android 15版本系统分别升级了哪些功能?
  • [大语言模型] 情感认知在大型语言模型中的近期进展-2024-09-26
  • SLF4J报错log4j又报错
  • 新书推荐——《深度学习精粹与PyTorch实践》
  • 网络安全专业,在校大学生如何赚外快,实现财富自由?零基础入门到精通,收藏这一篇就够了
  • Linux下的基本指令/命令(二)
  • 面试小妙招:轻松绕过五大“坑”,展现真实自我
  • python中数据处理库,机器学习库以及自动化与爬虫
  • 设计模式-行为型模式-中介者模式
  • 【C#】DllImport的使用
  • 力扣19 删除链表的倒数第N个节点 Java版本
  • eBPF系列:开发流程
  • 【HarmonyOS】鸿蒙仿iOS线性渐变实现
  • 如何像专家一样修复任何 iPhone 上的“iPhone 已禁用”错误
  • 【Go】Go语言切片(Slice)深度剖析与应用实战
  • chsharp文件如何查找在unity中使用的 位置?
  • 【React】组件通信
  • docker windows下清理后,磁盘空间未释放原因及解决方法
  • 计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-09-27
  • Linux之我不会
  • 【stm32】TIM定时器输出比较-PWM驱动LED呼吸灯/舵机/直流电机