【RocketMQ 存储】- 异常退出恢复逻辑 recoverAbnormally
文章目录
- 1. 前言
- 2. recover 异常退出恢复服务
- 3. CommitLog#recoverAbnormally
- 4. CommitLog#isMappedFileMatchedRecover
- 5. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
RocketMQ 存储部分系列文章:
-
【RocketMQ 存储】- RocketMQ存储类 MappedFile
-
【RocketMQ 存储】- 一文总结 RocketMQ 的存储结构-基础
-
【RocketMQ 存储】- broker 端存储单条消息的逻辑
-
【RocketMQ 存储】- broker 端存储批量消息的逻辑
-
【RocketMQ 存储】- 同步刷盘和异步刷盘
-
【RocketMQ 存储】- 同步刷盘服务 GroupCommitService
-
【RocketMQ 存储】- 异步刷盘服务 FlushRealTimeService
-
【RocketMQ 存储】- 异步提交服务 CommitRealTimeService
-
【RocketMQ 存储】RocketMQ 如何高效创建 MappedFile
-
【RocketMQ 存储】消息重放服务-ReputMessageService
-
【RocketMQ 存储】CommitLogDispatcherBuildConsumeQueue 构建 ConsumeQueue 索引
-
【RocketMQ 存储】CommitLogDispatcherBuildIndex 构建 IndexFile 索引
-
【RocketMQ 存储】ConsumeQueue 刷盘服务 FlushConsumeQueueService
-
【RocketMQ 存储】- ConsumeQueue 过期清除服务CleanConsumeQueueService
-
【RocketMQ 存储】- CommitLog 过期清除服务 CleanCommitLogService
-
【RocketMQ 存储】- 正常退出恢复逻辑 recoverNormally
2. recover 异常退出恢复服务
如果 RocketMQ 因为某种原因异常关闭,这种情况下 abort
文件是不会被删掉的,那么在 broker 启动的时候初始化 DefaultMessageStore 就会检测出是异常退出重启,这种情况下就会调用异常退出恢复方法 recoverAbnormally
。
这个方法我们在上一篇文章 【RocketMQ 存储】- 正常退出恢复逻辑 recoverNormally 中已经讲解过,这里就不多说了,下面就简单看下文件是否是异常退出的判断逻辑。
/**
* Temp 临时文件,RocketMQ 启动的时候会创建一个大小为 0 的文件,当服务正常关闭(destory、shutdown),就会调用删除方法把这个临时文件删掉,
* 否则如果是异常关闭那么就不会删掉这个文件,这样一来只需要根据这个文件就能判断服务是不是正常关闭的
* @return
*/
private boolean isTempFileExist() {
// 获取临时文件的位置: ${home}/store/abort
String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
// 构建 File 文件
File file = new File(fileName);
// 判断这个文件是否存在
return file.exists();
}
同时也看下异常退出方法的入口。
/**
* 恢复 CommitLog 和 ConsumeQueue 中的数据到内存中
* @param lastExitOK
*/
private void recover(final boolean lastExitOK) {
// 恢复所有 ConsumeQueue 文件,返回的是 ConsumeQueue 中存储的最大有效 CommitLog 偏移量
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
// 上一次 Broker 退出是正常退出还是异常退出
if (lastExitOK) {
// 这里就是正常退出,所以正常恢复 CommitLog
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
// 这里就是异常退出,所以异常恢复 CommitLog
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
// 最后恢复 topicQueueTable
this.recoverTopicQueueTable();
}
3. CommitLog#recoverAbnormally
异常恢复不同于正常恢复,正常恢复由于在 broker 启动前的写入是正常的,所以正常恢复可以从倒数第三个文件开始进行遍历,但是异常恢复不确定在哪一个文件出了问题,所以需要从后往前遍历找到第一个正确的 CommitLog 文件。
// 从最后一个文件开始往前找到第一个正确存储 CommitLog 消息的文件
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
// 校验整个 CommitLog 文件是不是一个正确的文件
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this mapped file " + mappedFile.getFileName());
break;
}
}
接着从第一个正确的 CommitLog 文件开始恢复,恢复的流程和正常恢复的流程差不多,有一点不同的是由于是异常恢复,所以恢复的时候对于正常的消息索引需要重新构建 ConsumeQueue 和 IndexFile 索引,当遇到第一条异常的消息 (CRC 校验不通过,魔术不合法、记录的长度和实际求出来的长度不一样) 就会退出恢复流程,表示找到第一条不合法的消息,接着根据这条消息的物理偏移量去销毁 CommitLog 和 ConsumeQueue 中的无效文件,因为里面的非法我们基本都在上一篇文章正常恢复中讲过,所以整个恢复逻辑直接看下面注释就行。
/**
* 异常恢复,异常恢复由于没有记录 ConsumeQueue 和 CommitLog 的最新刷盘时间点,所以需要从后往前遍历找到第一个正确的 CommitLog 文件
* @param maxPhyOffsetOfConsumeQueue
*/
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
// 是否需要启用 CRC32 校验文件,就是确保消息在数据传输和文件存储过程中没有出现问题,如位翻转、数据损坏等
// 由于 CRC32 校验需要计算校验和,因此会对性能产生一定的影响
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
// 所有的 MappedFile 文件
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// 从最后一个文件开始往前找到第一个正确存储 CommitLog 消息的文件
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
// 校验整个 CommitLog 文件是不是一个正确的文件
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this mapped file " + mappedFile.getFileName());
break;
}
}
// 从第一个正确的 CommitLog 文件开始恢复
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
// 获取 CommotLog 对应的 ByteBuffer 视图
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
// 获取文件的初始偏移量,默认是文件名
long processOffset = mappedFile.getFileFromOffset();
// 已经校验过的有效 offset
long mappedFileOffset = 0;
while (true) {
// 校验本条消息是否合法
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
// 消息大小
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
// 消息正常,同时没有到文件的尾部
if (size > 0) {
// 合法的 mappedFileOffset 加上消息大小
mappedFileOffset += size;
// 如果允许消息索引重复转发构建
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
// 这里判断下如果这条消息的物理偏移量小于 CommitLog 中的提交位置
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
// 重新构建 ConsumeQueue、IndexFile 索引
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
// 重新构建 ConsumeQueue、IndexFile 索引
this.defaultMessageStore.doDispatch(dispatchRequest);
}
}
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
// 这里就是到文件尾部了,就需要跳到下一个文件继续恢复
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
// 这里就是没有到最后一个文件,所以设置下下一个文件的各个参数
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
} else {
// 当前消息异常,那么后续所有文件都不需要恢复了
log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
break;
}
}
// CommitLog 文件的最大有效偏移量
processOffset += mappedFileOffset;
// 设置刷盘位置
this.mappedFileQueue.setFlushedWhere(processOffset);
// 设置提交位置
this.mappedFileQueue.setCommittedWhere(processOffset);
// 删掉有效偏移量 processOffset 之后的文件
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// 上面清除了 CommitLog 中的无效数据,下面就要清除 ConsumeQueue 中的无效数据
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
// 删除 ConsumeQueue 中的无效数据
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
}
// Commitlog case files are deleted
else {
// 这里就是 CommitLog 下面的所有文件都不存在
log.warn("The commitlog files are deleted, and delete the consume queue files");
// 重置最新刷盘位置和提交位置
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
// 销毁所有 ConsumeQueue 文件
this.defaultMessageStore.destroyLogics();
}
}
4. CommitLog#isMappedFileMatchedRecover
这个方法用于校验整个 CommitLog 文件是不是一个正确的文件,主要是校验下面几个方面的内容。
- 如果魔数 != MESSAGE_MAGIC_CODE,就说明不是合法的 CommitLog 文件,校验失败
- 如果消息在 broker 端的存储时间为 0,就表示不是正常的消息,校验失败
由于 StoreCheckPoint 文件中存储了 ConsumeQueue、IndexFile、CommitLog 的最新刷盘的消息的存储时间戳,啥意思呢?就是消息刷盘的时候会记录下来最新的消息在 broker 端存储的时间,也就是 storeTimeStamp
。
// CommitLog 文件的最新刷盘的消息存储在 broker 端的时间戳
private volatile long physicMsgTimestamp = 0;
// ConsumeQueue 文件的最新刷盘的消息的存储时间戳
private volatile long logicsMsgTimestamp = 0;
// IndexFile 文件的最新刷盘的消息的存储时间戳
private volatile long indexMsgTimestamp = 0;
校验的时候就需要用这几个变量来判断,比如判断 IndexFile 合不合法就这个文件最新的存储时间 storeTimestamp
和记录的 indexMsgTimestamp
比较,如果是 storeTimestamp <= indexMsgTimestamp
,那么说明这个文件最新消息的存储时间比记录的时间戳都要小,也就是这个文件里面的消息都被刷盘了。
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
// 检测就是判断下如果 checkpoint 中存储的 IndexFile 最小刷盘时间比当前这个 CommitLog 文件的最新数据存储时间要大
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
// 这里就直接返回 true,表示当前文件是合法的,因为这个文件最新消息的存储时间比记录的时间戳都要小,说明这个文件里面的消息都被刷盘了
return true;
}
}
如果是没有使用安全的 Index 索引模式,那么就校验 ConsumeQueue 和 IndexFile 的时间戳。
// 这里就是普通模式,检查下 checkpoint 中最小刷盘时间比当前这个 CommitLog 文件的最新数据存储时间要大
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
// 这里就直接返回 true,表示当前文件是合法的,因为这个文件最新消息的存储时间比记录的时间戳都要小,说明这个文件里面的消息都被刷盘了
return true;
}
# StoreCheckPoint#getMinTimestamp
/**
* 获取 physicMsgTimestamp 和 logicsMsgTimestamp 的最小值并且将去 3s
* @return
*/
public long getMinTimestamp() {
long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp);
min -= 1000 * 3;
if (min < 0)
min = 0;
return min;
}
下面给出全部的逻辑。
/**
* 检查文件是否是一个正常的 MappedFile 文件,也就是 CommitLog 里面的消息是不是正确的
* @param mappedFile
* @return
*/
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
// 获取 ByteBuffer 视图
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
// 获取文件开头的魔数
int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
// 如果魔数 != -626843481,就说明不是合法的 CommitLog 文件,校验失败
if (magicCode != MESSAGE_MAGIC_CODE) {
return false;
}
// 获取 sysFlag,也就是消息的属性类型
int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION);
// 获取下消息生成的 Producer 端地址
int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
// 获取消息在 broker 端存储的偏移量
int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornhostLength;
// 获取消息在 broker 端存储的最新时间
long storeTimestamp = byteBuffer.getLong(msgStoreTimePos);
if (0 == storeTimestamp) {
// 如果存储时间为 0,就表示不是正常的消息,校验错误
return false;
}
// 使用安全的 Index 索引模式,也就是要对 IndexFile 进行检测
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
// 检测就是判断下如果 checkpoint 中存储的 IndexFile 最小刷盘时间比当前这个 CommitLog 文件的最新数据存储时间要大
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
// 这里就直接返回 true,表示当前文件是合法的,因为这个文件最新消息的存储时间比记录的时间戳都要小,说明这个文件里面的消息都被刷盘了
return true;
}
} else {
// 这里就是普通模式,检查下 checkpoint 中最小刷盘时间比当前这个 CommitLog 文件的最新数据存储时间要大
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
// 这里就直接返回 true,表示当前文件是合法的,因为这个文件最新消息的存储时间比记录的时间戳都要小,说明这个文件里面的消息都被刷盘了
return true;
}
}
return false;
}
5. 小结
到这里我们就讲完异常退出恢复逻辑 recoverAbnormally 了,里面很多逻辑在前一篇文章中也说过了,所以这里就不详细解释每一个方法的逻辑。
如有错误,欢迎指出!!!