Kafka性质小结
1、关于消息偏移量的确认
消息的确认包括自动确认和手动确认,通常采用手动确认的方式,配置项和代码块分别如下所示。这里需要注意的是,当消息1、2、3顺序到达,2偏移量确认失败,3偏移量确认成功时,2的偏移量将被覆盖,即后续将从3的偏移量开始消费,不会再次消费消息2 !!!
spring:
kafka:
consumer:
......
# 关闭自动提交偏移量
enable-auto-commit: false
listener:
# 拉取数据方式: single、batch
type: single
# 偏移量提交方式:手动
ack-mode: manual_immediate
@Component
public class MessageConsumer {
@KafkaListener(topics = "topic-smy", groupId = "my_group1")
public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
// 消息处理
System.out.println("接收到消息, topic = "+ record.topic()
+",partition:"+record.partition()
+",offset = "+record.offset()
+",key = "+record.key()
+",value = "+record.value());
if(!"0".equals(record.value())) {
// 消息偏移量确认
ack.acknowledge();
}
}
}