RabbitMQ消息队列 发送和接受
步骤 1: 安装 RabbitMQ
首先,需要安装 RabbitMQ,并确保它在运行中。
下载erlang语言包OTP。官网地址:Downloads - Erlang/OTP
Rabbitmq官网下载地址:Downloading and Installing RabbitMQ — RabbitMQ
安装MQ注意事项:需要先安装Erlang语言包,然后再安装RabbitMQ,安装成功后默认登录账号和密码是guest
步骤 2: 添加 RabbitMQ 客户端库
在你的 .NET Core Web API 项目中,使用 NuGet 包管理器添加 RabbitMQ.Client 客户端库
创建两个控制台
发送
// 引入RabbitMQ客户端库
using RabbitMQ.Client;
using System.Text;
// 创建连接工厂对象(用于配置连接参数)
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; // RabbitMQ服务器IP地址
factory.DispatchConsumersAsync = true; // 启用异步消费者模式(即使当前是生产者)
// 定义消息路由关键参数
string exchangeName = "exchange1"; // 直连(Direct)类型交换机名称
string eventName = "myEvent"; // 路由键(RoutingKey),用于消息路由匹配
// 创建持久化连接(TCP连接复用)
using var conn = factory.CreateConnection();
// 持续发送消息的循环
while (true)
{
// 生成消息内容:当前时间的时分秒
string msg = DateTime.Now.TimeOfDay.ToString();
// 创建临时信道(Channel),using确保资源释放
using (var channel = conn.CreateModel())
{
// 设置消息属性
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; // 2=消息持久化(重启后不会丢失)
// 声明直连型交换机(服务端不存在时自动创建)
// durable参数默认为false,重启后交换机会被删除
channel.ExchangeDeclare(
exchange: exchangeName,
type: "direct");
// 将消息转换为字节数组
byte[] body = Encoding.UTF8.GetBytes(msg);
// 发布消息到交换机
channel.BasicPublish(
exchange: exchangeName, // 目标交换机
routingKey: eventName, // 路由键(决定哪个队列接收)
mandatory: true, // 开启消息回退机制(找不到队列时返回消息)
basicProperties: properties,// 消息属性配置
body: body); // 消息体
}
Console.WriteLine($"已发布消息:{msg}");
Thread.Sleep(1000); // 每秒发送一次
}
[生产者] → (发布消息到exchange1交换机)
↓
exchange1根据routingKey="myEvent"路由
↓
[队列] ← 需提前绑定队列到该交换机(代码中未体现)
接受
// 引入RabbitMQ客户端库
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂(配置服务器参数)
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; // RabbitMQ服务器地址
factory.DispatchConsumersAsync = true; // 启用异步消息处理模式(重要!提升吞吐量)
// 定义消息路由参数(需与生产者一致)
string exchangeName = "exchange1"; // 直连型交换机名称
string eventName = "myEvent"; // 路由键匹配规则
// 建立TCP连接和信道(Channel)
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel(); // 信道复用,避免频繁创建开销
// 队列定义与绑定
string queueName = "queue1";
// 声明持久化队列(服务重启后仍存在)
channel.ExchangeDeclare(
exchange: exchangeName,
type: "direct", // 直连型交换机
durable: true); // 持久化交换机(建议生产环境开启)
channel.QueueDeclare(
queue: queueName,
durable: true, // 队列持久化
exclusive: false, // 非排他队列(允许多消费者)
autoDelete: false, // 不自动删除队列(需显式删除)
arguments: null); // 无额外参数
// 绑定队列到交换机(路由键严格匹配)
channel.QueueBind(
queue: queueName,
exchange: exchangeName,
routingKey: eventName); // 仅接收routingKey=myEvent的消息
// 配置异步消费者
var consumer = new AsyncEventingBasicConsumer(channel);
// 注册消息接收事件处理器
consumer.Received += Consumer_Received;
// 启动消费(关闭自动确认)
channel.BasicConsume(
queue: queueName,
autoAck: false, // 手动消息确认(确保处理完成后再ACK)
consumer: consumer);
Console.ReadLine(); // 保持程序运行
// 消息处理函数(异步)
async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{
try
{
// 解析消息内容
var bytes = args.Body.ToArray();
string msg = Encoding.UTF8.GetString(bytes);
Console.WriteLine($"{DateTime.Now} 处理消息:{msg}");
// 模拟业务处理耗时
await Task.Delay(800);
// 手动确认消息(删除队列中的消息)
channel.BasicAck(
deliveryTag: args.DeliveryTag,
multiple: false); // 不批量确认
Console.WriteLine($"消息 {msg} 处理完成");
}
catch (Exception ex)
{
// 拒绝消息(requeue:true 重新入队)
channel.BasicReject(
deliveryTag: args.DeliveryTag,
requeue: true); // 允许消息重新投递
Console.WriteLine($"消息处理失败,已重新入队。错误:{ex.Message}");
}
}
运行效果
代码参考:杨中科老师