RabbitMQ:业务幂等、死信交换机
业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的: f(x) = f(f(x)) 。在程序开发中,幂等性指的是无论操作执行多少次,结果都是一致的。在消息队列的上下文中,幂等性意味着即使消费者多次接收到同一条消息,也不会对业务逻辑产生负面影响。
为什么需要幂等性?
在 RabbitMQ 中,消息可能会被重复消费,原因包括:
- 网络问题:消费者处理完消息后,发送确认(ACK)时网络中断,RabbitMQ 未收到确认,会重新投递消息。
- 消费者故障:消费者处理消息时崩溃,未发送确认,RabbitMQ 会重新投递消息。
- 手动重试:业务逻辑中手动触发了消息的重新投递。
如果消费者没有实现幂等性,重复消费可能会导致数据不一致或业务逻辑错误。例如:
- 订单支付消息被重复消费,导致用户被多次扣款。
- 库存扣减消息被重复消费,导致库存数量错误。
延迟消息接收
延迟消息在以下场景中非常有用:
1、定时任务:如订单超时未支付自动取消(12306买票)。
2、重试机制:如消息处理失败后延迟重试。
3、通知提醒:如预约提醒、会议通知等。
实现延迟消息的常见方法
死信交换机机制
死信: 死信(Dead Letter)是指那些无法被正常消费的消息。
如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称 DLX)
私信交换机具体实现思路
消费者监听dlx队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dlx.queue", durable = "true"), // 定义死信队列,名称为 dlx.queue,并设置为持久化
exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT), // 定义死信交换机,名称为 dlx.direct,类型为 DIRECT
key = {"hi"} // 绑定路由键为 "hi"
))
public void listenDlxQueue(String message) {
log.info("消费者监听到 dlx.queue 的消息: [{}]", message); // 打印从死信队列中接收到的消息
}
定义普通交换机
@Configuration // 声明这是一个 Spring 配置类
public class NormalConfiguration {
// 定义普通交换机
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(name = "normal.direct"); // 创建一个名为 normal.direct 的 DIRECT 类型交换机
}
// 定义普通队列
@Bean
public Queue normalQueue() {
return QueueBuilder
.durable(name = "normal.queue") // 创建一个名为 normal.queue 的持久化队列
.deadLetterExchange(dlx = "dlx.direct") // 设置死信交换机为 dlx.direct
.build();
}
// 绑定普通队列到普通交换机
@Bean
public Binding normalExchangeBinding(Queue normalQueue, DirectExchange normalExchange) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(routingKey = "hi"); // 将 normal.queue 绑定到 normal.direct 交换机,并使用路由键 "hi"
}
}
发送消息,同时设置延迟时间
@Test
void testSendDelayMessage() {
String messageId = UUID.randomUUID().toString(); // 生成唯一ID
rabbitTemplate.convertAndSend(
"normal.direct",
"hi",
"hello",
message -> {
message.getMessageProperties().setExpiration("10000");
message.getMessageProperties().setCorrelationId(messageId); // 设置消息的唯一ID
return message;
}
);
// 将 messageId 存储到数据库或缓存中,以便后续使用
}
使用死信交换机
public void onPaymentSuccess(String messageId) {
// 支付成功后的逻辑
cancelDelayMessage(messageId); // 取消延迟消息
}
删除队列里等待的消息
@Autowired
private RabbitTemplate rabbitTemplate;
public void cancelDelayMessage(String messageId) {
// 从队列中删除消息
rabbitTemplate.execute(channel -> {
GetResponse response = channel.basicGet("normal.queue", false); // 从队列中获取消息
while (response != null) {
AMQP.BasicProperties properties = response.getProps();
if (messageId.equals(properties.getCorrelationId())) {
channel.basicAck(response.getEnvelope().getDeliveryTag(), false); // 确认消息
return null; // 找到并删除消息
}
response = channel.basicGet("normal.queue", false);
}
return null;
});
}