当前位置: 首页 > article >正文

Kafka 命令详解及使用示例

文章目录

  • Kafka 命令详解及使用示例
  • Kafka 命令详解
    • `kafka-topics.sh`:主题管理
      • 创建主题
      • 创建带副本的主题
      • 修改主题分区数
      • 了解分区分布
      • 列出主题
      • 查看主题详情
      • 删除主题
    • `kafka-console-producer.sh`:消息生产者
      • 发送消息到主题
      • 带键值对的消息
      • 消息生产性能优化
      • 带分区键的消息发送
    • `kafka-console-consumer.sh`:消息消费者
      • 消费主题中的消息
      • 只读取键值对消息
      • 实时消费消息
      • 只消费特定分区的消息
      • 以 JSON 格式输出消息
    • `kafka-consumer-groups.sh`:消费者组管理
      • 查看消费者组信息
      • 查看消费者组的偏移量信息
      • 重置消费者组的偏移量
    • `kafka-configs.sh`:配置管理
      • 查看主题配置
      • 修改主题配置
    • `kafka-acls.sh`:访问控制列表管理
      • 为用户创建权限
      • 删除用户权限
  • 示例总结


Kafka 命令详解及使用示例

Kafka 是一个分布式流处理平台,提供了高吞吐量、低延迟的消息系统。Kafka 主要用于消息发布-订阅模式中的消息传输,广泛应用于数据管道、日志系统、事件追踪等场景。本文将介绍 Kafka 中常用的命令行工具及其具体使用方式,帮助开发者更好地管理和使用 Kafka。


Kafka 命令详解

kafka-topics.sh:主题管理

主题(Topic)是 Kafka 中消息的逻辑分类,所有消息都发送到指定的主题中。kafka-topics.sh 用于管理主题,包括创建、删除、列出主题等操作。

创建主题

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  • --topic:主题名称。
  • --partitions:分区数,消息将分布在多个分区中。
  • --replication-factor:副本因子,用于消息的高可用性。

创建带副本的主题

在分布式环境中,副本对于 Kafka 来说至关重要,它能确保在 Broker 故障时,消息不会丢失。创建主题时设置合适的副本数和分区数非常关键。

bin/kafka-topics.sh --create --topic important-topic --partitions 5 --replication-factor 3 --bootstrap-server localhost:9092
  • --partitions 设置为 5,意味着主题的数据会被分散到 5 个分区中,提升并发处理能力。
  • --replication-factor 设置为 3,确保每个分区有 3 个副本(在不同的 Broker 上),提高容错性。

注意:副本数不能超过集群中的 Broker 数量,生产环境中一般设置副本数为 3,保证高可用性。

修改主题分区数

Kafka 支持在线扩展主题的分区数。可以在不停止服务的情况下动态增加分区数,但要注意增加分区会影响数据的顺序性,因为 Kafka 不会自动对已存在的数据进行重分配。

bin/kafka-topics.sh --alter --topic important-topic --partitions 10 --bootstrap-server localhost:9092

此命令将 important-topic 的分区数从 5 扩展到 10 个。

了解分区分布

通过 --describe 命令,可以查看每个分区在哪些 Broker 上存储,并了解它们的副本状态。

bin/kafka-topics.sh --describe --topic important-topic --bootstrap-server localhost:9092

输出的结果会显示每个分区的副本和首领(Leader)在哪个 Broker 上。Leader 是处理读写请求的副本,其他副本是跟随者,用于容错。

列出主题

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

查看主题详情

bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

删除主题

bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092

kafka-console-producer.sh:消息生产者

Kafka 提供了一个控制台生产者工具,允许我们从命令行发送消息到指定主题。

发送消息到主题

bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092

输入消息后按 Enter 发送到 Kafka 主题。

带键值对的消息

bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"

在这里,消息的键和值通过冒号分隔,例如:

key1:value1
key2:value2

消息生产性能优化

在高吞吐量场景下,可以通过调整生产者配置来提高性能。例如,批量发送消息和异步生产可以显著提高效率。

bin/kafka-console-producer.sh --topic fast-topic --bootstrap-server localhost:9092 --producer-property batch.size=16384 --producer-property linger.ms=5
  • batch.size:控制批量消息的大小(以字节为单位),Kafka 会尝试将消息累积到这个大小后一起发送。
  • linger.ms:在批量消息发送前的等待时间,可以通过稍微延迟发送消息来增加批量的大小。

此外,生产者可以配置为异步发送,这样可以减少网络等待时间:

--producer-property acks=1
  • acks=1 表示只等待 Leader 确认即可继续发送消息,这种方式可以提高性能,但有可能在 Leader 故障时丢失部分消息。

带分区键的消息发送

指定消息发送到特定的分区时,可以使用 key 参数,这在有状态的消息处理(如事务处理)场景中非常重要。

bin/kafka-console-producer.sh --topic partitioned-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"

这样每个消息都会根据键(key)被分配到相同的分区。例如,key1:message1key1:message2 会发送到相同的分区。


kafka-console-consumer.sh:消息消费者

消费者用于从 Kafka 主题中读取消息。kafka-console-consumer.sh 是 Kafka 提供的命令行消费者工具。

消费主题中的消息

bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
  • --from-beginning:从主题的起始位置读取所有消息。

只读取键值对消息

bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --property print.key=true --property key.separator=,

这样读取的消息会显示为键和值的格式,例如:

key1,value1

实时消费消息

使用 kafka-console-consumer.sh 来实时消费主题中的消息:

bin/kafka-console-consumer.sh --topic fast-topic --bootstrap-server localhost:9092

如果要从主题的起始位置读取消息,可以添加 --from-beginning 参数。

只消费特定分区的消息

Kafka 支持直接从某个分区中读取消息。在某些场景下(如故障恢复或日志分析),我们可能只需要处理某个分区的数据:

bin/kafka-console-consumer.sh --topic important-topic --bootstrap-server localhost:9092 --partition 0 --offset 10

此命令从分区 0 开始读取第 10 条消息。

以 JSON 格式输出消息

Kafka 消费者可以输出 JSON 格式的消息,方便后续处理和分析:

bin/kafka-console-consumer.sh --topic json-topic --bootstrap-server localhost:9092 --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.separator=, --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

kafka-consumer-groups.sh:消费者组管理

Kafka 的消费者组允许多个消费者一起协同消费消息,每个分区的消息只能被一个组内的消费者消费。kafka-consumer-groups.sh 可以用于管理消费者组。

查看消费者组信息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看消费者组的偏移量信息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

输出信息包括每个分区的已消费消息偏移量以及消费者的状态。

重置消费者组的偏移量

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute --topic my-topic
  • --to-earliest:将偏移量重置为最早的消息。

kafka-configs.sh:配置管理

Kafka 主题和代理的配置可以通过 kafka-configs.sh 进行管理。

查看主题配置

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe

修改主题配置

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=172800000

此命令将主题 my-topic 的消息保留时间修改为 2 天(单位为毫秒)。


kafka-acls.sh:访问控制列表管理

Kafka 提供了基于 ACL(访问控制列表)的权限管理。kafka-acls.sh 用于管理权限。

为用户创建权限

bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Alice --operation All --topic my-topic

此命令允许用户 Alice 对主题 my-topic 执行所有操作。

删除用户权限

bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Alice --operation All --topic my-topic

示例总结

我们通过几个简单的示例介绍了 Kafka 的基本操作:

  1. 创建主题 my-topic,并通过控制台生产者发送消息。
  2. 使用控制台消费者从该主题中读取消息。
  3. 管理消费者组的偏移量,重置到最早的消息。
  4. 修改主题的保留时间,以及管理用户的权限。

Kafka 提供了丰富的命令行工具,用于主题、消费者组、配置、权限等的管理。灵活使用这些命令,可以帮助我们高效地维护 Kafka 集群。


http://www.kler.cn/a/313309.html

相关文章:

  • 使用LPT wiggler jtag自制三星单片机(sam88 core)编程器-S3F9454
  • 学习ASP.NET Core的身份认证(基于JwtBearer的身份认证6)
  • C++ 模拟真人鼠标轨迹算法 - 防止游戏检测
  • React 中hooks之useReducer使用场景和方法总结
  • 写作利器:如何用 PicGo + GitHub 图床提高创作效率
  • 要获取本地的公网 IP 地址(curl ifconfig.me)
  • 半导体器件制造5G智能工厂数字孪生物联平台,推进制造业数字化转型
  • java--章面向对象编程(高级部分)
  • 在 Debian 12 上安装 Java 21
  • 【VUE3.0】动手做一套像素风的前端UI组件库---Button
  • iftop流量监控工具
  • 第三方软件测评机构简析:软件安全性测试的方法和流程
  • WSL2+Ubuntu 22.04搭建Qt开发环境+中文输入法
  • 云计算课程作业1
  • react native(expo)多语言适配
  • 技术成神之路:设计模式(十四)享元模式
  • 论文中译英的最佳解决方案?ChatGPT自我反思翻译法了解一下!
  • 分享3款开源、免费的Avalonia UI控件库
  • 引领长期投资新篇章:价值增长与财务安全的双重保障
  • JSBSim中的运动方程模型(更新ing........
  • 计算机视觉—3d点云数据基础
  • VUE3配置路由(超级详细)
  • Python知识点:使用Cython进行Python性能优化
  • YOLO交通目标识别数据集(红绿灯-汽车-自行车-卡车等)
  • 无人机视角电力巡检资产检测与异常判别数据集
  • 【数据结构】排序算法---快速排序