启动你的RocketMQ之旅(三)-Producer启动和发送流程(上)
前言:
👏作者简介:我是笑霸final。
📝个人主页: 笑霸final的主页2
📕系列专栏:java专栏
📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
🔥如果感觉博主的文章还不错的话,👍点赞👍 + 👀关注👀 + 🤏收藏🤏
上一章节:启动你的RocketMQ之旅(二)-broket和namesrv启动流程
下一章节:启动你的RocketMQ之旅(四)-Producer启动和发送流程(下)
目录
- 一、namesrv注册broker
- 1.1注册流程
- 二、Producer启动流程
- 三、producer消息同步发送
- 1、验证消息
- 2、查找路由
- 3、选择队列和发送消息
- 4、通信协议
一、namesrv注册broker
NameServer (Namesrv) 在 RocketMQ 架构中扮演着路由信息管理的角色。它是一个轻量级的服务,不保存状态,主要职责是接收来自 Broker 的注册请求,并为 Producer 和 Consumer 提供路由查询服务。Broker 向 Namesrv 注册的主要原因。
Broker 向 Namesrv 注册的主要原因包括:
- 提供路由信息
当 Broker 启动时,它需要将自己的地址(IP和端口)以及它所承载的 Topic 信息等注册到 Namesrv。这样,Producer 和 Consumer 就可以通过 Namesrv 查询到相关的 Broker 地址,进而建立连接进行消息的发送和接收。- 高可用性
RocketMQ 支持多个 Namesrv 实例以实现高可用。Broker 会向集群中的所有 Namesrv 发送注册请求,确保即使某个 Namesrv 出现故障,其他 Namesrv 仍然可以提供完整的路由信息给客户端。- 动态发现
在分布式环境中,Broker 可能会动态增加或减少。通过向 Namesrv 注册,Broker 能够使这种变化被及时感知,从而保证系统的灵活性和可扩展性。- 心跳机制
为了确保 Namesrv 中的路由信息是最新的,Broker 会定期(例如每隔30秒)向 Namesrv 发送心跳包来更新自己的状态。如果 Namesrv 在一定时间内没有收到某个 Broker 的心跳,就会认为该 Broker 已经离线,并从路由表中移除它的信息。- 权限控制
在注册过程中,Broker 还可以设置自己的一些权限信息,比如是否允许读取、写入等,这有助于在 Namesrv 层面实现一定的访问控制。- 数据版本管理
每次注册或者更新信息时,Broker 会携带一个数据版本号,Namesrv 通过这个版本号来判断信息是否有更新,以此避免不必要的处理和资源浪费。
1.1注册流程
注册首先会发送注册的请求到Namesrv 服务,先看看注册消息都有哪些信息
可以看到 在方法名为registerBroker的方法中,有5个参数:
- namesrvAddr:NameServer 地址。
- oneway:布尔值,指示是否为单向请求。
- timeoutMills:超时时间。
- requestHeader:请求头对象。
- body:字节数组,表示请求体数据。
namesrv是如何接收的呢?
在namesrv的DefaultRequestProcessor类下有如下代码
会根据版本信息来选择不同的方法,我们看registerBroker方法,它有两个参数
- ctx:通常代表上下文(Context),它包含了调用该方法时的环境信息。
- request:代表注册请求的具体内容
同时获取返回的注册结果
registerBroker注册的核心代码部分
try {
// !!!获取写锁并阻塞等待直到获取到锁,确保在更新数据结构时不会发生并发问题
this.lock.writeLock().lockInterruptibly();
// 从集群地址表(clusterAddrTable)中获取指定集群名称对应的Broker名称集合
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);//添加到对应的集群
// 标记是否为首次注册该Broker
boolean registerFirst = false;
// 从Broker地址表(brokerAddrTable)中查找已存在的BrokerData对象,
// 代表Broker的信息
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;//是首次注册
brokerData = new BrokerData(clusterName,
brokerName,
new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
// 获取Broker地址映射表(brokerId到brokerAddr)
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//遍历映射表,移除与当前brokerAddr相同但brokerId不同的记录,
//保证同一IP:PORT仅有一条记录
Iterator<Entry<Long, String>> it
= brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr &&
brokerAddr.equals(item.getValue()) &&
brokerId != item.getKey())
{
it.remove();
}
}
// 更新或添加新的brokerId和brokerAddr到映射表中
// 根据hashMap的put方法 返回值是value
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
// 若旧地址为空,也视为首次注册
registerFirst = registerFirst || (null == oldAddr);
// 如果Topic配置信息存在且当前注册的是主节点(MixAll.MASTER_ID),
// 则根据配置版本判断是否需要更新队列数据
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr,
topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 创建新的BrokerLiveInfo实例,并将其添加到Broker存活状态表(brokerLiveTable)中
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
// 处理FilterServer列表,如果为空则从filterServerTable中移除,否则更新
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
// 如果当前注册的是从节点,找到主节点信息并设置结果对象的相关属性
if (MixAll.MASTER_ID != brokerId) {
String masterAddr
= brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo
= this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
// 释放锁
this.lock.writeLock().unlock();
}
上面代码解读:
1.获取写锁:确保相关数据的完整性和一致性
● 首先使用ReentrantReadWriteLock的writeLock()方法获取一个可重入写锁,并通过lockInterruptibly()阻塞式地等待获取该锁,确保在更新Name Server内部数据结构时进行同步控制。
2.维护集群信息:
● 从clusterAddrTable中查找给定集群名对应的Broker集合,如果不存在则创建新的集合并添加到表中。
● 将当前注册的Broker名称加入到集合中。
3.管理Broker地址和状态:
● 获取或初始化BrokerData对象,存储关于Broker集群、名称及地址映射信息(brokerId到brokerAddr)。
● 遍历Broker地址映射表,移除与当前相同地址但不同ID的条目,保证每个地址仅有一条记录。
● 更新Broker地址映射表,将新注册的brokerId和brokerAddr存入。
● 若为首次注册,标记registerFirst为true,后续用于判断是否需要更新Topic配置信息。
4.处理Topic配置:
● 如果Broker提供的是主节点且主题配置信息存在,则检查Topic配置版本是否有变化或者首次注册,如果有,则根据TopicConfigWrapper中的Topic配置信息创建并更新队列数据。
5.维护Broker存活状态
● 在brokerLiveTable中保存Broker的实时连接信息,包括当前时间戳、主题配置版本、通道(Channel)以及高可用服务器地址(HA Server Address)。
● 若Broker是新注册的,输出日志信息表明新Broker已成功注册。
6.处理FilterServer列表:
● 根据Broker提供的过滤器服务器列表,将其存入filterServerTable,若为空则从表中移除对应Broker的过滤器服务器信息。
7.设置返回结果:
● 如果注册的是从节点(Slave),查找其关联的主节点信息,并将主节点的HA服务器地址和地址信息设置到返回的结果对象中。
8.释放锁
二、Producer启动流程
producer启动入口是producer.start()方法里面调用的是
defaultMQProducerImpl.start()
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
/*
*异步传输的接口。启动条件如下是:
* 1、客户端配置了消息追踪: 开发者在启动生产者时
通过设置相关配置参数
* (如系统属性或XML配置文件)来开启消息追踪功能
* 2、消息追踪插件被激活:
如果使用了RocketMQ的消息追踪插件或者扩展,
那么在生产者初始化阶段,插件可能会主动创建并注入一个
有效的traceDispatcher实例
* 3、自定义实现: 在某些高级应用场景下,开发者可能
自己实现了消息追踪逻辑,并在生产者中
手动设置了非空的traceDispatcher对象。
*/
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
在defaultMQProducerImpl.start()中 流程如下
三、producer消息同步发送
同步发送的步骤为4步
1、验证消息;2、查找路由;3、选择队列;4、消息发送
1、验证消息
传入了msg和当前producer实例,进入此方法它检查如下几点
● msg是否为空
● topic名称是否符合MQ规范
● 验证该topic是否允许生产者发送消息
● 验证body是否存在和长度是否超过mq的规定(大小不超过4m)
2、查找路由
查找路由的流程如下
直接上代码
// 尝试获取或更新给定主题(topic)的发布信息
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 从本地缓存(topicPublishInfoTable)中尝试获取与该主题对应的TopicPublishInfo对象
// topicPublishInfoTable是一个 ConcurrentMap
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 如果本地缓存中没有找到该主题的发布信息,或者已有的发布信息不可用(!topicPublishInfo.ok())
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
// 在本地缓存中添加一个新的TopicPublishInfo实例(如果当前主题不存在于缓存中)
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 从NameServer更新该主题的路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
// 再次从本地缓存中获取更新后的TopicPublishInfo对象
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 如果当前TopicPublishInfo对象包含有效的路由信息(isHaveTopicRouterInfo()为true)
// 或其状态为可用(ok()为true)
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
// 直接返回该Topic的发布信息
return topicPublishInfo;
} else {
// 若仍无有效路由信息,则强制从NameServer更新该主题的路由信息,
// 并传递生产者实例以确保正确处理权限等信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
// 更新后再次从本地缓存中获取最新的TopicPublishInfo对象
topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 返回最终获取到的Topic发布信息
return topicPublishInfo;
}
}
TopicPublishInfo包含的信息
● MessageQueue List: 该主题在所有Broker上可发布的消息队列列表
● 是否顺序Topic
● topicRouteData :路由信息
3、选择队列和发送消息
在这个for循环里面就是选择队列和发送消息了,在这之前,可以看到重试次数3次(同步)或者1次;
也就是这个for循环会循环3此或者1次【成功就退出】
进入for循环 ,选择队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
启用发送延迟故障容忍的情况 后面分析
默认情况
● 第一选择队列直接走负载均衡
● 重试发送消息 规避上次的Broker队列
然后查看selectOneMessageQueue():目的是在一组MessageQueue中进行负载均衡,通过轮询算法(Round-Robin)选择下一个要发送消息的目标MessageQueue。每次调用该方法时
// 此方法用于从给定的消息队列列表(messageQueueList)中,通过轮询方式选择并返回一个MessageQueue对象。
public MessageQueue selectOneMessageQueue() {
// 针对同一个线程(ThreadLocal)进行自增 这里并不是原子操作!!!!
int index = this.sendWhichQueue.incrementAndGet();
// 计算绝对值后对messageQueueList大小取模,得到当前应该选择的索引位置
// 这是为了实现轮询策略,确保消息均匀分布到所有队列
int pos = Math.abs(index) % this.messageQueueList.size();
// 如果计算出的pos值小于0(理论上此处不会出现负数),则将其设置为0
if (pos < 0)
pos = 0;
// 根据计算出的pos索引值,从messageQueueList中获取并返回对应位置的MessageQueue对象
return this.messageQueueList.get(pos);
}
然后返回for循环,开发发送消息
for (; times < timesTotal; times++) {
// 获取上一次尝试发送时使用的Broker名称(如果尚未初始化,则为null)
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 根据topicPublishInfo和可能存在的上次发送失败的broker,选择一个新的MessageQueue用于发送消息
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
// 如果成功选择了MessageQueue
if (mqSelected != null) {
// 更新当前选择的消息队列
mq = mqSelected;
// 记录已尝试发送消息的Broker名称
brokersSent[times] = mq.getBrokerName();
try {
// 获取当前时间作为前一次尝试的时间戳
beginTimestampPrev = System.currentTimeMillis();
// 如果不是第一次发送(即正在进行重试),则在重发时更新消息的主题名称,加入命名空间信息
if (times > 0) {
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
// 计算从开始发送到当前尝试所消耗的时间
long costTime = beginTimestampPrev - beginTimestampFirst;
// 检查是否已经超过了设定的超时时间
if (timeout < costTime) {
// 若已超时,则标记为超时并跳出重试循环
callTimeout = true;
break;
}
// 调用核心发送逻辑,向选定的MessageQueue发送消息
sendResult = this.sendKernelImpl(
msg,
mq,
communicationMode,
sendCallback,
topicPublishInfo,
timeout - costTime);
// 获取当前时间作为结束时间戳
endTimestamp = System.currentTimeMillis();
/*
* 根据消息的发送时长,更新broker不可用时间
* 最后一个参数 消息发送成功是false,发送失败是true
* 后面的异常处理【此处代码省略】 最后一个参数是true
*/
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
// 根据通信模式处理发送结果
switch (communicationMode) {
case ASYNC:
// 异步模式下,直接返回null(异步发送不等待响应)
return null;
case ONEWAY:
// 单向模式下,同样直接返回null(只发送不关心响应和结果)
return null;
case SYNC:
// 同步模式下,检查发送状态
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
// 如果发送未成功,并且配置允许在存储失败时重试其他Broker,则继续下一次重试
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
// 发送成功或满足条件的情况下,返回发送结果
return sendResult;
default:
// 非法的通信模式,不做任何处理(理论上不应出现这种情况)
break;
}
} //.....异常捕获
}
发送消息的核心代码 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
启用发送延迟故障容忍的情况
在消息的发送过程中有如下代码
● 根据消息的发送时长,更新broker不可用时间
● 最后一个参数 消息发送成功是false,发送失败是true
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
进入次方法 发现需要启用sendLatencyFaultEnable。在选择消息队列的使用也有这个判断。sendLatencyFaultEnable默认为false。现在我们分析sendLatencyFaultEnable=true的情况
producer.setSendLatencyFaultEnable(true);打开发送延迟故障容忍
当 isolation=true ,duration=30000 ,相当于latencyMax[5] 就会对应notAvailableDuration[5],
currentLatency(当前延迟)——>duration(持续时间表)对应关系
[0,50)->0 ; [50,100)->0 ; [100,550)->0 ; [550,1000)->30000 ;
[1000,2000)->60000 ; [2000,3000)->120000 ; [3000,15000)->180000 ; [15000, + )->600000 ;
继续updateFaultItem方法
// brokerName 、 上次发送用的时间 、 duration
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
// 根据brokerName的到故障表
FaultItem old = this.faultItemTable.get(name);//faultItemTable是ConcurrentHashMap
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
//设置 延迟时间
faultItem.setCurrentLatency(currentLatency);
//设置此broker可用的时间: 当前时间 +notAvailableDuration[]时间
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
//存入 故障表
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
然后在回到队列选择代码 进入启用发送延迟故障容忍的情况
// 如果开启 发送延迟故障容忍的情况下 默认是未开启
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// todo 判断该消息队列所在的broker是否根据延迟容错策略可用。
// 如果是,则直接返回这个消息队列
// isAvailable 其实就是判断当前时间 是否 超 过当前broker能启动的时间
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
// todo 如果上面没有找到可用的broker,尝试通过latencyFaultTolerance
// 策略至少选择一个可能不是最优但可接受的broker名称。
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
// todo 果找到了非最优broker且其有可用消息队列,则调用selectOneMessageQueue()方法获得一个消息队列,
// 并将其broker信息更新为notBestBroker,同时设置一个合适的队列ID。
// 返回的是当前broker的队列数量
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
// todo 否则 说明所选broker无可用消息队列,将该broker从延迟容错策略中移除。
// 然后走,默认
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
选择一个可能不是最优但可接受的broker名称 原理
从故障表中获得所有的FaultItem 然后根据一定的规则排序,最后在中位数附近随机选择一个.
4、通信协议
发送消息用的是rockemq自己的协议,如下
- 命令标识:code: 命令码,用于标识具体的命令类型或错误码,使得接收方能够识别请求的具体动作或者响应的状态。
- 语言标记:language: 表示使用的编程语言,这里是LanguageCode.JAVA,表明命令由Java程序生成。
- 版本信息:version: 协议版本号,确保不同版本间通信的兼容性。
- 唯一标识:opaque: 请求ID,每次新建命令时递增,用于追踪请求和响应之间的关联。
- 标志位:flag: 用于标识请求或响应的类型,例如区分普通请求和单向请求(RPC_TYPE 和 RPC_ONEWAY)以及响应命令。
- 辅助信息:remark: 备注字段,通常用于携带额外的描述性文本信息。
- 扩展字段:extFields: 一个HashMap,用于存储自定义键值对,可用于传递额外的业务参数或元数据。
- 自定义命令头:customHeader: CommandCustomHeader接口的实现,用于封装特定业务场景下的定制化头部信息。
- 序列化类型:serializeTypeCurrentRPC: 表示当前RPC请求使用的序列化方式,可以是JSON
- 消息正文:body: 存储实际要传输的二进制数据内容。