7.RabbitMQ延时交换机
七、延时交换机与延时队列
1、延时问题
(1)、问题引入
场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了
(2)、解决方式
-
定时任务方式
每隔3秒扫描一次数据库,查询过期的订单然后进行处理;
优点:
- 简单,容易实现;
缺点:
- 存在延迟(延迟时间不准确),如果你每隔1分钟扫一次,那么就有可能延迟1分钟;
- 性能较差,每次扫描数据库,如果订单量很大会影响性能
-
被动取消
当用户查询订单的时候,判断订单是否超时,超时了就取消(交易关闭);
优点:
- 对服务器而言,压力小;
缺点:
- 用户不查询订单,将永远处于待支付状态,会对数据统计等功能造成影响;
- 用户打开订单页面,有可能比较慢,因为要处理大量订单,用户体验少稍差;
-
JDK延迟队列(单体应用,不能分布式下)
DelayedQueue
无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素
优点:
- 实现简单,任务延迟低;
缺点:
- 服务重启、宕机,数据丢失;
- 只适合单机版,不适合集群;
- 订单量大,可能内存不足而发生异常;
-
采用消息中间件(rabbitmq)
RabbitMQ本身不支持延迟队列,可以使用TTL结合DLX的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。
不同延迟时间的消息要发到不同的队列上,同一个队列的消息,它的延迟时间应该一样
2、延时交换机
7.2.1、死信队列实现
(1)、实现方式
给正常队列绑定一个死信交换机和设置死信路由key
给正常队列设置消息过期时间,过期时间用于模拟延时操作
当消息过期后没有被消费就会转到死信队列
测试模块rabbitmq-07-delay-01
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ属性
server:
port: 8080
spring:
application:
name: delay-learn1
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机(
public static final String EXCHANGE_NAME = "exchange.delay.normal.1";
// 死信交换机
public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.1";
// 正常队列
public static final String QUEUE_NAME = "queue.delay.normal.1";
// 死信队列
public static final String QUEUE_DLX_NAME = "queue.delay.dlx.1";
// 正常路由key
public static final String ROUTING_NAME = "order1";
// 死信路由key
public static final String ROUTING_DLX_NAME = "error1";
}
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
定义消息队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
// 重点:设置这两个参数
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME)
.ttl(25000) //队列的过期时间
.withArguments(arguments) // 设置对列的参数
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME);
}
/**
* 死信交换机
*
* @return
*/
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
/**
* 死信队列
*
* @return
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
}
/**
* 死信交换机和死信队列绑定
*
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);
}
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq07Delay01Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq07Delay01Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
(2)、存在的问题
如果不设置队列的过期时间,在发送消息时设置消息的过期时间会存在以下问题
- 如果队头的消息过期时间长,后面的消息过期时间端,但是因为队头的消息没有被消费,因此后面已过期的消息也无法到达死信队列中
测试模块rabbitmq-07-delay-02
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ属性
server:
port: 8080
spring:
application:
name: delay-learn2
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机(
public static final String EXCHANGE_NAME = "exchange.delay.normal.2";
// 死信交换机
public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.2";
// 正常队列
public static final String QUEUE_NAME = "queue.delay.normal.2";
// 死信队列
public static final String QUEUE_DLX_NAME = "queue.delay.dlx.2";
// 正常路由key
public static final String ROUTING_NAME = "order2";
// 死信路由key
public static final String ROUTING_DLX_NAME = "error2";
}
生产者
发送消息时先发送一条过期时间长的,再发送一条过期时间短的消息
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.amqp.core.MessageProperties;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("25000"); //第一条消息
Message message = MessageBuilder.withBody("hello world 1".getBytes())
.andProperties(messageProperties)
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("15000"); //第二条消息
Message message = MessageBuilder.withBody("hello world 2".getBytes())
.andProperties(messageProperties)
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
消费者
消费者监听死信队列的消息来查看消息接收的时间
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@Slf4j
public class ReceiveMessageService {
/**
* 延迟队列一定要接收死信队列的消息
*/
@RabbitListener(queues = RabbitMQConstant.QUEUE_DLX_NAME)
public void receiveMsg(Message message) {
String body = new String(message.getBody());
log.info("接收到的消息为:{},接收时间为:{}", body, new Date());
}
}
定义消息队列
注意:正常队列不要设置过期时间
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
// 重点:设置这两个参数
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME)
//.ttl(5000) //队列的过期时间
.withArguments(arguments) // 设置对列的参数
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME);
}
/**
* 死信交换机
*
* @return
*/
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
/**
* 死信队列
*
* @return
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
}
/**
* 死信交换机和死信队列绑定
*
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);
}
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq07Delay02Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq07Delay02Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
(3)、多队列解决过期时间问题
对于上面存在的问题,可以将不同过期时间的消息发送到不同的队列上,过期后再转到死信队列上
测试模块rabbitmq-07-delay-03
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ属性
server:
port: 8080
spring:
application:
name: delay-learn3
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.delay.normal.3";
// 死信交换机
public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.3";
// 正常订单队列
public static final String QUEUE_ORDER_NAME = "queue.delay.normal.order.3";
// 正常支付队列
public static final String QUEUE_PAY_NAME = "queue.delay.normal.pay.3";
// 死信队列
public static final String QUEUE_DLX_NAME = "queue.delay.dlx.3";
// 订单路由key
public static final String ROUTING_ORDER_NAME = "order3";
// 支付路由key
public static final String ROUTING_PAY_NAME = "pay3";
// 死信路由key
public static final String ROUTING_DLX_NAME = "error3";
}
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
import org.springframework.amqp.core.MessageProperties;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("10000"); //第一条消息
Message message = MessageBuilder.withBody("hello world 1".getBytes())
.andProperties(messageProperties)
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_ORDER_NAME, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("5000"); //第二条消息
Message message = MessageBuilder.withBody("hello world 2".getBytes())
.andProperties(messageProperties)
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_PAY_NAME, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
消费者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@Slf4j
public class ReceiveMessageService {
/**
* 延迟队列一定要接收死信队列的消息
*/
@RabbitListener(queues = RabbitMQConstant.QUEUE_DLX_NAME)
public void receiveMsg(Message message) {
String body = new String(message.getBody());
log.info("接收到的消息为:{},接收时间为:{}", body, new Date());
}
}
定义消息队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 订单队列
*
* @return
*/
@Bean
public Queue normalOrderQueue() {
Map<String, Object> arguments = new HashMap<>();
// 重点:设置这两个参数
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_ORDER_NAME)
.withArguments(arguments) // 设置对列的参数
.build();
}
/**
* 支付队列
*
* @return
*/
@Bean
public Queue normalPayQueue() {
Map<String, Object> arguments = new HashMap<>();
// 重点:设置这两个参数
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
//设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_PAY_NAME)
.withArguments(arguments) // 设置对列的参数
.build();
}
/**
* 正常交换机和订单队列绑定
*
* @param normalExchange
* @param normalOrderQueue
* @return
*/
@Bean
public Binding bindingOrderNormal(DirectExchange normalExchange, Queue normalOrderQueue) {
return BindingBuilder.bind(normalOrderQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_ORDER_NAME);
}
/**
* 正常交换机和支付队列绑定
*
* @param normalExchange
* @param normalPayQueue
* @return
*/
@Bean
public Binding bindingPayNormal(DirectExchange normalExchange, Queue normalPayQueue) {
return BindingBuilder.bind(normalPayQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_PAY_NAME);
}
/**
* 死信交换机
*
* @return
*/
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
/**
* 死信队列
*
* @return
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
}
/**
* 死信交换机和死信队列绑定
*
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);
}
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq07Delay03Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq07Delay03Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
7.2.2、使用延时插件
(1)、安装插件
第一步:下载
选择对应的版本下载 rabbitmq-delayed-message-exchange 插件
下载地址:http://www.rabbitmq.com/community-plugins.html
第二步:插件拷贝到 RabbitMQ 服务器plugins目录下
第三步:解压缩
cd /usr/local/rabbitmq_server-4.0.7/plugins
unzip rabbitmq_delayed_message_exchange-v4.0.7.ez
如果unzip 没有安装,先安装一下
yum install unzip -y
第四步:启用插件
cd /usr/local/rabbitmq_server-4.0.7/sbin/
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
第五步:查询安装情况
查询安装的所有插件
./rabbitmq-plugins list
重启rabbitmq使其生效(此处也可以不重启)
(2)、实现原理
消息发送后不会直接投递到队列,而是先存储到内嵌的 Mnesia数据库中,然后会检查 x-delay 时间(消息头部),将过期的消息放到死信队列中
延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;
Mnesia 是一个小型数据库,不适合于大量延迟消息的实现
解决了消息过期时间不一致出现的问题
(3)、实现延时队列
消息只要发送到延时交换机即可,延时交换机绑定死信路由的key
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ属性
server:
port: 8080
spring:
application:
name: delay-learn4
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机(死信交换机)
public static final String EXCHANGE_NAME = "exchange.delay.4";
public static final String QUEUE_NAME = "queue.delay.4";
public static final String ROUTING_NAME = "plugin4";
}
生产者
生产者发送消息时要在headers中添加过期时间
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("x-delay", 25000);//第一条消息 延迟时间
//messageProperties.setExpiration("25000"); //不要用这个
Message message = MessageBuilder.withBody("hello world 1".getBytes())
.andProperties(messageProperties)
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("x-delay", 15000);//第二条消息 延迟时间
//messageProperties.setExpiration("15000"); //不要用这个
Message message = MessageBuilder.withBody("hello world 2".getBytes())
.andProperties(messageProperties)
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
消费者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@Slf4j
public class ReceiveMessageService {
/**
* 延迟队列一定要接收死信队列的消息
*/
@RabbitListener(queues = RabbitMQConstant.QUEUE_NAME)
public void receiveMsg(Message message) {
String body = new String(message.getBody());
log.info("接收到的消息为:{},接收时间为:{}", body, new Date());
}
}
定义消息队列
注意延时交换机需要使用自定义类型定义
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 创建自定义交换机
*
* @return
*/
@Bean
public CustomExchange customExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct"); //放一个参数
//CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
return new CustomExchange(RabbitMQConstant.EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}
@Bean
public Queue queue() {
return QueueBuilder
.durable(RabbitMQConstant.QUEUE_NAME) //队列名称
.build();
}
@Bean
public Binding binding(CustomExchange customExchange, Queue queue) {
//绑定也指定路由key,加noargs 方法
return BindingBuilder.bind(queue).to(customExchange).with(RabbitMQConstant.ROUTING_NAME).noargs();
}
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq07Delay04Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq07Delay04Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
.QUEUE_NAME) //队列名称
.build();
}
@Bean
public Binding binding(CustomExchange customExchange, Queue queue) {
//绑定也指定路由key,加noargs 方法
return BindingBuilder.bind(queue).to(customExchange).with(RabbitMQConstant.ROUTING_NAME).noargs();
}
}
**发送消息**
```java
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq07Delay04Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq07Delay04Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}