当前位置: 首页 > article >正文

RabbitMQ--延迟队列

(一)延迟队列

1.概念

 延迟队列是一种特殊的队列,消息被发送后,消费者并不会立刻拿到消息,而是等待一段时间后,消费者才可以从这个队列中拿到消息进行消费

2.应用场景

 延迟队列的应用场景很多,就比如大部分定时的场景,我们都可以利用延迟队列例如:闹钟定时,预约会议,空调定时开关等

 但是RabbitMQ是没有直接给我们提供延迟队列的,但是我们可以通过上一篇博客说的ttl和死信来达到延迟队列的效果,具体操作如下

 首先我们有一个交换机和一个队列,然后此队列又指定一个死信交换机,死信交换机绑定一个死信队列,然后我们消费者并不是从正常队列中获取消息,而是从死信队列中获取消息,通过给消息/队列设置过期时间来影响消息到达死信队列的时间,消费者拿到消息就会延迟,这样就可以模拟出延迟的效果。

那接下来就是我们的代码实现

首先我们通过设置队列ttl来实现

 @Bean("ttlExchange")
    public Exchange ttlExchange(){
        return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();
    }
    @Bean("ttlQueue")
    public Queue ttlQueue(){
        return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(5000)
                .deadLetterExchange(Constants.DEAD_EXCHANGE)
                .deadLetterRoutingKey("dead")
                .build();
    }
    @Bean("ttlBind")
    public Binding ttlBind(@Qualifier("ttlExchange") Exchange ackExchange,@Qualifier("ttlQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("ttl").noargs();
    }
    @Bean("deadExchange")
    public Exchange deadExchange(){
        return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();
    }
    @Bean("deadQueue")
    public Queue deadQueue(){
        return QueueBuilder.durable(Constants.DEAD_QUEUE).build();
    }
    @Bean("deadBind")
    public Binding deadBind(@Qualifier("deadExchange") Exchange ackExchange,@Qualifier("deadQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("dead").noargs();
    }

然后生产者代码没什么变化

 @RequestMapping("ttl")
    public String TTLPro(){
        String s1="ttl test";
        Message message=new Message(s1.getBytes(StandardCharsets.UTF_8));
//        message.getMessageProperties().setExpiration("10000");
        RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);
        return "发送成功";
    }

只不过消费者订阅的是死信队列

@RabbitListener(queues = Constants.DEAD_QUEUE)
    public void ListenerQueue2(Message message,Channel channel) throws IOException {
        long Tag=message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "
                    +Tag);
            int num=3/0;     //模拟失败
            channel.basicAck(Tag,false);
            System.out.println("处理完成");
        }catch (Exception e){
            channel.basicReject(Tag,false);
        }
    }

这样过了5s后我们就可以从死信队列中获取到延迟消息了

那我们再来通过设置消息的ttl来看一下

首先我们要把队列的ttl给取消掉,记得要删队列

 @Bean("ttlExchange")
    public Exchange ttlExchange(){
        return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();
    }
    @Bean("ttlQueue")
    public Queue ttlQueue(){
        return QueueBuilder.durable(Constants.TTL_QUEUE)
                .deadLetterExchange(Constants.DEAD_EXCHANGE)
                .deadLetterRoutingKey("dead")
                .build();
    }
    @Bean("ttlBind")
    public Binding ttlBind(@Qualifier("ttlExchange") Exchange ackExchange,@Qualifier("ttlQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("ttl").noargs();
    }
    @Bean("deadExchange")
    public Exchange deadExchange(){
        return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();
    }
    @Bean("deadQueue")
    public Queue deadQueue(){
        return QueueBuilder.durable(Constants.DEAD_QUEUE).build();
    }
    @Bean("deadBind")
    public Binding deadBind(@Qualifier("deadExchange") Exchange ackExchange,@Qualifier("deadQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("dead").noargs();
    }

  然后我们发送一个ttl时间为10s的,再发送一个5s的,我们知道这样两条数据是会发生错误的,因为我们设置消息过期时间,我们RabbitMQ(性能问题)并不会遍历整个消息队列看看谁过没过期,如果过期的消息不在队头,那么只有当使用的时候,才会真正的进行一些过期处理,比如传给死信交换机 

@RequestMapping("ttl")
    public String TTLPro(){
        String s1="ttl test";
        Message message=new Message(s1.getBytes(StandardCharsets.UTF_8));
        message.getMessageProperties().setExpiration("10000");
        RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);
        message.getMessageProperties().setExpiration("5000");
        RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);
        return "发送成功";
    }

如果正常,会在5s后接收到第一个消息,在10s后接收到第二个消息,但是此时我们会同一时间(10s)接收到两条消息

  那这个问题在上一篇ttl的时候就说过了,这里依然是个问题,虽然设置队列的ttl不会有这个问题,但是设置队列ttl我们针对不同延迟时间就需要创建多个队列,这是不太合理的,所以针对这个问题,我们有一个延迟队列的插件可以使用

 3.延迟队列插件

延迟队列插件,会给我们提供一个特殊的交换机,来完成我们的延迟功能

这是我们插件的下载地址

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

  我们需要找到ez文件并下载,但是注意这里的版本要与你的RabbitMQ版本可以匹配,否则之后会出现问题

那插件下完后,我们要找到对应目录,下载插件

上面两个目录,我们可以任选一个下载即可,没有这个目录,我们手动创建

然后把下载的ez文件,copy到这个目录中即可,然后我们可以使用命令 rabbitmq-plugins list 来查看插件列表,看看我们有没有成功放进去,但是注意,即使我们成功放进去并成功显示了,也可能会出错,这就可能是你们下载的RabbitMQ版本与整个延迟插件的版本不匹配,重新下载其他版本即可

然后我们启动插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange

之后重启服务service rabbitmq-server restart

在没有发生错误的情况下,我们就发现我们会多了一个默认的交换机

此时我们代码中就不需要声明普通交换机了而是直接使用默认交换机即可

我们生产者代码是需要改一下的,我们需要调用一个方法来设置延迟时间

@RequestMapping("/delay2")
public String delay2() {
 //发送带ttl的消息 
 rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
"delayed test 20s..."+new Date(), messagePostProcessor -> {
 messagePostProcessor.getMessageProperties().setDelayLong(20000L); 
 return messagePostProcessor;
 });
 rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
"delayed test 10s..."+new Date(), messagePostProcessor -> {
 messagePostProcessor.getMessageProperties().setDelayLong(10000L); //设置延迟时间 
 return messagePostProcessor;
 });
 return "发送成功!";
}

此时我们就可以在10s正确接收一个消息,在20s正确接收另一个消息 

  注意我们使用TTL+死信时消息传递给交换机后映射之后一直在正常队列中的,等待TTL时间到了把消息给死信交换机再映射到死信队列再拿到消息,我们使用插件的时候,消息是在RabbitMQ给我们提供的那个特殊的交换机中的,等待时间到了,再映射给队列,然后从队列中拿消息

4.常见面试题

介绍下延迟队列

我们可以这样回答:

 延迟队列是一个特殊的队列,消息发送后,消费者并不会立刻拿到,而是等待一定延迟时间后才发送给消费者进行消费

 并且延迟队列的应用场景很多,比如订单支付,智能家电,以及定时邮箱

 但是延迟队列在RabbitMQ中并没有直接给我们提供,我们可以通过TTL+死信的方式或者使用延迟插件的方式来实现延迟功能

 两者的区别:

1.通过TTL+死信

 优点:比较灵活,不需要我们额外引入插件

 缺点:我们设置消息TTL的时候可能会出现顺序的问题,而且我们需要多创建死信队列和死信交换机,完成一些绑定,增加了系统的复杂性

2.基于插件实现的延迟队列

 优点:通过插件能够简化延迟消息的实现,并且避免了时序问题

 缺点:需要依赖插件,不同版本RabbitMQ需要不同版本插件,有运维工作


http://www.kler.cn/a/509687.html

相关文章:

  • Java 接口安全指南
  • CSRF攻击XSS攻击
  • 如何在 Google Cloud Shell 中使用 Visual Studio Code (VS Code)?
  • 小白:react antd 搭建框架关于 RangePicker DatePicker 时间组件使用记录 2
  • 基于 Python 的财经数据接口库:AKShare
  • Power Automate 实现字符串分割、替换、换行显示
  • opencv3.4 ffmpeg3.4 arm-linux 交叉编译
  • 02UML图(D1_结构图)
  • 【Python项目】基于深度学习的开放领域时间抽取系统
  • 【机器学习:二十、拆分原始训练集】
  • 【数据分析】coco格式数据生成yolo数据可视化
  • 11.在 Vue 3 中使用 ECharts 实现树状图
  • vue中的那些事(刷新+key+v-if,v-for)
  • Python制作简易PDF查看工具PDFViewerV1.0
  • 自建本地Linux、PHP服务部署并验证
  • python编程-OpenCV(图像读写-图像处理-图像滤波-角点检测-边缘检测)图像变换
  • STM32 学习笔记【补充】(十)硬件I2C读写MPU6050
  • 微调Qwen2.5-0.5B记录
  • 西门子PLC读取梅安森烟雾传感器数据
  • 5. 使用springboot做一个音乐播放器软件项目【业务逻辑开发】
  • 分布式理解
  • SiamCAR(2019CVPR):用于视觉跟踪的Siamese全卷积分类和回归网络
  • app版本控制java后端接口版本管理
  • Spring Boot 中使用 ShardingSphere-Proxy
  • SpringBoot 项目中配置日志系统文件 logback-spring.xml 原理和用法介绍
  • 数字化的三大战场与开源AI智能名片2+1链动模式S2B2C商城小程序源码的应用探索