当前位置: 首页 > article >正文

【RocketMQ 存储】- 异步刷盘服务 FlushRealTimeService

文章目录

  • 1. 前言
  • 2. 从 submitFlushRequest 出发
  • 3. FlushRealTimeService
    • 3.1 概述
    • 3.2 核心逻辑 - run
    • 3.3 MappedFileQueue#flush
  • 4. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

RocketMQ 存储部分系列文章:

  • 【RocketMQ 存储】- RocketMQ存储类 MappedFile
  • 【RocketMQ 存储】- 一文总结 RocketMQ 的存储结构-基础
  • 【RocketMQ 存储】- broker 端存储单条消息的逻辑
  • 【RocketMQ 存储】- broker 端存储批量消息的逻辑
  • 【RocketMQ 存储】- 同步刷盘和异步刷盘
  • 【RocketMQ 存储】- 同步刷盘服务 GroupCommitService

上一篇文章中,我们解析了同步刷盘服务 GroupCommitService,既然有同步刷盘服务,当然就有异步刷盘服务了,异步刷盘服务是 FlushRealTimeService,当然除了异步刷盘服务还有异步提交服务,这篇文章我们就先来看下异步刷盘服务。


2. 从 submitFlushRequest 出发

在解析 FlushRealTimeService 的源码之前,还是先来看下 submitFlushRequest 方法,看看在里面是如何处理异步刷盘的。
在这里插入图片描述
看里面的逻辑,对于异步刷盘,需要看看是否开启了堆外缓存,如果没有启动堆外缓存,那么唤醒异步刷盘服务 FlushRealTimeService。因为没有开启堆外缓存,消息就是写入 MappedByteBuffer,我们知道 MappedByteBuffer 是通过 fileChannel.mmap 创建出来的,所以这部分数据只要写入 MappedByteBuffer 就等于写入了 Page Cache,不需要再额外 Commit,所以直接唤醒异步刷盘服务 FlushRealTimeService。


3. FlushRealTimeService

3.1 概述

那么现在就来看下这个异步刷盘服务,同样的这个服务也是继承 FlushCommitLogService,跟同步刷盘服务一样。

/**
 * 异步刷盘
 */
class FlushRealTimeService extends FlushCommitLogService {
	 // 上一次刷盘时间
	 private long lastFlushTimestamp = 0;
	 // 打印日志用的
	 private long printTimes = 0;
	
	 ...
}

abstract class FlushCommitLogService extends ServiceThread {
    protected static final int RETRY_TIMES_OVER = 10;
}

异步刷盘没有同步刷盘那么复杂,因为同步刷盘要考虑超时之类的,但是异步刷盘由于不需要阻塞生产者,所以只需要确保刷盘成功就行了。

因此这个类里面参数很简单,就一个 lastFlushTimestamp,这个参数记录了上一次刷盘的时间,至于用这个参数就是因为要用来判断当前时间距离上一次有没有超过一个限制,默认 10s,如果超过这个时间,那么只要有数据就立刻刷盘,而不是等到脏页达到 flushPhysicQueueLeastPages 才刷盘。

这是因为就算是异步刷盘,也不能频繁刷,频繁刷盘会对写入性能产生比较大的影响,同时在上几篇文章中我们说过,这部分内存由于临时 mlock 住,所以不会被交换到 swap 空间,但是如果频繁刷盘,刷盘之后如果接着往 MappedByteBuffer 里面继续写入就会继续产生缺页中断。具体可以看这篇文章:从 Linux 内核角度探秘 JDK MappedByteBuffer。

所以 RocketMQ 会用一个 flushPhysicQueueLeastPages 来控制刷盘的频率,这个变量表示脏页数量,默认是 4,也就是 16K,一般来说就算刷盘也得等待脏页数达到 flushPhysicQueueLeastPages 才进行刷盘,处理上面说的例外。


3.2 核心逻辑 - run

public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
	// 异步刷盘就是一直死循环刷盘
    while (!this.isStopped()) {
		...
	}	
	...
}

run 方法是异步刷盘的核心逻辑,在 run 方法中如果刷盘服务没有停止,就会一直死循环刷盘,那么下面就来看下 while 循环里面的逻辑。

首先就判断下是不是需要固定延时执行,其实就是看看是不是每一次刷盘后 sleep 一段时间,默认是 true。

boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

接着获取刷盘时间间隔,配合上面 flushCommitLogTimed 使用,默认是 500ms。

int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();

然后获取刷盘的最小页数(操作系统),也就是脏页数最小到 flushPhysicQueueLeastPages 才能刷盘,默认是 4,大小是 16K。异步刷盘就算不会阻塞生产者请求,但是也不能频繁刷盘,所以这里有一个脏页的概念,脏页最少到 flushPhysicQueueLeastPages 才允许刷盘,但是这只是其中一个条件。

int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

接着获取距离上一次刷盘的最大延时,默认 10s,意思是如果当前刷盘时间距离上一次超过 10s,flushPhysicQueueLeastPages 就会被设置成 0,意思就是只要有脏数据就能刷盘。所以不是说一定要脏页达到 16K 才允许刷盘,如果距离上一次刷盘太久也会立马进行刷盘的。

int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

获取当前时间,判断如果当前时间距离上一次刷盘时间 >= 10s,那么设置 flushPhysicQueueLeastPages = 0,意味者只要有脏数据就需要刷盘。

// 当前时间
long currentTimeMillis = System.currentTimeMillis();
// 如果当前时间距离上一次刷盘时间 >= 10s,那么设置 flushPhysicQueueLeastPages = 0,意味者只要有脏数据就需要刷盘
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
    // 设置刷盘时间为当前时间
    this.lastFlushTimestamp = currentTimeMillis;
    flushPhysicQueueLeastPages = 0;
    // printTimes 每 10 的倍数打印一次进度
    printFlushProgress = (printTimes++ % 10) == 0;
}

下面就是睡眠,也就是判断需不需要固定延时执行,就是上面的 flushCommitLogTimed 参数,如果是定时刷盘,那么当前线程睡眠指定的间隔时间,默认 500ms,否则这里就是最多睡眠 500ms,因为可以通过 wakeup 去唤醒当前线程提前中断睡眠。

try{
	// 需要固定延时执行
	if (flushCommitLogTimed) {
		// 如果是定时刷盘,那么当前线程睡眠指定的间隔时间,默认 500ms
	    Thread.sleep(interval);
	} else {
	    // 否则这里就是最多睡眠 500ms,因为可以通过 wakeup 去唤醒当前线程提前中断睡眠
	    this.waitForRunning(interval);
	}
	...
} catch {
	CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
    this.printFlushProgress();
}

当睡眠结束,就要开始刷盘了,执行核心刷盘逻辑,指定最小刷盘页数(操作系统)。

// 起始时间,开始刷盘
long begin = System.currentTimeMillis();
// 刷盘逻辑,指定最小刷盘页数(操作系统)
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

刷盘之后,万一 RocketMQ 崩溃,总得有一些恢复的手段,所以下面会记录最新 commitlog 的刷盘时间点,数据恢复会用到。

// 获取消息存储时间
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
    // 设置下最新 commitlog 的刷盘时间点,数据恢复会用到
    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}

最后打印下刷盘的消耗时间。

// 刷盘消耗时间
long past = System.currentTimeMillis() - begin;
if (past > 500) {
    // 打印下刷盘时间
    log.info("Flush data to disk costs {} ms", past);
}

这里面就是刷盘的逻辑,那么当异步刷盘服务正常关闭的时候,就需要把剩下的数据全部刷到磁盘中,然后关闭,当然了刷盘肯定不是一定能成功的,这里成不成功主要看有没有脏数据刷盘了,如果没有那么 result 就会返回 true,下次就不执行了。 RocketMQ 会设置默认最多 10 次刷盘操作。

// 异步刷盘服务正常关闭的时候,一次性执行 10 次刷盘操作
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
    result = CommitLog.this.mappedFileQueue.flush(0);
    CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}

this.printFlushProgress();

CommitLog.log.info(this.getServiceName() + " service end");

好了,上面就是刷盘的全部逻辑,下面给出全部的代码。

public void run() {
   CommitLog.log.info(this.getServiceName() + " service started");

   // 异步刷盘就是一直死循环刷盘
   while (!this.isStopped()) {
       // 判断下是不是需要固定延时执行,其实就是看看是不是每一次刷盘后 sleep 一段时间,默认是 true
       boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
       // 刷盘时间间隔,配合上面 flushCommitLogTimed 使用,默认是 500ms
       int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
       // 刷盘的最小页数(操作系统),也就是脏页数最小到 flushPhysicQueueLeastPages 才能刷盘,默认是 4,大小是 16K
       int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
       // 距离上一次刷盘的最大延时,默认 10s,意思是如果当前刷盘时间距离上一次超过 10s,flushPhysicQueueLeastPages 就会被设置成 0,意思就是只要有脏数据就能刷盘
       int flushPhysicQueueThoroughInterval =
           CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

       // 打印刷盘进度
       boolean printFlushProgress = false;

       // 当前时间
       long currentTimeMillis = System.currentTimeMillis();
       // 如果当前时间距离上一次刷盘时间 >= 10s,那么设置 flushPhysicQueueLeastPages = 0,意味者只要有脏数据就需要刷盘
       if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
           // 设置刷盘时间为当前时间
           this.lastFlushTimestamp = currentTimeMillis;
           flushPhysicQueueLeastPages = 0;
           // printTimes 每 10 的倍数打印一次进度
           printFlushProgress = (printTimes++ % 10) == 0;
       }

       try {
           // 需要固定延时执行
           if (flushCommitLogTimed) {
               // 如果是定时刷盘,那么当前线程睡眠指定的间隔时间,默认 500ms
               Thread.sleep(interval);
           } else {
               // 否则这里就是最多睡眠 500ms,因为可以通过 wakeup 去唤醒当前线程提前中断睡眠
               this.waitForRunning(interval);
           }

           // 打印进度
           if (printFlushProgress) {
               this.printFlushProgress();
           }

           // 起始时间,开始刷盘
           long begin = System.currentTimeMillis();
           // 刷盘逻辑,指定最小刷盘页数(操作系统)
           CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
           // 获取消息存储时间
           long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
           if (storeTimestamp > 0) {
               // 设置下最新 commitlog 的刷盘时间点,数据恢复会用到
               CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
           }
           // 刷盘消耗时间
           long past = System.currentTimeMillis() - begin;
           if (past > 500) {
               // 打印下刷盘时间
               log.info("Flush data to disk costs {} ms", past);
           }
       } catch (Throwable e) {
           CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
           this.printFlushProgress();
       }
   }

   // 异步刷盘服务正常关闭的时候,一次性执行 10 次刷盘操作
   boolean result = false;
   for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
       result = CommitLog.this.mappedFileQueue.flush(0);
       CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
   }

   this.printFlushProgress();

   CommitLog.log.info(this.getServiceName() + " service end");
}

3.3 MappedFileQueue#flush

这个是刷盘逻辑,下面来看下,首先根据刷到哪个位置了,去找到对应的 mappedFile,MappedFileQueue 中通过 flushedWhere 来记录上一次刷盘到哪个地方,下一次刷盘就可以接着从 flushedWhere 开始刷盘,所以我们第一步要做的就是通过 flushedWhere 找到对应的 MappedFile。

boolean result = true;
// 首先根据刷到哪个位置了,去找到对应的 mappedFile
// 如果 flushedWhere = 0,就表示还没有开始写入数据,这时候返回第一个文件
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);

获取到 MappedFile 之后,就可以开始刷盘了,这里面的刷盘逻辑就看注解,核心的方法 mappedFile.flush 在前面讲解 MappedFile 的时候已经详细介绍过了。

if (mappedFile != null) {
    // 存储时间
    long tmpTimeStamp = mappedFile.getStoreTimestamp();
    // 刷盘,这里返回结果判断逻辑如下
    // 1.如果没有使用读写分离,就获取 wrotePosition 的位置,就是 MappedByteBuffer 的 position
    // 2.如果使用了读写分离,就获取 committedPosition 的位置,因为使用读写分离,那么数据需要先写入
    //   堆外缓存,再刷盘,所以 committedPosition 就是写入堆外缓存的位置
    int offset = mappedFile.flush(flushLeastPages);
    // 因为一个 MappedFile 文件会映射一个 ByteBuffer,所以上面的 offset 在 MappedByteBuffer 中的偏移量
    // 而下面这个全局偏移量就是: mappedFile 的起始偏移量(文件名) + offset
    long where = mappedFile.getFileFromOffset() + offset;
    // result 就表示 flushedWhere 是不是最新的位点
    result = where == this.flushedWhere;
    // 更新 flushedWhere
    this.flushedWhere = where;
    if (0 == flushLeastPages) {
        // 如果最少刷盘页数为 0,就是说只要有数据就更新,那么更新存储时间戳
        // 如果最小刷盘页不为 0,就不会刷新这个参数
        this.storeTimestamp = tmpTimeStamp;
    }
}

记得刷完盘的时候维护 flushedWhere 刷盘位置和 storeTimestamp 最新的消息刷盘时间。


4. 小结

好了,这里就是异步刷盘的逻辑,下一篇文章就要开始讲解异步提交服务了。





如有错误,欢迎指出!!!


http://www.kler.cn/a/539195.html

相关文章:

  • C++ STL容器之vector的使用及复现
  • UE5.5 PCGFrameWork--GPU CustomHLSL
  • ASP.NET Core JWT Version
  • Linux系统-centos防火墙firewalld详解
  • postgresql 游标(cursor)的使用
  • DeepSeek从入门到精通:全面掌握AI大模型的核心能力
  • Python 报错分析:IndexError: list index out of range
  • Node.js 中模块化
  • 什么是Prompt工程?
  • 蓝耘智算平台与DeepSeek R1模型:推动深度学习发展
  • 企业如何利用DeepSeek提升网络安全管理水平
  • 【JAVAFX】textarea插入数据后滚动条自动到底部
  • 量化交易数据获取:xtquant库的高效应用
  • Transformer中的嵌入位置编码
  • Golang:Go 1.23 版本新特性介绍
  • 小程序实现消息订阅通知完整实践及踩坑记录
  • AI绘画:开启艺术与科技融合的未来之门(10/10)
  • Unity3D仿星露谷物语开发28之切换场景
  • 【神经网络框架】非局部神经网络
  • [LeetCode]day18 202.快乐数
  • Redis的数据过期策略和数据淘汰策略
  • 【计算机视觉】多分辨率金字塔全解析 ✨
  • 机试题——D路通信
  • Sparse4D v3:推进端到端3D检测和跟踪
  • Android系统SELinux详解
  • 携手AWS,零成本在EKS上体验AutoMQ企业版