【RocketMQ 存储】CommitLogDispatcherBuildConsumeQueue 构建 ConsumeQueue 索引
文章目录
- 1. 前言
- 2. ConsumeQueue 索引构建
- 3. CommitLogDispatcherBuildConsumeQueue#dispatch
- 4. ConsumeQueue 索引构建 - putMessagePositionInfo
- 4.1 findConsumeQueue
- 4.2 putMessagePositionInfoWrapper 构建索引文件消息
- 4.3 putMessagePositionInfo 写入索引
- 4.4 fillPreBlank 填充消息索引
- 4.5 appendMessage 追加消息到 MappedByteBuffer 中
- 5. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
RocketMQ 存储部分系列文章:
- 【RocketMQ 存储】- RocketMQ存储类 MappedFile
- 【RocketMQ 存储】- 一文总结 RocketMQ 的存储结构-基础
- 【RocketMQ 存储】- broker 端存储单条消息的逻辑
- 【RocketMQ 存储】- broker 端存储批量消息的逻辑
- 【RocketMQ 存储】- 同步刷盘和异步刷盘
- 【RocketMQ 存储】- 同步刷盘服务 GroupCommitService
- 【RocketMQ 存储】- 异步刷盘服务 FlushRealTimeService
- 【RocketMQ 存储】- 异步提交服务 CommitRealTimeService
- 【RocketMQ 存储】RocketMQ 如何高效创建 MappedFile
- 【RocketMQ 存储】消息重放服务-ReputMessageService
2. ConsumeQueue 索引构建
在上一篇文章中,我们介绍了 ReputMessageService 服务是如何获取 CommitLog 里面的消息进行重放,那么这篇文章我们就讲解 CommitLog 里面的消息是如何重放构建成 ConsumeQueue 索引的,构建的 doDispatch 方法如下:
public void doDispatch(DispatchRequest req) {
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}
CommitLogDispatcher 是消息分发接口,里面的核心构建分发就是 dispatch。
public interface CommitLogDispatcher {
/**
* Dispatch messages from store to build consume queues, indexes, and filter data
* @param request dispatch message request
*/
void dispatch(final DispatchRequest request);
}
这个类有三个实现类,其中的 CommitLogDispatcherBuildConsumeQueue
就是构建 ConsumeQueue 索引的类。
那么在看具体源码之前,我们来回顾下 ConsumeQueue 的索引结构。
- 首先是 8 个字节的 CommitLog Offset,就是消息在 CommitLog 中的偏移量,通过这个偏移量可以快速定位到消息的存储位置,从而读取到消息。
- 其次是消息的长度,其实读取 CommitLog 的时候 CommitLog 中记录的第一个字段就是消息的总长度,所以这个消息长度更多是用来更新一些偏移量,比如当前 CnnsumeQueue 下消息最大有效偏移量。
- 最后是 tag hashcode,其实就是 tag 值的 hashCode,在 RocketMQ 中,消费者可以通过指定 tag 来订阅特定类型的消息,同样的可以通过这个 tag hashcode 快速过滤掉不是这个消费者订阅的 tag。
3. CommitLogDispatcherBuildConsumeQueue#dispatch
这个类就是构建 ConsumeQueue 索引类,下面就看下里面的构建方法。
/**
* ConsumeQueue 构建服务
*/
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
// 获取消息的类型 sysFlag
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
// 非事务消息
case MessageSysFlag.TRANSACTION_NOT_TYPE:
// 事务 commit 消息
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// 这就代表了这条消息可以用于构建 ConsumeQueue
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
// 事务一阶段 prepare 消息和事务一阶段 rollback 消息,不处理
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
这个方法逻辑很简单,就是根据方法的事务状态判断是否需要构建 ConsumeQueue 索引,对于一条消息,如果是事务一阶段的 prepare
或者 rollback
,那么就说明这条消息还不能被消费者看见,或者说这条消息一阶段本地事务执行失败了需要回滚,这种情况肯定不能构建 ConsumeQueue 索引的。
那么对于普通消息或者说 commit
状态的消息就可以构建索引了。
4. ConsumeQueue 索引构建 - putMessagePositionInfo
下面就看下如何构建 ConsumeQueue 索引的,就是 putMessagePositionInfo 方法。
/**
* 写入消息位置信息
* @param dispatchRequest
*/
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
/**
* 根据 topic 和队列 id 找到对应的 ConsumeQueue
*/
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
/**
* 构建 ConsumeQueue 索引文件消息
*/
cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
}
这个方法里面主要就是两个逻辑
- 首先要构建 ConsumeQueue 索引,那么肯定需要找到这条消息的索引需要构建在哪个 topic 下面的哪个队列。
- 找到 ConsumeQueue 后,再调用里面的 putMessagePositionInfoWrapper 方法去构建索引。
当然了这里面有一部分逻辑涉及到 LMQ,LMQ 是 RocketMQ 在 4.9.3 提出的轻量消息队列,主要面向小微设备消息传输的轻量级消息队列,支持 MQTT 协议,主要是对于 ConsumeQueue 的多队列分发构建索引,当然我们这里就不涉及这里面的源码讲解了。
4.1 findConsumeQueue
这个方法就是根据 topic 和队列 id 找到对应的 ConsumeQueue,RocketMQ 提供了一个 consumeQueueTable
队列集合,存储了 topic -> (queueId, ConsumeQueue)
的集合,大家知道一个 topic 下面是会分为多个 ConsumeQueue 的,所以可以根据这个集合去找。
// 从 consumeQueueTable 中根据 topic 获取 ConsumeQueue 集合
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
如果说这个 map 是空的,这时候就创建一个集合设置到 map 中,不过感觉这里写的有点啰嗦了。
if (null == map) {
// 这里面就是如果不存在集合就创建一个放到 consumeQueueTable 里面
ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
// putIfAbsent 意思是如果已经存在旧 map 了,那么 newMap 就不会设置到里面
// 同时返回旧 map,如果没有映射关系就把 newMap 设置进去
ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
// 这里意思就是如果原来就存在就设置旧的,否则就设置新的
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
上面我们拿到了 map,这时候在 map 中通过 queueId 去找对应的 ConsumeQueue。
// 获取对应的 ConsumeQueue
ConsumeQueue logic = map.get(queueId);
如果这个 ConsumeQueue 是空,那么新建立一个设置到 map 中。
// 如果是空,那么就重新建一个,这里说明 ConsumeQueue 是延时建立的
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(
topic, // topic
queueId, // queueId
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // ConsumeQueue 的存储路径
this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(), // ConsumeQueue 的大小,一个 ConsumeQueue 可存储 30w 条数据,每条数据 20B
this); // DefaultMessageStore
// 存入 map 中,如果已存在就获取旧的
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
// LMQ 的逻辑,ConsumeQueue 队列数 + 1
if (MixAll.isLmq(topic)) {
lmqConsumeQueueNum.getAndIncrement();
}
logic = newLogic;
}
}
最后返回创建的消息队列,这里感觉可以写得更简单的。
4.2 putMessagePositionInfoWrapper 构建索引文件消息
首先获取写入的最大重试次数 30 次,然后判断 ConsumeQueue 文件是否可写。
// 最大重试次数 30 次
final int maxRetries = 30;
// 判断 ConsumeQueue 文件是否可写
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
isCQWriteable
判断文件是否可写,当然这里的判断不是说简单用一个标记位来判断的。
public boolean isWriteable() {
if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) {
return true;
}
return false;
}
flagBits 是 DefaultMessgeQueue 中专门用于记录存储的状态,比如
NOT_WRITEABLE_BIT
标记 ConsumeQueue 是否可写WRITE_LOGICS_QUEUE_ERROR_BIT
标记是否上一次写入的时候发生了错误DISK_FULL_BIT
标记磁盘是否满了WRITE_INDEX_FILE_ERROR_BIT
标记 IndexFile 的写入是否发生错误,这里说是写入,实际上是创建
当发生上面这些错误的时候就返回 false 表示不可继续往 Consume Queue 里面写入,否则就可以继续写入。
继续回到 putMessagePositionInfoWrapper
方法,往下看,写入 ConsumeQueue 的逻辑最多会重试 30 次,在 for 循环里面去重复执行。
// 重试 30 次
for (int i = 0; i < maxRetries && canWrite; i++) {
...
}
在 for 循环中首先处理 ConsumeQueue 扩展消息的 tagsCode,扩展信息后面会出文章介绍,这里就不多赘述。
// 普通消息的 tagCode
long tagsCode = request.getTagsCode();
// 是否支持 ConsumeQueue 扩展消息写入
if (isExtWriteEnable()) {
// 扩展信息
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());
long extAddr = this.consumeQueueExt.put(cqExtUnit);
if (isExtAddr(extAddr)) {
tagsCode = extAddr;
} else {
log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
topic, queueId, request.getCommitLogOffset());
}
}
接下来调用 putMessagePositionInfo
方法写入消息到 ConsumeQueue 中。
// 写入消息到 ConsumeQueue 中
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
如果写入成功,那么记录下最新存储时间。
if (result) {
// 写入成功了
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
// 设置队列的物理消息最新消息存储时间
this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
// 设置队列的逻辑消息最新消息存储时间(ConsumeQueue 存储时间)
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
if (multiQueue) {
multiDispatchLmqQueue(request, maxRetries);
}
return;
}
这几个时间点都是消息存储的最新时间,RocketMQ 的 StockCheckPoint 文件就是根据这几个时间来进行 Broker 异常重启文件恢复的。
那如果写入失败,就睡眠 1s 重新执行,最多执行 30 次。
else {
// XXX: warn and notify me
log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + i + " times");
try {
// 写入失败,睡眠 1s 再次执行
Thread.sleep(1000);
} catch (InterruptedException e) {
log.warn("", e);
}
}
在 for 循环执行之后如果都还没有写入成功,那么就标记写入 ConsumeQueue 失败,也就是标记 WRITE_LOGICS_QUEUE_ERROR_BIT
。
// XXX: warn and notify me
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
// 到这里就是 30 次都没能写入成功,这里就设置下存储状态是写入逻辑队列失败
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
好了,上面就是这个方法的全部逻辑了,下面给出全部代码。
/**
* 追加消息到 ConsumeQueue 中
* @param request
* @param multiQueue
*/
public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {
// 最大重试次数 30 次
final int maxRetries = 30;
// 判断 ConsumeQueue 文件是否可写
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
// 重试 30 次
for (int i = 0; i < maxRetries && canWrite; i++) {
// 普通消息的 tagCode
long tagsCode = request.getTagsCode();
// 是否支持 ConsumeQueue 扩展消息写入
if (isExtWriteEnable()) {
// 扩展信息
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());
long extAddr = this.consumeQueueExt.put(cqExtUnit);
if (isExtAddr(extAddr)) {
tagsCode = extAddr;
} else {
log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
topic, queueId, request.getCommitLogOffset());
}
}
// 写入消息到 ConsumeQueue 中
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) {
// 写入成功了
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
// 设置队列的物理消息最新消息存储时间
this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
// 设置队列的逻辑消息最新消息存储时间(ConsumeQueue 存储时间)
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
if (multiQueue) {
multiDispatchLmqQueue(request, maxRetries);
}
return;
} else {
// XXX: warn and notify me
log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + i + " times");
try {
// 写入失败,睡眠 1s 再次执行
Thread.sleep(1000);
} catch (InterruptedException e) {
log.warn("", e);
}
}
}
// XXX: warn and notify me
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
// 到这里就是 30 次都没能写入成功,这里就设置下存储状态是写入逻辑队列失败
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
4.3 putMessagePositionInfo 写入索引
/**
* 写入消息到 ConsumeQueue 中
* @param offset 消息在 CommitLog 中的物理偏移量
* @param size 消息大小
* @param tagsCode 消息 tagsCode,普通消息是 hashCode,延时消息是消息投递到延时队列的时间
* @param cqOffset 消息在 ConsumeQueue 中的偏移量
* @return
*/
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
...
}
首先判断下如果消息在 Commitlog 中的偏移量 + 消息大小 <= 已处理的消息的最大偏移量,大家注意 maxPhysicOffset
这个字段是 ConsumeQueue 中已经存储的消息在 CommitLog 中的最大物理偏移量,而不是 CommitLog 中的消息最大物理偏移量。所以如果满足这个条件,说明这条消息已经在 ConsumeQueue 中构建索引了,这时候就没必要继续处理了。
// 如果消息偏移量 + 消息大小 <= 已处理的消息的最大偏移量
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
// 说明消息已经处理过了,不需要再处理了
return true;
}
接下来就是往 ConsumeQueue 中写入索引了,那么首先我们就要把 ConsumeQueue 索引的消息给到一个临时 Buffer 中,这个 Buffer 就是 byteBufferIndex
,其实就是写入下面图中索引的三个属性。
// 切换读模式,byteBufferIndex 是临时存储的 ByteBuffer,可以被反复利用,这里切换读模式就相当于将 position 重置了,然后就可以重新写入
this.byteBufferIndex.flip();
// 限制 20B
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
// 存入 8 个字节的消息物理偏移量(CommitLog 偏移量)
this.byteBufferIndex.putLong(offset);
// 存入 4 个字节的消息物理大小
this.byteBufferIndex.putInt(size);
// 存入 8 个字节的 tagsCode,普通消息是消息 tags 的 hashCode,延时消息是消息投递到延时队列的时间
this.byteBufferIndex.putLong(tagsCode);
大家不要被 this.byteBufferIndex.flip()
给迷惑了,这个方法说是切换读模式,其实在里面就是将 position
重置,然后就可以从下标 0 开始继续写入了,byteBufferIndex
是一个 HeapByteBuffer。
我们知道索引是顺序写入 ConsumeQueue 文件中的,所以接下来要做的就是求出这个索引消息在 ConsumeQueue 中的偏移量。
// 消息在 ConsumeQueue 中的具体偏移量,因为 consumeQueueOffset 其实相当于是索引下标,所以要求出具体的偏移量还得 * 20
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
consumeQueueOffset
这里要 * 20,是因为传过来的 cqOffset
是下标索引,其实就是当前这条索引是第几条 ConsumeQueue 索引,所以实际求起始偏移量的时候还得 * 20。
获取到偏移量之后,就根据偏移量获取或创建要写入的 MappedFile,其实一般就是最后一个 MappedFile,因为是顺序写的。这个方法在 【RocketMQ 存储】RocketMQ 如何高效创建 MappedFile 中介绍过了,简单来说就是如果获取不到就提交创建 MappedFile 的请求。
// 根据偏移量获取或创建要写入的 MappedFile,其实一般就是最后一个 MappedFile,因为是顺序写的
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
获取到这个文件之后,如果发现这个 MappedFile 是 ConsumeQueue 中的第一个索引文件,同时 cqOffset != 0 表示不是第一条消息,但是 MappedFile 的写指针位置却是 0,那么说明前面的 ConsumeQueue 索引有可能因为某种原因被删掉了,此时要写入的消息是第一条消息,所以就在这条消息之前把空出来的这部分填充一些条目上去。同时更新最新的刷盘和提交位置以及已处理的消息在 CommitLog 中的最小偏移量。
// 创建好了 MappedFile 后,如果发现这个 MappedFile 是 ConsumeQueue 中的第一个索引文件
// 同时 cqOffset != 0 表示不是第一条消息,但是 MappedFile 的写指针位置却是 0
// 说明当前消息是第一个文件的第一条消息,但是却不是从 0 位置开始写入,这种情况下这个文件前面部分就需要填充一些条目了
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
// 更新已处理的消息在 CommitLog 中的最小偏移量
this.minLogicOffset = expectLogicOffset;
// 更新刷盘位置和已提交的位置
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
// 在 ByteBuffer 的 0-expectLogicOffset 位置填充消息索引条目
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
接着看下面的代码,如果不符合上面的条件,那么就判断下 cqOffset != 0
,如果符合这个条件,就说明是顺序写入的,这时候计算出 MappedFile 中写指针 wrotePosition
的位置。
- 判断如果写入位置小于 currentLogicOffset,说明这条索引已经构建过了
- 如果这里不相等,那么说明
expectLogicOffset > currentLogicOffset
,说明计算出来的写入位置在当前写指针wrotePosition
后面,说明这个逻辑队列有可能出 BUG 了,一般都不会走到这里
// 这里就是顺着文件写入,所以需要判断下这个索引条目是否已经被构建过了
if (cqOffset != 0) {
// 计算当前 MappedFile 的写指针所在的位置
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
// 如果写入位置小于 currentLogicOffset,说明这条索引已经构建过了
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
// 如果这里不相等,那么说明这个队列顺序有可能是错的
// 注意这里是没有返回 false 的,就是说消息还是可以继续写入
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
上面如果发生 BUG 是没有返回 false 的,而是继续写入,而注意继续写入的位置就是 wrotePosition
而不是 cqOffset
,相当于说忽略了 cqOffset。
但是在写入之前,记录下最大写入的 ConsumeQueue 索引的物理偏移量,大家要记住这个文件是顺序写入的。
this.maxPhysicOffset = offset + size;
return mappedFile.appendMessage(this.byteBufferIndex.array());
好了,这个方法就解析到这里了,下面就给出所有的代码。
/**
* 写入消息到 ConsumeQueue 中
* @param offset 消息在 CommitLog 中的物理偏移量
* @param size 消息大小
* @param tagsCode 消息 tagsCode,普通消息是 hashCode,延时消息是消息投递到延时队列的时间
* @param cqOffset 消息在 ConsumeQueue 中的偏移量
* @return
*/
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
// 如果消息偏移量 + 消息大小 <= 已处理的消息的最大偏移量
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
// 说明消息已经处理过了,不需要再处理了
return true;
}
// 切换读模式,byteBufferIndex 是临时存储的 ByteBuffer,可以被反复利用,这里切换读模式就相当于将 position 重置了,然后就可以重新写入
this.byteBufferIndex.flip();
// 限制 20B
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
// 存入 8 个字节的消息物理偏移量(CommitLog 偏移量)
this.byteBufferIndex.putLong(offset);
// 存入 4 个字节的消息物理大小
this.byteBufferIndex.putInt(size);
// 存入 8 个字节的 tagsCode,普通消息是消息 tags 的 hashCode,延时消息是消息投递到延时队列的时间
this.byteBufferIndex.putLong(tagsCode);
// 消息在 ConsumeQueue 中的具体偏移量,因为 consumeQueueOffset 其实相当于是索引下标,所以要求出具体的偏移量还得 * 20
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
// 根据偏移量获取或创建要写入的 MappedFile,其实一般就是最后一个 MappedFile,因为是顺序写的
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
// 创建好了 MappedFile 后,如果发现这个 MappedFile 是 ConsumeQueue 中的第一个索引文件
// 同时 cqOffset != 0 表示不是第一条消息,但是 MappedFile 的写指针位置却是 0
// 说明当前消息是第一个文件的第一条消息,但是却不是从 0 位置开始写入,这种情况下这个文件前面部分就需要填充一些条目了
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
// 更新已处理的消息在 CommitLog 中的最小偏移量
this.minLogicOffset = expectLogicOffset;
// 更新刷盘位置和已提交的位置
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
// 在 ByteBuffer 的 0-expectLogicOffset 位置填充消息索引条目
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
// 这里就是顺着文件写入,所以需要判断下这个索引条目是否已经被构建过了
if (cqOffset != 0) {
// 计算当前 MappedFile 的写指针所在的位置
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
// 如果写入位置小于 currentLogicOffset,说明这条索引已经构建过了
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
// 如果这里不相等,那么说明这个队列顺序有可能是错的
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
this.maxPhysicOffset = offset + size;
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
4.4 fillPreBlank 填充消息索引
这个方法就是上面说的对 MappedFile 前面空出的部分进行填充,这个方法的逻辑就是填充 wrotePosition - untilWhere
这部分的空白。
private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
// 消息物理偏移量
byteBuffer.putLong(0L);
// 消息长度
byteBuffer.putInt(Integer.MAX_VALUE);
// 消息 tagsCode
byteBuffer.putLong(0L);
int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
mappedFile.appendMessage(byteBuffer.array());
}
}
方法的逻辑不复杂,直接看代码就行了,要注意的是填入的索引条目:
- 物理偏移量:0
- 消息长度:Integer.MAX_VALUE
- 消息 tagsCode:0
4.5 appendMessage 追加消息到 MappedByteBuffer 中
这个方法在【RocketMQ 存储】- RocketMQ存储类 MappedFile 中已经解析过了,所以这里我就不再讲解,直接给出代码。
/**
* 追加消息到 MappedByteBuffer 中,这里是 ConsumeQueue 的调用
* @param data 要写入的数据
* @return
*/
public boolean appendMessage(final byte[] data) {
// 获取写指针的位置
int currentPos = this.wrotePosition.get();
// 判断写入这些数据之后会不会超出这个 MappedByteBuffer 的边界
if ((currentPos + data.length) <= this.fileSize) {
try {
// slice 获取这个 MappedByteBuffer 的视图
ByteBuffer buf = this.mappedByteBuffer.slice();
// 设置写入位置
buf.position(currentPos);
// 写入数据到 MappedByteBuffer
buf.put(data);
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.", e);
}
// 重新设置写指针的位置
this.wrotePosition.addAndGet(data.length);
return true;
}
return false;
}
5. 小结
好了,到这里我们就讲解了 CommitLogDispatcherBuildConsumeQueue 是如何构建 ConsumeQueue 索引的,下一篇文章我们将会讲解 IndexFile 索引的构建。
如有错误,欢迎指出!!!