当前位置: 首页 > article >正文

RabbitMQ-消息消费确认

我们一般使用的是消费者作为被动方接收 RabbitMQ 推送消息,另一种是消费者作为主动方可以主动拉取消息。

RabbitMq 服务器推送消息分为隐式(自动)确认和显示确认。

1 消费者拉取消息

消费者作为主动方拉取消息,每次只能获取一条。

using (var channel = connection.CreateModel())
{
    BasicGetResult result = channel.BasicGet("PersistenceQueue", true);
    string message = Encoding.UTF8.GetString(result.Body.ToArray());
    Console.WriteLine($"拉取到消息:{message}");
}

2 RabbitMq服务器推送消息

消费者作为被动方接收RabbitMQ推送消息。

using (var channel = connection.CreateModel())
{
    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    //就是Rabbitmq的服务器作为主动方---RabbitMq 推送消息到消费者来的;
    consumer.Received += (model, ea) =>
    {
        string message = Encoding.UTF8.GetString(ea.Body.ToArray());
        Console.WriteLine($"正常收到消息:{message}");
    };
    channel.BasicConsume(queue: "PersistenceQueue", autoAck: true, consumer: consumer);
}

3 隐式确认

当 RabbbitMQ 将消息发送给消费者后,消费者端接收到消息后,不等待消息处理结束,立即自动回送一个确认回执。

自动确认的用法十分简单,设置消费方法的参数 autoAck 为 true 即可,我们前边的例子都是使用的自动确认。

channel.BasicConsume(queue: "PersistenceQueue", autoAck: true, consumer: consumer);

4 显式确认

设置消费方法的参数 autoAck 为 false,channel.BasicAck可以一条一条确认后删除,也可使用 channel.BasicReject不删除。

//定义消费者                                      
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
int i = 0;
//就是Rabbitmq的服务器作为主动方---RabbitMq 推送消息到消费者来的;
consumer.Received += (model, ea) =>
{
    string message = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine($"正常收到消息:{message}");
    if (i < 100)
    {
        Console.WriteLine($"【{message}】消息已经被消费,同时从RabbitMQ服务器删除");
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    }
    else
    {
        Console.WriteLine($"【{message}】消息没有被正常消费,可以让消息不要删除");
        channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
        //throw new Exception("消息消费异常了~");
    }
    i++;
};
//显式确认
channel.BasicConsume(queue: "PersistenceQueue", autoAck: false, consumer: consumer);

5 消息质量

channel.BasicQos可以设置每次从队列中取出几条消息进行消费。

channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false);

方法中参数 prefetchSize 为预取的长度,一般设置为0即可,表示长度不限;

prefetchCount 表示预取的条数,即发送的最大消息条数;

global 表示是否在 Connection 中全局设置,true表示 Connetion 下的所有 channel 都设置为这个配置。


http://www.kler.cn/a/503656.html

相关文章:

  • ArkTS 组件事件、状态管理与资源管理
  • 图生生 AI 绘画,根据文字描述生成图片
  • 深入理解 ECMAScript 2024 新特性:正则表达式 /v 标志
  • 数据结构的存储方式
  • 【已解决】git clone报错:Failed to connect to github.com port 443: Timed out
  • Nginx安全加固系列:Referrer-Policy
  • 小结:华为路由器常用的操作指令
  • thinkphp 5.0 结合redis 做延迟队列,队列无法被消费
  • ESLint修正代码规范错误
  • 纯 Python、Django、FastAPI、Flask、Pyramid、Jupyter、dbt 解析和差异分析
  • PySide6-UI界面设计
  • 《使用人工智能虚拟原生增强技术取代晚期钆增强技术,用于肥厚型心肌病的无钆心血管磁共振组织表征》论文精读
  • 元宇宙和边缘计算是什么?两者有什么关系?
  • 实用好软-----电脑端链接手机 免root权限管理手机 调试安卓
  • WINFORM - DevExpress -> gridcontrol ---->控件(ColumnEdit控件)
  • 水库水雨情监测系统:实时自动化预警
  • FPGA工程师成长四阶段
  • 核密度估计(Kernel Density Estimation, KDE)是一种非参数统计方法
  • react swiper@6.x 工作中遇到的问题处理
  • C++实现设计模式---备忘录模式 (Memento)