当前位置: 首页 > article >正文

RocketMQ 中如何实现消息的可靠传递?

引言

作为头部消息队列开源中间件,学习其中的技术方案并且总结可靠性和健壮性,提升我们的架构思维和解决问题的能力 。

在 RocketMQ 中实现消息的可靠传递可以从多个方面入手,涵盖生产者、Broker 以及消费者等不同环节。

 

生产者端

1. 同步发送消息

生产者使用同步发送模式时,会等待 Broker 返回发送结果,确保消息成功发送到 Broker 才会继续后续操作。若发送失败,生产者可以进行重试。

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

public class SyncProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes("UTF-8"));
        try {
            // 同步发送消息
            producer.send(msg);
        } catch (Exception e) {
            // 发送失败,可进行重试等处理
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

2. 重试机制

生产者在发送消息失败时,可配置重试次数。RocketMQ 支持自动重试,当遇到网络抖动、Broker 临时不可用等情况时,会自动尝试重新发送消息。

producer.setRetryTimesWhenSendFailed(3); // 设置发送失败时的重试次数为 3 次

3. 消息幂等性处理

为避免因重试导致消息重复发送,生产者可以为每条消息生成唯一的 ID。Broker 在接收消息时,会根据消息 ID 进行去重处理,确保相同 ID 的消息只被处理一次。

Broker 端

1. 刷盘策略

  • 同步刷盘:当 Broker 收到消息后,会先将消息写入磁盘,再返回响应给生产者。这种策略保证了消息不会因 Broker 异常重启而丢失,但会降低系统的吞吐量。
    flushDiskType = SYNC_FLUSH
  • 异步刷盘:Broker 收到消息后,先将消息写入内存缓冲区,然后立即返回响应给生产者,由专门的线程将消息异步写入磁盘。这种策略性能较高,但在 Broker 异常崩溃时,可能会丢失部分内存中的消息。

    2. 主从复制

    RocketMQ 支持主从复制架构,主 Broker 接收消息后,会将消息同步复制到从 Broker。当主 Broker 出现故障时,可以切换到从 Broker 继续提供服务,保证消息的可用性。

    brokerRole = SYNC_MASTER # 主 Broker 配置为同步主节点
    brokerRole = SLAVE # 从 Broker 配置为从节点

    消费者端

    1. 手动提交消费偏移量

    消费者在处理完消息后,手动向 Broker 提交消费偏移量,确保只有在消息处理成功后才更新消费进度。这样,当消费者出现异常时,可以从上次提交的偏移量处继续消费,避免消息丢失。

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class ManualCommitConsumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
            consumer.setNamesrvAddr("localhost:9876");
            consumer.subscribe("TopicTest", "*");
    
            // 手动提交消费偏移量
            consumer.setAutoCommit(false);
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        try {
                            // 处理消息
                            System.out.println(new String(msg.getBody()));
                        } catch (Exception e) {
                            // 处理失败,返回重试
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        }
                    }
                    // 手动提交消费偏移量
                    context.setAckIndex(msgs.size() - 1);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
        }
    }

    2. 消费重试机制

    当消费者处理消息失败时,RocketMQ 会自动进行重试。消费者可以根据业务需求,设置重试次数和重试间隔,确保消息能够被成功处理。

    3. 幂等消费

    消费者在处理消息时,要保证消息的幂等性,即多次处理相同的消息不会产生额外的影响。可以通过消息 ID 或业务唯一标识来判断消息是否已经处理过,避免重复处理。

总结

  1. 持久化策略:内存注定是不可靠的,刷盘一定是可靠性首选,但是刷盘导致的IO延时如何优化,是评判中间件性能的关键。
  2. 重试机制:3次重试应该是各个开源框架的默认重试次数。
  3. 集群化策略:单个节点注定不是高可用的最终形态,主从复制多节点可靠是最终态。
  4. 幂等机制:保持消息的重复消费可靠性,幂等键或者其他策略都是可参考的。

http://www.kler.cn/a/525485.html

相关文章:

  • 全程Kali linux---CTFshow misc入门(14-24)
  • Spring Boot - 数据库集成05 - 集成MongoDB
  • 【UE插件】Sphinx关键词语音识别
  • Vue.js组件开发-实现下载时暂停恢复下载
  • doris:HLL
  • 实现B-树
  • C++,STL 简介:历史、组成、优势
  • 9.1 LangChain深度解析:大模型应用开发的“万能胶水”与核心架构设计
  • 数论问题77一一3x+1问题
  • 【deepseek实战】绿色好用,不断网
  • UE5制作视差图
  • 热更新杂乱记
  • Android车机DIY开发之学习篇(七)NDK交叉工具构建
  • 数据结构---哈希表
  • Linux - 常用的I/O 多路复用技术 select, poll, epoll
  • PyTorch 与 Python 版本对应关系
  • hive:基本数据类型,关于表和列语法
  • Unity敌人逻辑笔记
  • 推动知识共享的在线知识库实施与优化指南
  • java实现mysql数据库备份还原定时删除过期备份文件
  • JavaScript图像处理,JavaScript实现高斯滤波图像处理算法
  • http://noi.openjudge.cn/——4.2算法之数论——2419:Coins
  • 【面试】【前端】SSR与SPA的优缺点
  • doris:Bitmap
  • 3.4 Go函数作用域(标识符)
  • 【C++】内联函数inline、关键字auto与新式for