RocketMQ 学习笔记01
一、MQ简介
1. 什么是MQ?
MQ(Message Queue,消息队列) 是一种在分布式系统中用于实现进程间通信和数据传输的中间件。它通过在不同进程或应用程序之间传递消息,实现数据的异步处理、解耦和削峰填谷等功能。MQ广泛应用于互联网、高并发场景下的数据传输和处理。
2. MQ的核心概念
-
Message(消息):
- 定义:在不同进程或应用程序之间传递的数据单元。
- 特性:可以包含任意格式的数据,如文本、二进制等。
- 传输方式:消息可以在同一台机器上的不同进程间传递,也可以在不同机器上的进程间传递。
-
Queue(队列):
- 定义:一种具有先进先出(FIFO)特性的缓存数据结构,用于存储和管理消息。
- 功能:消息队列负责暂存消息,确保消息按照一定的顺序被处理。
- 特性:除了FIFO,部分MQ实现还支持优先级队列、延迟队列等。
3. MQ的作用
MQ在系统架构中扮演着重要角色,主要体现在以下几个方面:
-
异步处理:
- 概念:生产者发送消息后,不需要等待消费者处理完成即可继续其他操作。
- 优势:提高系统的响应速度和吞吐量,避免因单个服务的阻塞导致整个系统性能下降。
- 例子:快递员通过菜鸟驿站实现快递的异步配送,快递员只需将包裹放入驿站即可继续派送其他包裹,客户可根据自己的时间安排取件。
-
解耦:
- 概念:通过消息队列,生产者和消费者之间不直接交互,减少了彼此的依赖。
- 优势:
- 降低耦合度:服务之间独立运行,互不影响,提高系统整体的稳定性和可维护性。
- 灵活扩展:可以根据需求灵活增加或减少消费者数量,而不影响生产者的运行。
- 例子:图书《Thinking in JAVA》的翻译过程,通过编辑团队将英文内容翻译成其他语言,实现不同语言版本的独立发布和管理。
-
削峰填谷:
- 概念:通过消息队列缓冲瞬时的高并发请求,平滑系统负载,避免系统资源被瞬时大量请求耗尽。
- 优势:提高系统的稳定性,防止因瞬时流量激增导致系统崩溃。
- 例子:长江每年洪水期通过三峡大坝进行水量调控,将高峰期的洪水储存起来,平稳地释放到下游,避免下游地区被洪水淹没。
二、RocketMQ产品特点
1. RocketMQ介绍
RocketMQ 是由阿里巴巴开源的一款高性能、高可靠的分布式消息中间件。经过阿里内部多次大规模业务场景(如双十一)的验证,RocketMQ能够处理亿万级别的消息。2016年,RocketMQ开源并捐赠给Apache基金会,现已成为Apache的顶级项目。
发展历程:
-
早期阶段:
- 阿里巴巴最初使用 ActiveMQ 作为消息中间件,但随着业务量的增长,ActiveMQ的IO性能迅速达到瓶颈。
-
探索Kafka:
- 阿里巴巴开始关注 Apache Kafka,但发现Kafka主要针对日志收集场景,且在多Topic情况下,Partition文件过多导致文件索引耗时增加,严重影响IO性能,不完全适合阿里的业务需求。
-
自研消息中间件:
- 为了解决上述问题,阿里巴巴决定自研消息中间件,最初命名为 MetaQ,后来更名为 RocketMQ。RocketMQ最初的目标是解决多Topic下的IO性能压力,经过阿里内部不断改进,RocketMQ逐渐展现出独特的优势。
2. RocketMQ特点
RocketMQ在众多MQ产品中脱颖而出,具有以下显著特点:
优点 | 缺点 | 适合场景 |
---|---|---|
高吞吐量、高性能 | 服务加载较慢 | 几乎全场景,特别适合金融场景 |
高可用性 | ||
功能全面(支持广播消费、延迟队列、死信队列、事务消息等) | ||
客户端协议丰富 | ||
Java语言开发,便于定制 |
与其他主流MQ产品对比:
-
Apache Kafka:
- 优点:吞吐量极高,性能卓越,集群高可用。
- 缺点:可能存在数据丢失风险,功能相对单一。
- 适用场景:日志分析、大数据采集等。
-
RabbitMQ:
- 优点:消息可靠性高,功能丰富。
- 缺点:基于Erlang语言开发,定制困难;吞吐量相对较低。
- 适用场景:企业内部小规模服务调用。
-
Apache Pulsar:
- 优点:基于Bookkeeper构建,消息可靠性高。
- 缺点:周边生态尚不完善,使用企业较少。
- 适用场景:企业内部大规模服务调用。
-
Apache RocketMQ:
- 优点:高吞吐、高性能、高可用;功能全面;客户端协议丰富;Java语言开发,便于定制。
- 缺点:服务加载较慢。
- 适用场景:几乎全场景,尤其适合金融场景。
RocketMQ的核心优势:
- 高吞吐量与高性能:
- RocketMQ在处理亿万级别消息时,表现出色,吞吐量虽略低于Kafka,但远超RabbitMQ。
- 高可靠性:
- 消息可靠性较Kafka有显著提升,确保消息不丢失,特别适用于金融等高可靠性需求场景。
- 丰富的高级功能:
- 支持广播消费、延迟队列、死信队列、事务消息等,满足复杂业务需求。
- 适用于高并发和高可靠性场景:
- 经过阿里巴巴内部大规模双十一等高并发场景的严格测试,RocketMQ适用于需要高可靠性和高可用性的业务场景,尤其是金融行业。
三、RocketMQ快速实战
通过以下步骤,可以快速搭建RocketMQ服务并实现消息的发送与接收。
1. 快速搭建RocketMQ服务
1.1 RocketMQ的下载与安装
- 官网下载地址:RocketMQ官网
- 下载页面:下载链接
版本选择:
- 最新版本:5.x版本,专注于云原生,具备众多新特性,但企业中应用较少。
- 稳定版本:4.9.5版本,更为稳定,适合企业环境。
注意:2020年下半年,RocketMQ发布了5.0大版本,带来了重大功能升级,如定时消息发送时间精确化、支持Grpc协议、多种集群模式等。但由于功能强大,问题也较多,企业多采用内部优化版或继续使用稳定的4.9.5版本。
1.2 解压并上传到服务器
-
下载运行包:选择适合的版本(如4.9.5)下载后解压。
-
上传到服务器:将解压后的文件夹上传到服务器上的
/app/rocketmq
目录。tar -zxvf rocketmq-all-4.9.5-bin-release.tar.gz -C /app/rocketmq/
1.3 运行环境配置
内存配置:
- 生产环境:建议至少12G内存,确保RocketMQ运行稳定。
- 学习阶段:如果服务器内存不足,可调整Java进程的内存大小。
修改启动脚本:
-
编辑
runserver.sh
:vi /app/rocketmq/rocketmq-all-4.9.5-bin-release/bin/runserver.sh
找到并修改内存配置:
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
-
编辑
runbroker.sh
:vi /app/rocketmq/rocketmq-all-4.9.5-bin-release/bin/runbroker.sh
修改内存配置:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"
JDK安装:
-
RocketMQ基于Java开发,需提前安装JDK(建议1.8版本)。
-
安装示例:
yum install java-1.8.0-openjdk -y
1.4 启动RocketMQ服务
NameServer与Broker的作用:
- NameServer:类似于DNS,负责管理Broker的地址信息,提供路由服务。
- Broker:核心服务,负责消息的存储、转发和消费。
启动步骤:
-
启动NameServer:
cd /app/rocketmq/rocketmq-all-4.9.5-bin-release nohup bin/mqnamesrv &
-
验证启动:检查
nohup.out
日志文件,确认出现以下关键日志:The Name Server boot success. serializeType=JSON
-
使用
jps
检查进程:jps
应显示
NamesrvStartup
进程。
-
-
启动Broker:
-
配置
broker.conf
:在
conf
目录下编辑broker.conf
,添加:autoCreateTopicEnable=true
注意:
autoCreateTopicEnable=true
仅适用于测试环境,生产环境建议关闭,避免客户端随意创建Topic,增加管理压力。 -
启动Broker:
nohup bin/mqbroker &
-
验证启动:检查
nohup.out
日志文件,确认出现以下关键日志:The broker[xxxxx] boot success. serializeType=JSON
-
使用
jps
检查进程:jps
应显示
BrokerStartup
进程。
-
环境变量配置:
-
为方便使用RocketMQ指令,可将RocketMQ的安装路径添加至环境变量。
vi ~/.bash_profile
添加以下内容:
export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-4.9.5-bin-release export PATH=$ROCKETMQ_HOME/bin:$PATH
使配置生效:
source ~/.bash_profile
停止服务:
-
使用
mqshutdown
指令停止服务:mqshutdown namesrv # 关闭NameServer mqshutdown broker # 关闭Broker
2. 快速实现消息收发
RocketMQ提供了命令行工具和编程接口,方便快速实现消息的发送与接收。
2.1 命令行实现消息收发
步骤:
-
配置环境变量:
-
设置
NAMESRV_ADDR
环境变量,指向启动的NameServer地址。vi ~/.bash_profile
添加:
export NAMESRV_ADDR='localhost:9876'
使配置生效:
source ~/.bash_profile
-
-
发送消息:
-
使用RocketMQ提供的Producer示例发送消息:
tools.sh org.apache.rocketmq.example.quickstart.Producer
-
日志示例:
SendResult [sendStatus=SEND_OK, msgId=C0A8E88007AC3764951D891CE9A003E7, offsetMsgId=C0A8E88000002A9F00000000000317BF, messageQueue=MessageQueue [topic=TopicTest, brokerName=worker1, queueId=1], queueOffset=249]
- 解释:
sendStatus=SEND_OK
:消息发送成功。msgId
:消息ID,唯一标识一条消息。offsetMsgId
:消息在CommitLog中的偏移量ID。messageQueue
:消息存储的队列信息,包括Topic、Broker名称、Queue ID。queueOffset
:消息在队列中的偏移量,标识存储位置。
- 解释:
-
-
接收消息:
-
使用RocketMQ提供的Consumer示例接收消息:
tools.sh org.apache.rocketmq.example.quickstart.Consumer
-
日志示例:
ConsumeMessageThread_19 Receive New Messages: [MessageExt [brokerName=worker1, queueId=2, storeSize=203, queueOffset=53, sysFlag=0, bornTimestamp=1606460371999, bornHost=/192.168.232.128:43436, storeTimestamp=1606460372000, storeHost=/192.168.232.128:10911, msgId=C0A8E88000002A9F000000000000A7AE, commitLogOffset=42926, bodyCRC=1968636794, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1606460450150, UNIQ_KEY=C0A8E88007AC3764951D891CE41F00D4, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50, 49, 50], transactionId='null'}]]
- 解释:
brokerName
:消息存储的Broker名称。queueId
:消息所在的Queue ID。queueOffset
:消息在队列中的偏移量。msgId
:消息ID。body
:消息内容,以字节数组形式展示。
- 解释:
-
注意:Consumer示例不会主动结束,会持续等待新消息,可以使用
CTRL+C
手动停止。
-
总结:通过命令行工具,可以快速验证RocketMQ的消息发送与接收功能,了解消息的存储和消费过程。
2.2 搭建Maven客户端项目
为更深入理解RocketMQ的消息收发过程,可通过编写Maven项目,集成RocketMQ的客户端API,实现消息的发送与接收。
步骤:
-
创建Maven项目:
- 使用IDE(如IntelliJ IDEA)或命令行创建一个标准的Maven项目。
-
添加RocketMQ依赖:
-
在
pom.xml
中添加RocketMQ客户端依赖:<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.5</version> </dependency>
-
-
实现消息生产者:
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { // 初始化消息生产者,指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 设置NameServer地址 producer.setNamesrvAddr("192.168.232.128:9876"); // 启动生产者 producer.start(); for (int i = 0; i < 2; i++) { try { // 创建消息,包含Topic、Tag和消息体 Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息并获取发送结果 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } // 关闭生产者 producer.shutdown(); } }
说明:
- DefaultMQProducer:RocketMQ提供的默认消息生产者实现。
- 组名:生产者组名需唯一,用于标识生产者实例。
- Topic:消息所属的主题,需与消费者订阅的Topic一致。
- Tag:消息的标签,用于消息过滤。
- 消息体:实际传输的数据内容,需指定字符集编码。
-
实现消息消费者:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; import java.util.List; public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 初始化消息消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); // 设置NameServer地址 consumer.setNamesrvAddr("192.168.232.128:9876"); // 设置消费位置:从最后一个偏移量开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 订阅Topic和Tag,*表示订阅所有Tag consumer.subscribe("TopicTest", "*"); // 注册消息监听器,处理接收到的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { msgs.forEach(messageExt -> { try { // 打印消息内容 System.out.println("收到消息:" + new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) {} }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); System.out.print("Consumer Started"); } }
说明:
- DefaultMQPushConsumer:RocketMQ提供的默认消息消费者实现。
- 组名:消费者组名需唯一,用于标识消费者实例。
- ConsumeFromWhere:设置消费起始位置,如从上次偏移量继续消费、从头开始消费等。
- 订阅:指定要订阅的Topic和Tag,Tag用于消息过滤。
- 消息监听器:注册消息处理逻辑,接收到消息后触发回调函数进行处理。
-
运行与验证:
- 运行生产者:执行Producer的
main
方法,发送消息到RocketMQ。 - 运行消费者:执行Consumer的
main
方法,接收并打印消息内容。 - 验证结果:
- 控制台输出:消费者应打印出生产者发送的消息内容,如
收到消息:Hello RocketMQ 0
、收到消息:Hello RocketMQ 1
。 - Dashboard查看:可通过RocketMQ Dashboard查看消息的分布和消费情况。
- 控制台输出:消费者应打印出生产者发送的消息内容,如
- 运行生产者:执行Producer的
总结:通过命令行工具和Maven项目,可以快速验证RocketMQ的消息发送与接收功能,理解消息的存储和消费机制,为后续深入学习奠定基础。
3. 搭建RocketMQ可视化管理服务
RocketMQ提供了图形化的管理控制台——Dashboard,用于实时监控和管理RocketMQ集群的运行状态。
3.1 Dashboard的下载与安装
-
下载Dashboard源码:
- 访问RocketMQ官网下载页面,获取Dashboard的源码包。
- 注意:RocketMQ官方未提供直接可运行的Dashboard Jar包,需自行编译源码。
-
编译Dashboard源码:
-
前提条件:需要在本地安装Maven构建工具。
-
编译步骤:
git clone https://github.com/apache/rocketmq.git cd rocketmq cd rocketmq-dashboard mvn clean package -Dmaven.test.skip=true
-
编译结果:在
target
目录下生成可运行的Jar包,如rocketmq-dashboard-1.0.1-SNAPSHOT.jar
。
-
-
上传Jar包至服务器:
-
将编译生成的Jar包上传至服务器上的
/app/rocketmq/rocketmq-dashboard
目录。 -
示例命令:
scp target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar user@server:/app/rocketmq/rocketmq-dashboard/
-
3.2 配置Dashboard
-
创建配置文件:
-
在Jar包所在目录下创建
application.yml
文件,配置RocketMQ的NameServer地址。 -
示例配置:
rocketmq: config: namesrvAddrs: - 192.168.232.128:9876
-
说明:
namesrvAddrs
:指定RocketMQ集群中所有NameServer的地址,多个地址用-
列表方式表示。
-
-
启动Dashboard服务:
-
使用Java指令运行Dashboard Jar包:
cd /app/rocketmq/rocketmq-dashboard java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar
-
验证启动:访问
http://192.168.232.128:8080
,应看到RocketMQ Dashboard的管理页面。
-
3.3 Dashboard的功能介绍
- 驾驶舱(Dashboard):
- 功能:展示RocketMQ集群近期的运行情况,包括消息吞吐量、延迟、Broker状态等。
- 运维页面(Operation):
- 功能:管理NameServer服务,查看NameServer的健康状态和运行指标。
- 集群页面(Cluster):
- 功能:管理RocketMQ的Broker服务,查看各Broker的状态、负载情况、消息分布等。
示例界面:
(请替换为实际图片链接)
总结:通过搭建RocketMQ Dashboard,可以直观地监控和管理RocketMQ集群的运行状态,实时了解消息的流动和Broker的健康情况,极大地方便了运维和管理工作。
4. 升级分布式集群
单台服务器搭建的RocketMQ服务无法满足生产环境的高可用性和稳定性需求,因此需要将其升级为分布式集群,避免单点故障,提高系统的容错能力。
4.1 分布式集群的架构
主从架构:
-
架构图:
(请替换为实际图片链接)
-
特点:
- Master节点:负责处理客户端的请求,存储和转发消息。
- Slave节点:备份Master节点的数据,防止数据丢失。
- 容错机制:当Master节点出现故障时,Slave节点可以接管,确保消息不丢失。
集群方案:
-
集群规划:
机器名 NameServer服务部署 Broker服务部署 worker1 NameServer worker2 NameServer broker-a (Master), broker-b-s (Slave) worker3 NameServer broker-a-s (Slave), broker-b (Master) -
2主2从配置:
- broker-a:在worker2上部署Master,在worker3上部署Slave。
- broker-b:在worker3上部署Master,在worker2上部署Slave。
4.2 部署步骤
步骤概览:
-
准备服务器:准备三台相同配置的Linux服务器,配置
/etc/hosts
文件,方便通过主机名访问。cat /etc/hosts 192.168.232.128 worker1 192.168.232.129 worker2 192.168.232.130 worker3
-
部署NameServer服务:
-
在三台服务器上分别启动NameServer服务,按照之前的步骤启动即可。
-
启动命令:
cd /app/rocketmq/rocketmq-all-4.9.5-bin-release nohup bin/mqnamesrv &
-
-
配置Broker集群:
-
使用RocketMQ提供的集群配置模板,如
2m-2s-async
,进行集群配置。 -
配置文件说明:
- brokerClusterName:集群名称,相同的名称表示同一个集群。
- brokerName:Broker名称,相同名称表示一组主从节点。
- brokerId:Broker的唯一标识,0表示Master,>0表示Slave。
- namesrvAddr:NameServer地址列表,用分号分隔。
- autoCreateTopicEnable:是否允许Broker自动创建Topic,建议生产环境关闭。
- storePath:消息存储路径,不同Broker需指定不同路径,避免冲突。
- brokerRole:Broker角色,ASYNC_MASTER表示异步Master,SLAVE表示Slave。
- listenPort:Broker监听端口,确保同一台机器上不同Broker使用不同端口。
-
配置broker-a Master(worker2):
编辑
conf/2m-2s-async/broker-a.properties
:brokerClusterName=rocketmq-cluster brokerName=broker-a brokerId=0 namesrvAddr=worker1:9876;worker2:9876;worker3:9876 autoCreateTopicEnable=true deleteWhen=04 fileReservedTime=120 storePathRootDir=/app/rocketmq/store storePathCommitLog=/app/rocketmq/store/commitlog storePathConsumeQueue=/app/rocketmq/store/consumequeue storePathIndex=/app/rocketmq/store/index storeCheckpoint=/app/rocketmq/store/checkpoint abortFile=/app/rocketmq/store/abort brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH listenPort=10911
-
配置broker-a Slave(worker3):
编辑
conf/2m-2s-async/broker-a-s.properties
:brokerClusterName=rocketmq-cluster brokerName=broker-a brokerId=1 namesrvAddr=worker1:9876;worker2:9876;worker3:9876 autoCreateTopicEnable=true deleteWhen=04 fileReservedTime=120 storePathRootDir=/app/rocketmq/storeSlave storePathCommitLog=/app/rocketmq/storeSlave/commitlog storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue storePathIndex=/app/rocketmq/storeSlave/index storeCheckpoint=/app/rocketmq/storeSlave/checkpoint abortFile=/app/rocketmq/storeSlave/abort brokerRole=SLAVE flushDiskType=ASYNC_FLUSH listenPort=11011
-
配置broker-b Master(worker3):
编辑
conf/2m-2s-async/broker-b.properties
:brokerClusterName=rocketmq-cluster brokerName=broker-b brokerId=0 namesrvAddr=worker1:9876;worker2:9876;worker3:9876 autoCreateTopicEnable=true deleteWhen=04 fileReservedTime=120 storePathRootDir=/app/rocketmq/store storePathCommitLog=/app/rocketmq/store/commitlog storePathConsumeQueue=/app/rocketmq/store/consumequeue storePathIndex=/app/rocketmq/store/index storeCheckpoint=/app/rocketmq/store/checkpoint abortFile=/app/rocketmq/store/abort brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH listenPort=10911
-
配置broker-b Slave(worker2):
编辑
conf/2m-2s-async/broker-b-s.properties
:brokerClusterName=rocketmq-cluster brokerName=broker-b brokerId=1 namesrvAddr=worker1:9876;worker2:9876;worker3:9876 autoCreateTopicEnable=true deleteWhen=04 fileReservedTime=120 storePathRootDir=/app/rocketmq/storeSlave storePathCommitLog=/app/rocketmq/storeSlave/commitlog storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue storePathIndex=/app/rocketmq/storeSlave/index storeCheckpoint=/app/rocketmq/storeSlave/checkpoint abortFile=/app/rocketmq/storeSlave/abort brokerRole=SLAVE flushDiskType=ASYNC_FLUSH listenPort=11011
-
关键配置项说明:
- brokerClusterName:集群名称,相同名称的Broker在同一集群内。
- brokerName:Broker名称,相同名称的Broker构成一组主从节点。
- brokerId:Broker的唯一标识,0表示Master,1表示Slave。
- namesrvAddr:NameServer地址列表,多个地址用分号分隔。
- autoCreateTopicEnable:是否允许Broker自动创建Topic,生产环境建议关闭。
- storePath:消息存储路径,不同Broker需指定不同路径,避免数据冲突。
- brokerRole:Broker角色,ASYNC_MASTER表示异步Master,SLAVE表示Slave。
- listenPort:Broker监听端口,不同Broker需指定不同端口。
注意事项:
- 存储路径:在同一台服务器上部署多个Broker时,需确保不同Broker的存储路径不冲突。
- 监听端口:不同Broker需指定不同的监听端口,避免端口冲突。
- 网络配置:如果服务器配置了多网卡(如内网与外网),需在配置文件中指定
brokerIP1
属性,指向外网IP地址,确保其他服务器能访问到RocketMQ服务。
4.3 启动Broker服务
启动命令:
-
指定配置文件启动Broker:
使用
-c
参数指定Broker的配置文件。 -
在worker2上启动broker-a Master和broker-b Slave:
cd /app/rocketmq/rocketmq-all-4.9.5-bin-release nohup bin/mqbroker -c ./conf/2m-2s-async/broker-a.properties & nohup bin/mqbroker -c ./conf/2m-2s-async/broker-b-s.properties &
-
在worker3上启动broker-b Master和broker-a Slave:
cd /app/rocketmq/rocketmq-all-4.9.5-bin-release nohup bin/mqbroker -c ./conf/2m-2s-async/broker-b.properties & nohup bin/mqbroker -c ./conf/2m-2s-async/broker-a-s.properties &
验证启动:
-
使用
jps
检查Broker进程是否正常运行。 -
检查
nohup.out
日志文件,确认出现以下关键日志:The broker[xxxxx] boot success. serializeType=JSON
4.4 检查集群服务状态
使用 mqadmin
指令:
-
查看集群列表:
cd /app/rocketmq/rocketmq-all-4.9.5-bin-release/bin mqadmin clusterList
-
示例输出:
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE rocketmq-cluster broker-a 0 192.168.232.129:10911 V4_9_1 0.00(0,0ms) 0.00(0,0ms) 0 3425.28 0.3594 rocketmq-cluster broker-a 1 192.168.232.130:11011 V4_9_1 0.00(0,0ms) 0.00(0,0ms) 0 3425.28 0.3607 rocketmq-cluster broker-b 0 192.168.232.130:10911 V4_9_1 0.00(0,0ms) 0.00(0,0ms) 0 3425.27 0.3607 rocketmq-cluster broker-b 1 192.168.232.129:11011 V4_9_1 0.00(0,0ms) 0.00(0,0ms) 0 3425.27 0.3594
- 字段说明:
- Cluster Name:集群名称。
- Broker Name:Broker名称。
- BID:Broker ID,0表示Master,1表示Slave。
- Addr:Broker地址(IP:Port)。
- Version:RocketMQ版本。
- InTPS:消息输入吞吐量(TPS)。
- OutTPS:消息输出吞吐量(TPS)。
- PCWait(ms):消息处理等待时间(毫秒)。
- Hour:小时数。
- SPACE:磁盘空间使用情况。
- 字段说明:
使用Dashboard查看:
-
配置Dashboard:
编辑
application.yml
,添加所有NameServer地址:rocketmq: config: namesrvAddrs: - worker1:9876 - worker2:9876 - worker3:9876
-
启动Dashboard:
cd /app/rocketmq/rocketmq-dashboard java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar
-
查看集群状态:
登录Dashboard,进入“集群”菜单页,即可查看集群的运行情况,包括Broker状态、消息吞吐量、磁盘使用等。
总结:通过主从架构的分布式集群搭建,RocketMQ能够有效防止单点故障,提高系统的容错能力和数据的可靠性,适应生产环境的高可用需求。
5. 升级高可用集群
虽然主从架构的RocketMQ集群可以防止消息丢失,但在服务高可用性方面仍有不足。为了进一步提升集群的高可用性,RocketMQ引入了Dledger集群,基于Raft协议实现自动主从切换,解决主从架构下的服务不可用问题。
5.1 主从架构的不足
-
服务高可用性问题:
- 现象:当Master节点宕机,虽然Slave节点备份了数据,但Slave节点无法自动升级为Master,导致集群部分功能不可用。
- 影响:客户端请求需要转发到其他Broker,原Master上的消息无法读取,系统部分功能中断。
-
数据一致性问题:
- 主从架构下,Slave节点数据备份可能存在延迟,不能保证数据的实时一致性。
5.2 Dledger集群的引入
Dledger集群 是RocketMQ自4.5版本引入的高可用集群解决方案,基于Raft协议实现Leader选举和数据一致性。
特点:
-
自动Leader选举:
- 集群中的Broker节点通过Raft协议选举出Leader(类似Master),其他节点为Follower(类似Slave)。
- 当Leader宕机时,自动重新选举新的Leader,确保集群持续可用。
-
强一致性:
- 使用Raft协议保证CommitLog日志文件在集群中的强一致性,确保所有节点的数据同步一致。
-
高容错性:
- 只要超过半数的节点正常运行,集群就能正常工作。
- 适合部署奇数台服务器,提升集群的容错能力。
架构图:
(请替换为实际图片链接)
5.3 搭建Dledger集群
集群结构:
- 节点规划:3个Broker节点,确保集群有超过半数(2个节点)正常运行即可。
- 集群特点:自动Leader选举,Follower可以随时升级为Leader,解决主从架构下的高可用性问题。
部署步骤:
-
部署NameServer:
-
与主从集群相同,在三台服务器上分别启动NameServer服务。
-
启动命令:
cd /app/rocketmq/rocketmq-all-4.9.5-bin-release nohup bin/mqnamesrv &
-
-
配置Dledger Broker:
-
使用RocketMQ提供的Dledger配置模板,编辑
broker.conf
文件。 -
关键配置项:
- enableDLegerCommitLog:开启Dledger功能,设置为
true
。 - dLegerGroup:Raft集群组名,建议与
brokerName
保持一致。 - dLegerPeers:集群中所有节点的地址及端口,格式为
n0-worker1:40911;n1-worker2:40911;n2-worker3:40911
。 - dLegerSelfId:当前节点在集群中的唯一ID,需与
dLegerPeers
中的ID对应。 - storePath:Dledger的消息存储路径,不同节点需指定不同路径。
- listenPort:Dledger Broker监听端口,确保不同Broker使用不同端口。
- enableDLegerCommitLog:开启Dledger功能,设置为
-
配置示例:
-
worker1的
broker.conf
:brokerClusterName=RaftCluster brokerName=RaftNode00 listenPort=30911 namesrvAddr=worker1:9876;worker2:9876;worker3:9876 storePathRootDir=/app/rocketmq/storeDledger/ storePathCommitLog=/app/rocketmq/storeDledger/commitlog storePathConsumeQueue=/app/rocketmq/storeDledger/consumequeue storePathIndex=/app/rocketmq/storeDledger/index storeCheckpoint=/app/rocketmq/storeDledger/checkpoint abortFile=/app/rocketmq/storeDledger/abort enableDLegerCommitLog=true dLegerGroup=RaftNode00 dLegerPeers=n0-worker1:40911;n1-worker2:40911;n2-worker3:40911 dLegerSelfId=n0 sendMessageThreadPoolNums=16
-
worker2的
broker.conf
:brokerClusterName=RaftCluster brokerName=RaftNode00 listenPort=30911 namesrvAddr=worker1:9876;worker2:9876;worker3:9876 storePathRootDir=/app/rocketmq/storeDledger/ storePathCommitLog=/app/rocketmq/storeDledger/commitlog storePathConsumeQueue=/app/rocketmq/storeDledger/consumequeue storePathIndex=/app/rocketmq/storeDledger/index storeCheckpoint=/app/rocketmq/storeDledger/checkpoint abortFile=/app/rocketmq/storeDledger/abort enableDLegerCommitLog=true dLegerGroup=RaftNode00 dLegerPeers=n0-worker1:40911;n1-worker2:40911;n2-worker3:40911 dLegerSelfId=n1 sendMessageThreadPoolNums=16
-
worker3的
broker.conf
:brokerClusterName=RaftCluster brokerName=RaftNode00 listenPort=30911 namesrvAddr=worker1:9876;worker2:9876;worker3:9876 storePathRootDir=/app/rocketmq/storeDledger/ storePathCommitLog=/app/rocketmq/storeDledger/commitlog storePathConsumeQueue=/app/rocketmq/storeDledger/consumequeue storePathIndex=/app/rocketmq/storeDledger/index storeCheckpoint=/app/rocketmq/storeDledger/checkpoint abortFile=/app/rocketmq/storeDledger/abort enableDLegerCommitLog=true dLegerGroup=RaftNode00 dLegerPeers=n0-worker1:40911;n1-worker2:40911;n2-worker3:40911 dLegerSelfId=n2 sendMessageThreadPoolNums=16
-
-
说明:
- dLegerGroup:Raft集群组名,建议与
brokerName
保持一致,便于管理。 - dLegerPeers:定义集群中所有节点的地址和端口,确保所有节点的配置一致。
- dLegerSelfId:当前节点的唯一ID,必须与
dLegerPeers
中的ID对应,确保集群唯一性。 - sendMessageThreadPoolNums:Dledger内部发送消息的线程数,建议与服务器的CPU核心数相匹配,提升性能。
- dLegerGroup:Raft集群组名,建议与
-
-
启动Broker服务:
-
在三台服务器上分别启动配置好的Broker服务:
cd /app/rocketmq/rocketmq-all-4.9.5-bin-release/ nohup bin/mqbroker -c conf/dledger/broker.conf &
-
-
检查集群服务状态:
-
使用Dashboard查看:
登录RocketMQ Dashboard,进入“集群”菜单页,查看Dledger集群的运行状态,包括Leader选举、节点状态、消息同步等。
-
使用
mqadmin
指令查看:cd /app/rocketmq/rocketmq-all-4.9.5-bin-release/bin mqadmin clusterList
-
验证Leader选举:
- 在Dledger集群中,自动选举出一个Leader节点(类似Master),其他节点为Follower。
- 测试高可用性:停止Leader节点,观察集群自动选举出新的Leader,确保集群持续可用。
-
总结:通过Dledger集群的搭建,RocketMQ实现了更高的可用性和数据一致性,自动Leader选举和数据同步机制有效解决了主从架构下的高可用性问题,适用于对高可靠性和高可用性要求极高的业务场景。
5.4 关于Dledger集群的一些补充
DLedger的功能:
- 数据复制:
- 通过Raft协议,集群中每个节点都维护一份相同的数据副本,确保数据一致性。
- 容错性:
- 集群具有高容错性,只要超过半数的节点正常运行,集群即可正常工作。
- 高可用性:
- 通过Leader选举和负载均衡机制,确保在节点故障时能够快速切换,维持集群的持续可用。
- 分布式锁:
- 提供分布式锁功能,解决分布式系统中的资源竞争和同步问题,实现跨节点的资源同步。
- 强一致性:
- 使用Raft一致性协议,确保集群中所有副本节点的数据一致性,避免数据冲突和不一致。
Raft协议与脑裂问题:
-
Raft协议:
- 选举机制:通过选举Leader节点,确保集群中只有一个Leader,避免多个Leader同时存在。
- 任期管理:每个选举周期设置递增的任期编号,防止旧Leader重新成为Leader。
- 心跳机制:Leader定期发送心跳消息,保持与Follower的连接,及时发现Leader故障。
- 日志复制:Leader负责将日志条目复制到Follower,确保数据的一致性。
-
脑裂问题:
- 定义:由于网络分区等原因,集群被分割为多个子集群,各自独立运行,可能导致多个Leader共存。
- 解决方案:Raft协议通过多数决机制,确保只有一个Leader存在,防止脑裂问题的发生。
ChatGPT关于Dledger功能的描述:
RocketMQ是一款分布式消息队列系统,主要用于处理大量数据的实时传输和处理。在RocketMQ中,DLedger是一个为高可用、高性能、高可靠的分布式存储系统提供基础支持的组件。DLedger集群主要具有以下功能:
- 数据复制:通过Raft协议保证数据一致性,集群中每个节点都维护相同的数据副本。
- 容错性:即使部分节点故障,只要集群中有大多数节点正常工作,集群仍能提供服务。
- 高可用性:通过负载均衡和热备份机制,确保在节点故障时快速切换到其他节点,提高系统可用性。
- 分布式锁:提供分布式锁功能,解决分布式系统中的资源争用,实现跨节点资源同步。
- 强一致性:通过Raft协议确保集群中副本节点之间的数据同步,保证数据一致性。
- 高性能:支持水平扩展,通过增加节点提升系统的吞吐量和存储能力,满足业务需求。
- 易于管理:提供管理和监控功能,便于运维人员掌握系统运行状况,及时发现和解决问题。
总结:Dledger集群通过Raft协议实现自动Leader选举和数据一致性,解决了主从架构下的高可用性问题,提高了RocketMQ集群的容错能力和数据可靠性,为高并发、高可靠性的业务场景提供了坚实的基础。
6. 总结RocketMQ的运行架构
通过前面的实战操作,我们对RocketMQ的运行架构有了更深入的理解。以下是RocketMQ运行时的整体架构及各组件的详细作用。
6.1 RocketMQ的整体架构
+------------+ +-----------+ +----------+
| Producer | --> | Broker | <-- | Consumer |
+------------+ +-----------+ +----------+
| |
+-------+ +-------+
| NameServer | Dashboard |
+-------+ +-------+
6.2 组件详细介绍
-
NameServer(命名服务):
-
作用:NameServer负责管理RocketMQ集群中所有Broker的地址信息,提供路由服务。类似于DNS的角色,客户端和Broker通过NameServer进行地址发现和路由查询。
-
特点:
- 无状态:NameServer不存储任何状态信息,支持水平扩展。
- 轻量级:启动和运行资源消耗较低。
- 高可用性:通过部署多个NameServer实例,确保服务的高可用性。
-
-
Broker(核心服务):
-
作用:Broker是RocketMQ的核心组件,负责消息的存储、转发、查询和管理。所有消息的生产者和消费者都通过Broker进行消息传递。
-
功能:
- 消息存储:将消息持久化存储在CommitLog和ConsumeQueue中,确保消息的可靠性。
- 消息转发:根据Topic和Queue进行消息的分发和转发。
- 消息查询:提供消息的查询和检索功能。
- 集群管理:支持主从集群和Dledger集群,确保高可用性和数据一致性。
-
特点:
- 高性能:通过高效的IO处理和内存管理,实现高吞吐量和低延迟。
- 可扩展性:支持水平扩展,通过增加Broker实例提升系统性能和容量。
- 丰富的功能:支持广播消费、延迟队列、死信队列、事务消息等高级功能。
-
-
Client(客户端):
-
作用:Client包括消息生产者和消费者,是应用程序与RocketMQ集群交互的接口。生产者负责发送消息,消费者负责接收和处理消息。
-
功能:
- 消息发送:生产者通过客户端API发送消息到指定的Topic。
- 消息消费:消费者通过客户端API订阅Topic,接收并处理消息。
- 负载均衡:客户端自动进行负载均衡,分配消息到不同的MessageQueue,提升系统性能。
- 消费进度管理:客户端记录消费进度(Offset),确保每条消息在消费者组内只被消费一次。
-
特点:
- 易用性:提供丰富的API接口,支持多语言客户端(Java、C++, Python等)。
- 灵活性:支持多种消费模式,如广播消费、集群消费等。
- 可靠性:通过事务消息和消息重试机制,确保消息的可靠传递和处理。
-
-
Dashboard(管理控制台):
-
作用:Dashboard提供RocketMQ集群的可视化管理和监控功能,帮助运维人员实时了解集群状态、消息流动和性能指标。
-
功能:
- 集群监控:实时展示集群中各Broker的状态、负载情况、消息吞吐量等。
- 消息管理:查看和管理Topic、Queue、消息分布等信息。
- 运维操作:提供Broker的启动、停止、配置管理等运维操作。
- 告警通知:设置告警规则,实时通知运维人员集群异常情况。
-
特点:
- 直观性:图形化界面,数据展示直观易懂。
- 全面性:涵盖集群运行的各个方面,提供全面的监控和管理功能。
- 易操作性:简化运维流程,提高运维效率。
-
运行架构总结:
-
协调与通信:客户端通过NameServer获取Broker地址,直接与Broker进行消息发送和接收。NameServer作为集群的协调中心,确保消息路由的准确性和高效性。
-
消息流动:生产者发送消息到指定的Topic,Broker根据Topic将消息存储到对应的MessageQueue中。消费者订阅Topic,通过消息队列获取并处理消息,消费进度由消费者组记录,确保消息不重复消费。
-
集群管理:Broker集群通过主从架构或Dledger集群实现高可用性和数据一致性,NameServer集群通过多实例部署实现高可用性。Dashboard作为监控和管理工具,实时监控集群状态,提供便捷的运维操作。
示意图:
(请替换为实际图片链接)
四、理解RocketMQ的消息模型
RocketMQ的消息模型是其核心设计之一,决定了消息的存储、传递和消费方式。理解消息模型有助于更好地设计和优化基于RocketMQ的系统。
4.1 消息发送与存储流程
-
生产者发送消息:
- 指定Topic:生产者在发送消息时,指定消息所属的Topic。
- 消息分配:RocketMQ根据Topic将消息分配到对应的MessageQueue中,分散存储到不同的Broker上。
- 消息属性:每条消息包含Topic、Tag、消息体(Body)等属性,并生成唯一的
msgId
和queueOffset
。
-
消息存储:
- CommitLog:所有消息首先写入CommitLog,RocketMQ通过顺序写入确保高性能。
- ConsumeQueue:每个MessageQueue对应一个ConsumeQueue,记录消息的索引信息,包括偏移量、存储位置等。
- Index:为了支持快速查询,RocketMQ为消息建立索引,加快消息检索速度。
-
消费者消费消息:
- 订阅Topic:消费者订阅指定的Topic,接收属于该Topic的所有消息。
- 消息拉取:消费者通过ConsumeQueue定位消息在CommitLog中的存储位置,读取并处理消息。
- 消费进度:消费者组记录每个MessageQueue的消费进度(Offset),确保每条消息在组内只被消费一次。
4.2 消息模型示意图
+-----------+ +--------------+ +------------+
| Producer | ----> | MessageQueue | ----> | Broker |
+-----------+ +--------------+ +------------+
| | |
| | |
| +-----------+ +-----------+
| | Consume | | CommitLog |
| | Queue | +-----------+
| +-----------+ |
| | |
| | |
| +-----------+ |
| | Index | <-------------+
| +-----------+
|
v
+-----------+
| Consumer |
+-----------+
4.3 消息模型的关键概念
-
Topic:
- 定义:消息的逻辑分类,相当于消息的主题或类别。
- 作用:生产者发送消息到特定Topic,消费者订阅Topic以接收相关消息。
- 管理:Topic需由管理员在Broker端预先创建,生产环境中不建议开启自动创建。
-
MessageQueue:
- 定义:每个Topic下的消息存储队列,具备FIFO特性。
- 作用:分散存储消息,提升系统的并发处理能力。
- 分布:MessageQueue分布在不同的Broker上,实现负载均衡。
-
CommitLog:
- 定义:RocketMQ中所有消息的物理存储文件,采用顺序写入方式。
- 作用:确保消息的高性能写入和持久化存储。
- 特点:支持顺序写入,提升写入性能;支持高效的读取和索引。
-
ConsumeQueue:
- 定义:每个MessageQueue对应的消费索引文件,记录消息在CommitLog中的位置信息。
- 作用:支持消费者快速定位和读取消息,提高消费效率。
- 特点:轻量级,占用较少存储空间;支持高效的消息查询。
-
Index:
- 定义:RocketMQ为支持快速查询建立的消息索引。
- 作用:加快消息检索速度,支持按照消息属性(如Tag)快速过滤消息。
- 特点:支持高效的索引查找,提升消息查询性能。
-
Offset(偏移量):
- 定义:记录消息在MessageQueue中的存储位置。
- 作用:用于定位和读取消息,确保消息按照顺序消费。
- 管理:消费者组记录每个MessageQueue的消费Offset,保证消息不重复消费。
4.4 消息模型的优势
- 高性能:通过CommitLog的顺序写入和ConsumeQueue的高效索引,确保消息的高性能存储和消费。
- 可扩展性:通过MessageQueue的分布式存储和多Broker集群,支持高并发和大规模消息处理。
- 灵活性:支持多种消费模式(广播、集群),满足不同业务需求。
- 可靠性:通过主从架构或Dledger集群,实现消息的高可用性和数据的持久化存储。
总结:RocketMQ的消息模型通过Topic、MessageQueue、CommitLog、ConsumeQueue、Index等核心组件,实现了高性能、高可扩展性和高可靠性的消息存储与传递机制,适应了现代互联网和企业级应用的复杂需求。
4.5 RocketMQ与Kafka的对比
RocketMQ和Kafka都是分布式消息队列系统,虽然在设计理念上有相似之处,但在实现和功能上存在显著差异。
特性 | Apache Kafka | Apache RocketMQ |
---|---|---|
设计初衷 | 日志收集与大数据采集场景 | 广泛的消息传递与高可靠性场景 |
消息存储 | Partition文件,分布式存储 | CommitLog与ConsumeQueue,分布式存储 |
高可用性 | 使用Kafka自身的集群管理机制,Kraft | 使用Dledger(基于Raft协议)或主从架构 |
Topic管理 | Topic过多时性能下降 | 设计优化,支持大量Topic而性能不受影响 |
消息可靠性 | 可能存在数据丢失风险 | 高可靠性,特别适合金融场景 |
高级功能 | 基础消息功能 | 支持广播消费、延迟队列、死信队列、事务消息等 |
客户端语言支持 | 多语言支持 | Java为主,支持多种客户端协议 |
性能 | 吞吐量极高 | 吞吐量高,性能稍低于Kafka但高于RabbitMQ |
社区与生态 | 社区活跃,生态成熟 | 社区活跃,持续发展中 |
具体对比:
-
Topic管理:
- Kafka:Topic数量过多会导致Partition文件过多,影响文件索引耗时,降低IO性能。
- RocketMQ:优化了Topic的管理,支持大量Topic而不会显著影响整体性能,适合需要高Topic数量的场景。
-
Leader选举与高可用性:
- Kafka:使用自己的Kraft集群管理机制,优先保证服务可用性,但可能在一定程度上牺牲消息的安全性。
- RocketMQ:采用Dledger集群(基于Raft协议)或主从架构,确保高可靠性和数据一致性,特别适合对消息安全性要求高的金融场景。
总结:RocketMQ在高可靠性、消息安全性和高级功能方面表现优异,适用于对消息传递要求高的企业级和金融级应用;而Kafka在日志收集和大数据采集等场景中表现出色,适合需要极高吞吐量和实时数据处理的业务需求。
四、总结RocketMQ的运行架构
通过前述的快速实战和消息模型的理解,我们对RocketMQ的运行架构有了全面的认识。以下将结合实际操作,总结RocketMQ的运行架构及各组件的具体作用。
4.1 RocketMQ运行架构概览
+------------+ +-----------+ +----------+
| Producer | --> | Broker | <-- | Consumer |
+------------+ +-----------+ +----------+
| |
+-------+ +-------+
| NameServer | Dashboard |
+-------+ +-------+
4.2 组件详细作用
-
NameServer(命名服务):
- 功能:
- Broker注册与发现:Broker启动后向NameServer注册自身的地址信息,客户端和其他Broker通过NameServer获取Broker地址,实现动态路由。
- 路由服务:提供Topic到MessageQueue的路由信息,客户端根据路由信息直接与Broker进行消息发送和接收。
- 特点:
- 无状态:NameServer不保存任何状态信息,支持多实例部署,实现高可用性。
- 轻量级:资源占用低,易于部署和扩展。
- 功能:
-
Broker(核心服务):
- 功能:
- 消息存储:负责将生产者发送的消息存储到CommitLog和ConsumeQueue中,确保消息持久化。
- 消息转发:根据Topic和Queue将消息转发到消费者,支持广播消费和集群消费模式。
- 消息查询:提供消息的查询和检索功能,支持按条件过滤消息。
- 集群管理:通过主从架构或Dledger集群实现高可用性和数据一致性,防止单点故障。
- 特点:
- 高性能:通过顺序写入和高效的内存管理,实现高吞吐量和低延迟。
- 可扩展性:支持水平扩展,通过增加Broker实例提升系统性能和容量。
- 丰富的功能:支持高级消息功能,如事务消息、延迟消息、死信消息等,满足复杂业务需求。
- 功能:
-
Client(客户端):
- 功能:
- 消息生产:生产者通过客户端API发送消息到指定的Topic,支持同步和异步发送模式。
- 消息消费:消费者通过客户端API订阅Topic,接收并处理消息,支持并发消费和顺序消费。
- 负载均衡:客户端自动进行负载均衡,分配消息到不同的MessageQueue,提高系统性能。
- 消费进度管理:消费者组记录消费进度(Offset),确保消息不重复消费,支持消息重试机制。
- 特点:
- 易用性:提供简单易用的API接口,支持多种编程语言和客户端协议。
- 灵活性:支持多种消费模式,如广播消费、集群消费、事务消息消费等。
- 可靠性:通过事务消息和消息重试机制,确保消息的可靠传递和处理。
- 功能:
-
Dashboard(管理控制台):
- 功能:
- 集群监控:实时展示RocketMQ集群中各Broker的状态、负载情况、消息吞吐量等指标。
- 消息管理:查看和管理Topic、MessageQueue、消息分布等信息。
- 运维操作:提供Broker的启动、停止、配置管理等运维操作,简化运维流程。
- 告警与通知:设置告警规则,实时通知运维人员集群异常情况,及时响应和处理问题。
- 特点:
- 直观性:图形化界面,数据展示直观易懂,方便快速定位问题。
- 全面性:涵盖集群运行的各个方面,提供全面的监控和管理功能。
- 易操作性:简化运维流程,提高运维效率,减少人为操作错误。
- 功能:
4.3 运行架构详细流程
-
消息发送流程:
- 生产者:通过客户端API发送消息到指定的Topic。
- NameServer:生产者通过NameServer获取Broker地址,确定发送目标。
- Broker:接收到消息后,将消息存储到CommitLog和ConsumeQueue中,确保消息持久化。
- 消息分配:Broker根据Topic将消息分配到对应的MessageQueue,分散存储到不同的Broker实例上。
-
消息消费流程:
- 消费者:通过客户端API订阅Topic,接收消息。
- NameServer:消费者通过NameServer获取Broker地址,确定消费源。
- Broker:消费者通过ConsumeQueue定位消息在CommitLog中的存储位置,读取并处理消息。
- 消费进度:消费者组记录每个MessageQueue的消费Offset,确保消息按顺序消费且不重复消费。
-
集群管理流程:
- 主从架构:Master负责处理请求,Slave备份数据,确保消息不丢失。
- Dledger集群:通过Raft协议实现自动Leader选举和数据一致性,提升集群的高可用性和容错性。
- Dashboard监控:实时监控集群状态,提供运维操作和告警通知,保障集群稳定运行。
4.4 集群故障处理
-
主从架构下的故障处理:
- 故障检测:监控Master节点的健康状态,发现故障后自动切换到Slave节点。
- 数据备份:Slave节点备份Master节点的数据,防止数据丢失。
- 服务恢复:切换到Slave节点后,系统继续提供消息服务,保障业务连续性。
-
Dledger集群下的故障处理:
- 自动Leader选举:当Leader节点故障时,集群自动选举新的Leader,确保集群持续可用。
- 数据一致性:通过Raft协议保证所有节点的数据一致性,避免数据冲突和不一致。
- 高容错性:只要超过半数的节点正常运行,集群即可正常工作,提升系统的容错能力。
总结:RocketMQ通过NameServer、Broker、Client和Dashboard等核心组件,实现了高性能、高可用性和高可靠性的消息传递机制。集群管理通过主从架构和Dledger集群,确保系统在高并发和故障情况下的稳定运行。理解RocketMQ的运行架构有助于更好地设计和优化基于RocketMQ的分布式系统。
五、理解RocketMQ的消息模型
RocketMQ的消息模型是其设计的核心,决定了消息的存储、传递和消费方式。通过深入理解消息模型,可以更高效地设计和优化消息传递系统。
5.1 消息发送与存储流程
-
生产者发送消息:
-
指定Topic:生产者在发送消息时,必须指定消息所属的Topic。Topic是消息的逻辑分类,类似于邮件的主题。
-
消息分配:
- RocketMQ根据Topic将消息分配到对应的MessageQueue中。
- 每个Topic可以包含多个MessageQueue,消息在多个Queue中分布存储,实现负载均衡。
-
消息属性:
- msgId:每条消息的唯一标识,用于追踪和查询。
- queueOffset:消息在MessageQueue中的偏移量,标识消息存储位置。
-
存储过程:
- CommitLog:所有消息首先写入CommitLog文件,确保消息的持久化存储。
- ConsumeQueue:每个MessageQueue对应一个ConsumeQueue文件,记录消息在CommitLog中的存储位置和偏移量。
- Index:为支持快速查询,RocketMQ为消息建立索引,提升消息检索速度。
-
-
消息存储:
- CommitLog:
- 特点:顺序写入,支持高性能的消息存储。
- 作用:持久化存储所有消息,确保消息不丢失。
- ConsumeQueue:
- 特点:轻量级,占用较少存储空间。
- 作用:记录消息在CommitLog中的存储位置,支持快速定位和消费。
- Index:
- 特点:支持高效的索引查找,提升消息检索性能。
- 作用:按消息属性(如Tag)快速过滤和查询消息。
- CommitLog:
5.2 消息消费流程
-
消费者订阅Topic:
-
订阅:消费者通过客户端API订阅一个或多个Topic,接收对应的消息。
-
消费模式:
- 广播消费:所有消费者实例均可消费同一消息,实现广播。
- 集群消费:每条消息仅由一个消费者实例消费,实现负载均衡。
-
-
消息拉取与处理:
- 拉取消息:
- 消费者通过ConsumeQueue定位消息在CommitLog中的存储位置。
- 根据Offset读取消息内容,实现高效的消息消费。
- 消息处理:
- 消费者处理接收到的消息,可以进行业务逻辑处理、数据存储等操作。
- 处理完成后,消费者更新消费进度(Offset),确保消息不重复消费。
- 拉取消息:
-
消费进度管理:
-
Offset记录:消费者组记录每个MessageQueue的消费Offset,标识已消费的消息位置。
-
消费重试:如果消息消费失败,RocketMQ支持消息的重试机制,确保消息最终被消费。
-
事务消息:支持分布式事务,确保消息与业务操作的一致性。
-
5.3 消息模型示意图
+-----------+ +--------------+ +------------+
| Producer | ----> | MessageQueue | ----> | Broker |
+-----------+ +--------------+ +------------+
| | |
| | |
| +-----------+ +-----------+
| | Consume | | CommitLog |
| | Queue | +-----------+
| +-----------+ |
| | |
| | |
| +-----------+ |
| | Index | <-------------+
| +-----------+
|
v
+-----------+
| Consumer |
+-----------+
说明:
- Producer发送消息到MessageQueue:生产者通过指定的Topic将消息发送到对应的MessageQueue中。
- MessageQueue存储消息:消息被存储在CommitLog中,同时ConsumeQueue记录消息的存储位置。
- Consumer从MessageQueue消费消息:消费者通过ConsumeQueue定位消息在CommitLog中的位置,读取并处理消息。
- Index支持快速查询:Index提供按消息属性(如Tag)的快速检索功能,提升消息查询效率。
5.4 消息模型的关键概念
-
Topic:
- 定义:消息的逻辑分类,用于组织和管理消息。
- 作用:生产者发送消息到指定的Topic,消费者订阅Topic接收消息。
- 管理:Topic需由管理员在Broker端预先创建,生产环境中建议关闭自动创建功能。
-
MessageQueue:
- 定义:每个Topic下的消息存储队列,具备FIFO特性。
- 作用:分散存储消息,提升系统的并发处理能力。
- 分布:MessageQueue分布在不同的Broker实例上,实现负载均衡和高可用性。
-
CommitLog:
- 定义:RocketMQ中所有消息的物理存储文件,采用顺序写入方式。
- 作用:确保消息的高性能写入和持久化存储。
- 特点:支持顺序写入,提升写入性能;支持高效的读取和索引。
-
ConsumeQueue:
- 定义:每个MessageQueue对应的消费索引文件,记录消息在CommitLog中的存储位置。
- 作用:支持消费者快速定位和读取消息,提高消费效率。
- 特点:轻量级,占用较少存储空间;支持高效的消息查询。
-
Index:
- 定义:RocketMQ为支持快速查询建立的消息索引。
- 作用:加快消息检索速度,支持按照消息属性(如Tag)快速过滤消息。
- 特点:支持高效的索引查找,提升消息查询性能。
-
Offset(偏移量):
- 定义:记录消息在MessageQueue中的存储位置。
- 作用:用于定位和读取消息,确保消息按照顺序消费。
- 管理:消费者组记录每个MessageQueue的消费Offset,保证消息不重复消费。
5.5 消息模型的优势
- 高性能:
- 通过CommitLog的顺序写入和ConsumeQueue的高效索引,实现高吞吐量和低延迟。
- 可扩展性:
- 通过MessageQueue的分布式存储和多Broker集群,支持高并发和大规模消息处理。
- 灵活性:
- 支持多种消费模式(广播消费、集群消费),满足不同业务需求。
- 可靠性:
- 通过主从架构或Dledger集群,实现消息的高可用性和数据的持久化存储。
5.6 RocketMQ与Kafka的对比
RocketMQ和Kafka都是分布式消息队列系统,虽然在设计理念上有相似之处,但在实现和功能上存在显著差异。
特性 | Apache Kafka | Apache RocketMQ |
---|---|---|
设计初衷 | 日志收集与大数据采集场景 | 广泛的消息传递与高可靠性场景 |
消息存储 | Partition文件,分布式存储 | CommitLog与ConsumeQueue,分布式存储 |
高可用性 | 使用Kafka自身的集群管理机制,Kraft | 使用Dledger(基于Raft协议)或主从架构 |
Topic管理 | Topic过多时性能下降 | 设计优化,支持大量Topic而性能不受影响 |
消息可靠性 | 可能存在数据丢失风险 | 高可靠性,特别适合金融场景 |
高级功能 | 基础消息功能 | 支持广播消费、延迟队列、死信队列、事务消息等 |
客户端语言支持 | 多语言支持 | Java为主,支持多种客户端协议 |
性能 | 吞吐量极高 | 吞吐量高,性能稍低于Kafka但高于RabbitMQ |
社区与生态 | 社区活跃,生态成熟 | 社区活跃,持续发展中 |
具体对比:
-
Topic管理:
- Kafka:Topic数量过多会导致Partition文件过多,影响文件索引耗时,降低IO性能。
- RocketMQ:优化了Topic的管理,支持大量Topic而不会显著影响整体性能,适合需要高Topic数量的场景。
-
Leader选举与高可用性:
- Kafka:使用自己的Kraft集群管理机制,优先保证服务可用性,但可能在一定程度上牺牲消息的安全性。
- RocketMQ:采用Dledger集群(基于Raft协议)或主从架构,确保高可靠性和数据一致性,特别适合对消息安全性要求高的金融场景。
总结:RocketMQ在高可靠性、消息安全性和高级功能方面表现优异,适用于对消息传递要求高的企业级和金融级应用;而Kafka在日志收集和大数据采集等场景中表现出色,适合需要极高吞吐量和实时数据处理的业务需求。
建议:在学习RocketMQ时,可以结合Kafka的学习,进行横向对比,理解各自的优势和适用场景,从而更好地应用于实际业务中。
六、章节总结
6.1 本章节内容回顾
本章节通过RocketMQ的快速实战操作,深入理解了RocketMQ的运行架构和消息模型,掌握了从服务搭建、消息发送与消费到集群升级的全过程。具体内容包括:
-
MQ简介:
- 了解MQ的基本概念、核心组成和主要作用。
- 理解异步处理、解耦和削峰填谷在系统架构中的重要性。
-
RocketMQ产品特点:
- 了解RocketMQ的背景和发展历程。
- 对比RocketMQ与其他主流MQ产品(Kafka、RabbitMQ、Pulsar)的优势和适用场景。
-
RocketMQ快速实战:
- 学会快速搭建RocketMQ服务,包括NameServer和Broker的安装与配置。
- 通过命令行工具和Maven项目,实现消息的发送与接收。
- 搭建RocketMQ的可视化管理控制台(Dashboard),实现集群的实时监控与管理。
- 升级RocketMQ集群至分布式主从架构,提升系统的高可用性和容错能力。
- 进一步升级至高可用的Dledger集群,基于Raft协议实现自动Leader选举和数据一致性。
-
总结RocketMQ的运行架构:
- 理解RocketMQ的核心组件(NameServer、Broker、Client、Dashboard)的作用和协同工作方式。
- 通过示意图和具体流程,梳理消息的发送、存储和消费过程。
-
理解RocketMQ的消息模型:
- 深入理解RocketMQ的消息存储与消费机制,包括CommitLog、ConsumeQueue、Index等关键概念。
- 对比RocketMQ与Kafka的消息模型,理解各自的优势和设计理念。
-
章节总结:
- 回顾本章节的学习内容,强调RocketMQ在高性能、高可靠性和高级功能方面的优势。
- 提出后续学习建议,鼓励结合实际业务场景,深入研究RocketMQ的高级功能和优化策略。
6.2 学习建议
-
横向对比学习:
-
对比Kafka和RocketMQ:了解两者在设计理念、消息模型、性能优化等方面的异同,深化对分布式消息队列系统的理解。
-
对比RabbitMQ和Pulsar:探索不同MQ产品在功能、性能和适用场景上的特点,选择最适合业务需求的消息队列系统。
-
-
深入研究高级功能:
-
事务消息:学习RocketMQ的事务消息机制,确保分布式系统中消息与业务操作的一致性。
-
延迟队列:掌握消息延迟发送和消费的实现方式,应用于定时任务和延迟处理场景。
-
死信队列:理解死信队列的概念和应用,处理消费失败的消息,提升系统的健壮性。
-
-
性能优化与监控:
-
性能调优:研究RocketMQ的性能调优策略,如内存配置、消息压缩、批量发送等,提高系统的吞吐量和响应速度。
-
监控与告警:结合Dashboard和第三方监控工具,建立全面的监控体系,及时发现和解决集群运行中的问题。
-
-
实战项目应用:
-
业务场景应用:将RocketMQ应用于实际业务场景中,如订单处理、日志收集、实时数据传输等,验证其性能和可靠性。
-
集群部署与维护:掌握RocketMQ集群的部署、扩展和维护技巧,确保系统的高可用性和稳定性。
-
-
社区与资源:
-
参与开源社区:关注RocketMQ的GitHub仓库,参与社区讨论和贡献,获取最新的技术动态和最佳实践。
-
学习资源:结合图灵课程、官方文档和在线教程,系统化学习RocketMQ的理论知识和实战经验。
-
总结:通过本章节的学习,已对RocketMQ的基础概念、运行架构和消息模型有了全面的认识。接下来的学习将进一步探索RocketMQ的高级功能和优化策略,结合实际业务场景,全面掌握分布式消息队列系统的设计与应用。