SpringBoot整合RabbitMQ应用
本文主要介绍SpringBoot中如何使用RabbitMQ,相关概念及基础使用参考RabbitMQ简单使用
常用配置及方法
展示rabbitmq各个模式在springboot中如何使用之前,先介绍rabbitmq在springboot中的一些常用配置及方法:
注册队列
//使用配置类以bean方式声明队列
@Configuration
public class QueueConfig {
@Bean
public Queue durableQueue() {
return QueueBuilder
.durable("simple_queue")
.exclusive()
.autoDelete()
.ttl(5000)
.expires(5000)
.maxLength(1000)
.maxLengthBytes(1000)
.maxPriority(1)
.deadLetterExchange("")
.deadLetterRoutingKey("")
.deliveryLimit(1)
.lazy()
.leaderLocator(null)
.overflow(null)
.quorum()
.singleActiveConsumer()
.build();
}
}
以下是 RabbitMQ 在 Spring Boot 中使用
QueueBuilder
创建队列时,各方法的详细作用:
1. durable(String queueName)
:声明一个持久化队列。该队列会在 RabbitMQ 服务重启后仍然存在,队列元数据和内容不会丢失。
2. exclusive()
:声明队列为排他队列。该队列只能由当前连接的客户端使用,并且连接断开时队列会被删除。
3. autoDelete()
:设置队列为自动删除。当队列不再被任何消费者使用时,RabbitMQ 会自动删除该队列。
4. ttl(int milliseconds)
:为队列中消息设置TTL,即过期时间。超过 TTL 时间的消息会变为死信。
5. expires(int milliseconds)
:设置队列的存活时间。如果队列在指定时间内未被使用(未绑定消费者或未接收消息),RabbitMQ 将自动删除该队列。
6. maxLength(int length)
:设置队列的最大消息数量。队列中消息数量不能超过设置条数,超过的消息会变为死信(Dead Letter)。
7. maxLengthBytes(int size)
:设置队列的最大消息体总大小。队列中消息体总大小不能超过参数size
字节,超出的消息会变为死信。
8. maxPriority(int priorityLevel)
:设置队列的最大优先级。优先级范围为 0
到 传入的参数priorityLevel
。比如设置10,优先级范围则是0-10
。开启此项后,存入此队列的每个消息都可以设置优先级属性,优先级越大越被优先消费。
9. deadLetterExchange(String exchangeName)
:指定死信队列的交换机。消息在变为死信时会被转发到该交换机。当传入空字符串 ""
时,表示没有绑定死信交换机。
10. deadLetterRoutingKey(String routingKey)
:指定死信队列的路由键。当消息被转发到死信交换机时,会使用该路由键。传入空字符串 ""
时,表示未指定。
11. deliveryLimit(int limit)
:设置消息的最大传递次数。消息如果被拒绝(basic.reject
或 basic.nack
)的次数超过此值,将变为死信。
12. lazy()
:设置队列为惰性队列。消息会尽量存储在磁盘而不是内存中,适合处理大量不频繁访问的消息。
13. leaderLocator(String policy)
:设置队列的主节点定位策略。用于集群模式下队列的主节点分配,例如通过策略指定主节点位置。此处为 null
,表示未设置。
14. overflow(String policy)
:设置队列溢出策略。参数为"reject-publish"
代表新消息会被拒绝。参数为"drop-head"
代表会丢弃最早的消息(头部)。参数为null
表示未设置溢出策略。
15. quorum()
:将队列设置为仲裁队列(Quorum Queue)。表示队列使用分布式架构(需要 RabbitMQ 3.8+ 支持),适合高可靠性场景。
16. singleActiveConsumer()
:启用单个活动消费者模式。即使队列有多个消费者连接,只有一个消费者会接收消息,其他消费者处于备用状态。
17. build()
:构建最终的队列配置对象,将前面所有配置参数应用到队列中。
注册交换机
//使用配置类以bean方式声明交换机
@Configuration
public class QueueConfig {
@Bean
public DirectExchange durableExchange() {
return ExchangeBuilder
.directExchange("direct_exchange")
.durable(true)
.admins()
.alternate("")
.autoDelete()
.delayed()
.ignoreDeclarationExceptions()
.internal()
.suppressDeclaration()
.withArgument("","")
.withArguments(null)
.build();
}
}
以下是 RabbitMQ 在 Spring Boot 中使用
ExchangeBuilder
创建交换机时,各方法的详细作用说明:
1. directExchange(String exchangeName)
:声明一个 direct
类型的交换机。参数为交换机名称。
2. durable(boolean durable)
:设置交换机是否持久化。true
-持久化,false
-非持久化,默认为true
。
3. admins(Object... admins)
:设置交换机的 RabbitAdmin 管理员。指定哪些 RabbitAdmin
实例可管理该交换机的声明、删除等操作。admins
是可变参数,可以指定一个或多个 RabbitAdmin
。
4. alternate(String alternateExchangeName)
:指定备用交换机(Alternate Exchange)。如果消息无法路由到绑定的队列,则会被转发到备用交换机。默认值:未指定备用交换机。
5. autoDelete()
:设置交换机为自动删除。当交换机不再被绑定到任何队列时,RabbitMQ 会自动删除该交换机。
6. delayed()
:设置交换机为延迟交换机。延迟交换机支持消息延迟投递到队列(需要 RabbitMQ 的延迟消息插件)。仅在 RabbitMQ 开启延迟功能插件时有效。
7. ignoreDeclarationExceptions()
:忽略交换机声明时发生的异常。声明交换机时,如果遇到异常(如交换机已存在但配置冲突),会自动忽略该异常,而不是抛出错误。
8. internal()
:将交换机设置为内部交换机。内部交换机只能由其他交换机进行消息路由,而无法直接接收生产者发送的消息。适用于消息的复杂路由逻辑。
9. suppressDeclaration()
:抑制交换机声明。声明不会在 RabbitMQ 上实际创建该交换机。用于测试或其他需要逻辑中声明但不实际创建的场景。
10. withArgument(String key, Object value)
:为交换机添加单个自定义参数。用于设置额外的参数,例如延迟交换机的 x-delayed-type
。
11. withArguments(Map<String, Object> arguments)
:为交换机添加多个自定义参数。
12. build()
:构建最终的交换机对象。将前面所有设置组合,返回一个可用于声明的交换机对象。
注册绑定关系
//使用配置类以bean方式声明交换机与队列的绑定关系
@Configuration
public class QueueConfig {
//省略队列与交换机,参考上面
//绑定交换机与队列
@Bean
public Binding durableBinding() {
return new Binding(
"durable_queue", //队列名
Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
"durable_exchange", //交换机名
"durable-key", //路由键
null //额外参数
);
}
}
Binding
类是 RabbitMQ 中用于描述交换机(Exchange)和队列(Queue)之间绑定关系的对象。它允许我们将一个队列绑定到交换机,或者将交换机绑定到交换机,并且在此过程中指定路由键(Routing Key)以及其他额外的参数。以下是该类的构造方法及参数的详细解析。
构造方法:
public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
@Nullable Map<String, Object> arguments)
这个构造方法用于创建一个绑定对象。它接收五个参数:
参数 1:
destination
(String)
- 类型:
String
- 含义: 绑定的目的地,可以是队列或交换机。此参数表示该绑定的目标对象名称。
- 用途:
- 如果绑定的目标是队列,则
destination
为队列名称。 - 如果绑定的目标是交换机,则
destination
为交换机名称。
- 如果绑定的目标是队列,则
参数 2:
destinationType
(DestinationType)
- 类型:
DestinationType
- 含义: 指定
destination
是一个队列(QUEUE
)还是交换机(EXCHANGE
)。DestinationType
是一个枚举类型,包含了两个值:QUEUE
: 绑定的目标是一个队列。EXCHANGE
: 绑定的目标是一个交换机。
- 用途: 用于确定目标对象的类型,是队列还是交换机。这是决定绑定逻辑的重要信息。
参数 3:
exchange
(String)
- 类型:
String
- 含义: 要绑定的交换机名称。该参数指定了源交换机的名称。
- 用途: 这是绑定关系中的源交换机。如果目标是一个队列,则交换机将通过路由键来路由消息到队列。如果目标是一个交换机,则该交换机会将消息路由到其他交换机或队列。
参数 4:
routingKey
(String)
- 类型:
String
- 含义: 用于路由消息的路由键(Routing Key)。
- 用途:
- 在绑定队列和交换机时,
routingKey
决定了消息通过交换机时的路由规则。例如,在direct
类型的交换机中,路由键用于匹配队列;在topic
类型的交换机中,路由键支持模式匹配。
- 在绑定队列和交换机时,
参数 5:
arguments
(Map<String, Object>)
- 类型:
Map<String, Object>
- 含义: 可选的附加参数。用于配置绑定的额外属性,如
x-dead-letter-exchange
、x-message-ttl
等自定义属性。 - 用途:
- 可以传递额外的配置项,这些配置项会根据 RabbitMQ 的扩展功能被使用。例如,可以通过设置参数指定死信队列、消息过期时间、最大队列长度等。
arguments
是一个键值对的集合,其中的键是 RabbitMQ 允许的特殊参数名称,而值是相应的配置项。
MQ模板
RabbitTemplate
是 Spring AMQP 提供的一个强大且灵活的工具类,用于与 RabbitMQ 进行消息交互。它提供了方便的 API 来发送和接收消息,并支持异步操作、消息确认、回退、事务等功能,且易于配置和扩展。通过合理的配置和方法调用,RabbitTemplate
可以满足大多数与 RabbitMQ 集成的应用场景。
1. 基本功能和作用
RabbitTemplate
是一个用于发送和接收消息的同步工具,它实现了 RabbitMQ 的消息发送、消息接收和异步处理的功能。主要用于生产者和消费者之间的消息传递。
- 发送消息:通过
RabbitTemplate
可以将消息发送到 RabbitMQ 中的指定交换机和队列。 - 接收消息:
RabbitTemplate
还支持从队列中同步接收消息,并进行处理。
2. 常用配置
创建 RabbitTemplate Bean
通常,RabbitTemplate
会通过 Java 配置类创建并注入到 Spring 容器中。常见的创建方式如下:
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange("your-exchange");
rabbitTemplate.setRoutingKey("your-routing-key");
return rabbitTemplate;
}
connectionFactory
:连接工厂,用于创建到 RabbitMQ 的连接。setExchange
:指定消息发送的交换机(如果在发送时没有指定,RabbitTemplate
会使用默认的交换机)。setRoutingKey
:设置消息发送的路由键,通常是队列名或自定义的键。
注意: 默认情况下,springboot会根据连接信息为我们自动配置RabbitTemplate
,所以一般不需要单独做上面的配置,直接使用RabbitTemplate
即可。
核心方法
RabbitTemplate
提供了多种方法来发送和接收消息,最常用的包括:
send()
:发送消息到指定的交换机和路由键。convertAndSend()
:将一个对象转换为消息并发送(通常是 JSON 或其他格式的对象)。receive()
:接收一个消息并返回(同步方法)。receiveAndConvert()
:接收一个消息并转换为对象。
示例代码:
发送消息:
rabbitTemplate.convertAndSend("exchange", "routing-key", "消息内容");
接收消息:
Object response = rabbitTemplate.receiveAndConvert("queue-name");
发送/接收带有回调的消息
RabbitTemplate
还支持通过回调处理响应。例如,发送消息并等待消费者的回复:
Message responseMessage = rabbitTemplate.convertSendAndReceive("exchange", "routing-key", message);
convertSendAndReceive
:将消息发送到指定交换机和路由键并等待回复。sendAndReceive
:可以发送消息并在接收到响应后进行处理。它可以传递一个CorrelationData
对象,用于跟踪消息。
3. 常用配置和功能
消息确认和回退
通过配置 RabbitTemplate
,可以启用消息确认机制。RabbitTemplate
提供了 setConfirmCallback
和 setReturnsCallback
来处理消息确认和消息退回:
- ConfirmCallback: 当消息成功发送到 RabbitMQ 交换机时,回调被触发。
- ReturnsCallback: 如果消息无法路由到目标队列并被 RabbitMQ 退回,回调会被触发。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// 消息成功发送
} else {
// 处理失败的情况
}
});
rabbitTemplate.setReturnsCallback(returned -> {
// 处理消息被退回的情况
});
设置超时和重试
RabbitTemplate
允许设置超时、重试和消息过期等策略。例如:
setReplyTimeout()
:设置接收回复消息的超时时间。setRetryTemplate()
:自定义消息重试策略。
rabbitTemplate.setReplyTimeout(10000); // 设置回复超时为10秒
使用临时队列和回调队列
通过 RabbitTemplate
,可以指定临时队列作为消息回调队列。这对于 RPC 模式非常有用,消息发送后,消费者处理完消息可以通过回调队列返回结果。
rabbitTemplate.setUseTemporaryReplyQueues(true);
rabbitTemplate.setReplyAddress("callback-queue");
消息序列化与转换
RabbitTemplate
具有将消息体从 Java 对象转换为消息体字节数组的能力。可以通过自定义序列化器来实现不同的数据格式(例如 JSON、XML 或其他格式)。
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
Jackson2JsonMessageConverter
:使用 Jackson 将 Java 对象和 JSON 进行相互转换。
4. 常见的高级配置
设置消息的持久化
你可以设置消息的持久性,确保消息在 RabbitMQ 中不丢失。通常通过在队列中设置持久性或者通过 RabbitTemplate
设置消息持久性:
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("Persistent Message".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("exchange", "routing-key", message);
事务支持
通过 RabbitTemplate
可以开启事务模式。事务模式可以确保消息的发送和接收过程中的原子性。
rabbitTemplate.execute(channel -> {
channel.basicPublish("exchange", "routing-key", true, false, null);
return null;
});
高级异步支持
Spring AMQP 提供的 RabbitTemplate
支持异步消息发送。通过设置 async
配置项,RabbitTemplate
可以异步发送消息。
5. 常见使用场景
- 简单的消息发送和接收:
RabbitTemplate
用于同步地发送和接收消息,如简单的生产者-消费者模式。 - RPC模式:结合
sendAndReceive
和回调队列,用于实现 RPC(远程过程调用)模式。 - 消息确认与回退:使用
setConfirmCallback
和setReturnsCallback
配置消息的确认和失败回退处理。 - 消息持久化与事务:通过设置
DeliveryMode.PERSISTENT
或配置事务机制,保证消息的可靠性。
监听注解
在 Spring Boot 中,@RabbitListener
和 @RabbitHandler
是用于SpringBoot消费者处理 RabbitMQ 消息的两个重要注解。它们常常一起使用,但它们的用途和使用方式各有不同。以下是对这两个注解的详细解析。
@RabbitListener
@RabbitListener
用于标记一个方法或类,用来监听指定的队列并处理来自 RabbitMQ 的消息。
基本用法
-
在类级别使用
配置一个类监听某些队列,该类的具体消息处理方法需要使用@RabbitHandler
注解标注。@RabbitListener(queues = "example_queue") public class ExampleListener { @RabbitHandler public void handleMessage(String message) { System.out.println("Received: " + message); } }
-
在方法级别使用
将一个方法直接注册为队列的监听器。此时不需要@RabbitHandler
。public class ExampleListener { @RabbitListener(queues = "example_queue") public void handleMessage(String message) { System.out.println("Received: " + message); } }
关键属性
-
queues
- 指定要监听的队列名称。
- 如果队列不存在,可能会根据配置自动创建。
- 示例:
@RabbitListener(queues = "example_queue")
-
queuesToDeclare
- 用于声明需要监听的队列。
- 示例:
@RabbitListener(queuesToDeclare = @Queue(value = "example_queue", durable = "true"))
-
bindings
- 声明绑定关系(队列、交换机、路由键)。
- 示例:
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "example_queue", durable = "true"), exchange = @Exchange(value = "example_exchange", type = "direct"), key = "routing_key" ))
-
concurrency
- 设置消费者的并发数。
- 示例:
@RabbitListener(queues = "example_queue", concurrency = "3-10")
-
containerFactory
- 指定使用的
RabbitListenerContainerFactory
。 - 示例:
@RabbitListener(queues = "example_queue", containerFactory = "myContainerFactory")
- 指定使用的
-
errorHandler
- 自定义错误处理逻辑。
- 示例:
@RabbitListener(queues = "example_queue", errorHandler = "myErrorHandler")
-
messageConverter
- 自定义消息转换器,用于将消息体转换为目标类型。
- 示例:
@RabbitListener(queues = "example_queue", messageConverter = "jsonMessageConverter")
@RabbitHandler
@RabbitHandler
是用于标注消息处理方法的注解。它通常与 @RabbitListener
配合使用,用于处理不同类型的消息。
基本用法
- 配合类级别的
@RabbitListener
注解使用时,一个类中可以有多个@RabbitHandler
方法,具体执行哪个方法取决于消息体的类型。
@RabbitListener(queues = "example_queue")
public class ExampleListener {
@RabbitHandler
public void handleTextMessage(String message) {
System.out.println("Received text: " + message);
}
@RabbitHandler
public void handleOrderMessage(Order order) {
System.out.println("Received order: " + order);
}
@RabbitHandler
public void handleDefault(Object message) {
System.out.println("Received generic: " + message);
}
}
在上面的代码中:
- 如果消息体是
String
类型,调用handleTextMessage
。 - 如果消息体是
Order
类型,调用handleOrderMessage
。 - 其他类型的消息将调用
handleDefault
。
@RabbitHandler 的规则
- 一个类可以有多个
@RabbitHandler
方法。 - 必须与类级别的
@RabbitListener
一起使用。 - 根据消息体的类型或内容自动选择处理方法。
两个注解的对比
特性 | @RabbitListener | @RabbitHandler |
---|---|---|
标注位置 | 类级别或方法级别 | 方法级别 |
作用 | 定义队列监听逻辑 | 定义具体的消息处理逻辑 |
是否支持多类型消息 | 不支持(直接监听的方法只能处理一种类型的消息) | 支持(根据类型选择不同的处理方法) |
典型用法 | 配置队列绑定、容器工厂等 | 在同一个类中处理多种类型的消息 |
@RabbitListener
:用于定义监听队列的逻辑,支持队列绑定、并发配置等。
@RabbitHandler
:用于定义消息的具体处理逻辑,可以根据消息类型实现多态处理。
推荐用法:
- 如果消息只有一种类型,直接在方法级别使用
@RabbitListener
。 - 如果消息有多种类型,在类级别使用
@RabbitListener
,并配合多个@RabbitHandler
实现不同类型消息的处理逻辑。
springboot中各模式的应用
新建springboot项目,pom相关依赖项如下
spring-boot版本:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.10</version>
</parent>
其他依赖
<!-- 核心启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- web启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- amqp支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
简单模式
yaml
只需配置rabbitmq的基本连接信息即可
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
配置类
新建一个配置类,使用spring的方式声明一个简单模式队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
//简单模式队列
@Bean("simpleQueue")
public Queue SimpleQueue() {
return QueueBuilder
.durable("simple_queue") //持久化一个叫simple_queue的队列
.build();
}
}
生产者
创建消息对象
Message
,通过构造方法将消息内容及消息属性MessageProperties
存入其中,并使用RabbitTemplate
模板发布消息
springboot会根据yaml的配置自动创建连接工厂ConnectionFactory
, RabbitTemplate
默认使用ConnectionFactory
来连接rabbitmq服务进行消息发布
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("send")
public class SimpleProducer {
//注入模板
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("simple")
public void execute(@RequestParam String data){
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
/**
* convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
* 1.三个参数从左往右依次为交换机、路由键、消息对象
* 2.在使用简单、工作、广播模式时不需要指定交换机,使用的是默认的交换机(AMQP default),
* 路由键处填写队列名即可(会自动匹配队列)
* 3.direct、topic等模式则填写对应的交换机和路由键
* */
rabbitTemplate.convertAndSend("","simple_queue",message);
}
}
消费者
@RabbitListener
注解可以指定要监听的队列,@RabbitHandler
用于标记方法为消息处理方法。一旦监听的队列有消息时,监听类会自动调用消息处理方法进行处理注意:一个
@RabbitListener
注解标记的类中可以有多个@RabbitHandler
标记的方法,但参数类型不能相同。@RabbitListener
注解也可以不标记在类上,直接标记在方法上。
只要simple_queue队列中有消息存入,如下代码就会自动触发:
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 1.@RabbitListener(queues = "simple_queue")
* 需要simple_queue队列已存在
*
* 2.@RabbitListener(queuesToDeclare = @Queue(value = "simple_queue",durable = "true",autoDelete = "false"))
* 不需要simple_queue存在,没有会自动创建,且指定了队列会持久化,不自动删除
* 使用此处的注解,可以不用上面的配置类即可声明simple_queue队列
*
* 3.@RabbitHandler注解配合@RabbitListener使用,标记方法为消息处理方法。但当生产者与消费者的消息类型不一致会报错
*
* */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "simple_queue"))
public class SimpleCustomer {
@RabbitHandler
public static void excute(String message) {
System.out.println(" 简单模式消费者接收消息: " + message + "!");
}
}
使用测试
运行主启动类,请求地址:
localhost:8084/send/simple?data=简单模式测试消息
控制台输出:
简单模式消费者接收消息: 简单模式测试消息!
工作模式
yaml
只需配置rabbitmq的基本连接信息即可
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
配置类
注册工作模式队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
//工作模式队列
@Bean("workQueue")
public Queue WorkQueue() {
return QueueBuilder
.durable("work_queue")
.build();
}
}
生产者
创建消息对象
Message
,通过构造方法将消息内容及消息属性MessageProperties
存入其中,并使用RabbitTemplate
模板发布消息
springboot会根据yaml的配置自动创建连接工厂ConnectionFactory
, RabbitTemplate
默认使用ConnectionFactory
来连接rabbitmq进行消息发布。这里循环发10条消息:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("send")
public class WorkProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("work")
public void execute(@RequestParam String data){
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
//发10条消息
for (int i = 0; i < 10; i++) {
/**
* convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
* 1.在使用简单、工作、广播模式时不需要指定交换机,使用的是默认的交换机(AMQP default),路由键处填写队列名即可(自动匹配队列)
* 2.direct、topic等模式则填写对应的交换机和路由键
* */
rabbitTemplate.convertAndSend("","work_queue",message);
}
}
}
轮询模式消费
- RabbitMQ 将消息以 轮询的方式 均匀地分发给所有消费者,消息的分配模式是一个消费者分配一条,直至消息消费完成。
- 每个消费者都会轮流收到消息,而不会考虑消费者当前的工作负载。
创建两个消费者,或者一个消费者中使用两个消息处理方法
消费者一
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 1.@RabbitListener(queues = "work_queue")
* 需要work_queue队列已存在
*
* 2.@RabbitListener(queuesToDeclare = @Queue(value = "work_queue",durable = "true",autoDelete = "false"))
* 不需要work_queue存在,没有会自动创建,且指定了队列会持久化,不自动删除
*
* 3.当@RabbitListener注解在类上时,生产者与消费者的消息类型不一致会报错,此注解放在方法上则不会
*
* */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public class WorkConsumerOne {
@RabbitHandler
public static void excute(String message) {
System.out.println(" 工作模式消费者-1接收消息: " + message + "!");
}
}
消费者二
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 1.@RabbitListener(queues = "work_queue")
* 需要work_queue队列已存在
*
* 2.@RabbitListener(queuesToDeclare = @Queue(value = "work_queue",durable = "true",autoDelete = "false"))
* 不需要work_queue存在,没有会自动创建,且指定了队列会持久化,不自动删除
*
* 3.当@RabbitListener注解在类上时,生产者与消费者的消息类型不一致会报错,此注解放在方法上则不会
*
* */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public class WorkConsumerTwo {
@RabbitHandler
public static void excute(String message) {
System.out.println(" 工作模式消费者-2接收消息: " + message + "!");
}
}
整合多个消息处理方法的消费者,与上面两个消费者的方式二选一即可:
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 如果消息处理方法的参数类型一致,将@RabbitListener标记在方法上
* */
@Component
public class WorkConsumers {
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public static void excute1(String message) {
System.out.println(" 工作模式消费者-1接收消息: " + message + "!");
}
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public static void excute2(String message) {
System.out.println(" 工作模式消费者-2接收消息: " + message + "!");
}
}
使用测试
运行主启动类后,调用地址:
localhost:8084/send/work?data=工作模式测试消息
控制台输出如下,可发现两个消费者都接收到同等数量的消息,都是5个。
工作模式消费者-2接收消息: 工作模式测试消息!
工作模式消费者-1接收消息: 工作模式测试消息!
工作模式消费者-1接收消息: 工作模式测试消息!
工作模式消费者-2接收消息: 工作模式测试消息!
工作模式消费者-1接收消息: 工作模式测试消息!
工作模式消费者-2接收消息: 工作模式测试消息!
工作模式消费者-1接收消息: 工作模式测试消息!
工作模式消费者-2接收消息: 工作模式测试消息!
工作模式消费者-1接收消息: 工作模式测试消息!
工作模式消费者-2接收消息: 工作模式测试消息!
公平模式消费
- 公平分配遵循能者多劳的原则,核心是基于 消费者的繁忙程度 分发消息。
- RabbitMQ 通过 消息确认(ACK)机制 来检测消费者是否空闲。
- 如果消费者在当前未完成前一个任务,则不会分配新的任务给该消费者。
公平模式也叫能者多劳,消息处理速度快的消费者会得到更多消息
在上面的基础上,yaml增加监听配置,完成后如下
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
#监听配置
listener:
simple:
prefetch: 1 # 队列一次只拿一条给消费者,限流. prefetch:1 即qos=1, 默认250
#开启消费者手动确认消息,在spring boot中提供了三种确认模式:
# NONE - 使用rabbitmq的自动确认
# AUTO - 使用rabbitmq的手动确认, springboot会自动发送确认回执 (默认)
# MANUAL - 使用rabbitmq的手动确认, 且必须手动执行确认操作
# 默认的AUTO模式中, 处理消息的方法抛出异常, 则表示消息没有被正确处理, 该消息会被重新发送.
acknowledge-mode: manual
生产者代码不变
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("send")
public class WorkProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("work")
public void execute(@RequestParam String data){
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
//发10条消息
for (int i = 0; i < 10; i++) {
/**
* convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
* 1.在使用简单、工作、广播模式时不需要指定交换机,使用的是默认的交换机(AMQP default),路由键处填写队列名即可(自动匹配队列)
* 2.direct、topic等模式则填写对应的交换机和路由键
* */
rabbitTemplate.convertAndSend("","work_queue",message);
}
}
}
创建两个消费者,或者一个消费者中使用两个消息处理方法
消费者一,添加手动确认方法,并加入线程阻塞模拟处理慢:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 1.@RabbitListener(queues = "work_queue")
* 需要work_queue队列已存在
*
* 2.@RabbitListener(queuesToDeclare = @Queue(value = "work_queue",durable = "true",autoDelete = "false"))
* 不需要work_queue存在,没有会自动创建,且指定了队列会持久化,不自动删除
*
* 3.当@RabbitListener注解在类上时,生产者与消费者的消息类型不一致会报错,此注解放在方法上则不会
*
* */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public class WorkConsumerOne {
@RabbitHandler
public static void excute(String message, Message msg, Channel channel) throws IOException {
System.out.println(" 工作模式消费者-1接收消息: " + message + "!");
try {
/**
* 手动ack,如果此方法不执行,则此消费者会一直等待,不会接收到新消息
* */
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
//模拟让消费者1处理变慢
Thread.sleep(300);
} catch (Exception e) {
e.printStackTrace();
/**
* 出现异常则拒绝消息。
* 如果第二个参数为true,标识可以同时拒绝多个消息,false则只能一个
* 如果第三个参数为True,则会把消息返回队列重新发送
* */
channel.basicNack(msg.getMessageProperties().getDeliveryTag(),true,true);
}
}
}
消费者二,正常处理:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 1.@RabbitListener(queues = "work_queue")
* 需要work_queue队列已存在
*
* 2.@RabbitListener(queuesToDeclare = @Queue(value = "work_queue",durable = "true",autoDelete = "false"))
* 不需要work_queue存在,没有会自动创建,且指定了队列会持久化,不自动删除
*
* 3.当@RabbitListener注解在类上时,生产者与消费者的消息类型不一致会报错,此注解放在方法上则不会
*
* */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public class WorkConsumerTwo {
@RabbitHandler
public static void excute(String message, Message msg, Channel channel) throws IOException {
System.out.println(" 工作模式消费者-2接收消息: " + message + "!");
try {
/**
* 手动ack,如果此方法不执行,则此消费者会一直等待,不会接收到新消息
* */
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
/**
* 出现异常则拒绝消息。
* 如果第二个参数为true,标识可以同时拒绝多个消息,false则只能一个
* 如果第三个参数为True,则会把消息返回队列重新发送
* */
channel.basicNack(msg.getMessageProperties().getDeliveryTag(),true,true);
}
}
}
整合多个消息处理方法的消费者,与上面两个消费者的方式二选一即可:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 如果消息处理方法的参数类型一致,将@RabbitListener标记在方法上
* */
@Component
public class WorkConsumers {
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public static void excute1(String message, Message msg, Channel channel) throws IOException {
System.out.println(" 工作模式消费者-1接收消息: " + message + "!");
try {
/**
* 手动ack,如果此方法不执行,则此消费者会一直等待,不会接收到新消息
* */
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
//模拟让消费者1处理变慢
Thread.sleep(300);
} catch (Exception e) {
e.printStackTrace();
/**
* 出现异常则拒绝消息。
* 如果第二个参数为true,标识可以同时拒绝多个消息,false则只能一个
* 如果第三个参数为True,则会把消息返回队列重新发送
* */
channel.basicNack(msg.getMessageProperties().getDeliveryTag(),true,true);
}
}
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public static void excute2(String message, Message msg, Channel channel) throws IOException {
System.out.println(" 工作模式消费者-2接收消息: " + message + "!");
try {
/**
* 手动ack,如果此方法不执行,则此消费者会一直等待,不会接收到新消息
* */
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
/**
* 出现异常则拒绝消息。
* 如果第二个参数为true,标识可以同时拒绝多个消息,false则只能一个
* 如果第三个参数为True,则会把消息返回队列重新发送
* */
channel.basicNack(msg.getMessageProperties().getDeliveryTag(),true,true);
}
}
}
使用测试
运行主启动类后,调用地址:
localhost:8084/send/work?data=工作模式测试消息
控制台输出如下,由于消费者一添加了阻塞,所以处理慢于消费者二,消费者二会得到更多消息。
工作模式消费者-1接收消息: 工作模式测试消息!
工作模式消费者-2接收消息: 工作模式测试消息!
工作模式消费者-2接收消息: 工作模式测试消息!
工作模式消费者-2接收消息: 工作模式测试消息!
工作模式消费者-2接收消息: 工作模式测试消息!
工作模式消费者-2接收消息: 工作模式测试消息!
工作模式消费者-2接收消息: 工作模式测试消息!
工作模式消费者-2接收消息: 工作模式测试消息!
工作模式消费者-2接收消息: 工作模式测试消息!
工作模式消费者-1接收消息: 工作模式测试消息!
广播模式
yaml
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
配置类
声明交换机、三个队列,并进行绑定
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
//广播模式队列
@Bean("fanoutQueue1")
public Queue fanoutQueue1() {
return QueueBuilder
.durable("fanout1_queue")
.build();
}
@Bean("fanoutQueue2")
public Queue fanoutQueue2() {
return QueueBuilder
.durable("fanout2_queue")
.build();
}
@Bean("fanoutQueue3")
public Queue fanoutQueue3() {
return QueueBuilder
.durable("fanout3_queue")
.build();
}
//广播模式交换机
@Bean("fanoutExchange")
public Exchange fanoutExchange() {
return ExchangeBuilder
.fanoutExchange("fanout_exchange")
.durable(true)
.build();
}
//绑定fanout模式交换机与fanout模式队列
@Bean
public Binding fanoutBinding1() {
return new Binding(
"fanout1_queue", //队列名
Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
"fanout_exchange", //交换机名
"", //路由键,广播模式不需要路由键
null //额外参数,可用于配置死信队列的绑定等
);
}
@Bean
public Binding fanoutBinding2() {
return new Binding(
"fanout2_queue", //队列名
Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
"fanout_exchange", //交换机名
"", //路由键,广播模式不需要路由键
null //额外参数,可用于配置死信队列的绑定等
);
}
@Bean
public Binding fanoutBinding3() {
return new Binding(
"fanout3_queue", //队列名
Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
"fanout_exchange", //交换机名
"", //路由键,广播模式不需要路由键,空字符即可
null //额外参数,可用于配置死信队列的绑定等
);
}
}
生产者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("send")
public class FanoutProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("fanout")
public void execute(@RequestParam String data){
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
/**
* convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
* 这里指定交换机为配置类中声明的,路由键传空字符串是因为广播模式不需要路由键
* */
rabbitTemplate.convertAndSend("fanout_exchange","",message);
}
}
消费者
可将多个消费者整合到一个类中,或者为每个消费者单独定义一个类。两种二选一使用即可
整合多个消费者到一个类
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutConsumer {
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue(value = "fanout1_queue"), //声明的队列
// exchange = @Exchange(value = "fanout_exchange",type = "fanout") //声明的交换机,需指定类型,否则默认为direct
// ))
//与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系,该注解会帮我们完成注册
@RabbitListener(queues = "fanout1_queue")
public void receive1(String message) {
System.out.println("广播模式消费者1接收到消息:" + message);
}
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue(value = "fanout2_queue"), //声明的队列
// exchange = @Exchange(value = "fanout_exchange",type = "fanout") //声明的交换机,需指定类型,否则默认为direct
// ))
//与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系,该注解会帮我们完成注册
@RabbitListener(queues = "fanout2_queue")
public void receive2(String message) {
System.out.println("广播模式消费者2接收到消息:" + message);
}
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue(value = "fanout3_queue"), //声明的队列
// exchange = @Exchange(value = "fanout_exchange",type = "fanout") //声明的交换机,需指定类型,否则默认为direct
// ))
//与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系,该注解会帮我们完成注册
@RabbitListener(queues = "fanout3_queue")
public void receive3(String message) {
System.out.println("广播模式消费者3接收到消息:" + message);
}
}
或为每个消费者单独定义一个类
@RabbitListener(queues = "fanout1_queue")
@Component
public class FanoutConsumerOne {
@RabbitHandler
public void receive(String message) {
System.out.println("广播模式消费者1接收到消息:" + message);
}
}
@RabbitListener(queues = "fanout2_queue")
@Component
public class FanoutConsumerTwo {
@RabbitHandler
public void receive(String message) {
System.out.println("广播模式消费者2接收到消息:" + message);
}
}
@RabbitListener(queues = "fanout3_queue")
@Component
public class FanoutConsumerThree {
@RabbitHandler
public void receive(String message) {
System.out.println("广播模式消费者3接收到消息:" + message);
}
}
使用测试
运行主启动类后,调用地址:
localhost:8084/send/fanout?data=广播模式测试消息
控制台输出如下,每个消费者都得到了消息。
广播模式消费者2接收到消息:广播模式测试消息
广播模式消费者3接收到消息:广播模式测试消息
广播模式消费者1接收到消息:广播模式测试消息
Direct模式
yaml
配置基本连接信息
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
配置类
使用
ExchangeBuilder.directExchange()
声明direct模式交换机
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
//direct模式交换机
@Bean("directExchange")
public Exchange DirectExchange() {
return ExchangeBuilder
.directExchange("direct_exchange")
.durable(true)
.build();
}
//direct模式队列
@Bean("directQueue1")
public Queue DirectQueue1() {
return QueueBuilder
.durable("direct1_queue")
.build();
}
@Bean("directQueue2")
public Queue DirectQueue2() {
return QueueBuilder
.durable("direct2_queue")
.build();
}
//绑定direct模式交换机与direct模式队列
@Bean
public Binding directBinding1() {
return new Binding(
"direct1_queue", //队列名
Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
"direct_exchange", //交换机名
"info", //路由键
null //额外参数,可用于配置死信队列的绑定
);
}
@Bean
public Binding directBinding2() {
return new Binding(
"direct2_queue", //队列名
Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
"direct_exchange", //交换机名
"error", //路由键
null //额外参数,可用于配置死信队列的绑定
);
}
}
生产者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("send")
public class DirectProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("direct")
public void execute(@RequestParam String data, @RequestParam String key){
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
/**
* convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
* 这里指定交换机为配置类中声明的,路由键由请求传入
* */
rabbitTemplate.convertAndSend("direct_exchange",key,message);
}
}
消费者
将多个消费者整合到一个类中,也可以为每一个消费者单独定义一个类,这里使用整合方式
整合方式
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectConsumer {
// @RabbitListener(bindings = {
// @QueueBinding(
// value = @Queue(value = "direct1_queue"), // 创建队列
// key = {"info"}, // 路由key,绑定多个键:key = {"info", "error"}
// exchange = @Exchange(name = "direct_exchange", type = "direct") //交换机,也可用ExchangeTypes.DIRECT来设置交换机类型
// )})
//与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系,该注解会帮我们完成注册
@RabbitListener(queues = "direct1_queue")
public void receive1(String message) {
System.out.println("Direct模式消费者1接收到消息:" + message);
}
// @RabbitListener(bindings = {
// @QueueBinding(
// value = @Queue(value = "direct2_queue"), // 创建队列
// key = {"error"}, // 路由key
// exchange = @Exchange(name = "direct_exchange", type = "direct") //交换机,也可用ExchangeTypes.DIRECT来设置交换机类型
// )})
//与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系,该注解会帮我们完成注册
@RabbitListener(queues = "direct2_queue")
public void receive2(String message) {
System.out.println("Direct模式消费者2接收到消息:" + message);
}
}
单独定义
@Component
@RabbitListener(queues = "direct1_queue")
public class DirectConsumerOne {
@RabbitHandler
public void receive1(String message) {
System.out.println("Direct模式消费者1接收到消息:" + message);
}
}
@Component
@RabbitListener(queues = "direct2_queue")
public class DirectConsumerTwo {
@RabbitHandler
public void receive1(String message) {
System.out.println("Direct模式消费者2接收到消息:" + message);
}
}
使用测试
运行主启动类
调用如下地址:
localhost:8084/send/direct?data=路由模式测试消息&key=info
因为消费者一所监听的队列绑定交换机的路由键为info
,所以控制台输出:
Direct模式消费者1接收到消息:路由模式测试消息
再调用如下地址:
localhost:8084/send/direct?data=路由模式测试消息&key=error
因为消费者二所监听的队列绑定交换机的路由键为error
,所以控制台输出:
Direct模式消费者2接收到消息:路由模式测试消息
topic模式
yaml
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
配置类
使用
ExchangeBuilder.topicExchange()
声明topic模式交换机
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
//topic模式交换机
@Bean("topicExchange")
public Exchange topicExchange() {
return ExchangeBuilder
.topicExchange("topic_exchange")
.durable(true)
.build();
}
//topic模式队列
@Bean("topicQueue1")
public Queue topicQueue1() {
return QueueBuilder
.durable("topic1_queue")
.build();
}
@Bean("topicQueue2")
public Queue topicQueue2() {
return QueueBuilder
.durable("topic2_queue")
.build();
}
//绑定topic模式交换机与topic模式队列
@Bean
public Binding topicBinding1() {
return new Binding(
"topic1_queue", //队列
Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
"topic_exchange", //交换机
"#.message.#", //路由键
null //额外参数,可用于配置死信队列的绑定
);
}
@Bean
public Binding topicBinding2() {
return new Binding(
"topic2_queue", //队列
Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
"topic_exchange", //交换机
"message.*", //路由键
null //额外参数,可用于配置死信队列的绑定
);
}
}
生产者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("send")
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("topic")
public void execute(@RequestParam String data, @RequestParam String key){
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
/**
* convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
* 这里指定交换机为配置类中声明的,路由键由请求传入
* */
rabbitTemplate.convertAndSend("topic_exchange",key,message);
}
}
消费者
将多个消费者整合到一个类中,也可以为每一个消费者单独定义一个类,这里使用整合方式
整合方式:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicConsumer {
// @RabbitListener(bindings = {
// @QueueBinding(
// value = @Queue(value = "topic1_queue"), // 创建队列
// key = {"#.message.#"}, // 路由key
// exchange = @Exchange(name = "topic_exchange", type = "topic") //交换机
// )})
//与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系
@RabbitListener(queues = "topic1_queue")
public void receive1(String message) {
System.out.println("topic模式消费者1接收到消息:" + message);
}
// @RabbitListener(bindings = {
// @QueueBinding(
// value = @Queue(value = "topic2_queue"), // 创建队列
// key = {"message.*"}, // 路由key
// exchange = @Exchange(name = "topic_exchange", type = "topic") //交换机
// )})
//与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系
@RabbitListener(queues = "topic2_queue")
public void receive2(String message) {
System.out.println("topic模式消费者2接收到消息:" + message);
}
}
为每个消费者单独定义一个类的方式:
//消费者1
@Component
public class TopicConsumerOne {
@RabbitListener(queues = "topic1_queue")
public void receive1(String message) {
System.out.println("topic模式消费者1接收到消息:" + message);
}
}
//消费者2
@Component
public class TopicConsumerTwo {
@RabbitListener(queues = "topic2_queue")
public void receive2(String message) {
System.out.println("topic模式消费者2接收到消息:" + message);
}
}
使用测试
topic
模式的通配符匹配规则:
*
: 匹配不多不少恰好1个词#
: 匹配零个、一个或多个词
运行主启动类,当调用地址为:
localhost:8084/send/topic?data=路由模式测试消息&key=message.info
请求传入的路由键为message.info,消费者1绑定的路由键为**#.message.#,消费者2绑定的路由键为message.***,二者皆可匹配,所以控制台输出:
topic模式消费者1接收到消息:路由模式测试消息
topic模式消费者2接收到消息:路由模式测试消息
当把请求修改为如下地址再次调用时,将只有消费者1可以匹配:
localhost:8084/send/topic?data=路由模式测试消息&key=info.message
控制台只会输出:
topic模式消费者1接收到消息:路由模式测试消息
rpc模式
rpc模式配置步骤
-
创建rpc模式的消息队列queue
-
配置rpc模式专用的
RabbitTemplate
模板,为其指定**回调队列(可使用临时队列或固定队列)**并设置CorrelationId
-
配置监听器
MessageListenerContainer
监听回调队列 -
生产者使用
RabbitTemplate
模板的sendAndReceive
方法来发布消息并等待接收回调消息 -
消费者接收消息后,获取
CorrelationId
并设置为回调消息的属性,然后使用RabbitTemplate
模板的send
方法返回回调消息
yaml
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
#监听配置
listener:
simple:
#开启消费者手动确认消息,在spring boot中提供了三种确认模式:
# NONE - 使用rabbitmq的自动确认
# AUTO - 使用rabbitmq的手动确认, springboot会自动发送确认回执 (默认)
# MANUAL - 使用rabbitmq的手动确认, 且必须手动执行确认操作
# 默认的AUTO模式中, 处理消息的方法抛出异常, 则表示消息没有被正确处理, 该消息会被重新发送.
acknowledge-mode: manual
配置类
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Autowired
private ConnectionFactory connectionFactory;
//rpc模式队列
@Bean("rpcQueue")
public Queue rpcQueue() {
return QueueBuilder
.durable("rpc_queue")
.build();
}
//用于rpc模式进行消息回调的队列
@Bean("replyQueue")
public Queue replyQueue() {
return QueueBuilder
.durable("reply_queue")
.build();
}
@Bean
public RabbitTemplate replyTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//决定是否使用临时队列接收回调消息。设置为 false 使用固定队列。
rabbitTemplate.setUseTemporaryReplyQueues(false);
//指定固定回调队列,配合 setUseTemporaryReplyQueues(false) 使用。
rabbitTemplate.setReplyAddress("reply_queue");
//开启CorrelationId自动关联到消息的功能,如不设置为true则需要手动创建CorrelationData对象并关联到消息上
rabbitTemplate.setUserCorrelationId(true);
//设置等待回调消息的超时时间,超时后抛出异常。
rabbitTemplate.setReplyTimeout(60000);
return rabbitTemplate;
}
/**
* RPC回调队列监听器,监听reply_queue回调队列,接收消费者的回调消息并将其交付给生产者的 sendAndReceive 方法。
*
**/
@Bean
public SimpleMessageListenerContainer createReplyListenerContainer() {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setQueueNames("reply_queue");
listenerContainer.setMessageListener(replyTemplate());
return listenerContainer;
// listenerContainer.setConsumerBatchEnabled(true);//设置消费多线程处理
// listenerContainer.setDeBatchingEnabled(true);//开启多线程处理
// listenerContainer.setBatchSize(500);//设置监听器一次批量处理的消息数量
// listenerContainer.setConcurrentConsumers(500); //设置线程数
// listenerContainer.setMaxConcurrentConsumers(1000); //最大线程数
}
}
生产者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@RestController
@RequestMapping("send")
public class RpcProducer {
//使用注册的rpc模式模板
@Autowired
private RabbitTemplate replyTemplate;
@GetMapping("rpc")
public void execute(@RequestParam String data){
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//设置消息的回调id,作用是标识此消息来自于哪个生产者,以便消费者进行回调时,通过此id将消息返回给来源的生产者
messageProperties.setCorrelationId(UUID.randomUUID().toString());
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
/**
* sendAndReceive:向rpc队列投递消息并同步等待消费结果。
* 从左到右参数为:
* 交换机:rpc模式可不用指定交换机,这里传空字符传
* 路由键:设置为rpc模式的队列名
* 消息内容
* 回调id:用于标识消息来自那个生产者,没有此id消息将无法回调给生产者
* */
Message returnMsg = replyTemplate.sendAndReceive("","rpc_queue",message);
/**
* 此处与配置类 rabbitTemplate.setUserCorrelationId(true) 的设置相关联,如果未设置true或设置了false,
* 则需要使用如下方式创建CorrelationData对象,并将此对象传入发送方法的第四个参数
* */
//CorrelationData correlationData = new CorrelationData();
//correlationData.setId(messageProperties.getCorrelationId());
//Message returnMsg = replyTemplate.sendAndReceive("","rpc_queue",message,correlationData);
//输出消费者回调回来的消息
System.out.println("生产者接收回调消息:"+new String(returnMsg.getBody()));
}
}
消费者
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@Component
public class RpcConsumer {
@Autowired
private RabbitTemplate replyTemplate;
@RabbitListener(queues = "rpc_queue")
public void receive(String message, Message msg, Channel channel) throws IOException {
//接收来自生产者发布到队列的消息
System.out.println("rpc模式消费者接收到消息:" + message);
try{
//封装一个消费者返回给生产者的消息,也就是rpc的回调消息
MessageProperties messageProperties = new MessageProperties();
//要从消息中取出生产者发布消息时设置的CorrelationId,此id标识消息来自哪个生产者,回调时会根据此id来给对应生产者返回消息
messageProperties.setCorrelationId(msg.getMessageProperties().getCorrelationId());
Message reMsg = new Message("返回给生产者的rpc回调消息".getBytes(StandardCharsets.UTF_8),messageProperties);
//回调方法,指定路由键为配置类中注册的回调队列
replyTemplate.send("","reply_queue",reMsg);
//手动确认消息接收成功
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
}catch(Exception e){
//出现异常就拒绝消息
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
}
}
}
使用测试
运行主启动类,调用地址:
localhost:8084/send/rpc?data=rpc模式测试消息
控制台输出如下,消费者输出了消息,生产者也接收到了回调:
rpc模式消费者接收到消息:rpc模式测试消息
生产者接收回调消息:返回给生产者的rpc回调消息
死信队列
yaml
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
#监听配置
listener:
simple:
#开启消费者手动确认消息,在spring boot中提供了三种确认模式:
# NONE - 使用rabbitmq的自动确认
# AUTO - 使用rabbitmq的手动确认, springboot会自动发送确认回执 (默认)
# MANUAL - 使用rabbitmq的手动确认, 且必须手动执行确认操作
# 默认的AUTO模式中, 处理消息的方法抛出异常, 则表示消息没有被正确处理, 该消息会被重新发送.
acknowledge-mode: manual
配置类
注册普通队列时使用
deadLetterExchange()
为其指定死信交换机
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
/**
* 用于存放超时消息的死信队列
* */
@Bean("deadQueue")
public Queue DeadQueue() {
return QueueBuilder.durable("dead_queue").build();
}
/**
* 死信交换机
* */
@Bean("deadExchange")
public Exchange deadLetterExchange() {
return ExchangeBuilder.directExchange("dead_exchange").durable(true).build();
}
/**
* 绑定死信交换机与死信队列
* */
@Bean
public Binding deadLetterBinding() {
return new Binding(
"dead_queue",
Binding.DestinationType.QUEUE,
"dead_exchange",
"dead-key",
null
);
}
/**
* 创建普通队列,并绑定死信交换机,用于消息过期后把消息放入死信队列
*
* */
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder
.durable("normal_queue")
//设置队列内所有消息的超时时间为5秒,并指定此队列的死信交换机
.ttl(5000).deadLetterExchange("dead_exchange").deadLetterRoutingKey("dead-key")
.build();
}
}
生产者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("send")
public class DlxProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("dlx")
public void execute(@RequestParam String data){
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
/**
* convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
* 1.在使用简单、工作、广播模式时不需要指定交换机,使用的是默认的交换机(AMQP default),路由键处填写队列名即可(自动匹配队列)
* 2.direct、topic等模式则填写对应的交换机和路由键
* */
rabbitTemplate.convertAndSend("","normal_queue",message);
}
}
普通队列消费者
用于监听普通队列的消息,手动制造异常来让消息被拒绝,并投放到死信队列中
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import java.io.IOException;
@Component
public class NormalConsumer {
// @RabbitListener(
// queuesToDeclare = @Queue(
// value = "normal_queue", // 普通队列名称
// durable = "true", // 队列持久化
// autoDelete = "false", // 不自动删除
// arguments = { // 配置死信队列的参数, 也可以设置其他队列属性
// @Argument(name = "x-dead-letter-exchange", value = "dead_exchange"), // 指定死信交换机
// @Argument(name = "x-dead-letter-routing-key", value = "dead-key"), // 指定死信路由键
// @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Integer") // 设置消息过期时间为5秒
// }
// )
// )
//与上面的注解二选一使用,使用上面的注解后,可以不用再配置类中进行normal_queueu队列的配置,但死信队列的配置不能去掉
@RabbitListener(queues = "normal_queue")
public void receive1(String message, Message msg, Channel channel) throws IOException {
try{
//制造一个异常模拟消费者拒绝消息
int i = 1/0;
//手动确认消息接收成功
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
}catch(Exception e){
System.out.println("出现异常,消费者拒绝接收消息: " + message+",异常信息为:"+e.getMessage());
//出现异常就拒绝消息
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
}
}
}
死信队列消费者
用于监听死信队列,当死信队列中存在消息时,会被此消费者消费
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DlxConsumer {
@RabbitListener(queues = "dead_queue")
public void receive1(String message) {
System.out.println("死信队列消费者接收到消息: " + message);
}
}
使用测试
运行主启动类,调用地址
localhost:8084/send/dlx?data=死信队列的测试消息
控制台输出:
出现异常,消费者拒绝接收消息: 死信队列的测试消息,异常信息为:/ by zero
死信队列消费者接收到消息: 死信队列的测试消息
消息会被普通消费者拒绝,存入死信队列并被死信队列的消费者消费
消息可靠性保证
在 RabbitMQ 中,为了保证消息的可靠性,主要通过以下机制来实现消息从生产者到消费者的可靠传递:
- 消息确认机制,用于保证消息可以从生产者投递到交换机中。
- 消息回退机制,用于保证消息可以从交换机投递到队列中。
- 消息持久化,确保消息在 RabbitMQ 中持久化存储,不会因服务崩溃或重启而丢失。
- 消费者的消息确认,消费者通过
basicAck
、basicNack
等方法手动确认或拒绝消息。 - 死信队列,消息被消费者拒绝并指定不重新入队列、消息在队列中过期、队列达到最大长度限制 等三种情况会将消息投入死信队列,可通过监听死信队列对失效消息进行处理
- 事务机制,使用 RabbitMQ 提供的事务机制(
channel.txSelect()
,txCommit()
,txRollback()
),但性能开销较大已不建议使用,使用 消息回退与确认机制 替代 事务机制,性能更优。 - 消息重试,使用Spring AMQP提供的重试机制,在消息消费失败后重新发送给消费者。
关于消费者的消息确认(在工作模式示例中)、死信队列已在上面展示过不再进行展示,事务机制已基本不用也不进行展示。下面主要演示消息确认回退机制、消息持久化、消息重试
消息确认机制示例
yaml
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
#开启消息确认机制,保证消息可从生产者正确投递到交换机中
publisher-confirm-type: correlated
在 Spring Boot 中,publisher-confirm-type
是用于配置 RabbitMQ 的生产者确认模式的属性,决定了 RabbitMQ 如何处理生产者发布消息的确认逻辑。以下是该配置的详细解析:
配置可选值
1. NONE
(默认值)
- 说明:
- 不启用生产者发布确认机制。
- 生产者不会收到任何消息投递的成功或失败回调。
- 适用场景:
- 对消息可靠性要求不高,或者使用其他机制保障可靠性。
- 优缺点:
- 优点:性能高,没有额外的回调开销。
- 缺点:如果消息丢失或未成功投递,生产者无法感知。
2. CORRELATED
- 说明:
- 启用简单的发布确认模式。
- 配合
ConfirmCallback
方法,可以监听消息是否成功到达交换机。
- 适用场景:
- 需要知道消息是否成功到达交换机。
- 不需要追踪消息的详细流转状态。
- 优缺点:
- 优点:简单易用,适合大多数场景。
- 缺点:无法区分消息是否成功路由到队列。
3. SIMPLE
-
说明:
- 启用完整的发布确认机制。
- 配合
ConfirmCallback
和ReturnCallback
方法,可以监听消息是否成功到达交换机以及是否成功路由到队列。 - 消息可靠性更高。
-
适用场景:
- 对消息可靠性要求较高的场景。
- 需要捕获消息未成功路由到队列的详细信息。
-
优缺点:
- 优点:功能强大,适用于复杂的业务场景。
- 缺点:需要额外处理
ReturnCallback
的逻辑,稍复杂。
配置类
注册一个队列和带有消息确认函数的
rabbitmq
模板,配置类中注释掉的交换机及绑定先不要放开,以观察测试效果。
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.nio.charset.StandardCharsets;
@Configuration
public class QueueConfig {
//连接工厂
@Autowired
private ConnectionFactory connectionFactory;
//direct模式队列
@Bean("confirmCallQueue")
public Queue confirmCallQueue() {
return QueueBuilder.durable("confirmCall_queue").build();
}
// @Bean("confirmCallExchange")
// public Exchange confirmCallExchange() {
// return ExchangeBuilder.directExchange("confirmCall_exchange").durable(true).build();
// }
//
// @Bean
// public Binding confirmCallBinding() {
// return new Binding("confirmCall_queue", Binding.DestinationType.QUEUE, "confirmCall_exchange", "confirmCall-key", null);
// }
//配置带有消息确认函数的rabbitmq模板
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
/**
* 消息确认方法:
* 如果消息未成功从消费者到达交换机,ack参数为false,cause为失败原因
* correlationData中含有回调消息,但需要在生产者代码中手动添加才可在此处获取,可从中定义并取出消息内容等投递失败的具体信息。可取参数如下:
* 1.message:被退回的消息本体,包含消息的内容和消息属性
* 2.replyCode:RabbitMQ返回的退回代码,表示消息退回的原因,通过此代码可以判断失败类型
* 3.replyText:RabbitMQ返回的退回描述文本。用于补充说明消息退回的具体原因。
* 4.exchange:消息被投递的交换机名称
* 5.routingKey:消息被投递时使用的路由键
*
* */
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if(ack){
System.out.println("消息已正确发送到交换机");
}else{
ReturnedMessage rm = correlationData.getReturned();
Message message = rm.getMessage();
//从消息中获取消息属性
MessageProperties messageProperties = message.getMessageProperties();
//消息内容
String data = new String(message.getBody(), StandardCharsets.UTF_8);
//返回的退回代码:
// 312: No route. 消息无法路由到任何队列。
// 313: Message expired. 消息在队列中过期。
// 404: Exchange not found. 指定的交换机不存在。
int replyCode = rm.getReplyCode();
//RabbitMQ 返回的退回描述文本。用于补充说明消息退回的具体原因。配合 replyCode 提供更清晰的退回原因描述。
//"NO_ROUTE": 消息无法路由到任何队列。
//"MESSAGE_EXPIRED": 消息过期。
String replyText = rm.getReplyText();
//消息被投递的交换机名称
String exchange = rm.getExchange();
//消息被投递时使用的路由键
String routingKey = rm.getRoutingKey();
System.out.println("消息未正确发送到交换机,失败原因是:"+cause+"\n投递失败消息的消息内容为:"+data+", 退回代码为:"+replyCode+", 退回描述文本为:"+replyText+", "+
"交换机名称为:"+exchange+", 路由键是:"+routingKey);
}
});
return rabbitTemplate;
}
}
生产者
指定一个不存在的交换机,以测试消息确认机制是否生效
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@RestController
@RequestMapping("send")
public class ConfirmProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("confirm")
public void execute(@RequestParam String data, @RequestParam String key){
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
//关联消息确认方法中的correlationData参数,为其提供确认回调信息
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//ReturnedMessage构造参数从左到右分别为:消息内容message、退回代码replyCode、退回描述replyText、队列名或路由键routingKey
correlationData.setReturned(new ReturnedMessage(message,404,"NOT_FOUND","",key));
//发送时,由于配置类中注释掉了交换机,这里相当于是指定了一个不存在的交换机,以测试消息确认机制生效结果
rabbitTemplate.convertAndSend("confirmCall_exchange",key,message,correlationData);
}
}
消费者
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "confirmCall_queue")
public class ConfirmConsumer {
@RabbitHandler
public void receive1(String message) {
System.out.println("消费者接收到消息:" + message);
}
}
使用测试
运行主启动类
当消息未成功发送到交换机的情况
调用地址:
localhost:8084/send/confirm?data=生产者消息确认机制的测试消息&key=confirmCall-key
控制台输出具体错误信息,可以看到,在确认方法ConfirmCallback
的cause
参数中,已包含了reply-code
、reply-text
等信息
消息未正确发送到交换机,失败原因是:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirmCall_exchange' in vhost '/', class-id=60, method-id=40)
投递失败消息的消息内容为:生产者消息确认机制的测试消息, 退回代码为:404, 退回描述文本为:NOT_FOUND, 交换机名称为:, 路由键是:confirmCall-key
消息成功发送到交换机的情况
在这里放开配置类中注释的如下部分:
@Bean("confirmCallExchange")
public Exchange confirmCallExchange() {
return ExchangeBuilder.directExchange("confirmCall_exchange").durable(true).build();
}
@Bean
public Binding confirmCallBinding() {
return new Binding("confirmCall_exchange", Binding.DestinationType.QUEUE, "confirmCall_queue", "confirmCall-key", null);
}
然后再次调用地址:
localhost:8084/send/confirm?data=生产者消息确认机制的测试消息&key=confirmCall-key
控制台输出:
消息已正确发送到交换机
消费者接收到消息:生产者消息确认机制的测试消息
因为注释放开,交换机已存在,所以消息被成功路由并被消费者消费掉。
消息回退机制示例
yaml
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
#开启消息退回机制,保证消息可从交换机投递到队列中
publisher-returns: true
template:
# 启用强制路由(与 ReturnCallback 配合使用)
mandatory: true
publisher-returns: true
的作用
启用消息的退回机制(ReturnCallback)。当消息无法成功路由到队列时(例如路由键不匹配,或队列不存在),RabbitMQ 会触发 ReturnCallback
回调方法,通知生产者消息未能被路由。
适用场景:
- 交换机到队列的路由失败场景。
- 消息丢失防护:通过
ReturnCallback
机制,可以捕获未被路由的消息并处理(例如重新发送或记录日志)。
mandatory
的作用
- 如果
mandatory
为true
,消息在无法路由到队列时,会触发ReturnCallback
。 - 如果
mandatory
为false
,消息无法路由时将被丢弃,且生产者不会收到任何通知。
配置类
注册一个交换机和带有消息回退函数的
rabbitmq
模板,配置类中注释掉的队列及绑定先不要放开,以观察测试效果。
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
//连接工厂
@Autowired
private ConnectionFactory connectionFactory;
//direct模式交换机
@Bean("returnsCallExchange")
public Exchange returnsCallExchange() {
return ExchangeBuilder.directExchange("returnsCall_exchange").durable(true).build();
}
//direct模式队列
// @Bean("returnsCallQueue")
// public Queue returnsCallCallQueue() {
// return QueueBuilder.durable("returnsCall_queue").build();
// }
//
// @Bean
// public Binding returnsCallBinding() {
// return new Binding("returnsCall_queue", Binding.DestinationType.QUEUE, "returnsCall_exchange", "returnsCall-key", null);
// }
//配置带有消息回退函数的rabbitmq模板
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 配置 ReturnCallback,returnedMessage中包含退回信息
rabbitTemplate.setReturnsCallback(returnedMessage -> {
// 获取退回消息的相关信息:
//原始消息内容,包含消息体和消息属性。
String messageBody = new String(returnedMessage.getMessage().getBody());
//RabbitMQ 的退回代码,指示失败的具体原因。
int replyCode = returnedMessage.getReplyCode();
//RabbitMQ 提供的退回描述信息。
String replyText = returnedMessage.getReplyText();
//消息被发送到的交换机名称。
String exchange = returnedMessage.getExchange();
//消息使用的路由键。
String routingKey = returnedMessage.getRoutingKey();
System.out.println("消息退回:");
System.out.println("内容:" + messageBody);
System.out.println("退回码:" + replyCode);
System.out.println("原因:" + replyText);
System.out.println("交换机:" + exchange);
System.out.println("路由键:" + routingKey);
// 可在此处理退回的消息,比如记录日志或重发等
});
// 设置消息强制路由
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
}
生产者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("send")
public class ReturnProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("return")
public void execute(@RequestParam String data, @RequestParam String key){
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
//发送时,由于配置类中注释掉了队列,这里相当于是指定了一个不存在的队列,以测试消息回退机制生效结果
rabbitTemplate.convertAndSend("returnsCall_exchange",key,message);
}
}
使用测试
测试消息未成功从交换机发送到队列的情况
运行主启动类,调用地址:
localhost:8084/send/return?data=生产者消息回退机制的测试消息&key=returnsCall-key
因为队列在配置类中被注释掉,消息无法路由,触发回退方法,所以控制台输出:
消息退回:
内容:生产者消息回退机制的测试消息
退回码:312
原因:NO_ROUTE
交换机:returnsCall_exchange
路由键:returnsCall-key
测试消息成功从交换机发送到队列的情况
放开配置类的如下注释部分:
@Bean("returnsCallQueue")
public Queue returnsCallCallQueue() {
return QueueBuilder.durable("returnsCall_queue").build();
}
@Bean
public Binding returnsCallBinding() {
return new Binding("returnsCall_queue", Binding.DestinationType.QUEUE, "returnsCall_exchange", "returnsCall-key", null);
}
并添加如下消费者:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "returnsCall_queue")
public class ReturnConsumer {
@RabbitHandler
public void receive1(String message) {
System.out.println("消费者接收到消息:" + message);
}
}
运行主启动类,再调用地址:
localhost:8084/send/return?data=生产者消息回退机制的测试消息&key=returnsCall-key
因为队列在配置类中被注册,消息可以路由到队列中,回退方法不会被触发,消费被消费者消费,所以控制台输出:
消费者接收到消息:生产者消息回退机制的测试消息
消息确认与回退对比
回调类型 | 触发条件 | 适用场景 |
---|---|---|
ConfirmCallback | 消息是否成功到达交换机 | 判断消息是否到达交换机 |
ReturnCallback | 消息未能路由到任何队列时触发 | 捕获消息路由失败的详细信息 |
消息持久化示例
消息持久化是指 RabbitMQ 将消息存储到磁盘,而不是仅保存在内存中。持久化的消息即使在 RabbitMQ 服务重启后仍然可用。
需要配置持久化的对象
- 交换机(Exchange)持久化。
- 队列(Queue)持久化。
- 消息本身持久化。
yaml
消息持久化只需要基本连接配置信息即可,无需额外配置
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
配置类
配置消息持久化,需要先配置其使用的交换机及队列为持久化(简单模式等未使用交换机的模式可以不用配交换机持久化,因为其使用默认交换机)
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public Queue durableQueue() {
return QueueBuilder
.durable("durable_queue")//队列持久化
.build();
}
@Bean
public DirectExchange durableExchange() {
return ExchangeBuilder.directExchange("durable_exchange")
.durable(true) // 交换机持久化
.build();
}
//绑定交换机与队列
@Bean
public Binding durableBinding() {
return new Binding(
"durable_queue", //队列名
Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
"durable_exchange", //交换机名
"durable-key", //路由键
null //额外参数
);
}
}
生产者
调用消息属性的
setDeliveryMode(MessageDeliveryMode.PERSISTENT)
为消息指定持久化,MessageDeliveryMode.PERSISTENT
对应的值为2
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("send")
public class DurableProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("durable")
public void execute(@RequestParam String data, @RequestParam String key){
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//设置消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
/**
* convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
* 这里指定交换机为配置类中声明的,路由键由请求传入
* */
rabbitTemplate.convertAndSend("durable_exchange",key,message);
}
}
使用测试
运行主启动类,调用地址:
localhost:8084/send/durable?data=持久化的测试消息&key=durable-key
访问管理页面,地址为你的ip:15672:
http://192.168.137.200:15672/#/queues
查看队列,有1条未消费消息:
点击队列名进入队列详情页面,点击Get messgae
按钮,查看消息,其持久化属性已被设置(蓝色框):
使用linux命令systemctl restart rabbitmq-server
重启rabbitmq服务,再回到管理页面查看,消息依然存在,持久化成功:
消息重试示例
为什么需要消息重试?
- 消费者处理消息时可能遇到短暂性错误(如服务不可用、网络波动等)。
- 重试可以在一定时间内自动再次尝试处理消息,避免消息丢失。
注意事项
- 消息重试针对的是消费者端在处理消息时出现异常、未能正确消费的情况,在消息未确认之前,RabbitMQ 会将消息重新投递到消费者。如果是消费者手动拒绝(执行
basicReject
或basicNack
方法)则不会触发重试机制。 - 如果超过最大重试次数,消息会被丢弃或转入死信队列(如果配置了死信队列)。
yaml
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
#在监听配置中开启消息重试
listener:
simple:
retry:
enabled: true # 开启消费者重试
max-attempts: 5 # 最大重试次数
initial-interval: 3000 # 重试间隔时间(毫秒)
#multiplier: 2.0 # 重试间隔的倍增因子,大于0时,下一次重试的延迟时间为:重试间隔时间 * 倍增因子
#max-interval: 10000 # 最大重试间隔时间(毫秒)
生产者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("send")
public class RetryProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("retry")
public void execute(@RequestParam String data){
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
/**
* convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
* 1.在使用简单、工作、广播模式时不需要指定交换机,使用的是默认的交换机(AMQP default),路由键处填写队列名即可(自动匹配队列)
* 2.direct、topic等模式则填写对应的交换机和路由键
* */
rabbitTemplate.convertAndSend("","retry_queue",message);
}
}
消费者
简化配置,不在配置类中以bean的形式创建队列,直接在消费者中使用注解声明
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Component
//如果retry_queue队列不存在会自动创建
@RabbitListener(queuesToDeclare = @Queue(value = "retry_queue"))
public class RetryCustomer {
@RabbitHandler
public static void excute(String message) {
System.out.println(
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
+"处理消息:" + message
);
System.out.println();
int i = 1/0;
}
}
使用测试
启动后,调用地址:
localhost:8084/send/retry?data=消息重试机制的测试消息
消费者控制台会输出5次处理信息,然后抛出异常并丢弃队列中的消息(不想丢弃可以结合死信队列处理):
2024-11-21 08:54:42处理消息:消息重试机制的测试消息
2024-11-21 08:54:45处理消息:消息重试机制的测试消息
2024-11-21 08:54:48处理消息:消息重试机制的测试消息
2024-11-21 08:54:51处理消息:消息重试机制的测试消息
2024-11-21 08:54:54处理消息:消息重试机制的测试消息
消息优先级
消息优先级(Message Priority)是 RabbitMQ 提供的一种机制,用于在消费者消费消息时,根据消息的重要性或优先级先处理高优先级的消息。
RabbitMQ 消息优先级的特点
- 队列优先级声明:
- 消息优先级在 RabbitMQ 中是以队列为单位配置的,所以要先在队列上开启优先级配置。
- 一个队列可以设置一个最大优先级(
x-max-priority
),即优先级范围是0
到最大优先级(通常不超过255
)。
- 消息优先级范围:
- 消息优先级默认为
0
。 - 如果队列没有声明优先级,所有消息按先进先出(FIFO)处理。
- 消息优先级默认为
- 优先级策略:
- 消息优先级越高,越早被消费者处理。
- 优先级设置得过大可能会增加内存消耗和队列扫描的性能开销。
- 工作机制:
- 队列在存储消息时,会根据优先级对消息进行排序,高优先级的消息先被处理。
注意事项
- 优先级对性能的影响:
- 优先级队列可能会影响性能,因为 RabbitMQ 需要维护一个优先级排序。
- 推荐在实际业务中设置适当的优先级范围,如
0
到10
。
- 队列必须声明支持优先级:
- 如果队列未设置
x-max-priority
,即使消息设置了优先级也不会生效。
- 如果队列未设置
- 优先级范围:
- 如果消息的优先级超出队列的
x-max-priority
,则优先级被限制在最大值。
- 如果消息的优先级超出队列的
- 消息排序机制:
- RabbitMQ 并非严格意义的优先级队列,高优先级消息并不总是最先被处理,只能保证“尽量优先”。
yaml
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
配置类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
//简单模式队列
@Bean("priorityQueue")
public Queue priorityQueue() {
return QueueBuilder
.durable("priority_queue") //持久化一个叫priority_queue的队列
.maxPriority(10) //开启优先级配置,设置优先级范围为:0 - 10
.build();
}
}
生产者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("send")
public class PriorityProducer {
//注入模板
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("priority")
public void execute(@RequestParam String data, @RequestParam Integer number){
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//设置消息的优先级
messageProperties.setPriority(number);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
//发布消息
rabbitTemplate.convertAndSend("","priority_queue",message);
}
}
消费者
需要注意的是,在生产者产生消息的速度大于消费者消费速度、且有消息堆积的情况下,设置队列及消息优先级才有意义,否则,生产者刚生产一条就被消费者消费了,无法体现优先级。
这里先注释掉@RabbitListener
注解,让消息堆积在队列中,以测试优先级效果:
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class PriorityCustomer {
//@RabbitListener(queues = "priority_queue")
public static void excute(String message) {
System.out.println(" Priority消费者接收到消息: " + message + "!");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
使用测试
运行主启动类,分别调用下面地址(参数number代表消息的优先级大小,传入生产者代码),产生3条消息存于队列中:
localhost:8084/send/priority?data=优先级测试消息1&number=1
localhost:8084/send/priority?data=优先级测试消息2&number=2
localhost:8084/send/priority?data=优先级测试消息3&number=3
发送后看管理界面,看到有3条消息在队列中等待,蓝色的Pri
标志代表队列是优先级队列
然后放开消费者@RabbitListener
注解的注释,重启springboot主启动类查看控制台输出:
Priority消费者接收到消息: 优先级测试消息3!
Priority消费者接收到消息: 优先级测试消息2!
Priority消费者接收到消息: 优先级测试消息1!
优先级数字大的先被消费,所以打印顺序为3、2、1。
延迟队列
延迟队列的概念
延迟队列是一种特殊的消息队列,消息会在特定时间之后被投递给消费者,而不是立即投递。它广泛应用于以下场景:
- 订单未支付自动取消。
- 定时任务调度。
- 延迟消息通知。
实现延迟队列的方法
方法一:使用 RabbitMQ 插件实现延迟队列
RabbitMQ 提供了 Delayed Message Plugin 插件,支持基于交换机的延迟消息功能。
方法二:使用 TTL + 死信队列实现延迟队列
-
TTL(Time-To-Live):
- 为消息或队列设置生存时间,消息超过生存时间后会变成死信。
-
死信队列:
- 死信消息会被转发到绑定的死信交换机,再由死信交换机路由到指定的队列。
两种方法对比
方法 | 优点 | 缺点 |
---|---|---|
TTL + 死信队列 | 不依赖插件,兼容性强。 | 实现复杂,无法实现单条消息不同延迟时间。 |
Delayed Message Plugin | 配置简单,支持单条消息设置不同延迟时间。 | 依赖插件,需要额外安装,可能存在兼容性问题。 |
- TTL + 死信队列适合场景较为固定的延迟需求,如所有消息有相同的延迟时间。
- Delayed Message Plugin更灵活,适合需要动态设置消息延迟时间的场景。
- 根据业务需求选择合适的实现方式,同时注意性能影响和插件的兼容性。
插件方式实现
插件下载
访问地址:
https://www.rabbitmq.com/community-plugins#routing
点击下图红框处的Releases
跳转github后,选择对应的版本下载,我这里选择当前最新版4.0.2的.ez文件 (ez文件对应linux系统):
将下载后的ez文件放入/usr/lib/rabbitmq/lib/rabbitmq_server-4.0.3/plugins
目录中,注意4.0.3是版本号,你的可能不同。
放入后,使用以下命令启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
运行结果
[root@Centos7 ~]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@localhost:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@localhost...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
验证插件是否启用成功:
rabbitmq-plugins list
在输出中找到 rabbitmq_delayed_message_exchange
,并确保其状态为 [E*]
(已启用)。
[root@Centos7 ~]# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@localhost
|/
[ ] rabbitmq_amqp1_0 4.0.3
[ ] rabbitmq_auth_backend_cache 4.0.3
[ ] rabbitmq_auth_backend_http 4.0.3
[ ] rabbitmq_auth_backend_ldap 4.0.3
[ ] rabbitmq_auth_backend_oauth2 4.0.3
[ ] rabbitmq_auth_mechanism_ssl 4.0.3
[ ] rabbitmq_consistent_hash_exchange 4.0.3
[E*] rabbitmq_delayed_message_exchange 4.0.2
[ ] rabbitmq_event_exchange 4.0.3
[ ] rabbitmq_federation 4.0.3
[ ] rabbitmq_federation_management 4.0.3
[ ] rabbitmq_federation_prometheus 4.0.3
[ ] rabbitmq_jms_topic_exchange 4.0.3
[E*] rabbitmq_management 4.0.3
[e*] rabbitmq_management_agent 4.0.3
[ ] rabbitmq_mqtt 4.0.3
[ ] rabbitmq_peer_discovery_aws 4.0.3
[ ] rabbitmq_peer_discovery_common 4.0.3
[ ] rabbitmq_peer_discovery_consul 4.0.3
[ ] rabbitmq_peer_discovery_etcd 4.0.3
[ ] rabbitmq_peer_discovery_k8s 4.0.3
[ ] rabbitmq_prometheus 4.0.3
[ ] rabbitmq_random_exchange 4.0.3
[ ] rabbitmq_recent_history_exchange 4.0.3
[ ] rabbitmq_sharding 4.0.3
[ ] rabbitmq_shovel 4.0.3
[ ] rabbitmq_shovel_management 4.0.3
[ ] rabbitmq_shovel_prometheus 4.0.3
[ ] rabbitmq_stomp 4.0.3
[ ] rabbitmq_stream 4.0.3
[ ] rabbitmq_stream_management 4.0.3
[ ] rabbitmq_top 4.0.3
[ ] rabbitmq_tracing 4.0.3
[ ] rabbitmq_trust_store 4.0.3
[e*] rabbitmq_web_dispatch 4.0.3
[ ] rabbitmq_web_mqtt 4.0.3
[ ] rabbitmq_web_mqtt_examples 4.0.3
[ ] rabbitmq_web_stomp 4.0.3
[ ] rabbitmq_web_stomp_examples 4.0.3
重启rabbitmq服务
使用如下命令重启
systemctl restart rabbitmq-server
重启后,下面编写代码测试。
yaml
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
配置类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class QueueConfig {
// 配置延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> arguments = new HashMap<>();
//延迟参数
arguments.put("x-delayed-type", "direct");
return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, arguments);
}
// 延迟队列
@Bean
public Queue delayedQueue() {
return QueueBuilder
.durable("delayed_queue") //持久化一个叫simple_queue的队列
.build();
}
// 绑定延迟队列和交换机
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue())
.to(delayedExchange())
.with("delayed_key")
.noargs();
// 或使用如下名称注册方式
// return new Binding(
// "delayed_queue", //队列名
// Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
// "delayed_exchange", //交换机名
// "delayed_key", //路由键,广播模式不需要路由键
// null //额外参数
// );
}
}
生产者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@RestController
@RequestMapping("send")
public class DelayedMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("delayed")
public void sendDelayedMessage(@RequestParam String data) {
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置延迟时间(毫秒),这里为10秒的发送延迟
messageProperties.setDelay(10000);
//也可以使用这种方式设置
//messageProperties.setHeader("x-delay", 10000); // 设置延迟时间(毫秒)
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("delayed_exchange", "delayed_key", message);
System.out.println("时间点"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
+",生产者发送延迟消息:" + data);
}
}
消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Component
public class DelayedMessageConsumer {
@RabbitListener(queues = "delayed_queue")
public void receiveMessage(String message) {
System.out.println("时间点"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
+",消费者接收到延迟消息:" + message);
}
}
使用测试
运行代码,访问mq管理页面
ip:15672
,看到多出了一个带有DM
标识的交换机,即为新加的延迟交换机
然后调用地址
localhost:8084/send/delayed?data=这是一条延迟队列测试消息
控制台输出如下,注意要等待10秒才会输出消费者消息,因为生产者代码中设置了10秒的发送延迟:
时间点2024-11-21 14:38:27,生产者发送延迟消息:这是一条延迟队列测试消息
时间点2024-11-21 14:38:37,消费者接收到延迟消息:这是一条延迟队列测试消息
死信队列实现延迟
yaml
配置基本信息即可
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
配置类
注册普通队列、死信队列、死信交换机及绑定关系,注意为普通队列指定死信队列是要设置超时时间,以实现延迟
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
/**
* 用于存放超时消息的死信队列
* */
@Bean("deadDelayQueue")
public Queue DeadQueue() {
return QueueBuilder.durable("dead_delay_queue").build();
}
/**
* 死信交换机
* */
@Bean("deadDelayExchange")
public Exchange deadDelayExchange() {
return ExchangeBuilder.directExchange("dead_delay_exchange").durable(true).build();
}
/**
* 绑定死信交换机与死信队列
* */
@Bean
public Binding deadLetterBinding() {
return new Binding(
"dead_delay_queue",
Binding.DestinationType.QUEUE,
"dead_delay_exchange",
"dead-delay-key",
null
);
}
/**
* 创建普通队列,并绑定死信交换机,用于消息过期后把消息放入死信队列
*
* */
@Bean("normalDelayQueue")
public Queue normalDelayQueue() {
return QueueBuilder
.durable("normal_delay_queue")
//设置队列内所有消息的超时时间为5秒
.ttl(5000)
//指定此队列的死信交换机及路由键
.deadLetterExchange("dead_delay_exchange").deadLetterRoutingKey("dead-delay-key")
.build();
}
}
生产者
直接向普通队列中发布消息
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@RestController
@RequestMapping("send")
public class DeadDelayedProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("deadDelayed")
public void sendDelayedMessage(@RequestParam String data) {
//MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
MessageProperties messageProperties = new MessageProperties();
//设置消息类型为文本: text/plain
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//消息对象
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
//向普通队列投放消息
rabbitTemplate.convertAndSend("", "normal_delay_queue", message);
System.out.println("时间点"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
+",死信生产者发送延迟消息:" + data);
}
}
死信队列消费者
监听死信队列来消费延迟消息,之所以不创建普通队列消费者,是为了让投递到普通队列的消息无法被消费,进而达到过期时间被放入死信队列,来让死信消费者消费。这样就以间接方式实现了延迟队列(5秒延迟)
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Component
public class DeadDelayedConsumer {
@RabbitListener(queues = "dead_delay_queue")
public void receiveMessage(String message) {
System.out.println("时间点"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
+",死信队列消费者接收到延迟消息:" + message);
}
}
使用测试
运行代码后,请求地址:
localhost:8084/send/deadDelayed?data=这是一条死信队列实现方式的延迟队列测试消息
控制台输出如下,需等待5秒显示完毕:
时间点2024-11-21 15:37:59,死信生产者发送延迟消息:这是一条死信队列实现方式的延迟队列测试消息
时间点2024-11-21 15:38:04,死信队列消费者接收到延迟消息:这是一条死信队列实现方式的延迟队列测试消息
可见两者输出间隔5秒,实现了延迟发送。
消息唯一性
在 RabbitMQ 中,确保消息唯一性是为了防止重复消息的处理,RabbitMQ自身没有去重机制,但通常可使用消息的唯一标识来实现。下面列举一个简单的例子:
RabbitMQ 的消息属性中包含一个
Message ID
字段,可以为每条消息指定一个全局唯一的 ID,用于标识消息。
生产者设置 Message ID
:
在消息发送时,生产者为每条消息生成一个唯一的 ID(如 UUID)。
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message("message content".getBytes(), messageProperties);
rabbitTemplate.send("exchange_name", "routing_key", message);
消费者校验唯一性:
消费者处理消息时,检查 Message ID
是否已处理过。可使用 Redis、数据库等存储已处理的 Message ID
,并在处理前进行去重校验。
if (!redis.contains(message.getMessageProperties().getMessageId())) {
// 处理消息
processMessage(message);
// 将消息ID存入Redis
redis.save(message.getMessageProperties().getMessageId());
} else {
// 忽略重复消息
System.out.println("Duplicate message ignored");
}
优点:
- 消息的唯一性完全由
Message ID
确保,与 RabbitMQ 的内部机制无关。 - 适合高并发场景。
注意事项:
- 消息的
Message ID
必须全局唯一,可以使用 UUID 或数据库的唯一键生成。 - 消费者的去重存储系统(如 Redis)的容量和性能要能够满足业务需求。
发布与消费自定义类型消息
在使用 RabbitTemplate
发布消息时,可以将自定义类型的对象进行序列化并发送,而不是直接发送字符串。以下是实现的步骤和关键点:
yaml
配置基本连接信息即可
server:
port: 8084
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: 192.168.137.200
port: 5672
username: admin
password: admin
virtual-host: /
配置类
配置消息的序列化机制
RabbitTemplate
默认使用 SimpleMessageConverter
将消息转为字符串。为了支持自定义对象类型,可以配置其他 MessageConverter
,例如 Jackson2JsonMessageConverter
,将对象序列化为 JSON 格式:
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Autowired
private ConnectionFactory connectionFactory;
//声明队列
@Bean("serializeQueue")
public Queue serializeQueue() {
return QueueBuilder.durable("serialize_queue").build();
}
//配置序列化器
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
//配置模板,指定序列化器
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
}
定义自定义对象
定义要发送的自定义对象,确保其可被序列化(通过 Jackson
或其他方式)。
import com.fasterxml.jackson.annotation.JsonProperty;
public class CustomMessage {
@JsonProperty("id")
private String id;
@JsonProperty("content")
private String content;
public CustomMessage(String id, String content) {
this.id = id;
this.content = content;
}
public CustomMessage() {}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
生产者
发布自定义对象,通过
RabbitTemplate
发送消息时,直接传递自定义对象即可。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("send")
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("serialize")
public void sendMessage() {
CustomMessage message = new CustomMessage("12345", "Hello RabbitMQ!");
rabbitTemplate.convertAndSend("", "serialize_queue", message);
System.out.println("生产者发送消息: " + message);
}
}
消费者
接收自定义对象,确保消费者可以正确反序列化消息。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
@RabbitListener(queues = "serialize_queue")
public void receiveMessage(CustomMessage message) {
System.out.println("消费者接受消息,消息id: " + message.getId() + ", 消息content: " + message.getContent());
}
}
注意事项
-
类型匹配:
- 消费者方法的参数类型必须与发送的对象类型一致,否则会抛出序列化/反序列化错误。
-
依赖配置:
-
如果使用
Jackson2JsonMessageConverter
,请确保Jackson
的相关依赖已引入:<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.x.x</version> </dependency>
-
-
序列化格式:
- 默认情况下,
Jackson2JsonMessageConverter
会将对象序列化为 JSON 格式并设置content-type
为application/json
。
- 默认情况下,
-
其他序列化方式:
- 如果需要使用其他序列化机制,可以自定义实现
MessageConverter
。
- 如果需要使用其他序列化机制,可以自定义实现
使用测试
启动代码后调用地址:
localhost:8084/send/serialize
控制台输出:
生产者发送消息: com.Serializable.CustomMessage@4c9be371
消费者接受消息,消息id: 12345, 消息content: Hello RabbitMQ!
通过上述配置和代码,自定义类型的对象可以轻松在 RabbitTemplate
中传递并消费。