Kafka 中的偏移量是什么?它解决了哪些问题?
在 Kafka 中,偏移量(offset)是为分区中的每条记录(消息)分配的唯一标识符。偏移量是 Kafka 用来维护分区内消息顺序的连续整数。它们帮助生产者、消费者和代理(broker)追踪消息在分区中的具体位置。
Kafka 中的偏移量工作原理
生产者行为:
生产者将消息写入特定主题的分区。每条新消息都会被追加到日志中,Kafka 会为其分配下一个连续的偏移量。
生产者并不直接管理偏移量,这一过程由 Kafka 自动处理。
消费者行为:
消费者从分区中获取消息,并负责提交偏移量。
• 已提交偏移量(Committed Offset):消费者成功处理的最后偏移量。
消费者可以指定开始读取的偏移量:• 最早(Earliest):从分区的开头开始读取。
• 最新(Latest):仅读取新消息。
代理的角色:
代理将消息及其关联的偏移量存储在日志中。偏移量在分区内是不可变的且唯一的。
实时示例:支付处理系统
场景:一个电子商务系统使用 Kafka 处理支付交易。
1. 主题(Topic):
payment-transactions
包含支付请求,每条支付记录包括交易 ID(transactionId)、金额(amount)和状态(status)等详情。2. 分区(Partitions):
假设该主题有 3 个分区以实现负载均衡:
• 分区 0:偏移量 [0, 1, 2, 3…]
• 分区 1:偏移量 [0, 1, 2, 3…]
• 分区 2:偏移量 [0, 1, 2, 3…]
3. 生产者(Producer):
支付网关生产者将支付请求写入 Kafka,消息分布在各个分区中。4. 消费者(Consumers):
支付处理微服务中的消费者读取消息以处理支付:• 消费者 A 处理分区 0。
• 消费者 B 处理分区 1。
• 消费者 C 处理分区 2。
5. 偏移量追踪:
• 处理后提交:每次处理完一条支付记录后,消费者提交最后成功处理的消息的偏移量。
• 如果消费者在处理分区 0 的偏移量 3 后失败,下一个消费者(或重启后)将从偏移量 4 继续。
偏移量管理及示例
Kafka 提供两种偏移量管理策略:
1. 自动偏移量管理(默认):
Kafka 以固定间隔自动提交偏移量。
风险:即使消息未被处理,也可能被标记为已处理。2. 手动偏移量管理:
消费者在处理消息后显式提交偏移量。
提供更多控制权并保证处理完成。
以下是手动偏移量管理的代码示例:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; publicclassManualOffsetExample { publicstaticvoidmain(String[] args) { Propertiesprops=newProperties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "payment-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("enable.auto.commit", "false"); // 禁用自动提交 KafkaConsumer<String, String> consumer = newKafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("payment-transactions")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Processing transaction with offset %d: %s%n", record.offset(), record.value()); // 此处添加业务逻辑,例如处理支付 // 处理完成后手动提交偏移量 consumer.commitSync(); } } } finally { consumer.close(); } } }
偏移量管理的优势
1. 可重放性(Replayability):
Kafka 允许消费者通过重置偏移量重新读取消息,例如重新处理失败的交易。2. 并行性(Parallelism):
在分区主题中,消费者可以独立管理偏移量,实现可扩展的并行处理。3. 容错性(Fault Tolerance):
如果消费者失败,另一个消费者可以从最后提交的偏移量继续处理。
实时问题:消费者失败
场景:
• 一个支付消费者在处理分区 0 的偏移量 4 后崩溃。
• 消费者组的另一个实例接管并从偏移量 4 恢复处理。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/590109.html 如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!