消息中间件之RocketMQ源码分析(六)
Consumer消费方式
RocketMQ的消费方式包含Pull和Push两种
- Pull方式。
用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。
缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求,再RocketMQ中
org.apache.rocketmq.client.consumer.DefaultMQPullConsume是默认的Pull消费者实现类
1.fetchSubscribeMessageQueues(String topic).拉取全部可以消费的Queue.如果某一个Broker下线,这里也可以实时感知到
2.遍历全部Queue,拉取每个Queue可以消费的消息
3.如果拉取到消息,则执行用户编写的消费代码
4.保存消费进度。消费进度可以执行updateConsumeOffset()方法,将消息位点上报给Broker,也可以自行保存消费位点。比如流计算平台Flink使用Pull方式拉取消息消费,通过Checkpoint管理消费进度 - Push方式。
代码介入非常简单,适合大部分业务场景。缺点灵活度差,在了解消费原理后,排查消费问题可简单快捷.在RocketMQ中org.apache.rocketmq.client.consumer.DefaultMQPushConsumer是默认的Push消费者实现类
1.初始化Push消费者实例。业务代码初始化DefaultMQPushConsumer实例,启动Push服务PullMessageService.该服务是一个线程服务,不断执行run()方法拉取已经订阅Topic的全部队列的消息,将消息保存在本地的缓存队列中
启动PullMessageService的拉取服务
PullMessageService不断拉取消息。pullRequestQueue中保存着待拉取地Topic和Queue消息,程序不断从pullRequestQueue中获取PullRequest并执行拉取消息方法
消费者拉取消息并消费,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
基本校验。校验ProcessQueue是否dropped;校验消费者服务状态是否正常;校验消费者是否被挂起。在Rebalance时,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance方法在运行时设置ProcessQueue.setDropped(true)的逻辑,,设置成功后,在执行拉取消息时,将不再拉取dropped为true的ProcessQueue
拉取条数、字节数限制检查。如果本地缓存消息数量大于配置的最大拉取条数(默认1000,可以调整),则延迟一段时间再拉取;
如果本地缓存消息字节数大于配置的最大缓存字节数,则延迟i短时间再拉取,这两种校验方式都相当于本地流控
并发消费和顺序消费校验。
在并发消费时,processQueue.getMaxSpan()方法是用于计算本地缓存队列中的哥消息和最后一个消息的offset差值。
顺序消费时,如果当前拉取的队列在Broker没有被锁定,说明已经由拉取正在执行,当前拉取请求晚点执行,如果不是第一次拉取,
需要先计算最新的拉取位点并修正最新的待拉取位点信息,再执行拉取
本地缓存队列的Span如果大于配置的最大差值(默认2000,可以调整),
则认为本地消费过慢,需要执行本地流控
队列锁定
订阅关系校验。如果待拉取的Topic在本地缓存中订阅关系为空,则本地拉取不执行,待下一个正常心跳或者Rebalance后订阅关系恢复正常,方可正常拉取
封装拉取请求和拉取后的回调对象PullCallback。这里主要将消息拉取请求和拉取结果处理封装成PullCallback,
并通过调用PullAPIWrapper.pullKernelImpl()方法将拉取请求发出去。
如果拉取到消息,那么将消息保存到对应的本地缓存队列ProcessQueue中,然后将这些消息交给ConsumeService服务;
2.核心-消费消息。由消费服务ConsumeMessageConcurrentlyService或者ConsumeMessagOrderlyService
将本地缓存队列中的消息不断放入到消费线程池,异步回调业务消费代码,此时业务代码可以消费消息
3.核心-保存消费进度。业务代码消费后,将消费结果返回给消费服务,再由消费服务将消费进度保存在本地,
由消费进度管理服务定时和不定时地持久化到本地(LcoalFileOffsetStore支持)或者远程Broker(RemoteBrokerOffsetStore支持)中,
对于消费失败地消息,RocketMQ客户端处理后发回给Broker,并告知消费失败
- Pull和Push方式的比较