RabbitMQ 发送给延迟交换机的消息调用returnedMessage 方法及returnedMessage() 方法的作用
1:returnedMessage() 方法的作用
交换机返回消息的方法-消息未送达队列触发回调 (1)常用于交换机无法路由回退消息。 (2)如果交换机绑定了备用交换机则是路由到备用交换机,此方法不回调。 (3)如果是发送到延迟交换机则回调此方法,所以如果使用延迟交换机则要对延迟交换机回调的消息过滤。
2:演示错误的Routingkey导致无法路由的现象,触发回调
1、声明交换机和队列并绑定
/**
* 不可路由的交换机和队列配置
*
* @Author darren
* @Date 2023/3/23 20:02
*/
@Configuration
@Slf4j
public class NotRoutableConfig {
@Bean("notRoutableExchange")
public DirectExchange getNotRoutableExchange(){
return ExchangeBuilder.directExchange(ExchangeUtil.NOT_ROUTABLE_EXCHANGE_NAME).build();
}
@Bean("notRoutableQueue")
public Queue getNotRoutableQueue() {
return QueueBuilder.durable(QueueUtil.NOT_ROUTABLE_QUEUE_NAME).build();
}
@Bean
public Binding getBinding(
@Qualifier("notRoutableQueue") Queue notRoutableQueue,
@Qualifier("notRoutableExchange") DirectExchange notRoutableExchange){
return BindingBuilder.bind(notRoutableQueue).to(notRoutableExchange).with(RoutingKeyUtil.NOT_ROUTABLE_RIGHT_ROUTING_KEY);
}
}
2、CallBack实现类
/**
* 发布确认-消息回调类
*
* @Author darren
* @Date 2023/3/21 22:38
*/
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
public void init(){
// 确认回调
rabbitTemplate.setConfirmCallback(this);
// 返回回调
rabbitTemplate.setReturnCallback(this);
}
/**
* 交换机不管是否收到消息的一个回调方法
*
* @param correlationData: 消息相关数据
* @param ack: 交换机是否收到消息
* @param cause
*/
@Override
public void confirm(final CorrelationData correlationData, final boolean ack, final String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if(ack){
log.info("交换机确认回调方法已经收到id为:{} 的消息",id);
}else{
log.info("交换机确认回调方法还未收到id为:{} 消息, 由于原因:{}", id, cause);
}
}
/**
* 交换机返回消息的方法-消息未送达队列触发回调
* 常用于交换机无法路由回退消息
* 如果交换机绑定了备用交换机则是路由到备用交换机,此方法不回调。
*
* @param message the returned message.
* @param replyCode the 回复 code.
* @param replyText the 回复 text.
* @param exchange the exchange.
* @param routingKey the routing key.
*/
@Override
public void returnedMessage(final Message message, final int replyCode, final String replyText,
final String exchange,
final String routingKey) {
// 排除调延迟交换机,因为消息在延迟交换机中延迟,并未送达到队列则出发了此函数回调
if (!ExchangeUtil.DELAYED_EXCHANGE_NAME.equals(exchange)) {
log.info("交换机返回消息的方法收到的消息:{} 交换机回复的内容:{}, 交换机是:{}, 路由 key:{}",
new String(message.getBody()),replyText, exchange, routingKey);
}
}
}
3、生产者
/**
* 不可路由交换机-生产者
* @Author darren
* @Date 2023/3/23 20:16
*/
@RestController
@Slf4j
@RequestMapping("/notRoutableExchange")
public class notRoutableExchangeController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMessage/{message}")
public String sendNotRoutableExchangeMsg(@PathVariable("message") String message) {
rabbitTemplate.convertAndSend(
ExchangeUtil.NOT_ROUTABLE_EXCHANGE_NAME,
RoutingKeyUtil.NOT_ROUTABLE_ERROR_ROUTING_KEY,
message,
CorrelationDataUtil.getCorrelationData());
return "发送到不可路由交换机的消息成功";
}
}
4、结果
http://localhost:8888/notRoutableExchange/sendMessage/heheh
UUID为:500ba6ed-845c-4ac3-80b2-343f6906a69b
交换机返回消息的方法收到的消息:heheh 交换机回复的内容:NO_ROUTE, 交换机是:not.routable.exchange, 路由 key:not.routable.error.routing.key
交换机确认回调方法已经收到id为:500ba6ed-845c-4ac3-80b2-343f6906a69b 的消息
3:如果交换机绑定了备用交换机则是路由到备用交换机,此方法不回调。
1、声明发布确认交换机和队列并绑定备份交换机
/**
* 发布确认-配置交换机和队列
* @Author darren
* @Date 2023/3/21 22:02
*/
@Configuration
public class ConfirmConfig {
/**
* 发布确认交换机并绑定备用交换机
* @return
*/
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return ExchangeBuilder.directExchange(ExchangeUtil.CONFIRM_EXCHANGE_NAME)
.durable(true).withArgument("alternate-exchange", ExchangeUtil.BACKUP_EXCHANGE_NAME).build();
}
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(QueueUtil.CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding confirmQueueBindingConfirmExchange(
@Qualifier("confirmExchange") DirectExchange confirmExchange,
@Qualifier("confirmQueue") Queue confirmQueue) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(RoutingKeyUtil.CONFIRM_ROUTING_KEY);
}
}
2、声明备份交换机和队列并绑定
/**
* 备份交换机及队列
* @Author darren
* @Date 2023/3/22 20:10
*/
@Configuration
@Slf4j
public class backupConfig {
@Bean("backupExchange")
public FanoutExchange getBackupExchange(){
return ExchangeBuilder.fanoutExchange(ExchangeUtil.BACKUP_EXCHANGE_NAME).build();
}
@Bean("backupQueue")
public Queue getBackupQueue(){
return QueueBuilder.durable(QueueUtil.BACKUP_QUEUE_NAME).build();
}
@Bean("warningQueue")
public Queue getWarningQueue(){
return QueueBuilder.durable(QueueUtil.WARNING_QUEUE_NAME).build();
}
@Bean
public Binding backupQueueBindingBackupExchange(
@Qualifier("backupQueue") Queue backupQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
@Bean
public Binding warningQueueBindingBackupExchange(
@Qualifier("warningQueue") Queue warningQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
}
3、消费者
/**
* 发布确认模式队列消费者
*
* @Author darren
* @Date 2023/3/21 22:49
*/
@Component
@Slf4j
public class ConfirmQueueConsumer {
@RabbitListener(queues = QueueUtil.CONFIRM_QUEUE_NAME)
public void receiveConfirmQueue(Message message) {
String msg = new String(message.getBody());
log.info("消费者收到队列:{} 的消息:{}", QueueUtil.CONFIRM_QUEUE_NAME, msg);
}
@RabbitListener(queues = QueueUtil.BACKUP_QUEUE_NAME)
public void receiveWarningQueueMessage(Message message) {
String msg = new String(message.getBody());
log.info("消费者收到队列:{} 的消息:{}", QueueUtil.BACKUP_QUEUE_NAME, msg);
}
}
4、生产者
/**
* 发布确认模式-生产者
* @Author darren
* @Date 2023/3/21 22:12
*/
@RestController
@Slf4j
@RequestMapping("/confirmExchange")
public class ConfirmExchangeController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发布消息
*
* 发到交换机正确的路由可以消费成功
* 发到交换机错误的路由无法消费,但交换机可以收到消息,因无法路由所以到达不了队列
* @param message
* @return
*/
@GetMapping("/sendMessage/{message}")
public String sendMessage(@PathVariable String message) {
log.info("发送消息:{} 到交换机:{} ",message, ExchangeUtil.CONFIRM_EXCHANGE_NAME);
rabbitTemplate.convertAndSend(
ExchangeUtil.CONFIRM_EXCHANGE_NAME,
RoutingKeyUtil.CONFIRM_ROUTING_KEY,
message,
CorrelationDataUtil.getCorrelationData());
rabbitTemplate.convertAndSend(
ExchangeUtil.CONFIRM_EXCHANGE_NAME,
RoutingKeyUtil.CONFIRM_ERROR_ROUTING_KEY,
message,
CorrelationDataUtil.getCorrelationData());
return "发布确认模式发送消息成功";
}
}
5、结果发现 returnedMessage()没有回调
发送消息:heheh 到交换机:confirm.exchange
UUID为:dec7a8b8-eadb-43ba-b6e3-fe1d94f28bce
UUID为:83611012-9de2-4d52-8e6d-03baf0bc1e44
交换机确认回调方法已经收到id为:dec7a8b8-eadb-43ba-b6e3-fe1d94f28bce 的消息
交换机确认回调方法已经收到id为:83611012-9de2-4d52-8e6d-03baf0bc1e44 的消息
消费者收到队列:warning.queue 的消息:heheh
消费者收到队列:confirm.queue 的消息:heheh
消费者收到队列:backup.queue 的消息:heheh
4: 如果是发送到延迟交换机则回调此方法
1、声明延迟交换机和队列并绑定
/**
* 3-延时消息插件
*
* 原理:
* Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制。
* 接收到消息后并不会立即将消息投递至目标队列,而是存储在mnesia table(一个分布式数据库)中,
* 然后检测消息延迟时间,如果达到可投递时间( 过期时间 )后,将其通过 x-delayed-type
* 类型标记的交换机投递到目标队列中。
* @Author darren
* @Date 2023/3/21 20:56
*/
@Configuration
public class DelayedExchangeConfig {
/**
* 自定义交换机-延迟交换机
* @return
*/
@Bean("delayedExchange")
public CustomExchange delayedExchange(){
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(
ExchangeUtil.DELAYED_EXCHANGE_NAME,
"x-delayed-message", true, false, args);
}
/**
* 普通队列
*
* @return
*/
@Bean("delayedQueue")
public Queue delayedQueue(){
return QueueBuilder.durable(QueueUtil.DELAYED_QUEUE_NAME).build();
}
/**
* 队列绑定延时交换机
* @param delayedQueue
* @param delayedExchange
* @return
*/
@Bean
public Binding delayedQueueBindingDelayedExchange(
@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange ){
return BindingBuilder.bind(delayedQueue).to(delayedExchange)
.with(RoutingKeyUtil.DELAYED_ROUTING_KEY).noargs();
}
}
2、消费者
/**
* 延迟交换机队列消费者
*
* @Author darren
* @Date 2023/3/23 16:25
*/
@Component
@Slf4j
public class DelayedQueueConsumer {
@RabbitListener(queues = QueueUtil.DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody());
log.info("消费者收到队列:{} 的消息:{}", QueueUtil.DELAYED_QUEUE_NAME, msg);
}
}
3、生产者
/**
* 延时交换机-生产者
*
* @Author darren
* @Date 2023/3/21 17:20
*/
@RestController
@Slf4j
@RequestMapping("/delayedExchange")
public class DelayedExchangeController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送到延时交换机
* 消息在交换机中具有按时间排序的功能
* @return
*/
@GetMapping("/sendMessage/{message}/{delayTime}")
public String sendMessage(@PathVariable String message,
@PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(
ExchangeUtil.DELAYED_EXCHANGE_NAME,
RoutingKeyUtil.DELAYED_ROUTING_KEY,
message,
messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelay(delayTime);
return messagePostProcessor;
},
CorrelationDataUtil.getCorrelationData());
log.info("发送一条延迟:{} 毫秒的消息:{} 给交换机:{}",
delayTime, message, ExchangeUtil.DELAYED_EXCHANGE_NAME);
return "发送延时交换机的消息成功";
}
}
4、returnedMessage()方法
/**
* 交换机返回消息的方法-消息未送达队列触发回调
* 常用于交换机无法路由回退消息
* 如果交换机绑定了备用交换机则是路由到备用交换机,此方法不回调。
*
* @param message the returned message.
* @param replyCode the 回复 code.
* @param replyText the 回复 text.
* @param exchange the exchange.
* @param routingKey the routing key.
*/
@Override
public void returnedMessage(final Message message, final int replyCode, final String replyText,
final String exchange,
final String routingKey) {
// 排除调延迟交换机,因为消息在延迟交换机中延迟,并未送达到队列则出发了此函数回调
//if (!ExchangeUtil.DELAYED_EXCHANGE_NAME.equals(exchange)) {
log.info("交换机返回消息的方法收到的消息:{} 交换机回复的内容:{}, 交换机是:{}, 路由 key:{}",
new String(message.getBody()),replyText, exchange, routingKey);
//}
}
5、结果,发现延迟交换机会回调returnedMessage()方法。所以如果有延迟队列则要排除掉。
UUID为:e8b70810-852b-400c-9fe0-bffdf1a5247d
发送一条延迟:10000 毫秒的消息:haha 给交换机:delayed.exchange
交换机返回消息的方法收到的消息:haha 交换机回复的内容:NO_ROUTE, 交换机是:delayed.exchange, 路由 key:delayed.routing.key
交换机确认回调方法已经收到id为:e8b70810-852b-400c-9fe0-bffdf1a5247d 的消息
消费者收到队列:delayed.queue 的消息:haha