RabbitMQ之旅(2)
相信自己,终会成功
目录
Spring连接RabbitMQ
1.引入依赖
2.配置rabbitmq
3.生产者(简单模式)
@Bean 将方法的返回值交给 Spring 容器管理
作用
4.消费者
5.测试
消息确认机制(用于控制消息的确认行为)
持久性
发送方确认
重试机制(在消息处理失败时,自动重试)
死信队列(DLQ处理无法被正常消费的消息)
设置TTL
事务支持(确保消息的发送和确认在事务中执行)
消息分发 决定了消息如何从生产者传递到消费者
Spring连接RabbitMQ
1.引入依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
2.配置rabbitmq
在yml中配置
spring:
# application:
# name=rabbit-extensions-demo
rabbitmq:
addresses: amqp://用户名:密码@云服务器IP地址:5672/虚拟主机
listener:
simple:
# acknowledge-mode: none #消息接收确认
# acknowledge-mode: auto #消息接收确认
acknowledge-mode: manual #消息接收确认
prefetch: 1 #预取数量
retry:
enabled: true # 开启消费者失败重试
initial-interval: 5000ms # 初始失败等待时长为5秒
max-attempts: 5 # 最大重试次数
# publisher-confirm-type: correlated #消息发送确认
properties
配置
spring.rabbitmq.host=云服务器IP地址
spring.rabbitmq.port=5672
spring.rabbitmq.username=用户名
spring.rabbitmq.password=密码
3.生产者(简单模式)
声明队列和交换机(使用Spring AMQP中的 QueueBuilder
和 ExchangeBuilder
)
//声明常量
public class Constants {
public static final String ACK_QUEUE = "ack.queue";
public static final String ACK_EXCHANGE = "ack.exchange";
}
@Configuration
public class RabbitMQConfig {
//声明队列
@Bean("ackQueue")
public Queue ackQueue(){
return QueueBuilder.durable(Constants.ACK_QUEUE).build();
}
//声明交换机
@Bean("directExchange")
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
}
// 将队列绑定到交换机,并指定路由键(Routing Key)
// @Bean("ackBinding")
// public Binding ackBinding(Exchange directExchange, Queue queue){
// return BindingBuilder.bind(queue).to(directExchange).with("ack").noargs();
// }
// 将队列绑定到交换机,并指定路由键(Routing Key)
// directExchange 的类型是 DirectExchange,这是一个具体的交换机类型。
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("ackQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("ack");
}
}
// BindingBuilder.bind(queue).to(directExchange).with("ack");
// 这里没有调用 noargs(),因为 DirectExchange 的绑定不需要额外的参数。
// 由于 directExchange 是 DirectExchange 类型,这段代码明确指定了交换机的类型,适用于 Direct 类型的交换机。
- BindingBuilder.bind(queue).to(directExchange).with("ack")
bind(queue)
:指定要绑定的队列t
o(directExchange)
:指定要绑定的交换机with("ack")
:指定路由键(Routing Key)noargs()
:表示不设置额外的绑定参数(可选)。
@Configuration在类方法上面的注解
作用定义 Bean:在配置类中,可以使用 @Bean
注解来定义 Spring 容器管理的 Bean。替代 XML 配置:@Configuration
注解是 Java 配置的方式,可以完全替代传统的 XML 配置文件。模块化配置:可以将不同的配置分散到多个配置类中,便于管理和维护。
@Configuration
与@Bean
的关系
@Configuration
类中的@Bean
方法会被 Spring 拦截,确保每次调用返回的是同一个 Bean 实例(单例模式)。如果去掉@Configuration
注解,@Bean
方法将不会被 Spring 代理,每次调用都会返回一个新的实例。
@Configuration
与@Component
的区别
@Configuration
:用于定义配置类,通常包含@Bean
注解的方法,用于显式定义 Bean。@Component
:用于标记普通的组件类(如 Service、Repository 等),Spring 会自动扫描并注册为 Bean。
指定一个接口,接口的 value 元素可用于在使用注释时传递特定值
指定调用路径
@Resource(name="rabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "consumer ack mode test...");
return "消息发送成功";
}
@Resource
是 Java 的注解,用于依赖注入 . name="rabbitTemplate"
表示注入一个名为 rabbitTemplate
的 Bean。RabbitTemplate
对象,它是 Spring AMQP 提供的一个工具类,用于与 RabbitMQ 进行交互(发送和接收消息)。convertAndSend
是 RabbitTemplate
的一个方法,用于将消息发送到指定的交换器
public void convertAndSend(String exchange, String routingKey, Object object) throws AmqpException {
this.convertAndSend(exchange, routingKey, object, (CorrelationData)null);
}
//String exchange交换机的名字
//String routingKey 指定的路由键,决定消息是如何从路由器到队列
//Object object 指定发送的内容
QueueBuilder.durable(Constants.ACK_QUEUE).build()
:
使用 QueueBuilder
创建一个持久化的队列。durable(true)
表示队列是持久化的,即使 RabbitMQ 服务器重启,队列也不会丢失。Constants.ACK_QUEUE
是一个常量,表示队列的名称。
ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build()
:
使用 ExchangeBuilder
创建一个直连交换机(Direct Exchange)。Constants.ACK_EXCHANGE
是一个常量,表示交换机的名称。
@Bean 将方法的返回值交给 Spring 容器管理
作用
-
将对象交给 Spring 管理:
-
被
@Bean
注解标记的方法,其返回值会被 Spring 容器注册为一个 Bean。 -
这个 Bean 可以被其他组件(如
@Autowired
)注入和使用。
-
-
控制 Bean 的生命周期:
-
Spring 容器会管理 Bean 的创建、初始化、销毁等生命周期。
-
可以通过
@Bean
的initMethod
和destroyMethod
属性指定初始化和销毁方法。
-
-
自定义 Bean 的名称:
-
默认情况下,Bean 的名称是方法名。
-
可以通过
@Bean("自定义名称")
指定 Bean 的名称。
-
4.消费者
@Configuration
public class AckListerner {
//@RabbitListener 表示是一个消息监听器,在这个代码中表示监听Constants.ACK_QUEUE
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handMessage(Message message, Channel channel) throws IOException {
//每次从队列中消费一条消息时,RabbitMQ都会为该消息分配一个唯一的deliveryTag。
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
//进行业务逻辑处理
System.out.println("业务逻辑处理");
System.out.println("业务处理完成");
// 这是对消息的肯定确认(acknowledgment),表示消息已经被成功处理。
// deliveryTag指定要确认的消息。
// false表示不进行批量确认,只确认当前消息。
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
// deliveryTag指定要否定的消息。
// false表示不进行批量否定确认。
// true表示将消息重新放回队列,以便稍后重新处理。
channel.basicNack(deliveryTag, false, true);
}
}
}
5.测试
在RabbitMQ可视化界面中可以看到新建的交换机和队列
deliverytag正常从1开始,因为先演示的生产者,队列中存在一条数据,重新启动程序后,消费者自动消费了数据,但是因为我没截图,只好重新发送一条
消费成功后的队列
消息确认机制(用于控制消息的确认行为)
Spring提供的AcknowledgeMode
AcknowledgeMode.NONE
这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ就会自动确认消息,从Rabbit MQ队列中移除消息,如果消费者处理消息失效,消息可能会丢失
AcknowledgeMode.AUTO(默认)
这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息
AcknowledgeMode.MANUAL
这种模式下,消费者必须在成功处理消息后显式调用basicAck方法来确认消息,如果消息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息,这种模式提供了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理
持久性
交换机持久化
队列持久化
消息的持久化
消息是存储在队列中,所以消息的持久化,需要队列持久化+消息持久化
如果只设置了队列持久化,MQ重启后,消息就会丢失
如果只设置了消息的持久化,MQ重启后,队列会丢失,消息也会丢失
//消息非持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
//消息持久化
// message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
-
getMessageProperties()
:获取消息的属性对象(MessageProperties
)。 -
MessageDeliveryMode.PERSISTENT
:-
消息会被持久化到磁盘。
-
即使 RabbitMQ 服务器重启,消息也不会丢失。
-
适用于需要确保消息不丢失的场景。
-
-
MessageDeliveryMode.NON_PERSISTENT
:-
消息不会被持久化到磁盘,仅存储在内存中。
-
如果 RabbitMQ 服务器重启,消息会丢失。
-
适用于对消息可靠性要求不高的场景,性能更高。
-
队列/交换机 | 队列/消息(交换机默认持久化) | |
持久化/持久化 | 都不会消失 | 都不会消失 |
非持久化/非持久化 | 都会消失 | 都消失 |
持久化/非持久化 | 都不会消失 | 队列不会消失,消息会消失 |
非持久化/持久化 | 都消失 | 都消失 |
发送方确认
在 RabbitMQ 中,发送方确认(Publisher Confirms) 是一种确保消息成功到达 RabbitMQ 服务器的机制。通过启用发送方确认,生产者可以知道消息是否被 RabbitMQ 成功接收。如果消息未被接收,生产者可以进行重试或其他处理。
生产者发送消息到 RabbitMQ 服务器。RabbitMQ 服务器接收消息后,会向生产者发送一个确认(ack)或未确认(nack)的信号。生产者通过回调方法处理确认或未确认的信号。
1.confirm模式
通常在计算机编程、系统交互或业务流程等场景中,confirm 模式指的是一种确认机制。
public String confirm() {
CorrelationData correlationData = new CorrelationData("1");
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "confirm test...", correlationData);
return "消息发送成功";
}
CorrelationData 的作用
CorrelationData("1")
:初始化CorrelationData
,并为其设置一个唯一标识符(例如"1"
)
消息关联:
CorrelationData
用于将消息与其确认状态关联起来。每个消息可以有一个唯一的
CorrelationData
对象。确认回调:
当 RabbitMQ 服务器确认收到消息时,会触发确认回调。
在回调中,可以通过
CorrelationData
识别是哪条消息被确认。
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置回调方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行了confirm方法");
if (ack){
System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());
}else {
System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);
//相应的业务处理
}
}
});
//消息被退回时, 回调方法
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("消息退回:"+returned);
}
});
return rabbitTemplate;
}
消息确认回调(Confirm Callback):
public interface ConfirmCallback {
void confirm(@Nullable CorrelationData var1, boolean var2, @Nullable String var3);
}
-
CorrelationData correlationData
:-
关联数据,用于标识消息。
-
可以是
null
,表示没有关联数据。
-
-
boolean ack
:-
表示消息是否被成功确认。
-
true
:消息被 RabbitMQ 服务器成功接收。 -
false
:消息未被 RabbitMQ 服务器接收。
-
-
String cause
:-
如果
ack
为false
,表示未确认的原因。 -
如果
ack
为true
,通常为null
。
-
public void setReturnsCallback(ReturnsCallback returnCallback) {
Assert.state(this.returnsCallback == null || this.returnsCallback.equals(returnCallback), "Only one ReturnCallback is supported by each RabbitTemplate");
this.returnsCallback = returnCallback;
}
消息退回回调(Returns Callback)
通过 setMandatory(true)
设置强制消息退回
如果消息无法路由到队列,RabbitMQ 会将消息退回给生产者。
这么设置存在两个问题
1.这种方式设置ConfirmCallback影响所有使用RabbitTemple的方法
2.重复调用接口时会提示错误
2.return退回模式
return 退回模式主要用于将数据、控制权或流程返回到上一个状态、调用者或指定的位置。
重试机制(在消息处理失败时,自动重试)
retry:
enabled: true # 开启消费者失败重试
initial-interval: 5000ms # 初始失败等待5S
max-attempts: 5 #最大重试次数(包括自身消费的一次)
@RabbitListener(queues = Constants.RETRY_QUEUE)
// 在 RabbitMQ 中,deliveryTag 是一个用于标识消息的唯一标识符,
// 通常用于确认(acknowledge)或拒绝(reject)消息。
// 当你从 RabbitMQ 队列中消费一条消息时,
// RabbitMQ 会为该消息分配一个 deliveryTag。
// 这个标签是特定于消息所在的通道(channel)的。
public void handlerMessage(Message message) throws UnsupportedEncodingException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);
// int num = 3/0;
// 如果消息处理成功,调用 basicAck 确认消息。
// 如果消息处理失败,调用 basicReject 拒绝消息并重新放回队列。
System.out.println("业务处理完成");
}
消息处理成功
消息处理失败(因为设定间隔5s重试一次,所以比较慢,最多重试5次结束)
死信队列(DLQ处理无法被正常消费的消息)
设置TTL
@Bean("ttlQueue2")
public Queue ttlQueue2(){
return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build();
}
//还有ttlQueue1和ttlexchange就不网上写了
@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlQueue1")Queue queue,@Qualifier("ttlExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
}
@Bean("ttlBinding2")
public Binding ttlBinding2(@Qualifier("ttlQueue2")Queue queue,@Qualifier("ttlExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
}
@Bean("ttlQueue3")
public Queue ttlQueue3(){
Map<String, Object> map = new HashMap<>();
//向 Map 中添加一个参数 x-message-ttl,表示队列中消息的存活时间(TTL)
map.put("x-message-ttl", 20000);
//withArguments(map):
//将之前定义的参数(x-message-ttl)应用到队列中。
return QueueBuilder.durable(Constants.TTL_QUEUE2).withArguments(map).build(); //设置队列的ttl为20s
}
//设置消息的TTL
@RequestMapping("ttl1")
public String ttl1(){
System.out.println("ttl1...");
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl1 test 30s...", message -> {
message.getMessageProperties().setExpiration("30000"); //单位: 毫秒, 过期时间为30s
return message;
});
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl1 test 10s...", message -> {
message.getMessageProperties().setExpiration("10000"); //单位: 毫秒, 过期时间为10s
return message;
});
return "消息发送成功";
}
这段代码(个人理解),设置一个队列中的过期时间,创建一个持久化队列 ,进入一个消息,消息存在时间为20s,如果消息不进行消费,则过期,进入到死信队列
20S后
ttlQueue2和ttlQueue3
二者的区别
特性 | ttlQueue2 | ttlQueue3 |
---|---|---|
实现方式 | 使用 QueueBuilder 的 ttl() 方法直接设置 TTL。 | 手动创建 Map 对象,并通过 withArguments() 设置参数。 |
代码简洁性 | 更简洁,适合只设置 TTL 的场景。 | 稍显冗长,但更灵活,适合需要设置多个参数的场景。 |
灵活性 | 只能设置 TTL,无法同时设置其他参数。 | 可以同时设置多个队列参数(如 TTL、死信队列等)。 |
底层实现 | 内部会创建一个 Map 并设置 x-message-ttl 。 | 显式创建 Map 并设置 x-message-ttl 。 |
注意:
1.设置队列的TTL(存在该队列中所有消息的TTL)
2.设置消息的TTL
假如,队列TTL是20s,消息的TTL是10s,那么消息的TTL取小值,也就是10s
如果消息的一个为30S和10s取大值
死信队列常见的3种情况
1.消息被拒绝:消费者在处理消息时,可能因为消息内容错误,处理逻辑异常等原因拒绝处理该消息,可以理解为程序不能正常运行
2.消息过期
3.队列达到最大长度
@Bean("normalQueue")
public Queue normalQueue(){
return QueueBuilder.durable(Constants.NORMAL_QUEUE)
.deadLetterExchange(Constants.DL_EXCHANGE)
.deadLetterRoutingKey("dlx")
.build();
}
事务支持(确保消息的发送和确认在事务中执行)
开启事务
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true); //开启事务
return rabbitTemplate;
}
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
return new RabbitTransactionManager(connectionFactory);
}
@Transactional
@RequestMapping("/trans")
public String trans(){
System.out.println("trans test...");
transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE, "trans test 1...");
// int num = 5/0;
transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE, "trans test 2...");
return "消息发送成功";
}
不采用事务,第一条成功,第二条失败
采用事务,同时成功或同时失败
消息分发 决定了消息如何从生产者传递到消费者
消息分发是指消息从生产者传递到消费者的过程。RabbitMQ 提供了多种分发模式,包括直接分发、广播分发、主题分发和头部分发(详见之前的回答)。除此之外,RabbitMQ 还支持以下分发策略:
工作队列模式(Work Queue)
在多个消费者之间分发消息,RabbitMQ 默认采用 轮询(Round-Robin) 的方式将消息均匀地分发给所有消费者。
非公平分发(Non-Fair Dispatch)
默认情况下,RabbitMQ 会将消息均匀地分发给所有消费者,即使某些消费者处理速度较慢。这可能导致某些消费者积压大量消息,而其他消费者空闲。
限流(Quality of Service, QoS)
限流是指控制消费者从队列中获取消息的速率,以避免消费者过载或消息积压。RabbitMQ 通过 QoS 预取机制(Prefetch Count) 实现限流。