当前位置: 首页 > article >正文

rocketmq-pull模式-消费重平衡和拉取PullTaskImpl线程

1、观察consumer的线程模型

使用arthas分析
在这里插入图片描述

  • MQClientFactoryScheduledThread 定时任务线程 (和push模式一致)
    定时任务线程,包含如下任务:
    每2分钟更新nameServer列表
    每30秒更新topic的路由信息
    每30秒检查broker的存活,发送心跳请求
    每5秒持久化消费队列的offset。如果是广播模式,持久化在本地;如果是集群模式,反馈给broker
    每分钟调整线程池大小(实际上并没有作用。因为最终执行是空方法)

  • PullMessageService 这个线程是专门给push模式使用的。在pull模式下,没有作用

  • RebalanceService 重平衡线程。每20秒执行一次
    pull模式和push模式都用该线程。区别在于messageQueue变更后,处理有区别。
    详细看
    org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
    org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
    org.apache.rocketmq.client.impl.consumer.RebalanceImpl#messageQueueChanged
    后面的类图也会红色标记出来

  • PullMsgThread-lite_pull_consumer_testN线程
    pull模式下,每个队列messageQueue会包装成一个PullTaskImpl任务,从broker中拉取msg。放到线程池中死循环执行。这是线程名称。下文有描述

整体工作流程图如下:

在这里插入图片描述

步骤分析

步骤1、启动后触发重平衡线程RebalanceService

重平衡任务org.apache.rocketmq.client.impl.consumer.RebalanceService#run()
重平衡线程获取到topic对应的Set<> mqSet,当前消费者组group的所有消费者List<> cidAll
cid就是消费端的唯一标识。格式如下:“ip@pid#时间戳”,比如127.01.01.01@1723#2926328724786400
关于重平衡的前期部分,已经在push模式有描述,参考文章rocketmq-push模式-消费侧重平衡-类流程图分析
不再详述

步骤2、messageQueue有更新,调用MessageQueueListener

监听器类org.apache.rocketmq.client.consumer.MessageQueueListener是专门给pull模式使用的

public void updateAssignQueueAndStartPullTask(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
        switch (messageModel) {
            case BROADCASTING:
                updateAssignedMessageQueue(topic, mqAll);
                updatePullTask(topic, mqAll);
                break;
            case CLUSTERING:
                updateAssignedMessageQueue(topic, mqDivided);
                updatePullTask(topic, mqDivided);
                break;
            default:
                break;
        }
    }

监听变化后,会做两件事updateAssignedMessageQueue和updatePullTask

步骤3、更新updateAssignedMessageQueue

org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#assignedMessageQueue
该对象是pull模式专用。

public class AssignedMessageQueue {

    private final ConcurrentHashMap<MessageQueue, MessageQueueState> assignedMessageQueueState;

    private RebalanceImpl rebalanceImpl;

    public AssignedMessageQueue() {
        assignedMessageQueueState = new ConcurrentHashMap<MessageQueue, MessageQueueState>();
    }

    public void setRebalanceImpl(RebalanceImpl rebalanceImpl) {
        this.rebalanceImpl = rebalanceImpl;
    }

从该类来看,最重要的字段是assignedMessageQueueState,存储的是订阅的所有topic的所有MessageQueue,做map存储。
MessageQueueState的信息如下

 private class MessageQueueState {
        private MessageQueue messageQueue;
        private ProcessQueue processQueue;
        private volatile boolean paused = false;
        private volatile long pullOffset = -1;
        private volatile long consumeOffset = -1;
        private volatile long seekOffset = -1;

        private MessageQueueState(MessageQueue messageQueue, ProcessQueue processQueue) {
            this.messageQueue = messageQueue;
            this.processQueue = processQueue;
        }

pullOffset 、consumeOffset 、seekOffset 这几个字段有比较大的作用,消费过程中,有多处会设置这几个值。较复杂,后续再做统一的分析

updateAssignedMessageQueue()方法,主要的目的是对assignedMessageQueueState做处理,重平衡后做新增add或者移除操作

步骤4、更新updatePullTask

private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
            if (next.getKey().getTopic().equals(topic)) {
                if (!mqNewSet.contains(next.getKey())) {
                    next.getValue().setCancelled(true);
                    it.remove();
                }
            }
        }
        startPullTask(mqNewSet);
    }

用到的队列MessageQueue,taskTable会维护对应的PullTaskImpl线程任务。
org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#taskTable

针对该topic,如果当前内存的taskTable中的MessageQueue已经不再有效,就会将PullTaskImpl的状态设置成Cancel,使得对应线程终止运行。

startPullTask()

 private void startPullTask(Collection<MessageQueue> mqSet) {
        for (MessageQueue messageQueue : mqSet) {
            if (!this.taskTable.containsKey(messageQueue)) {
                PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
                this.taskTable.put(messageQueue, pullTask);
                this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
            }
        }
    }

是将当前队列,包装成PullTaskImpl的线程任务,提交给线程池scheduledThreadPoolExecutor。在arthas上,线程名称就是PullMsgThread-lite_pull_consumer_testN。核心线程数是20

步骤5、PullTaskImpl线程执行

对队列做流控策略。检查cachedMessageCount、cachedMessageSizeInMiB、processQueue.getMaxSpan()
如果不满足要求,则延时再做任务

long cachedMessageCount = processQueue.getMsgCount().get();
                long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

                if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                    if ((queueFlowControlTimes++ % 1000) == 0) {
                        log.warn(
                            "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
                            defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
                    }
                    return;
                }

                if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                    if ((queueFlowControlTimes++ % 1000) == 0) {
                        log.warn(
                            "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
                            defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
                    }
                    return;
                }

                if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                    if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                        log.warn(
                            "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
                            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
                    }
                    return;
                }

步骤6、同步请求broker,拉取消息

try {
                    SubscriptionData subscriptionData;
                    String topic = this.messageQueue.getTopic();
                    if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                        subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
                    } else {
                        String subExpression4Assign = topicToSubExpression.get(topic);
                        subExpression4Assign = subExpression4Assign == null ? SubscriptionData.SUB_ALL : subExpression4Assign;
                        subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression4Assign);
                    }

                    PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
                    if (this.isCancelled() || processQueue.isDropped()) {
                        return;
                    }
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
                            synchronized (objLock) {
                                if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
                                    processQueue.putMessage(pullResult.getMsgFoundList());
                                    submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
                                }
                            }
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("The pull request offset illegal, {}", pullResult.toString());
                            break;
                        default:
                            break;
                    }
                    updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
                } catch (InterruptedException interruptedException) {

关键代码是PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
该方法是同步调用,默认一次拉取10条消息

步骤7、将消息msg放到consumeRequestCache中

private void submitConsumeRequest(ConsumeRequest consumeRequest) {
        try {
            consumeRequestCache.put(consumeRequest);
        } catch (InterruptedException e) {
            log.error("Submit consumeRequest error", e);
        }
    }

org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#consumeRequestCache

步骤8、任务PullTaskImpl循环执行

PullTaskImpl的run方法,最后是将this,又重新投到线程池中,实现循环执行。

 if (!this.isCancelled()) {
    scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
 } else {
     log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
 }

查看官方消费的例子,调用poll方法,从consumeRequestCache上获取msg

public class LitePullConsumerSubscribe {

    public static volatile boolean running = true;

    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";

    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
        litePullConsumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        litePullConsumer.subscribe("TopicTest", "*");
        litePullConsumer.start();
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.println(new Date());
                System.out.printf("%s%n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}

关键代码是litePullConsumer.poll();死循环执行。
poll方法,从consumeRequestCache上获取msg

public synchronized List<MessageExt> poll(long timeout) {
        try {
            checkServiceState();
            if (timeout < 0) {
                throw new IllegalArgumentException("Timeout must not be negative");
            }

            if (defaultLitePullConsumer.isAutoCommit()) {
                maybeAutoCommit();
            }
            long endTime = System.currentTimeMillis() + timeout;

            ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

            if (endTime - System.currentTimeMillis() > 0) {
                while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
                    consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                    if (endTime - System.currentTimeMillis() <= 0) {
                        break;
                    }
                }
            }

总结

后续有空再总结。主要是比较push和pull模式。

后续分析

offset在消费拉取过程中,是如何使用的?和broker是如何交互的?


http://www.kler.cn/a/466963.html

相关文章:

  • 深度学习中的步数指的是什么
  • 外网访问本地部署的 VMware ESXi 服务
  • 企业级网络运维管理系统深度解析与实践案例
  • 设计模式 创建型 工厂模式(Factory Pattern)与 常见技术框架应用 解析
  • 《学校一卡通管理系统》数据库MySQL的设计与实现
  • 【前序、中序、后序遍历递归栈的实现】
  • ubuntu1604 apt镜像源切换
  • 使用PyTorch实现基于稀疏编码的生成对抗网络(GAN)在CIFAR-10数据集上的应用
  • 计算机毕业设计PyHive+Hadoop深圳共享单车预测系统 共享单车数据分析可视化大屏 共享单车爬虫 共享单车数据仓库 机器学习 深度学习
  • STM32-笔记34-4G遥控灯
  • Golang:使用minio替代文件系统实战教程
  • NLP CH3复习
  • springboot3 性能优化
  • 1.4 spring八股文学习
  • 机器学习基础例子篇
  • 如何通过 5 种有用的方法将 iPhone 连接到戴尔笔记本电脑?
  • PDF文件提示-文档无法打印-的解决办法
  • 跟着问题学3.3——Faster R-CNN详解及代码实战(1)
  • 【AimRT】AimRT Hello World
  • 【Matlab算法】基于改进人工势场法的移动机器人路径规划研究(附MATLAB完整代码)
  • 【计算机视觉技术 - 人脸生成】2.GAN网络的构建和训练
  • 超越YOLO11!DEIM:先进的实时DETR目标检测
  • 服务器信息整理
  • 源代码编译安装X11及相关库、vim,配置vim(1)
  • IDEA 社区版 SpringBoot不能启动
  • QML自定义滑动条Slider的样式