RabbitMQ 之 死信队列
一、死信的概念
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理
解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
二、死信的来源
1、消息 TTL 过期
2、队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3、消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
三、死信实战
代码架构图
生产者正常情况下走的是普通的交换机,这个交换机的类型是 direct ,它和普通队列之间的关系是一个叫 "zhangsan" 的路由 key, 正常情况下会被 C1 消费。但是发生了上面所说的三种情况中的一种,成为了死信,然后被转换到死信交换机中,这个死信交换机也是 direct 类型,它们之间的 routingKey 是 "lisi",然后就进入了死信队列,死信队列由 C2 消费。
消息 TTL 过期
1.application.yml 配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: metieVirtualHosts
# listener:
# simple:
# retry:
# ### 开启消费者(程序出现异常的情况下会进行重试)
# enabled: true
# ### 最大重试次数
# max-attempts: 5
# ### 重试间隔次数
# initial-interval: 3000
# acknowledge-mode: manual
server:
port: 8080
###死信队列
mayikt:
dlx:
exchange: mayikt_dlx_exchange
queue: mayikt_order_dlx_queue
routingkey: dlx
### 订单
order:
exchange: mayikt_order_exchange
queue: mayikt_order_queue
routingkey: mayikt.order
2MQConfig配置
package com.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @description:
* @author: huangan
* @date: 2024/11/23 20:48
*/
@Component
public class DeadLetterMQConfig {
/**
* 订单交换机
*/
@Value("${mayikt.order.exchange}")
private String orderExchange;
/**
* 订单队列
*/
@Value("${mayikt.order.queue}")
private String orderQueue;
/**
* 订单路由key
*/
@Value("${mayikt.order.routingkey}")
private String orderRoutingkey;
/**
* 死信交换机
*/
@Value("${mayikt.dlx.exchange}")
private String dlxExchange;
/**
* 死信队列
*/
@Value("${mayikt.dlx.queue}")
private String dlxQueue;
/**
* 死信路由
*/
@Value("${mayikt.dlx.routingkey}")
private String dlxRoutingkey;
/**
* 声明死信交换机
* @return
*/
@Bean
public DirectExchange dlxExchange(){
return new DirectExchange(dlxExchange);
}
/**
* 声明死信队列
* @return
*/
@Bean
public Queue dlxQueue(){
return new Queue(dlxQueue);
}
/**
* 声明订单业务交换机
* @return
*/
@Bean
public DirectExchange orderExchange(){
return new DirectExchange(orderExchange);
}
/**
* 声明订单队列
* @return
*/
@Bean
public Queue orderQueue(){
//订单队列要绑定死信交换机
Map<String,Object> arguments = new HashMap<>(2);
arguments.put("x-dead-letter-exchange",dlxExchange);
arguments.put("x-dead-letter-routing-key",dlxRoutingkey);
return new Queue(orderQueue,true,false,false,arguments);
}
/**
* 绑定死信队列到死信交换机
* @return
*/
@Bean
public Binding binding(){
return BindingBuilder.bind(dlxQueue())
.to(dlxExchange()).with(dlxRoutingkey);
}
/**
* 绑定订单队列到订单交换机
* @return
*/
@Bean
public Binding orderBinding(){
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with(orderRoutingkey);
}
}
3.生产者OrderProducer
package com.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @description:
* @author: huangan
* @date: 2024/11/23 21:58
*/
@RestController
public class OrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 订单交换机
*/
@Value("${mayikt.order.exchange}")
private String orderExchange;
/**
* 订单路由key
*/
@Value("${mayikt.order.routingkey}")
private String orderRoutingkey;
@RequestMapping("/sendOrder")
public String sendOrder(){
String msg ="测试数据";
rabbitTemplate.convertAndSend(orderExchange,orderRoutingkey,msg,message -> {
//设置消息过期时间10秒过期
message.getMessageProperties().setExpiration("10000");
return message;
});
return "success";
}
}
4.订单消费者
package com.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @description: 订单队列
* @author: huangan
* @date: 2024/11/23 22:13
*/
@Component
@Slf4j
public class OrderConsumer {
/**
* 监听队列回调方法
* @param msg
*/
@RabbitListener(queues = "mayikt_order_queue")
public void orderConsumer(String msg){
log.info(">>正常订单消费者消费MSG:{}<<",msg);
}
}
5.死信消费者
package com.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @description: 死信队列
* @author: huangan
* @date: 2024/11/23 22:17
*/
@Slf4j
@Component
public class OrderDlxConsumer {
/**
* 监听队列回调方法
* @param msg
*/
@RabbitListener(queues = "mayikt_order_dlx_queue")
public void orderConsumer(String msg){
log.info(">>死信队列消费订单消费者消费MSG:{}<<",msg);
}
}