当前位置: 首页 > article >正文

【RocketMQ】消费失败重试与死信消息

🎯 导读:本文档详细介绍了RocketMQ中的重试机制与死信消息处理方法。对于生产者而言,文档提供了如何配置重试次数的具体示例;而对于消费者,它解释了默认情况下消息消费失败后的重试策略,并展示了如何通过代码自定义重试次数。当消息经过多次重试仍无法成功消费时,RocketMQ会将其标记为死信消息,并存入特定的死信队列中。文档还提供了处理死信队列的两种策略:一种是编写专门的消费者来处理这些消息,另一种是在达到一定重试次数后签收消息并通知人工干预。此外,还包括了关于死信消息生产和消费的基本示例代码。

文章目录

  • RocketMQ 重试机制
    • 生产者重试
    • 消费者重试
  • RocketMQ 死信消息
    • 消息生产者
    • 消息消费者
    • 死信消费者
    • 控制台显示

RocketMQ 重试机制

生产者重试

// 失败的情况重发3次(同步)
producer.setRetryTimesWhenSendFailed(3);
// 失败的情况重发3次(异步)
producer.setRetryTimesWhenSendAsyncFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);

【示例代码】

@Test
public void retryProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    // 如果发送失败要重试几次(同步),不设置默认值是2
    producer.setRetryTimesWhenSendFailed(3);
    // 如果发送失败要重试几次(异步)
//        producer.setRetryTimesWhenSendAsyncFailed(3);
    String key = UUID.randomUUID().toString();
    System.out.println(key);
    Message message = new Message("retryTopic", "vip1", key, "我是vip666的文章".getBytes());
    producer.send(message);
    System.out.println("发送成功");
    producer.shutdown();
}

消费者重试

如果消息消费失败,默认会重试16次,重试的时间间隔:10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

能否自定义重试次数?

可以,重试的次数一般设置为5

// 消费失败,重试几次
consumer.setMaxReconsumeTimes(5);

如果重试了16次(并发模式是16次,顺序模式下重试次数是 int 类型最大值) 都是失败的,怎么处理?

认为该消息是死信消息,将消息放在一个死信主题中去,名称:%DLQ%消费者组名,最后再实现一个消费者去消费死信消息,一般是发邮件发短信通知人工处理、做一些记录

在这里插入图片描述

死信队列只有一个队列

在这里插入图片描述

当消息处理失败的时候 该如何正确的处理?

方案一:处理死信队列,如果每个死信队列都写一个消费者,很麻烦

/**
 * 方案一
 * 死信队列消费者
 * @throws Exception
 */
@Test
public void retryDeadConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("%DLQ%retry-consumer-group", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = msgs.get(0);
            System.out.println(new Date());
            System.out.println(new String(messageExt.getBody()));
            System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
            // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

方案二:在实际生产过程中,一般重试3-5次,如果还没有消费成功,则可以把消息签收了,通知人工等处理

/**
 * 方案二
 * 重试次数较多,直接做日志记录、通知人工处理
 * @throws Exception
 */
@Test
public void retryConsumer2() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("retryTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = msgs.get(0);
            System.out.println(new Date());
            try {
                // 业务处理,模拟报错
                handleDb();
            } catch (Exception e) {
                // 重试
                int reconsumeTimes = messageExt.getReconsumeTimes();
                if (reconsumeTimes >= 3) {
                    // 重试次数太大,不要重试了
                    System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

private void handleDb() {
    int i = 10 / 0;
}

RocketMQ 死信消息

当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列

  • 当一条消息初次消费失败, RocketMQ 会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。
  • 如果产生了死信消息,对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。
    • 可以利用 RocketMQ Admin 工具或者 RocketMQ Dashboard 上查询到对应死信消息的信息。
    • 也可以监听死信队列,进行自己的业务上的逻辑,写日志、通知人工处理

消息生产者

@Test
public void testDeadMsgProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("dead-group");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    Message message = new Message("dead-topic", "我是一个死信消息".getBytes());
    producer.send(message);
    producer.shutdown();
}

消息消费者

@Test
public void testDeadMsgConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("dead-topic", "*");
    // 设置最大消费重试次数 2 次
    consumer.setMaxReconsumeTimes(2);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(msgs);
            // 测试消费失败
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
    consumer.start();
    System.in.read();
}

死信消费者

注意权限问题

在这里插入图片描述

@Test
public void testDeadMq() throws  Exception{
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
    consumer.setNamesrvAddr("localhost:9876");
    // 消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
    // 队列名称 默认是 %DLQ% + 消费者组名
    consumer.subscribe("%DLQ%dead-group", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(msgs);
            // 处理消息 签收了
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

控制台显示

在这里插入图片描述

在这里插入图片描述


http://www.kler.cn/a/325523.html

相关文章:

  • go chan底层分析
  • Jenkins与不同阶段测试的完美结合
  • C语言初阶习题【30】字符串左旋
  • TCP 连接状态标识 | SYN, FIN, ACK, PSH, RST, URG
  • Android15源码编译问题处理
  • Emacs 折腾日记(九)——elisp 数组与序列
  • 低代码平台中的宿主概念解析与字典、角色、岗位及权限管理
  • 金属增材制造咋突破?纳米纹理粉末如何助力金属增材制造?
  • C++ bitset(位图)的模拟实现
  • JS设计模式之状态模式:优雅地管理应用中产生的不同状态
  • Java并发:互斥锁,读写锁,公平锁,Condition,StampedLock
  • 穿越数字迷雾:探索IT领域的无限可能
  • 后端开发面试题8(附答案)
  • k8s_资源管理介绍
  • Spring Boot驱动的在线房产租赁服务
  • next 从入门到精通
  • 【RabbitMQ】幂等性、顺序性
  • js 如何监听 body 内容是否改变
  • 【WPF】桌面程序开发之窗口的用户控件详解
  • MATLAB绘图基础
  • 利用 PostgreSQL 构建 RAG 系统实现智能问答
  • 小程序视频编辑SDK解决方案,轻量化视频制作解决方案
  • 三菱PLC数据 转 profinet IO项目案例
  • 使用 Docker 部署 RStudio 的终极教程
  • Spring Boot 点餐系统:一键订餐解决方案
  • C++之Person类中调用Date类