当前位置: 首页 > article >正文

RabbitMQ的高级特性介绍(二)

发送方确认

当消息的⽣产者将消息发送出去之后,消息到底有没有正确地到达服务器呢? 如果在消息到
达服务器之前已经丢失(比如RabbitMQ重启, 那么RabbitMQ重启期间⽣产者消息投递失败), 持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ为我们提供了两种解决⽅案:
                a. 通过事务机制实现
                b. 通过发送⽅确认(publisher confirm) 机制实现

事务机制⽐较消耗性能, 在实际⼯作中使⽤也不多,咱们主要介绍confirm机制来实现发送⽅的确认。

RabbitMQ为我们提供了两个⽅式来控制消息的可靠性投递
                1. confirm确认模式
                2. return退回模式

confirm模式

Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听,无论消息是否到达
Exchange, 这个监听都会被执行,如果Exchange成功收到, ACK( Acknowledge character , 确认
字符)为true, 如果没收到消息, ACK就为false。

需要在controller中设置回调方法

@RequestMapping("/confirm")
    public String confirm(){
        //设置回调方法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("执行了confirm方法");
                if(ack){
                    System.out.printf("接收到消息,消息id:%s", correlationData==null ? null : correlationData.getId());
                }else{
                    System.out.printf("未接受到消息,消息id:%s,cause: %s\n",correlationData== null ? null: correlationData.getId(),cause);
                    //相应的业务逻辑处理
                }
            }
        });
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm test",correlationData);
        return "消息发送成功";
    }

return退回模式

消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果⼀条消息⽆法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回给发送者。消息退回给发送者时,我们可以设置⼀个返回回调方法, 对消息进行处理。

@RequestMapping("returns")
    public String returns(){
        CorrelationData correlationData = new CorrelationData("5");
        confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","returns test",correlationData);
        return "消息发送成功";
    }

常见面试题:如何保证RabbitMQ的可靠传输?

从这个图中, 可以看出, 消息可能丢失的场景以及解决⽅案:
1. ⽣产者将消息发送到 RabbitMQ失败
                a. 可能原因: 网络问题等
                b. 解决办法: 参考本章节[发送⽅确认-confirm确认模式]
2. 消息在交换机中无法路由到指定队列:
                a. 可能原因: 代码或者配置层⾯错误, 导致消息路由失败
                b. 解决办法: 参考本章节[发送⽅确认-return模式]

3. 消息队列⾃⾝数据丢失
                a. 可能原因: 消息到达RabbitMQ之后, RabbitMQ Server 宕机导致消息丢失.
                b. 解决办法: 参考创作中心-CSDN[持久性]。开启 RabbitMQ持久化, 就是消息写⼊之后会持久化到磁盘, 如果RabbitMQ 挂了, 恢复之后会自动读取之前存储的数据. (极端情况下, RabbitMQ还未持久化就挂了, 可能导致少量数据丢失, 这个概率极低, 也可以通过集群的方式提⾼可靠性)
4. 消费者异常, 导致消息丢失
                a. 可能原因: 消息到达消费者, 还没来得及消费, 消费者宕机. 消费者逻辑有问题.
                b. 解决办法: 参考本章节创作中心-CSDN[消息确认]. RabbitMQ 提供了 消费者应答机制 来使 RabbitMQ 能够感知到消费者是否消费成功消息. 默认情况下消费者应答机制是⾃动应答的, 可以开启⼿动确认当消费者确认消费成功后才会删除消息, 从⽽避免消息丢失. 除此之外, 也可以配置重试机制(参考下⼀章节),当消息消费异常时, 通过消息重试确保消息的可靠性。

重试机制

在消息传递过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可用, 资源不足等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送。

自动确认

发送消息:

@RequestMapping("/retry")
    public String retry(){
        rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test");
        return "消息发送成功";
    }

消费消息:

@Component
public class RetryQueueListener {
//指定监听队列的名称
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void ListenerQueue(Message message) throws Exception {
    System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
    String(message.getBody(),"UTF-8"),
    message.getMessageProperties().getDeliveryTag());
    //模拟处理失败
    int num = 3/0;
    System.out.println("处理完成");
}
}

手动确认

改为手动确认

@RabbitListener(queues = Constant.RETRY_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
    System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
    String(message.getBody(),"UTF-8"),
    message.getMessageProperties().getDeliveryTag());
    //模拟处理失败
    int num = 3/0;
    System.out.println("处理完成");
//3. ⼿动签收
    channel.basicAck(deliveryTag, true);
        }catch (Exception e){
//4. 异常了就拒绝签收
    Thread.sleep(1000);
//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 则直接
    丢弃
    channel.basicNack(deliveryTag, true,true);
}
}

手动确认模式下, 消费者需要显式地对消息进⾏确认. 如果消费者在处理消息时遇到异常, 可以选择不确认消息使消息可以重新入队. 重试的控制权在于应⽤程序本⾝,而不是RabbitMQ的内部机制. 应⽤程序可以通过⾃⼰的逻辑和利⽤RabbitMQ的⾼级特性来实现有效的重试策略。

TTL

TTL(Time to Live, 过期时间),即过期时间。当消息到达存活时间之后,还没有被消费, 就会被自动清除。

有两种TTL:消息的TTL队列的TTL

假如队列的TTL是20s,消息的TTL是10s,那么消息的TTL取小值,也就是10s。

设置消息的TTL

发送消息:

@RequestMapping("/ttl")
    public String ttl(){
        System.out.println("ttl...");

        rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test",message ->{
            message.getMessageProperties().setExpiration("10000");  //单位是ms,设置过期时间为10s
            return message;
        });
        return "消息发送成功";
    }

设置队列的TTL

设置队列TTL的⽅法是在创建队列时, 加⼊ x-message-ttl 参数实现的,单位是ms。

//设置TTL
    @Bean("ttlQueue2")
    public Queue ttlQueue2(){
        return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build();  //设置队列的ttl为20s
    }
@Bean("ttlBinding2")
    public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue,@Qualifier("ttlExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
    }

两者区别

设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除。
设置消息TTL的⽅法, 即使消息过期, 也不会马上从队列中删除, 而是在即将投递到消费者之前进⾏判定的。

为什么这两种⽅法处理的⽅式不⼀样?
因为设置队列过期时间, 队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期的消息即可。
而设置消息TTL的⽅式,每条消息的过期时间不同,如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可。

以上,关于RabbitMQ的高级特性,希望对你有所帮助。


http://www.kler.cn/a/600139.html

相关文章:

  • Transformer 通关秘籍2:利用 BERT 将文本 token 化
  • 基于微信小程序的短文写作竞赛管理系统
  • Windows桌面采集技术
  • 【Matlab】串口通信(serialport对象,读写、回调、删除等)
  • Java-腾讯云短信模板兼容阿里云短信模板-短信模板参数生成
  • 【JavaWeb学习Day27】
  • Windows下编译安装Qt5.15.0指南
  • 23种设计模式中的策略模式
  • Xshell、Xsftp、Xmanager中文版安装包及使用教程
  • Redis Sentinel(哨兵模式)高可用性解决方案
  • hackmyvm-Icecream
  • CSS圣杯布局与双飞翼布局
  • Redisson分布式锁(超时释放及锁续期)
  • AI进化论:从图灵测试到智能革命的临界点
  • Python学习第二十三天
  • Flink CEP:复杂事件处理详解
  • 架构思维:从代码实现到系统思维的进阶之路
  • 基于ConcurrentHashMap+Redisson的轻量级分布式锁架构设计与工业级实现
  • [特殊字符] C++ 常见 Socket 错误与优化指南
  • gdb/cgdb:调试器