当前位置: 首页 > article >正文

Spring Boot 项目中整合 RabbitMQ,使用死信队列(Dead Letter Exchange, DLX)实现延迟队列功能

在 Spring Boot 项目中整合 RabbitMQ 并使用死信队列(Dead Letter Exchange, DLX)实现延迟队列功能,可以通过配置 RabbitMQ 的交换机、队列和绑定关系来实现。以下是一个完整的示例,展示如何在 Spring Boot 项目中使用死信队列实现延迟队列。

1. 添加依赖

在 pom.xml 文件中添加 RabbitMQ 的依赖:

xml

复制

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

运行 HTML

2. 配置 RabbitMQ

在 application.yml 文件中配置 RabbitMQ 的连接信息:

yaml

复制

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

3. 创建 RabbitMQ 配置类

创建一个配置类,用于定义交换机、队列和绑定关系:

java

复制

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

    public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
    public static final String DELAY_QUEUE_NAME = "delay.queue";
    public static final String DLX_EXCHANGE_NAME = "dlx.exchange";
    public static final String DLX_QUEUE_NAME = "dlx.queue";

    // 创建死信交换机
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE_NAME);
    }

    // 创建死信队列
    @Bean
    public Queue dlxQueue() {
        return new Queue(DLX_QUEUE_NAME);
    }

    // 绑定死信队列到死信交换机
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("");
    }

    // 创建延迟交换机
    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    // 创建延迟队列,并设置死信交换机
    @Bean
    public Queue delayQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
        return new Queue(DELAY_QUEUE_NAME, false, false, false, args);
    }

    // 绑定延迟队列到延迟交换机
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("");
    }
}

4. 发送延迟消息

创建一个消息发送服务,用于发送延迟消息:

java

复制

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendDelayMessage(String message, long delayTime) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, "", message, msg -> {
            msg.getMessageProperties().setExpiration(String.valueOf(delayTime));
            return msg;
        });
        System.out.println(" [x] Sent '" + message + "' with delay " + delayTime + " ms");
    }
}

5. 消费延迟消息

创建一个消息消费者,用于消费延迟消息:

java

复制

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQReceiver {

    @RabbitListener(queues = RabbitMQConfig.DLX_QUEUE_NAME)
    public void receiveDelayMessage(String message) {
        System.out.println(" [x] Received '" + message + "'");
    }
}

6. 测试延迟队列

创建一个测试类,用于测试延迟队列的功能:

java

复制

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQTest implements CommandLineRunner {

    @Autowired
    private RabbitMQSender rabbitMQSender;

    @Override
    public void run(String... args) throws Exception {
        String message = "Hello, RabbitMQ Delay Queue!";
        long delayTime = 5000; // 延迟 5 秒
        rabbitMQSender.sendDelayMessage(message, delayTime);
    }
}

7. 运行项目

启动 Spring Boot 项目,观察控制台输出,可以看到消息在延迟 5 秒后被消费。

总结

  • 配置 RabbitMQ:在 application.yml 文件中配置 RabbitMQ 的连接信息。

  • 创建 RabbitMQ 配置类:定义交换机、队列和绑定关系,设置死信交换机。

  • 发送延迟消息:通过 AmqpTemplate 发送带有 TTL 的消息。

  • 消费延迟消息:通过 @RabbitListener 注解消费死信队列中的消息。

通过这种方式,可以在 Spring Boot 项目中使用死信队列实现延迟队列功能。需要注意的是,消息的 TTL 是以毫秒为单位的,且消息的延迟时间不能超过 RabbitMQ 的最大消息大小限制。

上面代码没看懂的,看这里,以下是详细的流程:

  1. 发送延迟消息

    • 生产者将消息发送到延迟交换机(delay.exchange),并设置消息的 TTL。

    • 消息被路由到延迟队列(delay.queue)。

  2. 延迟队列处理

    • 延迟队列没有消费者,消息会在队列中等待,直到 TTL 到期。

    • TTL 到期后,消息被转发到死信交换机(dlx.exchange)。

  3. 死信队列处理

    • 死信交换机将消息路由到死信队列(dlx.queue)。

    • 死信队列的消费者消费这些消息。


http://www.kler.cn/a/312132.html

相关文章:

  • CLion配置QT开发环境
  • 无插件H5播放器EasyPlayer.js网页web无插件播放器vue和react详细介绍
  • https网站 请求http图片报错:net::ERR_SSL_PROTOCOL_ERROR
  • 使用Matlab神经网络工具箱
  • MySQL中的事务与锁
  • Linux——简单认识vim、gcc以及make/Makefile
  • 设计模式 组合模式(Composite Pattern)
  • 【HTTP】认识 URL 和 URL encode
  • KL散度(Kullback-Leibler)
  • java框架
  • 深入理解 MySQL MVCC:多版本并发控制的核心机制
  • vmware,centos8(虚拟机) 的安装
  • Python Web开发中的持续集成与持续交付(CI/CD)
  • cassandra指定配置文件的docker启动方法
  • 【学术会议征稿】第四届计算机、信息工程与电子材料国际学术会议 (CTIEEM 2024)
  • 微信小程序能不能有一种公共的分包,能被普通的分包引用其资源?(内有解决方案)
  • 【测试】博客系统测试报告
  • docker面经
  • 【Mac】系统环境配置
  • mybatisplus分页查询学习
  • QT应用开发的C++功能框架以及实战入门开发项目场景
  • 计算机网路(应用层)
  • 中台架构下的数据仓库与非结构化数据整合
  • Nuxt Kit 中的插件:创建与使用
  • ffmpeg实现视频的合成与分割
  • 特征融合魔改,看这一篇就够了