【RocketMQ 存储】- broker 端存储批量消息的逻辑
文章目录
- 1. 前言
- 2. DefaultMessageStore#asyncPutMessages 批量新增消息
- 2.1 Commit#asyncPutMessages
- 2.2 Commit#encode 批量编码消息
- 2.3 MappedFile#appendMessages 添加批量消息
- 2.4 MappedFile#appendMessagesInner添加批量消息
- 2.5 CommitLog#doAppend
- 3. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
上一篇文章中我们解析了单条消息添加逻辑,那么这篇文章中我们就来解析下批量新增消息的逻辑,批量新增消息的入口在 DefaultMessageStore#asyncPutMessages。上一篇文章的地址:【RocketMQ 存储】- broker 端存储单条消息的逻辑
2. DefaultMessageStore#asyncPutMessages 批量新增消息
上一篇文章我们介绍了单条消息的添加,这篇文章中批量新增的一些方法和单挑消息的新增是一模一样的,所以这里直接看注释就行了。
但是这里要注意一点,消息参数 MessageExtBatch 里面的 body 参数是一个 byte[]
数组,而所有要添加的消息会存储到一个数组中,也就是拼接的方式。
/**
* 消息批量添加
* @param messageExtBatch the message batch
* @return
*/
public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
// 1.检查下存储服务的状态
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
// 这里就是消息存储服务不可用或者操作系统页繁忙,直接返回结果
return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
}
// 2.校验消息看看是否合法
PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
// 消息长度不合法
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
}
// 3.当前时间
long beginTime = this.getSystemClock().now();
// 4.存储消息的核心逻辑
CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);
// 这里就是结果处理
resultFuture.thenAccept((result) -> {
// 当消息存储完成之后,lambda 表达式会被调用
// 消息消耗的时间
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
}
// 设置下存储消息的消耗时间和最大消耗时间
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
// 这里就是存储失败了,新增存储失败的次数
this.storeStatsService.getPutMessageFailedTimes().add(1);
}
});
return resultFuture;
}
2.1 Commit#asyncPutMessages
/**
* broker 异步存储消息
* @param messageExtBatch 批量消息
* @return
*/
public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
// 设置消息的存储时间
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
AppendMessageResult result;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
// 消息的事务类型
final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
// 事务消息不会走这个方法,普通消息才可以使用批量添加
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
if (messageExtBatch.getDelayTimeLevel() > 0) {
// 延时消息也不可以批量添加
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
// 设置 producer 端的地址
InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
// 如果是 IPV6,就设置下消息的发送端标记
messageExtBatch.setBornHostV6Flag();
}
// 设置 broker 端的地址
InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
// 如果是 IPV6,就设置下消息的发送端标记
messageExtBatch.setStoreHostAddressV6Flag();
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
// 获取 CommitLog 中最后一个 MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 细粒度锁
PutMessageThreadLocal pmThreadLocal = this.putMessageThreadLocal.get();
// 消息编码器
MessageExtEncoder batchEncoder = pmThreadLocal.getEncoder();
// 消息存储上下文,里面包括 consumeQueueTable 的 key(topic-queueId)和批量消息
PutMessageContext putMessageContext = new PutMessageContext(generateKey(pmThreadLocal.getKeyBuilder(), messageExtBatch));
// 设置编码后的消息,说是编码,其实就是用一个 ByteBuffer 存储这批消息
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
// 加锁
putMessageLock.lock();
try {
// 当前时间
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
// 加锁之后设置添加消息的初始时间
this.beginTimeInLock = beginLockTimestamp;
// 设置消息的存储时间为加锁的时间,确保全局有序
messageExtBatch.setStoreTimestamp(beginLockTimestamp);
// 这里就是说如果获取不到 MappedFile 或者获取到的 MappedFile 已经写满了,这时候会获取或者创建下一个 MappedFile
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
// 如果还是获取不到,那么说明创建失败了
if (null == mappedFile) {
// 返回结果 CREATE_MAPEDFILE_FAILED
log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
// 核心逻辑,追加消息到 CommitLog 中
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
// 下面判断下追加消息的结果
switch (result.getStatus()) {
// 添加成功,直接退出
case PUT_OK:
break;
// 文件剩余空间不足,那么初始化新的文件并尝试再次存储
case END_OF_FILE:
// 记录上一个文件
unlockMappedFile = mappedFile;
// 创建一个新的文件,重新往里面写入
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// 创建失败,直接返回错误结果 CREATE_MAPEDFILE_FAILED
log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
// 创建成功后再次添加消息
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
// 这里就是消息的长度或者属性长度错误
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
default:
// 未知错误
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
// 这里就是加锁的时间,也是添加消息所耗费的时间
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
} finally {
// 最后解锁并且重置 beginTimeInLock
beginTimeInLock = 0;
putMessageLock.unlock();
}
// 如果加锁时间超过了 500ms
if (elapsedTimeInLock > 500) {
// 记录日志
log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
}
// 文件写满了并启用文件预热
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
// 对 unlockMappedFile 解锁,这里解锁是使用 munlock 解除这片内存背后锁定的 page cache,这下就能够交换到 swap 空间了
// 因为这边 unlockMappedFile 已经写满了,所以这片空间可以解锁方便 swap 交换到磁盘 swap 空间,读写的重点在新的 MappedFile
// 文件上,对于旧的 unlockMappedFile 读写就没有那么多了
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
// 返回结果
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// 下面是一些数据统计,如写入 topic 的消息数 + 1
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
// 存储的消息总字节 + result.getWroteBytes()
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());
// 上面添加消息到 CommitLog 只是添加到背后的 ByteBuffer,接下来需要提交刷盘请求
CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
// RocketMQ 主节点写入数据之后,向从节点提交消息复制请求,让 slave 节点去同步 master 节点新写入的消息
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
// 处理刷盘请求
return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus, replicaStatus) -> {
// 外层会阻塞等待刷盘请求和从节点复制请求的结果
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
要注意,对于普通消息才可以使用这个批量添加的方法,事务消息和延时消息是不可以批量添加的。
2.2 Commit#encode 批量编码消息
在这个方法中首先初始化 encoderBuffer,就是存储解析之后的消息。
// 重置 ByteBuffer,这里不是线程安全的
encoderBuffer.clear();
int totalMsgLen = 0;
// 消息具体内容,MessageExtBatch 里面的 body 存储了所有的 Message,顺序存储
ByteBuffer messagesByteBuff = messageExtBatch.wrap();
然后通过消息类型来分配 producer 和 broker 的地址 ByteBuffer。
// 消息类型
int sysFlag = messageExtBatch.getSysFlag();
// producer 的地址,ip + port
int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// broker 的地址,ip + port
int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// 分配两个 ByteBuffer
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
接着获取消息的属性长度。
// 批量消息的属性
String batchPropStr = MessageDecoder.messageProperties2String(messageExtBatch.getProperties());
final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
int batchPropDataLen = batchPropData.length;
if (batchPropDataLen > Short.MAX_VALUE) {
// 如果属性字节数 > 32767,那么说明消息异常
CommitLog.log.warn("Properties size of messageExtBatch exceeded, properties size: {}, maxSize: {}.", batchPropDataLen, Short.MAX_VALUE);
throw new RuntimeException("Properties size of messageExtBatch exceeded!");
}
// 消息属性长度
final short batchPropLen = (short) batchPropDataLen;
接下来遍历编码的 messagesByteBuff,一条一条写入到 encoderBuffer 中。
// 消息数
int batchSize = 0;
while (messagesByteBuff.hasRemaining()) {
...
}
下面看下 while 里面的逻辑,首先就是获取各种动态变量,如 bodyLen,propertiesLen 等,因为要计算消息长度就需要传入这些动态变量来计算。
// 消息数 + 1
batchSize++;
// 1.消息的总长度(TOTALSIZE)
messagesByteBuff.getInt();
// 2.魔法数(MAGICCODE),用于标识消息的版本或类型
messagesByteBuff.getInt();
// 3.消息体的 CRC(BODYCRC),用于校验消息体的完整性
messagesByteBuff.getInt();
// 4.消息的标志(FLAG),用于标识消息的一些特性
int flag = messagesByteBuff.getInt();
// 5.消息体长度、消息体的指针
int bodyLen = messagesByteBuff.getInt();
int bodyPos = messagesByteBuff.position();
// 6.获取消息体 CRC 校验码
int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen);
messagesByteBuff.position(bodyPos + bodyLen);
// 7.消息属性长度、消息属性指针
short propertiesLen = messagesByteBuff.getShort();
int propertiesPos = messagesByteBuff.position();
messagesByteBuff.position(propertiesPos + propertiesLen);
在上面的方法中 messagesByteBuff.getInt() 等起到的作用就是将 position 指针向后移动。
我举个例子,上面这些代码中读取 properties 属性的时候 short propertiesLen = messagesByteBuff.getShort();
这个方法获取到了属性的长度,这时候 position 指向的位置是属性起始位置。
上面关注绿色部分就行了,这时候 position 指向了属性内容的第一个下标位置,然后设置 messagesByteBuff.position(propertiesPos + propertiesLen)
,比如按照上面的图,这个 propertiesLen
算出来就是 4,所以这里就是将 position 挪到属性后面的第一个下标。
看完这个例子大家应该就知道这里 position 是怎么移动的了。但是除了属性、topic 等,对于批量消息,这里会额外判断需不需要新增分隔符,批量消息中每一条消息都有属性值,而除了每条消息自身属性值之外,MessageExtBatch.properties
提供了所有消息共用的属性值。
这个分隔符就是用来分割自身属性值和共用属性值的。
// 8.messagesByteBuff 是否需要添加一个属性分隔符再添加属性(body 和 properties 的分隔符),这个分隔符就是 PROPERTY_SEPARATOR
boolean needAppendLastPropertySeparator = propertiesLen > 0 && batchPropLen > 0
&& messagesByteBuff.get(messagesByteBuff.position() - 1) != MessageDecoder.PROPERTY_SEPARATOR;
好了,继续 encode 的逻辑,下面就是 topic 的处理以及属性的长度计算。
// 9.消息 topic
final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
// 10.消息 topic 的长度
final int topicLength = topicData.length;
// 11.如果需要添加,那么 totalPropLen(属性部分总长度)需要 + 1,这个 1 就表示属性分隔符的长度(char 类型)
int totalPropLen = needAppendLastPropertySeparator ? propertiesLen + batchPropLen + 1
: propertiesLen + batchPropLen;
到这里我们就获取了所有动态计算的长度,下面就要计算消息总长度了。
// 12.计算消息总长度
final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, totalPropLen);
/**
* 计算消息长度
* @param sysFlag 是否是 IPV6
* @param bodyLength 消息体长度
* @param topicLength topic 长度
* @param propertiesLength 消息属性长度
* @return
*/
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
// 如果是 IPV6 就是 20,否则是 8,这是因为 bornhost 是包括 IP + port 的,所以会多上 4 字节
int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
// 这里也是同理
int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
// 计算一条 CommitLog 消息的长度
final int msgLen = 4 //TOTALSIZE
+ 4 //MAGICCODE
+ 4 //BODYCRC
+ 4 //QUEUEID
+ 4 //FLAG
+ 8 //QUEUEOFFSET
+ 8 //PHYSICALOFFSET
+ 4 //SYSFLAG
+ 8 //BORNTIMESTAMP
+ bornhostLength //BORNHOST
+ 8 //STORETIMESTAMP
+ storehostAddressLength //STOREHOSTADDRESS
+ 4 //RECONSUMETIMES
+ 8 //Prepared Transaction Offset
+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
+ 1 + topicLength //TOPIC
+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
+ 0;
return msgLen;
}
计算出长度之后,就需要校验消息长度了,长度不合法的话就抛出异常。
// 13.消息长度合法校验
if (msgLen > this.maxMessageSize) {
// 如果消息长度不合法,那么抛出异常
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
+ ", maxMessageSize: " + this.maxMessageSize);
throw new RuntimeException("message size exceeded");
}
下面再去计算总长度,这个总长度是所有消息的长度,msgLen 是单条消息的长度,如果总长度超过 1024 * 1024 * 4,也就是 4M,就抛出异常,这个长度是默认值,可以自己设置。
// 14.totalMsgLen 就是消息的总长度
totalMsgLen += msgLen;
// Determines whether there is sufficient free space
if (totalMsgLen > maxMessageSize) {
// 如果消息总长度大于 maxMessageSize,那么抛出异常
throw new RuntimeException("message size exceeded");
}
上面校验通过之后,就到了消息写入阶段了,消息会写入到 encoderBuffer 中。
// 1 TOTALSIZE
this.encoderBuffer.putInt(msgLen);
// 2 MAGICCODE
this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.encoderBuffer.putInt(bodyCrc);
// 4 QUEUEID
this.encoderBuffer.putInt(messageExtBatch.getQueueId());
// 5 FLAG
this.encoderBuffer.putInt(flag);
// 6 QUEUEOFFSET
this.encoderBuffer.putLong(0);
// 7 PHYSICALOFFSET
this.encoderBuffer.putLong(0);
// 8 SYSFLAG
this.encoderBuffer.putInt(messageExtBatch.getSysFlag());
// 9 BORNTIMESTAMP
this.encoderBuffer.putLong(messageExtBatch.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.encoderBuffer.put(messageExtBatch.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.encoderBuffer.putLong(messageExtBatch.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.encoderBuffer.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.encoderBuffer.putInt(messageExtBatch.getReconsumeTimes());
// 14 Prepared Transaction Offset, batch does not support transaction
this.encoderBuffer.putLong(0);
// 15 BODY
this.encoderBuffer.putInt(bodyLen);
if (bodyLen > 0)
// messagesByteBuff 里面记录了 body 和 properties,这里就是根据不同下标来获取不同内容,这里是 body
this.encoderBuffer.put(messagesByteBuff.array(), bodyPos, bodyLen);
// 16 TOPIC
this.encoderBuffer.put((byte) topicLength);
this.encoderBuffer.put(topicData);
// 17 PROPERTIES
this.encoderBuffer.putShort((short) totalPropLen);
if (propertiesLen > 0) {
// 这里是获取 properties 长度
this.encoderBuffer.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
}
if (batchPropLen > 0) {
// 这里是获取 properties 具体内容
if (needAppendLastPropertySeparator) {
// 需要新增一个属性分隔值 MessageDecoder.PROPERTY_SEPARATOR
this.encoderBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR);
}
// 再添加 MessageBatch 里面设置的批量公共属性值
this.encoderBuffer.put(batchPropData, 0, batchPropLen);
}
上面就是一条消息的内容,到这里就是 while 循环里面的全部内容。
那么当消息全部设置到 encoderBuffer
中,这时候就设置消息数,也就是一共有多少条消息,然后再设置偏移量数组,最后再切换读模式,外层就可以直接通过 encoderBuffer 读取消息了。
// 设置消息数(多少条消息)
putMessageContext.setBatchSize(batchSize);
// 设置偏移量数组
putMessageContext.setPhyPos(new long[batchSize]);
// 切换读模式,外层就可以直接根据 position 进行调用了
encoderBuffer.flip();
这里就是批量编码消息的逻辑,下面就是整体代码。
/**
* 批量消息编码
* @param messageExtBatch
* @param putMessageContext
* @return
*/
protected ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
// 重置 ByteBuffer,这里不是线程安全的
encoderBuffer.clear();
int totalMsgLen = 0;
// 消息具体内容,MessageExtBatch 里面的 body 存储了所有的 Message,顺序存储
ByteBuffer messagesByteBuff = messageExtBatch.wrap();
// 消息类型
int sysFlag = messageExtBatch.getSysFlag();
// producer 的地址,ip + port
int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// broker 的地址,ip + port
int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// 分配两个 ByteBuffer
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
// 批量消息的属性
String batchPropStr = MessageDecoder.messageProperties2String(messageExtBatch.getProperties());
final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
int batchPropDataLen = batchPropData.length;
if (batchPropDataLen > Short.MAX_VALUE) {
// 如果属性字节数 > 32767,那么说明消息异常
CommitLog.log.warn("Properties size of messageExtBatch exceeded, properties size: {}, maxSize: {}.", batchPropDataLen, Short.MAX_VALUE);
throw new RuntimeException("Properties size of messageExtBatch exceeded!");
}
// 消息属性长度
final short batchPropLen = (short) batchPropDataLen;
// 消息数
int batchSize = 0;
while (messagesByteBuff.hasRemaining()) {
// 消息数 + 1
batchSize++;
// 1.消息的总长度(TOTALSIZE)
messagesByteBuff.getInt();
// 2.魔法数(MAGICCODE),用于标识消息的版本或类型
messagesByteBuff.getInt();
// 3.消息体的 CRC(BODYCRC),用于校验消息体的完整性
messagesByteBuff.getInt();
// 4.消息的标志(FLAG),用于标识消息的一些特性
int flag = messagesByteBuff.getInt();
// 5.消息体长度、消息体的指针
int bodyLen = messagesByteBuff.getInt();
int bodyPos = messagesByteBuff.position();
// 6.获取消息体 CRC 校验码
int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen);
messagesByteBuff.position(bodyPos + bodyLen);
// 7.消息属性长度、消息属性指针
short propertiesLen = messagesByteBuff.getShort();
int propertiesPos = messagesByteBuff.position();
messagesByteBuff.position(propertiesPos + propertiesLen);
// 8.messagesByteBuff 是否需要添加一个属性分隔符再添加属性(body 和 properties 的分隔符),这个分隔符就是 PROPERTY_SEPARATOR
boolean needAppendLastPropertySeparator = propertiesLen > 0 && batchPropLen > 0
&& messagesByteBuff.get(messagesByteBuff.position() - 1) != MessageDecoder.PROPERTY_SEPARATOR;
// 9.消息 topic
final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
// 10.消息 topic 的长度
final int topicLength = topicData.length;
// 11.如果需要添加,那么 totalPropLen(属性部分总长度)需要 + 1,这个 1 就表示属性分隔符的长度(char 类型)
int totalPropLen = needAppendLastPropertySeparator ? propertiesLen + batchPropLen + 1
: propertiesLen + batchPropLen;
// 12.计算消息总长度
final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, totalPropLen);
// 13.消息长度合法校验
if (msgLen > this.maxMessageSize) {
// 如果消息长度不合法,那么抛出异常
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
+ ", maxMessageSize: " + this.maxMessageSize);
throw new RuntimeException("message size exceeded");
}
// 14.totalMsgLen 就是消息的总长度
totalMsgLen += msgLen;
// Determines whether there is sufficient free space
if (totalMsgLen > maxMessageSize) {
// 如果消息总长度大于 maxMessageSize,那么抛出异常
throw new RuntimeException("message size exceeded");
}
// 下面就是具体的消息写入了,消息会写入到 encoderBuffer 中
// 1 TOTALSIZE
this.encoderBuffer.putInt(msgLen);
// 2 MAGICCODE
this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.encoderBuffer.putInt(bodyCrc);
// 4 QUEUEID
this.encoderBuffer.putInt(messageExtBatch.getQueueId());
// 5 FLAG
this.encoderBuffer.putInt(flag);
// 6 QUEUEOFFSET
this.encoderBuffer.putLong(0);
// 7 PHYSICALOFFSET
this.encoderBuffer.putLong(0);
// 8 SYSFLAG
this.encoderBuffer.putInt(messageExtBatch.getSysFlag());
// 9 BORNTIMESTAMP
this.encoderBuffer.putLong(messageExtBatch.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.encoderBuffer.put(messageExtBatch.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.encoderBuffer.putLong(messageExtBatch.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.encoderBuffer.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.encoderBuffer.putInt(messageExtBatch.getReconsumeTimes());
// 14 Prepared Transaction Offset, batch does not support transaction
this.encoderBuffer.putLong(0);
// 15 BODY
this.encoderBuffer.putInt(bodyLen);
if (bodyLen > 0)
// messagesByteBuff 里面记录了 body 和 properties,这里就是根据不同下标来获取不同内容,这里是 body
this.encoderBuffer.put(messagesByteBuff.array(), bodyPos, bodyLen);
// 16 TOPIC
this.encoderBuffer.put((byte) topicLength);
this.encoderBuffer.put(topicData);
// 17 PROPERTIES
this.encoderBuffer.putShort((short) totalPropLen);
if (propertiesLen > 0) {
// 这里是获取 properties 长度
this.encoderBuffer.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
}
if (batchPropLen > 0) {
// 这里是获取 properties 具体内容
if (needAppendLastPropertySeparator) {
// 需要新增一个属性分隔值 MessageDecoder.PROPERTY_SEPARATOR
this.encoderBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR);
}
// 再添加属性值
this.encoderBuffer.put(batchPropData, 0, batchPropLen);
}
}
// 设置消息数(多少条消息)
putMessageContext.setBatchSize(batchSize);
// 设置偏移量数组
putMessageContext.setPhyPos(new long[batchSize]);
// 切换读模式,外层就可以直接根据 position 进行调用了
encoderBuffer.flip();
return encoderBuffer;
}
2.3 MappedFile#appendMessages 添加批量消息
/**
* 和上面的 appendMessage 差不多,这里是批量添加消息的逻辑
* @param messageExtBatch
* @param cb
* @param putMessageContext
* @return
*/
public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
return appendMessagesInner(messageExtBatch, cb, putMessageContext);
}
2.4 MappedFile#appendMessagesInner添加批量消息
/**
* 把消息追加到 CommitLog 文件结尾,这个方法是 CommitLog 调用的
* @param messageExt
* @param cb
* @param putMessageContext
* @return
*/
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
assert messageExt != null;
assert cb != null;
// 获取写指针的位置
int currentPos = this.wrotePosition.get();
// 写指针小于文件大小,可以写入
if (currentPos < this.fileSize) {
// 是否开启堆外缓存,如果开启了就使用 writeBuffer 进行写入(读写分离)
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
// 标记写入位置
byteBuffer.position(currentPos);
// 消息写入结果
AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) {
// 单条消息写入
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBatch) {
// 批量消息写入
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
} else {
// 不知道是什么
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// 更新写入位置
this.wrotePosition.addAndGet(result.getWroteBytes());
// 设置消息存入的时间(最新)
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
这里是单条消息的追加和批量消息的追加,都会调用这个方法,我们主要看的是里面的批量追加逻辑。
2.5 CommitLog#doAppend
这个就是最终的核心追加方法,来看下方法的定义。
/**
* 批量添加消息
* @param fileFromOffset
* @param byteBuffer
* @param maxBlank
* @param messageExtBatch, backed up by a byte array
* @param putMessageContext
* @return
*/
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
...
}
首先通过 byteBuffer.mark 去做下标记,意味者 mark 设置为 position,这个 position 通过 slice 初始化出来就是 0。
// 设置下 byteBuffer 的 mark 标记为 position
byteBuffer.mark();
然后获取写指针的位置,这个位置就是在 CommitLog 中的物理偏移量,fileFromOffset
代表这个 CommitLog 的起始位置,byteBuffer.position()
代表当前这个文件写到哪个位置。
// 写指针的位置,物理偏移量
long wroteOffset = fileFromOffset + byteBuffer.position();
接下来获取 ConsumeQueue 中对应的最大物理偏移量并更新。
// ConsumeQueue 消息偏移量的 key,就是 topic-queueId
String key = putMessageContext.getTopicQueueTableKey();
// 获取偏移量
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
// 初始化为 0
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
long beginQueueOffset = queueOffset;
下面继续获取消息类型、producer 端地址长度(ip + port)、broker 端的地址长度(ip + port),因为这几个是动态变量,需要计算出来。
// 消息类型
int sysFlag = messageExtBatch.getSysFlag();
// producer 端的地址长度(ip + port)
int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// broker 端的地址长度(ip + port)
int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
然后构建消息 ID,在消息发送的时候会在 broker 端生成一个唯一 ID 进行标识,批量消息也是一样的,生成的消息 ID 格式如下:(ip1 + port1 + phyPosArray[0])+ (ip2 + port2 + phyPosArray[1])...
,phyPosArray 是前面创建出来的物理偏移量数组,但是到这里好像也没有初始化,所以不太能确定。
// 构建消息 ID,在消息发送的时候会在 broker 端生成一个唯一 ID 进行标识
Supplier<String> msgIdSupplier = () -> {
// 如果是 IPV4,那么长度就是 16,IPV6 长度就是 28,默认就是 16
// 这个消息 ID 组成是: ip + port + wroteOffset
int msgIdLen = storeHostLength + 8;
// 有多少条消息
int batchCount = putMessageContext.getBatchSize();
// 物理偏移量
long[] phyPosArray = putMessageContext.getPhyPos();
// 分配 msgIdLen 长度的 ByteBuffer
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
// 将 socketAddress 里面的 ip + port 设置到 byteBuffer 中,然后切换读模式
MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer);
// clear 重置 position
msgIdBuffer.clear();
StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1);
// 遍历所有的物理偏移量
for (int i = 0; i < phyPosArray.length; i++) {
// 设置物理偏移量,msgIdBuffer = ip + port + phyPos
msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]);
// 获取消息 ID
String msgId = UtilAll.bytes2string(msgIdBuffer.array());
if (i != 0) {
// 使用 "," 拼接
buffer.append(',');
}
buffer.append(msgId);
}
// 返回这批消息生成的唯一 ID
return buffer.toString();
};
下面再对编码的消息 ByteBuffer 做一下标记 mark。
// 标记 position 的位置
messagesByteBuff.mark();
int index = 0;
进入 while 循环,接下来就在 while 循环里面追加消息。
while (messagesByteBuff.hasRemaining()) {
...
}
在 while 循环中首先记录下几个属性。
// 获取 messagesByteBuff 的 position
final int msgPos = messagesByteBuff.position();
// 1.消息长度
final int msgLen = messagesByteBuff.getInt();
// 2.获取下消息体长度,这里是用来记录下日志的
final int bodyLen = msgLen - 40;
// 3.如果消息长度比最大的消息长度都要大,这时候就记录日志,返回结果
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// 4.所有消息的长度
totalMsgLen += msgLen;
然后确认空间是否可用,注意一个 CommitLog 文件需要保留下 8 个空白空间,至于为什么留下 8 个字节空余空间,就是因为要把 msgStoreItemMemory 写入文件结尾,用于标识文件写满了。
// 5.确认空间是否可用,注意一个 CommitLog 文件需要保留下 8 个空白空间
if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
// 这里就是空间不够用了,需要返回一个错误的魔数,魔数是 BLANK_MAGIC_CODE
// 首先重置 msgStoreItemMemory 的属性,这个是一个临时变量,存储要处理的消息
this.msgStoreItemMemory.clear();
// 1.存储消息总长度
this.msgStoreItemMemory.putInt(maxBlank);
// 2.魔术设置为 BLANK_MAGIC_CODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// 重置 messagesByteBuff
messagesByteBuff.reset();
// 重置 byteBuffer,这时候将 position 重新设置成方法开头 mark 的位置
byteBuffer.reset();
// 这里留 END_FILE_MIN_BLANK_LENGTH(8)个 空位就是为了把 msgStoreItemMemory 写入文件结尾
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
// 返回错误结果,错误结果是 END_OF_FILE
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(),
beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
到这里就是没有问题,可以把消息写入 byteBuffer 中,但是写入前修改下消息的总长度。
// 消息全部写入了之后重新设置 position 为 0
messagesByteBuff.position(0);
// 设置消息总长度
messagesByteBuff.limit(totalMsgLen);
// 接着把 messagesByteBuff 里面的数据追加到 byteBuffer 中
byteBuffer.put(messagesByteBuff);
// 重置为空
messageExtBatch.setEncodedBuff(null);
// 返回结果
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdSupplier,
messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
消息写入后更新 ConsumeQueue 的最大物理偏移量,同时设置消息条数到 result 里面准备返回。
// 设置消息条数
result.setMsgNum(msgNum);
// 更新 ConsumeQueue 的最大偏移量为 queueOffset
CommitLog.this.topicQueueTable.put(key, queueOffset);
return result;
好了,这里就是全部逻辑了,下面就是全部代码。
/**
* 批量添加消息
* @param fileFromOffset
* @param byteBuffer
* @param maxBlank
* @param messageExtBatch, backed up by a byte array
* @param putMessageContext
* @return
*/
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
// 设置下 byteBuffer 的 mark 标记为 position
byteBuffer.mark();
// 写指针的位置,物理偏移量
long wroteOffset = fileFromOffset + byteBuffer.position();
// ConsumeQueue 消息偏移量的 key,就是 topic-queueId
String key = putMessageContext.getTopicQueueTableKey();
// 获取偏移量
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
// 初始化为 0
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
long beginQueueOffset = queueOffset;
int totalMsgLen = 0;
int msgNum = 0;
// 起始时间
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// 获取到之前编码过的 ByteBuffer
ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
// 消息类型
int sysFlag = messageExtBatch.getSysFlag();
// producer 端的地址长度(ip + port)
int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// broker 端的地址长度(ip + port)
int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// 构建消息 ID,在消息发送的时候会在 broker 端生成一个唯一 ID 进行标识
Supplier<String> msgIdSupplier = () -> {
// 如果是 IPV4,那么长度就是 16,IPV6 长度就是 28,默认就是 16
// 这个消息 ID 组成是: ip + port + wroteOffset
int msgIdLen = storeHostLength + 8;
// 有多少条消息
int batchCount = putMessageContext.getBatchSize();
// 物理偏移量
long[] phyPosArray = putMessageContext.getPhyPos();
// 分配 msgIdLen 长度的 ByteBuffer
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
// 将 socketAddress 里面的 ip + port 设置到 byteBuffer 中,然后切换读模式
MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer);
// clear 重置 position
msgIdBuffer.clear();
StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1);
// 遍历所有的物理偏移量
for (int i = 0; i < phyPosArray.length; i++) {
// 设置物理偏移量,msgIdBuffer = ip + port + phyPos
msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]);
// 获取消息 ID
String msgId = UtilAll.bytes2string(msgIdBuffer.array());
if (i != 0) {
// 使用 "," 拼接
buffer.append(',');
}
buffer.append(msgId);
}
// 返回这批消息生成的唯一 ID
return buffer.toString();
};
// 标记 position 的位置
messagesByteBuff.mark();
int index = 0;
while (messagesByteBuff.hasRemaining()) {
// 获取 messagesByteBuff 的 position
final int msgPos = messagesByteBuff.position();
// 1.消息长度
final int msgLen = messagesByteBuff.getInt();
// 2.获取下消息体长度,这里是用来记录下日志的
final int bodyLen = msgLen - 40;
// 3.如果消息长度比最大的消息长度都要大,这时候就记录日志,返回结果
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// 4.所有消息的长度
totalMsgLen += msgLen;
// 5.确认空间是否可用,注意一个 CommitLog 文件需要保留下 8 个空白空间
if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
// 这里就是空间不够用了,需要返回一个错误的魔数,魔数是 BLANK_MAGIC_CODE
// 首先重置 msgStoreItemMemory 的属性,这个是一个临时变量,存储要处理的消息
this.msgStoreItemMemory.clear();
// 1.存储消息总长度
this.msgStoreItemMemory.putInt(maxBlank);
// 2.魔术设置为 BLANK_MAGIC_CODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// 重置 messagesByteBuff
messagesByteBuff.reset();
// 重置 byteBuffer,这时候将 position 重新设置成方法开头 mark 的位置
byteBuffer.reset();
// 这里留 END_FILE_MIN_BLANK_LENGTH(8)个 空位就是为了把 msgStoreItemMemory 写入文件结尾
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
// 返回错误结果,错误结果是 END_OF_FILE
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(),
beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// move to add queue offset and commitlog offset
int pos = msgPos + 20;
// 设置在 ConsumeQueue 的消息偏移量
messagesByteBuff.putLong(pos, queueOffset);
pos += 8;
// 设置物理偏移量,也就是在 CommitLog 中的偏移量,wroteOffset + (所有消息长度 - 当前消息的长度)
messagesByteBuff.putLong(pos, wroteOffset + totalMsgLen - msgLen);
// 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
pos += 8 + 4 + 8 + bornHostLength;
// 设置消息存储的时间
messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp());
// 设置在 CommitLog 中的偏移量,每一条消息都需要设置
putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen;
// ConsumeQueue 偏移量 +1,所以正常来求出 ConsumeQueue 的偏移量要通过 queueOffset * 20 求出来
queueOffset++;
// 消息数 + 1
msgNum++;
// 设置 position,下一条消息将从 msgPos + msgLen 开始写入
messagesByteBuff.position(msgPos + msgLen);
}
// 消息全部写入了之后重新设置 position 为 0
messagesByteBuff.position(0);
// 设置消息总长度
messagesByteBuff.limit(totalMsgLen);
// 接着把 messagesByteBuff 里面的数据追加到 byteBuffer 中
byteBuffer.put(messagesByteBuff);
// 重置为空
messageExtBatch.setEncodedBuff(null);
// 返回结果
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdSupplier,
messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
// 设置消息条数
result.setMsgNum(msgNum);
// 更新 ConsumeQueue 的最大偏移量为 queueOffset
CommitLog.this.topicQueueTable.put(key, queueOffset);
return result;
}
3. 小结
这篇文章介绍了 broker 端存储批量消息的逻辑,但是从上一篇文章到这篇文章,我们就能发现消息存储的时候只是存储到了 ByteBuffer,而最终在 CommitLog#asyncPutMessages 方法中会添加一个刷盘请求,那么这个刷盘请求如何起作用?RocketMQ 有多少种刷盘请求?这些问题我们都需要探讨。
如有错误,欢迎指出!!!