当前位置: 首页 > article >正文

基于Controller模式部署RocketMQ集群

RocketMQ简介

RocketMQ是一种分布式消息中间件,它由阿里巴巴集团开发,并且后来捐献给了Apache软件基金会。RocketMQ最初是为了解决阿里巴巴内部因业务增长带来的高吞吐量需求而设计的。随着其不断发展和完善,RocketMQ已经成为了一个能够处理从传统发布/订阅模式到大规模实时无差错交易系统的强大消息引擎。
在这里插入图片描述

RocketMQ的主要特点

  • 架构简洁:相比其他消息队列系统,RocketMQ具有更简单的架构设计。
  • 丰富的业务功能:支持多种消息类型和应用场景,比如实时消息处理、顺序消息处理以及事务消息处理。
  • 极高的可扩展性:可以轻松地进行水平扩展以满足不同规模的应用需求。
  • 金融级可靠性:经过长时间在实际生产环境中的验证,被广泛认可为适用于对数据准确性和一致性要求极高的金融场景下的消息- 传递解决方案。

与其它消息队列系统的比较

  • Kafka 更适合于大量数据处理(如离线流处理);
  • RabbitMQ 则擅长处理复杂的消息路由及支持多种协议;
  • ActiveMQ 提供了广泛的协议支持但性能相对较低。

官方网站:http://rocketmq.apache.org

项目地址:https://github.com/apache/rocketmq

中文社区:https://rocketmq.io/

集群模式介绍

RocketMQ 技术架构中包含的角色有NameServer、Broker、Proxy、Controller、dashboard、Producer、Consumer等。
在这里插入图片描述

  • NameServer 是 RocketMQ 的路由服务集群,它不保存任何消息数据,而是维护着所有 Broker 的路由信息(包括 Broker 存活情况、各 topic 的路由信息等)
  • Broker 是 RocketMQ 的核心服务节点,负责接收来自 Producer(生产者)的消息、存储消息以及将消息转发给 Consumer(消费者)。在 RocketMQ 架构中,Broker 分为 Master Broker 和 Slave Broker(或称为 Replica)两种角色
  • Proxy 是 RocketMQ 为了提高性能和简化客户端接入而引入的一个可选组件。它作为一个轻量级的代理服务器,位于客户端与 Broker 之间
  • Controller是 RocketMQ 5.0 新增的组件,它充当控制平面的角色,负责管理和协调系统的整体状态。它主要出现在可切换架构的部署过程中,如 Broker 宕机时,它将进行调度,对 Broker 集群进行选举管理。Controller 本身也支持集群部署,基于 Raft 实现容灾。
  • Dashboard 是 RocketMQ 提供的一款可视化管理界面,它允许用户通过网页浏览器直观地监控和管理 RocketMQ 集群的状态,包括但不限于查看消息队列、消费进度、Broker 健康状况、主题配置、消费组详情等信息。
  • Producer 与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer 与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。

RocketMQ集群迭代历程:

  • 在RocketMQ 4.5版本之前,使用简单的Master-slave主从模式,如果Master宕机,不支持自动将Slave切换为Master,需要人工介入
  • 在RocketMQ 4.5版本之后,引入了DLedger来解决Master/Slave的自动主从切换问题,该机制基于Raft协议实现,但存在局限性
  • 在RocketMQ 5.0版本之后,引入controller组件,基于Dledger Controller实现主从切换

Proxy 组件部署有两种方式:

  • Local 模式:将 Proxy 与 Broker 一起部署在同一个进程中,这能够节约一些机器资源。
  • Cluster 模式:将 Proxy 作为一个独立集群进行部署。这能够提供较强的横向扩展能力。

Controller 组件部署有两种方式:

  • 嵌入于 NameServer 进行部署,可以通过配置 enableControllerInNamesrv 打开。
  • 另一种是独立部署,需要单独部署 Controller 组件。

数据存储分为同步刷盘和异步刷盘:

  • 同步刷盘:当消息到达 Broker 后,只有把消息写入到 CommitLog 日志文件中,才给生产者返回发送成功的响应。
  • 异步刷盘:当消息到达 Broker 后,就给生产者返回数据发送成功了,并启动一个异步线程去把消息写入到 CommitLog 中。

主从同步分为同步复制和异步复制:

  • 每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),优点是性能同多Master模式几乎一样,缺点是Master宕机磁盘损坏情况下会丢失少量消息。

  • 每个Master配置一个Slave,有多组 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功。优点是数据与服务都无单点故障,缺点是性能比异步复制模式略低

集群部署规划

本示例基于RocketMQ 5.0 controller模式,采用异步刷盘、同步复制方式,所有组件分离部署:

  • proxy为无状态组件,可以部署1至多个节点,本示例为2节点冗余部署
  • nameserver为无状态组件,可以部署1至多个节点,本示例为2节点冗余部署
  • controller基于raft协议,需要部署至少3个节点,本示例为3节点冗余部署
  • broker部署2个副本组,每个副本组包含1个主节点和1个从节点,主从选举及故障切换由controller组件自动管理
  • dashboard为无状态组件,可以部署1至多个节点,本示例为2节点冗余部署
  • LB节点通过haproxy和keepalived实现,作为proxy和dashboard的统一负载均衡入口,本示例为2节点冗余部署

节点规划清单如下:

主机名IP地址角色操作系统CPU/内存/磁盘
LB01192.168.72.18负载均衡器Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
LB02192.168.72.19负载均衡器Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
proxy01192.168.72.31Proxy节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
proxy02192.168.72.32Proxy节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
nameserver01192.168.72.33NameServer节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
nameserver02192.168.72.34NameServer节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
broker-1-1192.168.72.51Broker副本组1-1Ubuntu 22.04 LTS2C /8GB/100GB DATA DISK
broker-1-2192.168.72.52Broker副本组1-2Ubuntu 22.04 LTS2C /8GB/100GB DATA DISK
broker-2-1192.168.72.53Broker副本组2-1Ubuntu 22.04 LTS2C /8GB/100GB DATA DISK
broker-2-2192.168.72.54Broker副本组2-2Ubuntu 22.04 LTS2C /8GB/100GB DATA DISK
controller01192.168.72.40Controller节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
controller02192.168.72.41Controller节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
controller03192.168.72.42Controller节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
dashboard01192.168.72.55Dashboard管理节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
dashboard02192.168.72.56Dashboard管理节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
proxy VIP192.168.72.250proxy VIPN/AN/A
dashboard VIP192.168.72.251dashboard VIPN/AN/A

节点配置说明:

  • 可视情况合并部分组件复用一个节点部署,nameserver、broker为必选组件
  • broker 至少8G内存,生产环境建议提高配置,例如8C 16G 500G SSD,数据盘可单独挂载至/data目录
  • 提供两个分离的VIP地址,一个作为rocketmq proxy统一入口,一个作为rocketmq dashboard统一入口

集群架构如下图所示:
在这里插入图片描述
部署目录规划

  • 安装目录:统一安装包及配置文件存放路径,如 /opt/rocketmq/
  • 数据目录:运行过程中生成的日志和数据文件与安装包分开,存放在 /data/目录下。
  • 服务管理:通过 systemd 管理服务,保证开机自启和管理规范化。

基础环境配置

配置主机名,在所有节点执行,每个节点单独配置。

hostnamectl set-hostname nameserver01
hostnamectl set-hostname nameserver02
hostnamectl set-hostname controller01
hostnamectl set-hostname controller02
hostnamectl set-hostname controller03
hostnamectl set-hostname broker-1-1
hostnamectl set-hostname broker-1-2
hostnamectl set-hostname broker-2-1
hostnamectl set-hostname broker-2-2
hostnamectl set-hostname dashboard01
hostnamectl set-hostname dashboard02
hostnamectl set-hostname proxy01
hostnamectl set-hostname proxy02
hostnamectl set-hostname lb01
hostnamectl set-hostname lb02

配置所有节点时间同步,默认与互联网NTP同步时间

apt install -y chrony
systemctl enable --now chrony

基础软件安装,在所有非LB节点执行。

apt update -y
apt install -y unzip
apt install -y openjdk-11-jdk

依赖说明:

  • RocketMQ依赖JAVA环境,因此需要安装openjdk,注意版本兼容性
  • 官方二进制包解压需要使用unzip工具

操作系统初始化,在所有非LB节点执行,配置RocketMQ相关系统参数。

wget https://raw.githubusercontent.com/apache/rocketmq/refs/heads/develop/distribution/bin/os.sh
bash os.sh

部署 RocketMQ 软件包

以下操作在所有非LB节点执行。

下载二进制文件,以使用RocketMQ 5.3.1版本二进制包安装为例。

wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.1/rocketmq-all-5.3.1-bin-release.zip

rocketmq-all-5.3.1-bin-release.zip 上传到目标服务器。执行以下命令解压到/opt目录下:

# 解压安装包到指定目录
sudo unzip rocketmq-all-5.3.1-bin-release.zip -d /opt

# 创建软链接,便于后续升级管理
sudo ln -s /opt/rocketmq-all-5.3.1-bin-release /opt/rocketmq

配置环境变量,方便调用相关bin二进制命令

cat >/etc/profile.d/rocketmq.sh<<'EOF'
export ROCKETMQ_HOME=/opt/rocketmq
export PATH=$ROCKETMQ_HOME/bin:$PATH
EOF

source /etc/profile

部署 NameServer 组件

在所有nameserver节点操作。

创建 systemd 服务文件

/etc/systemd/system/ 目录下创建一个 rocketmq-namesrv.service 文件。写入以下内容:

cat >/etc/systemd/system/rocketmq-namesrv.service<<'EOF'
[Unit]
Description=RocketMQ NameServer
After=network.target

[Service]
Type=simple
User=root
ExecStart=/opt/rocketmq/bin/mqnamesrv
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=5
LimitNOFILE=65536
Environment="ROCKETMQ_HOME=/opt/rocketmq"
Environment="JAVA_OPT=-Duser.home=/data/rocketmq-namesrv"

[Install]
WantedBy=multi-user.target
EOF

启用并启动 NameServer 服务

执行以下命令:

# 启动 NameServer 服务
sudo systemctl start rocketmq-namesrv

# 设置开机自启
sudo systemctl enable rocketmq-namesrv

# 检查服务状态
sudo systemctl status rocketmq-namesrv

# 重启服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-namesrv

示例输出结果

root@nameserver01:~# systemctl status rocketmq-namesrv
● rocketmq-namesrv.service - RocketMQ NameServer
     Loaded: loaded (/etc/systemd/system/rocketmq-namesrv.service; enabled; vendor preset: enabled)
     Active: active (running) since Sun 2024-12-22 15:27:03 CST; 1min 36s ago
   Main PID: 112830 (mqnamesrv)
      Tasks: 74 (limit: 4557)
     Memory: 240.4M
        CPU: 5.388s
     CGroup: /system.slice/rocketmq-namesrv.service
             ├─112830 /bin/sh /opt/rocketmq/bin/mqnamesrv
             ├─112831 sh /opt/rocketmq/bin/runserver.sh -Drmq.logback.configurationFile=/opt/rocketmq/conf/rmq.namesrv.logback.xml org.apache.rocketmq.namesrv.NamesrvStartup
             └─112858 /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Duser.home=/data/rocketmq-namesrv -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:+UseG1GC -XX:G1Heap>

Dec 22 15:27:03 nameserver01 systemd[1]: Started RocketMQ NameServer.
Dec 22 15:27:05 nameserver01 mqnamesrv[112858]: The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

日志检查与验证

NameServer 默认的日志目录位于home目录下,可通过以下命令检查:

tail -f /data/rocketmq-namesrv/logs/rocketmqlogs/namesrv.log

确保 NameServer 服务正常监听在 9876 端口

root@nameserver01:~# ss -tlnp | grep 9876
LISTEN 0      1024               *:9876            *:*    users:(("java",pid=7363,fd=152))   

部署 Controller 组件

以下操作在controller节点执行。

修改controller配置文件

创建controller01节点配置文件,在Controller01节点操作。

cat >/opt/rocketmq/conf/controller/controller.conf<<EOF
controllerDLegerGroup = group1
controllerDLegerPeers = n0-192.168.72.40:9878;n1-192.168.72.41:9878;n2-192.168.72.42:9878
controllerDLegerSelfId = n0
controllerStorePath = /data/admin/DledgerController
EOF

创建controller02节点配置文件,在Controller02节点操作。

cat >/opt/rocketmq/conf/controller/controller.conf<<EOF
controllerDLegerGroup = group1
controllerDLegerPeers = n0-192.168.72.40:9878;n1-192.168.72.41:9878;n2-192.168.72.42:9878
controllerDLegerSelfId = n1
controllerStorePath = /data/admin/DledgerController
EOF

创建controller03节点配置文件,在Controller03节点操作。

cat >/opt/rocketmq/conf/controller/controller.conf<<EOF
controllerDLegerGroup = group1
controllerDLegerPeers = n0-192.168.72.40:9878;n1-192.168.72.41:9878;n2-192.168.72.42:9878
controllerDLegerSelfId = n2
controllerStorePath = /data/admin/DledgerController
EOF

创建 systemd 服务文件

在所有Controller节点操作。

/etc/systemd/system/ 目录下创建一个 rocketmq-controller.service 文件。写入以下内容:

cat >/etc/systemd/system/rocketmq-controller.service<<'EOF'
[Unit]
Description=RocketMQ Controller
After=network.target

[Service]
Type=simple
User=root
ExecStart=/opt/rocketmq/bin/mqcontroller -c /opt/rocketmq/conf/controller/controller.conf
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=5
Environment="ROCKETMQ_HOME=/opt/rocketmq"
Environment="JAVA_OPT=-Duser.home=/data/admin"

[Install]
WantedBy=multi-user.target
EOF

启用并启动 controller 服务

在所有Controller节点操作,执行以下命令:

# 启动 Controller 服务
sudo systemctl start rocketmq-controller

# 设置开机自启
sudo systemctl enable rocketmq-controller

# 检查服务状态
sudo systemctl status rocketmq-controller

# 重启服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-controller

示例输出如下

root@controller01:~# systemctl status rocketmq-controller.service 
● rocketmq-controller.service - RocketMQ Controller
     Loaded: loaded (/etc/systemd/system/rocketmq-controller.service; enabled; vendor preset: enabled)
     Active: active (running) since Sun 2024-12-22 15:31:23 CST; 2min 19s ago
   Main PID: 124707 (mqcontroller)
      Tasks: 89 (limit: 4556)
     Memory: 314.1M
        CPU: 26.095s
     CGroup: /system.slice/rocketmq-controller.service
             ├─124707 /bin/sh /opt/rocketmq/bin/mqcontroller -c /opt/rocketmq/conf/controller/controller.conf
             ├─124708 sh /opt/rocketmq/bin/runserver.sh -Drmq.logback.configurationFile=/opt/rocketmq/conf/rmq.controller.logback.xml org.apache.rocketmq.controller.ControllerStartup -c /opt/rock>
             └─124735 /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Duser.home=/data/admin -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:+UseG1GC -XX:G1HeapRegionSize=>

Dec 22 15:31:23 controller01 systemd[1]: Started RocketMQ Controller.
Dec 22 15:31:25 controller01 mqcontroller[124735]: load config properties file OK, /opt/rocketmq/conf/controller/controller.conf
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: An illegal reflective access operation has occurred
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: Illegal reflective access by com.caucho.hessian.io.JavaDeserializer (file:/opt/rocketmq-all-5.3.1-bin-release/lib/hessian-3.3.6.jar) to>
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: Please consider reporting this to the maintainers of com.caucho.hessian.io.JavaDeserializer
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: All illegal access operations will be denied in a future release
Dec 22 15:31:27 controller01 mqcontroller[124735]: The Controller Server boot success. serializeType=JSON

日志检查与验证

Controller 默认的日志目录位于home目录下,可通过以下命令检查:

tail -f /data/admin/logs/rocketmqlogs/controller.log

确保 Controller 服务正常监听在 9878 端口:

root@controller01:~# ss -tlnp | grep 9878
LISTEN 0      1024               *:9878            *:*    users:(("java",pid=121984,fd=157))   

查看controller状态

root@controller01:~# mqadmin getControllerMetaData -a 192.168.72.40:9878

#ControllerGroup        group1
#ControllerLeaderId     n0
#ControllerLeaderAddress        192.168.72.40:9878
#Peer:  n0:192.168.72.40:9878
#Peer:  n1:192.168.72.41:9878
#Peer:  n2:192.168.72.42:9878

参数-a 代表的是任意一个 Controller 的地址

部署 Broker 组件

以下操作在broker节点执行。

修改broker配置文件

备份默认配置文件,在所有broker节点操作。

cp /opt/rocketmq/conf/broker.conf{,.bak}

修改两个broker-1-*节点配置文件,副本组1,两个节点配置相同

cat >/opt/rocketmq/conf/broker.conf<<EOF
brokerClusterName=DefaultCluster
brokerName=broker-1
listenPort=10911
storePathCommitLog=/data/rocketmq-broker/store/commitlog/
storePathConsumerQueue=/data/rocketmq-broker/store/consumequeue/
allAckInSyncStateSet=true
#brokerId=-1
deleteWhen=04
fileReservedTime=48
#brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
enableControllerMode = true
controllerAddr = 192.168.72.40:9878;192.168.72.41:9878;192.168.72.42:9878
namesrvAddr = 192.168.72.33:9876;192.168.72.34:9876
EOF

参数说明:

  • brokerClusterName:设定 Broker 所属的集群名称。默认是 DefaultCluster
  • brokerName:指定 Broker 名称,用于区分集群中的不同 Broker,同一个副本组内节点参数相同,不同副本组不同。
  • listenPort:设置 Broker 的监听端口,默认为 10911
  • storePathCommitLog:指定 commitlog 文件的存储路径,RocketMQ 存储消息的文件。
  • storePathConsumerQueue:指定消费者队列文件的存储路径。
  • allAckInSyncStateSet=true:若该值为 true,则一条消息需要复制到 SyncStateSet 中的每一个副本才会向客户端返回成功,可以保证消息不丢失。默认为 false。
  • deleteWhen:该参数指定 commitlog 文件的删除策略,04 表示每天的 4 点删除过期的 commitlog 文件。
  • fileReservedTime:配置 RocketMQ 存储的日志文件的保留时间,单位为小时。48 小时后未被消费的消息文件会被删除。
  • flushDiskType:配置日志刷盘策略。ASYNC_FLUSH 表示异步刷盘,通常性能较好,但可能会丢失部分数据。
  • enableControllerMode:启用控制器模式,用于 RocketMQ 集群管理和控制,通常在 RocketMQ 4.x 版本中开启。
  • controllerAddr:设置控制器节点的地址,用于集群的管理。
  • namesrvAddr:设置 NameServer 的地址,RocketMQ 的 NameServer 用于提供消息路由信息。

修改两个broker-2-*节点配置文件,副本组2,两个节点配置相同,注意与副本组1的brokerName参数不同

cat >/opt/rocketmq/conf/broker.conf<<EOF
brokerClusterName=DefaultCluster
brokerName=broker-2
listenPort=10911
storePathCommitLog=/data/rocketmq-broker/store/commitlog/
storePathConsumerQueue=/data/rocketmq-broker/store/consumequeue/
allAckInSyncStateSet=true
#brokerId=-1
deleteWhen=04
fileReservedTime=48
#brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
enableControllerMode = true
controllerAddr = 192.168.72.40:9878;192.168.72.41:9878;192.168.72.42:9878
namesrvAddr = 192.168.72.33:9876;192.168.72.34:9876
EOF

创建 systemd 服务文件

在所有broker节点操作。

/etc/systemd/system/ 目录下创建一个 rocketmq-broker.service 文件。写入以下内容:

cat >/etc/systemd/system/rocketmq-broker.service<<'EOF'
[Unit]
Description=RocketMQ Broker
After=network.target

[Service]
Type=simple
User=root
ExecStart=/opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=5
LimitNOFILE=65536
Environment="ROCKETMQ_HOME=/opt/rocketmq"
Environment="JAVA_OPT=-Duser.home=/data/rocketmq-broker"

[Install]
WantedBy=multi-user.target
EOF

启用并启动 Broker 服务

执行以下命令:

# 启动 Broker 服务
sudo systemctl start rocketmq-broker

# 设置开机自启
sudo systemctl enable rocketmq-broker

# 检查服务状态
sudo systemctl status rocketmq-broker

# 重启服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-broker

示例输出如下:

root@broker-1-1:~# systemctl status rocketmq-broker
● rocketmq-broker.service - RocketMQ Broker
     Loaded: loaded (/etc/systemd/system/rocketmq-broker.service; enabled; vendor preset: enabled)
     Active: active (running) since Sun 2024-12-22 15:55:35 CST; 4min 36s ago
   Main PID: 119620 (mqbroker)
      Tasks: 187 (limit: 4556)
     Memory: 7.3G
        CPU: 1min 21.725s
     CGroup: /system.slice/rocketmq-broker.service
             ├─119620 /bin/sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf
             ├─119621 sh /opt/rocketmq/bin/runbroker.sh -Drmq.logback.configurationFile=/opt/rocketmq/conf/rmq.broker.logback.xml org.apache.rocketmq.broker.BrokerStartup -c /opt/rocketmq/conf/br>
             └─119650 /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Duser.home=/data/rocketmq-broker -server -Xms8g -Xmx8g -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:Initiat>

Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.classic.model.processor.LoggerModelHandler - Setting additivity of logger [io.opente>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.core.model.processor.AppenderRefModelHandler - Attaching appender named [RocketmqBro>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.classic.model.processor.LoggerModelHandler - Setting level of logger [RocketmqAuthAu>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.classic.model.processor.LoggerModelHandler - Setting additivity of logger [RocketmqA>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.core.model.processor.AppenderRefModelHandler - Attaching appender named [RocketmqAut>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.classic.model.processor.RootLoggerModelHandler - Setting level of ROOT logger to INFO
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.core.model.processor.AppenderRefModelHandler - Attaching appender named [DefaultSift>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,709 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.core.model.processor.DefaultProcessor@6b4a4e18 - End of configuration.
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,713 |-INFO in org.apache.rocketmq.common.logging.JoranConfiguratorExt@27c86f2d - Registering current configuration as safe fallback point
Dec 22 15:55:51 broker-1-1 mqbroker[119650]: The broker[broker-1, 192.168.72.51:10911] boot success. serializeType=JSON and name server is 192.168.72.33:9876;192.168.72.34:9876
lines 1-22/22 (END)

日志检查与验证

Broker默认的日志目录位于home目录下,可通过以下命令检查:

tail -f /data/rocketmq-broker/logs/rocketmqlogs/broker.log 

确保 Broker 服务正常监听端口:

root@broker01-master:~# ss -tlnp | grep 10
LISTEN 0      1024               *:10909            *:*    users:(("java",pid=10544,fd=176))        
LISTEN 0      1024               *:10911            *:*    users:(("java",pid=10544,fd=175))        
LISTEN 0      50                 *:10912            *:*    users:(("java",pid=10544,fd=169))  

端口说明:

  • 10911端口:这是RocketMQ Broker组件默认使用的监听端口(listenPort)。要修改这一设定值,应该直接在Broker的配置文件内调整listenPort参数。
  • 10912端口:被称为haListenPort,专门用于Master Broker与Slave Broker之间进行主备切换时的数据同步。如需改变此端口,同样需要在Broker的配置文件中寻找并更新haListenPort项。
  • 10909端口:一般情况下,它代表了Broker VIPChannel所用到的端口,默认计算方式为listenPort - 2(因此通常是10909)。VIPChannel是一种特别设计的网络通道,旨在优化高负载环境下的性能表现。如果想要变更这个特定端口,可以考虑调整listenPort或直接在配置中指定新的VIPChannel端口。

查看 SyncStateSet

可以通过运维工具查看 SyncStateSet:

mqadmin getSyncStateSet -a 192.168.72.40:9878 -b broker-1

参数-a 代表的是任意一个 Controller 的地址

如果顺利的话,可以看到以下内容:

root@broker-1-1:~# mqadmin getSyncStateSet -a 192.168.72.40:9878 -b broker-1

#brokerName     broker-1
#MasterBrokerId 1
#MasterAddr     192.168.72.51:10911
#MasterEpoch    6
#SyncStateSetEpoch      6
#SyncStateSetNums       2

InSyncReplica:  ReplicaIdentity{brokerName='broker-1', brokerId=1, brokerAddress='192.168.72.51:10911', alive=true}

InSyncReplica:  ReplicaIdentity{brokerName='broker-1', brokerId=2, brokerAddress='192.168.72.52:10911', alive=true}
root@broker-1-1:~# mqadmin getSyncStateSet -a 192.168.72.40:9878 -b broker-2

#brokerName     broker-2
#MasterBrokerId 1
#MasterAddr     192.168.72.53:10911
#MasterEpoch    3
#SyncStateSetEpoch      6
#SyncStateSetNums       2

InSyncReplica:  ReplicaIdentity{brokerName='broker-2', brokerId=1, brokerAddress='192.168.72.53:10911', alive=true}

InSyncReplica:  ReplicaIdentity{brokerName='broker-2', brokerId=2, brokerAddress='192.168.72.54:10911', alive=true}
root@broker-1-1:~# 

查看 BrokerEpoch

可以通过运维工具查看 BrokerEpochEntry:

mqadmin getBrokerEpoch -n 192.168.72.33:9876 -b broker-1

参数-n 代表的是任意一个 Namesrv 的地址

如果顺利的话,可以看到以下内容:

root@broker-1-1:~# mqadmin getBrokerEpoch -n 192.168.72.33:9876 -b broker-1

#clusterName    DefaultCluster
#brokerName     broker-1
#brokerAddr     192.168.72.51:10911
#brokerId       0
#Epoch: EpochEntry{epoch=1, startOffset=0, endOffset=0}

#clusterName    DefaultCluster
#brokerName     broker-1
#brokerAddr     192.168.72.52:10911
#brokerId       2
#Epoch: EpochEntry{epoch=1, startOffset=0, endOffset=0}
root@broker-1-1:~# 
root@broker-1-1:~# 
root@broker-1-1:~# mqadmin getBrokerEpoch -n 192.168.72.33:9876 -b broker-2

#clusterName    DefaultCluster
#brokerName     broker-2
#brokerAddr     192.168.72.54:10911
#brokerId       2
#Epoch: EpochEntry{epoch=1, startOffset=0, endOffset=217}
#Epoch: EpochEntry{epoch=2, startOffset=217, endOffset=217}

#clusterName    DefaultCluster
#brokerName     broker-2
#brokerAddr     192.168.72.53:10911
#brokerId       0
#Epoch: EpochEntry{epoch=1, startOffset=0, endOffset=217}
#Epoch: EpochEntry{epoch=2, startOffset=217, endOffset=217}
root@broker-1-1:~# 

列出集群信息

root@broker-1-1:~# mqadmin clusterlist -n 192.168.72.33:9876
#Cluster Name           #Broker Name            #BID  #Addr                  #Version              #InTPS(LOAD)     #OutTPS(LOAD)  #Timer(Progress)        #PCWait(ms)  #Hour         #SPACE    #ACTIVATED
DefaultCluster          broker-1                0     192.168.72.51:10911    V5_3_1                 0.00(0,0ms)       0.00(0,0ms)  0-0(0.0w, 0.0, 0.0)               0  481904.09     0.1100          true
DefaultCluster          broker-1                2     192.168.72.52:10911    V5_3_1                 0.00(0,0ms)       0.00(0,0ms)  2-0(0.0w, 0.0, 0.0)               0  481904.09     0.1000         false
DefaultCluster          broker-2                0     192.168.72.53:10911    V5_3_1                 0.00(0,0ms)       0.00(0,0ms)  0-0(0.0w, 0.0, 0.0)               0  0.68          0.1100          true
DefaultCluster          broker-2                2     192.168.72.54:10911    V5_3_1                 0.00(0,0ms)       0.00(0,0ms)  2-0(0.0w, 0.0, 0.0)               0  0.68          0.1100         false

共2个副本组,每个副本组1主1从两个broker节点。

部署 Proxy 组件

在所有proxy节点操作。

创建 systemd 服务文件

/etc/systemd/system/ 目录下创建一个 rocketmq-proxy.service 文件。写入以下内容:

cat >/etc/systemd/system/rocketmq-proxy.service<<'EOF'
[Unit]
Description=RocketMQ Proxy
After=network.target

[Service]
Type=simple
User=root
Environment="ROCKETMQ_HOME=/opt/rocketmq"
Environment="JAVA_OPT=-Duser.home=/data/rocketmq-proxy"
Environment="NAMESRV_ADDR=192.168.72.33:9876;192.168.72.34:9876"
ExecStart=/opt/rocketmq/bin/mqproxy -n ${NAMESRV_ADDR}
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF

启用并启动 proxy服务

执行以下命令:

# 启动 proxy 服务
sudo systemctl start rocketmq-proxy

# 设置开机自启
sudo systemctl enable rocketmq-proxy

# 检查服务状态
sudo systemctl status rocketmq-proxy

# 重新启动服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-proxy

日志检查与验证

Proxy默认的日志目录位于home目录下,可通过以下命令检查:

tail -f /data/rocketmq-proxy/logs/rocketmqlogs/proxy.log 

确保 NameServer 服务正常监听端口:

root@proxy01:~# ss -tlnp | grep :80
LISTEN 0      1024               *:8080            *:*    users:(("java",pid=8906,fd=229))         
LISTEN 0      4096               *:8081            *:*    users:(("java",pid=8906,fd=228)) 

端口说明:

  • 8080端口:这是RocketMQ Proxy中默认的remoting协议访问端口(remotingListenPort)。此外,在RocketMQ Dashboard中也作为默认访问端口。如果需要更改这个端口号,对于Proxy来说,可以通过修改配置文件conf/rmq-proxy.json来实现;而对于Dashboard,则是通过调整application.yml文件中的相关设置完成,参考链接为这里。
  • 8081端口:在RocketMQ Proxy里,该端口被指定为gRPC协议访问端口(grpcServerPort)。同样地,若需自定义此端口号,应编辑rmq-proxy.json配置文件。

部署 Dashboard 组件

在所有dashboard节点操作。

下载并解压rocketmq-dashboard源码包

wget https://dist.apache.org/repos/dist/release/rocketmq/rocketmq-dashboard/2.0.0/rocketmq-dashboard-2.0.0-source-release.zip
sudo unzip rocketmq-dashboard-2.0.0-source-release.zip -d /opt
sudo ln -s /opt/rocketmq-dashboard-2.0.0-source-release /opt/rocketmq-dashboard

修改配置文件

root@dashboard01:~# cat /opt/rocketmq-dashboard/src/main/resources/application.yml 
rocketmq:
  config:
    namesrvAddrs:
      - 192.168.72.33:9876;192.168.72.34:9876
    dataPath: /data/rocketmq-console/data
    loginRequired: true
    proxyAddrs:
      - 192.168.72.31:8080
      - 192.168.72.32:8080

编译 rocketmq-dashboard

cd /opt/rocketmq-dashboard
apt install -y maven
mvn clean package -Dmaven.test.skip=true

创建 systemd 服务文件

/etc/systemd/system/ 目录下创建一个 rocketmq-dashboard.service 文件。写入以下内容:

cat >/etc/systemd/system/rocketmq-dashboard.service<<'EOF'
[Unit]
Description=RocketMQ Dashboard Service
After=network.target

[Service]
ExecStart=/usr/bin/java $JAVA_OPT -jar /opt/rocketmq-dashboard/target/rocketmq-dashboard-2.0.0.jar
WorkingDirectory=/opt/rocketmq-dashboard
StandardOutput=journal
StandardError=journal
Restart=always
User=root
Group=root
Environment="JAVA_OPT=-server -Xms256m -Xmx256m -Duser.home=/data/rocketmq-console"

[Install]
WantedBy=multi-user.target
EOF

启用并启动 dashboard 服务

执行以下命令:

# 启动 dashboard 服务
sudo systemctl start rocketmq-dashboard

# 设置开机自启
sudo systemctl enable rocketmq-dashboard

# 检查服务状态
sudo systemctl status rocketmq-dashboard

# 重新服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-dashboard

日志检查与验证

Dashboard默认的日志目录位于home目录下,可通过以下命令检查:

tail -f /data/rocketmq-console/logs/dashboardlogs/rocketmq-dashboard.log

确保 Dashboard服务正常监听端口

root@dashboard01:~# ss -tlnp | grep 8080

提示:Started App in x.xxx seconds (JVM running for x.xxx) 启动成功

浏览器页面访问:http://dashboard-addr:8080
在这里插入图片描述

部署 LB 组件

在所有lb节点执行。

安装haproxy和keepalived

apt install -y haproxy keepalived

修改haproxy配置文件,所有lb节点配置相同。

cat >/etc/haproxy/haproxy.cfg<<EOF
global
    log /dev/log local0
    log /dev/log local1 notice
    maxconn 200
    uid 99
    gid 99
    daemon

defaults
    log     global
    option  httplog
    timeout connect 5000ms
    timeout client  50000ms
    timeout server  50000ms

frontend proxy_front_remoting
    bind *:8080
    mode tcp
    default_backend proxy_back_remoting

backend proxy_back_remoting
    mode tcp
    balance roundrobin
    option http-server-close
    option forwardfor
    server proxy01 192.168.72.31:8080 check
    server proxy02 192.168.72.32:8080 check


frontend proxy_front_grpc
    bind *:8081
    mode tcp
    default_backend proxy_back_grpc

backend proxy_back_grpc
    mode tcp
    balance roundrobin
    option http-server-close
    option forwardfor
    server proxy01 192.168.72.31:8081 check
    server proxy02 192.168.72.32:8081 check

frontend dashboard_front
    bind *:8088
    mode http
    default_backend dashboard_back

backend dashboard_back
    mode http
    balance roundrobin
    cookie SERVERID insert indirect nocache
    option http-server-close
    option forwardfor
    server dashboard1 192.168.72.55:8080 check cookie dashboard1
    server dashboard2 192.168.72.56:8080 check cookie dashboard2
EOF

修改keepalived配置文件,所有lb节点配置相同。

cat >/etc/keepalived/keepalived.conf<<EOF
global_defs {
    router_id 51
    vrrp_version 2
    vrrp_garp_master_delay 1
    vrrp_garp_master_refresh 1
    script_user root
    enable_script_security
}

vrrp_script check_haproxy {
    script "/usr/bin/killall -0 haproxy"
    timeout 3
    interval 5   # check every 5 second
    fall 3       # require 3 failures for KO
    rise 2       # require 2 successes for OK
}

# VRRP Instance for Proxy VIP
vrrp_instance VI_Proxy {
    state BACKUP
    interface ens33
    virtual_router_id 51
    priority 101
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1234
    }
    virtual_ipaddress {
        192.168.72.250 dev ens33 # Proxy VIP
    }
    nopreempt
    track_script {
        check_haproxy
    }
}

# VRRP Instance for Dashboard VIP
vrrp_instance VI_Dashboard {
    state BACKUP
    interface ens33
    virtual_router_id 52
    priority 101
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1234
    }
    virtual_ipaddress {
        192.168.72.251 dev ens33 # Dashboard VIP
    }
    nopreempt
    track_script {
        check_haproxy  
    }
}
EOF

参数说明:

  • interface ens33:指定节点网卡名称
  • virtual_ipaddress:指定VIP地址

启动服务

systemctl enable --now haproxy.service keepalived.service

测试故障转移

在副本组1中,IP为192.168.72.51的节点,即broker-1-1为master节点,示例如下:
在这里插入图片描述
将该节点直接关机,模拟broker master节点故障

root@broker-1-1:~# shutdown -h now

重新查看集群状态,节点IP为192.168.72.52的节点自动提升为master节点,继续为客户端提供消息写入及消费能力。
在这里插入图片描述
重新恢复IP为192.168.72.51的节点,即broker-1-1,该节点将成为slave节点。
在这里插入图片描述

使用工具生产与消费消息

创建topic

mqadmin updateTopic -n 192.168.72.33:9876 -t TopicTest -c DefaultCluster \
-a +message.type=NORMAL

示例输出

root@nameserver01:~# mqadmin updateTopic -n 192.168.72.33:9876 -t TopicTest -c DefaultCluster \
-a +message.type=NORMAL
create topic to 192.168.72.51:10911 success.
create topic to 192.168.72.53:10911 success.
TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+message.type=NORMAL}]
root@nameserver01:~# 

发送消息,连接到proxy VIP地址

export NAMESRV_ADDR=192.168.72.250:8080
tools.sh org.apache.rocketmq.example.quickstart.Producer

示例输出

SendResult [sendStatus=SEND_OK, msgId=C0A84821BD1D5505405770783E0D03E4, offsetMsgId=C0A8483300002A9F000000000009616C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-1, queueId=2], q
ueueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=C0A84821BD1D5505405770783E0F03E5, offsetMsgId=C0A8483300002A9F000000000009625E, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-1, queueId=3], q
ueueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=C0A84821BD1D5505405770783E1003E6, offsetMsgId=C0A8483300002A9F0000000000096350, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-1, queueId=4], q
ueueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=C0A84821BD1D5505405770783E1203E7, offsetMsgId=C0A8483300002A9F0000000000096442, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-1, queueId=5], q
ueueOffset=125]

消费消息,连接到proxy VIP地址

export NAMESRV_ADDR=192.168.72.250:8080
tools.sh org.apache.rocketmq.example.quickstart.Consumer

示例输出

ConsumeMessageThread_please_rename_unique_group_name_4_5 Receive New Messages: [MessageExt [brokerName=broker-2, queueId=4, storeSize=242, queueOffset=100, sysFlag=0, bornTimestamp=1734869327582, bornHost=/192.168.72.31:57150, storeTimestamp=1734869327582, storeHost=/192.168.72.53:10911, msgId=C0A8483500002A9F000000000008A628, commitLogOffset=566824, bodyCRC=1500772453, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422248, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783ADE026E, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=124}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54, 50, 50], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_14 Receive New Messages: [MessageExt [brokerName=broker-2, queueId=4, storeSize=242, queueOffset=98, sysFlag=0, bornTimestamp=1734869327512, bornHost=/192.168.72.31:57162, storeTimestamp=1734869327512, storeHost=/192.168.72.53:10911, msgId=C0A8483500002A9F0000000000089708, commitLogOffset=562952, bodyCRC=1456471771, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422248, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783A98024E, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=124}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 57, 48], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_4 Receive New Messages: [MessageExt [brokerName=broker-1, queueId=2, storeSize=242, queueOffset=120, sysFlag=0, bornTimestamp=1734869328255, bornHost=/192.168.72.31:52084, storeTimestamp=1734869328255, storeHost=/192.168.72.51:10911, msgId=C0A8483300002A9F0000000000093B9C, commitLogOffset=605084, bodyCRC=2121214082, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422240, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783D7F0394, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=126}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 49, 54], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_3 Receive New Messages: [MessageExt [brokerName=broker-1, queueId=2, storeSize=242, queueOffset=119, sysFlag=0, bornTimestamp=1734869328229, bornHost=/192.168.72.31:52064, storeTimestamp=1734869328229, storeHost=/192.168.72.51:10911, msgId=C0A8483300002A9F000000000009340C, commitLogOffset=603148, bodyCRC=236436726, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422239, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783D650384, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=126}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 48, 48], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_17 Receive New Messages: [MessageExt [brokerName=broker-2, queueId=5, storeSize=242, queueOffset=97, sysFlag=0, bornTimestamp=1734869327482, bornHost=/192.168.72.31:57146, storeTimestamp=1734869327482, storeHost=/192.168.72.53:10911, msgId=C0A8483500002A9F000000000008906A, commitLogOffset=561258, bodyCRC=942024666, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422221, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783A7A023F, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=124}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 55, 53], transactionId='null'}]] 

查看dashboard,连接到dashboard VIP
在这里插入图片描述

查看topic状态
在这里插入图片描述

使用SDK生产与消费消息

使用Idea IDE新建JAVA项目,项目结构如下:
在这里插入图片描述
pom.xml代码示例

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>rocketmq-demo01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>23</maven.compiler.source>
        <maven.compiler.target>23</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.7</version>
        </dependency>

        <!-- SLF4J API -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.16</version> <!-- 使用当前版本 -->
        </dependency>

        <!-- Logback Classic (SLF4J's default implementation) -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.5.15</version> <!-- 使用当前版本 -->
        </dependency>

        <!-- Logback Core -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.5.13</version> <!-- 使用当前版本 -->
        </dependency>

    </dependencies>
</project>

ProducerExample.java代码示例

package org.example;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerExample {
    private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);


    public static void main(String[] args) throws ClientException {
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
        String endpoint = "192.168.72.250:8081";
        // 消息发送的目标Topic名称,需要提前创建。
        String topic = "TopicTest";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        // 普通消息发送。
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                // 消息体。
                .setBody("messageBody".getBytes())
                .build();
        try {
            // 发送消息,需要关注发送结果,并捕获失败等异常。
            SendReceipt sendReceipt = producer.send(message);
            logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            logger.error("Failed to send message", e);
        }
        // producer.close();
    }
}

配置参数说明:

  • String endpoint 指定proxy VIP地址及端口
  • String topic 指定topic,需提前创建

PushConsumerExample.java代码示例

package org.example;

import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PushConsumerExample {
    private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoints = "192.168.72.250:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        // 订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // 为消费者指定所属的消费者分组,Group需要提前创建。
        String consumerGroup = "YourConsumerGroup";
        // 指定需要订阅哪个目标Topic,Topic需要提前创建。
        String topic = "TopicTest";
        // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者分组。
                .setConsumerGroup(consumerGroup)
                // 设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 设置消费监听器。
                .setMessageListener(messageView -> {
                    // 处理消息并返回消费结果。
                    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        // 如果不需要再使用 PushConsumer,可关闭该实例。
        // pushConsumer.close();
    }
}

配置参数说明:

  • String endpoint 指定proxy VIP地址及端口
  • String topic 指定topic,需提前创建

测试生产消息
在这里插入图片描述
测试消费消息
在这里插入图片描述


http://www.kler.cn/a/448083.html

相关文章:

  • Tool之Excalidraw:Excalidraw(开源的虚拟手绘风格白板)的简介、安装和使用方法、艾米莉应用之详细攻略
  • 前端使用 Konva 实现可视化设计器(20)- 性能优化、UI 美化
  • 2014年IMO第4题
  • 拆解一个微型气泵了解工作原理
  • 【数据安全】如何保证其安全
  • JVM性能优化一:初识内存泄露-内存溢出-垃圾回收
  • 【蓝桥杯选拔赛真题96】Scratch风车旋转 第十五届蓝桥杯scratch图形化编程 少儿编程创意编程选拔赛真题解析
  • tomcat的安装以及配置(基于linuxOS)
  • centos集群部署seata
  • Mono里运行C#脚本1
  • arXiv-2024 | 当视觉语言导航遇见自动驾驶!doScenes:基于自然语言指令的人车交互自主导航驾驶数据集
  • 【hackmyvm】eigthy 靶机wp
  • 无人机视频传输系统的通信能耗优化
  • 拷贝构造和赋值运算符重载
  • 质量小议51 - 茧房
  • 主要模型记录
  • Ubuntu系统安装MySQL
  • GA-BP分类-遗传算法(Genetic Algorithm)和反向传播算法(Backpropagation)
  • java全栈day18--Web后端实战(java操作数据库2)
  • Linux export命令
  • Elasticsearch:什么是查询语言?
  • C++ 杨辉三角 - 力扣(LeetCode)
  • 如何制作搞笑配音视频?操作方法
  • 智能电商:API接口如何驱动自动化与智能化转型
  • Yolo11改进策略:主干网络改进|FastVit与Yolo11完美融合,重参数重构Yolo11网络(全网首发)
  • 插入排序与计数排序;数据库的三范式