当前位置: 首页 > article >正文

RabbitMQ消息队列 发送和接受

步骤 1: 安装 RabbitMQ

首先,需要安装 RabbitMQ,并确保它在运行中。

下载erlang语言包OTP。官网地址:Downloads - Erlang/OTP

Rabbitmq官网下载地址:Downloading and Installing RabbitMQ — RabbitMQ

安装MQ注意事项:需要先安装Erlang语言包,然后再安装RabbitMQ,安装成功后默认登录账号和密码是guest

步骤 2: 添加 RabbitMQ 客户端库

在你的 .NET Core Web API 项目中,使用 NuGet 包管理器添加 RabbitMQ.Client 客户端库

创建两个控制台

发送

// 引入RabbitMQ客户端库
using RabbitMQ.Client;
using System.Text;

// 创建连接工厂对象(用于配置连接参数)
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";        // RabbitMQ服务器IP地址
factory.DispatchConsumersAsync = true; // 启用异步消费者模式(即使当前是生产者)

// 定义消息路由关键参数
string exchangeName = "exchange1";    // 直连(Direct)类型交换机名称
string eventName = "myEvent";         // 路由键(RoutingKey),用于消息路由匹配

// 创建持久化连接(TCP连接复用)
using var conn = factory.CreateConnection();

// 持续发送消息的循环
while (true)
{
    // 生成消息内容:当前时间的时分秒
    string msg = DateTime.Now.TimeOfDay.ToString();
    
    // 创建临时信道(Channel),using确保资源释放
    using (var channel = conn.CreateModel())
    {
        // 设置消息属性
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 2; // 2=消息持久化(重启后不会丢失)
        
        // 声明直连型交换机(服务端不存在时自动创建)
        // durable参数默认为false,重启后交换机会被删除
        channel.ExchangeDeclare(
            exchange: exchangeName, 
            type: "direct"); 
        
        // 将消息转换为字节数组
        byte[] body = Encoding.UTF8.GetBytes(msg);
        
        // 发布消息到交换机
        channel.BasicPublish(
            exchange: exchangeName,     // 目标交换机
            routingKey: eventName,      // 路由键(决定哪个队列接收)
            mandatory: true,            // 开启消息回退机制(找不到队列时返回消息)
            basicProperties: properties,// 消息属性配置
            body: body);                // 消息体
    }
    
    Console.WriteLine($"已发布消息:{msg}");
    Thread.Sleep(1000); // 每秒发送一次
}

[生产者] → (发布消息到exchange1交换机) 
           ↓
exchange1根据routingKey="myEvent"路由 
           ↓
[队列] ← 需提前绑定队列到该交换机(代码中未体现)
 

接受

// 引入RabbitMQ客户端库
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

// 创建连接工厂(配置服务器参数)
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";         // RabbitMQ服务器地址
factory.DispatchConsumersAsync = true;  // 启用异步消息处理模式(重要!提升吞吐量)

// 定义消息路由参数(需与生产者一致)
string exchangeName = "exchange1";     // 直连型交换机名称
string eventName = "myEvent";          // 路由键匹配规则

// 建立TCP连接和信道(Channel)
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel(); // 信道复用,避免频繁创建开销

// 队列定义与绑定
string queueName = "queue1";
// 声明持久化队列(服务重启后仍存在)
channel.ExchangeDeclare(
    exchange: exchangeName, 
    type: "direct",                     // 直连型交换机
    durable: true);                     // 持久化交换机(建议生产环境开启)

channel.QueueDeclare(
    queue: queueName,
    durable: true,                      // 队列持久化
    exclusive: false,                   // 非排他队列(允许多消费者)
    autoDelete: false,                  // 不自动删除队列(需显式删除)
    arguments: null);                   // 无额外参数

// 绑定队列到交换机(路由键严格匹配)
channel.QueueBind(
    queue: queueName,
    exchange: exchangeName, 
    routingKey: eventName);             // 仅接收routingKey=myEvent的消息

// 配置异步消费者
var consumer = new AsyncEventingBasicConsumer(channel);
// 注册消息接收事件处理器
consumer.Received += Consumer_Received; 

// 启动消费(关闭自动确认)
channel.BasicConsume(
    queue: queueName,
    autoAck: false,                     // 手动消息确认(确保处理完成后再ACK)
    consumer: consumer);

Console.ReadLine(); // 保持程序运行

// 消息处理函数(异步)
async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{
    try
    {
        // 解析消息内容
        var bytes = args.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bytes);
        Console.WriteLine($"{DateTime.Now} 处理消息:{msg}");
        
        // 模拟业务处理耗时
        await Task.Delay(800);
        
        // 手动确认消息(删除队列中的消息)
        channel.BasicAck(
            deliveryTag: args.DeliveryTag, 
            multiple: false);           // 不批量确认
        
        Console.WriteLine($"消息 {msg} 处理完成");
    }
    catch (Exception ex)
    {
        // 拒绝消息(requeue:true 重新入队)
        channel.BasicReject(
            deliveryTag: args.DeliveryTag,
            requeue: true);             // 允许消息重新投递
        
        Console.WriteLine($"消息处理失败,已重新入队。错误:{ex.Message}");
    }
}

运行效果

代码参考:杨中科老师


http://www.kler.cn/a/541909.html

相关文章:

  • 网络安全与AI:数字经济发展双引擎
  • Linux 实操篇 实用指令
  • Maven 在 Eclipse 中的使用指南
  • Git 分布式版本控制工具使用教程
  • 参考数据和主数据:构建数据管理的基石
  • 什么是矩阵账号?如何做矩阵账号运营?
  • CP AUTOSAR标准之IOHardwareAbstraction(AUTOSAR_SWS_IOHardwareAbstraction)(更新中……)
  • 洛必达法则的证明与重要条件
  • DaDianNao:一种无主存储器的多核加速器
  • 机器学习算法的种类(机器学习类型的比较)
  • FPGA开发技能(10)热电偶测温ADS1118方案
  • Docker Desktop 镜像源配置
  • Spring Boot部署到服务器
  • 物联网智能语音控制灯光系统设计与实现
  • STM32_USART通用同步/异步收发器
  • soular基础教程-使用指南
  • Apifox与Apipost功能对比及选择建议(1):产品背景及API简单调试
  • Android系统分区概述和编译镜像包理解
  • 推荐一款 免费的SSL,自动续期
  • XML 元素:结构化数据的基石
  • 模型压缩中的四大核心技术 —— 量化、剪枝、知识蒸馏和二值化
  • Arduino 第十一章:温度传感器
  • Qt笔记P1-30
  • 【课程系列14】某乎AI大模型全栈工程师-第9期(已完结)
  • 03:Spring之Web
  • MySQL第五次作业(触发器、存储过程)