当前位置: 首页 > article >正文

@KafkaListener注解

@KafkaListener(topics = "${xxxx}", containerFactory = "xxx")
public void consume(String message) {
    service.save(xx);
}

在 Spring Boot 中,使用 @KafkaListener 注解消费 Kafka 消息时,如果 service.save(cleanWikiSource) 抛出异常,消息不会被认为已经成功消费完毕。相应的行为取决于 Kafka 配置的 ack 模式以及错误处理机制:

默认情况(ack 模式为 BATCH 或 RECORD):

如果 service.save(cleanWikiSource) 抛出异常,Kafka 会自动将消息标记为未成功消费,并会重新尝试交付此消息(即消息会被重新消费)。
如果配置了 max.poll.interval.ms 或 max.poll.records 等参数,可以控制消费者重试间隔和次数。
手动确认(ack 模式为 MANUAL 或 MANUAL_IMMEDIATE):

如果您启用了手动提交确认(通过 KafkaListener 配置 ackMode = MANUAL),则在成功处理完消息后需手动调用 Acknowledgment.acknowledge() 方法标记消息已被消费。
如果 cleanWikiSourceService.save 方法失败且没有调用 acknowledge(),那么消息将会被重新投递。
错误处理机制:

可以配置 SeekToCurrentErrorHandler(适用于 KafkaListenerContainerFactory)来处理消费异常。此类错误处理器允许指定重试次数和延迟时间,消息在达到最大重试次数后将跳过或交给死信队列。


http://www.kler.cn/news/357588.html

相关文章:

  • 使用docker搭建lnmp运行WordPress
  • 2011年国赛高教杯数学建模B题交巡警服务平台的设置与调度解题全过程文档及程序
  • 光纤光学的基本方程
  • 纯血鸿蒙!
  • BI 的前置计算
  • 2023年ICPC亚洲合肥赛区赛 C. Cyclic Substrings
  • 关于Android12以上的闪屏兼容
  • 深度学习速通系列:中文命名实体识别(NER)数据集大全(含链接)
  • Python 基础语法
  • 【C++】deque(空间适配器))
  • pip安装指定版本的tensorflow
  • ChatGPT Plus 升级全攻略
  • 基于SSM+微信小程序的打印室预约管理系统(打印2)
  • leetcode动态规划(三)-爬楼梯
  • 如何给手机换ip地址
  • sql 常用语法
  • NFT Insider #152:The Sandbox Alpha 第4季开启
  • Java基础系列和实战
  • HTTP Proxy环境下部署Microsoft Entra Connect和Health Agents
  • 一个fiber对象有哪些属性,是怎样创建出来的