当前位置: 首页 > article >正文

RabbitMQ延迟消息——DelayExchange插件

什么是死信以及死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信

        1. 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false

        2. 消息是一个过期消息,超时无人消费

        3. 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息

  2. 收集那些因队列满了而被拒绝的消息

  3. 收集因TTL(有效期)到期的消息

为什么这里会介绍死信交换机呢,举个例子,我们在购买车票的时候会有一个支付时间,8分钟没有支付就会销毁订单,返回车票。mq不可能时刻监控客户有没有支付,可以使用延迟消息,延迟8分钟,八分钟后再去发送消息到mq,在查看支付情况。

DelayExchange插件

官网下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

[
    {
        "CreatedAt": "2024-06-19T09:22:59+08:00",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
        "Name": "mq-plugins",
        "Options": null,
        "Scope": "local"
    }
]

 插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange 

 

 具体使用

声明交换机,基于@Bean:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("sdgstu.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

基于注解:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "stusdg.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

发送消息:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}


http://www.kler.cn/a/303188.html

相关文章:

  • 生成自签名证书并配置 HTTPS 使用自签名证书
  • 深入探索 TypeScript:从基础到高级特性
  • WebLogic 介绍
  • Keil基于ARM Compiler 5的工程迁移为ARM Compiler 6的工程
  • [宁波24届]平方数
  • 1.两数之和-力扣(LeetCode)
  • Python之 条件与循环(Python‘s Conditions and loops)
  • 在麒麟系统 v10 SP3 上运行自带的 MariaDB
  • 【鸿蒙】HarmonyOS NEXT星河入门到实战6-组件化开发-样式结构重用常见组件
  • Oracle中VARCHAR和VARCHAR2的区别
  • CSS框架 Tailwind CSS
  • Leetcode3276. 选择矩阵中单元格的最大得分
  • CNN中的conv
  • ASP.net core 8.0网站发布
  • 房产销售系统|基于java和vue的房产销售系统(源码+数据库+文档)
  • 利用apache-pdfbox库修改pdf文件模板,进行信息替换
  • 【基础算法总结】二分查找
  • 在Python的Pandas库中,`df.iloc[::500]`是一个用于数据选择的索引器,它允许我们从DataFrame中选择特定的行和列。
  • golang学习笔记19——golang做服务发现与注册的深度剖析
  • 从安装ffmpeg开始,把一个视频按照每秒30帧fps剪切为图片
  • Vue组件:模板引用ref属性的使用
  • 微信小程序之轮播图组件封装
  • CTF常见编码及加解密(超全)第二篇
  • java程序员入行科目一之CRUD轻松入门教程(二)
  • layui监听table表单的多选框
  • 高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案