当前位置: 首页 > article >正文

RabbitMQ原理剖析

目录

RabbitMQ原理剖析

RabbitMQ的消息持久化存储在哪里?

存储位置

存储机制

持久化设置

RabbitMQ的消息消费者怎么知道消费到哪了?消费过程是什么样的?消费后的消息会被删除吗?后续还能再次消费吗?

1. 消费者如何知道消费到哪了?

2. 消费过程是什么样的?

3. 消费后的消息会被删除吗?

4. 后续还能再次消费吗?

RabbitMQ消费者在处理消息时需要维护内部状态或偏移量来记录消费进度,这个状态和偏移量存储在哪里?

消息确认机制

内部数据结构

消费者状态管理

总结


RabbitMQ原理剖析

参考文章:

RabbitMQ原理剖析-CSDN博客

看这篇文章由此引发一些其他问题

RabbitMQ的消息持久化存储在哪里?

参考:

RabbitMQ 存储机制_rabbitmq存储机制-CSDN博客

rabbitmqTemplate持久化消息 rabbitmq如何持久化_mob6454cc6aeeaf的技术博客_51CTO博客

RabbitMqPersistence:RabbitMq的持久化_51CTO博客_rabbitmq持久化

RabbitMQ消息队列之持久化机制详解_java_脚本之家

RabbitMQ的消息持久化存储主要在磁盘上,具体存储位置和相关机制如下:

存储位置

RabbitMQ的持久化消息和非持久化消息在必要时都可以被写入到磁盘。默认情况下,RabbitMQ会在$RABBITMQ_HOME/var/lib/mnesia/rabbit@$HOSTNAME/路径下存储消息数据,这个路径下包含queues、msg_store_persistent、msg_store_transient这三个文件夹,分别用于存储对应的信息(不同版本的目录位置可能有所不同)。

存储机制

  1. 持久化消息:当持久化消息到达队列时,它会被立即写入到磁盘中,并且同时也会在内存中保存一份以加快读取速度。这些消息被存储在msg_store_persistent文件夹中。
  2. 非持久化消息:非持久化消息一般只保存在内存中。但是,当内存紧张时,这些消息会被换入到磁盘中,以节省内存空间。这些被换入磁盘的非持久化消息被存储在msg_store_transient文件夹中。需要注意的是,当RabbitMQ服务重启后,这些非持久化消息会丢失。

此外,RabbitMQ的持久化层还包含一个逻辑上的概念——持久层(Persistent Layer),它实际包含两个部分:

  1. 队列索引(rabbit_queue_index):负责维护队列中落盘消息的信息,包括消息的存储地点、是否已被交付给消费者、是否已被消费者ack等。每个队列都有与之对应的一个队列索引。
  2. 消息存储(rabbit_msg_store):以键值对的形式存储消息,它被所有队列共享,在每个节点中有且只有一个。消息(包括消息体、属性和headers)可以直接存储在队列索引中,也可以被保存在消息存储中。

为了优化性能,较小的消息通常存储在队列索引中,而较大的消息则存储在消息存储中。这个消息大小的界定可以通过queue_index_embed_msgs_below来配置,默认大小为4096B。

持久化设置

在RabbitMQ中,要实现消息的持久化存储,需要在发送消息时设置消息的持久化标识,并在定义队列时设置队列的持久化标识。

  1. 消息持久化:在发送消息时,可以通过设置BasicProperties的deliveryMode为2来实现消息的持久化。例如,使用MessageProperties.PERSISTENT_TEXT_PLAIN作为BasicProperties的参数来发送持久化消息。
  2. 队列持久化:在定义队列时,可以通过设置durable参数为true来实现队列的持久化。例如,在声明队列时,将durable参数设置为true。

综上所述,RabbitMQ的消息持久化存储主要依赖于磁盘上的相关文件夹和文件,并通过持久层和特定的存储机制来确保消息的可靠性和持久性。

RabbitMQ的消息消费者怎么知道消费到哪了?消费过程是什么样的?消费后的消息会被删除吗?后续还能再次消费吗?

参考:

RabbitMQ消费端之消息确认及消息预取_rabbitmq 怎么查看已经消费了的消息-CSDN博客

java 消费rabbitmq 数据 rabbitmq如何消费_mob64ca1403c772的技术博客_51CTO博客

rabbitmq消费后删除消息 - 智能助手

Rabbitmq消息重复消费_java rabbitmq发起一条消息怎么一直在消费-CSDN博客

RabbitMQ的消息消费者通过一系列机制来跟踪消费进度,并确保消息的正确处理和删除。以下是关于RabbitMQ消息消费过程的详细解答:

1. 消费者如何知道消费到哪了?

RabbitMQ使用消息确认机制来跟踪消息的消费进度。当消费者从队列中接收到消息并成功处理后,它会向RabbitMQ发送一个确认(ack)消息,表明该消息已经被成功消费。RabbitMQ在收到确认后,会将该消息从队列中删除。

在消费过程中,消费者通常会维护一个内部状态或偏移量,以记录已经消费到的消息位置。这个状态或偏移量通常与RabbitMQ中的消息索引或唯一ID相关联,以确保消费者能够准确地知道自己已经消费了哪些消息。

2. 消费过程是什么样的?

RabbitMQ的消费过程通常包括以下几个步骤:

  • 消费者连接到RabbitMQ服务器,并声明要消费的队列。
  • RabbitMQ将队列中的消息发送给消费者。
  • 消费者接收并处理消息。
  • 消费者向RabbitMQ发送确认(ack)消息,表示消息已被成功处理。
  • RabbitMQ收到确认后,从队列中删除该消息。

3. 消费后的消息会被删除吗?

是的,消费后的消息会被RabbitMQ从队列中删除。这是基于消息确认机制来完成的。只有当消费者成功处理消息并发送确认消息后,RabbitMQ才会将消息从队列中删除。如果消费者在处理消息时遇到错误或异常,它可以选择发送一个拒绝(nack)或拒绝并重新排队(reject)的消息给RabbitMQ,以指示消息处理失败。在这种情况下,RabbitMQ可能会根据配置将消息重新放入队列或发送到死信队列。

4. 后续还能再次消费吗?

  • 已删除的消息:一旦消息被消费者成功消费并确认,RabbitMQ就会从队列中删除该消息。因此,已删除的消息无法再次被消费。
  • 未确认或失败的消息:如果消息在处理过程中未被确认或处理失败,RabbitMQ可能会根据配置将消息重新放入队列或发送到其他队列(如死信队列)。在这种情况下,其他消费者或重新尝试的消费者可以再次消费这些消息。

需要注意的是,为了避免消息重复消费的问题,消费者在处理消息时应该保持幂等性,即多次处理同一消息应该产生相同的结果。这可以通过在数据库中检查消息的唯一ID或状态来实现。

综上所述,RabbitMQ通过消息确认机制来跟踪消息的消费进度,并确保消息的正确处理和删除。消费者在处理消息时需要维护内部状态或偏移量来记录消费进度,并保持幂等性以避免消息重复消费的问题。

RabbitMQ消费者在处理消息时需要维护内部状态或偏移量来记录消费进度,这个状态和偏移量存储在哪里?

参考:

RabbitMQ——消息存储_rabbitmq消息存储位置-CSDN博客

rabbitmq 消息存储_rabbitmq消息存在哪-CSDN博客

rabbitmq数据存储的位置在哪 - 问答 - 亿速云

RabbitMQ实战指南——存储机制总结_rabbitmq 消息是如何存储的-CSDN博客

RabbitMQ消费者在处理消息时,并不需要消费者本身显式地维护内部状态或偏移量来记录消费进度。RabbitMQ的消息确认机制和内部数据结构已经为消费者处理了这些工作。

消息确认机制

当消费者从RabbitMQ接收并处理消息后,它会向RabbitMQ发送一个确认(ack)信号。这个确认信号是RabbitMQ跟踪消息消费进度的关键。一旦RabbitMQ收到确认信号,它就会将相应的消息从队列中移除,从而确保消息不会被重复消费。

内部数据结构

RabbitMQ内部维护了多张表来记录消息的状态和位置。这些表包括但不限于:

  • 消息索引表:记录了消息在文件中的存储位置、消息长度、引用次数等信息。这个表帮助RabbitMQ快速定位消息并处理消费者的请求。
  • 文件描述信息表:记录了存储消息的文件的描述信息,如文件名、有效数据大小、左右关联的文件信息等。这个表帮助RabbitMQ管理消息存储文件,并在必要时进行文件合并和删除。

消费者状态管理

虽然消费者不需要显式地维护内部状态或偏移量,但它们在处理消息时仍然需要一些状态管理来确保消息的正确处理。例如:

  • 消息处理状态:消费者需要跟踪每条消息的处理状态,以确保消息被正确处理。这可以通过在消费者内部维护一个状态机或使用数据库来实现。
  • 重试机制:如果消息处理失败,消费者可能需要实现重试机制来重新处理消息。这可以通过记录失败消息的唯一ID或状态来实现,并在稍后重试时检查这些记录。

总结

RabbitMQ通过其内部的消息确认机制和数据结构来跟踪消息的消费进度。消费者不需要显式地维护内部状态或偏移量来记录消费进度,但它们仍然需要一些状态管理来确保消息的正确处理。这些状态管理可以在消费者内部实现,也可以通过使用数据库或其他持久化存储来实现。


http://www.kler.cn/a/351029.html

相关文章:

  • 11-1.Android 项目结构 - androidTest 包与 test 包(单元测试与仪器化测试)
  • STM32 FreeRTOS移植
  • [Do374]Ansible一键搭建sftp实现用户批量增删
  • 【BLE】CC2541之ADC
  • Hadoop3.x 万字解析,从入门到剖析源码
  • Flutter插件制作、本地/远程依赖及缓存机制深入剖析(原创-附源码)
  • Go4 和对 Go 的贡献
  • Excelize 开源基础库 2.9.0 版本正式发布
  • 基于php的旅游管理系统
  • Hadoop等大数据处理框架的Java API
  • Ansible自动化运维实践:从入门到进阶
  • Java 枚举类
  • 【深度学习】阿里云GPU服务器免费试用3月
  • 【Python】 list dict数据合并汇总demo
  • LinkedList和链表(上)
  • no WeWorkFinanceSdk in java.library.path
  • 嵌入式数据结构中树与查找方法实现
  • Java 8 Stream API:从基础到高级,掌握流处理的艺术
  • RabbitMQ 入门(四)SpringAMQP五种消息类型
  • 在 Windows 环境下,Git 默认会自动处理 CRLF 和 LF 之间的转换。
  • 探索MB15镁合金棒:高强度与轻质性的完美结合
  • 编译Thingsboard3.8.0的过程记录
  • 【人工智能】解释性AI(Explainable AI)——揭开机器学习模型的“黑箱”
  • 架构师备考-背诵精华(架构开发方法)
  • 利用LangGraph和Waii实现你的chat2db!
  • 嵌入式工业显示器在食品生产行业的应用