当前位置: 首页 > article >正文

消息队列的幂等问题解决方案

消息队列的幂等性问题是指在处理重复消息时,保证消息被多次消费但只产生一次影响。由于网络延迟、消费端异常等原因,消息可能会被重复投递或消费,因此消息处理的幂等性是保证系统数据一致性的重要环节。

1. 解决幂等问题常见方案

1.1 唯一请求ID(去重ID)

每个消息都包含一个唯一的 ID,生产者在生成消息时给每个消息赋予一个唯一的标识(如 UUID)。
消费者在处理消息前,先检查该消息的 ID 是否已经处理过。如果已经处理过,则直接丢弃;如果没有处理过,则进行处理并记录这个 ID。
记录方式可以是数据库表或缓存(如 Redis),用于存储已经处理过的消息 ID。

1.2 数据库唯一约束

在数据库中为相关字段设置唯一约束。例如,在处理丁丹时,可以在订单表中为订单号设置唯一索引。
在插入数据时,如果因为唯一约束导致插入失败,则说明该消息已经被处理过,这样就保证了幂等性。

1.3 状态检查

在处理消息时,首先查询当前状态,看是否已经达到目标状态。
只有当状态符合预期时,才进行下一步处理,并更新状态。
这种方式适用于多步骤的业务逻辑,每个步骤都有明确的状态变化。

1.4 使用Redis原子操作

利用Redis的SETNX(SET if Not Exists)操作。先尝试将消息的唯一ID设置到Redis中,如果成功(返回1),则表明是第一次处理,可以继续处理消息;如果失败(返回0),则说明消息已经被处理过。
使用Redis的TTL功能,可以给记录的消息ID设置过期时间,避免Redis中记录无限增长。

1.5 幂等性业务逻辑

将业务逻辑设计为幂等的,即多次执行相同的操作不会影响最终结果。例如,扣减库存操作可以设计为将库存设置为剩余数量而非从库存中减去数量
使用数学运算的性质。例如,订单金额计算可以直接将订单的最终状态写入,而不是进行增量计算。

1.6 实现示例

以使用唯一请求ID和Redis为例:
1.生产者
生成消息时,附加一个唯一的ID,如UUID。

String uniqueId = UUID.randomUUID().toString();
Message message = new Message(uniqueId, data);

2.消费者
在消费消息时,先检查Redis中是否存在该消息ID。

String uniqueId = message.getUniqueId();
boolean isFirstProcess = redisTemplate.opsForValue().setIfAbsent(uniqueId, "1", 10, TimeUnit.MINUTES);

if (isFirstProcess) {
    // 处理消息
    processMessage(message);
} else {
    // 消息已经处理过,直接丢弃
}

注意事项
去重 ID 的有效期:在使用 Redis 等缓存进行去重时,要注意设置合理的过期时间,以免 Redis 中存储过多的已处理消息 ID。
数据一致性:幂等操作的目标是保证数据的一致性。在实际实现时,要结合业务需求确保数据的正确性,避免因重复消费导致的状态不一致。
性能影响:为保证幂等性而查询数据库或缓存可能带来性能开销,因此需要根据业务场景选择合适的实现方案。

2. RocketMQ 和 RabbitMQ 的幂等性实现示例

2.1 RocketMQ幂等实现

RocketMQ 提供了消息幂等性处理的功能,通过消费者端的逻辑来确保消息的幂等性。
1. 生产者发送消息
生成一个唯一的业务 ID(如订单号、交易 ID)并将其作为消息的属性发送到 RocketMQ。

DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.start();

String uniqueId = UUID.randomUUID().toString();
Message message = new Message("TopicTest", "TagA", uniqueId, data.getBytes());

SendResult sendResult = producer.send(message);
producer.shutdown();

2. 消费者处理消息
消费者在处理消息前,检查唯一业务 ID 是否已经处理过。
使用 Redis 的 SETNX 或数据库的唯一约束来判断是否已处理。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        String uniqueId = msg.getKeys();
        
        // 使用 Redis 判断是否已处理过
        boolean isFirstProcess = redisTemplate.opsForValue().setIfAbsent(uniqueId, "1", 10, TimeUnit.MINUTES);

        if (isFirstProcess) {
            // 处理消息
            processMessage(msg);
        } else {
            // 消息已经处理过,直接忽略
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

consumer.start();

2.2 RabbitMQ 幂等性实现

在 RabbitMQ 中,幂等性处理主要在消费者端完成,类似于 RocketMQ。
1. 生产者发送消息
生产者发送消息时,同样附带一个唯一的业务 ID。

String uniqueId = UUID.randomUUID().toString();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(uniqueId);

Message message = new Message(data.getBytes(), messageProperties);
rabbitTemplate.send("exchange", "routingKey", message);

2. 消费者处理消息
在消费者处理消息前,检查唯一业务 ID 是否已存在。
可以通过 Redis 或数据库来进行幂等性检查。

@RabbitListener(queues = "queueName")
public void processMessage(Message message) {
    String uniqueId = message.getMessageProperties().getMessageId();

    // 使用 Redis 判断是否已处理过
    boolean isFirstProcess = redisTemplate.opsForValue().setIfAbsent(uniqueId, "1", 10, TimeUnit.MINUTES);

    if (isFirstProcess) {
        // 处理消息
        processBusinessLogic(message);
    } else {
        // 消息已经处理过,直接忽略
    }
}

3. 电商支付模块的幂等性问题

在电商系统中,支付模块的幂等性尤为重要,因为重复支付会导致资金错误和用户体验问题。常见的支付幂等性处理策略包括:
1.唯一交易 ID
每个支付请求都生成一个唯一的交易 ID,并将其作为支付请求的唯一标识。
支付服务在接收到请求时,先查询数据库中是否已经存在该交易 ID。如果存在,说明支付请求已经处理过;如果不存在,才进行支付处理并记录该交易 ID。
2. 数据库事务
利用数据库的事务和唯一约束,保证同一个交易 ID 的支付记录只能插入一次。
通过数据库的原子性操作,确保支付的原子性和幂等性。
3. 支付状态检查
在处理支付请求时,首先检查当前订单的支付状态。
如果订单已经是支付成功状态,则直接返回成功;如果未支付或支付中,则进行下一步支付操作。

public synchronized boolean processPayment(String orderId, BigDecimal amount) {
    // 查询订单状态
    Order order = orderRepository.findById(orderId);
    if (order.getStatus() == OrderStatus.PAID) {
        // 已支付,直接返回
        return true;
    }

    // 进行支付操作
    boolean paymentSuccess = paymentGateway.pay(orderId, amount);
    if (paymentSuccess) {
        // 更新订单状态
        order.setStatus(OrderStatus.PAID);
        orderRepository.save(order);
    }
    return paymentSuccess;
}

4. 幂等性 Token
在发起支付请求时,生成一个幂等性 Token 并将其发送到支付服务端。
支付服务端接收到请求后,使用该 Token 判断该请求是否已经处理过。

实现示例

1.支付请求
在发起支付请求时,生成唯一交易 ID,并在支付请求中附带该 ID。

String transactionId = UUID.randomUUID().toString();
PaymentRequest paymentRequest = new PaymentRequest(orderId, amount, transactionId);

2. 支付服务端
支付服务端接收到支付请求后,检查交易 ID 是否已经处理过。
使用数据库或 Redis 记录已经处理过的交易 ID。

public boolean processPayment(PaymentRequest request) {
    String transactionId = request.getTransactionId();
    boolean isFirstProcess = redisTemplate.opsForValue().setIfAbsent(transactionId, "1", 10, TimeUnit.MINUTES);

    if (isFirstProcess) {
        // 执行支付逻辑
        boolean success = paymentGateway.pay(request.getOrderId(), request.getAmount());

        if (success) {
            // 支付成功,更新订单状态
            updateOrderStatus(request.getOrderId(), OrderStatus.PAID);
        }

        return success;
    } else {
        // 重复请求,直接返回成功
        return true;
    }
}

通过在 RocketMQ 和 RabbitMQ 中使用唯一 ID 来确保消息处理的幂等性,以及在电商支付模块中利用唯一交易 ID 和状态检查,可以有效避免重复处理带来的问题,保证系统数据的一致性和正确性。


http://www.kler.cn/news/308754.html

相关文章:

  • 51单片机+proteus+学习3(串口、矩阵按键)
  • 了解华为云容器引擎(Cloud Container Engine)
  • 关于http的206状态码和416状态码的意义、断点续传以及CORS使用Access-Control-Allow-Origin来允许跨域请求
  • 网络运维故障处理案例
  • 武汉传媒学院联合创龙教仪建设DSP教学实验箱,基于DSP C6000平台搭建
  • Pytorch详解-模型模块(RNN,CNN,FNN,LSTM,GRU,TCN,Transformer)
  • 了解云容器实例云容器实例(Cloud Container Instance)
  • JavaSE入门
  • 多线程同步
  • 【数据结构】经典题
  • 初始MYSQL数据库(5)—— 索引
  • LabVIEW减速机加载控制系统
  • HarmonyOS 实现沉浸式效果
  • Spring自定义注解
  • 超全网络安全面试题汇总(2024版)
  • 速盾:网页游戏可以开cdn吗?
  • selenium元素定位:元素点击交互异常解决方法
  • 1.数据结构-双链表
  • YOLOv8改进 - 注意力篇 - 引入CBAM注意力机制
  • TCP.IP四层模型
  • Redis命令:redis-cli
  • 【乐企】基础请求封装
  • 【基于C++的产品入库管理系统】
  • Java项目实战II基于Java+Spring Boot+MySQL的图书管理系统的设计与实现 (源码+数据库+文档)
  • 关于yolov5遇到空标签导致训练暂停的解决
  • C++基于select和epoll的TCP服务器
  • 计算机毕业设计 毕业季一站式旅游服务定制平台的设计与实现 Java实战项目 附源码+文档+视频讲解
  • sshj使用代理连接服务器
  • as 类型断言
  • 动手学深度学习(四)卷积神经网络-下