当前位置: 首页 > article >正文

Rabbitmq--延迟消息

13.延迟消息

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息

延迟任务:一定时间之后才会执行的任务

image-20250310204548904

1.死信交换机

当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):

  • 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false

  • 过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费

  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

image-20250310204712321

利用死信交换机的特点,可以实现发送延迟消息的功能

2.延迟消息插件(推荐使用)

1.下载并安装延迟插件

RabbitMQ 的官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后,可以将消息暂存一段时间,时间到了之后再将消息投递到队列中

插件的下载地址:rabbitmq-delayed-message-exchange

image-20250310204809121

下载完插件后,运行以下指令,在输出信息中找到 Mounts ,再找到 RabbitMQ 的插件的安装目

sudo docker inspect rabbitmq

image-20250310204857664

然后进入 RabbitMQ 的插件的安装目录,将刚才下载的插件上传到该目录下

sudo chmod +rx -R /var/lib/docker

//接着打开/var/lib/docker/volumes/rabbitmq-plugins/_data目录的写权限(如果修改权限不生效,请切换到 root 用户执行指令)
sudo chmod 777 /var/lib/docker/volumes/rabbitmq-plugins/_data

//将刚才下载的插件上传到/var/lib/docker/volumes/rabbitmq-plugins/_data目录
//上传成功后将/var/lib/docker/volumes/rabbitmq-plugins/_data目录的权限复原
sudo chmod 755 /var/lib/docker/volumes/rabbitmq-plugins/_data

//最后进入容器内部,运行指令安装插件,安装完成后退出容器内部
sudo docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit

看到以下信息,说明插件安装成功了

image-20250310205159598

2.在 Java 代码中发送延迟消息

声明延迟交换机

@Bean
public DirectExchange delayExchange() {
    return ExchangeBuilder.directExchange("delay.direct").delayed().build();
}

声明队列和延迟交换机,并将队列和延迟交换机绑定在一起

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue"),
        exchange = @Exchange(name = "delay.direct", delayed = "true", type = ExchangeTypes.DIRECT),
        key = "delay"
))
public void listenDelayQueue(String message) {
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
    simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");
    System.out.println("消费者收到了 delay.queue的消息: " + message + ",时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

编写测试方法,测试发送延迟消息

@Test
void testSendDelayMessage() {
    rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setDelay(10000); // 毫秒
            return message;
        }
    });

    SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
    simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");
    System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

发送延迟消息的本质是在消息头属性中添加 x-delay 属性

image-20250310205359715

3. 延迟消息的原理和缺点
  • RabbitMQ 的延迟消息是怎么实现的呢?RabbitMQ 会自动维护一个时钟,这个时钟每隔一秒就跳动一次,如果对时钟的精度要求比较高的,可能还要精确到毫秒,甚至纳秒

  • RabbitMQ 会为发送到交换机的每一条延迟消息创建一个时钟,时钟运行的过程中需要 CPU 不断地进行计算。发送到交换机的延迟消息数越多,RabbitMQ 需要维护的时钟就越多,对 CPU 的占用率就越高(Spring 提供的定时任务的原理也是类似)

  • 定时任务属于 CPU 密集型任务,中间涉及到的计算过程对 CPU 来说压力是很大的,所以说,采用延迟消息会给服务器的 CPU 带来更大的压力。当交换机中有非常多的延迟消息时,对 CPU 的压力就会特别大

4.取消超时订单

设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题

  1. 如果并发较高,30分钟可能堆积消息过多,对 MQ 压力很大
  2. 大多数订单在下单后 1 分钟内就会支付,但消息需要在 MQ 中等待30分钟,浪费资源

image-20250310205533364

image-20250310205550036

5.发送延迟检测订单的消息

我们定义一个实体类,用于记录延迟消息的内容和延迟消息的延迟时间列表(该实体类也是延迟消息的类型)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MultipleDelayMessage<T> {

    private T data;

    private List<Long> delayMillis;

    public MultipleDelayMessage() {

    }

    public MultipleDelayMessage(T data, Long... delayMillis) {
        this.data = data;
        this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis));
    }

    public MultipleDelayMessage(T data, List<Long> delayMillis) {
        this.data = data;
        this.delayMillis = delayMillis;
    }

    public static <T> MultipleDelayMessage<T> of(T data, Long... delayMillis) {
        return new MultipleDelayMessage<>(data, new ArrayList<>(Arrays.asList(delayMillis)));
    }

    public static <T> MultipleDelayMessage<T> of(T data, List<Long> delayMillis) {
        return new MultipleDelayMessage<>(data, delayMillis);
    }

    public boolean hasNextDelay() {
        return !delayMillis.isEmpty();
    }

    public Long removeNextDelay() {
        return delayMillis.remove(0);
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    public List<Long> getDelayMillis() {
        return delayMillis;
    }

    public void setDelayMillis(List<Long> delayMillis) {
        this.delayMillis = delayMillis;
    }

    @Override
    public String toString() {
        return "MultipleDelayMessage{" +
                "data=" + data +
                ", delayMillis=" + delayMillis +
                '}';
    }

}

我们再定义一个发送延迟消息的消息处理器,供所有服务使用

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;

public class DelayMessagePostProcessor implements MessagePostProcessor {

    private final Integer delay;

    public DelayMessagePostProcessor(Integer delay) {
        this.delay = delay;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setDelay(delay);
        return message;
    }

}

改造后的发送延迟消息的测试方法

  • “delay.direct”:交换机的名称

  • “delay”:路由键(routing key),交换机会将消息发送到绑定到这个路由键的队列

  • “Hello, DelayQueue!”:实际要发送的消息内容

  • new DelayMessagePostProcessor(10000):消息后处理器(Message Post Processor),用于在消息发送之前对消息进行修改

@Test
void testSendDelayMessage() {
    rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new DelayMessagePostProcessor(10000));

    SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
    simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");
    System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}


http://www.kler.cn/a/579842.html

相关文章:

  • 深入理解C语言预处理器:从原理到实战
  • 游戏引擎学习第148天
  • 使用Python和p5.js创建的迷你游戏示例,该游戏包含多个屏幕和动画,满足在画布上显示图像、使用键盘命令移动图像
  • AXI接口总结
  • DeepSeek-进阶版部署(Linux+GPU)
  • RAG助力机器人场景理解与具身操作!EmbodiedRAG:基于动态三维场景图检索的机器人任务规划
  • 腾讯云低代码开发应用
  • Unity大型游戏开发全流程指南
  • ChātGPT开发“SolidWorks工具箱”,可建海量3D模型库,能一键画图、批量赋属性、自动出图,效率提高10倍
  • JAVA学习-练习试用Java实现“使用FP-Growth算法对大数据集中的频繁模式进行挖掘和筛选”
  • 将Exce中工作簿的多个工作表拆分为单独的Excel文件
  • 鸿蒙“一码多端”:万物互联时代的中国式创新与温度
  • 城市消防无人机系统解决技术详解
  • 蓝桥杯备赛-差分-推箱子
  • 拒绝“浅尝辄止”让考研知识深入人心
  • 010---基于Verilog HDL的分频器设计
  • 《 C++ 点滴漫谈: 三十 》函数参数
  • PyTorch基础语法万字解析
  • 开源将图像和 PDF 文件高精度地转换为 Markdown 和 JSON 格式的文本软件
  • 【一文学会 HTML5】