当前位置: 首页 > article >正文

【RocketMQ】RocketMq之IndexFile深入研究

一:RocketMq 整体文件存储介绍

存储⽂件主要分为三个部分:
  • CommitLog:存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成,每个⽂件固定⼤⼩1G。以第⼀条消 息的偏移量为⽂件名。
  • ConsumerQueue:存储消息在CommitLog的索引。⼀个MessageQueue⼀个⽂件,记录当前MessageQueue被哪些消费者组消费到了哪⼀条CommitLog。
  • IndexFile:为了消息查询提供了⼀种通过key或时间区间来查询消息的⽅法,这种通过IndexFile来查找消息的⽅法不影响发送与消费消息的主流程。

这篇文章主要介绍IndexFile的研究,以rocketmq5.3.0版本作为研究。

二:IndexFile的文件结构

文件整理格式,如下图2-1所示

                                                图2-1 IndexFile 文件结构图


IndexFile 文件格式

  • 文件名:以时间戳命名(例如 20240301120000000),表示该文件索引的消息的时间范围。

  • 文件大小:默认为 400MB,可通过 maxIndexSize 配置调整。

  • 存储路径:默认在 ~/store/index 目录下。

每个 IndexFile 文件由三部分组成:
1. 文件头部(Header)
2. 哈希槽(Hash Slot)区域
3. 索引条目(Index Entry)区域


1. 文件头部(Header)

字段名

长度(字节)

说明

beginTimestamp

8

索引文件覆盖的最小时间戳(消息存储时间)

endTimestamp

8

索引文件覆盖的最大时间戳(消息存储时间)

beginPhyOffset

8

索引文件对应的最小物理偏移量(CommitLog 中的起始位置)

endPhyOffset

8

索引文件对应的最大物理偏移量(CommitLog 中的结束位置)

hashSlotCount

4

哈希槽数量(固定为 5,000,000)

indexCount

4

当前已写入的索引条目数量


2. 哈希槽(Hash Slot)区域

  • 哈希槽数量:固定为 500 万个(5,000,000),每个哈希槽占 4 字节

  • 哈希函数:对消息的 Key(如 UNIQ_KEYKEYS)进行哈希计算,得到槽位索引:
    slotPos = abs(hash(key)) % 5000000

每个哈希槽存储的是 索引条目区域 的起始位置(索引条目链表的头节点)。


3. 索引条目(Index Entry)区域

每个索引条目占 20 字节,包含以下字段:

字段名

长度(字节)

说明

keyHash

4

消息 Key 的哈希值(用于快速比对)

phyOffset

8

消息在 CommitLog 中的物理偏移量

timeDiff

4

消息存储时间与文件头部 beginTimestamp 的时间差(秒级)

slotValue

4

下一个索引条目的位置(用于解决哈希冲突的链表结构)


 三:IndexFile 写入和查询流程

IndexFile 写入流程:

+---------------------+
| Producer 发送消息     |
+---------------------+
          |
          v
+---------------------+
| 提取消息的 Key        | --> 如 UNIQ_KEY 或 KEYS 属性
+---------------------+
          |
          v
+---------------------+
| 检查 IndexFile 容量   | --> 是否已满?(indexCount >= indexNum)
+---------------------+
          | 是
          v
+---------------------+
| 返回 false,写入失败   |
+---------------------+
          | 否
          v
+---------------------+
| 计算 Key 的哈希值     | --> `keyHash = indexKeyHashMethod(key)`
+---------------------+
          |
          v
+---------------------+
| 计算哈希槽位置         | --> `slotPos = keyHash % hashSlotNum`
+---------------------+
          |
          v
+---------------------+
| 计算哈希槽绝对位置      | --> `absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize`
+---------------------+
          |
          v
+---------------------+
| 读取哈希槽的当前值      | --> `slotValue = mappedByteBuffer.getInt(absSlotPos)`
+---------------------+
          |
          v
+---------------------+
| 校验 slotValue 有效性  | --> 是否无效?(slotValue <= invalidIndex || slotValue > indexCount)
+---------------------+
          | 是
          v
+---------------------+
| 将 slotValue 设为无效   | --> `slotValue = invalidIndex`
+---------------------+
          | 否
          v
+---------------------+
| 计算时间差 (timeDiff)  | --> `timeDiff = (storeTimestamp - beginTimestamp) / 1000`
+---------------------+
          |
          v
+---------------------+
| 处理 timeDiff 边界值   | --> 确保 `0 <= timeDiff <= Integer.MAX_VALUE`
+---------------------+
          |
          v
+---------------------+
| 计算索引条目绝对位置     | --> `absIndexPos = IndexHeader.INDEX_HEADER_SIZE + hashSlotNum * hashSlotSize + indexCount * indexSize`
+---------------------+
          |
          v
+---------------------+
| 写入索引条目内容        |
| - keyHash            |
| - phyOffset          |
| - timeDiff           |
| - slotValue (nextIndex)|
+---------------------+
          |
          v
+---------------------+
| 更新哈希槽指向新条目     | --> `mappedByteBuffer.putInt(absSlotPos, indexCount)`
+---------------------+
          |
          v
+---------------------+
| 更新 IndexFile 头部信息 |
| - 若 indexCount <= 1,更新 beginPhyOffset 和 beginTimestamp |
| - 若 slotValue 无效,增加 hashSlotCount |
| - 增加 indexCount      |
| - 更新 endPhyOffset 和 endTimestamp |
+---------------------+
          |
          v
+---------------------+
| 返回 true,写入成功     |
+---------------------+
          |
          v
+---------------------+
| IndexFile 是否已满?   | -- 是 --> 创建新 IndexFile
| (文件大小 ≥ 400MB)    |
+---------------------+  

源码入口:org.apache.rocketmq.store.index.IndexFile#putKey

IndexFile 查询流程:

+---------------------+
| Consumer 根据 Key 查询 |
+---------------------+
          |
          v
+---------------------+
| 计算 Key 的哈希值     | --> `keyHash = Math.abs(key.hashCode())`
+---------------------+
          |
          v
+---------------------+
| 计算哈希槽位置         | --> `slotPos = keyHash % 5,000,000`
+---------------------+
          |
          v
+---------------------+
| 读取哈希槽的链表头位置   | --> `slotValue = mappedByteBuffer.getInt(slotPos * 4)`
+---------------------+
          |
          v
+---------------------+  
| 遍历链表条目           |
| while (slotValue > 0)|
+---------------------+
          |
          v
+---------------------+
| 读取索引条目:         |
| - keyHashRead       |
| - phyOffset         |
| - timeDiff          |
| - nextIndex         |
+---------------------+
          |
          v
+---------------------+
| 检查时间范围是否匹配?   | --> `storeTime = beginTimestamp + timeDiff * 1000`
| (storeTime ∈ [begin, end]?)|
+---------------------+
          | 否
          |------------------> 跳过,继续下一个条目
          | 是
          v
+---------------------+
| 比对 keyHashRead 和 keyHash |
| (是否相等?)          |
+---------------------+
          | 否
          |------------------> 跳过,继续下一个条目
          | 是
          v
+---------------------+
| 从 CommitLog 读取实际 Key |
| (检查 Key 是否一致?)    |
+---------------------+
          | 否
          |------------------> 跳过,继续下一个条目
          | 是
          v
+---------------------+
| 返回 phyOffset       | --> 添加到结果列表
+---------------------+
          |
          v
+---------------------+
| slotValue = nextIndex| --> 继续遍历下一个条目
+---------------------+
          |
          v
+---------------------+
| 遍历结束,返回结果列表   |
+---------------------+

源码入口:org.apache.rocketmq.store.index.IndexService#queryOffset

四:IndexFile解决hash冲突问题思想

RocketMQ 的 IndexFile 通过 链地址法(Chaining) 解决哈希冲突问题,其核心思想是将哈希到同一槽位的多个索引条目组织成链表结构,并通过哈希槽(Hash Slot)与索引条目(Index Entry)的关联实现高效写入和查询。以下是具体实现思想及关键设计:


1. 哈希冲突的背景

  • 哈希冲突:不同 Key 经过哈希函数计算后可能得到相同的哈希值,导致被分配到同一个哈希槽。

  • 问题:若不处理冲突,后续 Key 的索引会覆盖已有数据,导致查询结果错误。


2. 解决冲突的核心思想:链地址法

RocketMQ 的 IndexFile 采用 单链表 结构管理同一哈希槽下的所有冲突条目,具体流程如下:

(1) 写入时的链表插入

  • 新条目插入链表头部
    当新 Key 的哈希值与某槽位已有条目冲突时,新条目会被插入链表头部,并更新哈希槽指针指向新条目。

    // 新条目的 nextIndex 指向原头节点
    this.mappedByteBuffer.putInt(absIndexPos + 16, slotValue);
    // 更新哈希槽指针为新条目位置
    this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
     
    • 优势:插入时间复杂度为 O(1),无需遍历链表。

(2) 查询时的链表遍历

  • 遍历链表比对 Key
    查询时,从哈希槽指向的链表头节点开始,依次遍历所有条目,通过两次比对(哈希值 + 实际 Key)过滤冲突。

    while (nextIndexToRead > 0) {
        // 1. 读取条目内容
        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
        // 2. 比对哈希值
        if (keyHashRead == keyHash) {
            // 3. 从 CommitLog 读取实际 Key 比对
            String keyStored = readKeyFromCommitLog(phyOffsetRead);
            if (key.equals(keyStored)) {
                phyOffsets.add(phyOffsetRead);
            }
        }
        // 4. 移动到下一个节点
        nextIndexToRead = prevIndexRead;
    }

3. 关键设计优化

(1) 哈希槽数量固定

  • 默认 500 万个哈希槽

    private static final int HASH_SLOT_NUM = 5000000; // 默认槽数
    • 目的:通过大量槽位减少哈希冲突的概率,使冲突链表尽可能短。

    • 权衡:槽数过多会占用更多内存,但查询效率更高。

(2) 时间范围过滤

  • 索引条目存储时间差(timeDiff)
    每个索引条目记录消息存储时间与 IndexFile 起始时间的差值(秒级),查询时快速过滤掉不满足时间范围的条目。

    long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff * 1000L;
    if (timeRead < begin || timeRead > end) {
        continue; // 跳过不符合时间条件的条目
    }
    • 优势:减少无效条目的遍历,提升查询性能。

(3) 文件滚动(Rolling)

  • 按时间或大小滚动
    IndexFile 文件默认大小上限为 400MB,或时间跨度超过阈值时,创建新文件。

    • 目的:避免单个文件过大导致链表过长,同时支持按时间范围快速定位文件。

4. 示例场景

写入冲突场景

  • Key1: Ea#20231001123456 → 哈希值 19583063 → 槽位 18332292

  • Key2: FB#20231001123456 → 哈希值 19583063 → 槽位 18332292(冲突)

  • 处理流程

    1. Key1 写入槽位 18332292,链表头指向 Key1。

    2. Key2 写入时,插入链表头部,槽位指针更新为 Key2,Key2 的 nextIndex 指向 Key1。

查询冲突场景

  • 查询 Key: Ea#20231001123456

    1. 哈希计算定位到槽位 18332292。

    2. 遍历链表:

      • 先读取 Key2(哈希值匹配但 Key 不匹配,跳过)。

      • 再读取 Key1(哈希值 + Key 均匹配,返回 phyOffset)。

hash冲突代码调试示例

 public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        Message msg = new Message("Ea", "TagA" , ("消息1").getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg.setKeys("20231001123456");
        producer.sendOneway(msg);

        Message msg2 = new Message("FB", "TagA" , ("消息3").getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg2.setKeys("20231001123456");
        producer.sendOneway(msg2);
        
        producer.shutdown();
    }


http://www.kler.cn/a/531074.html

相关文章:

  • csapp笔记3.6节——控制(1)
  • C++底层学习预备:模板初阶
  • 音视频入门基础:RTP专题(8)——使用Wireshark分析RTP
  • python算法和数据结构刷题[1]:数组、矩阵、字符串
  • 前端25.1.26学习记录
  • 为什么LabVIEW适合软硬件结合的项目?
  • 机器学习day5
  • 【PDF提取局部内容改名】批量获取PDF局部文字内容改名 基于QT和百度云api的完整实现方案
  • 后盾人JS -- 原型
  • C语言教学第四课:控制结构
  • 内核定时器3-用户空间定时器
  • Docker Hub 镜像 Pull 失败的解决方案
  • AJAX笔记进阶篇
  • 《使用Ollama部署DeepSeek并进行对话全过程记录》
  • Spring 面试题【每日20道】【其二】
  • 11.1 LangChain Chains 最佳实践:从流水线设计到生产部署的全链路指南
  • 35.Word:公积金管理中心文员小谢【37】
  • string例题
  • MYSQL性能调优连接器、查询缓存、分析器、优化器、执行器、一图详解MYSQL底层工作原理
  • 泰山Office开源计划
  • 机试题——字符匹配
  • Python的那些事第十篇:隐藏细节与提供接口的艺术Python中的封装
  • Leetcode—598. 区间加法 II【简单】
  • golang命令大全7--性能优化与分析
  • Vue - readonly 与 shallowReadonly
  • 模拟实战-用CompletableFuture优化远程RPC调用