Kafka 主题管理
主题作为消息的归类,分区则是对消息的二次归类。分区可以有一至多个副本,每个副本对应一个日志文件。
分区的划分不仅为Kafka提供了可伸缩性、水平扩展的功能,还通过多副本机制来为Kafka提供数据冗余以提高可靠性。
图 主题、分区、副本和日志之间的关系
1 主题管理
可以通过kafka-topics.sh脚本来执行主题的创建、查看、修改、删除等操作。也可以通过KafkaAdminClient的方式实现。
1.1 创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic topic-manager1 --partitions 3 --replication-factor 3
这个命令是创建一个名为topic-manager1的主题,其有3个分区,每个分区有3个副本。
图 查看topic-manager1 的分区副本和分配细节
1.1.1 日志文件
日志文件目录由broker的配置log.dir决定,默认为“/tmp/kafka-logs/”。
在执行完创建主题的脚本后,Kafka会在日志文件目录下创建<topic>-<partition> 命名格式的文件夹。文件夹下保存的是该主题的<partition>分区某个副本的日志信息。
图 日志文件相关信息
1.1.2 zookeeper
当创建一个主题时,会在Zookeeper的brokers/topics/目录下创建一个同名的实节点,该节点中记录了该主题的分区副本分配方案。
图 zookeeper 的 /brokers/topics/topic-manager1 实节点
zookeeper 还记录了创建broker、主题等时指定的配置信息。
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-manager2 --partitions 1 --replication-factor 1 --config max.message.bytes=10000
上面的脚本创建了topic-manager2 主题,并通过--config 配置了参数。
图 zookeeper的/config/topics/topic-manager2 实节点
1.1.3 主题查看
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看当前所有可用的主题。
图 --list 指令类型
bin/kafka-topics.sh -describe --zookeeper localhost:2181 --topic topic-manager1,topic-manager2
查看topic-manager1及topic-manager2的副本详细分配方案
图 --describe 指令类型
可以指定多个主题,用逗号隔开。
1.2 分区分配
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic topic-manager3 --replica-assignment 0:1,1:2,2:0
该脚本通过--replica-assignment 参数来手动指定分区副本的方案。
图 查看topic-manager3 的分区副本和分配细节
如果不手动指定分区副本的分配方案,则由Kafka自动分配。有两种策略:未指定机架信息和指定机架信息。
1.2.1 未指定机架信息
副本分配的目标是均衡分布与副本隔离。
均衡分布:副本应均衡地分散在各个broker上,以避免某些broker负载过重而其他broker负载过轻的情况。
副本隔离:对应某个broker上分配的分区,它的其他副本应放置在其他broker上,以确保数据的可用性和容错性。
1.2.2 指定机架信息
机架信息在集群管理中指的是描述机架及其内部布局和状态的各种数据,主要包含以下内容:
- 机架标志,机架ID及机架具体位置。
- 机架设备信息,设备类型、设备型号和序列号、设备状态。
- 机架布局,设备布局图、U位信息。
- 电源和散热信息,电源分配、散热系统。
- 网络连接信息,网络拓补、端口和链路信息。
- 其他信息,机架容量、机架访问权限。
Kafka 分配副本时考虑机架的因素,以避免将整个分区的所有副本都放在同一个机架的broker上。机架感知主要有以下作用:
- 提高容错性,保证至少有一个副本存在于其他机架的broker上,从而确保数据的可靠性。
- 优化网络性能,机架之间的网络连接可能存在一定的延迟和带宽限制。通过机架感知,Kafka将副本分配到不同的机架,以减少跨机架的数据传输,提高系统的整体性能。
1.3 主题修改
--alter 指令可以用来增加主题的分区数及变更主题的配置。
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic par1 --partitions 2 --config max.message.bytes=20000
图 查看修改后的主题
新增分区不会对历史消息产生影响,它们还会继续保留在它们最初被分配的分区里。但会改变消息的分区策略,对消息的后续处理和消费者行为产生影响。建议在一开始就设置好分区数量,避免以后对其进行调整。
1.3.1 不支持减少分区
Kafka现不支持减少分区。主要在于需要考虑被删除的分区中的消息如何处理:
- 如果消息随着分区一起删除,那么消息的可靠性得不到保障。
- 消息不删除,那消息如何保留:a)直接存储到现有分区的尾部,消息的时间戳就不会递增,且消息逻辑顺序也更混乱。b)消息量很大的时候,内部的数据复制会占用很大的资源,在复制期间,主题的可用性、消息顺序性、事务性问题都需要考虑。
1.4 主题删除
删除主题,可以是否一些资源,会删除主题下的所有消息。删除主题,需要将broker的delete.topic.enable参数配置为true。
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic par1
1.4.1 通过zookeeper删除主题
使用kafka-topics.sh 删除主题的行为本质上只是在zookeeper中的/admin/delete_topics路径下创建一个与待删除主题同名的节点,以此标记该主题为待删除的状态(真正删除主题的动作也是由Kafka的控制器完成)。
create /admin/delete_topics/par3 ""
1.4.2 手动删除
1)删除Zookeeper中的节点 /config/topics/主题名称
deleteall /config/topics/par1
2)删除Zookeeper中的节点 /brokers/topics/主题名称 及其子节点。
deleteall /brokers/topics/par1
3)删除集群中所有与被删除主题相关的文件。
rm -rf /tmp/kafka-logs/par1-0
1.5 配置管理
kafka-configs.sh 专门用来对配置进行操作。--alter 变更配置,--describe查看动态配置。支持对主题、broker、用户和客户端这4个类型进行配置管理。
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers --entity-name 0
entity-type | entity-name |
topics | 指定主题的名称。 |
brokers | 指定brokerId值。 |
clients | 指定clientId值,即KafkaProducer或KafkaConsumer的client.id参数配置的值。 |
users | 指定用户名。 |
表 entity-type 和 entity-name 的对应关系
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name top1 --add-config cleanup.policy=compact,max.message.bytes=10000