RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
文章目录
- RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
- 一、引言
- 二、简介
- 三、准备工作
- 3.1 说明
- 3.2 生成项目
- 四、实战
- 4.1 交换机(Exchanges)
- 4.2 临时队列(Temporary Queues)
- 4.3 绑定(Bindings)
- 4.4 整合代码
- 发布程序
- 订阅程序
- 4.5 验证一:先广播后订阅
- 4.6 验证二:先订阅后广播
- 五、结论
RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
一、引言
在快节奏的软件开发世界中,我们经常面临需要将消息发送给多个接收者的场景。例如,在构建日志监控系统、实时通知系统等场景时,我们希望一个事件的发生能够被多个服务同时感知和处理。这时,发布/订阅模式(Publish/Subscribe)就显得尤为重要。在本教程中,我们将通过一个简单的例子来学习如何使用RabbitMQ实现发布/订阅模式。
二、简介
在上一篇教程中,我们学习了如何使用RabbitMQ实现工作队列(Work Queues)。今天,我们将探索工作队列的进阶应用——发布/订阅模式,这是一种允许多个接收者(Subscribers)监听同一个消息通道,并在消息发布时接收通知的机制。发布/订阅模式的核心在于解耦消息的发送者(Publisher)和接收者(Subscribers),发送者不需要知道有哪些接收者,只需要将消息发送到一个交换机(Exchange),而接收者则订阅这个交换机来接收消息。
三、准备工作
3.1 说明
在本教程中,我们将使用RabbitMQ的.NET客户端来创建一个简单的发布/订阅系统。我们将创建一个名为logs的fanout类型的交换机,并将所有日志消息广播给所有订阅了该交换机的队列。
3.2 生成项目
首先,我们需要生成两个项目:
EmitLogApp
:用于模拟日志消息的发布者。
ReceiveLogsApp
:用于接收并打印日志消息的订阅者。
我们可以使用以下命令来创建这两个项目:
dotnet new console --name EmitLog
cd EmitLog
dotnet add package RabbitMQ.Client
cd ..
dotnet new console --name ReceiveLogs
cd ReceiveLogs
dotnet add package RabbitMQ.Client
这些命令创建了两个新的控制台应用程序,一个用于发送日志消息,另一个用于接收并打印日志消息。
四、实战
4.1 交换机(Exchanges)
在之前的教程中,我们直接将消息发送到队列。现在,我们需要引入交换机(Exchange)的概念。在RabbitMQ中,生产者从不直接向队列发送消息,而是发送到交换机,然后由交换机将消息推送到一个或多个队列。交换机的行为由交换机类型定义。
我们将创建一个名为logs
的fanout
类型的交换机,它将广播所有接收到的消息给所有绑定到它的队列。
channel.ExchangeDeclare("logs", ExchangeType.Fanout);
4.2 临时队列(Temporary Queues)
在我们的日志系统中,我们希望每个运行的接收者程序都能接收到所有日志消息。因此,我们不需要为队列指定名称,而是让服务器为我们生成一个随机名称。同时,我们希望在消费者断开连接后队列能自动删除。
在.NET客户端中,我们可以创建一个非持久性、独占的、自动删除的队列,并让服务器为我们生成一个名称:
var queueName = channel.QueueDeclare().QueueName;
4.3 绑定(Bindings)
我们已经创建了一个fanout
交换机和一个队列。现在,我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的关系称为绑定(Binding)。
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: string.Empty);
4.4 整合代码
发布程序
using RabbitMQ.Client;
using System.Text;
await PublishMessagesAsync(10);
/// <summary>
/// 发布指定次数的消息到RabbitMQ队列
/// </summary>
/// <param name="loopCount">消息发送的次数</param>
/// <returns>Task对象,表示异步操作</returns>
async Task PublishMessagesAsync(int loopCount)
{
// 循环发送指定次数的消息
for (int i = 1; i <= loopCount; i++)
{
// 调用SendMessageToQueue方法发送消息,并包含当前迭代次数
await SendMessageToQueue($"Iteration {i} - Hello World");
// 这里可以添加延迟,如果需要的话
// await Task.Delay(1000);
}
Console.ReadLine();
}
/// <summary>
/// 向RabbitMQ队列发送一条消息
/// </summary>
/// <param name="message">要发送的消息内容</param>
/// <returns>Task对象,表示异步操作</returns>
async Task SendMessageToQueue(string message)
{
// 创建连接工厂,并设置RabbitMQ服务器地址为localhost
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();
//声明名为"logs"的fanout类型的交换机
await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);
// 将消息内容编码为字节数组
var body = Encoding.UTF8.GetBytes(message);
// 异步发布消息到队列
await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body);
Console.WriteLine($" [x] Sent {message}");
}
订阅程序
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhos
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();
// 异步声明一个名为"logs"的fanout类型的交换机
// 交换机会将所有接收到的消息广播给所有绑定到它的队列
await channel.ExchangeDeclareAsync(exchange: "logs",
type: ExchangeType.Fanout);
// 声明一个由服务器命名的队列,这样每个消费者都会有一个唯一的队列
// 这使得我们可以有多个消费者同时接收消息,而不会相互干扰
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;
// 将由服务器创建的队列绑定到"logs"交换机
// 这样,交换机就会将消息发送到这个队列
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
// 输出提示信息,表示消费者正在等待日志消息
Console.WriteLine(" [*] Waiting for logs.");
// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);
// 设置当消费者接收到消息时的事件处理程序
consumer.ReceivedAsync += (model, ea) =>
{
// 从接收到的消息中提取消息体并转换为字符串
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// 打印接收到的消息
Console.WriteLine($" [x] {message}");
// 返回Task.CompletedTask以满足异步事件处理的签名要求
return Task.CompletedTask;
};
// 开始消费指定队列的消息
// 这个调用会告诉RabbitMQ服务器,我们有一个消费者准备好接收"logs"交换机绑定的队列中的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
4.5 验证一:先广播后订阅
运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe
,即先发布广播;再运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe
进行订阅广播。可以发现,没有接收到任何内容。原因是需要先启动订阅者(消费者),再启动广播(发布者/生产者)才可以接收到消息
4.6 验证二:先订阅后广播
先运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe
;
再运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe
;可以发现每个消费者(订阅者)都收到了相同信息。
五、结论
在本教程中,我们深入探讨了RabbitMQ发布/订阅模式的概念和实现。通过构建一个简单的日志系统,我们学习了如何创建fanout
类型的交换机,以及如何发送和接收消息。以下是我们从本教程中获得的关键要点:
-
解耦发送者和接收者:发布/订阅模式允许发送者和接收者之间没有直接的联系,发送者只需要将消息发送到交换机,而接收者则订阅交换机来接收消息。
-
消息广播:
fanout
类型的交换机会将所有接收到的消息广播给所有绑定到它的队列,这对于日志系统、事件通知等场景非常有用。 -
临时队列:我们使用了临时队列来接收消息,这样每个订阅者都会有自己的队列,并且在订阅者断开连接后,队列会自动删除。
-
动态订阅:订阅者可以随时订阅或取消订阅交换机,这使得系统具有很高的灵活性和动态性。
通过这些机制,我们能够建立一个高效的发布/订阅系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。这些知识为我们在实际开发中实现复杂的事件驱动架构提供了坚实的基础。