当前位置: 首页 > article >正文

RabbitMQ---消息确认和持久化

(一)消息确认

1.概念

生产者发送消息后,到达消费端会有以下情况:

1.消息处理成功

2.消息处理异常

   如果RabbitMQ把消息发送给消费者后就把消息删除,那么就可能会导致,消息处理异常想要再获取这条消息的时候,造成消息丢失,但如果消息处理成功,不需要这条消息了,就可以进行删除。此时就不会有问题

 所以如何保证消费者成功接收并正确处理?我们RabbitMQ就给我们提供了消息确认机制

 消费者在订阅队列的时候,我们可以指定autoAck参数,根据这个参数,我们可以把确认机制大致分为两种(Spring boot有三种,但是本质也是这两种)

  自动确认:当autoAck为true时,RabbitMQ只要发送了这个消息,就会从内存中删除,不会管消费者是否收到了这些消息,所以自动确认的可靠性不高

  手动确认:当autoAck为false时,RabbitMQ会等待消费者调用Basic.Ack,回复确认信号后,RabbitMQ才会从队列中删除消息,所以手动确认的可靠性是比较高的

当我们设置为手动确认后,队列中的消息就分为了两个部分:

1.等待投递给消费者的消息

2.已经投递给消费者但是没有收到确认信号的消息

也就是Ready和Unacked

如果RabbitMQ一致没有收到消费者的确认信号,并且消息对应的消费者断开连接,RabbitMQ就会安排消息重新入队列,等待投递给下一个消费者

 

 

 

2.Spring Boot的三种策略和代码演示

Springboot给我们提供了三种策略(本质上还是自动确认和手动确认)

1)AcknowledgeMode.NONE

 这种模式下,就是标准的自动确认,消息一旦投递给消费者,不管消费者是否正确处理了消息,RabbitMQ就会自动确认消息,并且从队列中移除,所以消息是很有可能丢失的

 2)AcknowledgeMode.AUTO(默认)

这种模式下,消息成功处理时会自动确认消息,如果消息过程中抛出了异常就不会确认消息

3)AckniwledgeMode.MANUAL

这种模式下,就是标准的手动确认模式,消费者必须在成功处理消息后调用basicAck方法来确认消息,如果消息没有被确认,RabbitMQ就会认为消息没有被成功处理,会重新投递该消息,这种模式会提高消息的可靠性,因为消息不会丢失,而是重新入队

代码演示

配置文件代码

spring:
  rabbitmq:
    addresses: amqp://student:student@62.234.46.219:5672/test
    listener:
      simple:
        acknowledge-mode: NONE

声明交换机和队列代码

@Component
public class Config {
    @Bean("ackExchange")
    public Exchange ackExchange(){
        return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();
    }
    @Bean("ackQueue")
    public Queue ackQueue(){
        return QueueBuilder.durable(Constants.ACK_QUEUE).build();
    }
    @Bean
    public Binding ackBind(@Qualifier("ackExchange") Exchange ackExchange,@Qualifier("ackQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("ack").noargs();
    }
}

生产者代码


@RequestMapping("producer")
@RestController
public class AckProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("ack")
    public String ackPro(){
        rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","ack test");
        return "发送成功";
    }
}

然后我们发送一条消息看现象(此时我们还没写消费者代码,所以没有自动确认) 

消费者代码

@Component
public class AckConsumer {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void ListenerQueue(Message message, Channel channel){
        System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "
                +message.getMessageProperties().getDeliveryTag());
//        int num=3/0;     //模拟失败
        System.out.println("处理完成");
    }
}

此时我们没有出错,我们看能否正确接收消息,已经队列中消息是否存在 

 

我们发现能够准确的接收消息

如果我们此时抛出异常呢? 

 

 

  我们发现抛出了异常后,我们后端并没有接收到消息,但是队列的消息也消失了,所以就会导致我们的消息不见了,这也就是自动确认的弊端 

此时我们把确认策略改成AUTO

我们这里只需要改配置文件即可

截图体现不出来,但是结果是一直在循环,因为我们如果设置确认策略为AUTO,在运行正常时,会自动确认,并且从队列中删除,如果抛出异常,就不会确认消息,造成循环

 

我们再来演示下正确情况 

 

此时我们再来说一下手动确认策略

首先更改配置文件

  其次和我们刚刚演示AUTO和NONE时,代码是没有改动的,但是手动确认,因为要我们手动做一些处理,所以代码也要有一定的改变 

  那我们先来讲一下手动确认方法

手动确认方法:

1.肯定确认

RabbitMQ已经知道该消息并且成功的处理消息,可以丢弃了 

我们来解释下这个方法的参数

deliveryTag:消息的唯一标识,是一个单调递增的64位长整型,每个channel之间是独立的,所以在每个channel上是唯一的,当消费者确认一条消息时,需要用对应的信道上进行确认

multiple:是否批量确认,如果为false,就只确认当前值,如果为true就会确定小于等于的值

2.否定确认

有两个方法

他们两个的区别不大,唯一区别就是是否支持一次性批量拒绝消息

我们来看一些这个requeue这个参数,这个参数表示拒绝后,消息要如何处理,如果为true,RabbitMQ就会把他重新入队,发给下一个消费者,如果为false,RabbitMQ就会把他从队列中删除,不会把他发送给信道消费者

消费者代码

@Component
public class AckConsumer {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void ListenerQueue(Message message, Channel channel) throws IOException {
        long Tag=message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "
                    +Tag);
            int num=3/0;     //模拟失败
            channel.basicAck(Tag,false);
            System.out.println("处理完成");
        }catch (Exception e){
            channel.basicNack(Tag,false,true);
            channel.basicReject(Tag,true);
        }
    }
}

出现异常时的现象 

因为我们是设置出现异常重新入队给下一个消费者,所以会循环

如果消息成功接收

 

(二)持久化

  我们刚刚做的消息确认和之前说过七大工作模式中的发送确认都是为了保证消息不丢失,能够正确的接收,但是我们如何保证RabbitMQ服务停掉以后,生产者发送过的消息不丢失(存储在RabbitMQ中的消息)?

1.RabbitMQ的持久化

  RabbitMQ的持久化分为三个部分:交换机持久化,队列持久化和消息持久化

1)交换机持久化

  交换机持久化是通过在声明交换机的时候,将durable参数设置为true实现的,相当于将交换机的属性在服务器中存储,M服务关闭后再次重新,RabbitMQ会自动重新建立交换机,此时就相当于一直存在

上面是我们把交换机设置为持久化的代码

其实我们不手动设置,默认交换机也是持久的,我们来看源码 

 

 

我们发现就算不设置,他默认也是true,在我们创建的时候也是持久化的

2)队列持久化

队列持久化也是在声明队列时调用方法来实现的

如果队列不设置持久化,MQ重启时,队列就会被删掉,与此同时消息是存在队列中的,所以队列上的消息也会小时

所以我们要设置消息持久化,那么一定要设置队列持久化否则没用

这是设置为持久化的代码

这是设置为非持久化的代码

那我们还是来看一眼源码

然后我们点进这个QueueBuilder看看

 

我们发现这个就是给名字赋值用的

再来看看durable中的setDurable

我们发现是设置持久化的,因为是boolean类型,默认都是false所以非持久化就不需要做处理了

3)消息持久化

消息持久化

消息实现持久化就需要把消息的投递模式进行更改

设置了队列和消息的持久化,RabbitMQ服务器重启后,消息才会存在,其他情况下,重新MQ都武器消息都会丢失

消息持久化代码

@RequestMapping("producer")
@RestController
public class AckProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("ack")
    public String ackPro(){
        String s1="ack test";
        Message message=new Message(s1.getBytes(StandardCharsets.UTF_8),new MessageProperties());
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack",message);
        return "发送成功";
    }
}

注意我们这里传的不是一个单独字符串了,而是一个消息 

  RabbitMQ会默认将消息视为持久化的,除非队列被声明为非持久化,或者发送消息时被标记为非持久化 

问题:

 注:如果我们将所有消息都设置为持久化会严重影响RabbitMQ的性能,写入磁盘的速度比写入内存的速度慢很多,所以我们在选择消息持久化的时候,要做一个衡量

如果我们将交换机,队列,消息都设置了持久化后,就能保证百分百不丢失数据了吗?

答案是错误的

1.在持久化消息存入RabbitMQ时,还有一段时间,消息是在缓存上的还没有写入硬盘,因为RabbitMQ并不会为每条消息都同步存盘,因为这样会严重影响性能,所以会有缓存,等待数据一起写入硬盘。如果在这是MQ重启了,消息还没有写入硬盘,那这些消息就会丢失

2.消费者设置自动确认,然后消费者接收消息后宕机,此时消息也会丢失,此时我们只需要设置为手动确认即可

第一个问题的解决方案我们下一篇博客在讲

  

 

 

 


http://www.kler.cn/a/504787.html

相关文章:

  • nginx 配置ssl_dhparam好处及缺点
  • Elasticsearch:使用全文搜索在 ES|QL 中进行过滤 - 8.17
  • 网络层协议-----IP协议
  • OpenCV基础:矩阵的创建、检索与赋值
  • ORACLE-表空间和分区控制
  • Entity 的材质(棋盘、条纹、网格)
  • lanqiaoOJ 3333:肖恩的排序 ← 双指针+排序(从大到小)
  • mock服务-通过json定义接口自动实现mock服务
  • Python在WRF模型自动化运行及前后处理中实践技术应用-包括数据处理、模型运行、结果可视化等步骤。
  • 72_List列表原理
  • 计算机组成原理简答题、名词解释整理(考研、期末)
  • Android Perfetto 系列
  • Python 在企业级应用中的两大硬伤
  • 极客说|Azure AI Agent Service 结合 AutoGen/Semantic Kernel 构建多智能体解决⽅案
  • 如何发布自己的第一个Chrome扩展程序
  • 基于微信小程序的社区门诊管理系统php+论文源码调试讲解
  • C++ 类模板教程
  • 分布式ID的实现方案
  • Pacs系统开发之Dcm4chee代码结构分析
  • 搭建 RUST 交叉编译环境
  • 建筑综合布线可视化管理
  • 大模型微调介绍-Prompt-Tuning
  • WPS excel使用宏编辑器合并 Sheet工作表
  • 苍穹外卖(七) 缓存商品、购物车
  • 【React】新建React项目
  • Flume【部署 01】CentOS Linux release 7.5 安装配置 apache-flume-1.9.0 并验证