当前位置: 首页 > article >正文

RabbitMQ如何实现延时队列?

1.什么是RabbitMQ?

RabbitMQ 是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)。它允许不同的应用程序通过消息进行通信,并且能够可靠地传递、路由和存储消息。

2.利用RabbitMQ实现延时队列的方法

2.1利用死信交换机(Dead - Letter - Exchange,DLX)来实现

原理:

当消息在队列中过期后,会被发送到死信交换机,消费者从连接死信交换机的队列中获取过期后的消息进行处理。

步骤示例:

声明交换机和队列

首先需要声明一个普通交换机(比如direct类型的交换机)和一个死信交换机(同样可以是direct类型),以及两个队列,一个是普通队列用于接收消息,另一个是连接死信交换机的队列用于接收过期消息。

假设使用 Java 和 RabbitMQ 的 Java 客户端amqp - client来实现,代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class DelayedQueueExample {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    private static final String NORMAL_QUEUE = "normal_queue";
    private static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明死信交换机
        channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, "direct");
        // 声明普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");
        // 声明死信队列
        channel.queueDeclare(DEAD_LETTER_QUEUE, false, false, false, null);
        // 声明普通队列,并设置死信交换机和路由键
        Map<String, Object> args = new HashMap<>();
        args.put("x - dead - letter - exchange", DEAD_LETTER_EXCHANGE);
        args.put("x - dead - letter - routing - key", "dead_letter_key");
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, args);
        // 将普通队列绑定到普通交换机
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal_key");
        // 将死信队列绑定到死信交换机
        channel.queueBind(DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE, "dead_letter_key");
    }
}

发送消息并设置过期时间

当队列和交换机声明完成后,可以通过普通交换机发送消息到普通队列,并设置消息的过期时间(expiration属性)。例如,发送一条文本消息并设置过期时间为 5000 毫秒(5 秒):

String message = "Delayed Message";
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
       .expiration("5000")
       .build();
channel.basicPublish(NORMAL_EXCHANGE, "normal_key", properties, message.getBytes());

消费者从死信队列中接收消息进行处理。可以使用basicConsume方法来设置消费者从死信队列接收消息。例如:

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String receivedMessage = new String(body, "UTF - 8");
        System.out.println("Received delayed message: " + receivedMessage);
    }
};
channel.basicConsume(DEAD_LETTER_QUEUE, true, consumer);

2.2 使用rabbitmq_delayed_message_exchange 插件

原理:RabbitMQ 提供了一个名为rabbitmq_delayed_message_exchange的插件来实现延时队列。这个插件允许创建一种特殊的交换机,称为延迟交换机(Delayed Message Exchange)。当消息发送到这种交换机时,可以指定一个延迟时间,在延迟时间过后,消息才会被路由到相应的队列,然后消费者就可以从队列中获取并处理消息。

步骤示例:

安装插件:

首先需要安装rabbitmq_delayed_message_exchange插件。对于不同的操作系统和 RabbitMQ 安装方式,安装步骤会有所不同。在基于 Debian 或 Ubuntu 的系统中,如果是通过包管理方式安装的 RabbitMQ,可以使用以下命令安装插件:

sudo rabbitmq - plugins enable rabbitmq_delayed_message_exchange

安装完成后,需要重启 RabbitMQ 服务使插件生效。

声明延迟交换机和队列:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DelayedQueuePluginExample {
    private static final String DELAYED_EXCHANGE = "delayed_exchange";
    private static final String QUEUE = "delayed_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明延迟交换机,设置类型为x - delayed - message
        channel.exchangeDeclare(DELAYED_EXCHANGE, "x - delayed - message");
        channel.queueDeclare(QUEUE, false, false, false, null);
        // 将队列绑定到延迟交换机
        channel.queueBind(QUEUE, DELAYED_EXCHANGE, "");
    }
}

发送延迟消息:

String message = "Delayed Message via Plugin";
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
       .headers(new HashMap<String, Object>() {
            {
                put("x - delayed - type", "direct");
                put("x - delay", 10000);
            }
        })
       .build();
channel.basicPublish(DELAYED_EXCHANGE, "", properties, message.getBytes());

后续消费者接收消息就和普通交换机一样。


http://www.kler.cn/a/447609.html

相关文章:

  • 2014年IMO第4题
  • 推挽输出和开漏输出
  • 2024年港澳台华侨生联考师范类院校录取情况来
  • Java模拟多个Mqtt客户端连接Mqtt Broker
  • 【设计模式】空接口
  • docker简单命令
  • Windows通过git-bash安装zsh
  • 基于 iAP2 协议 的指令协议,用于对安防设备的 MCU 进行操作
  • 【Java基础面试题029】Java中的hashCode和equals方法之间有什么关系?
  • Python tkinter写的《电脑装配单》和 Html版 可打印 可导出 excel 文件
  • CV算法在工作中有哪些实际应用?
  • 数据挖掘之认识数据
  • C++9--前置++和后置++重载,const,日期类的实现(对前几篇知识点的应用)
  • docker hub上下载使用postgis官方插件
  • 【python 字典(dict)和集合(set)】创建、访问、基本操作及各自的特点】
  • keil已有项目改工程名
  • 1387. 将整数按权重排序 中等
  • 3大Excel免费功能
  • 吉快科技荣膺“金边奖·最佳大模型一体机”,引领AI边缘新时代
  • 江苏计算机专转本 技能Mysql知识点总结(二)
  • C05S07-Tomcat服务架设
  • 15款行业大数据报告下载网站
  • H5 ios软键盘弹起遮挡输入框
  • #渗透测试#漏洞挖掘#红蓝攻防#护网#sql注入介绍06-基于子查询的SQL注入(Subquery-Based SQL Injection)
  • macOS 配置 vscode 命令行启动
  • pat乙级1072 开学寄语