RocketMQ发送和接收方式详解
RocketMQ有几种发送方式
RocketMQ 提供了几种不同的消息发送方式,以满足不同场景下的需求。这些发送方式主要包括:
同步发送(Synchronous):这是最常见的一种发送方式,客户端发送消息后,会等待服务器端的响应。只有当消息完全被服务器接收,客户端才会继续执行。这种方式适用于对可靠性要求较高的场景。
异步发送(Asynchronous):客户端在发送消息后不会立即等待服务器的响应,而是通过一个回调接口来处理服务器的响应,这样可以提高消息发送的吞吐量。这种方式适用于对响应时间要求较敏感的场景。
单向发送(One-way):这种方式是最轻量级的一种发送方式,客户端只负责发送消息,不等待服务器的响应,也不关心消息是否被服务器接收。这种方式的可靠性最低,但吞吐量最高,适用于对可靠性要求不高的日志收集等场景。
顺序发送(Ordered):RocketMQ 还支持顺序发送消息,确保特定主题的消息能按照发送的顺序被消费。这对于需要保证消息顺序的业务场景(如订单处理)非常重要。**顺序必须是同步的**
这些发送方式可以根据具体的业务需求和场景来选择使用,以达到最佳的性能和可靠性平衡。
不同发送方式的调用方式或者不同配置
在使用 RocketMQ 进行消息发送时,可以根据不同的发送需求选择合适的发送方式。以下是不同发送方式的基本调用方式或配置示例:
1. 同步发送(Synchronous)
同步发送是指消息发送方将消息发送给服务器后,会等待服务器的响应,确认消息已经被接收后才继续执行后续操作。
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();
// 创建消息实例,指定topic,tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 调用send方法发送消息,并获取发送结果
SendResult sendResult = producer.send(msg);
// 打印发送结果
System.out.printf("%s%n", sendResult);
// 当不再发送消息时,关闭Producer实例
producer.shutdown();
2. 异步发送(Asynchronous)
异步发送是指消息发送方发送消息后,不会立即等待服务器的响应,而是提供一个回调接口,服务器响应时通过回调接口来通知发送方。
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();
// 创建消息实例,指定topic,tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 消息发送成功的回调
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
// 消息发送失败的回调
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
// 关闭Producer实例
producer.shutdown();
3. 单向发送(One-way)
单向发送是指消息发送方只负责发送消息,不等待服务器的响应也不关心消息是否被服务器接收,适用于对可靠性要求不高的场景。
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();
// 创建消息实例,指定topic,tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 单向发送消息,没有返回结果
producer.sendOneway(msg);
// 关闭Producer实例
producer.shutdown();
4. 顺序发送(Ordered)
顺序发送保证了特定主题的消息能按照发送的顺序来消费。
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
// 订单列表
List<OrderStep> orderList = new ArrayList<OrderStep>();
orderList.add(new OrderStep(15103111039L, "创建"));
orderList.add(new OrderStep(15103111065L, "支付"));
orderList.add(new OrderStep(15103111039L, "完成"));
for (int i = 0; i < 10; i++) {
// 加入订单的时间戳来模拟简单的订单ID
String body = orderList.get(i % orderList.size()) + " 时间:" + System.currentTimeMillis();
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; // 根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i % orderList.size()).getOrderId()); // 订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
// 关闭Producer实例
producer.shutdown();
每种发送方式都有其适用的场景,可以根据实际的业务需求选择使用。在实际开发中,需要根据具体的业务逻辑对示例代码进行适当的调整和优化。
不同接收方式的调用方式或者不同配置
1. 集群消费(Clustering)
集群消费模式下,同一个消费者组中的多个消费者实例共同消费主题下的消息,每条消息只会被消费一次。
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
// 设置NameServer地址
consumer.setNamesrvAddr("nameserver1:9876");
// 订阅一个或多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 消费消息
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
}
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
2. 广播消费(Broadcasting)
广播消费模式下,消息会被消费者组中的每个消费者都消费一次。
// 实例化消费者,并设置消费模式为广播模式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setNamesrvAddr("nameserver1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
3. 顺序消费(Orderly)
顺序消费保证了特定主题的消息能按照发送的顺序来消费。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("nameserver1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s, QueueId: %d %n", Thread.currentThread().getName(), new String(msg.getBody()), msg.getQueueId());
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
4. 延迟消费
延迟消费不是通过消费者的特定设置来实现的,而是在发送消息时设置消息的延迟等级。
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("nameserver1:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes());
// 设置延迟等级3,这会使消息延迟10s再被消费
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
producer.shutdown();
接收延迟消息的代码与普通消息的接收方式相同,不需要特殊配置。
5. 重试和死信队列
对于处理失败的消息,RocketMQ 会自动重试,无需特别配置。如果重试次数达到上限仍然失败,消息会被转移到死信队列。消费死信队列中的消息需要订阅特定的Topic(%DLQ%+消费者组名)。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("nameserver1:9876");
// 订阅死信队列
consumer.subscribe("%DLQ%ConsumerGroupName", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("DLQ Message: %s %n", new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
不同的接收方式适用于不同的业务场景,可以根据实际需求选择最合适的方式。在实际应用中,可能需要结合业务逻辑对示例代码进行适当的调整和优化。