java每日精进 2.24 【MQ实际应用场景】
1.可靠的消息投递方案
一、可靠消息投递的核心目标
-
消息不丢失:
-
确保消息从生产者到RabbitMQ,再到消费者的整个链路中不会丢失。
-
-
消息不重复:
-
避免消息因网络问题或重试机制导致重复投递。
-
-
消息顺序性:
-
在需要保证顺序的场景下,确保消息按顺序投递。
-
二、可靠消息投递方案设计
1. 生产者端的可靠性
生产者在发送消息时,需要确保消息成功到达RabbitMQ。如果消息发送失败,需要进行重试。
实现步骤:
-
开启生产者确认模式(Publisher Confirm):
-
RabbitMQ提供了生产者确认机制,生产者发送消息后,RabbitMQ会返回一个确认(ack)或拒绝(nack)。
-
配置生产者确认模式:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.confirmSelect(); // 开启确认模式
-
-
异步监听确认结果:
-
通过异步监听确认结果,确保消息成功发送。
channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) { System.out.println("消息已确认,deliveryTag: " + deliveryTag); } @Override public void handleNack(long deliveryTag, boolean multiple) { System.out.println("消息未确认,deliveryTag: " + deliveryTag); // 重试发送消息 } });
-
-
消息持久化:
-
发送消息时,将消息标记为持久化,确保RabbitMQ重启后消息不会丢失。
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 2表示持久化消息 .build(); channel.basicPublish("exchange", "routingKey", properties, message.getBytes());
-
-
生产者重试机制:
-
如果消息发送失败(未收到ack),生产者需要进行重试。
-
可以使用Spring Retry等工具实现重试逻辑。
-
2. RabbitMQ端的可靠性
RabbitMQ需要确保消息在队列中不会丢失,并且在消费者处理失败时能够重新投递。
实现步骤:
-
队列持久化:
-
创建队列时,将队列标记为持久化。
boolean durable = true; channel.queueDeclare("queueName", durable, false, false, null);
-
-
交换机持久化:
-
创建交换机时,将交换机标记为持久化。
boolean durable = true; channel.exchangeDeclare("exchangeName", "direct", durable);
-
-
消息持久化:
-
发送消息时,将消息标记为持久化(见生产者端实现)。
-
-
死信队列(DLX):
-
如果消息在队列中无法被消费者正确处理,可以将其路由到死信队列,避免消息丢失。
Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "dlxExchange"); args.put("x-dead-letter-routing-key", "dlxRoutingKey"); channel.queueDeclare("queueName", true, false, false, args);
-
3. 消费者端的可靠性
消费者需要确保消息被正确处理,并且在处理失败时能够重新投递。
实现步骤:
-
手动确认模式(Manual Ack):
-
消费者在处理完消息后,手动向RabbitMQ发送确认(ack)。如果处理失败,则不发送ack,RabbitMQ会将消息重新投递。
boolean autoAck = false; // 关闭自动确认 channel.basicConsume("queueName", autoAck, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { // 处理消息 processMessage(body); // 手动确认 channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { // 处理失败,拒绝消息并重新入队 channel.basicNack(envelope.getDeliveryTag(), false, true); } } });
-
-
消费者重试机制:
-
如果消息处理失败,消费者可以进行重试。可以在消费者端实现重试逻辑,或者依赖RabbitMQ的重试机制。
-
-
幂等性设计:
-
消费者处理消息时需要保证幂等性,避免重复消费导致的数据不一致。
-
可以通过唯一ID、数据库唯一约束等方式实现幂等性。
-
4. 消息可靠性增强
-
事务机制:
-
RabbitMQ支持事务机制,但性能较差,通常不推荐使用。
channel.txSelect(); // 开启事务 try { channel.basicPublish("exchange", "routingKey", properties, message.getBytes()); channel.txCommit(); // 提交事务 } catch (Exception e) { channel.txRollback(); // 回滚事务 }
-
-
消息补偿机制:
-
如果消息最终无法投递或处理,可以通过日志记录、告警等方式进行人工干预。
-
三、完整流程示例
-
生产者发送消息:
-
开启生产者确认模式,发送持久化消息。
-
如果未收到ack,进行重试。
-
-
RabbitMQ存储消息:
-
将消息存储在持久化队列中。
-
如果消息无法投递,路由到死信队列。
-
-
消费者处理消息:
-
使用手动确认模式,确保消息被正确处理。
-
如果处理失败,拒绝消息并重新入队。
-
2.代码示例
一、生产者发送消息
1.代码实现:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendOrderMessage(String orderId) {
// 设置消息属性(持久化)
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 持久化消息
Message message = new Message(("订单消息:订单ID=" + orderId).getBytes(), properties);
// 设置消息ID
CorrelationData correlationData = new CorrelationData(orderId);
// 发送消息
rabbitTemplate.convertAndSend("order_exchange", "order_routing_key", message, correlationData);
// 监听确认回调
rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {
if (ack) {
System.out.println("消息已确认,订单ID: " + correlationData1.getId());
} else {
System.out.println("消息未确认,订单ID: " + correlationData1.getId() + ",原因: " + cause);
// 重试发送消息
retrySendMessage(orderId);
}
});
}
private void retrySendMessage(String orderId) {
// 重试逻辑
System.out.println("重试发送消息,订单ID: " + orderId);
sendOrderMessage(orderId);
}
}
2.注
MessageProperties
:代表消息的属性,这些属性决定消息的行为和存储方式。setDeliveryMode(MessageDeliveryMode.PERSISTENT)
:设置消息为持久化,即使 RabbitMQ 宕机,消息也不会丢失。持久化的消息会保存在磁盘中。- 创建一个消息对象 (
Message
),消息内容是一个字符串订单消息:订单ID=xxx
,并将其转换为字节数组(getBytes()
)。 - 该消息还附带了我们之前设置的属性 (
properties
),比如消息的持久化属性。 CorrelationData
:这是一个用于关联发送的消息与回调(确认)信息的对象。在此例中,它使用订单ID作为消息的唯一标识符。这样在回调中就能知道是哪个订单的消息。convertAndSend
方法用于将消息发送到指定的交换机、路由键和消息。它的参数如下:"order_exchange"
:目标交换机的名称。"order_routing_key"
:路由键,用来决定消息应该被发送到哪个队列。message
:要发送的消息。correlationData
:消息的唯一标识符,用于确认回调。
3.说明
-
消息持久化:
-
通过
MessageProperties
设置消息为持久化。
-
-
生产者确认模式:
-
使用
setConfirmCallback
监听消息确认结果。
-
-
重试机制:
-
如果消息未确认(ack=false),触发重试逻辑。
-
二、RabbitMQ 配置
1.代码实现:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
// 订单交换机
@Bean
public Exchange orderExchange() {
return new DirectExchange("order_exchange", true, false); // 持久化交换机
}
// 订单队列
@Bean
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange"); // 死信交换机
args.put("x-dead-letter-routing-key", "dlx_routing_key"); // 死信路由键
return new Queue("order_queue", true, false, false, args); // 持久化队列
}
// 绑定订单队列和交换机
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order_routing_key").noargs();
}
// 死信交换机
@Bean
public Exchange dlxExchange() {
return new DirectExchange("dlx_exchange", true, false); // 持久化死信交换机
}
// 死信队列
@Bean
public Queue dlxQueue() {
return new Queue("dlx_queue", true); // 持久化死信队列
}
// 绑定死信队列和死信交换机
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx_routing_key").noargs();
}
}
2.注
- 订单队列 (
order_queue
) 用于处理正常的消息,若消息无法消费或有问题,它会被转发到 死信交换机 (dlx_exchange
)。 - 死信队列 (
dlx_queue
) 用于存储那些无法正常消费的消息(死信)。 orderQueue
和dlxQueue
都是持久化队列,保证消息不会丢失。orderExchange
和dlxExchange
都是持久化交换机,确保交换机本身也不会丢失。
3.说明
-
持久化交换机和队列:
-
交换机和队列都设置为持久化。
-
-
死信队列配置:
-
如果消息无法被正确处理,会被路由到死信队列。
-
三、消费者处理消息
1.代码实现:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderConsumer {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "order_queue")
public void handleOrderMessage(Message message) {
try {
// 处理消息
String orderMessage = new String(message.getBody());
processOrder(orderMessage);
// 手动确认消息
rabbitTemplate.execute(channel -> {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return null;
});
System.out.println("消息处理成功:" + orderMessage);
} catch (Exception e) {
// 处理失败,拒绝消息并重新入队
rabbitTemplate.execute(channel -> {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
return null;
});
System.out.println("消息处理失败,重新入队:" + new String(message.getBody()));
}
}
private void processOrder(String orderMessage) throws Exception {
// 模拟订单处理逻辑
System.out.println("处理订单:" + orderMessage);
if (orderMessage.contains("失败")) {
throw new Exception("订单处理失败");
}
}
}
2.备注
basicAck
方法确认消息已经被成功处理并从队列中移除。deliveryTag
是消息的唯一标识符,false
表示消息仅对当前消费者确认。basicNack
方法用于拒绝消息并将其重新入队。deliveryTag
标识消息,false
表示该消息不需要重新路由,true
表示消息会被重新入队供后续消费者继续消费。processOrder
方法模拟了一个订单的处理过程。- 如果订单消息包含
"失败"
字符串,就抛出一个异常,表示订单处理失败。这会触发异常处理逻辑,将消息重新入队。 channel.basicNack
:这是用于拒绝当前消息的操作。它有三个参数:deliveryTag
:消息的唯一标识符,可以通过message.getMessageProperties().getDeliveryTag()
获取。false
:表示是否对多个消息进行批量确认。false
表示仅拒绝当前消息,而不是批量拒绝。true
:表示是否将消息重新入队。如果设为true
,消息会被重新投递到队列中,供后续消费者再次处理。
3.说明
-
手动确认模式:
-
使用
basicAck
确认消息,使用basicNack
拒绝消息并重新入队。
-
-
幂等性设计:
-
在
processOrder
方法中实现幂等性逻辑。
-
四、死信队列处理
代码实现:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQTest implements CommandLineRunner {
@Autowired
private OrderProducer orderProducer;
@Override
public void run(String... args) throws Exception {
// 发送正常消息
orderProducer.sendOrderMessage("123");
// 发送失败消息(模拟处理失败)
orderProducer.sendOrderMessage("456失败");
}
}
说明:
-
死信队列消费者:
-
监听死信队列,记录日志或触发告警。
-
五、全流程测试
测试代码:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQTest implements CommandLineRunner {
@Autowired
private OrderProducer orderProducer;
@Override
public void run(String... args) throws Exception {
// 发送正常消息
orderProducer.sendOrderMessage("123");
// 发送失败消息(模拟处理失败)
orderProducer.sendOrderMessage("456失败");
}
}
测试结果:
-
正常消息:
-
消息被成功处理,消费者发送
ack
。
-
-
失败消息:
-
消息处理失败,消费者发送
nack
并重新入队。 -
如果多次重试失败,消息最终进入死信队列。
-
六、总结
通过 RabbitMQTemplate
,我们可以轻松实现 RabbitMQ 的可靠消息投递方案:
-
生产者端:
-
使用
ConfirmCallback
确保消息成功发送。
-
-
RabbitMQ 配置:
-
配置持久化交换机和队列,以及死信队列。
-
-
消费者端:
-
使用手动确认模式和幂等性设计,确保消息被正确处理。
-
-
死信队列:
-
处理无法正常消费的消息,记录日志或人工干预。
-
3.优化
在 processOrder
方法中实现幂等性设计,可以确保即使同一条消息被多次消费,也不会对系统状态产生副作用。以下是几种常见的幂等性设计方法,并结合代码示例进行实现。
一、幂等性设计方法
1. 基于唯一标识的幂等性
通过消息的唯一标识(如订单ID)来确保同一条消息只被处理一次。
实现步骤:
-
在数据库中为订单ID添加唯一约束。
-
在处理消息时,先检查订单是否已处理。
代码示例:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Transactional
public void processOrder(String orderMessage) throws Exception {
// 解析订单ID
String orderId = extractOrderId(orderMessage);
// 检查订单是否已处理
if (orderRepository.existsById(orderId)) {
System.out.println("订单已处理,跳过:" + orderId);
return;
}
// 处理订单逻辑
System.out.println("处理订单:" + orderId);
if (orderMessage.contains("失败")) {
throw new Exception("订单处理失败");
}
// 保存订单
Order order = new Order();
order.setId(orderId);
order.setStatus("已处理");
orderRepository.save(order);
}
private String extractOrderId(String orderMessage) {
// 假设订单消息格式为 "订单消息:订单ID=123"
return orderMessage.split("=")[1];
}
}
2. 基于 Redis 的幂等性
使用 Redis 缓存记录已处理的消息ID,确保同一条消息只被处理一次。
实现步骤:
-
在处理消息前,检查 Redis 中是否存在该消息ID。
-
如果存在,则跳过处理;如果不存在,则处理消息并将消息ID存入 Redis。
代码示例:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class OrderService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void processOrder(String orderMessage) throws Exception {
// 解析订单ID
String orderId = extractOrderId(orderMessage);
// 检查 Redis 中是否存在该订单ID
String key = "order:" + orderId;
if (redisTemplate.opsForValue().setIfAbsent(key, "processed", 30, TimeUnit.MINUTES)) {
// 处理订单逻辑
System.out.println("处理订单:" + orderId);
if (orderMessage.contains("失败")) {
throw new Exception("订单处理失败");
}
} else {
System.out.println("订单已处理,跳过:" + orderId);
}
}
private String extractOrderId(String orderMessage) {
// 假设订单消息格式为 "订单消息:订单ID=123"
return orderMessage.split("=")[1];
}
}
3. 基于数据库状态机的幂等性
通过状态机设计,确保订单状态只能从特定状态转移到下一状态。
实现步骤:
-
在订单表中添加状态字段。
-
处理消息时,检查订单状态是否符合预期。
代码示例:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Transactional
public void processOrder(String orderMessage) throws Exception {
// 解析订单ID
String orderId = extractOrderId(orderMessage);
// 查询订单
Order order = orderRepository.findById(orderId).orElseThrow(() -> new Exception("订单不存在"));
// 检查订单状态
if ("已处理".equals(order.getStatus())) {
System.out.println("订单已处理,跳过:" + orderId);
return;
}
// 处理订单逻辑
System.out.println("处理订单:" + orderId);
if (orderMessage.contains("失败")) {
throw new Exception("订单处理失败");
}
// 更新订单状态
order.setStatus("已处理");
orderRepository.save(order);
}
private String extractOrderId(String orderMessage) {
// 假设订单消息格式为 "订单消息:订单ID=123"
return orderMessage.split("=")[1];
}
}
二、整合到 processOrder
方法
以下是基于 Redis 的幂等性设计整合到 processOrder
方法的完整代码:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class OrderConsumer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@RabbitListener(queues = "order_queue")
public void handleOrderMessage(Message message) {
try {
// 处理消息
String orderMessage = new String(message.getBody());
processOrder(orderMessage);
// 手动确认消息
rabbitTemplate.execute(channel -> {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return null;
});
System.out.println("消息处理成功:" + orderMessage);
} catch (Exception e) {
// 处理失败,拒绝消息并重新入队
rabbitTemplate.execute(channel -> {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
return null;
});
System.out.println("消息处理失败,重新入队:" + new String(message.getBody()));
}
}
private void processOrder(String orderMessage) throws Exception {
// 解析订单ID
String orderId = extractOrderId(orderMessage);
// 检查 Redis 中是否存在该订单ID
String key = "order:" + orderId;
if (redisTemplate.opsForValue().setIfAbsent(key, "processed", 30, TimeUnit.MINUTES)) {
// 处理订单逻辑
System.out.println("处理订单:" + orderId);
if (orderMessage.contains("失败")) {
throw new Exception("订单处理失败");
}
} else {
System.out.println("订单已处理,跳过:" + orderId);
}
}
private String extractOrderId(String orderMessage) {
// 假设订单消息格式为 "订单消息:订单ID=123"
return orderMessage.split("=")[1];
}
}
三、总结
在 processOrder
方法中实现幂等性设计,可以有效避免消息重复消费带来的问题。常用的方法包括:
-
基于唯一标识的幂等性:
-
使用数据库唯一约束或 Redis 缓存记录已处理的消息ID。
-
-
基于状态机的幂等性:
-
通过状态机设计,确保订单状态只能从特定状态转移到下一状态。
-
在实际项目中,可以根据业务需求选择合适的方法,或结合多种方法实现更强大的幂等性保障。