当前位置: 首页 > article >正文

7.RabbitMQ延时交换机

七、延时交换机与延时队列

1、延时问题

(1)、问题引入

场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了

(2)、解决方式

  • 定时任务方式

    每隔3秒扫描一次数据库,查询过期的订单然后进行处理;

    优点:

    • 简单,容易实现;

    缺点:

    • 存在延迟(延迟时间不准确),如果你每隔1分钟扫一次,那么就有可能延迟1分钟;
    • 性能较差,每次扫描数据库,如果订单量很大会影响性能
  • 被动取消

    当用户查询订单的时候,判断订单是否超时,超时了就取消(交易关闭);

    优点:

    • 对服务器而言,压力小;

    缺点:

    • 用户不查询订单,将永远处于待支付状态,会对数据统计等功能造成影响;
    • 用户打开订单页面,有可能比较慢,因为要处理大量订单,用户体验少稍差;
  • JDK延迟队列(单体应用,不能分布式下)

    DelayedQueue

    无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素

    优点:

    • 实现简单,任务延迟低;

    缺点:

    • 服务重启、宕机,数据丢失;
    • 只适合单机版,不适合集群;
    • 订单量大,可能内存不足而发生异常;
  • 采用消息中间件(rabbitmq)

    RabbitMQ本身不支持延迟队列,可以使用TTL结合DLX的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。

    不同延迟时间的消息要发到不同的队列上,同一个队列的消息,它的延迟时间应该一样

    在这里插入图片描述

2、延时交换机

7.2.1、死信队列实现

(1)、实现方式

给正常队列绑定一个死信交换机和设置死信路由key

给正常队列设置消息过期时间,过期时间用于模拟延时操作

当消息过期后没有被消费就会转到死信队列

测试模块rabbitmq-07-delay-01

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ属性

server:
  port: 8080

spring:
  application:
    name: delay-learn1

  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    // 正常交换机(
    public static final String EXCHANGE_NAME = "exchange.delay.normal.1";

    // 死信交换机
    public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.1";

    // 正常队列
    public static final String QUEUE_NAME = "queue.delay.normal.1";

    // 死信队列
    public static final String QUEUE_DLX_NAME = "queue.delay.dlx.1";

    // 正常路由key
    public static final String ROUTING_NAME = "order1";

    // 死信路由key
    public static final String ROUTING_DLX_NAME = "error1";

}

生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        Message message = MessageBuilder.withBody("hello world".getBytes())
                .build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);
        log.info("消息发送完毕,发送时间为:{}", new Date());
    }
}

定义消息队列

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * 正常交换机
     *
     * @return
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * 正常队列
     *
     * @return
     */
    @Bean
    public Queue normalQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // 重点:设置这两个参数
        //设置队列的死信交换机
        arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
        //设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
        arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME)
                .ttl(25000) //队列的过期时间
                .withArguments(arguments) // 设置对列的参数
                .build();
    }

    /**
     * 正常交换机和正常队列绑定
     *
     * @param normalExchange
     * @param normalQueue
     * @return
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME);
    }

    /**
     * 死信交换机
     *
     * @return
     */
    @Bean
    public DirectExchange dlxExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
    }

    /**
     * 死信队列
     *
     * @return
     */
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
    }

    /**
     * 死信交换机和死信队列绑定
     *
     * @param dlxExchange
     * @param dlxQueue
     * @return
     */
    @Bean
    public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);
    }
}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq07Delay01Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq07Delay01Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

测试

在这里插入图片描述

(2)、存在的问题

如果不设置队列的过期时间,在发送消息时设置消息的过期时间会存在以下问题

  • 如果队头的消息过期时间长,后面的消息过期时间端,但是因为队头的消息没有被消费,因此后面已过期的消息也无法到达死信队列中

测试模块rabbitmq-07-delay-02

引入依赖

	<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ属性

server:
  port: 8080

spring:
  application:
    name: delay-learn2

  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    // 正常交换机(
    public static final String EXCHANGE_NAME = "exchange.delay.normal.2";

    // 死信交换机
    public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.2";

    // 正常队列
    public static final String QUEUE_NAME = "queue.delay.normal.2";

    // 死信队列
    public static final String QUEUE_DLX_NAME = "queue.delay.dlx.2";

    // 正常路由key
    public static final String ROUTING_NAME = "order2";

    // 死信路由key
    public static final String ROUTING_DLX_NAME = "error2";

}

生产者

发送消息时先发送一条过期时间长的,再发送一条过期时间短的消息

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.amqp.core.MessageProperties;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setExpiration("25000"); //第一条消息
            Message message = MessageBuilder.withBody("hello world 1".getBytes())
                    .andProperties(messageProperties)
                    .build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);
            log.info("消息发送完毕,发送时间为:{}", new Date());
        }
        {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setExpiration("15000"); //第二条消息
            Message message = MessageBuilder.withBody("hello world 2".getBytes())
                    .andProperties(messageProperties)
                    .build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);
            log.info("消息发送完毕,发送时间为:{}", new Date());
        }
    }
}

消费者

消费者监听死信队列的消息来查看消息接收的时间

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class ReceiveMessageService {
    /**
     * 延迟队列一定要接收死信队列的消息
     */
    @RabbitListener(queues = RabbitMQConstant.QUEUE_DLX_NAME)
    public void receiveMsg(Message message) {
        String body = new String(message.getBody());
        log.info("接收到的消息为:{},接收时间为:{}", body, new Date());
    }
}

定义消息队列

注意:正常队列不要设置过期时间

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * 正常交换机
     *
     * @return
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * 正常队列
     *
     * @return
     */
    @Bean
    public Queue normalQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // 重点:设置这两个参数
        //设置队列的死信交换机
        arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
        //设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
        arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME)
                //.ttl(5000) //队列的过期时间
                .withArguments(arguments) // 设置对列的参数
                .build();
    }

    /**
     * 正常交换机和正常队列绑定
     *
     * @param normalExchange
     * @param normalQueue
     * @return
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME);
    }

    /**
     * 死信交换机
     *
     * @return
     */
    @Bean
    public DirectExchange dlxExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
    }

    /**
     * 死信队列
     *
     * @return
     */
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
    }

    /**
     * 死信交换机和死信队列绑定
     *
     * @param dlxExchange
     * @param dlxQueue
     * @return
     */
    @Bean
    public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);
    }
}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq07Delay02Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq07Delay02Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

测试

在这里插入图片描述

(3)、多队列解决过期时间问题

对于上面存在的问题,可以将不同过期时间的消息发送到不同的队列上,过期后再转到死信队列上

测试模块rabbitmq-07-delay-03

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ属性

server:
  port: 8080

spring:
  application:
    name: delay-learn3

  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    // 正常交换机
    public static final String EXCHANGE_NAME = "exchange.delay.normal.3";
    // 死信交换机
    public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.3";
    // 正常订单队列
    public static final String QUEUE_ORDER_NAME = "queue.delay.normal.order.3";
    // 正常支付队列
    public static final String QUEUE_PAY_NAME = "queue.delay.normal.pay.3";
    // 死信队列
    public static final String QUEUE_DLX_NAME = "queue.delay.dlx.3";
    // 订单路由key
    public static final String ROUTING_ORDER_NAME = "order3";
    // 支付路由key
    public static final String ROUTING_PAY_NAME = "pay3";
    // 死信路由key
    public static final String ROUTING_DLX_NAME = "error3";
}

生产者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

import org.springframework.amqp.core.MessageProperties;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setExpiration("10000"); //第一条消息
            Message message = MessageBuilder.withBody("hello world 1".getBytes())
                    .andProperties(messageProperties)
                    .build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_ORDER_NAME, message);
            log.info("消息发送完毕,发送时间为:{}", new Date());
        }
        {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setExpiration("5000"); //第二条消息
            Message message = MessageBuilder.withBody("hello world 2".getBytes())
                    .andProperties(messageProperties)
                    .build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_PAY_NAME, message);
            log.info("消息发送完毕,发送时间为:{}", new Date());
        }
    }
}

消费者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class ReceiveMessageService {
    /**
     * 延迟队列一定要接收死信队列的消息
     */
    @RabbitListener(queues = RabbitMQConstant.QUEUE_DLX_NAME)
    public void receiveMsg(Message message) {
        String body = new String(message.getBody());
        log.info("接收到的消息为:{},接收时间为:{}", body, new Date());
    }
}

定义消息队列

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * 正常交换机
     *
     * @return
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * 订单队列
     *
     * @return
     */
    @Bean
    public Queue normalOrderQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // 重点:设置这两个参数
        //设置队列的死信交换机
        arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
        //设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
        arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_ORDER_NAME)
                .withArguments(arguments) // 设置对列的参数
                .build();
    }

    /**
     * 支付队列
     *
     * @return
     */
    @Bean
    public Queue normalPayQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // 重点:设置这两个参数
        //设置队列的死信交换机
        arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
        //设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
        arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_PAY_NAME)
                .withArguments(arguments) // 设置对列的参数
                .build();
    }

    /**
     * 正常交换机和订单队列绑定
     *
     * @param normalExchange
     * @param normalOrderQueue
     * @return
     */
    @Bean
    public Binding bindingOrderNormal(DirectExchange normalExchange, Queue normalOrderQueue) {
        return BindingBuilder.bind(normalOrderQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_ORDER_NAME);
    }

    /**
     * 正常交换机和支付队列绑定
     *
     * @param normalExchange
     * @param normalPayQueue
     * @return
     */
    @Bean
    public Binding bindingPayNormal(DirectExchange normalExchange, Queue normalPayQueue) {
        return BindingBuilder.bind(normalPayQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_PAY_NAME);
    }

    /**
     * 死信交换机
     *
     * @return
     */
    @Bean
    public DirectExchange dlxExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
    }

    /**
     * 死信队列
     *
     * @return
     */
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
    }

    /**
     * 死信交换机和死信队列绑定
     *
     * @param dlxExchange
     * @param dlxQueue
     * @return
     */
    @Bean
    public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);
    }
}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq07Delay03Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq07Delay03Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

测试

在这里插入图片描述

7.2.2、使用延时插件

(1)、安装插件

第一步:下载

选择对应的版本下载 rabbitmq-delayed-message-exchange 插件

下载地址:http://www.rabbitmq.com/community-plugins.html

在这里插入图片描述

第二步:插件拷贝到 RabbitMQ 服务器plugins目录下

在这里插入图片描述

第三步:解压缩

cd /usr/local/rabbitmq_server-4.0.7/plugins
unzip rabbitmq_delayed_message_exchange-v4.0.7.ez

在这里插入图片描述

如果unzip 没有安装,先安装一下

yum install unzip -y

第四步:启用插件

cd /usr/local/rabbitmq_server-4.0.7/sbin/
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange 

在这里插入图片描述

第五步:查询安装情况

查询安装的所有插件

./rabbitmq-plugins list 

重启rabbitmq使其生效(此处也可以不重启)

在这里插入图片描述

(2)、实现原理

消息发送后不会直接投递到队列,而是先存储到内嵌的 Mnesia数据库中,然后会检查 x-delay 时间(消息头部),将过期的消息放到死信队列中

延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;

在这里插入图片描述

Mnesia 是一个小型数据库,不适合于大量延迟消息的实现

解决了消息过期时间不一致出现的问题

(3)、实现延时队列

消息只要发送到延时交换机即可,延时交换机绑定死信路由的key

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置MQ属性

server:
  port: 8080

spring:
  application:
    name: delay-learn4

  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

定义常量

package com.longdidi.constants;

public class RabbitMQConstant {
    // 正常交换机(死信交换机)
    public static final String EXCHANGE_NAME = "exchange.delay.4";

    public static final String QUEUE_NAME = "queue.delay.4";

    public static final String ROUTING_NAME = "plugin4";

}

生产者

生产者发送消息时要在headers中添加过期时间

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setHeader("x-delay", 25000);//第一条消息 延迟时间
            //messageProperties.setExpiration("25000"); //不要用这个
            Message message = MessageBuilder.withBody("hello world 1".getBytes())
                    .andProperties(messageProperties)
                    .build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);
            log.info("消息发送完毕,发送时间为:{}", new Date());
        }
        {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setHeader("x-delay", 15000);//第二条消息 延迟时间
            //messageProperties.setExpiration("15000"); //不要用这个
            Message message = MessageBuilder.withBody("hello world 2".getBytes())
                    .andProperties(messageProperties)
                    .build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);
            log.info("消息发送完毕,发送时间为:{}", new Date());
        }
    }
}

消费者

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class ReceiveMessageService {
    /**
     * 延迟队列一定要接收死信队列的消息
     */
    @RabbitListener(queues = RabbitMQConstant.QUEUE_NAME)
    public void receiveMsg(Message message) {
        String body = new String(message.getBody());
        log.info("接收到的消息为:{},接收时间为:{}", body, new Date());
    }
}

定义消息队列

注意延时交换机需要使用自定义类型定义

package com.longdidi.config;


import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * 创建自定义交换机
     *
     * @return
     */
    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct"); //放一个参数
        //CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
        return new CustomExchange(RabbitMQConstant.EXCHANGE_NAME, "x-delayed-message", true, false, arguments);

    }

    @Bean
    public Queue queue() {
        return QueueBuilder
                .durable(RabbitMQConstant.QUEUE_NAME) //队列名称
                .build();
    }

    @Bean
    public Binding binding(CustomExchange customExchange, Queue queue) {
        //绑定也指定路由key,加noargs 方法
        return BindingBuilder.bind(queue).to(customExchange).with(RabbitMQConstant.ROUTING_NAME).noargs();
    }

}

发送消息

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq07Delay04Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq07Delay04Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

.QUEUE_NAME) //队列名称
.build();
}

@Bean
public Binding binding(CustomExchange customExchange, Queue queue) {
    //绑定也指定路由key,加noargs 方法
    return BindingBuilder.bind(queue).to(customExchange).with(RabbitMQConstant.ROUTING_NAME).noargs();
}

}


**发送消息**

```java
package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq07Delay04Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq07Delay04Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * 程序一启动就会运行该方法
     *
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

http://www.kler.cn/a/573457.html

相关文章:

  • 分布式日志和责任链路
  • FFmpeg-chapter7-使用 FFmpeg 解码视频(原理篇)
  • Day02-云服务器+小皮phpstudy一键部署建站
  • STM32中的ADC
  • 带你从入门到精通——自然语言处理(五. 自注意力机制和transformer的输入部分)
  • Element Plus中的树组件的具体用法(持续更新!)
  • Logback:高性能日志框架完全指南
  • 微信小程序接入deepseek
  • 【文生图】windows 部署stable-diffusion-webui
  • 数组中的逆序对(C++)
  • BasicToolNode(tools=[search_tool, lookup_policy, query_sqldb])的内部执行逻辑
  • Android ChatOn-v1.66.536-598-[构建于ChatGPT和GPT-4o之上]
  • 大模型巅峰对决:DeepSeek vs GPT-4/Claude/PaLM-2 全面对比与核心差异揭秘
  • FastExcel/EasyExcel简介以及源码解析
  • 张岳教授:语言模型推理与泛化研究 | ICLR 2025 特邀报告与团队专场
  • 18类创新平台培育入库!长沙经开区2025年各类科技创新平台培育申报流程时间材料及申报条件
  • jQuery UI 简介
  • 【漫话机器学习系列】119.小批量随机梯度方法
  • 机器学习中的优化方法:从局部探索到全局约束
  • Measuring short-form factuality in large language models (SimpleQA) 论文简介