当前位置: 首页 > article >正文

RocketMQ 底层实现原理

文章目录

  • RocketMQ 底层实现原理
    • RocketMQ 消息发送流程
      • 生产者发送消息
      • Broker 存储消息
      • 消费者拉取消息
    • RocketMQ 内存管理机制
      • PageCache
      • ConsumeQueue
    • RocketMQ 崩溃恢复机制
      • 消息队列偏移量
      • Checkpoint 文件
      • CommitLog 文件校验
    • RocketMQ 操作实践

RocketMQ 底层实现原理

RocketMQ 是一款高性能、可扩展的分布式消息中间件,目前已经成为各大互联网公司的主流解决方案之一。本文将介绍 RocketMQ 的底层实现原理,以及如何使用 JAVA 语言对其进行操作和实践。

RocketMQ 消息发送流程

在 RocketMQ 中,消息的发送过程可以分为三个步骤:

  1. 生产者发送消息到 Broker;
  2. Broker 将消息存储到磁盘,并将消息持久化到 CommitLog 和 IndexFile 中;
  3. 消费者从 Broker 拉取消息并进行消费。

生产者发送消息

生产者在发送消息时,首先需要与 NameServer 进行通信,获取相应的 Broker 列表。在获取到 Broker 地址后,生产者会向其中一个 Broker 发送消息。

RocketMQ 规定每个 Topic 可以有多个队列,生产者在向 Broker 发送消息时,需要将消息按照某种规则分配到指定的队列中。默认情况下,RocketMQ 使用轮询算法将消息平均地发送到所有队列中。

Broker 存储消息

Broker 存储消息时,首先将消息追加到 CommitLog 文件中。CommitLog 文件是 RocketMQ 设计的核心文件,其中记录了所有的消息内容,是消息存储的最终落地点。

在写入 CommitLog 文件时,RocketMQ 采用了一种叫做 MappedFile 的技术。MappedFile 是 Java NIO 中的一个类,可以将一个文件或文件片段映射到内存中,从而可以直接对内存中的数据进行读写操作。

除了 CommitLog 文件之外,RocketMQ 还维护了一个 IndexFile 文件用于快速查询消息。IndexFile 中记录了消息索引的偏移量以及消息的关键字,通过 IndexFile 可以快速定位到指定消息的位置。

消费者拉取消息

消费者在拉取消息时,首先需要向 Broker 请求消息。Broker 收到请求后,会根据消费者的 offset 值返回指定数量的消息。消费者消费完消息后,需要将 offset 提交给 Broker,以便下次拉取时可以继续从该位置开始消费。

RocketMQ 内存管理机制

RocketMQ 中有两个重要的缓存设计:PageCache 和 ConsumeQueue。

PageCache

PageCache 是 RocketMQ 的物理内存缓存,主要用于加速消息的读写操作。RocketMQ 使用内存映射技术将磁盘上的 CommitLog 文件映射到内存中,这样就可以实现快速的消息读写操作。

PageCache 中的内存空间是由 JVM 进程直接申请的,因此需要考虑内存的使用效率和回收效率。默认情况下,RocketMQ 将 PageCache 的大小设为物理内存的 40%。

ConsumeQueue

ConsumeQueue 可以看作是 RocketMQ 的逻辑内存缓存,主要用于消费者快速拉取消息和跟踪消息消费进度。每个 Topic 都有自己的 ConsumeQueue,用来存储该 Topic 的所有消息。

ConsumeQueue 中的每个消息都对应着一个索引项,记录了该消息在 CommitLog 文件中的偏移量和消息长度信息。当消费者向 Broker 请求消息时,Broker 会从对应的 ConsumeQueue 中读取消息索引信息,并根据索引信息去 CommitLog 中查询实际的消息内容。

RocketMQ 崩溃恢复机制

RocketMQ 采用了日志追加的方式进行消息存储。当 Broker 崩溃或重启时,可能会出现数据丢失或消息重复等情况。为了解决这些问题,RocketMQ 实现了多种崩溃恢复机制。

消息队列偏移量

RocketMQ 维护了每个消费者所消费的消息队列偏移量。当消费者重新启动时,可以通过之前保存的偏移量继续消费未消费的消息。

Checkpoint 文件

Checkpoint 文件用于记录 CommitLog 中最后一条消息的偏移量。当 Broker 发生异常情况导致崩溃时,Broker 再次启动时可以从 Checkpoint 文件中读取偏移量,从而定位到最近一次的消息读取位置。

CommitLog 文件校验

RocketMQ 中的 CommitLog 文件是顺序写入的,因此具有很好的一致性和可靠性。Broker 在写入每个消息之前都会计算消息的 CRC 校验码,用于检测文件数据的完整性和正确性。

RocketMQ 操作实践

以下是使用 JAVA 语言在 RocketMQ 中实现生产者和消费者的示例代码。通过该代码,可以实现在本地环境下发送消息和消费消息。

// 生产者示例代码
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class RocketMQProducerExample {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message message = new Message("test_topic", "Hello RocketMQ".getBytes());
        producer.send(message);

        producer.shutdown();
    }
}

// 消费者示例代码
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RocketMQConsumerExample {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("test_topic", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                System.out.println("Receive new message: " + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}

上述代码实现了一个简单的生产者和消费者示例,其中,生产者向 test_topic 主题中发送消息,而消费者从 test_topic 主题中消费消息,并打印消息内容。在使用示例代码前,需要先下载 RocketMQ 并启动 NameServer 和 Broker。

综上所述,RocketMQ 是一款高性能、可扩展的分布式消息中间件,采用了多种优秀的技术设计和崩溃恢复机制,为互联网公司的消息服务提供了可靠的支持。当然,除了本文介绍的内容之外,RocketMQ 还有许多其他的优秀特性和功能,需要根据实际情况进行进一步学习和了解。


http://www.kler.cn/news/17192.html

相关文章:

  • 神秘的IP地址8.8.8.8地址到底是什么?为什么会被用作DNS服务器地址呢?
  • GPT详细安装教程-GPT软件国内也能使用
  • JVM学习(九):堆
  • 海思芯片(hi3536av100)启动模式选择
  • Linux centos搭建web服务器
  • 利用Google Colab免费使用GPU服务器详细攻略
  • 自动驾驶中地图匹配定位技术总结
  • Web常见漏洞描述及修复建议
  • 基于YOLOv5的目标检测系统详解(附MATLAB GUI版代码)
  • vue+springboot 实现人脸识别方向
  • windows 下Node.js 版本管理工具
  • Java方法引用:提高代码可读性和可维护性
  • C++和Python编程语言各自的优缺点总结,分享一下我对程序员职业规划的看法
  • 【id:59】【20分】D. 旅馆顾客统计(静态成员)
  • 应用,auto,内联函数
  • 计算机基础 -- 硬件篇
  • 2023年web前端开发之JavaScript进阶(一)
  • 开心档之C++ 指针
  • Flex弹性布局
  • Vue电商项目--axios二次封装
  • 2023-05-06 GPT替代
  • 容器适配器---deque和STL ---stack queue priority_queue的模拟实现 C++
  • 【刷题之路Ⅱ】LeetCode 61. 旋转链表
  • 【转存】SpringBoot 中的自带工具类,快速提升开发效率
  • 基于javaweb的学生就业管理系统
  • 如何固定权重,对某些层得学习率改为0?
  • 教育专题讲座(带答案)
  • 基于标签的协同过滤算法实现与个人兴趣相关的文章推荐
  • Renesas瑞萨A4M2和STM32 CAN通信
  • 程序员如何学好PHP?做好这五个方面就够了