智能BI项目第六期
本期任务
- 分析系统现在的不足
- 分布式消息队列
- 分布式消息队列 RabbitMQ 入门实战
系统现状不足分析总结
让我们来讨论一下单机系统的问题。
现状:我们的异步处理是通过本地线程池实现的。
但是存在以下问题:
- 无法集中限制,仅能单机限制:**如果AI服务的使用限制为同时只能有两个用户,我们可以通过限制单个线程池的最大核心线程数为 2 来实现这个限制。但是,当系统需要扩展到分布式,即多台服务器时,每台服务器都需要有 2 个线程,这样就可能会产生 2N 个线程,超过 AI 服务的限制。因此,我们需要一个集中的管理方式来分发任务,例如统一存储当前正在执行的任务数。
- 由于任务存储在内存中执行,可能会丢失:我们可以通过从数据库中人工提取并重试,但这需要额外的开发(如定时任务)。然而,重试的场景是非常典型的,我们不需要过于关注或自行实现。一个可能的解决方案是将任务存储在可以持久化的硬盘中。
- 优化:随着系统功能的增加,长耗时任务也越来越多,系统会变得越来越复杂(比如需要开启多个线程池,可能出现资源抢占的问题)。一种可能的解决方案是服务拆分,即应用解耦。我们可以将长耗时、资源消耗大的任务单独抽成一个程序,以避免影响主业务。此外,我们也可以引入一个"中间人",让它帮助我们连接两个系统,比如核心系统和智能生成业务。
分布式消息队列
1. 中间件
连接多个系统,帮助多个系统紧密协作的技术(或者组件)。
什么是中间件?可以将其视为在开发系统或多个应用时,用于连接多个系统或使多个系统紧密协作的工具。常用的中间件包括 Redis,消息队列,以及分布式存储如 ETCD 等。
事实上,如果非要定义,数据库也可以被视为一种中间件。例如,假设我们有两个系统,系统 A 和系统 B,他们需要共享同一份数据。这时,我们可以将数据存储在数据库中,这样,数据库就像是连接这两个系统的“中间人”,即满足中间件的定义。
在 NodeJS 中,中间件的含义稍有不同,它指的是在不影响业务逻辑的前提下,增强系统功能的工具,连接通用功能和系统。
以智能 BI 后台为例,原有的系统包括用户管理和图表管理等。随着系统应用量的增加,智能分析业务可能会消耗大量资源。此时,我们可能会考虑将智能分析业务抽出,创建一个单独的智能分析服务,专门负责生成图表。
问:如何让这两个系统紧密协作呢?
答:这时,我们就可以使用中间件。
例如,我们可以使用 Redis 来存储共享数据。主系统和智能分析服务都可以读取 Redis 中的数据。这样,Redis 就充当了一个"中间人"的角色,连接了这两个系统,这就是中间件的概念。
2. 消息队列
消息队列:用于存储信息的队列。
此处的关键词有三个:存储、消息、队列。
- 存储:对数据的储存能力。
- 消息:指任何形式的数据结构,例如字符串、对象、二进制数据、JSON等。
- 队列:具有先进先出特性的数据结构。
问:消息队列是特殊的数据库么?
答:也可以这么理解,但是消息队列的核心作用不止于存储数据。
消息队列的应用场景(作用):在多个不同的系统、应用之间实现消息的传输(也可以存储)。不需要考虑传输应用的编程语言、系统、框架等等。例如,可以让 java 开发的应用发消息,让 php 开发的应用收消息,这样就不用把所有代码写到同一个项目里(应用解耦)。
3. 消息队列的模型
消息队列主要由四部分组成:消息生产者(Producer)、消息消费者(Consumer)、消息(Message)和消息队列(Queue)
消息队列的一个主要优点就是可以集中存储消息,使得消息的发送者和接收者无需同时在线,实现了发送者和接收者的解耦。这就是消息队列的核心作用,以及为什么我们需要使用消息队列的原因。
4. 消息队列的优势
-
异步处理:一旦生产者发送完消息,便可以立即转向其他任务,而消费者则可以在任何时候开始处理消息。这样一来,生产者和消费者之间就不会发生阻塞。
-
削峰填谷:消息队列允许我们先将用户请求存储起来,然后消费者(或说实际执行任务的应用)可以根据自身的处理能力和需求,逐步从队列中取出并处理请求。
- 虽然线程池也能实现削峰填谷的效果,但它并没有消息队列这样的存储灵活性,或者说,消息队列能实现的持久化存储。
5. 分布式消息队列的优势
尽管线程池有些许消息队列的影子,例如本地开启一个数组队列也能实现类似功能,但分布式消息队列有其独特的优势。
1)数据持久化:它可以把消息集中存储到硬盘里,服务器重启就不会丢失
2) 可扩展性:可以根据需求,随时增加(或减少)节点,继续保持稳定的服务
3) 应用解耦:可以连接各个不同语言、框架开发的系统,让这些系统能够灵活传输读取数据
应用解耦的优点:
以前,把所有功能放到同一个项目中,调用多个子功能时,一个环节错,系统就整体出错。
使用消息队列进行解耦:
- 一个系统挂了,不影响另一个系统
- 系统挂了并恢复后,仍然可以取出消息,继续执行业务逻辑
- 只要发送消息到队列,就可以立刻返回,不用同步调用所有系统,性能更高
4) 发布订阅:如果一个非常大的系统要给其他子系统发送通知,最简单直接的方式是大系统直接依次调用小系统。
问题:
- 每次发通知都要调用很多系统,很麻烦、有可能失败
- 新出现的项目(或者说大项目感知不到的项目)无法得到通知
解决方案:大的核心系统始终往一个地方(消息队列)去发消息,其他的系统都去订阅这个消息队列(读取这个消息队列中的消息)
6. 消息队列的应用场景
- 耗时的场景(异步)
- 高并发场景(异步、削峰填谷)
- 分布式系统协作(尤其是跨团队、跨业务协作,应用解耦)
- 强稳定性的场景(比如金融业务,持久化、可靠性、削峰填谷)
7.消息队列的缺点
使你的系统变得更复杂,并需要更多的维护工作。若你在公司实施此类解决方案,我们通常会选择由第三方大公司提供的稳定中间件,而这会产生额外的成本。即使你自己部署和维护,也需要额外的资源投入。
另外,一旦你开始使用消息队列,你就需要承担由此带来的各种可能问题,例如消息丢失。并非消息队列就可以保证消息不会丢失,比如在发送消息的过程中,可能就因为某些原因而失败。
再者,你需要保证消息的顺序性,即消息需要按照特定的顺序被消费。此外,你还需要防止消息的重复消费,避免同一个系统多次处理同一条消息。同时,你还需要保证数据的一致性,这并非是消息队列的特定问题,而是任何分布式系统都需要考虑的问题。例如,分布式锁就是为了解决分布式系统中多个服务器之间的一致性问题。
8. 主流分布式消息队列选型
主流技术
- activemq
- rabbitmq
- kafka
- rocketmq
- zeromq
- 吞吐量:IO、并发
- 时效性:类似延迟,消息的发送、到达时间
- 可用性:系统可用的比率(比如 1 年 365 天宕机 1s,可用率大概 X 个 9)
- 可靠性:消息不丢失(比如不丢失订单)、功能正常完成
技术名称 | 吞吐量 | 时效性 | 可用性 | 可靠性 | 优势 | 应用场景 |
---|---|---|---|---|---|---|
activemq | 万级 | 高 | 高 | 高 | 简单易学 | 中小型企业、项目 |
rabbitmq | 万级 | 极高(微秒) | 高 | 高 | 生态好(基本什么语言都支持)、时效性高、易学 | 适合绝大多数分布式的应用,这也是先学他的原因 |
kafka | 十万级 | 高(毫秒以内) | 极高 | 极高 | 吞吐量大、可靠性、可用性,强大的数据流处理能力 | 适用于大规模处理数据的场景,比如构建日志收集系统、实时数据流传输、事件流收集传输 |
rocketmq | 十万级 | 高(ms) | 极高 | 极高 | 吞吐量大、可靠性、可用性,可扩展性 | 适用于金融 、电商等对可靠性要求较高的场景,适合大规模的消息处理。 |
pulsar | 十万级 | 高(ms) | 极高 | 极高 | 可靠性、可用性很高,基于发布订阅模型,新兴(技术架构先进) | 适合大规模、高并发的分布式系统(云原生)。适合实时分析、事件流处理、IoT 数据处理等。 |
RabbitMQ 入门实战
1. 基本概念
首先,我们要介绍一个基本概念,也就是 RabbitMQ 中的 AMQP 协议。
那么,什么是 AMQP 呢?AMQP 的全称是 Advanced Message Queue Protocol,即高级消息队列协议。RabbitMQ 就是根据这个协议开发的。AMQP 是一个标准的协议,不仅 RabbitMQ,如果你想自己实现一个消息队列,也可以按照这个协议来设计。
AMQP 协议主要由几个部分组成,如下图所示,它非常适合我们来解释这个协议的各个组成部分。
2. 安装
首先打开 RabbitMQ 官网,点击 Get Started(开始)。
Installing on Windows | RabbitMQ
这里看不懂英文的可以翻译成中文,安装RabbitMQ之前要先安装Erlang
Erlang:就像 C++ 和 Java 一样,也是一种编程语言,虽然它的应用相对较小众。然而,它的一大优势在于其出色的性能。
下载的时候注意版本对应
Erlang 和 Elixir 软件包下载 - Erlang Solutions (erlang-solutions.com)
Erlang和RabbitMQ下载好后打开服务开一下是否真的下载好了
鱼皮提供:rabbitmq 3.12.0、Erlang 25.3
下载这里不主要介绍,网上会有很多教程
接下来还需要用到一个东西,大家如果说以后安装一些第三方的这种中间件,基本上都会有的。就是肯定是这种中间件会有一个管理页面。回到官网,点击 Docs
(文档)。
找到 rabbitmq-plugins(插件管理)。
rabbitmq-plugins enable rabbitmq_shovel rabbitmq_management
由于是 windows 系统,这个命令不能直接运行;
windows 系统所有的脚本全是 bat 文件,所以要给这条命令加上 .bat,然后复制这条命令。
找到安装 RabbitMQ 路径的位置,进入sbin目录
,这里面都是 RabbitMQ 的执行脚本。
- rabbitmq-server.bat:操作 RabbitMQ 服务器相关的命令。
- rabbitmq-plugins.bat:用来安装 RabbitMQ 的插件。
在目录栏上输出cmd
。
将命令粘贴到此处,回车,显示已经启动了,并配置了一些插件。
这里显示要重新启动后才生效,输入以下命令。
# 关闭服务 net stop rabbitmq
# 开启服务 net start rabbitmq
访问 http://localhost:15672,默认用户名密码都是 guest(文档有说明),点击Login
;
文档还有说明端口访问的端口号,15672 是监控面板的 UI;
ps.里面的 5672、5671 很重要,我们的应用程序之间就是通过 5672 端口来和 RabbitMQ 做链接的,就像 redis 的6379,es 的 9200是一样的。
登录 RabbitMQ 的管理页面。
就能看见 RabbitMQ 的管理页面。
注意:如果你打算在自己的服务器上部署 RabbitMQ,你必须知道在默认设置下,使用 guest 账号是无法访问的。为何会这样呢?原因在于系统安全性的需求。如果你想从远程服务器访问管理面板,你需要创建一个新的管理员账号,不能使用默认的 guest 账号,否则会被系统拦截,导致无法访问或登录。
点击右侧的 User guest,会发现该用户拥有管理员权限,但这仅限于本地访问。如果你直接上线并使用默认的管理员权限,你可以想象可能出现的问题。比如,攻击者可能会扫描你服务器的端口,并不断尝试用 guest 账号登录。一旦找到一个开放的端口并用 guest 登录,攻击者就可以入侵你的服务器,然后在消息队列中不断添加消息,填满你的服务器硬盘。因此,为了安全,系统默认关闭了 guest 账号的远程访问权限,这也是官方出于安全考虑的措施。关于如何创建管理员账号,在这里就不演示,可以参考官方文档进行操作 —— 官方文档的 Adding a User。
3. 快速入门
3.1 "Hello World"
大家可以翻译成中文看
RabbitMQ tutorial - "Hello World!" | RabbitMQ
这里有很详细的解释
这里消息队列的模型为:
一个生产者给一个队列发消息,一个消费者从这个队列取消息(1 对 1)。
它提示你可以去 Maven repository 搜索 java 的客户端下载。
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.17.0</version>
</dependency>
继续往下看
rabbitmq-tutorials/java/Send.java at main · rabbitmq/rabbitmq-tutorials · GitHub
package com.yupi.springbootinit.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
// 定义一个名为SingleProducer的公开类,用于实现消息发送功能
public class SingleProducer {
// 定义一个静态常量字符串QUEUE_NAME,它的值为"hello",表示我们要向名为"hello"的队列发送消息
private final static String QUEUE_NAME = "hello";
// 定义程序的入口点:一个公开的静态main方法,它抛出Exception异常
public static void main(String[] argv) throws Exception {
// 创建一个ConnectionFactory对象,这个对象可以用于创建到RabbitMQ服务器的连接
ConnectionFactory factory = new ConnectionFactory();
// 设置ConnectionFactory的主机名为"localhost",这表示我们将连接到本地运行的RabbitMQ服务器
factory.setHost("localhost");
// 如果你改了本地的用户名和密码,你可能要指定userName、userPassword,
// 如果改了本地的端口,还要改Port。
// 那我们这里不需要,我们这里就用默认的localhost,默认的用户名和密码,就是guest
// factory.setUsername();
// factory.setPassword();
// factory.setPort();
// 使用ConnectionFactory创建一个新的连接,这个连接用于和RabbitMQ服务器进行交互
try (Connection connection = factory.newConnection();
// 通过已建立的连接创建一个新的频道
Channel channel = connection.createChannel()) {
// 在通道上声明一个队列,我们在此指定的队列名为"hello"
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 创建要发送的消息,这里我们将要发送的消息内容设置为"Hello World!"
String message = "Hello World!";
// 使用channel.basicPublish方法将消息发布到指定的队列中。这里我们指定的队列名为"hello"
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
// 使用channel.basicPublish方法将消息发布到指定的队列中。这里我们指定的队列名为"hello"
System.out.println(" [x] Sent '" + message + "'");
}
}
将这段代码粘贴到我们的项目中进行测试,为了方便区分,我们将它命名为SingleProducer
🪔 什么是频道channel?
你可以将频道看作客户端。你可能已经接触过其他类型的客户端,如 JDBC(用于连接数据库)和 Redis Client(用于操作缓存)。在这种情况下,你可以将频道看作是用于操作消息队列的客户端。
频道(Channel)提供了一种与 RabbitMQ 服务器通信的方法,它可以帮助我们建立与 RabbitMQ 服务器的连接。如果你之前学习过 Netty,对 Channel 的概念可能更为熟悉。RabbitMQ 保留 Channel 的原因在于它可以帮助我们实现连接复用,从而提高传输效率。所有的任务都交给 Channel 去统一处理。
核心方法
- queueName:消息队列名称(注意,同名称的消息队列,只能用同样的参数创建一次)
- durabale:消息队列重启后,消息是否丢失
- exclusive:是否只允许当前这个创建消息队列的连接操作消息队列
- autoDelete:没有人用队列后,是否要删除队列
启动该项目后即可到http://localhost:15672去查看
消息发送好之后,接下来要去消费这个消息,继续看官方文档,查看完整版的消费者代码。
rabbitmq-tutorials/java/Recv.java at main · rabbitmq/rabbitmq-tutorials · GitHub
package com.ptu.bi.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class SingleConsumer {
// 定义我们正在监听的队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 创建连接,创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接工厂的主机名,这里我们连接的是本地的RabbitMQ服务器
factory.setHost("localhost");
// 从工厂获取一个新的连接
Connection connection = factory.newConnection();
// 从连接中创建一个新的频道
Channel channel = connection.createChannel();
// 创建队列,在该频道上声明我们正在监听的队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 在控制台打印等待接收消息的信息
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 定义了如何处理消息,创建一个新的DeliverCallback来处理接收到的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 将消息体转换为字符串
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
// 在控制台打印已接收消息的信息
System.out.println(" [x] Received '" + message + "'");
};
// 在频道上开始消费队列中的消息,接收到的消息会传递给deliverCallback来处理,会持续阻塞
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
为什么在这里都要创建队列呢?
主要是为了确保该队列的存在,否则在后续的操作中可能会出现错误。主要是为了这点,即便你的队列原本并不存在,此语句也能够帮你创建一个新的队列。但是需要特别注意一点,如果你的队列已经存在,并且你想再次执行声明队列的操作,那么所有的参数必须与之前的设置完全一致。这是因为一旦一个队列已经被创建,就不能再创建一个与其参数不一致的同名队列。可以类比为,一旦你建好了一个快递站,就不能再在同一位置建立一个与之不同的快递站。
注意:请确保消费队列的名称与发送消息的队列名称保持一致。所以在这里,我们统一使用"hello"作为队列名。
运行一下
再去打开客户端,已经被消费了
点击消息队列的名字(hello),即可查看消费情况
3.2 Work Queues
一对一的消息队列模型整完了,然后来试试 Work Queues
。
RabbitMQ tutorial - Work Queues | RabbitMQ
这里消息队列的模型为:一个生产者给一个队列发消息,多个消费者 从这个队列取消息(1 对多)。
这里的图和之前 hello world 的图有什么区别?是不是多了一个消费者;
就像一个快递员都往一个快递站里发快递,但是取快递可以是多个人。
适用的场景:多个机器同时去接受并处理任务(尤其是每个机器的处理能力有限)
- 这有点像发布-订阅模型,适用于什么场景呢?假设我们有一个消费者,由于其性能有限,可能无法处理所有任务。此时,我们可以增加机器来提高处理能力。假设一个生产者在不断地生成任务,但一个消费者无法全部处理,那么我们可以增加一个消费者来共同完成这些任务。因此,这种场景特别适合于多个机器同时接收并处理任务,尤其是在每个机器的处理能力有限的情况下。
这里有很多重要的思想,由于时间原因,我放到下一期一起记录,我们直接看代码
这里集齐生产和消费者的代码
package com.ptu.bi.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.util.Scanner;
public class MultiProducer {
// 定义任务队列的名称
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ服务器地址
factory.setHost("localhost");
// 使用try-with-resources语句确保资源能够自动关闭
try (Connection connection = factory.newConnection(); // 建立到RabbitMQ服务器的连接
Channel channel = connection.createChannel()) { // 创建信道
// 声明一个持久化的队列
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 创建一个扫描器读取控制台输入
Scanner sc = new Scanner(System.in);
// 循环读取用户输入的消息并发送到队列
while (sc.hasNext()) {
// 从控制台读取一行消息
String message = sc.nextLine();
// 发送消息到指定队列
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, // 消息属性设置为持久化
message.getBytes("UTF-8"));
// 输出发送的消息
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
package com.ptu.bi.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class MultiConsumer {
// 定义任务队列的名称
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ服务器地址
factory.setHost("localhost");
// 建立到RabbitMQ服务器的连接
final Connection connection = factory.newConnection();
// 创建两个消费者
for (int i = 0; i < 2; i++) {
// 为每个消费者创建一个新的信道
final Channel channel = connection.createChannel();
// 声明一个持久化的队列
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 输出等待消息的信息
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 控制单个消费者处理任务积压数,每个消费者最多处理1个任务
channel.basicQos(1);
// 定义一个变量用于标识消费者编号
int finalI = i;
// 定义一个回调方法来处理接收到的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 将消息体转换为字符串
String message = new String(delivery.getBody(), "UTF-8");
try {
// 处理工作
System.out.println(" [x] Received '" + "编号:" + finalI + " " + message + "'");
// 确认消息接收
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 模拟长时间处理任务
Thread.sleep(20000); // 暂停20秒
} catch (InterruptedException e) {
// 处理中断异常
e.printStackTrace();
// 否定确认消息接收
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
} finally {
// 输出完成信息
System.out.println(" [x] Done");
// 再次确认消息接收
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 执行消费,开启消费监听
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
}
这里对原本的生产者代码做了一些改进
启动后,每次读入一行,发送到消息队列TASK_QUEUE_NAME里,如果你发错了一些东西,可以继续到rabbitmq的客户端,点击队列名称,删除队列
直接用java代码删除也可以
Channel channel = ...; // 获取RabbitMQ的Channel
channel.queueDelete("你的队列名");
消费者一开始的代码
public class MultiConsumer {
// 声明队列名称为"multi_queue"
private static final String TASK_QUEUE_NAME = "multi_queue";
public static void main(String[] argv) throws Exception {
// 创建一个新的连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接工厂的主机地址
factory.setHost("localhost");
// 从工厂获取一个新的连接
final Connection connection = factory.newConnection();
// 从连接获取一个新的通道
final Channel channel = connection.createChannel();
// 声明一个队列,并设置属性:队列名称,持久化,非排他,非自动删除,其他参数;如果队列不存在,则创建它
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 在控制台打印等待消息的提示信息
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// (这个先注释)设置预取计数为1,这样RabbitMQ就会在给消费者新消息之前等待先前的消息被确认
// channel.basicQos(1);
// 创建消息接收回调函数,以便接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 将接收到的消息转为字符串
String message = new String(delivery.getBody(), "UTF-8");
try {
// (放到try里)打印出接收到的消息
System.out.println(" [x] Received '" + message + "'");
// 处理工作,模拟处理消息所花费的时间,机器处理能力有限(接收一条消息,20秒后再接收下一条消息)
Thread.sleep(20000);
} catch (InterruptedException e) {
// 模拟处理消息所花费的时间
e.printStackTrace();
} finally {
// 打印出完成消息处理的提示
System.out.println(" [x] Done");
// 手动发送应答,告诉RabbitMQ消息已经被处理
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 开始消费消息,传入队列名称,是否自动确认,投递回调和消费者取消回调
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
重点是
// (这个先注释)设置预取计数为1,这样RabbitMQ就会在给消费者新消息之前等待先前的消息被确认
// channel.basicQos(1);
我们在这里设置了每次等待20s,再继续消费(模拟机器处理能力有限)
启动后我们将会发现每过20s,才会处理一次消息
如果我们要提高并发能力,就把channel.basicQos(1)打开,使用for循环,创建多个消费者进行消费
package com.yupi.springbootinit.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class MultiConsumer {
private static final String TASK_QUEUE_NAME = "multi_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
for (int i = 0; i < 2; i++) {
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 设置预取计数为 1,这样RabbitMQ就会在给消费者新消息之前等待先前的消息被确认
channel.basicQos(1);
int finalI = i;
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
System.out.println(" [x] Received '" + "编号:"+ finalI + ":" + message + "'");
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
}
这样一次就可以处理多个任务,如果处理不完,再等20s,继续处理下一批
注意最后一行代码
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
这里有个很有意思的地方,第二个参数叫 autoack,默认为 false —— 消息确认机制。
消息队列如何确保消费者已经成功取出消息呢?它依赖一个称为消息确认的机制。当消费者从队列中取走消息后,必须对此进行确认。这就像在收到快递后确认收货一样,这样消息队列才能知道消费者已经成功取走了消息,并能安心地停止传输。因此,整个过程就像这样。
在面试中,如果被问及如何保证消息不会丢失,例如当业务流程失败时该怎么办,你可以回答:“我们可以选择拒绝接收失败的消息,并重新启动。重新启动或进行其他处理可以指定拒绝某条消息。”
消息确认机制
为了保证消息成功被消费(快递成功被取走),rabbitmq 提供了消息确认机制,当消费者接收到消息后,比如要给一个反馈:
- ack:消费成功
- nack:消费失败
- reject:拒绝
如果告诉 rabbitmq 服务器消费成功,服务器才会放心地移除消息。
// 执行消费,开启消费监听
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
但是,如果在接收到消息后,工作尚未完成,我们是否就不需要确认成功呢?这种情况,建议将 autoack 设置为 false,根据实际情况手动进行确认了。
接收请求
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
拒绝请求
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
第二个参数 'multiple' 表示批量确认,也就是说,是否需要一次性确认所有的历史消息,直到当前这条消息为止。
第 3 个参数表示是否重新入队,可用于重试。
在消息消费的流程中,如果你拒绝处理某条消息,那么该消息将被标记为失败。这意味着它将返回到队列中,等待重新处理。如果你选择不重新入队该消息,那么这条消息将被丢弃,不会再被消费者处理。因此,拒绝消息相当于将其废弃掉,不再进行后续处理。这是保证消息不丢失的一部分流程。