当前位置: 首页 > article >正文

Rabbitmq高级特性之消费方确认

背景:

发送方发送消息之后,到达消费端之后,可能会有以下情况:消息处理成功,消息处理异常。RabbitMQ在向消费者发送消息之后,就会把这条消息给删除掉,那么第二种情况,就会造成消息丢失,为了保证是顺利到达消费方,RabbitMQ提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,根据这个参数设置,消息确认机制分为以下两种:自动确认和手动确认。

RabbitMQ原生api中的消费消息的方法:

ps:接下的代码都是基于AMQP来实现的(基于Spring对RabbitMQ进行了一层封装)

1.手动确认

当autoAck为false的时候,RabbitMQ会等待消费者显式的调用Basic.Ack命令,回复确认信号后才从内存或磁盘中移去消息,该种方式适合对消息可靠性要求比较高的场景。

(1)application.yml配置

要基于AMQP实现手动确认,只需修改application.yml(或.properities)的配置:

manual:手动确认,必须显示调用basicAck方法来确认消息,如果消息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息,这种模式提高了消息处理的可靠性,因为即使消费方消费失败,消息也不会消失,而是可以被重新处理。

auto:自动确认,这种模式下,消费者在消息处理成功时会自动确认消息,但是如果处理过程中出现异常,则不会确认消息。

none:这种模式下,无论消费者有没有处理成功消息,RabbitMQ都会自动确认消息,从RabbitMq中移除消息,如果消息处理失败,那么可能会导致丢失。

(2)manual方式示例

config配置类:

controller类:

 listen类:

 现在开启手动确认,但是却没有手动确认,此时访问对应接口(127.0.0.1:8080/producer/ack):

启动台打印:

RabbitMQ管理界面:

未被确认的消息会在合适时机重新入队,有消费者监听队列时,投递给消费者进行消费。

 现在进行修改代码:

 刚才滞留的消息,会立即被重新消费,此时已经进行了手动确认:

查看RabbitMQ管理界面:

 滞留的消息确认已经被重新消费了。

2.自动确认

自动确认开启的情况下,会自动把发送出去大的消息置为确认,然后从内存中(或者硬盘)删除,而不管消费者是否真正地消费到了这些消息,自动确认模式适合对于消息可靠性要求不高的场景。

有auto和none,两者在未发生异常的情况下是相同:自动确认消息。

2.1 auto方式示例

(1)application.yml配置文件:

(2)controller类:

(3)producer类:

(4)正常启动,并访问接口127.0.0.1:8080/producer/ack:

观察RabbitMQ管理界面:(消息发送到消费端后,消费成功后就从RabbitMQ上移除了)

(5)设置异常,然后启动并访问接口127.0.0.1:8080/producer/ack:

可以看到客户端在不断报错:

这是因为在auto模式下,如果在消费过程中出现异常,消费会立即重新入队进行消费,由于还是一个消费方,所以会一直报错(一直重复上述过程),观察RabbitMQ管理界面也可以看到中有一条消息处于Ready状态(这也是会一直重新入队消费的原因,不同manual方式下如果没有手动确认的消息会处于Unacked状态下,只有进行了手动确认才会从RabbitMQ中移除)

 2.2 none方式示例

(1)application.yml配置文件:

(2)producer类:

(3)controller类:

(4)正常启动,并访问接口127.0.0.1:8080/producer/ack:

(5)设置异常,启动并访问接口127.0.0.1:8080/producer/ack:

出现报错:

RabbitMQ管理界面:

可以看到消息丢失。

3. 手动确认的相关方法

(1)basicAck

第一个参数(deliveryTag):消息的唯一标识,是每一个通道(channel)独立维护的,所以在每一个通道上都是唯一的,当消费者手动确认一条消息的时候,都必须使用对应的通道进行。

第二个参数(multiple):是否批量确认,在某些情况下,为了减少网络流量,可以对一系列连续的deliveryTag进行批量确认,值为true则会一次性ack确认所有小于或等于指定的deliveryTag的消息;为false,则只会确认当前的消息。

(2)basicReject

第一个参数参考basicAck。

第二个参数(requeue):是否重新入队,表示拒绝后,该条消息如何处理,如果设置为true,那么在消息处理失败后,会重新入队,以便发送给下一个监听该队列的消费者,如果设置为false,则RabbitMQ会把这条消息从队列中移除,而不会把消息发送给下一个监听的消费者。

(3)basicNack

和basciReject都是否定确认,不同的是basicNack可以批量拒绝消息。 


http://www.kler.cn/a/513068.html

相关文章:

  • 开源AI崛起:新模型逼近商业巨头
  • 第12章:Python TDD完善货币加法运算(一)
  • 从CRUD到高级功能:EF Core在.NET Core中全面应用(三)
  • @LoadBalanced注解的实现原理
  • 深入探索Python人脸识别技术:从原理到实践
  • Numpy基础01(Jupyter基本用法/Ndarray创建与基本操作)
  • Flutter中PlatformView在鸿蒙中的使用
  • Elixir语言的数据库编程
  • CKS认证 | Day1 K8s集群部署与安全配置
  • 部门管理查询部门,nginx反向代理,前端如何访问到后端Tomcat 注解@RequestParam
  • 博客之星2024年度-技术总结:技术探险家小板的一年的征程
  • AF3 AttentionPairBias类源码解读
  • 三、I2C客户端驱动 —— htu21d
  • uboot剖析之命令行延时
  • C++ 学习:深入理解 Linux 系统中的冯诺依曼架构
  • python爬虫入门(实践)
  • 基于Springboot+Redis秒杀系统 demo
  • 【2024年华为OD机试】 (JavaScriptJava PythonC/C++)
  • 网络安全态势感知技术综述
  • Apache Hive 聚合函数与 OVER 窗口函数:从基础到高级应用
  • Oracle审计
  • SecureUtil.aes数据加密工具类
  • 通义万相:阿里巴巴 AI 生成式多模态平台解析与实战应用
  • 细说STM32F407单片机电源低功耗StandbyMode待机模式及应用示例
  • AI编程工具使用技巧:在Visual Studio Code中高效利用阿里云通义灵码
  • 如何提升IP地址查询数据服务的安全?