当前位置: 首页 > article >正文

RocketMQ 学习笔记01

一、MQ简介

1. 什么是MQ?

MQ(Message Queue,消息队列) 是一种在分布式系统中用于实现进程间通信和数据传输的中间件。它通过在不同进程或应用程序之间传递消息,实现数据的异步处理、解耦和削峰填谷等功能。MQ广泛应用于互联网、高并发场景下的数据传输和处理。

2. MQ的核心概念

  • Message(消息)

    • 定义:在不同进程或应用程序之间传递的数据单元。
    • 特性:可以包含任意格式的数据,如文本、二进制等。
    • 传输方式:消息可以在同一台机器上的不同进程间传递,也可以在不同机器上的进程间传递。
  • Queue(队列)

    • 定义:一种具有先进先出(FIFO)特性的缓存数据结构,用于存储和管理消息。
    • 功能:消息队列负责暂存消息,确保消息按照一定的顺序被处理。
    • 特性:除了FIFO,部分MQ实现还支持优先级队列、延迟队列等。

3. MQ的作用

MQ在系统架构中扮演着重要角色,主要体现在以下几个方面:

  • 异步处理

    • 概念:生产者发送消息后,不需要等待消费者处理完成即可继续其他操作。
    • 优势:提高系统的响应速度和吞吐量,避免因单个服务的阻塞导致整个系统性能下降。
    • 例子:快递员通过菜鸟驿站实现快递的异步配送,快递员只需将包裹放入驿站即可继续派送其他包裹,客户可根据自己的时间安排取件。
  • 解耦

    • 概念:通过消息队列,生产者和消费者之间不直接交互,减少了彼此的依赖。
    • 优势
      1. 降低耦合度:服务之间独立运行,互不影响,提高系统整体的稳定性和可维护性。
      2. 灵活扩展:可以根据需求灵活增加或减少消费者数量,而不影响生产者的运行。
    • 例子:图书《Thinking in JAVA》的翻译过程,通过编辑团队将英文内容翻译成其他语言,实现不同语言版本的独立发布和管理。
  • 削峰填谷

    • 概念:通过消息队列缓冲瞬时的高并发请求,平滑系统负载,避免系统资源被瞬时大量请求耗尽。
    • 优势:提高系统的稳定性,防止因瞬时流量激增导致系统崩溃。
    • 例子:长江每年洪水期通过三峡大坝进行水量调控,将高峰期的洪水储存起来,平稳地释放到下游,避免下游地区被洪水淹没。

二、RocketMQ产品特点

1. RocketMQ介绍

RocketMQ 是由阿里巴巴开源的一款高性能、高可靠的分布式消息中间件。经过阿里内部多次大规模业务场景(如双十一)的验证,RocketMQ能够处理亿万级别的消息。2016年,RocketMQ开源并捐赠给Apache基金会,现已成为Apache的顶级项目。

发展历程

  • 早期阶段

    • 阿里巴巴最初使用 ActiveMQ 作为消息中间件,但随着业务量的增长,ActiveMQ的IO性能迅速达到瓶颈。
  • 探索Kafka

    • 阿里巴巴开始关注 Apache Kafka,但发现Kafka主要针对日志收集场景,且在多Topic情况下,Partition文件过多导致文件索引耗时增加,严重影响IO性能,不完全适合阿里的业务需求。
  • 自研消息中间件

    • 为了解决上述问题,阿里巴巴决定自研消息中间件,最初命名为 MetaQ,后来更名为 RocketMQ。RocketMQ最初的目标是解决多Topic下的IO性能压力,经过阿里内部不断改进,RocketMQ逐渐展现出独特的优势。

2. RocketMQ特点

RocketMQ在众多MQ产品中脱颖而出,具有以下显著特点:

优点缺点适合场景
高吞吐量、高性能服务加载较慢几乎全场景,特别适合金融场景
高可用性
功能全面(支持广播消费、延迟队列、死信队列、事务消息等)
客户端协议丰富
Java语言开发,便于定制

与其他主流MQ产品对比

  • Apache Kafka

    • 优点:吞吐量极高,性能卓越,集群高可用。
    • 缺点:可能存在数据丢失风险,功能相对单一。
    • 适用场景:日志分析、大数据采集等。
  • RabbitMQ

    • 优点:消息可靠性高,功能丰富。
    • 缺点:基于Erlang语言开发,定制困难;吞吐量相对较低。
    • 适用场景:企业内部小规模服务调用。
  • Apache Pulsar

    • 优点:基于Bookkeeper构建,消息可靠性高。
    • 缺点:周边生态尚不完善,使用企业较少。
    • 适用场景:企业内部大规模服务调用。
  • Apache RocketMQ

    • 优点:高吞吐、高性能、高可用;功能全面;客户端协议丰富;Java语言开发,便于定制。
    • 缺点:服务加载较慢。
    • 适用场景:几乎全场景,尤其适合金融场景。

RocketMQ的核心优势

  • 高吞吐量与高性能
    • RocketMQ在处理亿万级别消息时,表现出色,吞吐量虽略低于Kafka,但远超RabbitMQ。
  • 高可靠性
    • 消息可靠性较Kafka有显著提升,确保消息不丢失,特别适用于金融等高可靠性需求场景。
  • 丰富的高级功能
    • 支持广播消费、延迟队列、死信队列、事务消息等,满足复杂业务需求。
  • 适用于高并发和高可靠性场景
    • 经过阿里巴巴内部大规模双十一等高并发场景的严格测试,RocketMQ适用于需要高可靠性和高可用性的业务场景,尤其是金融行业。

三、RocketMQ快速实战

通过以下步骤,可以快速搭建RocketMQ服务并实现消息的发送与接收。

1. 快速搭建RocketMQ服务

1.1 RocketMQ的下载与安装
  • 官网下载地址:RocketMQ官网
  • 下载页面:下载链接

版本选择

  • 最新版本:5.x版本,专注于云原生,具备众多新特性,但企业中应用较少。
  • 稳定版本:4.9.5版本,更为稳定,适合企业环境。

注意:2020年下半年,RocketMQ发布了5.0大版本,带来了重大功能升级,如定时消息发送时间精确化、支持Grpc协议、多种集群模式等。但由于功能强大,问题也较多,企业多采用内部优化版或继续使用稳定的4.9.5版本。

1.2 解压并上传到服务器
  1. 下载运行包:选择适合的版本(如4.9.5)下载后解压。

  2. 上传到服务器:将解压后的文件夹上传到服务器上的 /app/rocketmq 目录。

    tar -zxvf rocketmq-all-4.9.5-bin-release.tar.gz -C /app/rocketmq/
    
1.3 运行环境配置

内存配置

  • 生产环境:建议至少12G内存,确保RocketMQ运行稳定。
  • 学习阶段:如果服务器内存不足,可调整Java进程的内存大小。

修改启动脚本

  1. 编辑 runserver.sh

    vi /app/rocketmq/rocketmq-all-4.9.5-bin-release/bin/runserver.sh
    

    找到并修改内存配置:

    JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    
  2. 编辑 runbroker.sh

    vi /app/rocketmq/rocketmq-all-4.9.5-bin-release/bin/runbroker.sh
    

    修改内存配置:

    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"
    

JDK安装

  • RocketMQ基于Java开发,需提前安装JDK(建议1.8版本)。

  • 安装示例

    yum install java-1.8.0-openjdk -y
    
1.4 启动RocketMQ服务

NameServer与Broker的作用

  • NameServer:类似于DNS,负责管理Broker的地址信息,提供路由服务。
  • Broker:核心服务,负责消息的存储、转发和消费。

启动步骤

  1. 启动NameServer

    cd /app/rocketmq/rocketmq-all-4.9.5-bin-release
    nohup bin/mqnamesrv &
    
    • 验证启动:检查 nohup.out 日志文件,确认出现以下关键日志:

      The Name Server boot success. serializeType=JSON
      
    • 使用 jps 检查进程

      jps
      

      应显示 NamesrvStartup 进程。

  2. 启动Broker

    • 配置 broker.conf

      conf 目录下编辑 broker.conf,添加:

      autoCreateTopicEnable=true
      

      注意autoCreateTopicEnable=true 仅适用于测试环境,生产环境建议关闭,避免客户端随意创建Topic,增加管理压力。

    • 启动Broker

      nohup bin/mqbroker &
      
    • 验证启动:检查 nohup.out 日志文件,确认出现以下关键日志:

      The broker[xxxxx] boot success. serializeType=JSON
      
    • 使用 jps 检查进程

      jps
      

      应显示 BrokerStartup 进程。

环境变量配置

  • 为方便使用RocketMQ指令,可将RocketMQ的安装路径添加至环境变量。

    vi ~/.bash_profile
    

    添加以下内容:

    export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-4.9.5-bin-release
    export PATH=$ROCKETMQ_HOME/bin:$PATH
    

    使配置生效:

    source ~/.bash_profile
    

停止服务

  • 使用 mqshutdown 指令停止服务:

    mqshutdown namesrv   # 关闭NameServer
    mqshutdown broker    # 关闭Broker
    

2. 快速实现消息收发

RocketMQ提供了命令行工具和编程接口,方便快速实现消息的发送与接收。

2.1 命令行实现消息收发

步骤

  1. 配置环境变量

    • 设置 NAMESRV_ADDR 环境变量,指向启动的NameServer地址。

      vi ~/.bash_profile
      

      添加:

      export NAMESRV_ADDR='localhost:9876'
      

      使配置生效:

      source ~/.bash_profile
      
  2. 发送消息

    • 使用RocketMQ提供的Producer示例发送消息:

      tools.sh org.apache.rocketmq.example.quickstart.Producer
      
    • 日志示例

      SendResult [sendStatus=SEND_OK, msgId=C0A8E88007AC3764951D891CE9A003E7, offsetMsgId=C0A8E88000002A9F00000000000317BF, messageQueue=MessageQueue [topic=TopicTest, brokerName=worker1, queueId=1], queueOffset=249]
      
      • 解释
        • sendStatus=SEND_OK:消息发送成功。
        • msgId:消息ID,唯一标识一条消息。
        • offsetMsgId:消息在CommitLog中的偏移量ID。
        • messageQueue:消息存储的队列信息,包括Topic、Broker名称、Queue ID。
        • queueOffset:消息在队列中的偏移量,标识存储位置。
  3. 接收消息

    • 使用RocketMQ提供的Consumer示例接收消息:

      tools.sh org.apache.rocketmq.example.quickstart.Consumer
      
    • 日志示例

      ConsumeMessageThread_19 Receive New Messages: [MessageExt 
      [brokerName=worker1, queueId=2, storeSize=203, queueOffset=53, sysFlag=0, 
      bornTimestamp=1606460371999, bornHost=/192.168.232.128:43436, 
      storeTimestamp=1606460372000, storeHost=/192.168.232.128:10911, 
      msgId=C0A8E88000002A9F000000000000A7AE, commitLogOffset=42926, 
      bodyCRC=1968636794, reconsumeTimes=0, preparedTransactionOffset=0, 
      toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, 
      MAX_OFFSET=250, CONSUME_START_TIME=1606460450150, 
      UNIQ_KEY=C0A8E88007AC3764951D891CE41F00D4, CLUSTER=DefaultCluster, 
      WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 
      101, 116, 77, 81, 32, 50, 49, 50], transactionId='null'}]] 
      
      • 解释
        • brokerName:消息存储的Broker名称。
        • queueId:消息所在的Queue ID。
        • queueOffset:消息在队列中的偏移量。
        • msgId:消息ID。
        • body:消息内容,以字节数组形式展示。
    • 注意:Consumer示例不会主动结束,会持续等待新消息,可以使用 CTRL+C 手动停止。

总结:通过命令行工具,可以快速验证RocketMQ的消息发送与接收功能,了解消息的存储和消费过程。

2.2 搭建Maven客户端项目

为更深入理解RocketMQ的消息收发过程,可通过编写Maven项目,集成RocketMQ的客户端API,实现消息的发送与接收。

步骤

  1. 创建Maven项目

    • 使用IDE(如IntelliJ IDEA)或命令行创建一个标准的Maven项目。
  2. 添加RocketMQ依赖

    • pom.xml 中添加RocketMQ客户端依赖:

      <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
          <version>4.9.5</version>
      </dependency>
      
  3. 实现消息生产者

    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            // 初始化消息生产者,指定生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            // 设置NameServer地址
            producer.setNamesrvAddr("192.168.232.128:9876");
            // 启动生产者
            producer.start();
            for (int i = 0; i < 2; i++) {
                try {
                    // 创建消息,包含Topic、Tag和消息体
                    Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // 发送消息并获取发送结果
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
            // 关闭生产者
            producer.shutdown();
        }
    }
    

    说明

    • DefaultMQProducer:RocketMQ提供的默认消息生产者实现。
    • 组名:生产者组名需唯一,用于标识生产者实例。
    • Topic:消息所属的主题,需与消费者订阅的Topic一致。
    • Tag:消息的标签,用于消息过滤。
    • 消息体:实际传输的数据内容,需指定字符集编码。
  4. 实现消息消费者

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    public class Consumer {
        public static void main(String[] args) throws InterruptedException, MQClientException {
            // 初始化消息消费者,指定消费者组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
            // 设置NameServer地址
            consumer.setNamesrvAddr("192.168.232.128:9876");
            // 设置消费位置:从最后一个偏移量开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            // 订阅Topic和Tag,*表示订阅所有Tag
            consumer.subscribe("TopicTest", "*");
            // 注册消息监听器,处理接收到的消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    msgs.forEach(messageExt -> {
                        try {
                            // 打印消息内容
                            System.out.println("收到消息:" + new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                        } catch (UnsupportedEncodingException e) {}
                    });
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 启动消费者
            consumer.start();
            System.out.print("Consumer Started");
        }
    }
    

    说明

    • DefaultMQPushConsumer:RocketMQ提供的默认消息消费者实现。
    • 组名:消费者组名需唯一,用于标识消费者实例。
    • ConsumeFromWhere:设置消费起始位置,如从上次偏移量继续消费、从头开始消费等。
    • 订阅:指定要订阅的Topic和Tag,Tag用于消息过滤。
    • 消息监听器:注册消息处理逻辑,接收到消息后触发回调函数进行处理。
  5. 运行与验证

    • 运行生产者:执行Producer的main方法,发送消息到RocketMQ。
    • 运行消费者:执行Consumer的main方法,接收并打印消息内容。
    • 验证结果
      • 控制台输出:消费者应打印出生产者发送的消息内容,如收到消息:Hello RocketMQ 0收到消息:Hello RocketMQ 1
      • Dashboard查看:可通过RocketMQ Dashboard查看消息的分布和消费情况。

总结:通过命令行工具和Maven项目,可以快速验证RocketMQ的消息发送与接收功能,理解消息的存储和消费机制,为后续深入学习奠定基础。

3. 搭建RocketMQ可视化管理服务

RocketMQ提供了图形化的管理控制台——Dashboard,用于实时监控和管理RocketMQ集群的运行状态。

3.1 Dashboard的下载与安装
  1. 下载Dashboard源码

    • 访问RocketMQ官网下载页面,获取Dashboard的源码包。
    • 注意:RocketMQ官方未提供直接可运行的Dashboard Jar包,需自行编译源码。
  2. 编译Dashboard源码

    • 前提条件:需要在本地安装Maven构建工具。

    • 编译步骤

      git clone https://github.com/apache/rocketmq.git
      cd rocketmq
      cd rocketmq-dashboard
      mvn clean package -Dmaven.test.skip=true
      
    • 编译结果:在 target 目录下生成可运行的Jar包,如 rocketmq-dashboard-1.0.1-SNAPSHOT.jar

  3. 上传Jar包至服务器

    • 将编译生成的Jar包上传至服务器上的 /app/rocketmq/rocketmq-dashboard 目录。

    • 示例命令

      scp target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar user@server:/app/rocketmq/rocketmq-dashboard/
      
3.2 配置Dashboard
  1. 创建配置文件

    • 在Jar包所在目录下创建 application.yml 文件,配置RocketMQ的NameServer地址。

    • 示例配置

      rocketmq: 
        config: 
          namesrvAddrs: 
            - 192.168.232.128:9876 
      
    • 说明

      • namesrvAddrs:指定RocketMQ集群中所有NameServer的地址,多个地址用-列表方式表示。
  2. 启动Dashboard服务

    • 使用Java指令运行Dashboard Jar包:

      cd /app/rocketmq/rocketmq-dashboard
      java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar
      
    • 验证启动:访问 http://192.168.232.128:8080,应看到RocketMQ Dashboard的管理页面。

3.3 Dashboard的功能介绍
  • 驾驶舱(Dashboard)
    • 功能:展示RocketMQ集群近期的运行情况,包括消息吞吐量、延迟、Broker状态等。
  • 运维页面(Operation)
    • 功能:管理NameServer服务,查看NameServer的健康状态和运行指标。
  • 集群页面(Cluster)
    • 功能:管理RocketMQ的Broker服务,查看各Broker的状态、负载情况、消息分布等。

示例界面

Dashboard示例

(请替换为实际图片链接)

总结:通过搭建RocketMQ Dashboard,可以直观地监控和管理RocketMQ集群的运行状态,实时了解消息的流动和Broker的健康情况,极大地方便了运维和管理工作。

4. 升级分布式集群

单台服务器搭建的RocketMQ服务无法满足生产环境的高可用性和稳定性需求,因此需要将其升级为分布式集群,避免单点故障,提高系统的容错能力。

4.1 分布式集群的架构

主从架构

  • 架构图

    主从架构

    (请替换为实际图片链接)

  • 特点

    • Master节点:负责处理客户端的请求,存储和转发消息。
    • Slave节点:备份Master节点的数据,防止数据丢失。
    • 容错机制:当Master节点出现故障时,Slave节点可以接管,确保消息不丢失。

集群方案

  • 集群规划

    机器名NameServer服务部署Broker服务部署
    worker1NameServer
    worker2NameServerbroker-a (Master), broker-b-s (Slave)
    worker3NameServerbroker-a-s (Slave), broker-b (Master)
  • 2主2从配置

    • broker-a:在worker2上部署Master,在worker3上部署Slave。
    • broker-b:在worker3上部署Master,在worker2上部署Slave。
4.2 部署步骤

步骤概览

  1. 准备服务器:准备三台相同配置的Linux服务器,配置 /etc/hosts 文件,方便通过主机名访问。

    cat /etc/hosts
    192.168.232.128 worker1
    192.168.232.129 worker2
    192.168.232.130 worker3
    
  2. 部署NameServer服务

    • 在三台服务器上分别启动NameServer服务,按照之前的步骤启动即可。

    • 启动命令

      cd /app/rocketmq/rocketmq-all-4.9.5-bin-release
      nohup bin/mqnamesrv &
      
  3. 配置Broker集群

    • 使用RocketMQ提供的集群配置模板,如 2m-2s-async,进行集群配置。

    • 配置文件说明

      • brokerClusterName:集群名称,相同的名称表示同一个集群。
      • brokerName:Broker名称,相同名称表示一组主从节点。
      • brokerId:Broker的唯一标识,0表示Master,>0表示Slave。
      • namesrvAddr:NameServer地址列表,用分号分隔。
      • autoCreateTopicEnable:是否允许Broker自动创建Topic,建议生产环境关闭。
      • storePath:消息存储路径,不同Broker需指定不同路径,避免冲突。
      • brokerRole:Broker角色,ASYNC_MASTER表示异步Master,SLAVE表示Slave。
      • listenPort:Broker监听端口,确保同一台机器上不同Broker使用不同端口。
    • 配置broker-a Master(worker2)

      编辑 conf/2m-2s-async/broker-a.properties

      brokerClusterName=rocketmq-cluster
      brokerName=broker-a
      brokerId=0
      namesrvAddr=worker1:9876;worker2:9876;worker3:9876
      autoCreateTopicEnable=true
      deleteWhen=04
      fileReservedTime=120
      storePathRootDir=/app/rocketmq/store
      storePathCommitLog=/app/rocketmq/store/commitlog
      storePathConsumeQueue=/app/rocketmq/store/consumequeue
      storePathIndex=/app/rocketmq/store/index
      storeCheckpoint=/app/rocketmq/store/checkpoint
      abortFile=/app/rocketmq/store/abort
      brokerRole=ASYNC_MASTER
      flushDiskType=ASYNC_FLUSH
      listenPort=10911
      
    • 配置broker-a Slave(worker3)

      编辑 conf/2m-2s-async/broker-a-s.properties

      brokerClusterName=rocketmq-cluster
      brokerName=broker-a
      brokerId=1
      namesrvAddr=worker1:9876;worker2:9876;worker3:9876
      autoCreateTopicEnable=true
      deleteWhen=04
      fileReservedTime=120
      storePathRootDir=/app/rocketmq/storeSlave
      storePathCommitLog=/app/rocketmq/storeSlave/commitlog
      storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
      storePathIndex=/app/rocketmq/storeSlave/index
      storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
      abortFile=/app/rocketmq/storeSlave/abort
      brokerRole=SLAVE
      flushDiskType=ASYNC_FLUSH
      listenPort=11011
      
    • 配置broker-b Master(worker3)

      编辑 conf/2m-2s-async/broker-b.properties

      brokerClusterName=rocketmq-cluster
      brokerName=broker-b
      brokerId=0
      namesrvAddr=worker1:9876;worker2:9876;worker3:9876
      autoCreateTopicEnable=true
      deleteWhen=04
      fileReservedTime=120
      storePathRootDir=/app/rocketmq/store
      storePathCommitLog=/app/rocketmq/store/commitlog
      storePathConsumeQueue=/app/rocketmq/store/consumequeue
      storePathIndex=/app/rocketmq/store/index
      storeCheckpoint=/app/rocketmq/store/checkpoint
      abortFile=/app/rocketmq/store/abort
      brokerRole=ASYNC_MASTER
      flushDiskType=ASYNC_FLUSH
      listenPort=10911
      
    • 配置broker-b Slave(worker2)

      编辑 conf/2m-2s-async/broker-b-s.properties

      brokerClusterName=rocketmq-cluster
      brokerName=broker-b
      brokerId=1
      namesrvAddr=worker1:9876;worker2:9876;worker3:9876
      autoCreateTopicEnable=true
      deleteWhen=04
      fileReservedTime=120
      storePathRootDir=/app/rocketmq/storeSlave
      storePathCommitLog=/app/rocketmq/storeSlave/commitlog
      storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
      storePathIndex=/app/rocketmq/storeSlave/index
      storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
      abortFile=/app/rocketmq/storeSlave/abort
      brokerRole=SLAVE
      flushDiskType=ASYNC_FLUSH
      listenPort=11011
      

关键配置项说明

  • brokerClusterName:集群名称,相同名称的Broker在同一集群内。
  • brokerName:Broker名称,相同名称的Broker构成一组主从节点。
  • brokerId:Broker的唯一标识,0表示Master,1表示Slave。
  • namesrvAddr:NameServer地址列表,多个地址用分号分隔。
  • autoCreateTopicEnable:是否允许Broker自动创建Topic,生产环境建议关闭。
  • storePath:消息存储路径,不同Broker需指定不同路径,避免数据冲突。
  • brokerRole:Broker角色,ASYNC_MASTER表示异步Master,SLAVE表示Slave。
  • listenPort:Broker监听端口,不同Broker需指定不同端口。

注意事项

  • 存储路径:在同一台服务器上部署多个Broker时,需确保不同Broker的存储路径不冲突。
  • 监听端口:不同Broker需指定不同的监听端口,避免端口冲突。
  • 网络配置:如果服务器配置了多网卡(如内网与外网),需在配置文件中指定 brokerIP1 属性,指向外网IP地址,确保其他服务器能访问到RocketMQ服务。
4.3 启动Broker服务

启动命令

  • 指定配置文件启动Broker

    使用 -c 参数指定Broker的配置文件。

  • 在worker2上启动broker-a Master和broker-b Slave

    cd /app/rocketmq/rocketmq-all-4.9.5-bin-release
    nohup bin/mqbroker -c ./conf/2m-2s-async/broker-a.properties &
    nohup bin/mqbroker -c ./conf/2m-2s-async/broker-b-s.properties &
    
  • 在worker3上启动broker-b Master和broker-a Slave

    cd /app/rocketmq/rocketmq-all-4.9.5-bin-release
    nohup bin/mqbroker -c ./conf/2m-2s-async/broker-b.properties &
    nohup bin/mqbroker -c ./conf/2m-2s-async/broker-a-s.properties &
    

验证启动

  • 使用 jps 检查Broker进程是否正常运行。

  • 检查 nohup.out 日志文件,确认出现以下关键日志:

    The broker[xxxxx] boot success. serializeType=JSON
    
4.4 检查集群服务状态

使用 mqadmin 指令

  • 查看集群列表

    cd /app/rocketmq/rocketmq-all-4.9.5-bin-release/bin
    mqadmin clusterList
    
  • 示例输出

    #Cluster Name     #Broker Name    #BID  #Addr                  #Version  #InTPS(LOAD)  #OutTPS(LOAD) #PCWait(ms) #Hour  #SPACE
    rocketmq-cluster  broker-a        0     192.168.232.129:10911  V4_9_1    0.00(0,0ms)    0.00(0,0ms)    0         3425.28 0.3594
    rocketmq-cluster  broker-a        1     192.168.232.130:11011  V4_9_1    0.00(0,0ms)    0.00(0,0ms)    0         3425.28 0.3607
    rocketmq-cluster  broker-b        0     192.168.232.130:10911  V4_9_1    0.00(0,0ms)    0.00(0,0ms)    0         3425.27 0.3607
    rocketmq-cluster  broker-b        1     192.168.232.129:11011  V4_9_1    0.00(0,0ms)    0.00(0,0ms)    0         3425.27 0.3594
    
    • 字段说明
      • Cluster Name:集群名称。
      • Broker Name:Broker名称。
      • BID:Broker ID,0表示Master,1表示Slave。
      • Addr:Broker地址(IP:Port)。
      • Version:RocketMQ版本。
      • InTPS:消息输入吞吐量(TPS)。
      • OutTPS:消息输出吞吐量(TPS)。
      • PCWait(ms):消息处理等待时间(毫秒)。
      • Hour:小时数。
      • SPACE:磁盘空间使用情况。

使用Dashboard查看

  • 配置Dashboard

    编辑 application.yml,添加所有NameServer地址:

    rocketmq: 
      config: 
        namesrvAddrs: 
          - worker1:9876 
          - worker2:9876 
          - worker3:9876
    
  • 启动Dashboard

    cd /app/rocketmq/rocketmq-dashboard
    java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar
    
  • 查看集群状态

    登录Dashboard,进入“集群”菜单页,即可查看集群的运行情况,包括Broker状态、消息吞吐量、磁盘使用等。

总结:通过主从架构的分布式集群搭建,RocketMQ能够有效防止单点故障,提高系统的容错能力和数据的可靠性,适应生产环境的高可用需求。

5. 升级高可用集群

虽然主从架构的RocketMQ集群可以防止消息丢失,但在服务高可用性方面仍有不足。为了进一步提升集群的高可用性,RocketMQ引入了Dledger集群,基于Raft协议实现自动主从切换,解决主从架构下的服务不可用问题。

5.1 主从架构的不足
  • 服务高可用性问题

    • 现象:当Master节点宕机,虽然Slave节点备份了数据,但Slave节点无法自动升级为Master,导致集群部分功能不可用。
    • 影响:客户端请求需要转发到其他Broker,原Master上的消息无法读取,系统部分功能中断。
  • 数据一致性问题

    • 主从架构下,Slave节点数据备份可能存在延迟,不能保证数据的实时一致性。
5.2 Dledger集群的引入

Dledger集群 是RocketMQ自4.5版本引入的高可用集群解决方案,基于Raft协议实现Leader选举和数据一致性。

特点

  • 自动Leader选举

    • 集群中的Broker节点通过Raft协议选举出Leader(类似Master),其他节点为Follower(类似Slave)。
    • 当Leader宕机时,自动重新选举新的Leader,确保集群持续可用。
  • 强一致性

    • 使用Raft协议保证CommitLog日志文件在集群中的强一致性,确保所有节点的数据同步一致。
  • 高容错性

    • 只要超过半数的节点正常运行,集群就能正常工作。
    • 适合部署奇数台服务器,提升集群的容错能力。

架构图

Dledger集群架构

(请替换为实际图片链接)

5.3 搭建Dledger集群

集群结构

  • 节点规划:3个Broker节点,确保集群有超过半数(2个节点)正常运行即可。
  • 集群特点:自动Leader选举,Follower可以随时升级为Leader,解决主从架构下的高可用性问题。

部署步骤

  1. 部署NameServer

    • 与主从集群相同,在三台服务器上分别启动NameServer服务。

    • 启动命令

      cd /app/rocketmq/rocketmq-all-4.9.5-bin-release
      nohup bin/mqnamesrv &
      
  2. 配置Dledger Broker

    • 使用RocketMQ提供的Dledger配置模板,编辑 broker.conf 文件。

    • 关键配置项

      • enableDLegerCommitLog:开启Dledger功能,设置为 true
      • dLegerGroup:Raft集群组名,建议与 brokerName 保持一致。
      • dLegerPeers:集群中所有节点的地址及端口,格式为 n0-worker1:40911;n1-worker2:40911;n2-worker3:40911
      • dLegerSelfId:当前节点在集群中的唯一ID,需与 dLegerPeers 中的ID对应。
      • storePath:Dledger的消息存储路径,不同节点需指定不同路径。
      • listenPort:Dledger Broker监听端口,确保不同Broker使用不同端口。
    • 配置示例

      • worker1的 broker.conf

        brokerClusterName=RaftCluster
        brokerName=RaftNode00
        listenPort=30911
        namesrvAddr=worker1:9876;worker2:9876;worker3:9876
        storePathRootDir=/app/rocketmq/storeDledger/
        storePathCommitLog=/app/rocketmq/storeDledger/commitlog
        storePathConsumeQueue=/app/rocketmq/storeDledger/consumequeue
        storePathIndex=/app/rocketmq/storeDledger/index
        storeCheckpoint=/app/rocketmq/storeDledger/checkpoint
        abortFile=/app/rocketmq/storeDledger/abort
        enableDLegerCommitLog=true
        dLegerGroup=RaftNode00
        dLegerPeers=n0-worker1:40911;n1-worker2:40911;n2-worker3:40911
        dLegerSelfId=n0
        sendMessageThreadPoolNums=16
        
      • worker2的 broker.conf

        brokerClusterName=RaftCluster
        brokerName=RaftNode00
        listenPort=30911
        namesrvAddr=worker1:9876;worker2:9876;worker3:9876
        storePathRootDir=/app/rocketmq/storeDledger/
        storePathCommitLog=/app/rocketmq/storeDledger/commitlog
        storePathConsumeQueue=/app/rocketmq/storeDledger/consumequeue
        storePathIndex=/app/rocketmq/storeDledger/index
        storeCheckpoint=/app/rocketmq/storeDledger/checkpoint
        abortFile=/app/rocketmq/storeDledger/abort
        enableDLegerCommitLog=true
        dLegerGroup=RaftNode00
        dLegerPeers=n0-worker1:40911;n1-worker2:40911;n2-worker3:40911
        dLegerSelfId=n1
        sendMessageThreadPoolNums=16
        
      • worker3的 broker.conf

        brokerClusterName=RaftCluster
        brokerName=RaftNode00
        listenPort=30911
        namesrvAddr=worker1:9876;worker2:9876;worker3:9876
        storePathRootDir=/app/rocketmq/storeDledger/
        storePathCommitLog=/app/rocketmq/storeDledger/commitlog
        storePathConsumeQueue=/app/rocketmq/storeDledger/consumequeue
        storePathIndex=/app/rocketmq/storeDledger/index
        storeCheckpoint=/app/rocketmq/storeDledger/checkpoint
        abortFile=/app/rocketmq/storeDledger/abort
        enableDLegerCommitLog=true
        dLegerGroup=RaftNode00
        dLegerPeers=n0-worker1:40911;n1-worker2:40911;n2-worker3:40911
        dLegerSelfId=n2
        sendMessageThreadPoolNums=16
        
    • 说明

      • dLegerGroup:Raft集群组名,建议与 brokerName 保持一致,便于管理。
      • dLegerPeers:定义集群中所有节点的地址和端口,确保所有节点的配置一致。
      • dLegerSelfId:当前节点的唯一ID,必须与 dLegerPeers 中的ID对应,确保集群唯一性。
      • sendMessageThreadPoolNums:Dledger内部发送消息的线程数,建议与服务器的CPU核心数相匹配,提升性能。
  3. 启动Broker服务

    • 在三台服务器上分别启动配置好的Broker服务:

      cd /app/rocketmq/rocketmq-all-4.9.5-bin-release/
      nohup bin/mqbroker -c conf/dledger/broker.conf &
      
  4. 检查集群服务状态

    • 使用Dashboard查看

      登录RocketMQ Dashboard,进入“集群”菜单页,查看Dledger集群的运行状态,包括Leader选举、节点状态、消息同步等。

    • 使用 mqadmin 指令查看

      cd /app/rocketmq/rocketmq-all-4.9.5-bin-release/bin
      mqadmin clusterList
      
    • 验证Leader选举

      • 在Dledger集群中,自动选举出一个Leader节点(类似Master),其他节点为Follower。
      • 测试高可用性:停止Leader节点,观察集群自动选举出新的Leader,确保集群持续可用。

总结:通过Dledger集群的搭建,RocketMQ实现了更高的可用性和数据一致性,自动Leader选举和数据同步机制有效解决了主从架构下的高可用性问题,适用于对高可靠性和高可用性要求极高的业务场景。

5.4 关于Dledger集群的一些补充

DLedger的功能

  • 数据复制
    • 通过Raft协议,集群中每个节点都维护一份相同的数据副本,确保数据一致性。
  • 容错性
    • 集群具有高容错性,只要超过半数的节点正常运行,集群即可正常工作。
  • 高可用性
    • 通过Leader选举和负载均衡机制,确保在节点故障时能够快速切换,维持集群的持续可用。
  • 分布式锁
    • 提供分布式锁功能,解决分布式系统中的资源竞争和同步问题,实现跨节点的资源同步。
  • 强一致性
    • 使用Raft一致性协议,确保集群中所有副本节点的数据一致性,避免数据冲突和不一致。

Raft协议与脑裂问题

  • Raft协议

    • 选举机制:通过选举Leader节点,确保集群中只有一个Leader,避免多个Leader同时存在。
    • 任期管理:每个选举周期设置递增的任期编号,防止旧Leader重新成为Leader。
    • 心跳机制:Leader定期发送心跳消息,保持与Follower的连接,及时发现Leader故障。
    • 日志复制:Leader负责将日志条目复制到Follower,确保数据的一致性。
  • 脑裂问题

    • 定义:由于网络分区等原因,集群被分割为多个子集群,各自独立运行,可能导致多个Leader共存。
    • 解决方案:Raft协议通过多数决机制,确保只有一个Leader存在,防止脑裂问题的发生。

ChatGPT关于Dledger功能的描述

RocketMQ是一款分布式消息队列系统,主要用于处理大量数据的实时传输和处理。在RocketMQ中,DLedger是一个为高可用、高性能、高可靠的分布式存储系统提供基础支持的组件。DLedger集群主要具有以下功能:

  • 数据复制:通过Raft协议保证数据一致性,集群中每个节点都维护相同的数据副本。
  • 容错性:即使部分节点故障,只要集群中有大多数节点正常工作,集群仍能提供服务。
  • 高可用性:通过负载均衡和热备份机制,确保在节点故障时快速切换到其他节点,提高系统可用性。
  • 分布式锁:提供分布式锁功能,解决分布式系统中的资源争用,实现跨节点资源同步。
  • 强一致性:通过Raft协议确保集群中副本节点之间的数据同步,保证数据一致性。
  • 高性能:支持水平扩展,通过增加节点提升系统的吞吐量和存储能力,满足业务需求。
  • 易于管理:提供管理和监控功能,便于运维人员掌握系统运行状况,及时发现和解决问题。

总结:Dledger集群通过Raft协议实现自动Leader选举和数据一致性,解决了主从架构下的高可用性问题,提高了RocketMQ集群的容错能力和数据可靠性,为高并发、高可靠性的业务场景提供了坚实的基础。

6. 总结RocketMQ的运行架构

通过前面的实战操作,我们对RocketMQ的运行架构有了更深入的理解。以下是RocketMQ运行时的整体架构及各组件的详细作用。

6.1 RocketMQ的整体架构
+------------+     +-----------+     +----------+
| Producer   | --> | Broker    | <-- | Consumer |
+------------+     +-----------+     +----------+
       |                 |
   +-------+         +-------+
   | NameServer       | Dashboard |
   +-------+         +-------+
6.2 组件详细介绍
  1. NameServer(命名服务)

    • 作用:NameServer负责管理RocketMQ集群中所有Broker的地址信息,提供路由服务。类似于DNS的角色,客户端和Broker通过NameServer进行地址发现和路由查询。

    • 特点

      • 无状态:NameServer不存储任何状态信息,支持水平扩展。
      • 轻量级:启动和运行资源消耗较低。
      • 高可用性:通过部署多个NameServer实例,确保服务的高可用性。
  2. Broker(核心服务)

    • 作用:Broker是RocketMQ的核心组件,负责消息的存储、转发、查询和管理。所有消息的生产者和消费者都通过Broker进行消息传递。

    • 功能

      • 消息存储:将消息持久化存储在CommitLog和ConsumeQueue中,确保消息的可靠性。
      • 消息转发:根据Topic和Queue进行消息的分发和转发。
      • 消息查询:提供消息的查询和检索功能。
      • 集群管理:支持主从集群和Dledger集群,确保高可用性和数据一致性。
    • 特点

      • 高性能:通过高效的IO处理和内存管理,实现高吞吐量和低延迟。
      • 可扩展性:支持水平扩展,通过增加Broker实例提升系统性能和容量。
      • 丰富的功能:支持广播消费、延迟队列、死信队列、事务消息等高级功能。
  3. Client(客户端)

    • 作用:Client包括消息生产者和消费者,是应用程序与RocketMQ集群交互的接口。生产者负责发送消息,消费者负责接收和处理消息。

    • 功能

      • 消息发送:生产者通过客户端API发送消息到指定的Topic。
      • 消息消费:消费者通过客户端API订阅Topic,接收并处理消息。
      • 负载均衡:客户端自动进行负载均衡,分配消息到不同的MessageQueue,提升系统性能。
      • 消费进度管理:客户端记录消费进度(Offset),确保每条消息在消费者组内只被消费一次。
    • 特点

      • 易用性:提供丰富的API接口,支持多语言客户端(Java、C++, Python等)。
      • 灵活性:支持多种消费模式,如广播消费、集群消费等。
      • 可靠性:通过事务消息和消息重试机制,确保消息的可靠传递和处理。
  4. Dashboard(管理控制台)

    • 作用:Dashboard提供RocketMQ集群的可视化管理和监控功能,帮助运维人员实时了解集群状态、消息流动和性能指标。

    • 功能

      • 集群监控:实时展示集群中各Broker的状态、负载情况、消息吞吐量等。
      • 消息管理:查看和管理Topic、Queue、消息分布等信息。
      • 运维操作:提供Broker的启动、停止、配置管理等运维操作。
      • 告警通知:设置告警规则,实时通知运维人员集群异常情况。
    • 特点

      • 直观性:图形化界面,数据展示直观易懂。
      • 全面性:涵盖集群运行的各个方面,提供全面的监控和管理功能。
      • 易操作性:简化运维流程,提高运维效率。

运行架构总结

  • 协调与通信:客户端通过NameServer获取Broker地址,直接与Broker进行消息发送和接收。NameServer作为集群的协调中心,确保消息路由的准确性和高效性。

  • 消息流动:生产者发送消息到指定的Topic,Broker根据Topic将消息存储到对应的MessageQueue中。消费者订阅Topic,通过消息队列获取并处理消息,消费进度由消费者组记录,确保消息不重复消费。

  • 集群管理:Broker集群通过主从架构或Dledger集群实现高可用性和数据一致性,NameServer集群通过多实例部署实现高可用性。Dashboard作为监控和管理工具,实时监控集群状态,提供便捷的运维操作。

示意图

RocketMQ架构图

(请替换为实际图片链接)

四、理解RocketMQ的消息模型

RocketMQ的消息模型是其核心设计之一,决定了消息的存储、传递和消费方式。理解消息模型有助于更好地设计和优化基于RocketMQ的系统。

4.1 消息发送与存储流程

  1. 生产者发送消息

    • 指定Topic:生产者在发送消息时,指定消息所属的Topic。
    • 消息分配:RocketMQ根据Topic将消息分配到对应的MessageQueue中,分散存储到不同的Broker上。
    • 消息属性:每条消息包含Topic、Tag、消息体(Body)等属性,并生成唯一的 msgIdqueueOffset
  2. 消息存储

    • CommitLog:所有消息首先写入CommitLog,RocketMQ通过顺序写入确保高性能。
    • ConsumeQueue:每个MessageQueue对应一个ConsumeQueue,记录消息的索引信息,包括偏移量、存储位置等。
    • Index:为了支持快速查询,RocketMQ为消息建立索引,加快消息检索速度。
  3. 消费者消费消息

    • 订阅Topic:消费者订阅指定的Topic,接收属于该Topic的所有消息。
    • 消息拉取:消费者通过ConsumeQueue定位消息在CommitLog中的存储位置,读取并处理消息。
    • 消费进度:消费者组记录每个MessageQueue的消费进度(Offset),确保每条消息在组内只被消费一次。

4.2 消息模型示意图

+-----------+       +--------------+       +------------+
| Producer  | ----> | MessageQueue | ----> | Broker     |
+-----------+       +--------------+       +------------+
      |                   |                     |
      |                   |                     |
      |             +-----------+         +-----------+
      |             | Consume   |         | CommitLog |
      |             | Queue     |         +-----------+
      |             +-----------+               |
      |                   |                     |
      |                   |                     |
      |             +-----------+               |
      |             | Index     | <-------------+
      |             +-----------+
      |
      v
+-----------+
| Consumer  |
+-----------+

4.3 消息模型的关键概念

  • Topic

    • 定义:消息的逻辑分类,相当于消息的主题或类别。
    • 作用:生产者发送消息到特定Topic,消费者订阅Topic以接收相关消息。
    • 管理:Topic需由管理员在Broker端预先创建,生产环境中不建议开启自动创建。
  • MessageQueue

    • 定义:每个Topic下的消息存储队列,具备FIFO特性。
    • 作用:分散存储消息,提升系统的并发处理能力。
    • 分布:MessageQueue分布在不同的Broker上,实现负载均衡。
  • CommitLog

    • 定义:RocketMQ中所有消息的物理存储文件,采用顺序写入方式。
    • 作用:确保消息的高性能写入和持久化存储。
    • 特点:支持顺序写入,提升写入性能;支持高效的读取和索引。
  • ConsumeQueue

    • 定义:每个MessageQueue对应的消费索引文件,记录消息在CommitLog中的位置信息。
    • 作用:支持消费者快速定位和读取消息,提高消费效率。
    • 特点:轻量级,占用较少存储空间;支持高效的消息查询。
  • Index

    • 定义:RocketMQ为支持快速查询建立的消息索引。
    • 作用:加快消息检索速度,支持按照消息属性(如Tag)快速过滤消息。
    • 特点:支持高效的索引查找,提升消息查询性能。
  • Offset(偏移量)

    • 定义:记录消息在MessageQueue中的存储位置。
    • 作用:用于定位和读取消息,确保消息按照顺序消费。
    • 管理:消费者组记录每个MessageQueue的消费Offset,保证消息不重复消费。

4.4 消息模型的优势

  • 高性能:通过CommitLog的顺序写入和ConsumeQueue的高效索引,确保消息的高性能存储和消费。
  • 可扩展性:通过MessageQueue的分布式存储和多Broker集群,支持高并发和大规模消息处理。
  • 灵活性:支持多种消费模式(广播、集群),满足不同业务需求。
  • 可靠性:通过主从架构或Dledger集群,实现消息的高可用性和数据的持久化存储。

总结:RocketMQ的消息模型通过Topic、MessageQueue、CommitLog、ConsumeQueue、Index等核心组件,实现了高性能、高可扩展性和高可靠性的消息存储与传递机制,适应了现代互联网和企业级应用的复杂需求。

4.5 RocketMQ与Kafka的对比

RocketMQ和Kafka都是分布式消息队列系统,虽然在设计理念上有相似之处,但在实现和功能上存在显著差异。

特性Apache KafkaApache RocketMQ
设计初衷日志收集与大数据采集场景广泛的消息传递与高可靠性场景
消息存储Partition文件,分布式存储CommitLog与ConsumeQueue,分布式存储
高可用性使用Kafka自身的集群管理机制,Kraft使用Dledger(基于Raft协议)或主从架构
Topic管理Topic过多时性能下降设计优化,支持大量Topic而性能不受影响
消息可靠性可能存在数据丢失风险高可靠性,特别适合金融场景
高级功能基础消息功能支持广播消费、延迟队列、死信队列、事务消息等
客户端语言支持多语言支持Java为主,支持多种客户端协议
性能吞吐量极高吞吐量高,性能稍低于Kafka但高于RabbitMQ
社区与生态社区活跃,生态成熟社区活跃,持续发展中

具体对比

  • Topic管理

    • Kafka:Topic数量过多会导致Partition文件过多,影响文件索引耗时,降低IO性能。
    • RocketMQ:优化了Topic的管理,支持大量Topic而不会显著影响整体性能,适合需要高Topic数量的场景。
  • Leader选举与高可用性

    • Kafka:使用自己的Kraft集群管理机制,优先保证服务可用性,但可能在一定程度上牺牲消息的安全性。
    • RocketMQ:采用Dledger集群(基于Raft协议)或主从架构,确保高可靠性和数据一致性,特别适合对消息安全性要求高的金融场景。

总结:RocketMQ在高可靠性、消息安全性和高级功能方面表现优异,适用于对消息传递要求高的企业级和金融级应用;而Kafka在日志收集和大数据采集等场景中表现出色,适合需要极高吞吐量和实时数据处理的业务需求。

四、总结RocketMQ的运行架构

通过前述的快速实战和消息模型的理解,我们对RocketMQ的运行架构有了全面的认识。以下将结合实际操作,总结RocketMQ的运行架构及各组件的具体作用。

4.1 RocketMQ运行架构概览

+------------+     +-----------+     +----------+
| Producer   | --> | Broker    | <-- | Consumer |
+------------+     +-----------+     +----------+
       |                 |
   +-------+         +-------+
   | NameServer       | Dashboard |
   +-------+         +-------+

4.2 组件详细作用

  1. NameServer(命名服务)

    • 功能
      • Broker注册与发现:Broker启动后向NameServer注册自身的地址信息,客户端和其他Broker通过NameServer获取Broker地址,实现动态路由。
      • 路由服务:提供Topic到MessageQueue的路由信息,客户端根据路由信息直接与Broker进行消息发送和接收。
    • 特点
      • 无状态:NameServer不保存任何状态信息,支持多实例部署,实现高可用性。
      • 轻量级:资源占用低,易于部署和扩展。
  2. Broker(核心服务)

    • 功能
      • 消息存储:负责将生产者发送的消息存储到CommitLog和ConsumeQueue中,确保消息持久化。
      • 消息转发:根据Topic和Queue将消息转发到消费者,支持广播消费和集群消费模式。
      • 消息查询:提供消息的查询和检索功能,支持按条件过滤消息。
      • 集群管理:通过主从架构或Dledger集群实现高可用性和数据一致性,防止单点故障。
    • 特点
      • 高性能:通过顺序写入和高效的内存管理,实现高吞吐量和低延迟。
      • 可扩展性:支持水平扩展,通过增加Broker实例提升系统性能和容量。
      • 丰富的功能:支持高级消息功能,如事务消息、延迟消息、死信消息等,满足复杂业务需求。
  3. Client(客户端)

    • 功能
      • 消息生产:生产者通过客户端API发送消息到指定的Topic,支持同步和异步发送模式。
      • 消息消费:消费者通过客户端API订阅Topic,接收并处理消息,支持并发消费和顺序消费。
      • 负载均衡:客户端自动进行负载均衡,分配消息到不同的MessageQueue,提高系统性能。
      • 消费进度管理:消费者组记录消费进度(Offset),确保消息不重复消费,支持消息重试机制。
    • 特点
      • 易用性:提供简单易用的API接口,支持多种编程语言和客户端协议。
      • 灵活性:支持多种消费模式,如广播消费、集群消费、事务消息消费等。
      • 可靠性:通过事务消息和消息重试机制,确保消息的可靠传递和处理。
  4. Dashboard(管理控制台)

    • 功能
      • 集群监控:实时展示RocketMQ集群中各Broker的状态、负载情况、消息吞吐量等指标。
      • 消息管理:查看和管理Topic、MessageQueue、消息分布等信息。
      • 运维操作:提供Broker的启动、停止、配置管理等运维操作,简化运维流程。
      • 告警与通知:设置告警规则,实时通知运维人员集群异常情况,及时响应和处理问题。
    • 特点
      • 直观性:图形化界面,数据展示直观易懂,方便快速定位问题。
      • 全面性:涵盖集群运行的各个方面,提供全面的监控和管理功能。
      • 易操作性:简化运维流程,提高运维效率,减少人为操作错误。

4.3 运行架构详细流程

  1. 消息发送流程

    • 生产者:通过客户端API发送消息到指定的Topic。
    • NameServer:生产者通过NameServer获取Broker地址,确定发送目标。
    • Broker:接收到消息后,将消息存储到CommitLog和ConsumeQueue中,确保消息持久化。
    • 消息分配:Broker根据Topic将消息分配到对应的MessageQueue,分散存储到不同的Broker实例上。
  2. 消息消费流程

    • 消费者:通过客户端API订阅Topic,接收消息。
    • NameServer:消费者通过NameServer获取Broker地址,确定消费源。
    • Broker:消费者通过ConsumeQueue定位消息在CommitLog中的存储位置,读取并处理消息。
    • 消费进度:消费者组记录每个MessageQueue的消费Offset,确保消息按顺序消费且不重复消费。
  3. 集群管理流程

    • 主从架构:Master负责处理请求,Slave备份数据,确保消息不丢失。
    • Dledger集群:通过Raft协议实现自动Leader选举和数据一致性,提升集群的高可用性和容错性。
    • Dashboard监控:实时监控集群状态,提供运维操作和告警通知,保障集群稳定运行。

4.4 集群故障处理

  1. 主从架构下的故障处理

    • 故障检测:监控Master节点的健康状态,发现故障后自动切换到Slave节点。
    • 数据备份:Slave节点备份Master节点的数据,防止数据丢失。
    • 服务恢复:切换到Slave节点后,系统继续提供消息服务,保障业务连续性。
  2. Dledger集群下的故障处理

    • 自动Leader选举:当Leader节点故障时,集群自动选举新的Leader,确保集群持续可用。
    • 数据一致性:通过Raft协议保证所有节点的数据一致性,避免数据冲突和不一致。
    • 高容错性:只要超过半数的节点正常运行,集群即可正常工作,提升系统的容错能力。

总结:RocketMQ通过NameServer、Broker、Client和Dashboard等核心组件,实现了高性能、高可用性和高可靠性的消息传递机制。集群管理通过主从架构和Dledger集群,确保系统在高并发和故障情况下的稳定运行。理解RocketMQ的运行架构有助于更好地设计和优化基于RocketMQ的分布式系统。

五、理解RocketMQ的消息模型

RocketMQ的消息模型是其设计的核心,决定了消息的存储、传递和消费方式。通过深入理解消息模型,可以更高效地设计和优化消息传递系统。

5.1 消息发送与存储流程

  1. 生产者发送消息

    • 指定Topic:生产者在发送消息时,必须指定消息所属的Topic。Topic是消息的逻辑分类,类似于邮件的主题。

    • 消息分配

      • RocketMQ根据Topic将消息分配到对应的MessageQueue中。
      • 每个Topic可以包含多个MessageQueue,消息在多个Queue中分布存储,实现负载均衡。
    • 消息属性

      • msgId:每条消息的唯一标识,用于追踪和查询。
      • queueOffset:消息在MessageQueue中的偏移量,标识消息存储位置。
    • 存储过程

      • CommitLog:所有消息首先写入CommitLog文件,确保消息的持久化存储。
      • ConsumeQueue:每个MessageQueue对应一个ConsumeQueue文件,记录消息在CommitLog中的存储位置和偏移量。
      • Index:为支持快速查询,RocketMQ为消息建立索引,提升消息检索速度。
  2. 消息存储

    • CommitLog
      • 特点:顺序写入,支持高性能的消息存储。
      • 作用:持久化存储所有消息,确保消息不丢失。
    • ConsumeQueue
      • 特点:轻量级,占用较少存储空间。
      • 作用:记录消息在CommitLog中的存储位置,支持快速定位和消费。
    • Index
      • 特点:支持高效的索引查找,提升消息检索性能。
      • 作用:按消息属性(如Tag)快速过滤和查询消息。

5.2 消息消费流程

  1. 消费者订阅Topic

    • 订阅:消费者通过客户端API订阅一个或多个Topic,接收对应的消息。

    • 消费模式

      • 广播消费:所有消费者实例均可消费同一消息,实现广播。
      • 集群消费:每条消息仅由一个消费者实例消费,实现负载均衡。
  2. 消息拉取与处理

    • 拉取消息
      • 消费者通过ConsumeQueue定位消息在CommitLog中的存储位置。
      • 根据Offset读取消息内容,实现高效的消息消费。
    • 消息处理
      • 消费者处理接收到的消息,可以进行业务逻辑处理、数据存储等操作。
      • 处理完成后,消费者更新消费进度(Offset),确保消息不重复消费。
  3. 消费进度管理

    • Offset记录:消费者组记录每个MessageQueue的消费Offset,标识已消费的消息位置。

    • 消费重试:如果消息消费失败,RocketMQ支持消息的重试机制,确保消息最终被消费。

    • 事务消息:支持分布式事务,确保消息与业务操作的一致性。

5.3 消息模型示意图

+-----------+       +--------------+       +------------+
| Producer  | ----> | MessageQueue | ----> | Broker     |
+-----------+       +--------------+       +------------+
      |                   |                     |
      |                   |                     |
      |             +-----------+         +-----------+
      |             | Consume   |         | CommitLog |
      |             | Queue     |         +-----------+
      |             +-----------+               |
      |                   |                     |
      |                   |                     |
      |             +-----------+               |
      |             | Index     | <-------------+
      |             +-----------+
      |
      v
+-----------+
| Consumer  |
+-----------+

说明

  • Producer发送消息到MessageQueue:生产者通过指定的Topic将消息发送到对应的MessageQueue中。
  • MessageQueue存储消息:消息被存储在CommitLog中,同时ConsumeQueue记录消息的存储位置。
  • Consumer从MessageQueue消费消息:消费者通过ConsumeQueue定位消息在CommitLog中的位置,读取并处理消息。
  • Index支持快速查询:Index提供按消息属性(如Tag)的快速检索功能,提升消息查询效率。

5.4 消息模型的关键概念

  • Topic

    • 定义:消息的逻辑分类,用于组织和管理消息。
    • 作用:生产者发送消息到指定的Topic,消费者订阅Topic接收消息。
    • 管理:Topic需由管理员在Broker端预先创建,生产环境中建议关闭自动创建功能。
  • MessageQueue

    • 定义:每个Topic下的消息存储队列,具备FIFO特性。
    • 作用:分散存储消息,提升系统的并发处理能力。
    • 分布:MessageQueue分布在不同的Broker实例上,实现负载均衡和高可用性。
  • CommitLog

    • 定义:RocketMQ中所有消息的物理存储文件,采用顺序写入方式。
    • 作用:确保消息的高性能写入和持久化存储。
    • 特点:支持顺序写入,提升写入性能;支持高效的读取和索引。
  • ConsumeQueue

    • 定义:每个MessageQueue对应的消费索引文件,记录消息在CommitLog中的存储位置。
    • 作用:支持消费者快速定位和读取消息,提高消费效率。
    • 特点:轻量级,占用较少存储空间;支持高效的消息查询。
  • Index

    • 定义:RocketMQ为支持快速查询建立的消息索引。
    • 作用:加快消息检索速度,支持按照消息属性(如Tag)快速过滤消息。
    • 特点:支持高效的索引查找,提升消息查询性能。
  • Offset(偏移量)

    • 定义:记录消息在MessageQueue中的存储位置。
    • 作用:用于定位和读取消息,确保消息按照顺序消费。
    • 管理:消费者组记录每个MessageQueue的消费Offset,保证消息不重复消费。

5.5 消息模型的优势

  • 高性能
    • 通过CommitLog的顺序写入和ConsumeQueue的高效索引,实现高吞吐量和低延迟。
  • 可扩展性
    • 通过MessageQueue的分布式存储和多Broker集群,支持高并发和大规模消息处理。
  • 灵活性
    • 支持多种消费模式(广播消费、集群消费),满足不同业务需求。
  • 可靠性
    • 通过主从架构或Dledger集群,实现消息的高可用性和数据的持久化存储。

5.6 RocketMQ与Kafka的对比

RocketMQ和Kafka都是分布式消息队列系统,虽然在设计理念上有相似之处,但在实现和功能上存在显著差异。

特性Apache KafkaApache RocketMQ
设计初衷日志收集与大数据采集场景广泛的消息传递与高可靠性场景
消息存储Partition文件,分布式存储CommitLog与ConsumeQueue,分布式存储
高可用性使用Kafka自身的集群管理机制,Kraft使用Dledger(基于Raft协议)或主从架构
Topic管理Topic过多时性能下降设计优化,支持大量Topic而性能不受影响
消息可靠性可能存在数据丢失风险高可靠性,特别适合金融场景
高级功能基础消息功能支持广播消费、延迟队列、死信队列、事务消息等
客户端语言支持多语言支持Java为主,支持多种客户端协议
性能吞吐量极高吞吐量高,性能稍低于Kafka但高于RabbitMQ
社区与生态社区活跃,生态成熟社区活跃,持续发展中

具体对比

  • Topic管理

    • Kafka:Topic数量过多会导致Partition文件过多,影响文件索引耗时,降低IO性能。
    • RocketMQ:优化了Topic的管理,支持大量Topic而不会显著影响整体性能,适合需要高Topic数量的场景。
  • Leader选举与高可用性

    • Kafka:使用自己的Kraft集群管理机制,优先保证服务可用性,但可能在一定程度上牺牲消息的安全性。
    • RocketMQ:采用Dledger集群(基于Raft协议)或主从架构,确保高可靠性和数据一致性,特别适合对消息安全性要求高的金融场景。

总结:RocketMQ在高可靠性、消息安全性和高级功能方面表现优异,适用于对消息传递要求高的企业级和金融级应用;而Kafka在日志收集和大数据采集等场景中表现出色,适合需要极高吞吐量和实时数据处理的业务需求。

建议:在学习RocketMQ时,可以结合Kafka的学习,进行横向对比,理解各自的优势和适用场景,从而更好地应用于实际业务中。

六、章节总结

6.1 本章节内容回顾

本章节通过RocketMQ的快速实战操作,深入理解了RocketMQ的运行架构和消息模型,掌握了从服务搭建、消息发送与消费到集群升级的全过程。具体内容包括:

  1. MQ简介

    • 了解MQ的基本概念、核心组成和主要作用。
    • 理解异步处理、解耦和削峰填谷在系统架构中的重要性。
  2. RocketMQ产品特点

    • 了解RocketMQ的背景和发展历程。
    • 对比RocketMQ与其他主流MQ产品(Kafka、RabbitMQ、Pulsar)的优势和适用场景。
  3. RocketMQ快速实战

    • 学会快速搭建RocketMQ服务,包括NameServer和Broker的安装与配置。
    • 通过命令行工具和Maven项目,实现消息的发送与接收。
    • 搭建RocketMQ的可视化管理控制台(Dashboard),实现集群的实时监控与管理。
    • 升级RocketMQ集群至分布式主从架构,提升系统的高可用性和容错能力。
    • 进一步升级至高可用的Dledger集群,基于Raft协议实现自动Leader选举和数据一致性。
  4. 总结RocketMQ的运行架构

    • 理解RocketMQ的核心组件(NameServer、Broker、Client、Dashboard)的作用和协同工作方式。
    • 通过示意图和具体流程,梳理消息的发送、存储和消费过程。
  5. 理解RocketMQ的消息模型

    • 深入理解RocketMQ的消息存储与消费机制,包括CommitLog、ConsumeQueue、Index等关键概念。
    • 对比RocketMQ与Kafka的消息模型,理解各自的优势和设计理念。
  6. 章节总结

    • 回顾本章节的学习内容,强调RocketMQ在高性能、高可靠性和高级功能方面的优势。
    • 提出后续学习建议,鼓励结合实际业务场景,深入研究RocketMQ的高级功能和优化策略。

6.2 学习建议

  1. 横向对比学习

    • 对比Kafka和RocketMQ:了解两者在设计理念、消息模型、性能优化等方面的异同,深化对分布式消息队列系统的理解。

    • 对比RabbitMQ和Pulsar:探索不同MQ产品在功能、性能和适用场景上的特点,选择最适合业务需求的消息队列系统。

  2. 深入研究高级功能

    • 事务消息:学习RocketMQ的事务消息机制,确保分布式系统中消息与业务操作的一致性。

    • 延迟队列:掌握消息延迟发送和消费的实现方式,应用于定时任务和延迟处理场景。

    • 死信队列:理解死信队列的概念和应用,处理消费失败的消息,提升系统的健壮性。

  3. 性能优化与监控

    • 性能调优:研究RocketMQ的性能调优策略,如内存配置、消息压缩、批量发送等,提高系统的吞吐量和响应速度。

    • 监控与告警:结合Dashboard和第三方监控工具,建立全面的监控体系,及时发现和解决集群运行中的问题。

  4. 实战项目应用

    • 业务场景应用:将RocketMQ应用于实际业务场景中,如订单处理、日志收集、实时数据传输等,验证其性能和可靠性。

    • 集群部署与维护:掌握RocketMQ集群的部署、扩展和维护技巧,确保系统的高可用性和稳定性。

  5. 社区与资源

    • 参与开源社区:关注RocketMQ的GitHub仓库,参与社区讨论和贡献,获取最新的技术动态和最佳实践。

    • 学习资源:结合图灵课程、官方文档和在线教程,系统化学习RocketMQ的理论知识和实战经验。

总结:通过本章节的学习,已对RocketMQ的基础概念、运行架构和消息模型有了全面的认识。接下来的学习将进一步探索RocketMQ的高级功能和优化策略,结合实际业务场景,全面掌握分布式消息队列系统的设计与应用。


http://www.kler.cn/a/507147.html

相关文章:

  • 时序数据库TDengine 3.3.5.0 发布:高并发支持与增量备份功能引领新升级
  • Linux安装Docker教程(详解)
  • RK3568平台(音频篇)lineout无声调试
  • 如何在前端给视频进行去除绿幕并替换背景?-----Vue3!!
  • 【C++】B2112 石头剪子布
  • 【RDMA学习笔记】1:RDMA(Remote Direct Memory Access)介绍
  • 从Arrays源码学习定义工具类
  • sqlalchemy The transaction is active - has not been committed or rolled back.
  • leetcode hot100(2)
  • 【CSS】:nth-child和:nth-of-type
  • 【Elasticsearch】全文搜索与相关性排序
  • SCSSA-BiLSTM基于改进麻雀搜索算法优化双向长短期记忆网络多特征分类预测Matlab实现
  • UI自动化测试框架之PO模式+数据驱动
  • 如何选择合适的服务器?服务器租赁市场趋势分析
  • 【遥感目标检测】【数据集】DOTA:用于航空图像中目标检测的大规模数据集
  • 【深度学习】Pytorch:CUDA 模型训练
  • .net core 中使用AsyncLocal传递变量
  • 【实践功能记录9】使用pnpm打补丁
  • VD:生成a2l文件
  • Lora理解QLoRA
  • iOS - Objective-C 底层中的内存屏障
  • 服务器下发任务镭速利用变量实现高效的大文件传输效率
  • Python人工智能在气象中的应用,包括:天气预测、气候模拟、降雨量和降水预测、气象数据分析、气象预警系统
  • 【Element】一键重置表单resetFields
  • 【开源分享】nlohmann C++ JSON解析库
  • 学习 Git 的工作原理,而不仅仅是命令