当前位置: 首页 > article >正文

RocketMQ消息发送---源码解析

我们知道rocketMQ的消息发送支持很多特性,如同步发送,异步发送,oneWay发送,也支持超时机制,回调机制,并且能够保证消息的可靠性和消息发送的限流,底层使用netty框架等等,如此多的特性,我们跟进源码来详细看看

同步发送

我们先来看看同步发送,首先明确,同步的概念指的是发送消息后,阻塞等待直到收到broker的ack后返回。

  • 先看看最简单的send方法,只有一个msg参数,在DefaultMQProducer中

![[Pasted image 20250112205058.png]]

  • 这里进入DefaultMQProducerImpl,并指定发送形式为SYNC,实际上**没有在send中设置回调函数的消息的发送形式就是SYNC![[Pasted image 20250112205425.png]]
  • 进入sendDefaultImpl方法
    ![[Pasted image 20250112210141.png]]

代码很长,我们关注几个点即可

  • getRetryTimesWhenSendFailed很明显是同步发送的时候才会去设置重试次数,这里跟进去会发现默认重试次数是2次
  • for循环里面就是根据重试次数循环
  • selectOneMessageQueue会获取一个将要发送给的队列,这里里面默认是轮询机制,即每一次重试或者下一条消息将要发送,会获取下一个可用队列
  • 最后sendKernelImpl,我们再跟进

sendKernelImpl方法
![[Pasted image 20250112210506.png]]

  • 关键就是调用了MQClientAPIImpl的发送接口

后续进入sendMessage后,经过一些跳转,最后会来到invokeSyncImpl方法:
![[Pasted image 20250112212310.png]]

  • 一个是 channel.writeAndFlush(request),真正通过netty发送消息了,并且添加了一个listener来更新response的结果
  • 同步发送会涉及到阻塞等待broker的ack,我们可以看到这个waitResponse,里面调用countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);,阻塞一段用户设置的超时时间,直到得到结果时里面countDownLatch会countDown,从而进入后面的逻辑
  • 后面就是判断这个获取到的结果是否为null,是null的话要抛出异常,这样前面的重试for循环就会捕获异常并进入下一次循环

异步发送

当我们携带消息的callback的时候,消息就是异步发送的,异步发送前面的代码逻辑跟同步发送很像,直到MQClientAPIImpl之前都是一样,只是多了一个callback参数以及指定发送形式为ASYNC,还有一个特殊点就是异步发送的重试问题

异步发送的重试问题

MQ中,重试是发送端必须要做的,对于broker来说并不在意发送端以何种形式发送消息,其工作只是返回一个ack告诉发送端消息已经抵达从而保证可靠性。

之前我们分析同步发送的时候,是同步发送的时候才会去设置重试次数,并在for中进行重试,而异步的重试次数是1,所以不是在for循环中实现重试的

这个点个人看源码前也在想如果自己实现要怎么实现?既然是异步发送,客户端已经无法通过阻塞拿到结果并对结果判断从而决定是否重试,异步的场景下,发送完消息客户端的主线程就不再跟踪消息,那如何进行重试呢?

主线程不跟踪,或许可以开启另一个线程来跟踪异步消息,这就需要我们对于所有发出去的异步消息封装一个future,通过这个线程来监控这个future的状态。rocketMQ就是这样做的,具体流程我们后续再看

异步发送实现

由于在这个方法之前,前面的逻辑跟同步发送一样,我们直接看这个方法的关键部分

MQClientAPIImpl的sendAsync
![[Pasted image 20250112215630.png]]

  • 我们看到invokeAsync的前半部分,这里和同步发送就有很大的不同,传入了一个回调,这个回调的operationComplete方法一开始获取response,若没有sendCallback(客户端设置的消息发送后的回调),进行一些结果的处理就结束
    ![[Pasted image 20250112215750.png]]

  • 我们看到invokeAsync的后半部分,这里如果response不是null的,意味着异步发送的消息收到ack,成功发送了,就调用sendCallback.onSuccess(sendResult);,出现异常调用onExceptionImpl,如果response是null的,意味着异步发送的消息没有收到ack,那么直接调用onExceptionImpl,这个onExceptionImpl我们再次跟进看看:
    ![[Pasted image 20250112220708.png]]

  • 可以看到,这里面继续调用了sendMessageAsync,并且再次出现异常时会递归调用onExceptionImpl,最外层有一个重试次数的判断,记录目前的重试次数是否大于最大次数,这样就实现了异步发送的重试,我们再看看invokerAsync里面是什么,以及如何调用这个回调的方法

NettyRemotingAbstract的invokeAsyncImpl
![[Pasted image 20250112221033.png]]

1. semaphoreAsync

  • 在深入异步回调机制前,我们先看看这个信号量,前面同步发送的时候是没有这个信号量的,跟进这个信号量会发现会根据配置文件初始化信号量的值,发现oneWay发送和异步发送的最大限制是65535
    public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
    public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE =
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));

为什么同步的不设置,异步设置呢?

异步发送的核心特点是客户端不会等待每条消息的发送确认,而是批量发送消息,然后由回调函数异步处理结果。如果发送的消息量过大,可能会导致消息的发送状态(成功/失败)处理变得复杂,系统需要确保消息不会丢失或重复,而这需要额外的机制来确保消息的可靠性。因此,为了避免过多消息并发,RocketMQ 对异步发送进行了条数限制。

同步发送每次发送一条消息时,都需要等待该消息的发送确认,直到确认返回,才会继续发送下一条消息。这种逐条确认机制天然可以避免积压太多未确认的消息,因此没有需要限制最大消息条数的问题。

2.responseTable

  • 跟同步发送的逻辑很像,同样创建了responseFuture(封装了以上提到的invokeAsync方法里的invokeCallback回调),并通过channel.writeAndFlush(request)发送消息并在listener中更新结果状态,但是没有阻塞操作waitReponse,那么是如何感应返回结果的状态,并告知前面的MQClientAPIImpl是否进行重试的呢?
  • 关注以上的resposeTable,rocketMQ通过这个table实现了结果状态的感应

NettyRemotingAbstract的scanResponseTable

  • 这个resposeTable实际上是responseFuture的list集合

  • NettyRemotingAbstract初始化的时候会开启一个线程,每1s扫描一次这个list集合
    ![[Pasted image 20250112223327.png]]

  • 如果已经超时,将rf放到一个临时的rflist中

  • 遍历rflist,依次调用其executeInvokeCallback,注意这个时候rf超时意即超过用户设置的异步消息最大的超时时间,无论消息是否收到ack
    ![[Pasted image 20250112223529.png]]

  • 可以看到executeInvokeCallback就是调用responseFuture封装的以上提到的invokeAsync方法里的invokeCallback回调

具体的调用链是:netty的listener更新responseFuture的状态->扫描线程调用responseFuture持有的invokeCallback回调(即其中的operationComplete方法)->invokeCallback的operationComplete调用用户设置的回调

简单来说就是扫描线程会调用到期的responseFuture的回调,回调中判断其结果状态(netty的listener设置),成功则调用用户设置的回调,失败则递归调用sendAsync

总结

RocketMQ的异步和同步发送实际上还有很多特性,本文跟进的是主要流程,且重点关注了异步发送的重试机制,在看源码的过程中发现rocketMQ的设计确实很精妙,自己在项目中也采用了部分思想,也希望能够帮助到大家,后续会尝试写写broker的刷盘机制的源码。


http://www.kler.cn/a/505059.html

相关文章:

  • django在线考试系统
  • mayavi -> python 3D可视化工具Mayavi的安装
  • VSCode Live Server 插件安装和使用
  • Termora跨平台 SSH/SFTP/Terminal 客户端工具
  • Codeforces Round 996 (Div. 2)(4 / 6)
  • Go语言之路————func
  • 【FAQ】HarmonyOS SDK 闭源开放能力 —Map Kit(4)
  • 系统架构设计师考点—UML建模和设计模式
  • ASP.NET Core 全局异常处理
  • 【NLP高频面题 - 分布式训练篇】分布式训练主要解决大模型训练中的哪些问题?
  • Android中下载 HAXM 报错 HAXM installation failed,如何解决?
  • Jmeter进行http接口并发测试
  • MyBatis 中动态 SQL 标签
  • 后端技术选型 sa-token校验学习 中 文档学习
  • 庖丁解java(一篇文章学java)
  • 浅谈PHP之线程锁
  • 【实践】操作系统智能助手OS Copilot新功能测评
  • C语言初阶习题【30】字符串左旋
  • ECharts 折线图隐藏标点
  • Maven 配置本地仓库
  • 矩阵碰一碰发视频之视频剪辑功能开发全解析,支持OEM
  • 音频语言模型与多模态体系结构
  • redis监控会不会统计lua里面执行的命令次数
  • Docker save load 镜像 tag 为 <none>
  • 学习threejs,使用RollControls相机控制器
  • JavaScript-正则表达式方法(RegExp)