RocketMQ pull mode: consumer rebalancing and the PullTaskImpl pull threads
1. The consumer's thread model
Observed with Arthas:
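- MQClientFactoryScheduledThread: the scheduled-task thread (same as in push mode). It runs the following periodic tasks:
  - every 2 minutes, refresh the NameServer address list;
  - every 30 seconds, update the topic route information;
  - every 30 seconds, check whether the brokers are alive and send heartbeat requests;
  - every 5 seconds, persist the consume offsets of the message queues: to local storage in broadcasting mode, reported to the broker in clustering mode;
  - every minute, adjust the thread pool size (this has no practical effect, because the method it ends up calling is empty).
- PullMessageService: dedicated to push mode; it does nothing in pull mode.
- RebalanceService: the rebalance thread, run every 20 seconds.
  Both pull mode and push mode use this thread; they differ in how a change of the message queues is handled afterwards. For details, see:
  org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
  org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
  org.apache.rocketmq.client.impl.consumer.RebalanceImpl#messageQueueChanged
  These methods are also marked in red in the class diagram later on.
- PullMsgThread-lite_pull_consumer_testN: in pull mode, each MessageQueue is wrapped into a PullTaskImpl task that pulls messages from the broker. The tasks run in a thread pool in an endless loop, and this is the name of that pool's threads. It is described in detail below.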
[Figure: overall workflow of the pull-mode consumer]
Step-by-step analysis
Step 1. After startup, the RebalanceService thread triggers a rebalance
Rebalance task: org.apache.rocketmq.client.impl.consumer.RebalanceService#run()
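The rebalance thread obtains the topic's queue set, Set<MessageQueue> mqSet, and the list of all consumers in the consumer group, List<String> cidAll.
A cid is the unique identifier of a consumer client. Its format is "ip@pid#timestamp", for example 127.01.01.01@1723#2926328724786400.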
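To make the cid format concrete, the sketch below builds and parses a string of that shape. The helper names are hypothetical and this is not RocketMQ's id-building code; it assumes the timestamp part is a System.nanoTime() value, and since nanoTime() has an arbitrary origin, the parser also accepts a leading minus sign.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ClientIdSketch {
    // Assumed shape of a consumer client id: ip@pid#timestamp.
    private static final Pattern CID = Pattern.compile("^([^@]+)@(\\d+)#(-?\\d+)$");

    // Hypothetical helper: builds an id for this JVM, using System.nanoTime() as the timestamp part.
    static String buildClientId(String ip) {
        long pid = ProcessHandle.current().pid();
        return ip + "@" + pid + "#" + System.nanoTime();
    }

    // Hypothetical helper: splits an id into {ip, pid, timestamp}, or returns null if it does not match.
    static String[] parse(String cid) {
        Matcher m = CID.matcher(cid);
        return m.matches() ? new String[] {m.group(1), m.group(2), m.group(3)} : null;
    }

    public static void main(String[] args) {
        String cid = buildClientId("127.0.0.1");
        String[] parts = parse(cid);
        System.out.println(cid + " -> ip=" + parts[0] + ", pid=" + parts[1] + ", ts=" + parts[2]);
    }
}
```

Including the pid and a high-resolution timestamp means two consumer instances started on the same host still get distinct cids, which the rebalance relies on when it splits queues by cid.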
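The earlier part of the rebalance is the same as in push mode and is covered in the article "RocketMQ push mode: consumer-side rebalancing, class and flow diagram analysis" (rocketmq-push模式-消费侧重平衡-类流程图分析), so it is not repeated here.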
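In clustering mode, the queues assigned to this consumer (the mqDivided set used in step 2) are the slice of mqAll that an allocation strategy hands to the current cid. RocketMQ's default strategy is average allocation (AllocateMessageQueueAveragely). The sketch below follows that averaging idea with queues and cids as plain strings; the class and method names are hypothetical. It assumes mqAll and cidAll are sorted identically on every consumer, which is what lets each client compute the same non-overlapping split independently.

```java
import java.util.ArrayList;
import java.util.List;

class AverageAllocationSketch {
    // Returns the queues assigned to currentCid; mqAll and cidAll must be sorted the same way on every consumer.
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // unknown consumer or nothing to assign
        }
        int mod = mqAll.size() % cidAll.size();
        // When the split is uneven, the first `mod` consumers receive one extra queue.
        int averageSize = mqAll.size() <= cidAll.size()
                ? 1
                : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex); // may be <= 0: no queues left
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get(startIndex + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> mqAll = List.of("q0", "q1", "q2", "q3", "q4", "q5", "q6", "q7");
        List<String> cidAll = List.of("cidA", "cidB", "cidC");
        for (String cid : cidAll) {
            System.out.println(cid + " -> " + allocate(cid, mqAll, cidAll));
        }
    }
}
```

With 8 queues and 3 consumers this prints cidA -> [q0, q1, q2], cidB -> [q3, q4, q5], cidC -> [q6, q7]. With more consumers than queues, the surplus consumers get an empty list and sit idle.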
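Step 2. When the message queues change, the MessageQueueListener is called
The listener interface org.apache.rocketmq.client.consumer.MessageQueueListener is used specifically by pull mode. Its callback ends up in updateAssignQueueAndStartPullTask: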
public void updateAssignQueueAndStartPullTask(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
    MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
    switch (messageModel) {
        case BROADCASTING:
            // Broadcasting: this consumer takes every queue of the topic.
            updateAssignedMessageQueue(topic, mqAll);
            updatePullTask(topic, mqAll);
            break;
        case CLUSTERING:
            // Clustering: only the queues allocated to this consumer.
            updateAssignedMessageQueue(topic, mqDivided);
            updatePullTask(topic, mqDivided);
            break;
        default:
            break;
    }
}
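After detecting the change, the method does two things, updateAssignedMessageQueue and updatePullTask, using all queues of the topic (mqAll) in broadcasting mode and only this consumer's share (mqDivided) in clustering mode.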
Step 3. updateAssignedMessageQueue
org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#assignedMessageQueue
This object is used only in pull mode.
public class AssignedMessageQueue {
    private final ConcurrentHashMap<MessageQueue, MessageQueueState> assignedMessageQueueState;
    private RebalanceImpl rebalanceImpl;

    public AssignedMessageQueue() {
        assignedMessageQueueState = new ConcurrentHashMap<MessageQueue, MessageQueueState>();
    }

    public void setRebalanceImpl(RebalanceImpl rebalanceImpl) {
        this.rebalanceImpl = rebalanceImpl;
    }
    // ... (remaining methods omitted)
}
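The most important field of this class is assignedMessageQueueState: a map, keyed by MessageQueue, of every queue currently assigned to this consumer across all subscribed topics.
MessageQueueState looks like this: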
private class MessageQueueState {
    private MessageQueue messageQueue;
    private ProcessQueue processQueue;
    private volatile boolean paused = false;
    private volatile long pullOffset = -1;
    private volatile long consumeOffset = -1;
    private volatile long seekOffset = -1;

    private MessageQueueState(MessageQueue messageQueue, ProcessQueue processQueue) {
        this.messageQueue = messageQueue;
        this.processQueue = processQueue;
    }
    // ... (getters and setters omitted)
}
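The pullOffset, consumeOffset and seekOffset fields play an important role: they are set in several places during consumption. The logic is fairly involved and will be analyzed as a whole in a later article.
The main purpose of updateAssignedMessageQueue() is to maintain assignedMessageQueueState after a rebalance: it adds entries for newly assigned queues and removes entries for queues that are no longer assigned.
Step 4. updatePullTask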
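The add/remove bookkeeping on assignedMessageQueueState can be sketched with a ConcurrentHashMap. This is a simplified model, not the RocketMQ source: queues are strings of the assumed form "topic:queueId", QueueState is a hypothetical stand-in for MessageQueueState that keeps only a dropped flag and the three offsets initialised to -1, and marking a removed queue as dropped is how the sketch signals that its cached messages should be abandoned (the real code checks processQueue.isDropped() during pulling and polling).

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class AssignedQueuesSketch {
    // Hypothetical stand-in for MessageQueueState: offsets start at -1, like the fields shown above.
    static final class QueueState {
        volatile boolean dropped = false;
        volatile long pullOffset = -1;
        volatile long consumeOffset = -1;
        volatile long seekOffset = -1;
    }

    final Map<String, QueueState> assignedState = new ConcurrentHashMap<>();

    // Queue ids are assumed to look like "topic:queueId" so the topic can be recovered.
    static String topicOf(String queue) {
        return queue.substring(0, queue.indexOf(':'));
    }

    synchronized void updateAssigned(String topic, Set<String> assigned) {
        // 1. Remove queues of this topic that the rebalance no longer assigns to us, marking them dropped.
        Iterator<Map.Entry<String, QueueState>> it = assignedState.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, QueueState> entry = it.next();
            if (topicOf(entry.getKey()).equals(topic) && !assigned.contains(entry.getKey())) {
                entry.getValue().dropped = true;
                it.remove();
            }
        }
        // 2. Add newly assigned queues with fresh state; existing entries keep their offsets.
        for (String queue : assigned) {
            assignedState.putIfAbsent(queue, new QueueState());
        }
    }

    public static void main(String[] args) {
        AssignedQueuesSketch sketch = new AssignedQueuesSketch();
        sketch.updateAssigned("TopicTest", Set.of("TopicTest:0", "TopicTest:1"));
        sketch.updateAssigned("TopicTest", Set.of("TopicTest:1", "TopicTest:2"));
        System.out.println(sketch.assignedState.keySet());
    }
}
```

Keeping the existing entry for a queue that stays assigned (putIfAbsent instead of put) matters: it preserves that queue's offsets across rebalances.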
private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
    Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
        if (next.getKey().getTopic().equals(topic)) {
            if (!mqNewSet.contains(next.getKey())) {
                // Queue no longer assigned: cancel its task and forget it.
                next.getValue().setCancelled(true);
                it.remove();
            }
        }
    }
    // Start tasks for queues that do not have one yet.
    startPullTask(mqNewSet);
}
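taskTable maintains one PullTaskImpl task for each MessageQueue in use:
org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#taskTable
For the given topic, if a MessageQueue in the in-memory taskTable is no longer assigned, its PullTaskImpl is marked as cancelled and removed from taskTable. The task then stops rescheduling itself (step 8), so pulling for that queue ends; the pool thread itself keeps running other tasks.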
startPullTask()
private void startPullTask(Collection<MessageQueue> mqSet) {
    for (MessageQueue messageQueue : mqSet) {
        if (!this.taskTable.containsKey(messageQueue)) {
            PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
            this.taskTable.put(messageQueue, pullTask);
            this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
        }
    }
}
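It wraps each queue that has no task yet into a PullTaskImpl task and submits it, with zero delay, to the scheduledThreadPoolExecutor thread pool. In Arthas, the pool's threads are named PullMsgThread-lite_pull_consumer_testN. The core pool size is 20.
Step 5. PullTaskImpl execution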
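A pool whose threads show up as PullMsgThread-lite_pull_consumer_testN can be built with a custom ThreadFactory. The sketch uses only JDK classes; its class and method names are hypothetical, the naming scheme (prefix plus consumer group plus a counter starting at 1) is an assumption modeled on the observed thread names, and 20 is the core size stated above.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PullThreadPoolSketch {
    // Names threads "PullMsgThread-<group><n>" with n = 1, 2, 3, ...
    static ThreadFactory namedFactory(String consumerGroup) {
        AtomicLong index = new AtomicLong();
        String prefix = "PullMsgThread-" + consumerGroup;
        return r -> new Thread(r, prefix + index.incrementAndGet());
    }

    static ScheduledThreadPoolExecutor newPullPool(String consumerGroup, int coreThreads) {
        return new ScheduledThreadPoolExecutor(coreThreads, namedFactory(consumerGroup));
    }

    public static void main(String[] args) throws Exception {
        ScheduledThreadPoolExecutor pool = newPullPool("lite_pull_consumer_test", 20);
        // Delay 0, as in startPullTask(): the task runs as soon as a worker is available.
        pool.schedule(() -> System.out.println(Thread.currentThread().getName()), 0, TimeUnit.MILLISECONDS).get();
        pool.shutdown();
    }
}
```

A ScheduledThreadPoolExecutor never grows beyond its core size, so with 20 threads at most 20 queues are pulled at the same moment; additional PullTaskImpl tasks wait in the pool's delay queue until a thread frees up.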
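The task first applies flow control to its queue, checking cachedMessageCount, cachedMessageSizeInMiB and processQueue.getMaxSpan().
If any threshold is exceeded, the task reschedules itself after PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL milliseconds and returns without pulling: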
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
            defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
    }
    return;
}
if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
            defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
    }
    return;
}
if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
    if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
    }
    return;
}
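The three checks above can be isolated into a pure function, which makes their order and the log throttling easy to see. This is a sketch, not RocketMQ code: FlowControlSketch, Decision and the parameter names are hypothetical, and the thresholds are passed in instead of being read from DefaultLitePullConsumer. The counter test `(times++ % 1000) == 0` is true on the 1st, 1001st, 2001st, ... call, so a queue that stays under flow control logs one warning per thousand retries.

```java
class FlowControlSketch {
    enum Decision { PULL, DELAY_COUNT, DELAY_SIZE, DELAY_SPAN }

    // Same order as in PullTaskImpl: message count, then cached size in MiB, then offset span.
    static Decision check(long cachedCount, long cachedBytes, long maxSpan,
                          long countThreshold, long sizeThresholdMiB, long maxSpanThreshold) {
        long cachedMiB = cachedBytes / (1024 * 1024);
        if (cachedCount > countThreshold) {
            return Decision.DELAY_COUNT;
        }
        if (cachedMiB > sizeThresholdMiB) {
            return Decision.DELAY_SIZE;
        }
        if (maxSpan > maxSpanThreshold) {
            return Decision.DELAY_SPAN;
        }
        return Decision.PULL;
    }

    // Throttling counter, same idiom as queueFlowControlTimes++ % 1000 == 0.
    static int flowControlTimes = 0;

    static boolean shouldLog() {
        return (flowControlTimes++ % 1000) == 0;
    }

    public static void main(String[] args) {
        System.out.println(check(1500, 0, 0, 1000, 100, 2000)); // DELAY_COUNT
        int logged = 0;
        for (int i = 0; i < 2500; i++) {
            if (shouldLog()) {
                logged++;
            }
        }
        System.out.println("warnings logged: " + logged); // 3
    }
}
```

All three checks use a strict ">", so a queue sitting exactly at a threshold is still pulled.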
Step 6. Pull messages from the broker synchronously
try {
    SubscriptionData subscriptionData;
    String topic = this.messageQueue.getTopic();
    if (subscriptionType == SubscriptionType.SUBSCRIBE) {
        subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
    } else {
        String subExpression4Assign = topicToSubExpression.get(topic);
        subExpression4Assign = subExpression4Assign == null ? SubscriptionData.SUB_ALL : subExpression4Assign;
        subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression4Assign);
    }
    PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
    if (this.isCancelled() || processQueue.isDropped()) {
        return;
    }
    switch (pullResult.getPullStatus()) {
        case FOUND:
            final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
            synchronized (objLock) {
                // Messages are cached only if no seek is pending for this queue.
                if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
                    processQueue.putMessage(pullResult.getMsgFoundList());
                    submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
                }
            }
            break;
        case OFFSET_ILLEGAL:
            log.warn("The pull request offset illegal, {}", pullResult.toString());
            break;
        default:
            break;
    }
    updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
} catch (InterruptedException interruptedException) {
    // ... (exception handling omitted in this excerpt)
}
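The key line is PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
This is a synchronous call; by default it pulls at most 10 messages per request (pullBatchSize). Note that updatePullOffset(...) is called after the switch for every pull status, not only for FOUND.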
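How one pull result drives the next iteration can be simulated without a broker. The sketch is hypothetical: SimulatedResult stands in for PullResult, Status covers a subset of the PullStatus values, and messages are plain strings. It reproduces two details of the excerpt above: a FOUND batch is cached only when no seek is pending (seekOffset == -1), and the next pull offset is taken from nextBeginOffset whatever the status. As a simplification, the sketch does not model any further checks inside the real updatePullOffset.

```java
import java.util.ArrayList;
import java.util.List;

class PullLoopStepSketch {
    enum Status { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

    // Hypothetical stand-in for PullResult.
    record SimulatedResult(Status status, List<String> messages, long nextBeginOffset) { }

    final List<String> cached = new ArrayList<>(); // plays the role of processQueue + consumeRequestCache
    long pullOffset = 0;
    long seekOffset = -1;

    void handle(SimulatedResult result) {
        switch (result.status()) {
            case FOUND:
                // Cache only when a batch arrived and no seek is pending.
                if (!result.messages().isEmpty() && seekOffset == -1) {
                    cached.addAll(result.messages());
                }
                break;
            case OFFSET_ILLEGAL:
                System.out.println("The pull request offset illegal: " + result);
                break;
            default:
                break;
        }
        // Every status moves the next pull to nextBeginOffset (simplified).
        pullOffset = result.nextBeginOffset();
    }

    public static void main(String[] args) {
        PullLoopStepSketch step = new PullLoopStepSketch();
        step.handle(new SimulatedResult(Status.FOUND, List.of("m0", "m1"), 2));
        step.handle(new SimulatedResult(Status.NO_NEW_MSG, List.of(), 2));
        System.out.println(step.cached + ", next offset " + step.pullOffset);
    }
}
```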
Step 7. Put the messages into consumeRequestCache
private void submitConsumeRequest(ConsumeRequest consumeRequest) {
    try {
        consumeRequestCache.put(consumeRequest);
    } catch (InterruptedException e) {
        log.error("Submit consumeRequest error", e);
    }
}
org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#consumeRequestCache
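consumeRequestCache is the hand-off point between the pull threads (producers) and the application thread calling poll() (consumer). The sketch below models it as a LinkedBlockingQueue, which is an assumption here; Batch is a hypothetical stand-in for ConsumeRequest. Unlike the excerpt above, which logs the InterruptedException, the sketch restores the thread's interrupt flag, a common JDK idiom.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ConsumeRequestHandoffSketch {
    // Hypothetical stand-in for ConsumeRequest: one pulled batch from one queue.
    record Batch(String queue, List<String> messages) { }

    final BlockingQueue<Batch> consumeRequestCache = new LinkedBlockingQueue<>();

    // Producer side, like submitConsumeRequest(): put() blocks only if a bounded queue is full.
    void submit(Batch batch) {
        try {
            consumeRequestCache.put(batch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
        }
    }

    // Consumer side, like poll(): wait at most timeoutMs for one batch; empty list on timeout.
    List<String> poll(long timeoutMs) {
        try {
            Batch batch = consumeRequestCache.poll(timeoutMs, TimeUnit.MILLISECONDS);
            return batch == null ? List.of() : batch.messages();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return List.of();
        }
    }

    public static void main(String[] args) throws Exception {
        ConsumeRequestHandoffSketch sketch = new ConsumeRequestHandoffSketch();
        List<Thread> pullThreads = new ArrayList<>();
        for (int q = 0; q < 3; q++) {
            String queue = "TopicTest:" + q;
            Thread t = new Thread(() -> sketch.submit(new Batch(queue, List.of(queue + "-m0"))));
            pullThreads.add(t);
            t.start();
        }
        for (Thread t : pullThreads) {
            t.join();
        }
        for (int i = 0; i < 4; i++) {
            System.out.println(sketch.poll(100)); // the fourth call times out and prints []
        }
    }
}
```

An unbounded LinkedBlockingQueue never blocks put(), so memory growth is kept in check by the per-queue flow control of step 5 rather than by back-pressure from the queue.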
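Step 8. PullTaskImpl runs in a loop
At the end of its run method, PullTaskImpl submits itself (this) back to the thread pool, which makes it run repeatedly until it is cancelled: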
if (!this.isCancelled()) {
    scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
} else {
    log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
}
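The self-rescheduling pattern, a task that re-submits itself with schedule(this, delay, unit) until a cancelled flag is set, can be reproduced with a plain ScheduledThreadPoolExecutor. The sketch is hypothetical: RepeatingPullTask counts its runs instead of pulling, and it cancels itself after a fixed number of iterations to stand in for updatePullTask() calling setCancelled(true).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SelfReschedulingSketch {
    static final class RepeatingPullTask implements Runnable {
        private final ScheduledExecutorService executor;
        private final int maxRuns;
        private final long delayMillis;
        final AtomicInteger runs = new AtomicInteger();
        final CountDownLatch finished = new CountDownLatch(1);
        private volatile boolean cancelled = false;

        RepeatingPullTask(ScheduledExecutorService executor, int maxRuns, long delayMillis) {
            this.executor = executor;
            this.maxRuns = maxRuns;
            this.delayMillis = delayMillis;
        }

        void setCancelled(boolean cancelled) {
            this.cancelled = cancelled;
        }

        @Override
        public void run() {
            if (cancelled) {
                finished.countDown();
                return;
            }
            // One "pull" iteration; here it is only counted.
            if (runs.incrementAndGet() >= maxRuns) {
                setCancelled(true); // stands in for updatePullTask() cancelling a revoked queue
            }
            if (!cancelled) {
                executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS); // re-submit itself
            } else {
                finished.countDown();
            }
        }
    }

    // Runs one task until it cancels itself and returns how many iterations it executed.
    static int runUntilCancelled(int maxRuns) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
        RepeatingPullTask task = new RepeatingPullTask(executor, maxRuns, 1);
        executor.schedule(task, 0, TimeUnit.MILLISECONDS);
        try {
            task.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return task.runs.get();
    }

    public static void main(String[] args) {
        System.out.println("iterations: " + runUntilCancelled(5));
    }
}
```

Re-submitting after each run, rather than using scheduleWithFixedDelay, lets every iteration choose its own next delay; that is how the flow-control branch of step 5 can back off for a different interval than the normal pullDelayTimeMills.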
In the official example, the application calls poll(), which takes messages from consumeRequestCache:
import java.util.Date;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class LitePullConsumerSubscribe {
    public static volatile boolean running = true;
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";

    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
        litePullConsumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        litePullConsumer.subscribe("TopicTest", "*");
        litePullConsumer.start();
        try {
            while (running) {
                // Blocks until messages arrive or the poll timeout expires.
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.println(new Date());
                System.out.printf("%s%n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}
The key call is litePullConsumer.poll(), executed in an endless loop.
The poll method takes messages from consumeRequestCache:
public synchronized List<MessageExt> poll(long timeout) {
    try {
        checkServiceState();
        if (timeout < 0) {
            throw new IllegalArgumentException("Timeout must not be negative");
        }
        if (defaultLitePullConsumer.isAutoCommit()) {
            maybeAutoCommit();
        }
        long endTime = System.currentTimeMillis() + timeout;
        ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        if (endTime - System.currentTimeMillis() > 0) {
            // Skip requests whose ProcessQueue was dropped by a rebalance, until the timeout expires.
            while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
                consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                if (endTime - System.currentTimeMillis() <= 0) {
                    break;
                }
            }
        }
        // ... (rest of the method omitted in this excerpt)
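The timeout handling in poll(timeout), one timed wait followed by further timed waits only while the request belongs to a dropped ProcessQueue and time remains, can be isolated as follows. This is a restructured sketch, not the RocketMQ method: Request is a hypothetical stand-in for ConsumeRequest carrying only a payload and its queue's dropped flag, the service-state check, auto-commit and offset updates are left out, and the sketch returns null rather than a dropped request when the deadline passes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class DeadlinePollSketch {
    // Hypothetical stand-in for ConsumeRequest: a payload plus its ProcessQueue's dropped flag.
    record Request(String payload, boolean dropped) { }

    // Returns the first non-dropped request received before the deadline, or null.
    static Request poll(BlockingQueue<Request> cache, long timeoutMs) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("Timeout must not be negative");
        }
        long endTime = System.currentTimeMillis() + timeoutMs;
        try {
            Request request = cache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            // Skip requests from queues revoked by a rebalance, but never wait past endTime.
            while (request != null && request.dropped()) {
                long remaining = endTime - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null;
                }
                request = cache.poll(remaining, TimeUnit.MILLISECONDS);
            }
            return request;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Request> cache = new LinkedBlockingQueue<>();
        cache.add(new Request("batch from revoked queue", true));
        cache.add(new Request("batch from live queue", false));
        System.out.println(poll(cache, 1000));
    }
}
```

Computing the remaining time from a fixed endTime on every wait, instead of reusing timeoutMs, is what keeps the total blocking time of one poll call bounded by the caller's timeout.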
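Summary
To be written later; it will mainly compare push mode and pull mode.
Further analysis
How is the offset used while pulling and consuming, and how does it interact with the broker?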