当前位置: 首页 > article >正文

【RocketMQ 存储】CommitLogDispatcherBuildConsumeQueue 构建 ConsumeQueue 索引

文章目录

  • 1. 前言
  • 2. ConsumeQueue 索引构建
  • 3. CommitLogDispatcherBuildConsumeQueue#dispatch
  • 4. ConsumeQueue 索引构建 - putMessagePositionInfo
    • 4.1 findConsumeQueue
    • 4.2 putMessagePositionInfoWrapper 构建索引文件消息
    • 4.3 putMessagePositionInfo 写入索引
    • 4.4 fillPreBlank 填充消息索引
    • 4.5 appendMessage 追加消息到 MappedByteBuffer 中
  • 5. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

RocketMQ 存储部分系列文章:

  • 【RocketMQ 存储】- RocketMQ存储类 MappedFile
  • 【RocketMQ 存储】- 一文总结 RocketMQ 的存储结构-基础
  • 【RocketMQ 存储】- broker 端存储单条消息的逻辑
  • 【RocketMQ 存储】- broker 端存储批量消息的逻辑
  • 【RocketMQ 存储】- 同步刷盘和异步刷盘
  • 【RocketMQ 存储】- 同步刷盘服务 GroupCommitService
  • 【RocketMQ 存储】- 异步刷盘服务 FlushRealTimeService
  • 【RocketMQ 存储】- 异步提交服务 CommitRealTimeService
  • 【RocketMQ 存储】RocketMQ 如何高效创建 MappedFile
  • 【RocketMQ 存储】消息重放服务-ReputMessageService

2. ConsumeQueue 索引构建

在上一篇文章中,我们介绍了 ReputMessageService 服务是如何获取 CommitLog 里面的消息进行重放,那么这篇文章我们就讲解 CommitLog 里面的消息是如何重放构建成 ConsumeQueue 索引的,构建的 doDispatch 方法如下:

public void doDispatch(DispatchRequest req) {
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}

CommitLogDispatcher 是消息分发接口,里面的核心构建分发就是 dispatch。

public interface CommitLogDispatcher {

    /**
     *  Dispatch messages from store to build consume queues, indexes, and filter data
     * @param request dispatch message request
     */
    void dispatch(final DispatchRequest request);
}

这个类有三个实现类,其中的 CommitLogDispatcherBuildConsumeQueue 就是构建 ConsumeQueue 索引的类。

在这里插入图片描述
那么在看具体源码之前,我们来回顾下 ConsumeQueue 的索引结构。

在这里插入图片描述

  • 首先是 8 个字节的 CommitLog Offset,就是消息在 CommitLog 中的偏移量,通过这个偏移量可以快速定位到消息的存储位置,从而读取到消息。
  • 其次是消息的长度,其实读取 CommitLog 的时候 CommitLog 中记录的第一个字段就是消息的总长度,所以这个消息长度更多是用来更新一些偏移量,比如当前 CnnsumeQueue 下消息最大有效偏移量。
  • 最后是 tag hashcode,其实就是 tag 值的 hashCode,在 RocketMQ 中,消费者可以通过指定 tag 来订阅特定类型的消息,同样的可以通过这个 tag hashcode 快速过滤掉不是这个消费者订阅的 tag。

3. CommitLogDispatcherBuildConsumeQueue#dispatch

这个类就是构建 ConsumeQueue 索引类,下面就看下里面的构建方法。

/**
 * ConsumeQueue 构建服务
 */
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        // 获取消息的类型 sysFlag
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            // 非事务消息
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            // 事务 commit 消息
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // 这就代表了这条消息可以用于构建 ConsumeQueue
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            // 事务一阶段 prepare 消息和事务一阶段 rollback 消息,不处理
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}

这个方法逻辑很简单,就是根据方法的事务状态判断是否需要构建 ConsumeQueue 索引,对于一条消息,如果是事务一阶段的 prepare 或者 rollback,那么就说明这条消息还不能被消费者看见,或者说这条消息一阶段本地事务执行失败了需要回滚,这种情况肯定不能构建 ConsumeQueue 索引的。

那么对于普通消息或者说 commit 状态的消息就可以构建索引了。


4. ConsumeQueue 索引构建 - putMessagePositionInfo

下面就看下如何构建 ConsumeQueue 索引的,就是 putMessagePositionInfo 方法。

/**
 * 写入消息位置信息
 * @param dispatchRequest
 */
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    /**
     * 根据 topic 和队列 id 找到对应的 ConsumeQueue
     */
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    /**
     * 构建 ConsumeQueue 索引文件消息
     */
    cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
}

这个方法里面主要就是两个逻辑

  1. 首先要构建 ConsumeQueue 索引,那么肯定需要找到这条消息的索引需要构建在哪个 topic 下面的哪个队列。
  2. 找到 ConsumeQueue 后,再调用里面的 putMessagePositionInfoWrapper 方法去构建索引。

当然了这里面有一部分逻辑涉及到 LMQ,LMQ 是 RocketMQ 在 4.9.3 提出的轻量消息队列,主要面向小微设备消息传输的轻量级消息队列,支持 MQTT 协议,主要是对于 ConsumeQueue 的多队列分发构建索引,当然我们这里就不涉及这里面的源码讲解了。


4.1 findConsumeQueue

这个方法就是根据 topic 和队列 id 找到对应的 ConsumeQueue,RocketMQ 提供了一个 consumeQueueTable 队列集合,存储了 topic -> (queueId, ConsumeQueue) 的集合,大家知道一个 topic 下面是会分为多个 ConsumeQueue 的,所以可以根据这个集合去找。

// 从 consumeQueueTable 中根据 topic 获取 ConsumeQueue 集合
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);

如果说这个 map 是空的,这时候就创建一个集合设置到 map 中,不过感觉这里写的有点啰嗦了。

if (null == map) {
    // 这里面就是如果不存在集合就创建一个放到 consumeQueueTable 里面
    ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
    // putIfAbsent 意思是如果已经存在旧 map 了,那么 newMap 就不会设置到里面
    // 同时返回旧 map,如果没有映射关系就把 newMap 设置进去
    ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
    // 这里意思就是如果原来就存在就设置旧的,否则就设置新的
    if (oldMap != null) {
        map = oldMap;
    } else {
        map = newMap;
    }
}

上面我们拿到了 map,这时候在 map 中通过 queueId 去找对应的 ConsumeQueue。

// 获取对应的 ConsumeQueue
ConsumeQueue logic = map.get(queueId);

如果这个 ConsumeQueue 是空,那么新建立一个设置到 map 中。

// 如果是空,那么就重新建一个,这里说明 ConsumeQueue 是延时建立的
if (null == logic) {
    ConsumeQueue newLogic = new ConsumeQueue(
        topic,      // topic
        queueId,    // queueId
        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // ConsumeQueue 的存储路径
        this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(), // ConsumeQueue 的大小,一个 ConsumeQueue 可存储 30w 条数据,每条数据 20B
        this); // DefaultMessageStore
    // 存入 map 中,如果已存在就获取旧的
    ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
    if (oldLogic != null) {
        logic = oldLogic;
    } else {
   		// LMQ 的逻辑,ConsumeQueue 队列数 + 1
        if (MixAll.isLmq(topic)) {
            lmqConsumeQueueNum.getAndIncrement();
        }
        logic = newLogic;
    }
}

最后返回创建的消息队列,这里感觉可以写得更简单的。


4.2 putMessagePositionInfoWrapper 构建索引文件消息

首先获取写入的最大重试次数 30 次,然后判断 ConsumeQueue 文件是否可写。

// 最大重试次数 30 次
final int maxRetries = 30;
// 判断 ConsumeQueue 文件是否可写
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();

isCQWriteable 判断文件是否可写,当然这里的判断不是说简单用一个标记位来判断的。

public boolean isWriteable() {
    if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) {
        return true;
    }

    return false;
}

flagBits 是 DefaultMessgeQueue 中专门用于记录存储的状态,比如

  • NOT_WRITEABLE_BIT 标记 ConsumeQueue 是否可写
  • WRITE_LOGICS_QUEUE_ERROR_BIT 标记是否上一次写入的时候发生了错误
  • DISK_FULL_BIT 标记磁盘是否满了
  • WRITE_INDEX_FILE_ERROR_BIT 标记 IndexFile 的写入是否发生错误,这里说是写入,实际上是创建

当发生上面这些错误的时候就返回 false 表示不可继续往 Consume Queue 里面写入,否则就可以继续写入。

继续回到 putMessagePositionInfoWrapper 方法,往下看,写入 ConsumeQueue 的逻辑最多会重试 30 次,在 for 循环里面去重复执行。

// 重试 30 次
for (int i = 0; i < maxRetries && canWrite; i++) {
	...
}

在 for 循环中首先处理 ConsumeQueue 扩展消息的 tagsCode,扩展信息后面会出文章介绍,这里就不多赘述。

// 普通消息的 tagCode
long tagsCode = request.getTagsCode();
// 是否支持 ConsumeQueue 扩展消息写入
if (isExtWriteEnable()) {
    // 扩展信息
    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
    cqExtUnit.setFilterBitMap(request.getBitMap());
    cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
    cqExtUnit.setTagsCode(request.getTagsCode());

    long extAddr = this.consumeQueueExt.put(cqExtUnit);
    if (isExtAddr(extAddr)) {
        tagsCode = extAddr;
    } else {
        log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
            topic, queueId, request.getCommitLogOffset());
    }
}

接下来调用 putMessagePositionInfo 方法写入消息到 ConsumeQueue 中。

// 写入消息到 ConsumeQueue 中
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());

如果写入成功,那么记录下最新存储时间。

if (result) {
    // 写入成功了
    if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
        this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
        // 设置队列的物理消息最新消息存储时间
        this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
    }
    // 设置队列的逻辑消息最新消息存储时间(ConsumeQueue 存储时间)
    this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
    if (multiQueue) {
        multiDispatchLmqQueue(request, maxRetries);
    }
    return;
}

这几个时间点都是消息存储的最新时间,RocketMQ 的 StockCheckPoint 文件就是根据这几个时间来进行 Broker 异常重启文件恢复的。

那如果写入失败,就睡眠 1s 重新执行,最多执行 30 次。

else {
    // XXX: warn and notify me
    log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
        + " failed, retry " + i + " times");

    try {
        // 写入失败,睡眠 1s 再次执行
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        log.warn("", e);
    }
}

在 for 循环执行之后如果都还没有写入成功,那么就标记写入 ConsumeQueue 失败,也就是标记 WRITE_LOGICS_QUEUE_ERROR_BIT

// XXX: warn and notify me
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
// 到这里就是 30 次都没能写入成功,这里就设置下存储状态是写入逻辑队列失败
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();

好了,上面就是这个方法的全部逻辑了,下面给出全部代码。

/**
 * 追加消息到 ConsumeQueue 中
 * @param request
 * @param multiQueue
 */
public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {
    // 最大重试次数 30 次
    final int maxRetries = 30;
    // 判断 ConsumeQueue 文件是否可写
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    // 重试 30 次
    for (int i = 0; i < maxRetries && canWrite; i++) {
        // 普通消息的 tagCode
        long tagsCode = request.getTagsCode();
        // 是否支持 ConsumeQueue 扩展消息写入
        if (isExtWriteEnable()) {
            // 扩展信息
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());

            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            if (isExtAddr(extAddr)) {
                tagsCode = extAddr;
            } else {
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                    topic, queueId, request.getCommitLogOffset());
            }
        }
        // 写入消息到 ConsumeQueue 中
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
            request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) {
            // 写入成功了
            if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
                this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                // 设置队列的物理消息最新消息存储时间
                this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
            }
            // 设置队列的逻辑消息最新消息存储时间(ConsumeQueue 存储时间)
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            if (multiQueue) {
                multiDispatchLmqQueue(request, maxRetries);
            }
            return;
        } else {
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                + " failed, retry " + i + " times");

            try {
                // 写入失败,睡眠 1s 再次执行
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.warn("", e);
            }
        }
    }

    // XXX: warn and notify me
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    // 到这里就是 30 次都没能写入成功,这里就设置下存储状态是写入逻辑队列失败
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}

4.3 putMessagePositionInfo 写入索引

/**
 * 写入消息到 ConsumeQueue 中
 * @param offset    消息在 CommitLog 中的物理偏移量
 * @param size      消息大小
 * @param tagsCode  消息 tagsCode,普通消息是 hashCode,延时消息是消息投递到延时队列的时间
 * @param cqOffset  消息在 ConsumeQueue 中的偏移量
 * @return
 */
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {
	...
}

首先判断下如果消息在 Commitlog 中的偏移量 + 消息大小 <= 已处理的消息的最大偏移量,大家注意 maxPhysicOffset 这个字段是 ConsumeQueue 中已经存储的消息在 CommitLog 中的最大物理偏移量,而不是 CommitLog 中的消息最大物理偏移量。所以如果满足这个条件,说明这条消息已经在 ConsumeQueue 中构建索引了,这时候就没必要继续处理了。

// 如果消息偏移量 + 消息大小 <= 已处理的消息的最大偏移量
if (offset + size <= this.maxPhysicOffset) {
    log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
    // 说明消息已经处理过了,不需要再处理了
    return true;
}

接下来就是往 ConsumeQueue 中写入索引了,那么首先我们就要把 ConsumeQueue 索引的消息给到一个临时 Buffer 中,这个 Buffer 就是 byteBufferIndex,其实就是写入下面图中索引的三个属性。

在这里插入图片描述

// 切换读模式,byteBufferIndex 是临时存储的 ByteBuffer,可以被反复利用,这里切换读模式就相当于将 position 重置了,然后就可以重新写入
this.byteBufferIndex.flip();
// 限制 20B
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
// 存入 8 个字节的消息物理偏移量(CommitLog 偏移量)
this.byteBufferIndex.putLong(offset);
// 存入 4 个字节的消息物理大小
this.byteBufferIndex.putInt(size);
// 存入 8 个字节的 tagsCode,普通消息是消息 tags 的 hashCode,延时消息是消息投递到延时队列的时间
this.byteBufferIndex.putLong(tagsCode);

大家不要被 this.byteBufferIndex.flip() 给迷惑了,这个方法说是切换读模式,其实在里面就是将 position 重置,然后就可以从下标 0 开始继续写入了,byteBufferIndex 是一个 HeapByteBuffer。

我们知道索引是顺序写入 ConsumeQueue 文件中的,所以接下来要做的就是求出这个索引消息在 ConsumeQueue 中的偏移量。

// 消息在 ConsumeQueue 中的具体偏移量,因为 consumeQueueOffset 其实相当于是索引下标,所以要求出具体的偏移量还得 * 20
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

consumeQueueOffset 这里要 * 20,是因为传过来的 cqOffset 是下标索引,其实就是当前这条索引是第几条 ConsumeQueue 索引,所以实际求起始偏移量的时候还得 * 20。

获取到偏移量之后,就根据偏移量获取或创建要写入的 MappedFile,其实一般就是最后一个 MappedFile,因为是顺序写的。这个方法在 【RocketMQ 存储】RocketMQ 如何高效创建 MappedFile 中介绍过了,简单来说就是如果获取不到就提交创建 MappedFile 的请求。

// 根据偏移量获取或创建要写入的 MappedFile,其实一般就是最后一个 MappedFile,因为是顺序写的
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);

获取到这个文件之后,如果发现这个 MappedFile 是 ConsumeQueue 中的第一个索引文件,同时 cqOffset != 0 表示不是第一条消息,但是 MappedFile 的写指针位置却是 0,那么说明前面的 ConsumeQueue 索引有可能因为某种原因被删掉了,此时要写入的消息是第一条消息,所以就在这条消息之前把空出来的这部分填充一些条目上去。同时更新最新的刷盘和提交位置以及已处理的消息在 CommitLog 中的最小偏移量。
在这里插入图片描述

// 创建好了 MappedFile 后,如果发现这个 MappedFile 是 ConsumeQueue 中的第一个索引文件
// 同时 cqOffset != 0 表示不是第一条消息,但是 MappedFile 的写指针位置却是 0
// 说明当前消息是第一个文件的第一条消息,但是却不是从 0 位置开始写入,这种情况下这个文件前面部分就需要填充一些条目了
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
    // 更新已处理的消息在 CommitLog 中的最小偏移量
    this.minLogicOffset = expectLogicOffset;
    // 更新刷盘位置和已提交的位置
    this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
    this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
    // 在 ByteBuffer 的 0-expectLogicOffset 位置填充消息索引条目
    this.fillPreBlank(mappedFile, expectLogicOffset);
    log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
        + mappedFile.getWrotePosition());
}

接着看下面的代码,如果不符合上面的条件,那么就判断下 cqOffset != 0,如果符合这个条件,就说明是顺序写入的,这时候计算出 MappedFile 中写指针 wrotePosition 的位置。

  • 判断如果写入位置小于 currentLogicOffset,说明这条索引已经构建过了
  • 如果这里不相等,那么说明 expectLogicOffset > currentLogicOffset,说明计算出来的写入位置在当前写指针 wrotePosition 后面,说明这个逻辑队列有可能出 BUG 了,一般都不会走到这里
    在这里插入图片描述
// 这里就是顺着文件写入,所以需要判断下这个索引条目是否已经被构建过了
if (cqOffset != 0) {
    // 计算当前 MappedFile 的写指针所在的位置
    long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
    // 如果写入位置小于 currentLogicOffset,说明这条索引已经构建过了
    if (expectLogicOffset < currentLogicOffset) {
        log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
            expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
        return true;
    }

    // 如果这里不相等,那么说明这个队列顺序有可能是错的
    // 注意这里是没有返回 false 的,就是说消息还是可以继续写入
    if (expectLogicOffset != currentLogicOffset) {
        LOG_ERROR.warn(
            "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
            expectLogicOffset,
            currentLogicOffset,
            this.topic,
            this.queueId,
            expectLogicOffset - currentLogicOffset
        );
    }
}

上面如果发生 BUG 是没有返回 false 的,而是继续写入,而注意继续写入的位置就是 wrotePosition 而不是 cqOffset相当于说忽略了 cqOffset

但是在写入之前,记录下最大写入的 ConsumeQueue 索引的物理偏移量,大家要记住这个文件是顺序写入的。

this.maxPhysicOffset = offset + size;

return mappedFile.appendMessage(this.byteBufferIndex.array());

好了,这个方法就解析到这里了,下面就给出所有的代码。

/**
 * 写入消息到 ConsumeQueue 中
 * @param offset    消息在 CommitLog 中的物理偏移量
 * @param size      消息大小
 * @param tagsCode  消息 tagsCode,普通消息是 hashCode,延时消息是消息投递到延时队列的时间
 * @param cqOffset  消息在 ConsumeQueue 中的偏移量
 * @return
 */
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    // 如果消息偏移量 + 消息大小 <= 已处理的消息的最大偏移量
    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        // 说明消息已经处理过了,不需要再处理了
        return true;
    }

    // 切换读模式,byteBufferIndex 是临时存储的 ByteBuffer,可以被反复利用,这里切换读模式就相当于将 position 重置了,然后就可以重新写入
    this.byteBufferIndex.flip();
    // 限制 20B
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    // 存入 8 个字节的消息物理偏移量(CommitLog 偏移量)
    this.byteBufferIndex.putLong(offset);
    // 存入 4 个字节的消息物理大小
    this.byteBufferIndex.putInt(size);
    // 存入 8 个字节的 tagsCode,普通消息是消息 tags 的 hashCode,延时消息是消息投递到延时队列的时间
    this.byteBufferIndex.putLong(tagsCode);

    // 消息在 ConsumeQueue 中的具体偏移量,因为 consumeQueueOffset 其实相当于是索引下标,所以要求出具体的偏移量还得 * 20
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    // 根据偏移量获取或创建要写入的 MappedFile,其实一般就是最后一个 MappedFile,因为是顺序写的
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {

        // 创建好了 MappedFile 后,如果发现这个 MappedFile 是 ConsumeQueue 中的第一个索引文件
        // 同时 cqOffset != 0 表示不是第一条消息,但是 MappedFile 的写指针位置却是 0
        // 说明当前消息是第一个文件的第一条消息,但是却不是从 0 位置开始写入,这种情况下这个文件前面部分就需要填充一些条目了
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            // 更新已处理的消息在 CommitLog 中的最小偏移量
            this.minLogicOffset = expectLogicOffset;
            // 更新刷盘位置和已提交的位置
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            // 在 ByteBuffer 的 0-expectLogicOffset 位置填充消息索引条目
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }

        // 这里就是顺着文件写入,所以需要判断下这个索引条目是否已经被构建过了
        if (cqOffset != 0) {
            // 计算当前 MappedFile 的写指针所在的位置
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
            // 如果写入位置小于 currentLogicOffset,说明这条索引已经构建过了
            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }

            // 如果这里不相等,那么说明这个队列顺序有可能是错的
            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        this.maxPhysicOffset = offset + size;
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

4.4 fillPreBlank 填充消息索引

这个方法就是上面说的对 MappedFile 前面空出的部分进行填充,这个方法的逻辑就是填充 wrotePosition - untilWhere 这部分的空白。

private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
    ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
    // 消息物理偏移量
    byteBuffer.putLong(0L);
    // 消息长度
    byteBuffer.putInt(Integer.MAX_VALUE);
    // 消息 tagsCode
    byteBuffer.putLong(0L);

    int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
    for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
        mappedFile.appendMessage(byteBuffer.array());
    }
}

方法的逻辑不复杂,直接看代码就行了,要注意的是填入的索引条目:

  • 物理偏移量:0
  • 消息长度:Integer.MAX_VALUE
  • 消息 tagsCode:0

4.5 appendMessage 追加消息到 MappedByteBuffer 中

这个方法在【RocketMQ 存储】- RocketMQ存储类 MappedFile 中已经解析过了,所以这里我就不再讲解,直接给出代码。

/**
 * 追加消息到 MappedByteBuffer 中,这里是 ConsumeQueue 的调用
 * @param data 要写入的数据
 * @return
 */
public boolean appendMessage(final byte[] data) {
    // 获取写指针的位置
    int currentPos = this.wrotePosition.get();

    // 判断写入这些数据之后会不会超出这个 MappedByteBuffer 的边界
    if ((currentPos + data.length) <= this.fileSize) {
        try {
            // slice 获取这个 MappedByteBuffer 的视图
            ByteBuffer buf = this.mappedByteBuffer.slice();
            // 设置写入位置
            buf.position(currentPos);
            // 写入数据到 MappedByteBuffer
            buf.put(data);
        } catch (Throwable e) {
            log.error("Error occurred when append message to mappedFile.", e);
        }
        // 重新设置写指针的位置
        this.wrotePosition.addAndGet(data.length);
        return true;
    }

    return false;
}

5. 小结

好了,到这里我们就讲解了 CommitLogDispatcherBuildConsumeQueue 是如何构建 ConsumeQueue 索引的,下一篇文章我们将会讲解 IndexFile 索引的构建。





如有错误,欢迎指出!!!


http://www.kler.cn/a/548931.html

相关文章:

  • 蓝桥杯 Java B 组之栈的应用(括号匹配、表达式求值)
  • 【第13章:自监督学习与少样本学习—13.3 自监督学习与少样本学习在图像识别、语言理解等领域的应用探索】
  • Unity实现UI拖拽
  • 腿足机器人之八- 腿足机器人动力学
  • 代码随想录算法训练营第三十八天| 动态规划02
  • BY组态:工业自动化的未来,触手可及
  • Uniapp 实现顶部标签页切换功能?
  • 【一起学Rust 框架篇 Tauri2.0框架】Tauri2.0环境搭建与项目创建
  • 【第11章:生成式AI与创意应用—11.3 AI艺术创作的实现与案例分析:DeepArt、GANBreeder等】
  • 联合概率:定义、公式和示例
  • 【第3章:卷积神经网络(CNN)——3.2卷积层、池化层、全连接层的详细介绍】
  • Tomcat的升级
  • 启程C++
  • Pycharm 2024在解释器提供的python控制台中运行py文件
  • 04性能监控与调优篇(D4_JVM参数)
  • 【算法与数据结构】并查集详解+题目
  • CPU占用很高排查方案
  • Linux操作系统:网络配置与系统监控优化
  • MySQL、MariaDB 和 TDSQL 的区别
  • SQLite Select 语句详解