当前位置: 首页 > article >正文

RabbitMQ学习—day6—死信队列与延迟队列

目录

        死信队列

1. 死信的概念

2. 死信的来源

实战演练

1. 消息TTL过期

2. 队列达到最大长度

3. 消息被拒绝

延迟队列

概念

使用场景

TTL的两种设置


死信队列

1. 死信的概念

1.1 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

1.2 应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

2. 死信的来源

来源

  1. 消息TTL过期
  2. 队列达到最大长度(队列满了,无法再添加数据到mq中)
  3. 消息被拒绝(basic.reject或basic.nack)并且requeue=false

死信代码架构图

实战演练

1. 消息TTL过期

生产者

    /*
 * 死信队列之生产者代码
 *
 * */
 public class Producer {
 
     //普通交换机的名称
     public static final String NORMAL_EXCHANGE = "normal_exchange";
     public static void main(String[] args) throws Exception{
         Channel channel = RabbitMqUtils.getChannel();
 
         //死信消息,设置TTL时间  单位是ms  10000ms是10s
         AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
 
         for (int i = 0; i < 10; i++) {
             String message = "info" + i;
             channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes(StandardCharsets.UTF_8));
         }
     }
 }

消费者

    /*
 * 死信队列实战
 * 消费者01
 * */
 public class Consumer01 {
 
     //普通交换机名称
     public static final String NORMAL_EXCHANGE = "normal_exchange";
 
     //死信交换机名称
     public static final String DEAD_EXCHANGE = "dead_exchange";
 
     //普通队列名称
     public static final String NORMAL_QUEUE = "normal_queue";
 
     //死信队列名称
     public static final String DEAD_QUEUE = "dead_queue";
 
     public static void main(String[] args) throws  Exception{
         Channel channel = RabbitMqUtils.getChannel();
 
         //声明死信和普通的交换机类型为direct
         channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
 
         //声明普通队列
         HashMap<String, Object> arguments = new HashMap<>();
 
         //过期时间
         arguments.put("x-message-ttl",1000);
         //正常队列设置死信队列
         arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
         //设置死信RoutingKey
         arguments.put("x-dead-letter-routing-key","lisi");

     //声明死信和普通队列
     channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
     channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

     //绑定普通的交换机与普通的队列
     channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
     //绑定死信的交换机与死信的队列
     channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
     System.out.println("等待接收消息......");

     DeliverCallback deliverCallback = (consumerTag,message) -> {
         System.out.println("Consumer01接收的消息是:" + new String(message.getBody()));
     };

     channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag->{});
 }
}
 /*
 * 死信队列实战
 * 消费者02
 * */
 public class Consumer02 {
 
     //死信队列名称
     public static final String DEAD_QUEUE = "dead_queue";
 
     public static void main(String[] args) throws  Exception{
         Channel channel = RabbitMqUtils.getChannel();
         System.out.println("等待接收消息......");

     DeliverCallback deliverCallback = (consumerTag,message) -> {
         System.out.println("Consumer02接收的消息是:" + new String(message.getBody()));
     };

     channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag->{});
 }
}

效果:先启动消费者01,让交换机和队列创建,再关闭消费者01,启动生产者,10条消息被传送到NORMAL_QUEUE,然后被传送到DEAD_QUEUE,此时启动消费者02,消息全被接收

2. 队列达到最大长度

生产者

生产者Producer取消过期时间

消费者

消费者Consumer01修改:增加队列长度

消费者Consumer02不修改

效果:一共发送10条消息·普通队列接收到6条,死信队列接收到4条

3. 消息被拒绝

生产者

生产者代码无改变

消费者

消费者C1:让info5的消息被丢弃,记得开启手动应答,以及关闭队列长度

消费者C2代码无改变

效果:C1消费者接收到9条消息,C2接收到1条消息

延迟队列

概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

使用场景

  • 订单在十分钟之内未支付则自动取消
  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  • 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
  • 这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

TTL的两种设置

TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。

换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为「死信」。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

队列设置 TTL

在创建队列的时候设置队列的 x-message-ttl 属性

Map<String, Object> params = new HashMap<>();
params.put("x-message-ttl",5000);
return QueueBuilder.durable("QA").withArguments(args).build(); // QA 队列的最大存活时间位 5000 毫秒

消息设置 TTL

针对每条消息设置 TTL

rabbitTemplate.converAndSend("X","XC",message,correlationData -> {
    correlationData.getMessageProperties().setExpiration("5000");
});

两个代码块来自下方的案例

两者区别

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间,具体看下方案例。

另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃

具体的延迟队列将在下一篇文章springboot整合rabbitmq中体现


http://www.kler.cn/a/558774.html

相关文章:

  • 网络IP跳动问题解决详
  • flink operator v1.10部署flink v1.19.2
  • 前后端分离系统架构:基于Spring Boot的最佳实践
  • Python数据结构深度探索:树的构建与遍历
  • 跟据spring boot版本,查看对应的tomcat,并查看可支持的tomcat的版本范围
  • 高速PCB电源层
  • 跟着 Lua 5.1 官方参考文档学习 Lua (8)
  • MyBatis框架七:缓存
  • 智能测试执行 利用算法 利用图像识别、自然语言处理等技术实现自动化测试执行
  • 《操作系统 - 清华大学》8 -1:进程管理:进程的定义
  • C++入门基础课程讲解
  • 解决每次 Maven Rebuild 后 Java 编译器版本变为 1.5
  • jmeter后端监视器的妙用和实现方法
  • 【网络】如何划分子网、计算子网掩码、确定网络地址、广播地址和可用主机地址范围?
  • 通俗理解什么是云原生?
  • Redis过期数据处理
  • Gin从入门到精通 (五)数据绑定与验证
  • devops 工具 网络安全
  • XML Schema 元素替换
  • ASP.NET Core Clean Architecture