rocketmq5源码系列--(二)--生产者发送消息
这是broker源码系列第一篇。还是和往常一样,建议copy到本地阅读
broker是基于netty的
rocketmq队列分物理队列和逻辑队列,物理队列只有一个而逻辑队列有很多个
rocketmq 物理队列,一个物理队列对应一个文件,一个物理队列可以对应多个逻辑队列
rocketmq 静态队列文档:https://www.jgaonet.com/mindoc/docs/rocketmq/statictopic-RocketMQ_Static_Topic_Logic_Queue_%E8%AE%BE%E8%AE%A1.md
客户端使用静态队列即用的是逻辑队列,然后broker根据mapping通过逻辑队列的id来获取实际物理队列的
rocketmq5所谓的一致性:
rocketmq5所谓的一致性指的是本地事务和发送消息这个操作的一致性,即本地事务与发送消息这个操作要么都成功要么都失败,不能本地事务成功提交,但是消息发送却失败了即导致当前流程多处理一个对象a,而后面的流程会漏掉这个消息即不会对a进行处理,也不能本地事务失败但消息却提交成功,这样会导致本地少处理一个对象a而后面的流程多处理了这个对象a,这一切都源于消息一旦发送就不能撤回,所以解决办法就是确定本地事务提交成功后,本流程发出的消息才对下游可见。!!!这个事务消息不是指生产者发送一条消息,然后消费者消费成功并ack后才算事务成功,而是生产者一旦成功把消息提交到broker,事务流程就算完了。如果流程1处理一下,然后发一条消息给rocketmq,然后流程2处理完该消息,这样才算一个完整流程的话,那么就是分布式事务的事情了,也就是说这里的本地事务和发送消息的一致性可以简单看做是整个分布式事务中的一环。
rocketmq的解决办法就是2pc,只不过这个2pc是生产者和broker之间的即1:先发送prepare消息给broker,broker返回ack后再执行本地事务,本地事务执行成功后再发送commited/rollback消息给broker,如果是commited则broker也提交该消息,提交成功后返回ack给生产者,此时事务结束,就是说一旦提交,消息就变成普通消息,就对下游可见,如果是rollback,则直接删除,这样下游也肯定看不到。一旦提交,消息就变成普通消息,就对下游可见。
存在一些场景:如果本地事务提交成功但是还没有发送消息,此时就断了,那么就会导致本地成功但broker不知道是否成功,所以broker在事务超时后就会回查即主动询问生产者,因为此时网络断了,所以肯定无法询问原生产者,然后官方文档说的是回查同组任一其他生产者,也就是说问你问不到,那broker就问组内其他生产者,因为生产者b要能查生产者a的事务执行结果,所以这里就要求同组内的所有的生产者都必须能访问全局数据即同组任一生产者都能访问同组任意其他生产者的事务执行结果,除非同组所有生产者都访问不到,不过如果所有生产者都访问不到那肯定是网络出了问题,那肯定是生产大事故。因为可能一直回查不到生产者导致超过最大重试次数,那么broker就会把该消息丢到指定的xxx队列,和死信队列一样,要开发者新建一个消费者去处理这个xxx队列下的所有消息
原理明白了,那么broker的处理流程不用看就大致知道是个怎么回事了,这个事务消息的实现肯定是要client sdk配合的
!!!rocketMQ读写队列
1:(本点摘抄于网络文章)读写队列,则是在做路由信息时使用。在消息发送时,使用写队列个数返回路由信息,而消息消费时按照读队列个数返回路由信息。在物理文件层面,只有写队列才会创建文件。举个例子:写队列个数是8,设置的读队列个数是4.这个时候,会创建8个文件夹,代表0 1 2 3 4 5 6 7,但在消息消费时,路由信息只返回4,在具体拉取消息时,就只会消费0 1 2 3这4个队列中的消息,4 5 6 7中的信息压根就不会被消费。反过来,如果写队列个数是4,读队列个数是8,在生产消息时只会往0 1 2 3中生产消息,消费消息时则会从0 1 2 3 4 5 6 7所有的队列中消费,当然 4 5 6 7中压根就没有消息 ,假设消费group有两个消费者,事实上只有第一个消费者在真正的消费消息(0 1 2 3),第二个消费者压根就消费不到消息。由此可见,只有readQueueNums>=writeQueueNums,程序才能正常进行。最佳实践是readQueueNums=writeQueueNums。那rocketmq为什么要区分读写队列呢?直接强制readQueueNums=writeQueueNums,不就没有问题了吗?rocketmq设置读写队列数的目的在于方便队列的缩容和扩容。思考一个问题,一个topic在每个broker上创建了128个队列,现在需要将队列缩容到64个,怎么做才能100%不会丢失消息,并且无需重启应用程序?最佳实践:先缩容写队列128->64,写队列由0 1 2 ......127缩至 0 1 2 ........63。等到64 65 66......127中的消息全部消费完后,再缩容读队列128->64.(同时缩容写队列和读队列可能会导致部分消息未被消费)
2:(个人猜测,因为对rocketmq5还不太懂):如1中所述,队列实际只有一份,标号0~x,然后读队列k和写队列k,只要他们的k即queueid相同,那么就对应的是同一个底层的物理队列,说是物理队列,实际上也是一个逻辑队列,底层只有一个逻辑上的CommitLog大文件,不管是读还是写队列,都是用的这个逻辑文件,然后这个CommitLog逻辑文件实际上会被分成多个小的offset连续的MappedFile物理文件
存疑:topic.unitMode是什么
1:broker流程
1.1:broker流程骨架
NettyRemotingServer.NettyServerHandler.channelRead0
NettyRemotingAbstract.processMessageReceived
switch (msg.getType()) { #首先判断收到的是请求还是响应
case REQUEST_COMMAND:
NettyRemotingAbstract.processRequestCommand(ctx, msg);
Runnable run = buildProcessRequestHandler(ctx, cmd, pair, opaque) #创建一个task,后续会丢到线程池异步执行
return () -> {
RemotingHelper.parseChannelRemoteAddr(ctx.channel()); #首先解析地址
doBeforeRpcHooks(remoteAddr, cmd); #然后执行before rpchook,不过默认是0个
this.requestPipeline.execute(ctx, cmd); #然后执行请求pipeline,这里是authenticationPipeline
#注意,这里不是处理请求的地方
AuthenticationPipeline.execute
response = pair.getObject1().processRequest(ctx, cmd); #!!!这里负责处理请求,这是一个多态
#不同的请求对应不同的类
1:pull消费者对应的处理函数 #1:pull消费者对应的处理函数
PullMessageProcessor.processRequest
......
2:生产者发来消息对应的处理函数 #2:生产者发来消息对应的处理函数
SendMessageProcessor.processRequest
......
3:admin相关命令对应的处理函数 #3:admin相关命令对应的处理函数
AdminBrokerProcessor.processRequest
switch (request.getCode()) {
case RequestCode.UPDATE_AND_CREATE_TOPIC:
return this.updateAndCreateTopic(ctx, request);
case RequestCode.UPDATE_AND_CREATE_TOPIC_LIST:
....
case ...
doAfterRpcHooks(remoteAddr, cmd, response); #然后执行after rpchook,不过默认是0个
writeResponse(ctx.channel(), cmd, response);
break;
case RESPONSE_COMMAND:
NettyRemotingAbstract.processResponseCommand(ctx, msg);
break;
1:生产者事务或非事务单条消息发送流程
SendMessageProcess.processRequest
case RequestCode.CONSUMER_SEND_MSG_BACK: #如果这个消息是消费者消费失败而发送回来的消息
AbstractSendMessageProcessor.consumerSendMsgBack #!!!也就是说sendMessage不仅仅用于生产者
...暂时略...
default:
TopicQueueMappingManager.buildTopicQueueMappingContext #构建mappingContext,
#这个ctx包含了静态队列即新增的逻辑队列的信息
if requestHeader.getLo() ==False: #如果该topic没有使用逻辑队列则置空该ctx的指定字段
return new TopicQueueMappingContext(topic,null,null,null,null) #许多字段都设置为空表示没有使用逻辑队列
#后面流程的代码会检测这个ctx的这些字段
mappingDetail = TopicQueueMappingManager.getTopicQueueMapping(topic)#尝试获取该topic对应的逻辑队列和物理队列的映射信息
#即这个ctx保存的是这个topic对应的逻辑队列的所有信息
if mappingDetail == null: #如果没有找到则说明没有使用静态队列
return new TopicQueueMappingContext(topic, null, null, null, null)#许多字段都设置为空表示没有使用逻辑队列
TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId) #根据请求中的globalId即逻辑队列的id获取对应的信息
return new TopicQueueMappingContext(topic, globalId, mappingDetail, mappingItemList, leaderItem);#构建并返回ctx
#此条件下返回对的ctx包含了静态队列的相关信息
#!!!1-静态队列:逻辑队列映射到物理队列
#!!!即用户访问逻辑队列,然后broker上会把逻辑队列映射到实际的物理队列
#!!!也就是说broker这里先判断这个请求是不是用了逻辑队列
#!!!如果没用就返回ctx1,如果用了就查找逻辑队列和物理队列的映射信息
#!!!然后返回ctx2。
#!!!所以下一步就是检测返回的ctx的指定字段,如果用了逻辑队列
#!!!则需要把请求中的队列id重写为实际的物理队列的id
#!!!也就是说用户请求中的id用户以为是物理队列的id
#!!!实际上这个id是逻辑队列的id,broker会自动转换成实际的物理队里的id
#!!!这个静态逻辑队列是rocketmq5新增的流处理方面的,略
#!!!2-笔记:不管用没用,反正后面的代码都是通过这个ctx来获取信息
#!!!所以可以用一个ctx来实现多种场景,一套代码,即代码更通用
TopicQueueMappingManager.rewriteRequestForStaticTopic #rewrite操作就是检测ctx判断是否用了逻辑队列
#如果用了,就把请求的队列id重写为该逻辑队列对应的实际的物理队列的id
if mappingContext.getMappingDetail() == null: #如果ctx.mappingDetail为空则表示没用逻辑队列
return null #此时返回null表ok
requestHeader.setQueueId(mappingItem.getQueueId()) #getQueueId表示实际的物理队列的id
#这里就用实际的物理队列的id替换掉请求中的逻辑队列id
return null #return null 表ok
SendMessageProcessor.buildMsgContext #这里根据前面的信息来构建sendmsg使用的msgCtx
SendMessageProcessor.executeSendMessageHookBefore #执行send前的hook操作
#开始send
if requestHeader.isBatch(): #如果是批量发送
SendMessageProcess.sendBatchMessage #则调用sendBatchMessage
AbstractSendMessageProcessor.executeSendMessageHookAfter #处理afterHook消息
else : #反之则是发送单条消息
SendMessageProcessor.sendMessage #调用sendMessage
SendMessageProcessor.preSend #preSend:1:创建response对象;2:检查请求
RemotingCommand.setXXX #设置response对象的各种字段
......
AbstractSendMessageProcessor.msgCheck #检查请求
PermName.isWriteable #1:检查broker是否可写
TopicValidator.validateTopic #2:校验topic是否有效
#1:topic名字不能为空;2:topic名不能含有非法字符;3:topic名长度不能超过限制
TopicValidator.isNotAllowedSendTopic #3:检查topic是否可以推送消息(通过检查该topic是否在黑名单中)
TopicConfigManager.selectTopicConfig #通过topic检查了则获取topic的信息,如果不存在则创建
if null == topicConfig: #如果不存在
TopicConfigManager.createTopicInSendMessageMethod #则创建topic
...topic创建过程待补充...
queueIdInt = requestHeader.getQueueId(); #检查请求中的queueId即物理队列id是否有效
idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());#检查最大且有效地队列id
if queueIdInt >= idValid: #这个队列id不能超过读或者写队列的最大id
return failed #后面读写队列时肯定会检测队列是否可读可写,所以不用担心
#要开始处理请求了
SendMessageRequestHeader.getQueueId #获取queueId,不管是否用的是逻辑队列
#经过前面的处理,到此处queueid必定是物理队列id
if queueIdInt < 0:
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums()) #如果queueId为-1表示随便放到一个队列就行
#???我猜有些逻辑比如顺序消息/分组消息
#应该就是通过控制queueId来实现的
#这个逻辑应该是放在客户端sdk中,client先获取所有队列信息
#然后再根据需要来确定把消息放到哪个队列
#broker只需要根据消息中指定的queueid来执行就好了
MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); #构建内部使用的消息,就是说我们存的时候使用的消息不是请求中的消息
#请求中的消息只是broker内部使用的MessageExtBrokerInner消息的一个字段
#就是后续流程中使用的就是inner消息了而不是最初的请求消息了
msgInner.setTopic(requestHeader.getTopic()); #设置inner消息的topic
msgInner.setQueueId(queueIdInt); #设置inner消息的queueid字段
...省去一系列msgInner.setXXX...
SendMessageProcessorhandleRetryAndDLQ #处理重试和死信队列
if null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX): #如果这是重试topic
if reconsumeTimes > maxReconsumeTimes || #如果重试次数超过了阈值
sendRetryMessageToDeadLetterQueueDirectly #或者配置了直接发送到死信队列
{
newTopic = MixAll.getDLQTopic(groupName); #先获取消费者组对应的死信队列topic
queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP); #为该消息创建一个死信id
msg.setTopic(newTopic); #然后就把当前请求的目的topic重置为死信队列的topic
#意思就是本次消息发送请求还是会继续写,但是会丢到死信队列
#!!!sendMessage流程不仅仅用于生产者生产消息
#!!!还用于消费者消费失败消息投递回broker
#!!!即SendMessage就是SendMessage的功能
#!!!不管调用者是谁
msg.setQueueId(queueIdInt);
}
if Boolean.parseBoolean(traFlag) && #如果是事务消息
!(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0) #并且xxx,这两个条件没看懂为啥
{
if BrokerConfig.isRejectTransactionMessage: #如果我们配置broker拒绝事务消息
return #那么直接返回表示拒绝本次消息
sendTransactionPrepareMessage = true; #反之设置标记,标记本次是事务消息
}
else:
sendTransactionPrepareMessage = false;
if BrokerConfig.isAsyncSendEnable() #如果是配置文件中设置的是异步master
{ #即本次消息无需等待slave同步
if sendTransactionPrepareMessage: #并且如果是事务消息
TransactionalMessageServiceImpl.asyncPrepareMessage #则异步执行事务消息第一阶段:prepare
#!!!事务消息的流程只是简单修改一下topic
#!!!然后当成普通消息来调用asyncPutMessage来处理
#!!!即asyncPutMessage是sendMessage最核心的流程
#!!!不管同步还是异步,事务还是非事务
#!!!最终都会来到asyncPutMessage/asyncPutBatchMessage
TransactionMessageBridge.asyncPutHalfMessage #修改消息然后putmessage
TransactionMessageBridge.parseHalfMessageInner #修消息的各种属性
topic=TransactionalMessageUtil.buildHalfTopic() #事务半消息topic名字是固定的,为RMQ_SYS_TRANS_HALF_TOPIC
MessageExtBrokerInner.setTopic(topic); #!!!最核心的就是修改消息的目的topic名字
MessageExtBrokerInner.setQueueId(0); #修改queueid
MessageExtBrokerInner.setXXX #还会新增一大堆属性,略
DefaultMessageStore.asyncPutMessage #修改了消息的topic后就可以把该消息当做普通的消息来走asyncPutMessage流程
......
-- -- - > commit or rollback
#当本地事务执行完毕后就会返回消息给broker,不过发的是EndTransaction消息而不是sendMessage消息
EndTransactionProcessor.processRequest
if MessageSysFlag.TRANSACTION_COMMIT_TYPE: #如果是结果是commit
#就是把消息写入commitLog
TransactionalMessageServiceImpl.commitMessage #从commitLog文件中先取出之前保存的prepare状态的halfMsg
#!!!任何topic下的消息都是写入同一个commitLog文件中
TransactionalMessageServiceImpl.getHalfMessageByOffset(commitLogOffset) #根据offset直接从commitLog文件获取halfMsg
EndTransactionProcessor. rejectCommitOrRollback #检查消息是否超时
#就是说不能发送halfMsg开始事务后隔很久才提交
EndTransactionProcessor.checkPrepareMessage #检查prepare消息的状态就是有可能prepareMsg
#和endTransactionMsg的各种信息匹配不上
EndTransactionProcessor.endMessageTransaction #如果一切ok,那么就从从halfMsg中还原原消息
MessageExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)) #很简单,就是把topic改回原来的真实的topic
EndTransactionProcessor.sendFinalMessage #就是把还原的消息走一遍putMessage流程来保存到commitLog文件
DefaultMessageStore.putMessage
DefaultMessageStore.waitForPutResult(DefaultMessageStore.asyncPutMessage(msg)
#最终还是来到asyncPutMessage
#只不过这里会等待直到这个异步操作完成
#!!!此时事务消息就像一个普通消息一样了
#!!!一旦写入,就对消费者可见
TransactionMessageServiceImpl.deletePrepareMessage #put完成后从halfMsg Topic中删除该消息
#当然不是物理删除,而是写入一条commit消息
#因为消息一旦写入commitLog就是不可变的
#和commit相比没有put操作,因为rollback就代表取消
TransactionMessageServiceImpl.getOpMessage #创建一条op消息
topic=RMQ_SYS_TRANS_OP_HALF_TOPIC #这条消息会发往指定的OP_HALF_TOPIC
new Message(topic,TransactionalMessageUtil.REMOVE_TAG) #这条op会打上REMOVE_TAG
#因为代表对应的halfMsg可以删除了因为事务已完成
TransactionalMessageBridge.writeOp #调用putMessage把刚创建的OP消息写入OP_HALF_TOPIC
#任何消息的写入操作最终都会调用asyncPutMessage函数
else if MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: #如果是rollback,则删除halfMsg
TransactionMessageServiceImpl.rollbackMessage #就去根据offset直接从commitLog中取出该消息
EndTransactionProcessor. rejectCommitOrRollback #检查消息是否超时
#就是说不能发送halfMsg开始事务后隔很久才提交
EndTransactionProcessor.checkPrepareMessage #检查prepare消息的状态就是有可能prepareMsg
#和endTransactionMsg的各种信息匹配不上
TransactionMessageServiceImpl.deletePrepareMessage #直接从halfMsg Topic中删除该消息
......
-- -- - > check
#因为事务可能超时,所以会有一个定时线程不断扫描halfMsg Topic下的所有消息
#如果超过指定时间,则回查生产者,如果达到最大回查次数,则丢到一个类似死信队列的队列里去
TransactionalMessageServiceImpl.run #一个单独的check线程
while !this.isStopped: #每隔一段时间就检查一下所有未完成的事务,默认30s
BrokerConfig.getTransactionCheckInterval
TransactionalMessageServiceImpl.waitForRunning #执行wait和check
TransactionalMessageServiceImpl.onWaitEnd #在wait结束的后执行check操作
TransactionalMessageServiceImpl.check #执行check操作
topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC #halfMsg对应的topic就是这个topic
TransactionalMessageBridge.fetchMessageQueues #获取这个topic下的所有队列
#!!!queueid相同的读写队列是同一个队列
#因为所有topic的消息都存在同一个commitLog文件中
#所以commitLog中同topic的消息不是连续存放的
#所以肯定不能直接遍历commitLog文件
#因为每个topic的消费队列中保存的肯定是这个topic的消息
#所以直接遍历该topic的所有队列里的消息即可
#!!!就是像消费者一样消费队列里的消息
#!!!消费完就后就不应该再可见
for MessageQueue messageQueue : msgQueues: #遍历halfmsg topic下的所有消费队列
opQueue = getOpQueue(messageQueue) #获取该halfMsg Queue对应的opQueue
#即一条halfMsg代表2pc中的一条prepare日志
#opQueue中保存的消息代表的是某个事务已经完成
#即一条opMsg代表2pc中的一条commited或rollback日志
halfOffset=TransactionalMessageBridge.fetchConsumeOffset(messageQueue)#获取halfQueue的consumeOffset
opOffset=TransactionalMessageBridge.fetchConsumeOffset(opQueue); #获取opQueue的consumeOffset
TransactionalMessageServiceImpl.fillOpRemoveMap#读取opQueue,来标记哪些事务已经完成了
TransactionalMessageServiceImpl.pullOpMsg #从op队列消费消息
#???暂不清楚这个操作是否会更新consumeOffset
#???因为下面有一个手动更新offset的方法
#会把已经完成的事务消息放到一个removeMap中
#空消息则直接放到doneOpOffset
#因为空消息直接算已经处理完毕
while true: #遍历该队列的所有消息
if System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT:
break #限制单个队列的遍历时间为1分钟
if removeMap.containsKey(i): #如果这个halfMsg对应的offset在removeMap中
#removeMap:key=halfOffset,value=opOffset
#那么就表明这个消息已经commit/rolllback
doneOpOffset.add #把对应的opMsg放到doneOpOffset队列表示已完成无需进行check
else:
TransactionalMessageServiceImpl.getHalfMsg(messageQueue, i)#根据offset从halfMsgQueue中获取halfMsg
TransactionalMessageServiceImpl.pullHalfMsg #pull方式从halfMsg Queue中取出消息
#???暂不知道是否会更新offset
#???看代码好像不会
if TransactionalMessageServiceImpl.needDiscard || #如果消息重试的次数超过了阈值则返回true
#如果没有,则msg的重试次数+1
TransactionalMessageServiceImpl.needSkip #如果halfmsg所在的文件已经expire了
#默认72h即消息过期了
{
DefaultTransactionalMessageCheckListener.resolveDiscardMsg #那么就把该消息丢到类死信队列
DefaultTransactionalMessageCheckListener.toMessageExtBrokerInner #修改msg的目的topic
TopicConfigManager.createTopicOfTranCheckMaxTime
RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC #目的topic为RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC
MessageExt.setTopic(topic) #设置消息的目的topic
DefaultMessageStore.putMessage #此时把消息当做普通消息一样丢到类死信队列
#后面就是asyncPutMessage流程了,略
}
if msgExt.getStoreTimestamp() >= startTime:#如果消息的存储时间晚于check操作的开始时间
#表明这些消息是在check开启之后才存储的,很新
#所以无需check,因为queue中消息是递增的
#所以这个消息之后的消息都比当前消息新,
#所以后面的消息更加不需要检查,所以直接break
break
valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
#消息生存时间=now-消息产生时间
checkImmunityTime = transactionTimeout #checkImmunityTime表示消息冷却时间
#即生存时间<冷却时间的消息都可以跳过检查
#因为该事务消息才开始,没必要检查
checkImmunityTimeStr = msgExt.getUserProperty(CHECK_IMMUNITY_TIME_IN_SECONDS)
if null != checkImmunityTimeStr: #timeStr不为空表示我们设置了该消息的冷却时间
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout)
if -1 == tiemstr: #如果timestr!=-1则表明设置了有效冷却时间
checkImmunityTime=transactionTimeout #则用我们配置的冷却时间
else
checkImmunityTime *= 1000 #否则直接用事务超时时间来表示冷却时间,默认是6s
#即如果事务没有超时就不会check
if valueOfCurrentMinusBorn < checkImmunityTime: #如果消息还没达到检查时间
ok=checkPrepareQueueOffset #检查这条免检消息是否在removemap中
#在则说明该消息已经完成了,可以跳过,返回true
#否则需要把该消息重新写到halfMsgQueue的末尾
#如果消息追加到halfMsg Topic末尾失败
#则返回false表示不可以跳过检查需要继续往下走
#然后在check实施环节写入halfMsgQueue中
property=msgExt.getUserProperty(PREPARED_QUEUE_OFFSET) #获取immu halfMsg的字段
if property==null: #如果为空表明这是第一次遇到这个消息
TransactionalMessageServiceImpl.putImmunityMsgBackToHalfQueue
#那么直接put immu消息,put的时候会设置这个属性字段
#如果put成功就可以跳过本次check否则本次必须check
else: #msg中取出的属性值不为null则表明不是第一次处理这个消息
if -1 == prepareQueueOffset: #但是属性值为-1即无效值
return false #则直接返回false表示不可以跳过
else:
if removeMap.containsKey(prepareQueueOffset): #如果在removeMap中则表示这个消息虽然处于免检状态
#但是已经commited/rollback即事务消息已完成
return true #那么就可以跳过即返回true
else:
TransactionalMessageServiceImpl.putImmunityMsgBackToHalfQueue #否则要put immu halfMsg
TransactionalMessageBridge.renewImmunityHalfMessageInner #设置该属性字段
TransactionalMessageBridge.renewHalfMessageInner #copy一份旧消息
msgInner.setWaitStoreMsgOK(false) #肯定是异步写入因为同步效率肯定很低
msgExt.getUserProperty(PREPARED_QUEUE_OFFSET) #看消息是否已经设置了该属性值
if null != queueOffsetFromPrepare: #如果有
MessageAccessor.putProperty(queueOffsetFromPrepare) #则还是设置原值
else: #如果没有
MessageAccessor.putProperty(msgExt.getQueueOffset()) #则从原消息中获取
#???没搞懂这个字段干嘛用的
#???也没搞懂这个逻辑,他都有了还设置干嘛
DefaultMessageStore.putMessage #renew之后就是调用put把消息写回halfmsg topic
if ok: #如果checkPrepareQueueOffset ok则跳过这条消息
newOffset = i + 1 #更新索引为下一条消息
i++
continue #直接返回
else: #如果消息没有配置checkImmunityTimestr
#那么就默认使用的是事务超时时间
if 0 <= valueOfCurrentMinusBorn &&
valueOfCurrentMinusBorn < checkImmunityTime) #如果消息还没达到检查冷却时间,说明消息过新
{
break; #那么这条消息以及这条消息之后的消息都直接跳过
#即结束本队列的本次check之旅
}
boolean isNeedCheck =
(opMsg == null && #如果op列表为空
valueOfCurrentMinusBorn > checkImmunityTime) || #并且该halfMsg超过了检查冷却时间
(opMsg != null && #或者对应的opmsg队列不为空
opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout) ||
#???并且最后一条op消息的时间超过了事务超时时间
(valueOfCurrentMinusBorn <= -1) #或者消息的生存时间小于等于-1
#那么该事务消息就需要回查生产者
if isNeedCheck: #如果需要回查
TransactionalMessageServiceImpl.putBackHalfMsgQueue #把消息重新写回halfMsg队列末尾
TransactionalMessageBridge.renewHalfMessageInner
TransactionalMessageBridge.putMessageReturnResult
DefaultMessageStore.putMessage
AbstractTransactionalMessageCheckListener.resolveHalfMsg #回查生产者事务状态
thread.run{ #丢到异步线程去发送
AbstractTransactionalMessageCheckListener.sendCheckMessage
Broker2Client.checkProducerTransactionState #发送回查消息给client
}
else:
nextOpOffset= pullResult.getNextBeginOffset() #更新变量来开始下一次循环
pullResult = fillOpRemoveMap #end while
if newOffset != halfOffset: #如果halfMsg queue消费了消息
TransactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset) #则更新halfMsgQueue的offset
ConsumerOffsetManager.commitOffset #提交consumer的commitOffset
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset) #更新
if newOpOffset != opOffset:
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset) #end for即本队列已结束开始下一个队列
#!!!因为消费流程还没看所以上面的回查逻辑可能还有点小错和迷糊,但八九不离十了
#!!!但八九不离十了,这里总结一下大概检查逻辑:
#!!!halfMsg queue和opQueue的offset都是递增的
#!!!如果该事务消息已经完成则不需要检查,
#!!!可以不作任何处理,即跳过check
#!!!如果该事务消息还没有完成且还没达到检查时间
#!!!因为queue中的消息时间是递增的,
#!!!所以后面的消息都可以跳过检查
#!!!也就是说该队列这个点之后的消息都留到下次循环再去处理
#!!!所以这个队列以及队列之后的消息无需再次写到halfmsg queue
#!!!直接从原位置开始再次检查就行
#!!!如果某个消息还没有完成且本次不需要check
#!!!因为halfMsg queue的offset是只增不减的
#!!!所以再次把该消息追加到halfMsg queue的末尾
#!!!然后继续检查该消息对应的queue的下一条消息
#!!!这样当offset增加后还是可以处理该消息的
#!!!总之核心就是offset只能增不能减
#!!!核心就是不能回过头去读当前offset之前的数据
#20241121 22:42 又是加班攒调休的一天,今天好冷,还差一点就能捋清了,
#但是不想弄了,太伤神了,专注力已经无了
else:
DefaultMessageStore.asyncPutMessage #如果不是事务消息则异步写入存储,默认是DefaultMessageStore
#!!!一个broker只有一个MessageStore对象
for all hook in putMessageHookList : #执行before putMsg hook
putMessageHook.executeBeforePutMessage(msg);
CommitLog.asyncPutMessage #执行写入操作,即提交commitLog,commitLog包含完整消息
#!!!一个broker只有一个MessageStore对象,
#!!!一个MessageStore对象只有一个CommitLog对象
#!!!也就是说不管有多少个队列,都是用的同一个CommitLog
#!!!所以这里没有根据queueid来选择对应的commitLog
#!!!因为一个broker只有一个commitlog
#!!!一个CommitLog对象代表一个大的逻辑文件CommitLog
#!!!在这个逻辑CommitLog文件中offset是连续的从0开始的
#!!!这个逻辑CommitLog文件实际上是由多个小的mappedFile组成
#!!!这些mappedFile有两个偏移,起始偏移和文件内偏移
#!!!起始偏移表示mappedFile的第一个字节在整个逻辑CommitLog文件中的位移
#!!!文件内偏移就是当前mappedFile文件的写入位置即文件内偏移
#!!!这些MappedFile的[起始offset,endOffset]是连续的
#!!!比如第一个是[0,x],那么第二个就是[x+1,y]...[n+1,z]
#!!!也就是说物理读写队列实际上都是逻辑上的
#!!!底层只有一个逻辑CommitLog文件
#!!!还有,静态队列的逻辑队列则是构建在逻辑读写队列之上的逻辑读写队列
if !MessageConfig.isDuplicationEnable(): #如果没有开启消息重复,则会给每条消息添加一个时间戳以区分唯一和有序
#这个开关默认是关闭的,即默认是会加时间戳的
MessageExt.setStoreTimestamp(System.currentTimeMillis())#设置消息的时间戳
MessageExt.setBodyCRC #设置消息的crc
MessageExt.setVersion #设置消息的版本
MessageExt.setxxxx #还会设置消息的一系列字段,略
ThreadLocal<PutMessageThreadLocal>.get #获取put操作相对应的配置,比如encoder、builder
String topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg) #!!!topicQueueKey=TopicName-QueueId
keyBuilder.append(messageExt.getTopic());
keyBuilder.append('-');
keyBuilder.append(messageExt.getQueueId());
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile() #获取最后一个内存映射文件(前面的都写满了)
#一个broker
if mappedFile == null: #如果为空则表示集群还没有存储过消息
currOffset = 0; #这里先设置offset=0即消息的存储索引是从0开始
else:
currOffset = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition() #否则offset=文件起始偏移+文件内偏移,
#即内容会追加到该文件末尾
#即offset表示消息的全局offset
#这个offset会跟着消息一同存储到磁盘
needAckNums = MessageStoreConfig.getInSyncReplicas() #needAckNums表示需要几个副本写入成功才算成功
#InSyncReplcias表示处于In-sync状态的副本数即健康的副本数
#默认为1,因为master也算他本身的一个副本,
#就是说至少要本身写成功才算成功,当然,这是废话
#如果master是ASYNC_MASTER或者
#enableControllerMode和ackAckInSyncStateSet这两个都开启的时候
#In-sync replicas将被忽略,也就是说写完master就直接返回了
#即使in-sync>1也不会去写副本
#这里说的副本、slave是同一个意思
ha = needHandleHA(msg) #是否需要处理HA
if !messageExt.isWaitStoreMsgOK || #如果该消息配置为不需要等待storeok即写入完成
MessageStoreConfig.isDuplicationEnable || #???(不太懂)或者开启了DuplicationEnable开关(不过默认是关的)
BrokerRole.SYNC_MASTER != MessageStoreConfig.getBrokerRole #或者master类型不为SYNC即master为async即不需要同步
{
return false; #那么就不需要ha,反之则需要
}
return true
if needHandleHA && BrokerConfig.isEnableControllerMode: #如果需要ha并且开启了controller模式
#controller模式下会自动设置节点的角色无需人工干预
num=DefaultHAService.inSyncReplicasNums(currOffset) #获取健康的副本数即commitlogIndex等于master的commitlogIndex的副本数
insyncNums=1 #从1开始,因为master本身也算一个健康的replica
for HAConnection conn : this.connectionList: #遍历集群所有slave连接
isOk=this.isInSyncSlave(currOffset, conn) #判断该节点是否ok
if currOffset - conn.getSlaveAckOffset() < MessageStoreConfig.getHaMaxGapNotInSync():
return true; #如果当前master节点的offset-slave的offset小于配置的阈值
#那么就认为slave是in-sync
return false; #否则就认为该slave是非sync的
inSyncNums++;
}
if num < MessageStoreConfig.getMinInSyncReplicas: #如果haservice中显示的处于in-sync状态即健康的副本数
return failed #小于要求的最小副本数,那么就返回失败
if MessageStoreConfig.isAllAckInSyncStateSet: #如果配置了要求所有副本都ack才算成功(不过默认是关闭的)
needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET #那么就更新needAckNums-1表示要求所有节点都ack
#注意,这里是增大needAckNums,因为此时needAckNums只能增不能减
#因为条件只能更严格,不能更松,如果更松就达不到ha要求了
else if needHandleHA &&
BrokerConfig.isEnableSlaveActingMaster #如果开启了ha但是没有开启controller模式但是开启了slaveActMaster
{ #不太懂,反正大意及随后master挂了但是slave可以临时充当master
#完成只有master才能完成的部分功能
inSyncReplicas = Math.min(DefaultMessageStore.getAliveReplicaNumInGroup,DefaultHAService.inSyncReplicasNums)
#min(存活的副本数,inSync的副本数)
needAckNums = CommitLog.calcNeedAckNums
needAckNums = MessageStoreConfig.getInSyncReplicas()#获取配置的要求多少个副本数才算成功
#注意:MessageStoreConfig.getInsyncReplicas表示
#要求这么多个副本同步成功才算成功
#而DefaultHAService.inSyncReplicasNums表示处于in-sync的副本数
if MessageStoreConfig.isEnableAutoInSyncReplicas: #如果配置了允许动态调整needAckNums即允许缩小needAckNums
#即我配置了要求3个副本成功,但是我打开了这个autoInSync开关
#那么我可以根据实际情况调小needAckNums,即我目前即使只有1个健康副本
#我也能通过调小needAckNums=2来使得本次请求仍然满足一致性要求的副本数
#从而本次请求可以继续往下执行
#!!!但是needAckNums不能小于要求的最小副本数
#!!!就是说needAckNums必须大于等于minInsync(这个参数可配置)
needAckNums = Math.min(needAckNums, inSyncReplicas); #看是否需要调小needAckNums,因为我最多只有这么多个健康的副本
needAckNums = Math.max(needAckNums,MessageStoreConfig.getMinInSyncReplicas());
#needAckNums必须大于配置的最少副本数
}
if needAckNums > inSyncReplicas: #经过调整好needAckNums代表最终要求的副本数
#如果要求的副本数比可用的副本数还多,那么本次请求就无法满足副本数要求
return failed #所以返回fail表示本次sendmsg操作失败
TopicQueueLock.lock(topicQueueKey) #锁住指定的队列,队列key为 topicName-queueId
#???是不是会同时锁住生产者消费者队列
#???因为貌似sendMsg时会处理消费者队列的逻辑偏移
needAssignOffset = true; #needAssignOffset表示是否要给消息赋一个逻辑位移
#会同时存储消息的物理位移和逻辑位移
#物理位移表示该消息在commitLog中的全局存储位移(实际也是一个逻辑位移)
#消息存储后会被分配给某个消费队列,然后这个消息在这个消费队列中的位移
#就叫做逻辑位移
if MessageStoreConfig.isDuplicationEnable && #如果开启了Duplication(不过默认是关闭的)
MessageStoreConfig.getBrokerRole != BrokerRole.SLAVE #并且broker是master
{
needAssignOffset = false #那么此时就不需要给消息赋逻辑位移。
#???不懂,为什么此时就不需要
}
if needAssignOffset: #如果要给消息赋值逻辑位移
DefaultMessageStore.assignOffset #那么就设置消息在消费队列中的offset即逻辑位移
tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()) #获取消息的标志
if tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || #如果不是事务消息
tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) #或者该事物消息已经提交即已经commit
{ #事务消息已提交就表示该消息可以被正常消费了
AbstractConsumeQueueStore.assignQueueOffset(msg) #给消息赋值逻辑位移
ConsumeQueueStore.findOrCreateConsumeQueue #先获取消息会被分配给哪个消费队列,
#这是个多态方法,这里为ConsumerQueueStore
ConcurrentMap<Integer, ConsumeQueueInterface> map = consumeQueueTable.get(topic); #根据topic获取所有的队列信息
ConsumeQueueInterface logic = map.get(msg.getQueueId()); #!!!sendMsg请求中的queueId是读队列id也是写队列id
#!!!所以这里直接根据请求消息中的queueid获取对应消费队列对象
if logic !=null: #如果找到了,就直接返回
return logic #logic这个变量名也可以表明消费队列实际也是一个逻辑队列
else: #没有找到则表明这个队列一直没人消费过,所以创建新的
#新的消费队列对象的offset会从0开始
...创建ConsumrQueue对象,略... #!!!一个consuemrQueue对应一个文件
#!!!这个文件用来存放分配给这个consuermQueue的msg的offset
#!!!消费消息的时候先从consumerQueue对应的文件中读取offset
#!!!然后再拿着这个offset去commitLog中读
#!!!也就是说顺序写随机读
return newLogic
......
msg.setQueueOffset(consumerQueue.getQueueOffset) #设置消息的queueOffset即消息在指定消费队列中的逻辑位移
}
PutMessageLock.lock() #PutMessage锁
if !MessageStoreConfig.isDuplicationEnable: #如果关闭了消息重复,则给每个消息加上一个时间戳(默认是关闭的)
msg.setStoreTimestamp(beginLockTimestamp) #pustMessage函数开头设置过一次,这里又设置一次,即更新为当前时间
DefaultMappedFile.appendMessage #写入内存映射文件
switch (result.getStatus()) {
case PUT_OK: #如果提交到commitlog文件ok
CommitLog.onCommitLogAppend #就执行提交完毕后对应的钩子函数,默认是空操作
break;
case END_OF_FILE: #如果文件满了
CommitLog.onCommitLogAppend #先执行提交完毕这个动作对应的钩子函数,默认是空操作
CommitLog.mappedFileQueue.getLastMappedFile #然后创建一个新的mappedfile
DefaultMappedFile.appendMessage #然后在此提交到新的commitlog文件
#!!!此时还只是写到内存缓冲区
CommitLog.onCommitLogAppend #执行提交完毕这个动作对应的钩子函数,默认是空操作
PutMessageLock.unlock() #解锁PutMessage
if OK:
DefaultMessageStore.increaseOffset #更新指定消费队列的queueOffset,因为消息已经成功写入磁盘了
#而该消息会被分配给某个指定的队列x,也就是说队列x新增了消息
#所以更新消费队列x的queueOffset
if tranType == MessageSysFlag.TRANSACTION_NOT_TYPE ||
tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE)
{
ConsumeQueueStore.increaseQueueOffset(msg, messageNum); #更新指定ConsumerQueue的queuOffset
findOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId());#根据queueId获取对应的队列
ConsumeQueue.increaseQueueOffset(this.queueOffsetOperator, msg, messageNum); #更新该队列的queueOffset
#不知道是不是会持久化到磁盘
}
TopicQueueLock.unlock(topicQueueKey) #解锁队列
CommitLog.handleDiskFlushAndHA #刷盘和ha
CommitLog.handleDiskFlush #刷盘,数据已经写入,只是还没有强制刷盘而已
CommitLog.DefaultFlushManager.handleDiskFlush #根据配置文件来决定是同步刷盘还是异步刷盘
...刷盘流程留到下一个大篇幅去弄了,这里暂时跳过了...
if !ha: #如果不需要处理ha即没有开启ha则直接返回ok
return ok
CommitLog.handleHA #处理HA
if needAckNums<=1: #如果只需要1个副本ack
return OK #直接返回ok,因为master broker本身也算一个
nextOffset = result.getWroteOffset() + result.getWroteBytes() #否则需要同步,这里设置偏移量,即下一个写入位置
GroupCommitRequest request = new GroupCommitRequest #构造同步请求
DefaultHAService.putRequest(request); #丢给haservice线程去异步处理ha
DefaultHAService.getWaitNotifyObject().wakeupAll() #唤醒沉睡的haservice线程
#如果所有haservice都正在运行没有sleep
#那么就什么也不做
#haservice是一个类似事件循环的线程
#如果没事做就wait了,直到被唤醒
...ha相关流程留到下一个大篇幅去弄了,这里暂时跳过了... #rocketmq5中线程a把新消息写入commitLog后
#会有主从同步线程b来异步把新增的数据同步给slave
#所以ha线程c的逻辑就是不断询问slave,同步到哪了
#一旦某个slave返回的ackOffset>nextOffset
#就知道该slave ok了,所以ackNum++
#直到ackNum>needAckNum即达到quorum个slave ok就可以了
#就会完成future
SendMessageProcessor.handlePutMessageResult #处理写入结果
switch putMessageResult.getPutMessageStatus()
case XX:
sendOk=true
......
case YY:
sendOk=false
if sendOK: #如果写入成功
BrokerStatsManager.incXXXX #那么就更新broker的status就是broker的状态
SendMessageStore.rewriteResponseForStaticTopic #处理静态topic,流场景用的,暂不了解,略
SendMessageStore.doResponse
AbstractSendMessageProcessor.executeSendMessageHookAfter #处理afterHook消息
}
else{ #如果是同步写入
if sendTransactionPrepareMessage: #并且如果是事务消息
TransactionalMessageServiceImpl.prepareMessage #同步执行事务消息第一阶段:prepare
else:
DefaultMessageStore.putMessage #如果不是事务消息则同步写入存储
#getMesageStore有多个实现类,默认是DefaultMessageStore
DefaultMessageStore.waitForPutResult(DefaultMessageStore.asyncPutMessage(msg))
#同步写入底层也是调用异步写入,
#只不过同步写入的时候会等待异步写入返回的future完成
...下面就是异步写入的逻辑了,上面已叙述,故此处略...
SendMessageProcessor.handlePutMessageResult #处理写入结果
AbstractSendMessageProcessor.executeSendMessageHookAfter #处理afterHook消息
}
2:handleHA流程
2.1:提交任务到haservice(上面已叙述)
CommitLog.asyncPutMessage
CommitLog.handleDiskFlushAndHA
CommitLog.handleHA
HAService haService = this.defaultMessageStore.getHaService(); #获取haservice
long nextOffset = result.getWroteOffset() + result.getWroteBytes();#获取要同步的偏移量,即下一个写入位置
#!!!大意就是另一个主从同步线程不断
#!!!会把master的commitLog中新增加的内容同步给slave
#!!!当slave同步到nextOffset或者超过nextOffset时
#!!!就表示本消息已同步,检测到quorum个slaveok ha就返回成功
GroupCommitRequest request = new GroupCommitRequest(nextOffset,DefaultMessageStoreConfig.getSlaveTimeout(), needAckNums);
#构建同步请求
DefaultHAService.putRequest #把同步请求丢到haservice去异步执行
GroupTransferService.putRequest #丢到ha的工作队列
lock.lock()
this.requestsWrite.add(request) #丢到write队列,这个write队列专门用来存放新增请求
#他有两个队列,write和read,工作线程会从read线程取数据
#而新数据则是会追加到write队列
#然后工作队列空了就会来一个swap即交换write队列和read队列变量的引用
#这样两个队列一个优点就是可以避免读写操作互锁
lock.unlock
DefaultHAService.getWaitNotifyObject().wakeupAll(); #如果haservice工作线程在sleep那么就唤醒,如果是running则什么也不做
2.2:haservice启动和运行
BrokerStartup.main
BrokerStartup.start
BrokerContorller.start
BrokerController.startBasicService
DefaultMessageStore.start
DefaultHAService.start
GroupTransferService.start
......
GroupTransferService.run
while (!this.isStopped()) {
GroupTransferService.waitForRunning(10); #没事干就会等待10s,有事干就会被提前唤醒
GroupTransferService.onWaitEnd #等待结束的时候
GroupTransferService.swapRequests #交换read/write两个队列
tmp = this.requestsWrite
this.requestsWrite = this.requestsRead
this.requestsRead = tmp
GroupTransferService.doWaitTransfer() #前面handlHA函数里把同步请求丢到了GroupTransferService的write队列
#所以doWaitTransfer会交换read/write队列,然后处理read队列中的请求
#对于每个请求,就是不断询问slave同步到哪里了,
#一旦超过nextOffset,slave ok 就+1知道达到quorum个节点,就完成本次请求
for req : this.requestsRead #遍历read队列的所有请求
for i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++: #对于每个请求,只要没有超时或者ok,就一直询问
if i > 0: #如果不是第一次
GroupTransferService..waitForRunning(1) #就等一秒再开始下次询问
if !allAckInSyncStateSet && req.getAckNums() <= 1:
transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset()#master也算一个
continue;
if allAckInSyncStateSet && this.haService instanceof AutoSwitchHAService: #allAck=true并且haservice
#是AutoSwitcHAserver的实例
#暂不直到AutoSwitchHAService干嘛的,
#不懂为什么allAck=true时还需要加上这个条件
#此时必须等待所有副本ok
int ackNums = 1 #ackNums从1开始,因为master也算一个
for conn : haService.getConnectionList: #遍历所有slave
if SyncStateSet.contains(slaveId) && #如果该slave位于该master的slave集合中
AutoSwitchHAConnection.getSlaveAckOffset>= req.getNextOffset #并且该slave报告的ackOffset超过了nextOffset
{ #就表明对于这个请求,这个slave已经同步了
ackNums++; #那么就把针对这个请求的ackNums+1
}
if ackNums >= syncStateSet.size(): #此时是要求所有都ack,所以直到所有的都ok才返回ok
transferOK = true;
break;
else: #同上面的流程,区别在于
int ackNums = 1;
for conn : haService.getConnectionList:
if conn.getSlaveAckOffset() >= req.getNextOffset():
ackNums++;
if ackNums >= req.getAckNums() #达到quorum个slave返回ok就行了
transferOK = true;
break;
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT)
#返回结果,要么ok,要么超时
#broker很多地方都是future实现的