当前位置: 首页 > article >正文

启动你的RocketMQ之旅(三)-Producer启动和发送流程(上)

前言
👏作者简介:我是笑霸final。
📝个人主页: 笑霸final的主页2
📕系列专栏:java专栏
📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
🔥如果感觉博主的文章还不错的话,👍点赞👍 + 👀关注👀 + 🤏收藏🤏

上一章节:启动你的RocketMQ之旅(二)-broket和namesrv启动流程
下一章节:启动你的RocketMQ之旅(四)-Producer启动和发送流程(下)

目录

  • 一、namesrv注册broker
    • 1.1注册流程
  • 二、Producer启动流程
  • 三、producer消息同步发送
    • 1、验证消息
    • 2、查找路由
    • 3、选择队列和发送消息
  • 4、通信协议

一、namesrv注册broker

NameServer (Namesrv) 在 RocketMQ 架构中扮演着路由信息管理的角色。它是一个轻量级的服务,不保存状态,主要职责是接收来自 Broker 的注册请求,并为 Producer 和 Consumer 提供路由查询服务。Broker 向 Namesrv 注册的主要原因。

Broker 向 Namesrv 注册的主要原因包括:

  • 提供路由信息
    当 Broker 启动时,它需要将自己的地址(IP和端口)以及它所承载的 Topic 信息等注册到 Namesrv。这样,Producer 和 Consumer 就可以通过 Namesrv 查询到相关的 Broker 地址,进而建立连接进行消息的发送和接收。
  • 高可用性
    RocketMQ 支持多个 Namesrv 实例以实现高可用。Broker 会向集群中的所有 Namesrv 发送注册请求,确保即使某个 Namesrv 出现故障,其他 Namesrv 仍然可以提供完整的路由信息给客户端。
  • 动态发现
    在分布式环境中,Broker 可能会动态增加或减少。通过向 Namesrv 注册,Broker 能够使这种变化被及时感知,从而保证系统的灵活性和可扩展性。
  • 心跳机制
    为了确保 Namesrv 中的路由信息是最新的,Broker 会定期(例如每隔30秒)向 Namesrv 发送心跳包来更新自己的状态。如果 Namesrv 在一定时间内没有收到某个 Broker 的心跳,就会认为该 Broker 已经离线,并从路由表中移除它的信息。
  • 权限控制
    在注册过程中,Broker 还可以设置自己的一些权限信息,比如是否允许读取、写入等,这有助于在 Namesrv 层面实现一定的访问控制。
  • 数据版本管理
    每次注册或者更新信息时,Broker 会携带一个数据版本号,Namesrv 通过这个版本号来判断信息是否有更新,以此避免不必要的处理和资源浪费。

1.1注册流程

注册首先会发送注册的请求到Namesrv 服务,先看看注册消息都有哪些信息
在这里插入图片描述
可以看到 在方法名为registerBroker的方法中,有5个参数:

  • namesrvAddr:NameServer 地址。
  • oneway:布尔值,指示是否为单向请求。
  • timeoutMills:超时时间。
  • requestHeader:请求头对象。
  • body:字节数组,表示请求体数据。

namesrv是如何接收的呢?
在namesrv的DefaultRequestProcessor类下有如下代码
在这里插入图片描述
会根据版本信息来选择不同的方法,我们看registerBroker方法,它有两个参数

  • ctx:通常代表上下文(Context),它包含了调用该方法时的环境信息。
  • request:代表注册请求的具体内容

同时获取返回的注册结果

registerBroker注册的核心代码部分

try {
                // !!!获取写锁并阻塞等待直到获取到锁,确保在更新数据结构时不会发生并发问题
                this.lock.writeLock().lockInterruptibly();
                 // 从集群地址表(clusterAddrTable)中获取指定集群名称对应的Broker名称集合
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);//添加到对应的集群
                 // 标记是否为首次注册该Broker
                boolean registerFirst = false;
                // 从Broker地址表(brokerAddrTable)中查找已存在的BrokerData对象,
                // 代表Broker的信息
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;//是首次注册
                    brokerData = new BrokerData(clusterName, 
                                                brokerName, 
                                                new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                 // 获取Broker地址映射表(brokerId到brokerAddr)
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //遍历映射表,移除与当前brokerAddr相同但brokerId不同的记录,
                //保证同一IP:PORT仅有一条记录
                Iterator<Entry<Long, String>> it 
                            = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if (null != brokerAddr && 
                        brokerAddr.equals(item.getValue()) && 
                        brokerId != item.getKey()) 
                    {
                        it.remove();
                    }
                }
                // 更新或添加新的brokerId和brokerAddr到映射表中
                // 根据hashMap的put方法 返回值是value
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                // 若旧地址为空,也视为首次注册
                registerFirst = registerFirst || (null == oldAddr);

                // 如果Topic配置信息存在且当前注册的是主节点(MixAll.MASTER_ID),
                // 则根据配置版本判断是否需要更新队列数据
                if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, 
                                            topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }
         // 创建新的BrokerLiveInfo实例,并将其添加到Broker存活状态表(brokerLiveTable)中
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }
         // 处理FilterServer列表,如果为空则从filterServerTable中移除,否则更新
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }
         // 如果当前注册的是从节点,找到主节点信息并设置结果对象的相关属性
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr 
                    = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo 
                        = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
        // 释放锁
                this.lock.writeLock().unlock();
            }

上面代码解读:
1.获取写锁:确保相关数据的完整性和一致性
● 首先使用ReentrantReadWriteLock的writeLock()方法获取一个可重入写锁,并通过lockInterruptibly()阻塞式地等待获取该锁,确保在更新Name Server内部数据结构时进行同步控制。
2.维护集群信息:
● 从clusterAddrTable中查找给定集群名对应的Broker集合,如果不存在则创建新的集合并添加到表中。
● 将当前注册的Broker名称加入到集合中。
3.管理Broker地址和状态:
● 获取或初始化BrokerData对象,存储关于Broker集群、名称及地址映射信息(brokerId到brokerAddr)。
● 遍历Broker地址映射表,移除与当前相同地址但不同ID的条目,保证每个地址仅有一条记录。
● 更新Broker地址映射表,将新注册的brokerId和brokerAddr存入。
● 若为首次注册,标记registerFirst为true,后续用于判断是否需要更新Topic配置信息。
4.处理Topic配置:
● 如果Broker提供的是主节点且主题配置信息存在,则检查Topic配置版本是否有变化或者首次注册,如果有,则根据TopicConfigWrapper中的Topic配置信息创建并更新队列数据。
5.维护Broker存活状态
● 在brokerLiveTable中保存Broker的实时连接信息,包括当前时间戳、主题配置版本、通道(Channel)以及高可用服务器地址(HA Server Address)。
● 若Broker是新注册的,输出日志信息表明新Broker已成功注册。
6.处理FilterServer列表:
● 根据Broker提供的过滤器服务器列表,将其存入filterServerTable,若为空则从表中移除对应Broker的过滤器服务器信息。
7.设置返回结果:
● 如果注册的是从节点(Slave),查找其关联的主节点信息,并将主节点的HA服务器地址和地址信息设置到返回的结果对象中。
8.释放锁

二、Producer启动流程

producer启动入口是producer.start()方法里面调用的是defaultMQProducerImpl.start()

public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            /*
            *异步传输的接口。启动条件如下是:
            * 1、客户端配置了消息追踪: 开发者在启动生产者时
                通过设置相关配置参数
            *   (如系统属性或XML配置文件)来开启消息追踪功能
            * 2、消息追踪插件被激活: 
                如果使用了RocketMQ的消息追踪插件或者扩展,
                那么在生产者初始化阶段,插件可能会主动创建并注入一个
                有效的traceDispatcher实例
            * 3、自定义实现: 在某些高级应用场景下,开发者可能
                自己实现了消息追踪逻辑,并在生产者中
                手动设置了非空的traceDispatcher对象。
            */
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

在defaultMQProducerImpl.start()中 流程如下
在这里插入图片描述

三、producer消息同步发送

同步发送的步骤为4步
1、验证消息;2、查找路由;3、选择队列;4、消息发送

1、验证消息

在这里插入图片描述
传入了msg和当前producer实例,进入此方法它检查如下几点
● msg是否为空
● topic名称是否符合MQ规范
● 验证该topic是否允许生产者发送消息
● 验证body是否存在和长度是否超过mq的规定(大小不超过4m)

2、查找路由

查找路由的流程如下
在这里插入图片描述

直接上代码

// 尝试获取或更新给定主题(topic)的发布信息
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 从本地缓存(topicPublishInfoTable)中尝试获取与该主题对应的TopicPublishInfo对象
    // topicPublishInfoTable是一个 ConcurrentMap
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);

    // 如果本地缓存中没有找到该主题的发布信息,或者已有的发布信息不可用(!topicPublishInfo.ok())
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        // 在本地缓存中添加一个新的TopicPublishInfo实例(如果当前主题不存在于缓存中)
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());

        // 从NameServer更新该主题的路由信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);

        // 再次从本地缓存中获取更新后的TopicPublishInfo对象
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    // 如果当前TopicPublishInfo对象包含有效的路由信息(isHaveTopicRouterInfo()为true)
    // 或其状态为可用(ok()为true)
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        // 直接返回该Topic的发布信息
        return topicPublishInfo;
    } else {
        // 若仍无有效路由信息,则强制从NameServer更新该主题的路由信息,
        // 并传递生产者实例以确保正确处理权限等信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);

        // 更新后再次从本地缓存中获取最新的TopicPublishInfo对象
        topicPublishInfo = this.topicPublishInfoTable.get(topic);

        // 返回最终获取到的Topic发布信息
        return topicPublishInfo;
    }
}

TopicPublishInfo包含的信息
● MessageQueue List: 该主题在所有Broker上可发布的消息队列列表
● 是否顺序Topic
● topicRouteData :路由信息

3、选择队列和发送消息

在这个for循环里面就是选择队列和发送消息了,在这之前,可以看到重试次数3次(同步)或者1次;
也就是这个for循环会循环3此或者1次【成功就退出】在这里插入图片描述

进入for循环 ,选择队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);在这里插入图片描述

启用发送延迟故障容忍的情况 后面分析
默认情况
● 第一选择队列直接走负载均衡
● 重试发送消息 规避上次的Broker队列在这里插入图片描述

然后查看selectOneMessageQueue():目的是在一组MessageQueue中进行负载均衡,通过轮询算法(Round-Robin)选择下一个要发送消息的目标MessageQueue。每次调用该方法时

// 此方法用于从给定的消息队列列表(messageQueueList)中,通过轮询方式选择并返回一个MessageQueue对象。
public MessageQueue selectOneMessageQueue() {
    // 针对同一个线程(ThreadLocal)进行自增 这里并不是原子操作!!!!
    int index = this.sendWhichQueue.incrementAndGet();

    // 计算绝对值后对messageQueueList大小取模,得到当前应该选择的索引位置
    // 这是为了实现轮询策略,确保消息均匀分布到所有队列
    int pos = Math.abs(index) % this.messageQueueList.size();

    // 如果计算出的pos值小于0(理论上此处不会出现负数),则将其设置为0
    if (pos < 0)
        pos = 0;

    // 根据计算出的pos索引值,从messageQueueList中获取并返回对应位置的MessageQueue对象
    return this.messageQueueList.get(pos);
}

然后返回for循环,开发发送消息

  for (; times < timesTotal; times++) {
      // 获取上一次尝试发送时使用的Broker名称(如果尚未初始化,则为null)
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
       // 根据topicPublishInfo和可能存在的上次发送失败的broker,选择一个新的MessageQueue用于发送消息
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
               // 如果成功选择了MessageQueue
    if (mqSelected != null) {
        // 更新当前选择的消息队列
        mq = mqSelected;
        // 记录已尝试发送消息的Broker名称
        brokersSent[times] = mq.getBrokerName();

        try {
            // 获取当前时间作为前一次尝试的时间戳
            beginTimestampPrev = System.currentTimeMillis();

            // 如果不是第一次发送(即正在进行重试),则在重发时更新消息的主题名称,加入命名空间信息
            if (times > 0) {
                msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
            }

            // 计算从开始发送到当前尝试所消耗的时间
            long costTime = beginTimestampPrev - beginTimestampFirst;

            // 检查是否已经超过了设定的超时时间
            if (timeout < costTime) {
                // 若已超时,则标记为超时并跳出重试循环
                callTimeout = true;
                break;
            }

            // 调用核心发送逻辑,向选定的MessageQueue发送消息
            sendResult = this.sendKernelImpl(
                                        msg, 
                                        mq, 
                                        communicationMode, 
                                        sendCallback, 
                                        topicPublishInfo, 
                                        timeout - costTime);

            // 获取当前时间作为结束时间戳
            endTimestamp = System.currentTimeMillis();

           /*
            * 根据消息的发送时长,更新broker不可用时间
            * 最后一个参数  消息发送成功是false,发送失败是true
            * 后面的异常处理【此处代码省略】  最后一个参数是true
            */
            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

            // 根据通信模式处理发送结果
            switch (communicationMode) {
                case ASYNC:
                    // 异步模式下,直接返回null(异步发送不等待响应)
                    return null;
                case ONEWAY:
                    // 单向模式下,同样直接返回null(只发送不关心响应和结果)
                    return null;
                case SYNC:
                    // 同步模式下,检查发送状态
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        // 如果发送未成功,并且配置允许在存储失败时重试其他Broker,则继续下一次重试
                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                            continue;
                        }
                    }
                    
                    // 发送成功或满足条件的情况下,返回发送结果
                    return sendResult;
                default:
                    // 非法的通信模式,不做任何处理(理论上不应出现这种情况)
                    break;
                        }
                    } //.....异常捕获
                }

发送消息的核心代码 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

启用发送延迟故障容忍的情况
在消息的发送过程中有如下代码
● 根据消息的发送时长,更新broker不可用时间
● 最后一个参数 消息发送成功是false,发送失败是true

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

进入次方法 发现需要启用sendLatencyFaultEnable。在选择消息队列的使用也有这个判断。sendLatencyFaultEnable默认为false。现在我们分析sendLatencyFaultEnable=true的情况
producer.setSendLatencyFaultEnable(true);打开发送延迟故障容忍
在这里插入图片描述
在这里插入图片描述

当 isolation=true ,duration=30000 ,相当于latencyMax[5] 就会对应notAvailableDuration[5],
currentLatency(当前延迟)——>duration(持续时间表)对应关系
[0,50)->0 ; [50,100)->0 ; [100,550)->0 ; [550,1000)->30000 ;
[1000,2000)->60000 ; [2000,3000)->120000 ; [3000,15000)->180000 ; [15000, + )->600000 ;在这里插入图片描述
继续updateFaultItem方法

// brokerName   、  上次发送用的时间 、 duration
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        // 根据brokerName的到故障表
        FaultItem old = this.faultItemTable.get(name);//faultItemTable是ConcurrentHashMap
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            //设置 延迟时间
            faultItem.setCurrentLatency(currentLatency);
            //设置此broker可用的时间: 当前时间 +notAvailableDuration[]时间
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            //存入 故障表
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

然后在回到队列选择代码 进入启用发送延迟故障容忍的情况

  // 如果开启 发送延迟故障容忍的情况下 默认是未开启
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // todo 判断该消息队列所在的broker是否根据延迟容错策略可用。
                    // 如果是,则直接返回这个消息队列
                    // isAvailable 其实就是判断当前时间 是否 超 过当前broker能启动的时间
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }

                // todo 如果上面没有找到可用的broker,尝试通过latencyFaultTolerance
                //  策略至少选择一个可能不是最优但可接受的broker名称。
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                // todo 果找到了非最优broker且其有可用消息队列,则调用selectOneMessageQueue()方法获得一个消息队列,
                //  并将其broker信息更新为notBestBroker,同时设置一个合适的队列ID。
                // 返回的是当前broker的队列数量
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    // todo 否则 说明所选broker无可用消息队列,将该broker从延迟容错策略中移除。
                    //  然后走,默认
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

选择一个可能不是最优但可接受的broker名称 原理
在这里插入图片描述
从故障表中获得所有的FaultItem 然后根据一定的规则排序,最后在中位数附近随机选择一个.

4、通信协议

发送消息用的是rockemq自己的协议,如下
在这里插入图片描述在这里插入图片描述

  1. 命令标识:code: 命令码,用于标识具体的命令类型或错误码,使得接收方能够识别请求的具体动作或者响应的状态。
  2. 语言标记:language: 表示使用的编程语言,这里是LanguageCode.JAVA,表明命令由Java程序生成。
  3. 版本信息:version: 协议版本号,确保不同版本间通信的兼容性。
  4. 唯一标识:opaque: 请求ID,每次新建命令时递增,用于追踪请求和响应之间的关联。
  5. 标志位:flag: 用于标识请求或响应的类型,例如区分普通请求和单向请求(RPC_TYPE 和 RPC_ONEWAY)以及响应命令。
  6. 辅助信息:remark: 备注字段,通常用于携带额外的描述性文本信息。
  7. 扩展字段:extFields: 一个HashMap,用于存储自定义键值对,可用于传递额外的业务参数或元数据。
  8. 自定义命令头:customHeader: CommandCustomHeader接口的实现,用于封装特定业务场景下的定制化头部信息。
  9. 序列化类型:serializeTypeCurrentRPC: 表示当前RPC请求使用的序列化方式,可以是JSON
  10. 消息正文:body: 存储实际要传输的二进制数据内容。

http://www.kler.cn/a/568433.html

相关文章:

  • SpringCloud系列教程(十):token验证
  • Hadoop之01:HDFS分布式文件系统
  • 纳米材料简介
  • 【VitePress】vitepress 中 markdown 写作使用
  • Hadoop之02:MR-图解
  • 基于Flask实现的多语言Hello World
  • 企业如何将ERP和BPM项目结合提升核心竞争力
  • 【数据挖掘】Matplotlib
  • Kubespray部署企业级高可用K8S指南
  • 基于Python Django的人脸识别上课考勤系统(附源码,部署)
  • uniapp 系统学习,从入门到实战(七)—— 网络请求与数据交互
  • 《国密算法开发实战:从合规落地到性能优化》
  • springboot项目Maven打包遇到的问题总结
  • 使用 REINFORCE 算法强化梯度策略
  • Android Studio安装与配置详解
  • 为你详细介绍系统数据库的概念结构、逻辑结构、物理结构设计方法,以及数据库的物理独立性的相关内容:
  • 正向代理、反向代理
  • 网络安全与等保2.0
  • 【Java项目】基于Spring Boot的体质测试数据分析及可视化设计
  • 力扣2662. 前往目标的最小代价