RabbitMQ的消息可靠性保证
文章目录
- 1.环境搭建
- 1.common-rabbitmq-starter 配置防止消费者抢消息(基础配置)
- 2.common-rabbitmq-starter-demo下创建一个生产者一个消费者
- 2.生产者可靠性
- 1.开启消息超时重试机制
- 2.生产者开启ConfirmCallback消息确认机制
- 1.application.yml
- 2.TestConfigPublisher.java
- 3.测试交换机名字写错的情况
- 3.MQ可靠性
- 1.使用LazyQueue和持久化队列结合的方式来做
- 4.消费者可靠性
- 1.消费者失败重试机制
- 1.application.yml
- 2.解释
- 2.消费者消息失败处理策略
- 1.ErrorConfiguration.java 指定错误消息发送到异常交换机
- 2.ErrorListener.java 异常队列监听器
- 3.ErrorMessageHandler.java 异常消息处理器
- 4.TestConfig.java配置
- 5.TestConfigPublisher.java 生产者
- 6.TestConfigConsumer.java 消费者故意消费失败
- 7.测试,消费失败则重试三次后到异常处理逻辑
1.环境搭建
1.common-rabbitmq-starter 配置防止消费者抢消息(基础配置)
spring:
rabbitmq:
# 消费者配置
listener:
simple:
prefetch: 1 # 每次获取一条消息,处理完再获取下一条
2.common-rabbitmq-starter-demo下创建一个生产者一个消费者
2.生产者可靠性
1.开启消息超时重试机制
# 生产者消息重试配置
template:
retry:
# 启用消息重试机制,默认为 false
enabled: true
# 初始重试间隔时间为一秒
initial-interval: 1000ms
# 重试最大次数,默认为 3 次
max-attempts: 2
# 重试的间隔倍数
# 配置 2 的话,第一次等initial-interval也就是1s,第二次等 2s,第三次等 4s
multiplier: 2
connection-timeout: 500ms # 连接超时时间500ms
2.生产者开启ConfirmCallback消息确认机制
1.application.yml
# 生产者配置
publisher-confirm-type: correlated # 发布确认类型为异步回调(一旦配置了,就必须要有回调方法)
2.TestConfigPublisher.java
package com.sunxiansheng.publisher.pub;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.UUID;
/**
* Description: 测试发布者
*
* @Author sun
* @Create 2024/12/31 19:05
* @Version 1.0
*/
@RestController
@Slf4j
public class TestConfigPublisher {
@Resource
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public void send() {
log.info("发送消息");
// 1.创建CorrelationData对象
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
// 2.设置回调
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable throwable) {
// 基本不可能发生,因为这里的异常不是MQ问题导致的
log.error("ConfirmCallback:消息发送失败(非MQ问题):{}", throwable.getMessage());
}
@Override
public void onSuccess(CorrelationData.Confirm confirm) {
// 判断是否发送成功
if (confirm.isAck()) {
log.info("ConfirmCallback:消息发送成功:{}", confirm);
} else {
log.error("ConfirmCallback:消息发送失败:{}", confirm.getReason());
}
}
});
rabbitTemplate.convertAndSend("fanout.exchange.tesst", "", "hello rabbitmq", cd);
}
}
3.测试交换机名字写错的情况
3.MQ可靠性
1.使用LazyQueue和持久化队列结合的方式来做
/**
* 创建一个队列
*
* @return
*/
@Bean
public Queue fanoutQueueTest() {
return QueueBuilder.durable("lazyQueue") // 持久化队列
.lazy() // 惰性队列
.build();
}
持久化队列可以保存队列的元数据,重启后自动恢复,惰性队列可以将所有的消息都持久化到磁盘,内存只保留最近的2048条消息
4.消费者可靠性
1.消费者失败重试机制
1.application.yml
# 消费者配置
listener:
simple:
acknowledge-mode: auto # 自动确认模式(消费者确认机制)
retry:
enabled: true # 开启重试机制
max-attempts: 3 # 最大重试次数
initial-interval: 1000ms # 重试间隔时间
multiplier: 1.0 # 重试时间间隔倍数
stateless: false # false:有状态,true:无状态,如果是有状态的,每次重试都会发送到同一个队列
2.解释
首先开启了消费者自动确认机制,如果消息消费失败,就进行重试
2.消费者消息失败处理策略
1.ErrorConfiguration.java 指定错误消息发送到异常交换机
package com.sunxiansheng.rabbitmq.error;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Description: 处理失败消息的交换机和队列
*
* @Author sun
* @Create 2024/12/31 19:07
* @Version 1.0
*/
@Configuration
// 当配置文件中spring.rabbitmq.listener.simple.retry.enabled=true时,才会生效
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple", name = "retry.enabled", havingValue = "true")
public class ErrorConfiguration {
/**
* 一个error交换机
*/
@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.exchange");
}
/**
* 一个error队列
*/
@Bean
public Queue errorQueue() {
return new Queue("error.queue");
}
/**
* 绑定error队列到error交换机
*/
@Bean
public Binding errorBinding() {
return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
}
/**
* MessageRecoverer
*/
@Bean
public MessageRecoverer myMessageRecoverer(RabbitTemplate rabbitTemplate) {
// 指定错误消息发送到error.exchange交换机,routingKey为error
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
}
}
2.ErrorListener.java 异常队列监听器
package com.sunxiansheng.consumer.error;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Description: 错误消息监听器
*
* @Author sun
* @Create 2024/12/31 20:32
* @Version 1.0
*/
@Component
@Slf4j
public class ErrorListener {
@RabbitListener(queues = "error.queue")
public void errorListener(Message message) {
// 解析错误信息
ErrorMessageHandler.handleErrorMessage("error.queue", message);
}
}
3.ErrorMessageHandler.java 异常消息处理器
package com.sunxiansheng.consumer.error;
import com.rabbitmq.client.LongString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* Description: 错误消息处理器
*
* @Author sun
* @Create 2024/12/31 20:32
* @Version 1.0
*/
@Slf4j
public class ErrorMessageHandler {
public static void handleErrorMessage(String listenerName, Message message) {
// 获取消息属性
MessageProperties messageProperties = message.getMessageProperties();
String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
Map<String, Object> headers = messageProperties.getHeaders();
// 从消息头部获取异常信息
String exceptionMessage = (String) headers.get("x-exception-message");
String originalExchange = (String) headers.get("x-original-exchange");
String originalRoutingKey = (String) headers.get("x-original-routingKey");
// 处理LongString类型的异常堆栈跟踪信息
String exceptionStackTrace = null;
if (headers.containsKey("x-exception-stacktrace")) {
Object stacktraceObject = headers.get("x-exception-stacktrace");
if (stacktraceObject instanceof LongString) {
exceptionStackTrace = stacktraceObject.toString();
}
}
// 格式化输出所有信息,并在前后添加分割线
log.error("\n-------------------------------\n" +
"MQ错误监听队列: {}\n" +
"原始交换机: {}\n" +
"原始路由键: {}\n" +
"原始信息: {}\n" +
"异常信息: {}\n" +
"异常堆栈: {}\n" +
"-------------------------------",
listenerName, originalExchange, originalRoutingKey, messageBody, exceptionMessage, exceptionStackTrace);
}
}
4.TestConfig.java配置
package com.sunxiansheng.publisher.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Description: 测试配置类
*
* @Author sun
* @Create 2024/12/31 19:00
* @Version 1.0
*/
@Configuration
public class TestConfig {
/**
* 创建一个fanout类型的交换机
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange.test");
}
/**
* 创建一个队列
*
* @return
*/
@Bean
public Queue fanoutQueueTest() {
return QueueBuilder.durable("lazyQueue") // 持久化队列
.lazy() // 惰性队列
.build();
}
/**
* 交换机和队列绑定
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(fanoutQueueTest()).to(fanoutExchange());
}
}
5.TestConfigPublisher.java 生产者
package com.sunxiansheng.publisher.pub;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.UUID;
/**
* Description: 测试发布者
*
* @Author sun
* @Create 2024/12/31 19:05
* @Version 1.0
*/
@RestController
@Slf4j
public class TestConfigPublisher {
@Resource
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public void send() {
log.info("发送消息");
// 1.创建CorrelationData对象
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
// 2.设置回调
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable throwable) {
// 基本不可能发生,因为这里的异常不是MQ问题导致的
log.error("ConfirmCallback:消息发送失败(非MQ问题):{}", throwable.getMessage());
}
@Override
public void onSuccess(CorrelationData.Confirm confirm) {
// 判断是否发送成功
if (confirm.isAck()) {
log.info("ConfirmCallback:消息发送成功:{}", confirm);
} else {
log.error("ConfirmCallback:消息发送失败:{}", confirm.getReason());
}
}
});
rabbitTemplate.convertAndSend("fanout.exchange.test", "", "hello rabbitmq", cd);
}
}
6.TestConfigConsumer.java 消费者故意消费失败
package com.sunxiansheng.consumer.con;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Description: 测试消费者
*
* @Author sun
* @Create 2024/12/31 19:03
* @Version 1.0
*/
@Component
@Slf4j
public class TestConfigConsumer {
@RabbitListener(queues = "fanout.queue.test")
public void receive(String message) {
log.info("接收到的消息:{}", message);
int i = 1 / 0;
}
}