当前位置: 首页 > article >正文

【RocketMQ 存储】- broker 端存储批量消息的逻辑

文章目录

  • 1. 前言
  • 2. DefaultMessageStore#asyncPutMessages 批量新增消息
    • 2.1 Commit#asyncPutMessages
    • 2.2 Commit#encode 批量编码消息
    • 2.3 MappedFile#appendMessages 添加批量消息
    • 2.4 MappedFile#appendMessagesInner添加批量消息
    • 2.5 CommitLog#doAppend
  • 3. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

上一篇文章中我们解析了单条消息添加逻辑,那么这篇文章中我们就来解析下批量新增消息的逻辑,批量新增消息的入口在 DefaultMessageStore#asyncPutMessages。上一篇文章的地址:【RocketMQ 存储】- broker 端存储单条消息的逻辑


2. DefaultMessageStore#asyncPutMessages 批量新增消息

上一篇文章我们介绍了单条消息的添加,这篇文章中批量新增的一些方法和单挑消息的新增是一模一样的,所以这里直接看注释就行了。

但是这里要注意一点,消息参数 MessageExtBatch 里面的 body 参数是一个 byte[] 数组,而所有要添加的消息会存储到一个数组中,也就是拼接的方式。

/**
 * 消息批量添加
 * @param messageExtBatch the message batch
 * @return
 */
public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
    // 1.检查下存储服务的状态
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        // 这里就是消息存储服务不可用或者操作系统页繁忙,直接返回结果
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }

    // 2.校验消息看看是否合法
    PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        // 消息长度不合法
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }

    // 3.当前时间
    long beginTime = this.getSystemClock().now();
    // 4.存储消息的核心逻辑
    CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);

    // 这里就是结果处理
    resultFuture.thenAccept((result) -> {
        // 当消息存储完成之后,lambda 表达式会被调用
        // 消息消耗的时间
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
        }

        // 设置下存储消息的消耗时间和最大消耗时间
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            // 这里就是存储失败了,新增存储失败的次数
            this.storeStatsService.getPutMessageFailedTimes().add(1);
        }
    });

    return resultFuture;
}

2.1 Commit#asyncPutMessages

/**
 * broker 异步存储消息
 * @param messageExtBatch 批量消息
 * @return
 */
public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
    // 设置消息的存储时间
    messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
    AppendMessageResult result;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    // 消息的事务类型
    final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());

    if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
        // 事务消息不会走这个方法,普通消息才可以使用批量添加
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
    }
    if (messageExtBatch.getDelayTimeLevel() > 0) {
        // 延时消息也不可以批量添加
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
    }

    // 设置 producer 端的地址
    InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
    if (bornSocketAddress.getAddress() instanceof Inet6Address) {
        // 如果是 IPV6,就设置下消息的发送端标记
        messageExtBatch.setBornHostV6Flag();
    }

    // 设置 broker 端的地址
    InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
    if (storeSocketAddress.getAddress() instanceof Inet6Address) {
        // 如果是 IPV6,就设置下消息的发送端标记
        messageExtBatch.setStoreHostAddressV6Flag();
    }

    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    // 获取 CommitLog 中最后一个 MappedFile
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    // 细粒度锁
    PutMessageThreadLocal pmThreadLocal = this.putMessageThreadLocal.get();
    // 消息编码器
    MessageExtEncoder batchEncoder = pmThreadLocal.getEncoder();

    // 消息存储上下文,里面包括 consumeQueueTable 的 key(topic-queueId)和批量消息
    PutMessageContext putMessageContext = new PutMessageContext(generateKey(pmThreadLocal.getKeyBuilder(), messageExtBatch));
    // 设置编码后的消息,说是编码,其实就是用一个 ByteBuffer 存储这批消息
    messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));

    // 加锁
    putMessageLock.lock();
    try {
        // 当前时间
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        // 加锁之后设置添加消息的初始时间
        this.beginTimeInLock = beginLockTimestamp;

        // 设置消息的存储时间为加锁的时间,确保全局有序
        messageExtBatch.setStoreTimestamp(beginLockTimestamp);

        // 这里就是说如果获取不到 MappedFile 或者获取到的 MappedFile 已经写满了,这时候会获取或者创建下一个 MappedFile
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        // 如果还是获取不到,那么说明创建失败了
        if (null == mappedFile) {
            // 返回结果 CREATE_MAPEDFILE_FAILED
            log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }

        // 核心逻辑,追加消息到 CommitLog 中
        result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
        // 下面判断下追加消息的结果
        switch (result.getStatus()) {
            // 添加成功,直接退出
            case PUT_OK:
                break;
            // 文件剩余空间不足,那么初始化新的文件并尝试再次存储
            case END_OF_FILE:
                // 记录上一个文件
                unlockMappedFile = mappedFile;
                // 创建一个新的文件,重新往里面写入
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // 创建失败,直接返回错误结果 CREATE_MAPEDFILE_FAILED
                    log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                }
                // 创建成功后再次添加消息
                result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                // 这里就是消息的长度或者属性长度错误
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
            case UNKNOWN_ERROR:
            default:
                // 未知错误
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
        }
        // 这里就是加锁的时间,也是添加消息所耗费的时间
        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
    } finally {
        // 最后解锁并且重置 beginTimeInLock
        beginTimeInLock = 0;
        putMessageLock.unlock();
    }

    // 如果加锁时间超过了 500ms
    if (elapsedTimeInLock > 500) {
        // 记录日志
        log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
    }

    // 文件写满了并启用文件预热
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        // 对 unlockMappedFile 解锁,这里解锁是使用 munlock 解除这片内存背后锁定的 page cache,这下就能够交换到 swap 空间了
        // 因为这边 unlockMappedFile 已经写满了,所以这片空间可以解锁方便 swap 交换到磁盘 swap 空间,读写的重点在新的 MappedFile
        // 文件上,对于旧的 unlockMappedFile 读写就没有那么多了
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    // 返回结果
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // 下面是一些数据统计,如写入 topic 的消息数 + 1
    storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
    // 存储的消息总字节 + result.getWroteBytes()
    storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());

    // 上面添加消息到 CommitLog 只是添加到背后的 ByteBuffer,接下来需要提交刷盘请求
    CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
    // RocketMQ 主节点写入数据之后,向从节点提交消息复制请求,让 slave 节点去同步 master 节点新写入的消息
    CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
    // 处理刷盘请求
    return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus, replicaStatus) -> {
        // 外层会阻塞等待刷盘请求和从节点复制请求的结果
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });

}

要注意,对于普通消息才可以使用这个批量添加的方法,事务消息和延时消息是不可以批量添加的。


2.2 Commit#encode 批量编码消息

在这个方法中首先初始化 encoderBuffer,就是存储解析之后的消息。

// 重置 ByteBuffer,这里不是线程安全的
encoderBuffer.clear();

int totalMsgLen = 0;
// 消息具体内容,MessageExtBatch 里面的 body 存储了所有的 Message,顺序存储
ByteBuffer messagesByteBuff = messageExtBatch.wrap();

然后通过消息类型来分配 producer 和 broker 的地址 ByteBuffer。

// 消息类型
int sysFlag = messageExtBatch.getSysFlag();
// producer 的地址,ip + port
int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// broker 的地址,ip + port
int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// 分配两个 ByteBuffer
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);

接着获取消息的属性长度。

// 批量消息的属性
String batchPropStr = MessageDecoder.messageProperties2String(messageExtBatch.getProperties());
final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
int batchPropDataLen = batchPropData.length;
if (batchPropDataLen > Short.MAX_VALUE) {
    // 如果属性字节数 > 32767,那么说明消息异常
    CommitLog.log.warn("Properties size of messageExtBatch exceeded, properties size: {}, maxSize: {}.", batchPropDataLen, Short.MAX_VALUE);
    throw new RuntimeException("Properties size of messageExtBatch exceeded!");
}
// 消息属性长度
final short batchPropLen = (short) batchPropDataLen;

接下来遍历编码的 messagesByteBuff,一条一条写入到 encoderBuffer 中。

// 消息数
int batchSize = 0;
while (messagesByteBuff.hasRemaining()) {
	...
}

下面看下 while 里面的逻辑,首先就是获取各种动态变量,如 bodyLen,propertiesLen 等,因为要计算消息长度就需要传入这些动态变量来计算。

// 消息数 + 1
batchSize++;
// 1.消息的总长度(TOTALSIZE)
messagesByteBuff.getInt();
// 2.魔法数(MAGICCODE),用于标识消息的版本或类型
messagesByteBuff.getInt();
// 3.消息体的 CRC(BODYCRC),用于校验消息体的完整性
messagesByteBuff.getInt();
// 4.消息的标志(FLAG),用于标识消息的一些特性
int flag = messagesByteBuff.getInt();
// 5.消息体长度、消息体的指针
int bodyLen = messagesByteBuff.getInt();
int bodyPos = messagesByteBuff.position();
// 6.获取消息体 CRC 校验码
int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen);
messagesByteBuff.position(bodyPos + bodyLen);
// 7.消息属性长度、消息属性指针
short propertiesLen = messagesByteBuff.getShort();
int propertiesPos = messagesByteBuff.position();
messagesByteBuff.position(propertiesPos + propertiesLen);

在上面的方法中 messagesByteBuff.getInt() 等起到的作用就是将 position 指针向后移动。

我举个例子,上面这些代码中读取 properties 属性的时候 short propertiesLen = messagesByteBuff.getShort(); 这个方法获取到了属性的长度,这时候 position 指向的位置是属性起始位置。
在这里插入图片描述

上面关注绿色部分就行了,这时候 position 指向了属性内容的第一个下标位置,然后设置 messagesByteBuff.position(propertiesPos + propertiesLen),比如按照上面的图,这个 propertiesLen 算出来就是 4,所以这里就是将 position 挪到属性后面的第一个下标。
在这里插入图片描述
看完这个例子大家应该就知道这里 position 是怎么移动的了。但是除了属性、topic 等,对于批量消息,这里会额外判断需不需要新增分隔符,批量消息中每一条消息都有属性值,而除了每条消息自身属性值之外,MessageExtBatch.properties 提供了所有消息共用的属性值。

这个分隔符就是用来分割自身属性值和共用属性值的。

// 8.messagesByteBuff 是否需要添加一个属性分隔符再添加属性(body 和 properties 的分隔符),这个分隔符就是 PROPERTY_SEPARATOR
boolean needAppendLastPropertySeparator = propertiesLen > 0 && batchPropLen > 0
            && messagesByteBuff.get(messagesByteBuff.position() - 1) != MessageDecoder.PROPERTY_SEPARATOR;

好了,继续 encode 的逻辑,下面就是 topic 的处理以及属性的长度计算。

// 9.消息 topic
final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);

// 10.消息 topic 的长度
final int topicLength = topicData.length;

// 11.如果需要添加,那么 totalPropLen(属性部分总长度)需要 + 1,这个 1 就表示属性分隔符的长度(char 类型)
int totalPropLen = needAppendLastPropertySeparator ? propertiesLen + batchPropLen + 1
                                                     : propertiesLen + batchPropLen;

到这里我们就获取了所有动态计算的长度,下面就要计算消息总长度了。

// 12.计算消息总长度
final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, totalPropLen);

/**
 * 计算消息长度
 * @param sysFlag           是否是 IPV6
 * @param bodyLength        消息体长度
 * @param topicLength       topic 长度
 * @param propertiesLength  消息属性长度
 * @return
 */
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
    // 如果是 IPV6 就是 20,否则是 8,这是因为 bornhost 是包括 IP + port 的,所以会多上 4 字节
    int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
    // 这里也是同理
    int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
    // 计算一条 CommitLog 消息的长度
    final int msgLen = 4 //TOTALSIZE
        + 4 //MAGICCODE
        + 4 //BODYCRC
        + 4 //QUEUEID
        + 4 //FLAG
        + 8 //QUEUEOFFSET
        + 8 //PHYSICALOFFSET
        + 4 //SYSFLAG
        + 8 //BORNTIMESTAMP
        + bornhostLength //BORNHOST
        + 8 //STORETIMESTAMP
        + storehostAddressLength //STOREHOSTADDRESS
        + 4 //RECONSUMETIMES
        + 8 //Prepared Transaction Offset
        + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
        + 1 + topicLength //TOPIC
        + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
        + 0;
    return msgLen;
}

计算出长度之后,就需要校验消息长度了,长度不合法的话就抛出异常。

// 13.消息长度合法校验
if (msgLen > this.maxMessageSize) {
    // 如果消息长度不合法,那么抛出异常
    CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
        + ", maxMessageSize: " + this.maxMessageSize);
    throw new RuntimeException("message size exceeded");
}

下面再去计算总长度,这个总长度是所有消息的长度,msgLen 是单条消息的长度,如果总长度超过 1024 * 1024 * 4,也就是 4M,就抛出异常,这个长度是默认值,可以自己设置。

// 14.totalMsgLen 就是消息的总长度
totalMsgLen += msgLen;
// Determines whether there is sufficient free space
if (totalMsgLen > maxMessageSize) {
    // 如果消息总长度大于 maxMessageSize,那么抛出异常
    throw new RuntimeException("message size exceeded");
}

上面校验通过之后,就到了消息写入阶段了,消息会写入到 encoderBuffer 中。

// 1 TOTALSIZE
this.encoderBuffer.putInt(msgLen);
// 2 MAGICCODE
this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.encoderBuffer.putInt(bodyCrc);
// 4 QUEUEID
this.encoderBuffer.putInt(messageExtBatch.getQueueId());
// 5 FLAG
this.encoderBuffer.putInt(flag);
// 6 QUEUEOFFSET
this.encoderBuffer.putLong(0);
// 7 PHYSICALOFFSET
this.encoderBuffer.putLong(0);
// 8 SYSFLAG
this.encoderBuffer.putInt(messageExtBatch.getSysFlag());
// 9 BORNTIMESTAMP
this.encoderBuffer.putLong(messageExtBatch.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.encoderBuffer.put(messageExtBatch.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.encoderBuffer.putLong(messageExtBatch.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.encoderBuffer.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.encoderBuffer.putInt(messageExtBatch.getReconsumeTimes());
// 14 Prepared Transaction Offset, batch does not support transaction
this.encoderBuffer.putLong(0);
// 15 BODY
this.encoderBuffer.putInt(bodyLen);
if (bodyLen > 0)
    // messagesByteBuff 里面记录了 body 和 properties,这里就是根据不同下标来获取不同内容,这里是 body
    this.encoderBuffer.put(messagesByteBuff.array(), bodyPos, bodyLen);
// 16 TOPIC
this.encoderBuffer.put((byte) topicLength);
this.encoderBuffer.put(topicData);
// 17 PROPERTIES
this.encoderBuffer.putShort((short) totalPropLen);
if (propertiesLen > 0) {
    // 这里是获取 properties 长度
    this.encoderBuffer.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
}
if (batchPropLen > 0) {
    // 这里是获取 properties 具体内容
    if (needAppendLastPropertySeparator) {
        // 需要新增一个属性分隔值 MessageDecoder.PROPERTY_SEPARATOR
        this.encoderBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR);
    }
    // 再添加 MessageBatch 里面设置的批量公共属性值
    this.encoderBuffer.put(batchPropData, 0, batchPropLen);
}

上面就是一条消息的内容,到这里就是 while 循环里面的全部内容。

那么当消息全部设置到 encoderBuffer 中,这时候就设置消息数,也就是一共有多少条消息,然后再设置偏移量数组,最后再切换读模式,外层就可以直接通过 encoderBuffer 读取消息了。

// 设置消息数(多少条消息)
putMessageContext.setBatchSize(batchSize);
// 设置偏移量数组
putMessageContext.setPhyPos(new long[batchSize]);
// 切换读模式,外层就可以直接根据 position 进行调用了
encoderBuffer.flip();

这里就是批量编码消息的逻辑,下面就是整体代码。

/**
 * 批量消息编码
 * @param messageExtBatch
 * @param putMessageContext
 * @return
 */
protected ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
    // 重置 ByteBuffer,这里不是线程安全的
    encoderBuffer.clear();
    int totalMsgLen = 0;
    // 消息具体内容,MessageExtBatch 里面的 body 存储了所有的 Message,顺序存储
    ByteBuffer messagesByteBuff = messageExtBatch.wrap();

    // 消息类型
    int sysFlag = messageExtBatch.getSysFlag();
    // producer 的地址,ip + port
    int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
    // broker 的地址,ip + port
    int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
    // 分配两个 ByteBuffer
    ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
    ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);

    // 批量消息的属性
    String batchPropStr = MessageDecoder.messageProperties2String(messageExtBatch.getProperties());
    final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
    int batchPropDataLen = batchPropData.length;
    if (batchPropDataLen > Short.MAX_VALUE) {
        // 如果属性字节数 > 32767,那么说明消息异常
        CommitLog.log.warn("Properties size of messageExtBatch exceeded, properties size: {}, maxSize: {}.", batchPropDataLen, Short.MAX_VALUE);
        throw new RuntimeException("Properties size of messageExtBatch exceeded!");
    }
    // 消息属性长度
    final short batchPropLen = (short) batchPropDataLen;

    // 消息数
    int batchSize = 0;
    while (messagesByteBuff.hasRemaining()) {
        // 消息数 + 1
        batchSize++;
        // 1.消息的总长度(TOTALSIZE)
        messagesByteBuff.getInt();
        // 2.魔法数(MAGICCODE),用于标识消息的版本或类型
        messagesByteBuff.getInt();
        // 3.消息体的 CRC(BODYCRC),用于校验消息体的完整性
        messagesByteBuff.getInt();
        // 4.消息的标志(FLAG),用于标识消息的一些特性
        int flag = messagesByteBuff.getInt();
        // 5.消息体长度、消息体的指针
        int bodyLen = messagesByteBuff.getInt();
        int bodyPos = messagesByteBuff.position();
        // 6.获取消息体 CRC 校验码
        int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen);
        messagesByteBuff.position(bodyPos + bodyLen);
        // 7.消息属性长度、消息属性指针
        short propertiesLen = messagesByteBuff.getShort();
        int propertiesPos = messagesByteBuff.position();
        messagesByteBuff.position(propertiesPos + propertiesLen);
        // 8.messagesByteBuff 是否需要添加一个属性分隔符再添加属性(body 和 properties 的分隔符),这个分隔符就是 PROPERTY_SEPARATOR
        boolean needAppendLastPropertySeparator = propertiesLen > 0 && batchPropLen > 0
                    && messagesByteBuff.get(messagesByteBuff.position() - 1) != MessageDecoder.PROPERTY_SEPARATOR;

        // 9.消息 topic
        final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);

        // 10.消息 topic 的长度
        final int topicLength = topicData.length;

        // 11.如果需要添加,那么 totalPropLen(属性部分总长度)需要 + 1,这个 1 就表示属性分隔符的长度(char 类型)
        int totalPropLen = needAppendLastPropertySeparator ? propertiesLen + batchPropLen + 1
                                                             : propertiesLen + batchPropLen;
        // 12.计算消息总长度
        final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, totalPropLen);

        // 13.消息长度合法校验
        if (msgLen > this.maxMessageSize) {
            // 如果消息长度不合法,那么抛出异常
            CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
                + ", maxMessageSize: " + this.maxMessageSize);
            throw new RuntimeException("message size exceeded");
        }

        // 14.totalMsgLen 就是消息的总长度
        totalMsgLen += msgLen;
        // Determines whether there is sufficient free space
        if (totalMsgLen > maxMessageSize) {
            // 如果消息总长度大于 maxMessageSize,那么抛出异常
            throw new RuntimeException("message size exceeded");
        }

        // 下面就是具体的消息写入了,消息会写入到 encoderBuffer 中
        // 1 TOTALSIZE
        this.encoderBuffer.putInt(msgLen);
        // 2 MAGICCODE
        this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
        // 3 BODYCRC
        this.encoderBuffer.putInt(bodyCrc);
        // 4 QUEUEID
        this.encoderBuffer.putInt(messageExtBatch.getQueueId());
        // 5 FLAG
        this.encoderBuffer.putInt(flag);
        // 6 QUEUEOFFSET
        this.encoderBuffer.putLong(0);
        // 7 PHYSICALOFFSET
        this.encoderBuffer.putLong(0);
        // 8 SYSFLAG
        this.encoderBuffer.putInt(messageExtBatch.getSysFlag());
        // 9 BORNTIMESTAMP
        this.encoderBuffer.putLong(messageExtBatch.getBornTimestamp());
        // 10 BORNHOST
        this.resetByteBuffer(bornHostHolder, bornHostLength);
        this.encoderBuffer.put(messageExtBatch.getBornHostBytes(bornHostHolder));
        // 11 STORETIMESTAMP
        this.encoderBuffer.putLong(messageExtBatch.getStoreTimestamp());
        // 12 STOREHOSTADDRESS
        this.resetByteBuffer(storeHostHolder, storeHostLength);
        this.encoderBuffer.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
        // 13 RECONSUMETIMES
        this.encoderBuffer.putInt(messageExtBatch.getReconsumeTimes());
        // 14 Prepared Transaction Offset, batch does not support transaction
        this.encoderBuffer.putLong(0);
        // 15 BODY
        this.encoderBuffer.putInt(bodyLen);
        if (bodyLen > 0)
            // messagesByteBuff 里面记录了 body 和 properties,这里就是根据不同下标来获取不同内容,这里是 body
            this.encoderBuffer.put(messagesByteBuff.array(), bodyPos, bodyLen);
        // 16 TOPIC
        this.encoderBuffer.put((byte) topicLength);
        this.encoderBuffer.put(topicData);
        // 17 PROPERTIES
        this.encoderBuffer.putShort((short) totalPropLen);
        if (propertiesLen > 0) {
            // 这里是获取 properties 长度
            this.encoderBuffer.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
        }
        if (batchPropLen > 0) {
            // 这里是获取 properties 具体内容
            if (needAppendLastPropertySeparator) {
                // 需要新增一个属性分隔值 MessageDecoder.PROPERTY_SEPARATOR
                this.encoderBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR);
            }
            // 再添加属性值
            this.encoderBuffer.put(batchPropData, 0, batchPropLen);
        }
    }
    // 设置消息数(多少条消息)
    putMessageContext.setBatchSize(batchSize);
    // 设置偏移量数组
    putMessageContext.setPhyPos(new long[batchSize]);
    // 切换读模式,外层就可以直接根据 position 进行调用了
    encoderBuffer.flip();
    return encoderBuffer;
}

2.3 MappedFile#appendMessages 添加批量消息

/**
 * 和上面的 appendMessage 差不多,这里是批量添加消息的逻辑
 * @param messageExtBatch
 * @param cb
 * @param putMessageContext
 * @return
 */
public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    return appendMessagesInner(messageExtBatch, cb, putMessageContext);
}

2.4 MappedFile#appendMessagesInner添加批量消息

/**
 * 把消息追加到 CommitLog 文件结尾,这个方法是 CommitLog 调用的
 * @param messageExt
 * @param cb
 * @param putMessageContext
 * @return
 */
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    assert messageExt != null;
    assert cb != null;
    // 获取写指针的位置
    int currentPos = this.wrotePosition.get();
    // 写指针小于文件大小,可以写入
    if (currentPos < this.fileSize) {
        // 是否开启堆外缓存,如果开启了就使用 writeBuffer 进行写入(读写分离)
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        // 标记写入位置
        byteBuffer.position(currentPos);
        // 消息写入结果
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {
            // 单条消息写入
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt, putMessageContext);
        } else if (messageExt instanceof MessageExtBatch) {
            // 批量消息写入
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBatch) messageExt, putMessageContext);
        } else {
            // 不知道是什么
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        // 更新写入位置
        this.wrotePosition.addAndGet(result.getWroteBytes());
        // 设置消息存入的时间(最新)
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

这里是单条消息的追加和批量消息的追加,都会调用这个方法,我们主要看的是里面的批量追加逻辑。


2.5 CommitLog#doAppend

这个就是最终的核心追加方法,来看下方法的定义。

/**
* 批量添加消息
* @param fileFromOffset
* @param byteBuffer
* @param maxBlank
* @param messageExtBatch, backed up by a byte array
* @param putMessageContext
* @return
*/
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
  final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
	...
}

首先通过 byteBuffer.mark 去做下标记,意味者 mark 设置为 position,这个 position 通过 slice 初始化出来就是 0

// 设置下 byteBuffer 的 mark 标记为 position
byteBuffer.mark();

然后获取写指针的位置,这个位置就是在 CommitLog 中的物理偏移量,fileFromOffset 代表这个 CommitLog 的起始位置,byteBuffer.position() 代表当前这个文件写到哪个位置。

// 写指针的位置,物理偏移量
long wroteOffset = fileFromOffset + byteBuffer.position();

接下来获取 ConsumeQueue 中对应的最大物理偏移量并更新。

// ConsumeQueue 消息偏移量的 key,就是 topic-queueId
String key = putMessageContext.getTopicQueueTableKey();
// 获取偏移量
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
    // 初始化为 0
    queueOffset = 0L;
    CommitLog.this.topicQueueTable.put(key, queueOffset);
}
long beginQueueOffset = queueOffset;

下面继续获取消息类型、producer 端地址长度(ip + port)、broker 端的地址长度(ip + port),因为这几个是动态变量,需要计算出来。

// 消息类型
int sysFlag = messageExtBatch.getSysFlag();
// producer 端的地址长度(ip + port)
int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// broker 端的地址长度(ip + port)
int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;

然后构建消息 ID,在消息发送的时候会在 broker 端生成一个唯一 ID 进行标识,批量消息也是一样的,生成的消息 ID 格式如下:(ip1 + port1 + phyPosArray[0])+ (ip2 + port2 + phyPosArray[1])...,phyPosArray 是前面创建出来的物理偏移量数组,但是到这里好像也没有初始化,所以不太能确定。

// 构建消息 ID,在消息发送的时候会在 broker 端生成一个唯一 ID 进行标识
Supplier<String> msgIdSupplier = () -> {
    // 如果是 IPV4,那么长度就是 16,IPV6 长度就是 28,默认就是 16
    // 这个消息 ID 组成是: ip + port + wroteOffset
    int msgIdLen = storeHostLength + 8;
    // 有多少条消息
    int batchCount = putMessageContext.getBatchSize();
    // 物理偏移量
    long[] phyPosArray = putMessageContext.getPhyPos();
    // 分配 msgIdLen 长度的 ByteBuffer
    ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
    // 将 socketAddress 里面的 ip + port 设置到 byteBuffer 中,然后切换读模式
    MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer);
    // clear 重置 position
    msgIdBuffer.clear();

    StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1);
    // 遍历所有的物理偏移量
    for (int i = 0; i < phyPosArray.length; i++) {
        // 设置物理偏移量,msgIdBuffer = ip + port + phyPos
        msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]);
        // 获取消息 ID
        String msgId = UtilAll.bytes2string(msgIdBuffer.array());
        if (i != 0) {
            // 使用 "," 拼接
            buffer.append(',');
        }
        buffer.append(msgId);
    }
    // 返回这批消息生成的唯一 ID
    return buffer.toString();
};

下面再对编码的消息 ByteBuffer 做一下标记 mark。

// 标记 position 的位置
messagesByteBuff.mark();
int index = 0;

进入 while 循环,接下来就在 while 循环里面追加消息。

while (messagesByteBuff.hasRemaining()) {
	...
}

在 while 循环中首先记录下几个属性。

// 获取 messagesByteBuff 的 position
final int msgPos = messagesByteBuff.position();
// 1.消息长度
final int msgLen = messagesByteBuff.getInt();
// 2.获取下消息体长度,这里是用来记录下日志的
final int bodyLen = msgLen - 40;
// 3.如果消息长度比最大的消息长度都要大,这时候就记录日志,返回结果
if (msgLen > this.maxMessageSize) {
    CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
        + ", maxMessageSize: " + this.maxMessageSize);
    return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// 4.所有消息的长度
totalMsgLen += msgLen;

然后确认空间是否可用,注意一个 CommitLog 文件需要保留下 8 个空白空间,至于为什么留下 8 个字节空余空间,就是因为要把 msgStoreItemMemory 写入文件结尾,用于标识文件写满了。

// 5.确认空间是否可用,注意一个 CommitLog 文件需要保留下 8 个空白空间
if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
    // 这里就是空间不够用了,需要返回一个错误的魔数,魔数是 BLANK_MAGIC_CODE
    // 首先重置 msgStoreItemMemory 的属性,这个是一个临时变量,存储要处理的消息
    this.msgStoreItemMemory.clear();
    // 1.存储消息总长度
    this.msgStoreItemMemory.putInt(maxBlank);
    // 2.魔术设置为 BLANK_MAGIC_CODE
    this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
    // 3 The remaining space may be any value
    // 重置 messagesByteBuff
    messagesByteBuff.reset();
    // 重置 byteBuffer,这时候将 position 重新设置成方法开头 mark 的位置
    byteBuffer.reset();
    // 这里留 END_FILE_MIN_BLANK_LENGTH(8)个 空位就是为了把 msgStoreItemMemory 写入文件结尾
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
    // 返回错误结果,错误结果是 END_OF_FILE
    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(),
        beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}

到这里就是没有问题,可以把消息写入 byteBuffer 中,但是写入前修改下消息的总长度。

// 消息全部写入了之后重新设置 position 为 0
messagesByteBuff.position(0);
// 设置消息总长度
messagesByteBuff.limit(totalMsgLen);
// 接着把 messagesByteBuff 里面的数据追加到 byteBuffer 中
byteBuffer.put(messagesByteBuff);
// 重置为空
messageExtBatch.setEncodedBuff(null);
// 返回结果
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdSupplier,
 messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

消息写入后更新 ConsumeQueue 的最大物理偏移量,同时设置消息条数到 result 里面准备返回。

// 设置消息条数
result.setMsgNum(msgNum);
// 更新 ConsumeQueue 的最大偏移量为 queueOffset
CommitLog.this.topicQueueTable.put(key, queueOffset);

return result;

好了,这里就是全部逻辑了,下面就是全部代码。

/**
* 批量添加消息
* @param fileFromOffset
* @param byteBuffer
* @param maxBlank
* @param messageExtBatch, backed up by a byte array
* @param putMessageContext
* @return
*/
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
   final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
   // 设置下 byteBuffer 的 mark 标记为 position
   byteBuffer.mark();
   // 写指针的位置,物理偏移量
   long wroteOffset = fileFromOffset + byteBuffer.position();
   // ConsumeQueue 消息偏移量的 key,就是 topic-queueId
   String key = putMessageContext.getTopicQueueTableKey();
   // 获取偏移量
   Long queueOffset = CommitLog.this.topicQueueTable.get(key);
   if (null == queueOffset) {
       // 初始化为 0
       queueOffset = 0L;
       CommitLog.this.topicQueueTable.put(key, queueOffset);
   }
   long beginQueueOffset = queueOffset;
   int totalMsgLen = 0;
   int msgNum = 0;

   // 起始时间
   final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
   // 获取到之前编码过的 ByteBuffer
   ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();

   // 消息类型
   int sysFlag = messageExtBatch.getSysFlag();
   // producer 端的地址长度(ip + port)
   int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
   // broker 端的地址长度(ip + port)
   int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
   // 构建消息 ID,在消息发送的时候会在 broker 端生成一个唯一 ID 进行标识
   Supplier<String> msgIdSupplier = () -> {
       // 如果是 IPV4,那么长度就是 16,IPV6 长度就是 28,默认就是 16
       // 这个消息 ID 组成是: ip + port + wroteOffset
       int msgIdLen = storeHostLength + 8;
       // 有多少条消息
       int batchCount = putMessageContext.getBatchSize();
       // 物理偏移量
       long[] phyPosArray = putMessageContext.getPhyPos();
       // 分配 msgIdLen 长度的 ByteBuffer
       ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
       // 将 socketAddress 里面的 ip + port 设置到 byteBuffer 中,然后切换读模式
       MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer);
       // clear 重置 position
       msgIdBuffer.clear();

       StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1);
       // 遍历所有的物理偏移量
       for (int i = 0; i < phyPosArray.length; i++) {
           // 设置物理偏移量,msgIdBuffer = ip + port + phyPos
           msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]);
           // 获取消息 ID
           String msgId = UtilAll.bytes2string(msgIdBuffer.array());
           if (i != 0) {
               // 使用 "," 拼接
               buffer.append(',');
           }
           buffer.append(msgId);
       }
       // 返回这批消息生成的唯一 ID
       return buffer.toString();
   };

   // 标记 position 的位置
   messagesByteBuff.mark();
   int index = 0;
   while (messagesByteBuff.hasRemaining()) {
       // 获取 messagesByteBuff 的 position
       final int msgPos = messagesByteBuff.position();
       // 1.消息长度
       final int msgLen = messagesByteBuff.getInt();
       // 2.获取下消息体长度,这里是用来记录下日志的
       final int bodyLen = msgLen - 40;
       // 3.如果消息长度比最大的消息长度都要大,这时候就记录日志,返回结果
       if (msgLen > this.maxMessageSize) {
           CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
               + ", maxMessageSize: " + this.maxMessageSize);
           return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
       }
       // 4.所有消息的长度
       totalMsgLen += msgLen;
       // 5.确认空间是否可用,注意一个 CommitLog 文件需要保留下 8 个空白空间
       if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
           // 这里就是空间不够用了,需要返回一个错误的魔数,魔数是 BLANK_MAGIC_CODE
           // 首先重置 msgStoreItemMemory 的属性,这个是一个临时变量,存储要处理的消息
           this.msgStoreItemMemory.clear();
           // 1.存储消息总长度
           this.msgStoreItemMemory.putInt(maxBlank);
           // 2.魔术设置为 BLANK_MAGIC_CODE
           this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
           // 3 The remaining space may be any value
           // 重置 messagesByteBuff
           messagesByteBuff.reset();
           // 重置 byteBuffer,这时候将 position 重新设置成方法开头 mark 的位置
           byteBuffer.reset();
           // 这里留 END_FILE_MIN_BLANK_LENGTH(8)个 空位就是为了把 msgStoreItemMemory 写入文件结尾
           byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
           // 返回错误结果,错误结果是 END_OF_FILE
           return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(),
               beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
       }
       // move to add queue offset and commitlog offset
       int pos = msgPos + 20;
       // 设置在 ConsumeQueue 的消息偏移量
       messagesByteBuff.putLong(pos, queueOffset);
       pos += 8;
       // 设置物理偏移量,也就是在 CommitLog 中的偏移量,wroteOffset + (所有消息长度 - 当前消息的长度)
       messagesByteBuff.putLong(pos, wroteOffset + totalMsgLen - msgLen);
       // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
       pos += 8 + 4 + 8 + bornHostLength;
       // 设置消息存储的时间
       messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp());
       // 设置在 CommitLog 中的偏移量,每一条消息都需要设置
       putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen;
       // ConsumeQueue 偏移量 +1,所以正常来求出 ConsumeQueue 的偏移量要通过 queueOffset * 20 求出来
       queueOffset++;
       // 消息数 + 1
       msgNum++;
       // 设置 position,下一条消息将从 msgPos + msgLen 开始写入
       messagesByteBuff.position(msgPos + msgLen);
   }

   // 消息全部写入了之后重新设置 position 为 0
   messagesByteBuff.position(0);
   // 设置消息总长度
   messagesByteBuff.limit(totalMsgLen);
   // 接着把 messagesByteBuff 里面的数据追加到 byteBuffer 中
   byteBuffer.put(messagesByteBuff);
   // 重置为空
   messageExtBatch.setEncodedBuff(null);
   // 返回结果
   AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdSupplier,
       messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
   // 设置消息条数
   result.setMsgNum(msgNum);
   // 更新 ConsumeQueue 的最大偏移量为 queueOffset
   CommitLog.this.topicQueueTable.put(key, queueOffset);

   return result;
}

3. 小结

这篇文章介绍了 broker 端存储批量消息的逻辑,但是从上一篇文章到这篇文章,我们就能发现消息存储的时候只是存储到了 ByteBuffer,而最终在 CommitLog#asyncPutMessages 方法中会添加一个刷盘请求,那么这个刷盘请求如何起作用?RocketMQ 有多少种刷盘请求?这些问题我们都需要探讨。





如有错误,欢迎指出!!!


http://www.kler.cn/a/527831.html

相关文章:

  • 21.3-启动流程、编码风格(了解) 第21章-FreeRTOS项目实战--基础知识之新建任务、启动流程、编码风格、系统配置 文件组成和编码风格(了解)
  • 商密测评题库详解:商用密码应用安全性评估从业人员考核题库详细解析(8)
  • 自制一个入门STM32 四足机器人具体开发顺序
  • 春节期间,景区和酒店如何合理用工?
  • 一文了解性能优化的方法
  • buu-pwn1_sctf_2016-好久不见29
  • CE-PBFT:大规模联盟区块链的高可用一致性算法
  • Unet 改进:在encoder和decoder间加入TransformerBlock
  • 【leetcode强化练习·二叉树】同时运用两种思维解题
  • 【Java异步编程】CompletableFuture基础(1):创建不同线程的子任务、子任务链式调用与异常处理
  • 黑马点评 - 商铺类型缓存练习题(Redis List实现)
  • Hive:复杂数据类型之Map函数
  • 深度学习之“数据的相关性”
  • 人工智能导论--第1章-知识点与学习笔记
  • 用一个例子详细说明python单例模式
  • 【AI论文】VideoAuteur:迈向长叙事视频
  • gentoo linux中安装希沃白板5
  • Docker技术简介
  • Longformer:处理长文档的Transformer模型
  • 6.二分算法
  • 舵机型号与识别
  • Go学习:Go语言中if、switch、for语句与其他编程语言中相应语句的格式区别
  • 三天急速通关JavaWeb基础知识:Day 2 前端基础知识(计划有变,前端工程化部分暂时搁置)
  • Vue.js 生命周期钩子在 Composition API 中的应用
  • 《解锁DeepSeek本地部署:开启你的专属AI之旅》
  • 绝对值线性化