当前位置: 首页 > article >正文

死信队列

死信队列

死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列.

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

造成死信的原因

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 MQ 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

死信架构图

在这里插入图片描述

代码实战

  • TTL过期

    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    import java.util.HashMap;
    
    public class Consumer {
        //普通交换机
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机
        private static final String DEAD_EXCHANGE = "dead_exchange";
        //普通队列
        private static final String NORMAL_QUEUE = "normal_queue";
        //死信队列
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //创建普通队列死信分发参数
            HashMap<String,Object> arguments= new HashMap<>();
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            arguments.put("x-dead-letter-routing-key","lisi");
            //创建普通交换机与队列并绑定
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
    
            //创建死信交换机和队列并绑定
            channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
    
    
            DeliverCallback deliverCallback= (tag,msg)->{
                String message = new String(msg.getBody());
                System.out.println("接收到消息:"+message);
                }
            };
    
            //创建消费者
            channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,(tag)->{});
        }
    }
    
    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    public class Producer {
        private static final String EXCHANGE_NAME = "normal_exchange";
        private static final String ROUTING_KEY = "zhangsan";
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitUtil.getConnection();
            System.out.println("已连接到RabbitMQ服务器....");
            Channel channel = connection.createChannel();
            //设置超时为10秒
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration("10000").build();
            for (int i = 0; i < 10; i++) {
                String message = "msg" + i;
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, basicProperties, message.getBytes());
                System.out.println("消息:"+message+"发送成功!");
            }
        }
    }
    
  • 队列达到最大长度

    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    import java.util.HashMap;
    
    public class Consumer {
        //普通交换机
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机
        private static final String DEAD_EXCHANGE = "dead_exchange";
        //普通队列
        private static final String NORMAL_QUEUE = "normal_queue";
        //死信队列
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //创建普通队列死信分发参数
            HashMap<String,Object> arguments= new HashMap<>();
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            arguments.put("x-dead-letter-routing-key","lisi");
            //设置队列最大长度
            arguments.put("x-max-length",5);
            //创建普通交换机与队列并绑定
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
    
            //创建死信交换机和队列并绑定
            channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
    
    
            DeliverCallback deliverCallback= (tag,msg)->{
                String message = new String(msg.getBody());
           		System.out.println("消息:"+message+"被拒绝");
            };
    
            //创建消费者
            channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,(tag)->{});
        }
    }
    
    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    public class Producer {
        private static final String EXCHANGE_NAME = "normal_exchange";
        private static final String ROUTING_KEY = "zhangsan";
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitUtil.getConnection();
            System.out.println("已连接到RabbitMQ服务器....");
            Channel channel = connection.createChannel();
            for (int i = 0; i < 10; i++) {
                String message = "msg" + i;
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
                System.out.println("消息:"+message+"发送成功!");
            }
        }
    }
    
  • 消息被拒

    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    import java.util.HashMap;
    
    public class Consumer {
        //普通交换机
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机
        private static final String DEAD_EXCHANGE = "dead_exchange";
        //普通队列
        private static final String NORMAL_QUEUE = "normal_queue";
        //死信队列
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //创建普通队列死信分发参数
            HashMap<String,Object> arguments= new HashMap<>();
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            arguments.put("x-dead-letter-routing-key","lisi");
            //创建普通交换机与队列并绑定
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
    
            //创建死信交换机和队列并绑定
            channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
    
    
            DeliverCallback deliverCallback= (tag,msg)->{
                String message = new String(msg.getBody());
                if (message.equals("msg5")){
                    System.out.println("消息:"+message+"被拒绝");
                    channel.basicReject(msg.getEnvelope().getDeliveryTag(),false);
                }else {
                    System.out.println("接收到消息:"+message);
                    channel.basicAck(msg.getEnvelope().getDeliveryTag(),false);
                }
            };
    
            //创建消费者
            channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,(tag)->{});
        }
    }
    
    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    public class Consumer2 {
        private static final String DEAD_QUEUE_NAME = "dead_queue";
    
        public static void main(String[] args)throws Exception {
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            DeliverCallback deliverCallback=(tag,msg)->{
                String message= new String(msg.getBody());
                System.out.println("队列:"+DEAD_QUEUE_NAME+"\t收到消息:"+message);
            };
            channel.basicConsume(DEAD_QUEUE_NAME,true,deliverCallback,(tag)->{});
        }
    }
    
    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    public class Producer {
        private static final String EXCHANGE_NAME = "normal_exchange";
        private static final String ROUTING_KEY = "zhangsan";
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitUtil.getConnection();
            System.out.println("已连接到RabbitMQ服务器....");
            Channel channel = connection.createChannel();
            for (int i = 0; i < 10; i++) {
                String message = "msg" + i;
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
                System.out.println("消息:"+message+"发送成功!");
            }
        }
    }
    

http://www.kler.cn/a/16700.html

相关文章:

  • 网络技术-定义配置ACL规则的语法和命令
  • JavaWeb后端开发知识储备1
  • 信号-3-信号处理
  • Docker 篇-Docker 详细安装、了解和使用 Docker 核心功能(数据卷、自定义镜像 Dockerfile、网络)
  • HBase理论_背景特点及数据单元及与Hive对比
  • 【云计算解决方案面试整理】1-2云计算基础概念及云计算技术原理
  • Vue3透传Attributes
  • Crowdsoure的简单介绍
  • Android Signal 使用
  • 关于使用Notion的board做工作安排这件事
  • 『Linux』第九讲:Linux多线程详解(一)_ 线程概念 | 线程控制之线程创建 | 虚拟地址到物理地址的转换
  • 云原生技术概谈
  • 医院安全(不良)事件报告系统 PHP语言实现
  • 【华为/华三】PPP
  • springbean 并发安全
  • Vue3中如何实现数字翻牌效果?
  • Redis-哈希
  • 互联网摸鱼日报(2023-04-29)
  • Docker基本管理
  • 少儿编程scratch
  • 7-1 设计一个学生类和它的一个子类——本科生类(interface接口)
  • PyTorch机器学习与深度学习技术方法
  • 微信小程序定义模板
  • 基于松鼠算法的极限学习机(ELM)回归预测-附代码
  • MySQL调优笔记——慢SQL优化记录(1)
  • 【热门框架】Maven分模块开发是什么意思?怎样操作?