当前位置: 首页 > article >正文

RabbitMQ 发送给延迟交换机的消息调用returnedMessage 方法及returnedMessage() 方法的作用

1:returnedMessage() 方法的作用

交换机返回消息的方法-消息未送达队列触发回调
(1)常用于交换机无法路由回退消息。
(2)如果交换机绑定了备用交换机则是路由到备用交换机,此方法不回调。
(3)如果是发送到延迟交换机则回调此方法,所以如果使用延迟交换机则要对延迟交换机回调的消息过滤。

2:演示错误的Routingkey导致无法路由的现象,触发回调

1、声明交换机和队列并绑定

/**
 * 不可路由的交换机和队列配置
 *
 * @Author darren
 * @Date 2023/3/23 20:02
 */
@Configuration
@Slf4j
public class NotRoutableConfig {

    @Bean("notRoutableExchange")
    public DirectExchange getNotRoutableExchange(){
        return ExchangeBuilder.directExchange(ExchangeUtil.NOT_ROUTABLE_EXCHANGE_NAME).build();
    }

    @Bean("notRoutableQueue")
    public Queue getNotRoutableQueue() {
        return QueueBuilder.durable(QueueUtil.NOT_ROUTABLE_QUEUE_NAME).build();
    }

    @Bean
    public Binding getBinding(
            @Qualifier("notRoutableQueue") Queue notRoutableQueue,
            @Qualifier("notRoutableExchange") DirectExchange notRoutableExchange){
        return BindingBuilder.bind(notRoutableQueue).to(notRoutableExchange).with(RoutingKeyUtil.NOT_ROUTABLE_RIGHT_ROUTING_KEY);
    }
}

2、CallBack实现类


/**
 * 发布确认-消息回调类
 *
 * @Author darren
 * @Date 2023/3/21 22:38
 */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //依赖注入 rabbitTemplate 之后再设置它的回调对象
    @PostConstruct
    public void init(){
        // 确认回调
        rabbitTemplate.setConfirmCallback(this);
        // 返回回调
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 交换机不管是否收到消息的一个回调方法
     *
     * @param correlationData: 消息相关数据
     * @param ack: 交换机是否收到消息
     * @param cause
     */
    @Override
    public void confirm(final CorrelationData correlationData, final boolean ack, final String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(ack){
            log.info("交换机确认回调方法已经收到id为:{} 的消息",id);
        }else{
            log.info("交换机确认回调方法还未收到id为:{} 消息, 由于原因:{}", id, cause);
        }
    }

    /**
     * 交换机返回消息的方法-消息未送达队列触发回调
     * 常用于交换机无法路由回退消息
     * 如果交换机绑定了备用交换机则是路由到备用交换机,此方法不回调。
     *
     * @param message the returned message.
     * @param replyCode the 回复 code.
     * @param replyText the 回复 text.
     * @param exchange the exchange.
     * @param routingKey the routing key.
     */
    @Override
    public void returnedMessage(final Message message, final int replyCode, final String replyText,
            final String exchange,
            final String routingKey) {
        // 排除调延迟交换机,因为消息在延迟交换机中延迟,并未送达到队列则出发了此函数回调
        if (!ExchangeUtil.DELAYED_EXCHANGE_NAME.equals(exchange)) {
            log.info("交换机返回消息的方法收到的消息:{} 交换机回复的内容:{}, 交换机是:{}, 路由 key:{}",
                    new String(message.getBody()),replyText, exchange, routingKey);
        }
    }
}

3、生产者


/**
 * 不可路由交换机-生产者
 * @Author darren
 * @Date 2023/3/23 20:16
 */
@RestController
@Slf4j
@RequestMapping("/notRoutableExchange")
public class notRoutableExchangeController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMessage/{message}")
    public String sendNotRoutableExchangeMsg(@PathVariable("message") String message) {
        rabbitTemplate.convertAndSend(
                ExchangeUtil.NOT_ROUTABLE_EXCHANGE_NAME,
                RoutingKeyUtil.NOT_ROUTABLE_ERROR_ROUTING_KEY,
                message,
                CorrelationDataUtil.getCorrelationData());
        return "发送到不可路由交换机的消息成功";
    }
}

4、结果

http://localhost:8888/notRoutableExchange/sendMessage/heheh
UUID为:500ba6ed-845c-4ac3-80b2-343f6906a69b
交换机返回消息的方法收到的消息:heheh 交换机回复的内容:NO_ROUTE, 交换机是:not.routable.exchange, 路由 key:not.routable.error.routing.key
交换机确认回调方法已经收到id为:500ba6ed-845c-4ac3-80b2-343f6906a69b 的消息

3:如果交换机绑定了备用交换机则是路由到备用交换机,此方法不回调。

1、声明发布确认交换机和队列并绑定备份交换机

/**
 * 发布确认-配置交换机和队列
 * @Author darren
 * @Date 2023/3/21 22:02
 */
@Configuration
public class ConfirmConfig {

    /**
     * 发布确认交换机并绑定备用交换机
     * @return
     */
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return ExchangeBuilder.directExchange(ExchangeUtil.CONFIRM_EXCHANGE_NAME)
                .durable(true).withArgument("alternate-exchange", ExchangeUtil.BACKUP_EXCHANGE_NAME).build();
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(QueueUtil.CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding confirmQueueBindingConfirmExchange(
            @Qualifier("confirmExchange") DirectExchange confirmExchange,
            @Qualifier("confirmQueue") Queue confirmQueue) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(RoutingKeyUtil.CONFIRM_ROUTING_KEY);
    }
}

2、声明备份交换机和队列并绑定

/**
 * 备份交换机及队列
 * @Author darren
 * @Date 2023/3/22 20:10
 */
@Configuration
@Slf4j
public class backupConfig {

    @Bean("backupExchange")
    public FanoutExchange getBackupExchange(){
        return ExchangeBuilder.fanoutExchange(ExchangeUtil.BACKUP_EXCHANGE_NAME).build();
    }

    @Bean("backupQueue")
    public Queue getBackupQueue(){
        return QueueBuilder.durable(QueueUtil.BACKUP_QUEUE_NAME).build();
    }

    @Bean("warningQueue")
    public Queue getWarningQueue(){
        return QueueBuilder.durable(QueueUtil.WARNING_QUEUE_NAME).build();
    }

    @Bean
    public Binding backupQueueBindingBackupExchange(
            @Qualifier("backupQueue") Queue backupQueue,
            @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    @Bean
    public Binding warningQueueBindingBackupExchange(
            @Qualifier("warningQueue") Queue warningQueue,
            @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }
}

3、消费者

/**
 * 发布确认模式队列消费者
 *
 * @Author darren
 * @Date 2023/3/21 22:49
 */
@Component
@Slf4j
public class ConfirmQueueConsumer {

    @RabbitListener(queues = QueueUtil.CONFIRM_QUEUE_NAME)
    public void receiveConfirmQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("消费者收到队列:{} 的消息:{}", QueueUtil.CONFIRM_QUEUE_NAME, msg);
    }

    @RabbitListener(queues = QueueUtil.BACKUP_QUEUE_NAME)
    public void receiveWarningQueueMessage(Message message) {
        String msg = new String(message.getBody());
        log.info("消费者收到队列:{} 的消息:{}", QueueUtil.BACKUP_QUEUE_NAME, msg);
    }
}

4、生产者


/**
 * 发布确认模式-生产者
 * @Author darren
 * @Date 2023/3/21 22:12
 */
@RestController
@Slf4j
@RequestMapping("/confirmExchange")
public class ConfirmExchangeController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发布消息
     *
     * 发到交换机正确的路由可以消费成功
     * 发到交换机错误的路由无法消费,但交换机可以收到消息,因无法路由所以到达不了队列
     * @param message
     * @return
     */
    @GetMapping("/sendMessage/{message}")
    public String sendMessage(@PathVariable String message) {

        log.info("发送消息:{} 到交换机:{} ",message, ExchangeUtil.CONFIRM_EXCHANGE_NAME);
        rabbitTemplate.convertAndSend(
                ExchangeUtil.CONFIRM_EXCHANGE_NAME,
                RoutingKeyUtil.CONFIRM_ROUTING_KEY,
                message,
                CorrelationDataUtil.getCorrelationData());

        rabbitTemplate.convertAndSend(
                ExchangeUtil.CONFIRM_EXCHANGE_NAME,
                RoutingKeyUtil.CONFIRM_ERROR_ROUTING_KEY,
                message,
                CorrelationDataUtil.getCorrelationData());
        return "发布确认模式发送消息成功";
    }

}

5、结果发现 returnedMessage()没有回调

发送消息:heheh 到交换机:confirm.exchange 
UUID为:dec7a8b8-eadb-43ba-b6e3-fe1d94f28bce
UUID为:83611012-9de2-4d52-8e6d-03baf0bc1e44
交换机确认回调方法已经收到id为:dec7a8b8-eadb-43ba-b6e3-fe1d94f28bce 的消息
交换机确认回调方法已经收到id为:83611012-9de2-4d52-8e6d-03baf0bc1e44 的消息
消费者收到队列:warning.queue 的消息:heheh
消费者收到队列:confirm.queue 的消息:heheh
消费者收到队列:backup.queue 的消息:heheh

4: 如果是发送到延迟交换机则回调此方法

 

1、声明延迟交换机和队列并绑定


/**
 * 3-延时消息插件
 *
 * 原理:
 *  Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制。
 *  接收到消息后并不会立即将消息投递至目标队列,而是存储在mnesia table(一个分布式数据库)中,
 *  然后检测消息延迟时间,如果达到可投递时间( 过期时间 )后,将其通过 x-delayed-type
 *  类型标记的交换机投递到目标队列中。
 * @Author darren
 * @Date 2023/3/21 20:56
 */
@Configuration
public class DelayedExchangeConfig {

    /**
     * 自定义交换机-延迟交换机
     * @return
     */
    @Bean("delayedExchange")
    public CustomExchange delayedExchange(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(
                ExchangeUtil.DELAYED_EXCHANGE_NAME,
                "x-delayed-message", true, false, args);
    }

    /**
     * 普通队列
     *
     * @return
     */
    @Bean("delayedQueue")
    public Queue delayedQueue(){
        return QueueBuilder.durable(QueueUtil.DELAYED_QUEUE_NAME).build();
    }

    /**
     * 队列绑定延时交换机
     * @param delayedQueue
     * @param delayedExchange
     * @return
     */
    @Bean
    public Binding delayedQueueBindingDelayedExchange(
            @Qualifier("delayedQueue") Queue delayedQueue,
            @Qualifier("delayedExchange") CustomExchange delayedExchange ){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange)
                .with(RoutingKeyUtil.DELAYED_ROUTING_KEY).noargs();
    }
}

2、消费者


/**
 * 延迟交换机队列消费者
 *
 * @Author darren
 * @Date 2023/3/23 16:25
 */
@Component
@Slf4j
public class DelayedQueueConsumer {
    @RabbitListener(queues = QueueUtil.DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("消费者收到队列:{} 的消息:{}", QueueUtil.DELAYED_QUEUE_NAME, msg);
    }
}

3、生产者

/**
 * 延时交换机-生产者
 *
 * @Author darren
 * @Date 2023/3/21 17:20
 */
@RestController
@Slf4j
@RequestMapping("/delayedExchange")
public class DelayedExchangeController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送到延时交换机
     * 消息在交换机中具有按时间排序的功能
     * @return
     */
    @GetMapping("/sendMessage/{message}/{delayTime}")
    public String sendMessage(@PathVariable String message,
            @PathVariable Integer delayTime) {
        rabbitTemplate.convertAndSend(
                ExchangeUtil.DELAYED_EXCHANGE_NAME,
                RoutingKeyUtil.DELAYED_ROUTING_KEY,
                message,
                messagePostProcessor -> {
                    messagePostProcessor.getMessageProperties().setDelay(delayTime);
                    return messagePostProcessor;
                },
                CorrelationDataUtil.getCorrelationData());
        log.info("发送一条延迟:{} 毫秒的消息:{} 给交换机:{}",
                delayTime, message, ExchangeUtil.DELAYED_EXCHANGE_NAME);
        return "发送延时交换机的消息成功";
    }
}

4、returnedMessage()方法

    /**
     * 交换机返回消息的方法-消息未送达队列触发回调
     * 常用于交换机无法路由回退消息
     * 如果交换机绑定了备用交换机则是路由到备用交换机,此方法不回调。
     *
     * @param message the returned message.
     * @param replyCode the 回复 code.
     * @param replyText the 回复 text.
     * @param exchange the exchange.
     * @param routingKey the routing key.
     */
    @Override
    public void returnedMessage(final Message message, final int replyCode, final String replyText,
            final String exchange,
            final String routingKey) {
        // 排除调延迟交换机,因为消息在延迟交换机中延迟,并未送达到队列则出发了此函数回调
        //if (!ExchangeUtil.DELAYED_EXCHANGE_NAME.equals(exchange)) {
            log.info("交换机返回消息的方法收到的消息:{} 交换机回复的内容:{}, 交换机是:{}, 路由 key:{}",
                    new String(message.getBody()),replyText, exchange, routingKey);
        //}
    }

5、结果,发现延迟交换机会回调returnedMessage()方法。所以如果有延迟队列则要排除掉。

UUID为:e8b70810-852b-400c-9fe0-bffdf1a5247d
发送一条延迟:10000 毫秒的消息:haha 给交换机:delayed.exchange
交换机返回消息的方法收到的消息:haha 交换机回复的内容:NO_ROUTE, 交换机是:delayed.exchange, 路由 key:delayed.routing.key
交换机确认回调方法已经收到id为:e8b70810-852b-400c-9fe0-bffdf1a5247d 的消息
消费者收到队列:delayed.queue 的消息:haha


http://www.kler.cn/a/4535.html

相关文章:

  • Unity3D实现WEBGL打开Window文件对话框打开/上传文件
  • vue 文件下载实现
  • 机器学习头歌(第三部分-强化学习)
  • dockerfile2.0
  • 数据挖掘实训:天气数据分析与机器学习模型构建
  • CAPL与CAN总线通信
  • RPA Framework
  • fwdiary(8) 区间dp,树形dp 记忆化搜索
  • 2023年学习系列之读出计划
  • SpringBoot整合Flink(施耐德PLC物联网信息采集)
  • Wing IDE 解决鼠标悬浮
  • 人工智能、深度学习和机器学习有哪些区别?
  • 免费空间主机是什么?怎么申请免费空间主机
  • English Learning - L2 第 9 次小组纠音 辅音 [s] [z] [ʃ] [ʒ] [h] [ʧ] [ʤ] 2023.3.25 周六
  • NDK FFmpeg音视频播放器五
  • 深入学习JavaScript系列(三)——this
  • JWT基础教程
  • 注意力汇聚 笔记
  • IO进程线程-标准IO(结)
  • 探究C/C++ typedef的秘密
  • Mysql排序后分页 分页数据有重复
  • python条件语句与循环语句
  • Unity游戏崩溃日志查询笔记 安卓平台 关于tombstone_00
  • TCC真没这么简单,一文讲透|分布式事务系列(三)
  • 面试官常问的设计模式及常用框架中设计模式的使用(一)
  • 树莓派学习笔记(八)树莓派Linux内核开发准备工作及概念