RocketMQ消息发送---源码解析
我们知道rocketMQ的消息发送支持很多特性,如同步发送,异步发送,oneWay发送,也支持超时机制,回调机制,并且能够保证消息的可靠性和消息发送的限流,底层使用netty框架等等,如此多的特性,我们跟进源码来详细看看
同步发送
我们先来看看同步发送,首先明确,同步的概念指的是发送消息后,阻塞等待直到收到broker的ack后返回。
- 先看看最简单的send方法,只有一个msg参数,在DefaultMQProducer中
- 这里进入
DefaultMQProducerImpl
,并指定发送形式为SYNC,实际上**没有在send中设置回调函数的消息的发送形式就是SYNC - 进入
sendDefaultImpl
方法
代码很长,我们关注几个点即可
- getRetryTimesWhenSendFailed很明显是同步发送的时候才会去设置重试次数,这里跟进去会发现默认重试次数是2次
- for循环里面就是根据重试次数循环
- selectOneMessageQueue会获取一个将要发送给的队列,这里里面默认是轮询机制,即每一次重试或者下一条消息将要发送,会获取下一个可用队列
- 最后sendKernelImpl,我们再跟进
sendKernelImpl
方法
- 关键就是调用了
MQClientAPIImpl
的发送接口
后续进入sendMessage后,经过一些跳转,最后会来到invokeSyncImpl
方法:
- 一个是 channel.writeAndFlush(request),真正通过netty发送消息了,并且添加了一个listener来更新response的结果
- 同步发送会涉及到阻塞等待broker的ack,我们可以看到这个waitResponse,里面调用countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);,阻塞一段用户设置的超时时间,直到得到结果时里面countDownLatch会countDown,从而进入后面的逻辑
- 后面就是判断这个获取到的结果是否为null,是null的话要抛出异常,这样前面的重试for循环就会捕获异常并进入下一次循环
异步发送
当我们携带消息的callback的时候,消息就是异步发送的,异步发送前面的代码逻辑跟同步发送很像,直到MQClientAPIImpl之前都是一样,只是多了一个callback参数以及指定发送形式为ASYNC,还有一个特殊点就是异步发送的重试问题
异步发送的重试问题
MQ中,重试是发送端必须要做的,对于broker来说并不在意发送端以何种形式发送消息,其工作只是返回一个ack告诉发送端消息已经抵达从而保证可靠性。
之前我们分析同步发送的时候,是同步发送的时候才会去设置重试次数,并在for中进行重试,而异步的重试次数是1,所以不是在for循环中实现重试的
这个点个人看源码前也在想如果自己实现要怎么实现?既然是异步发送,客户端已经无法通过阻塞拿到结果并对结果判断从而决定是否重试,异步的场景下,发送完消息客户端的主线程就不再跟踪消息,那如何进行重试呢?
主线程不跟踪,或许可以开启另一个线程来跟踪异步消息,这就需要我们对于所有发出去的异步消息封装一个future,通过这个线程来监控这个future的状态。rocketMQ就是这样做的,具体流程我们后续再看
异步发送实现
由于在这个方法之前,前面的逻辑跟同步发送一样,我们直接看这个方法的关键部分
MQClientAPIImpl的sendAsync
-
我们看到
invokeAsync
的前半部分,这里和同步发送就有很大的不同,传入了一个回调,这个回调的operationComplete方法一开始获取response,若没有sendCallback(客户端设置的消息发送后的回调),进行一些结果的处理就结束
-
我们看到
invokeAsync
的后半部分,这里如果response不是null的,意味着异步发送的消息收到ack,成功发送了,就调用sendCallback.onSuccess(sendResult);,出现异常调用onExceptionImpl,如果response是null的,意味着异步发送的消息没有收到ack,那么直接调用onExceptionImpl,这个onExceptionImpl我们再次跟进看看:
-
可以看到,这里面继续调用了sendMessageAsync,并且再次出现异常时会递归调用onExceptionImpl,最外层有一个重试次数的判断,记录目前的重试次数是否大于最大次数,这样就实现了异步发送的重试,我们再看看invokerAsync里面是什么,以及如何调用这个回调的方法
NettyRemotingAbstract的invokeAsyncImpl
1. semaphoreAsync
- 在深入异步回调机制前,我们先看看这个信号量,前面同步发送的时候是没有这个信号量的,跟进这个信号量会发现会根据配置文件初始化信号量的值,发现oneWay发送和异步发送的最大限制是65535
public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
为什么同步的不设置,异步设置呢?
异步发送的核心特点是客户端不会等待每条消息的发送确认,而是批量发送消息,然后由回调函数异步处理结果。如果发送的消息量过大,可能会导致消息的发送状态(成功/失败)处理变得复杂,系统需要确保消息不会丢失或重复,而这需要额外的机制来确保消息的可靠性。因此,为了避免过多消息并发,RocketMQ 对异步发送进行了条数限制。
同步发送每次发送一条消息时,都需要等待该消息的发送确认,直到确认返回,才会继续发送下一条消息。这种逐条确认机制天然可以避免积压太多未确认的消息,因此没有需要限制最大消息条数的问题。
2.responseTable
- 跟同步发送的逻辑很像,同样创建了responseFuture(封装了以上提到的invokeAsync方法里的invokeCallback回调),并通过channel.writeAndFlush(request)发送消息并在listener中更新结果状态,但是没有阻塞操作waitReponse,那么是如何感应返回结果的状态,并告知前面的MQClientAPIImpl是否进行重试的呢?
- 关注以上的resposeTable,rocketMQ通过这个table实现了结果状态的感应
NettyRemotingAbstract的scanResponseTable
-
这个resposeTable实际上是responseFuture的list集合
-
NettyRemotingAbstract初始化的时候会开启一个线程,每1s扫描一次这个list集合
-
如果已经超时,将rf放到一个临时的rflist中
-
遍历rflist,依次调用其executeInvokeCallback,注意这个时候rf超时意即超过用户设置的异步消息最大的超时时间,无论消息是否收到ack
-
可以看到executeInvokeCallback就是调用responseFuture封装的以上提到的invokeAsync方法里的invokeCallback回调
具体的调用链是:netty的listener更新responseFuture的状态->扫描线程调用responseFuture持有的invokeCallback回调(即其中的operationComplete方法)->invokeCallback的operationComplete调用用户设置的回调
简单来说就是扫描线程会调用到期的responseFuture的回调,回调中判断其结果状态(netty的listener设置),成功则调用用户设置的回调,失败则递归调用sendAsync
总结
RocketMQ的异步和同步发送实际上还有很多特性,本文跟进的是主要流程,且重点关注了异步发送的重试机制,在看源码的过程中发现rocketMQ的设计确实很精妙,自己在项目中也采用了部分思想,也希望能够帮助到大家,后续会尝试写写broker的刷盘机制的源码。