【RocketMQ】消费失败重试与死信消息
🎯 导读:本文档详细介绍了RocketMQ中的重试机制与死信消息处理方法。对于生产者而言,文档提供了如何配置重试次数的具体示例;而对于消费者,它解释了默认情况下消息消费失败后的重试策略,并展示了如何通过代码自定义重试次数。当消息经过多次重试仍无法成功消费时,RocketMQ会将其标记为死信消息,并存入特定的死信队列中。文档还提供了处理死信队列的两种策略:一种是编写专门的消费者来处理这些消息,另一种是在达到一定重试次数后签收消息并通知人工干预。此外,还包括了关于死信消息生产和消费的基本示例代码。
文章目录
- RocketMQ 重试机制
- 生产者重试
- 消费者重试
- RocketMQ 死信消息
- 消息生产者
- 消息消费者
- 死信消费者
- 控制台显示
RocketMQ 重试机制
生产者重试
// 失败的情况重发3次(同步)
producer.setRetryTimesWhenSendFailed(3);
// 失败的情况重发3次(异步)
producer.setRetryTimesWhenSendAsyncFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);
【示例代码】
@Test
public void retryProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
// 如果发送失败要重试几次(同步),不设置默认值是2
producer.setRetryTimesWhenSendFailed(3);
// 如果发送失败要重试几次(异步)
// producer.setRetryTimesWhenSendAsyncFailed(3);
String key = UUID.randomUUID().toString();
System.out.println(key);
Message message = new Message("retryTopic", "vip1", key, "我是vip666的文章".getBytes());
producer.send(message);
System.out.println("发送成功");
producer.shutdown();
}
消费者重试
如果消息消费失败,默认会重试16次,重试的时间间隔:10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
能否自定义重试次数?
可以,重试的次数一般设置为5
// 消费失败,重试几次
consumer.setMaxReconsumeTimes(5);
如果重试了16次(并发模式是16次,顺序模式下重试次数是 int 类型最大值) 都是失败的,怎么处理?
认为该消息是死信消息,将消息放在一个死信主题中去,名称:%DLQ%消费者组名
,最后再实现一个消费者去消费死信消息,一般是发邮件发短信通知人工处理、做一些记录
死信队列只有一个队列
当消息处理失败的时候 该如何正确的处理?
方案一:处理死信队列,如果每个死信队列都写一个消费者,很麻烦
/**
* 方案一
* 死信队列消费者
* @throws Exception
*/
@Test
public void retryDeadConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("%DLQ%retry-consumer-group", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0);
System.out.println(new Date());
System.out.println(new String(messageExt.getBody()));
System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
// 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
方案二:在实际生产过程中,一般重试3-5次,如果还没有消费成功,则可以把消息签收了,通知人工等处理
/**
* 方案二
* 重试次数较多,直接做日志记录、通知人工处理
* @throws Exception
*/
@Test
public void retryConsumer2() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("retryTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0);
System.out.println(new Date());
try {
// 业务处理,模拟报错
handleDb();
} catch (Exception e) {
// 重试
int reconsumeTimes = messageExt.getReconsumeTimes();
if (reconsumeTimes >= 3) {
// 重试次数太大,不要重试了
System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
private void handleDb() {
int i = 10 / 0;
}
RocketMQ 死信消息
当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
- 当一条消息初次消费失败, RocketMQ 会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。
- 如果产生了死信消息,对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。
- 可以利用 RocketMQ Admin 工具或者 RocketMQ Dashboard 上查询到对应死信消息的信息。
- 也可以监听死信队列,进行自己的业务上的逻辑,写日志、通知人工处理
消息生产者
@Test
public void testDeadMsgProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("dead-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message("dead-topic", "我是一个死信消息".getBytes());
producer.send(message);
producer.shutdown();
}
消息消费者
@Test
public void testDeadMsgConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("dead-topic", "*");
// 设置最大消费重试次数 2 次
consumer.setMaxReconsumeTimes(2);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(msgs);
// 测试消费失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.in.read();
}
死信消费者
注意权限问题
@Test
public void testDeadMq() throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
consumer.setNamesrvAddr("localhost:9876");
// 消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
// 队列名称 默认是 %DLQ% + 消费者组名
consumer.subscribe("%DLQ%dead-group", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(msgs);
// 处理消息 签收了
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}