RocketMQ保证消息有序性
引言
在某些业务场景中,MQ需要保证消息的顺序性,比如支付订单应该在创建订单之后进行
如果不使用保证顺序的手段,由于多队列、网络等因素可能会导致先处理支付订单的消息再处理创建订单的消息,这样就会导致处理失败
为了避免这样的情况发生,使用MQ时有必要保证消息的顺序性,在RocketMQ中通常使用顺序发送消息和顺序消费消息来保证消息的顺序性
生产者端保证消息有序
当队列全局只有一个时,消息全局有序,此时只需要确保为单个生产者发送(多个生产者同时发送无法预估消息到达的顺序)
或者先生产创建订单的消息再生产支付订单的消息(确保消息不丢)由于全局有序只能有一个队列,队列的压力过大,所以不经常使用
更通用的做法是使用队列有序:在发送消息时通过一定的路由算法将需要有序的消息分发到同一个队列中,使用相同的队列保证有序性
顺序消息分类
RocketMQ 提供了两种顺序消息,即全局顺序消息和分区顺序消息。
-
全局顺序消息:指某个 Topic 下的所有消息都要保证顺序。在这种情况下,Topic 内部只能有一个队列,生产者将消息顺序发送到这个唯一的队列中。因为队列本身是 FIFO(先进先出)的数据结构,所以进入队列的消息天然有序。不过,由于只有一个队列,会导致并发度很低,吞吐量也会受到限制,因此全局顺序消息的使用场景相对较少。
-
分区顺序消息:指在一个分区(队列)内的消息有序,不同分区之间的消息可以无序。这是更常见的使用方式,既保证了一定程度的顺序性,又能通过多个分区提高系统的并发能力和吞吐量。
实现分区顺序消息的发送
为了实现分区顺序消息的发送,生产者在发送消息时需要指定消息的路由规则,确保同一业务逻辑下的消息被发送到同一个队列中。例如,在电商系统中,一个订单的不同状态变更消息需要保证顺序,那么可以使用订单 ID 作为消息的路由键。
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
//参数校验
Validators.checkMessage(msg, this.defaultMQProducer);
//获取topic
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
//获取队列
List<MessageQueue> messageQueueList =
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);
//选择队列
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
throw new MQClientException("select message queue threw exception.", e);
}
//超时判断
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
if (mq != null) {
//发送消息
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}
//失败
validateNameServerSetting();
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
在上述代码中,通过 MessageQueueSelector
实现了消息的路由选择,根据 orderId
选择对应的队列,确保相同 orderId
的消息发送到同一个队列中。
消费者端保证消息有序
前文说过消费者消息消息时,为了全力以赴通常都是使用线程池进行并发消费的
当一批顺序消息被同时拉取到消费者时,如果由线程池并发进行消费也会导致消息的顺序性失效
因此在消费端也需要进行顺序消费,使用DefaultMQPushConsumer进行消费时,设置消息监听器为MessageListenerOrderly
在顺序消费的文章中也说过:设置消息监听器为MessageListenerOrderly时,会通过多种加锁的方式保证消费者顺序消费队列中的消息
但如果消费发生失败会阻塞队列导致消息堆积,因此需要注意特殊处理,比如重试次数超过阈值时就记录下来后续再处理
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
try {
for (MessageExt msg : msgs) {
// 获取消息的重试次数
int retryCount = msg.getReconsumeTimes();
System.out.println("Message [" + msg.getMsgId() + "] is reconsumed " + retryCount + " times");
//如果重试次数超过阈值 记录
if (retryCount >= 3) {
System.out.println("Message [" + msg.getMsgId() + "] add DB");
}
// 模拟消费失败
if (retryCount < 3) {
throw new RuntimeException("Consume failed");
}
// 消费成功
System.out.println("Message [" + msg.getMsgId() + "] consumed successfully");
}
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
// 记录日志
e.printStackTrace();
// 返回重试状态
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
});
顺序消费模式
RocketMQ 的消费者提供了顺序消费模式,消费者在这种模式下会按照消息在队列中的顺序依次消费。消费者会对每个队列加锁,保证同一时间只有一个线程在处理该队列中的消息,从而确保消息消费的顺序性。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OrderedMessageConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("OrderedTopic", "*");
// 设置为顺序消费模式
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
在上述代码中,通过 MessageListenerOrderly
实现了顺序消费的逻辑,消费者会按顺序处理接收到的消息。
消费异常处理
在顺序消费过程中,如果出现消费异常,需要进行合理的处理,以保证消息顺序不受影响。RocketMQ 提供了重试机制,当消费失败时,消息会被重新放入队列中,等待下次消费。消费者需要确保在处理异常时不会破坏消息的顺序。