当前位置: 首页 > article >正文

【RabbitMq】RabbitMq高级特性-延迟消息

延迟消息

  • 什么是延迟消息
  • 死信交换机
  • 延迟消息插件-DelayExchange
  • 其他文章

什么是延迟消息

延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务:设置在一定时间之后才执行的任务
在这里插入图片描述
如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  • 收集那些因处理失败而被拒绝的消息
  • 收集那些因队列满了而被拒绝的消息
  • 收集因TTL(有效期)到期的消息

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信
    在这里插入图片描述

实现代码
发送者

    @Test
    public void SendNormalMessage() {
        String exchangeName = "normal.direct";
        String message = "Hello normal";
        this.rabbitTemplate.convertAndSend(exchangeName, "dlx", message, message1 -> {
            message1.getMessageProperties().setExpiration("2000");
            return message1;
        });
    }

消费者

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dlx.queue"),
            exchange = @Exchange(name = "dlx.direct"),
            key = {"dlx"}
    ))
    public void istenerdlxQueue2(String msg){
        log.info("消费者.。。。。。。。监听到 dlx.queue 的消息【{}】",msg);
    }
@Configuration
public class SpringRabbitConfig {
    
    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange("normal.direct");
    }

    @Bean
    public Queue normalQueue(){
        return QueueBuilder
                .durable("normal.queue")
                .deadLetterExchange("dlx.direct")
                .build();
    }

    @Bean
    public Binding normalQueueBinding(Queue normalQueue, DirectExchange normalExchange){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with("dlx");
    }
    
}

注:
RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

延迟消息插件-DelayExchange

这个插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

在这里插入图片描述
基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
官方文档说明:
延迟消息插件
插件下载地址

由于我安装的MQ是3.8.x版本,因此这里下载3.8.17版本,下载时注意版本对应
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:
在这里插入图片描述

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。
接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

执行结果:
在这里插入图片描述

声明延迟交换机
基于注解方式:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于@Bean的方式:

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

注:
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息。

其他文章

发送者可靠性
消费者可靠性
延迟信息


http://www.kler.cn/a/515581.html

相关文章:

  • RabbitMQ 高级特性
  • Mac安装Homebrew
  • ngrok同时配置多个内网穿透方法
  • ue5 GAS制作一个技能
  • [Qt]系统相关-多线程、线程安全问题以及线程的同步机制
  • Flutter调用HarmonyOS NEXT原生相机拍摄相册选择照片视频
  • 观察者模式 - 观察者模式的应用场景
  • HippoRAG:受海马体启发的长时记忆模型,提升大语言模型的知识整合能力
  • YOLOv1、YOLOv2、YOLOv3目标检测算法原理与实战第十三天|YOLOv3实战、安装Typora
  • 部门管理新增部门 接收json格式的请求参数 @requestbody
  • JAVA 使用反射比较对象属性的变化,记录修改日志。使用注解【策略模式】,来进行不同属性枚举值到中英文描述的切换,支持前端国际化。
  • Agent群舞,在亚马逊云科技搭建数字营销多代理(Multi-Agent)(下篇)
  • xtermjs重复发送
  • 【面试题Java】单例模式
  • 零售业革命:改变行业的顶级物联网用例
  • 算法随笔_17: 回文数
  • Gartner发布2025年网络治理、风险与合规战略路线图
  • 自然语言处理(NLP)-总览图学习
  • Java中回调函数
  • Laravel 实战:用Carbon筛选最近15分钟内的数据
  • 使用EasyExcel(FastExcel) 的模板填充报Create workbook failure
  • c#调用c++的dll,字符串指针参数问题
  • Flutter 使用 flutter_inappwebview 加载 App 本地 HTML 文件
  • QT:控件属性及常用控件(3)-----输入类控件(正则表达式)
  • TangoFlux 本地部署实用教程:开启无限音频创意脑洞
  • Threejs的学习-几何点线面