RabbitMQ - 1 ( 7000 字 RabbitMQ 入门级教程 )
一:
在互联网行业,许多公司喜欢用动物命名产品或作为公司的 Logo 和吉祥物,比如腾讯的企鹅、京东的狗、美团的袋鼠、携程的海豚,而阿里更是凭借蚂蚁、飞猪、天猫、菜鸟、闲鱼、盒马等,打造了一座“动物园”。Rabbit(兔子)也是一家公司的名字,其产品 RabbitMQ 是一个消息队列(Message Queue,MQ)服务,实现了 AMQP(高级消息队列协议),并成为当前主流的消息中间件之一。
1.1 什么是 MQ
MQ(Message Queue,消息队列)本质上是一个队列,遵循 FIFO(先入先出)原则,只不过队列中存放的是消息(message)。消息可以非常简单,比如仅包含文本字符串或 JSON 格式,也可以较为复杂,例如内嵌对象。MQ 通常用于分布式系统之间的通信,而系统之间的调用一般有两种方式:
通信方式 | 描述 |
---|---|
同步通信 | 数据从一端发出后,直接调用对方的服务,数据立即传递到另一端,实现实时响应。 |
异步通信 | 数据从一端发出后,先进入一个容器进行临时存储,达到某种条件后再由容器转发至另一端。容器的一种具体实现就是 MQ(消息队列)。 |
1.2 MQ 的作用
RabbitMQ 是一种 MQ 的实现,其主要功能是接收和转发消息。在不同的应用场景中,MQ 可以发挥多种作用。
场景 | 描述 |
---|---|
异步解耦 | 在业务流程中,对于一些耗时但无需即时返回结果的操作,可以通过 MQ 异步化处理。例如,用户注册后发送短信或邮件通知,可以作为异步任务,不必等待这些操作完成再返回注册成功结果。 |
流量削峰 | 在流量激增时,应用仍需保持正常运行,而直接为峰值流量配置资源会造成浪费。MQ 可通过流量削峰功能将请求排队,系统根据自身处理能力逐步处理。例如秒杀或促销活动中,MQ 可以缓解突发流量压力,防止系统崩溃。 |
消息分发 | 当多个系统需要对同一数据做出响应时,MQ 可用于消息分发。例如,支付成功后支付系统向 MQ 发送消息,其他订阅该消息的系统可以获取更新,无需轮询数据库。 |
延迟通知 | 在需要延迟发送通知的场景中,MQ 提供延迟消息功能。例如,在电商平台中,如果用户下单后未在指定时间内支付,可以通过延迟队列在超时后自动取消订单。 |
1.3 为什么学习 RabbitMQ
目前业界有多种 MQ 产品可供选择,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka 和 ZeroMQ 等,甚至也有直接使用 Redis 作为消息队列的案例。这些消息队列各有侧重,没有绝对的优劣之分,只有适不适合。在实际选型时,需要根据自身需求和 MQ 产品的特点进行综合考量。
产品 | 描述 |
---|---|
Kafka | Kafka 最初用于日志收集和传输,追求高吞吐量,性能卓越,单机吞吐量可达十万级。其在日志领域功能简单且成熟,主要支持基础的 MQ 功能,是日志采集需求的首选。 |
RocketMQ | RocketMQ 由阿里巴巴开源并捐赠给 Apache,使用 Java 开发。借鉴了 Kafka 的设计并进行了优化,经过多年双十一场景的验证,在可用性、可靠性和稳定性方面表现出色,适用于高可靠性和高并发场景,如互联网金融领域。但其客户端语言支持较少,社区活跃度一般。 |
RabbitMQ | RabbitMQ 使用 Erlang 开发,MQ 功能完备,几乎支持所有主流语言,且开源提供的管理界面友好。其性能较好,吞吐量达万级,社区活跃度高,适合中小型公司,尤其是在数据量和并发量不大的场景中使用。 |
由于 RabbitMQ 综合能力较强,且我们的项目并非超高并发,对 RabbitMQ 社区的成熟度和管理界面的友好性需求较高,因此接下来主要学习 RabbitMQ 的使用。
二: RabbitMQ 核心概念
RabbitMQ是⼀个消息中间件也是⼀个⽣产者消费者模型,它负责接收、存储并转发消息.,界面上的导航栏分为 6 部分,每部分都有其特定的功能含义。在深入了解之前,我们先来看看 RabbitMQ 的工作流程,以便更好地理解各部分的作用。
2.1 Producer 和 Consumer
生产者(Producer)负责创建消息并将其发布到 RabbitMQ 中。在实际应用中,消息通常是具有一定业务逻辑结构的数据,例如 JSON 字符串。消息可以附带标签,RabbitMQ 会根据这些标签进行路由,将消息发送给感兴趣的消费者(Consumer)。
消费者通过连接 RabbitMQ 服务器来消费消息。在消费过程中,消息的标签会被丢弃,消费者只接收消息内容,并不会知道消息的生产者是谁,也无需关心消息的来源。对于 RabbitMQ 而言,一个 Broker 可以理解为一个 RabbitMQ 服务节点或实例,大多数情况下可视为一台 RabbitMQ 服务器。
名称 | 描述 |
---|---|
Producer | 生产者,是 RabbitMQ Server 的客户端,负责向 RabbitMQ 发送消息。 |
Consumer | 消费者,也是 RabbitMQ Server 的客户端,负责从 RabbitMQ 接收消息。 |
Broker | RabbitMQ Server 本身,主要负责接收消息、存储消息并将消息分发给消费者。 |
2.2 Connection 和 Channel
名称 | 描述 |
---|---|
Connection | 连接。客户端和 RabbitMQ 服务器之间的 TCP 连接,是消息传递的基础,负责传输客户端与服务器之间的所有数据和控制信息。 |
Channel | 通道(信道)。Channel 是建立在 Connection 之上的抽象层,在一个 TCP 连接上可以创建多个 Channel,每个 Channel 是独立的虚拟连接。消息的发送和接收都是基于 Channel 的。 |
通道的主要作用是将消息的读写操作复用到同一个 TCP 连接上,减少连接的建立和关闭开销,从而提高系统性能。
2.3 Virtual host
Virtual host(虚拟主机)是一个逻辑概念,用于为消息队列提供隔离机制。在 RabbitMQ 中,一个 Broker Server 上可以包含多个 Virtual Host,不同用户可以通过划分 vhost 来独立使用 RabbitMQ 服务,在各自的 vhost 中创建和管理 exchange、queue 等资源。它类似于 MySQL 中的 “database” 概念,是逻辑上的集合,而非类似 VMware 那样的物理虚拟机。
2.4 Queue
Queue 是 RabbitMQ 的内部对象,用于存储消息。多个消费者可以同时订阅同一个队列,共同消费其中的消息。
2.5 Exchange
Exchange(交换机)是消息到达 RabbitMQ Broker 的第一站,负责接收生产者发送的消息,并根据特定的规则将这些消息路由到一个或多个队列中。交换机起到消息路由的作用,通过其类型和规则来决定如何转发消息。类似于快递物流,根据收件地址分派快递到不同的站点,再由站点配送到收件人手中,交换机的作用就相当于完成这一步的分配工作。
2.6 RabbitMQ 工作流程
在理解了上述概念之后,我们可以通过回顾这张图,进一步梳理 RabbitMQ 的工作流程。
- Producer(生产者)生成一条消息。
- Producer 连接到 RabbitMQ Broker,建立一个连接(Connection)并开启一个信道(Channel)。
- Producer 声明一个交换机(Exchange),用于路由消息。
- Producer 声明一个队列(Queue),用于存储消息。
- Producer 将消息发送到 RabbitMQ Broker。
- RabbitMQ Broker 接收消息并将其存入相应的队列(Queue)。如果找不到对应的队列,则根据生产者的配置选择丢弃消息或退回给生产者。
RabbitMQ 核心概念 | 对应的物流公司角色 |
---|---|
Broker | 类似物流公司的总部,负责协调和管理所有物流站点,确保包裹能够安全、高效地送达。 |
Virtual Host | 类似物流公司为不同客户或业务部门划分的独立运营中心。每个运营中心都有自己的仓库(Queue)、分拣规则(Exchange)和运输路线(Connection 和 Channel),以确保不同客户的包裹互不干扰,并提供定制化服务。 |
Exchange | 类似物流站点中的分拣中心。根据包裹上的标签决定包裹的目的地(队列)。不同类型的分拣中心可能有不同规则,如按地址分拣或将包裹复制给多个收件人等。 |
Queue | 类似物流站点中的仓库,用于临时存放等待派送的包裹。每个仓库都有一名或多名快递员(消费者),负责从仓库中取出包裹并派送给最终的收件人。 |
Connection | 类似快递员与快递站点之间的通信线路,快递员通过该线路接收派送任务(消息)。 |
Channel | 类似快递员在执行任务时使用的多个并行通信线路,允许快递员同时处理多个任务。例如,一边派送包裹,一边接收新的派送任务。 |
2.7 AMQP
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)定义了一套完整的消息交换功能,包括交换器(Exchange)和队列(Queue)等组件。这些组件协同工作,使生产者能够将消息发送到交换器,随后由队列接收并存储,等待消费者取用。此外,AMQP 还定义了一个网络协议,用于支持客户端应用与消息代理之间的交互通信。
RabbitMQ 是 AMQP 协议的实现,使用 Erlang 语言开发。换句话说,RabbitMQ 遵循 AMQP 的规范,同时还支持其他协议,如 STOMP 和 MQTT。由于 RabbitMQ 的模型结构与 AMQP 的模型结构一致,它可以充分发挥 AMQP 的强大功能。
三:web 界面操作
RabbitMQ 管理界面中的 Connections、Channels、Exchange 和 Queues 对应的概念与上述流程图中的内容一致。其中,Overview 表示系统的概览视图,Admin 则用于用户和权限管理。在操作 RabbitMQ 前需要先创建一个 Virtual Host。接下来,我们将详细介绍具体的操作步骤。
3.1 用户相关操作
3.1.1 添加用户
- 点击 Admin -> Add user
- 设置账号密码及权限
步骤 | 描述 |
---|---|
① | 设置账号 |
② | 设置密码 |
③ | 确认密码 |
④ | 设置权限(Tags),如 Admin、Monitoring、Policymaker 等权限标签 |
- 观察用户是否添加成功
3.1.2 删除用户
- 点击要删除的用户, 查看用户详情
-
在用户详情页面, 进行更新或删除操作
-
设置对虚拟机的操作权限
-
更新 / 删除用户
-
3.1.3 退出当前用户
3.2 虚拟主机相关操作
3.2.1 创建虚拟主机
在 Admin 标签页下,点击右侧的 Virtual Hosts -> Add a new virtual host,然后设置虚拟主机的名称。
四: RabbitMQ 快速入门
- 引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
- 编写生产者代码
public class RabbitProducer {
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置连接参数
factory.setHost("110.41.51.65"); // 设置主机地址(默认值为localhost)
factory.setPort(15673); // 设置端口号(默认值为5672)
factory.setVirtualHost("bite"); // 设置虚拟主机名称(默认值为 "/")
factory.setUsername("study"); // 设置用户名(默认值为guest)
factory.setPassword("study"); // 设置密码(默认值为guest)
// 3. 创建连接
Connection connection = factory.newConnection();
// 4. 创建通道
Channel channel = connection.createChannel();
// 5. 声明队列
/*
* 1. queue: 队列名称
* 2. durable: 是否持久化(true:队列会持久化到磁盘,MQ重启后队列仍在)
* 3. exclusive: 是否独占(true:队列只允许当前连接使用,连接关闭后队列自动删除)
* 4. autoDelete: 是否自动删除(true:当队列中没有消息且没有消费者时自动删除)
* 5. arguments: 队列的其他参数(如 TTL、消息长度限制等)
*
* 如果指定的队列不存在,则会自动创建该队列;如果队列已存在,则使用已有队列。
*/
channel.queueDeclare("hello", true, false, false, null);
// 6. 发送消息到队列
/*
* 1. exchange: 交换机名称(简单模式下使用默认的""交换机)
* 2. routingKey: 路由键(此处设置为队列名称,决定消息发送到哪个队列)
* 3. props: 配置信息(如消息的优先级、过期时间等,可设为null)
* 4. body: 消息内容,字节数组形式
*
* 使用默认的""交换机时,routingKey 必须与队列名称一致,才能正确路由消息到指定队列。
*/
String msg = "Hello World"; // 消息内容
channel.basicPublish("", "hello", null, msg.getBytes()); // 发送消息
System.out.println(msg + " 消息发送成功");
// 7. 释放资源
channel.close(); // 关闭通道
connection.close(); // 关闭连接
}
}
- 编写消费者代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class RabbitmqConsumer {
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置连接参数
factory.setHost("110.41.51.65"); // 设置主机地址(默认值为 localhost)
factory.setPort(15673); // 设置端口号(默认值为 5672)
factory.setVirtualHost("bite"); // 设置虚拟主机名称(默认值为 "/")
factory.setUsername("study"); // 设置用户名(默认值为 guest)
factory.setPassword("study"); // 设置密码(默认值为 guest)
// 3. 创建连接
Connection connection = factory.newConnection();
// 4. 创建通道
Channel channel = connection.createChannel();
// 5. 声明队列
/*
* 1. queue: 队列名称
* 2. durable: 是否持久化(true:队列会持久化到磁盘,MQ 重启后队列仍在)
* 3. exclusive: 是否独占(true:队列只允许当前连接使用,连接关闭后队列自动删除)
* 4. autoDelete: 是否自动删除(true:当队列中没有消息且没有消费者时自动删除)
* 5. arguments: 队列的其他参数(如 TTL、消息长度限制等)
*
* 如果指定的队列不存在,则会自动创建该队列;如果队列已存在,则使用已有队列。
*/
channel.queueDeclare("hello", true, false, false, null);
// 6. 接收并消费消息
/*
* 1. queue: 队列名称
* 2. autoAck: 是否自动确认(true:消费者收到消息后自动告诉 MQ 消息已消费)
* 3. callback: 回调对象,处理接收到的消息
*/
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*
* 回调方法,当收到消息后会自动执行该方法
* 参数说明:
* 1. consumerTag: 消费者标签(标识消费者)
* 2. envelope: 包含一些信息,如交换机、路由键
* 3. properties: 消息的配置信息
* 4. body: 消息内容(字节数组形式)
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body)); // 打印接收到的消息
}
};
// 开始消费消息
channel.basicConsume("hello", true, consumer);
// 等待回调函数执行完毕
TimeUnit.SECONDS.sleep(5);
// 7. 释放资源
// 消费者是一个监听程序,通常情况下不需要主动关闭资源,如果需要手动关闭,可以按照以下顺序
// channel.close();
// connection.close();
}
}
生产者和消费者使用的通道(channel)并不是同一个。当一个新的 RabbitMQ 节点启动时,它会预先声明几个内置交换机,其中默认的内置交换机名称为空字符串(“”)。生产者发送的消息可以根据队列名称直接路由到对应的队列。
如果有一个名为 “hello” 的队列,生产者可以直接将消息发送到 “hello” 队列,而消费者可以从 “hello” 队列中接收消息,而无需关注交换机的存在。这种模式非常适合简单的应用场景,尤其是生产者和消费者之间的一对一通信。