当前位置: 首页 > article >正文

RocketMQ发送和接收方式详解

RocketMQ有几种发送方式

RocketMQ 提供了几种不同的消息发送方式,以满足不同场景下的需求。这些发送方式主要包括:

同步发送(Synchronous):这是最常见的一种发送方式,客户端发送消息后,会等待服务器端的响应。只有当消息完全被服务器接收,客户端才会继续执行。这种方式适用于对可靠性要求较高的场景。

异步发送(Asynchronous):客户端在发送消息后不会立即等待服务器的响应,而是通过一个回调接口来处理服务器的响应,这样可以提高消息发送的吞吐量。这种方式适用于对响应时间要求较敏感的场景。

单向发送(One-way):这种方式是最轻量级的一种发送方式,客户端只负责发送消息,不等待服务器的响应,也不关心消息是否被服务器接收。这种方式的可靠性最低,但吞吐量最高,适用于对可靠性要求不高的日志收集等场景。

顺序发送(Ordered):RocketMQ 还支持顺序发送消息,确保特定主题的消息能按照发送的顺序被消费。这对于需要保证消息顺序的业务场景(如订单处理)非常重要。**顺序必须是同步的**

这些发送方式可以根据具体的业务需求和场景来选择使用,以达到最佳的性能和可靠性平衡。

不同发送方式的调用方式或者不同配置

在使用 RocketMQ 进行消息发送时,可以根据不同的发送需求选择合适的发送方式。以下是不同发送方式的基本调用方式或配置示例:

1. 同步发送(Synchronous)
同步发送是指消息发送方将消息发送给服务器后,会等待服务器的响应,确认消息已经被接收后才继续执行后续操作。

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();

// 创建消息实例,指定topic,tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));

// 调用send方法发送消息,并获取发送结果
SendResult sendResult = producer.send(msg);

// 打印发送结果
System.out.printf("%s%n", sendResult);

// 当不再发送消息时,关闭Producer实例
producer.shutdown();

2. 异步发送(Asynchronous)
异步发送是指消息发送方发送消息后,不会立即等待服务器的响应,而是提供一个回调接口,服务器响应时通过回调接口来通知发送方。

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();

// 创建消息实例,指定topic,tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));

// 异步发送消息
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // 消息发送成功的回调
        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    }

    @Override
    public void onException(Throwable e) {
        // 消息发送失败的回调
        System.out.printf("%-10d Exception %s %n", index, e);
        e.printStackTrace();
    }
});

// 关闭Producer实例
producer.shutdown();

3. 单向发送(One-way)
单向发送是指消息发送方只负责发送消息,不等待服务器的响应也不关心消息是否被服务器接收,适用于对可靠性要求不高的场景。

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();

// 创建消息实例,指定topic,tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));

// 单向发送消息,没有返回结果
producer.sendOneway(msg);

// 关闭Producer实例
producer.shutdown();

4. 顺序发送(Ordered)
顺序发送保证了特定主题的消息能按照发送的顺序来消费。

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();

String[] tags = new String[]{"TagA", "TagB", "TagC"};
// 订单列表
List<OrderStep> orderList = new ArrayList<OrderStep>();
orderList.add(new OrderStep(15103111039L, "创建"));
orderList.add(new OrderStep(15103111065L, "支付"));
orderList.add(new OrderStep(15103111039L, "完成"));

for (int i = 0; i < 10; i++) {
    // 加入订单的时间戳来模拟简单的订单ID
    String body = orderList.get(i % orderList.size()) + " 时间:" + System.currentTimeMillis();
    Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Long id = (Long) arg;  // 根据订单id选择发送queue
            long index = id % mqs.size();
            return mqs.get((int) index);
        }
    }, orderList.get(i % orderList.size()).getOrderId());  // 订单id

    System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
            sendResult.getSendStatus(),
            sendResult.getMessageQueue().getQueueId(),
            body));
}

// 关闭Producer实例
producer.shutdown();

每种发送方式都有其适用的场景,可以根据实际的业务需求选择使用。在实际开发中,需要根据具体的业务逻辑对示例代码进行适当的调整和优化。

不同接收方式的调用方式或者不同配置

1. 集群消费(Clustering)
集群消费模式下,同一个消费者组中的多个消费者实例共同消费主题下的消息,每条消息只会被消费一次。

// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
// 设置NameServer地址
consumer.setNamesrvAddr("nameserver1:9876");
// 订阅一个或多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 消费消息
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
        }
        // 标记该消息已经被成功消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 启动消费者实例
consumer.start();

2. 广播消费(Broadcasting)
广播消费模式下,消息会被消费者组中的每个消费者都消费一次。

// 实例化消费者,并设置消费模式为广播模式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setNamesrvAddr("nameserver1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

3. 顺序消费(Orderly)
顺序消费保证了特定主题的消息能按照发送的顺序来消费。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("nameserver1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.printf("%s Receive New Messages: %s, QueueId: %d %n", Thread.currentThread().getName(), new String(msg.getBody()), msg.getQueueId());
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
consumer.start();

4. 延迟消费
延迟消费不是通过消费者的特定设置来实现的,而是在发送消息时设置消息的延迟等级。

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("nameserver1:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes());
// 设置延迟等级3,这会使消息延迟10s再被消费
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
producer.shutdown();

接收延迟消息的代码与普通消息的接收方式相同,不需要特殊配置。

5. 重试和死信队列
对于处理失败的消息,RocketMQ 会自动重试,无需特别配置。如果重试次数达到上限仍然失败,消息会被转移到死信队列。消费死信队列中的消息需要订阅特定的Topic(%DLQ%+消费者组名)。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("nameserver1:9876");
// 订阅死信队列
consumer.subscribe("%DLQ%ConsumerGroupName", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.printf("DLQ Message: %s %n", new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

不同的接收方式适用于不同的业务场景,可以根据实际需求选择最合适的方式。在实际应用中,可能需要结合业务逻辑对示例代码进行适当的调整和优化。


http://www.kler.cn/a/274089.html

相关文章:

  • QT-------认识QT
  • LLM预训练recipe — 摘要版
  • 简单讲解关于微信小程序调整 miniprogram 后, tabbar 找不到图片的原因之一
  • 大模型应用技术系列(三): 深入理解大模型应用中的Cache:GPTCache
  • C++的封装(十四):《设计模式》这本书
  • AIGC:生成图像动力学
  • 从基础入门到学穿C++
  • <JavaEE> 了解网络层协议 -- IP协议
  • 【蓝桥杯每日一题】填充颜色超详细解释!!!
  • AWS监控,AWS 性能监控工具
  • 【日常记录】【插件】使用ColorThief,跟随图片变化改变网页背景
  • JDK1.8超详细安装教程
  • Json Web Token(JWT) 快速入门
  • Android 13 源码编译及报错修复
  • 【C++庖丁解牛】继承的概念及定义 | 继承中的作用域 | 继承与友元继承与静态成员 | 复杂的菱形继承及菱形虚拟继承
  • Ubuntu双系统/home分区扩容
  • 期权希腊字母
  • clipboard好用的复制剪切库
  • springcloud gateway
  • 一文掌握python函数式编程及应用实例(超详细及超多应用实例)(一)
  • RPC 和 序列化
  • 05 龙芯平台openstack部署搭建-placement部署
  • 【系统架构师】-第19章-大数据架构设计理论与实践
  • STP环路避免实验(思科)
  • 代码随想录day20(2)二叉树:完全二叉树节点个数(leetcode222)
  • 创建存储过程,与存储过程调用