当前位置: 首页 > article >正文

基于本地消息表实现分布式事务

假设我们有一个电商系统,包含订单服务和库存服务。当用户下单时,需要在订单服务中创建订单,同时在库存服务中扣减库存。这是一个典型的分布式事务场景,我们需要保证这两个操作要么都成功,要么都失败,以保证数据的最终一致性。

项目结构:

  1. 订单服务(Order Service)
  2. 库存服务(Inventory Service)
  3. 本地消息表(Local Message Table)
  4. 消息恢复系统(Message Recovery System)

核心思想:
使用本地消息表来实现分布式事务。在订单服务中,我们将创建订单和发送消息这两个操作放在一个本地事务中。如果本地事务成功,则订单创建成功,消息也被保存到本地消息表中。然后通过定时任务或消息队列来发送消息到库存服务,实现库存扣减。如果在这个过程中出现任何异常,我们可以通过重试机制来保证最终一致性。

下面是详细的代码实现:

订单服务(Order Service)

@Service  
@Transactional  
public class OrderService {  

    @Autowired  
    private OrderRepository orderRepository;  

    @Autowired  
    private LocalMessageRepository localMessageRepository;  

    @Autowired  
    private KafkaTemplate<String, String> kafkaTemplate;  

    public void createOrder(Order order) {  
        // 开启本地事务  
        TransactionStatus txStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());  

        try {  
            // 1. 保存订单  
            orderRepository.save(order);  

            // 2. 创建本地消息  
            LocalMessage message = new LocalMessage();  
            message.setMessageId(UUID.randomUUID().toString());  
            message.setMessage(JSON.toJSONString(order));  
            message.setStatus("NEW");  
            localMessageRepository.save(message);  

            // 3. 提交事务  
            transactionManager.commit(txStatus);  

            // 4. 发送消息到Kafka  
            kafkaTemplate.send("inventory-topic", message.getMessageId(), message.getMessage());  
        } catch (Exception e) {  
            // 回滚事务  
            transactionManager.rollback(txStatus);  
            throw new RuntimeException("Create order failed", e);  
        }  
    }  
}

库存服务(Inventory Service)

@Service  
public class InventoryService {  

    @Autowired  
    private InventoryRepository inventoryRepository;  

    @KafkaListener(topics = "inventory-topic")  
    public void handleOrderCreation(ConsumerRecord<String, String> record) {  
        String messageId = record.key();  
        Order order = JSON.parseObject(record.value(), Order.class);  

        try {  
            // 扣减库存  
            inventoryRepository.decreaseStock(order.getProductId(), order.getQuantity());  

            // 确认消息处理成功  
            kafkaTemplate.send("inventory-result-topic", messageId, "SUCCESS");  
        } catch (Exception e) {  
            // 消息处理失败,发送失败消息  
            kafkaTemplate.send("inventory-result-topic", messageId, "FAILED");  
        }  
    }  
}

本地消息表(Local Message Table)

@Entity  
@Table(name = "local_message")  
public class LocalMessage {  
    @Id  
    private String messageId;  
    private String message;  
    private String status; // NEW, SENT, CONFIRMED  
    private Date createTime;  
    private Date updateTime;  

    // Getters and setters  
}

消息恢复系统(Message Recovery System)

@Component  
public class MessageRecoverySystem {  

    @Autowired  
    private LocalMessageRepository localMessageRepository;  

    @Autowired  
    private KafkaTemplate<String, String> kafkaTemplate;  

    @Scheduled(fixedRate = 60000) // 每分钟执行一次  
    public void recoverFailedMessages() {  
        List<LocalMessage> failedMessages = localMessageRepository.findByStatusAndCreateTimeBefore("NEW", new Date(System.currentTimeMillis() - 300000)); // 5分钟前的消息  

        for (LocalMessage message : failedMessages) {  
            try {  
                kafkaTemplate.send("inventory-topic", message.getMessageId(), message.getMessage());  
                message.setStatus("SENT");  
                localMessageRepository.save(message);  
            } catch (Exception e) {  
                // 记录日志,等待下次重试  
                log.error("Failed to recover message: " + message.getMessageId(), e);  
            }  
        }  
    }  

    @KafkaListener(topics = "inventory-result-topic")  
    public void handleInventoryResult(ConsumerRecord<String, String> record) {  
        String messageId = record.key();  
        String result = record.value();  

        LocalMessage message = localMessageRepository.findById(messageId).orElse(null);  
        if (message != null) {  
            if ("SUCCESS".equals(result)) {  
                message.setStatus("CONFIRMED");  
            } else {  
                message.setStatus("FAILED");  
            }  
            localMessageRepository.save(message);  
        }  
    }  
}

代码说明:

  1. 订单服务:
    • 在一个本地事务中完成订单创建和本地消息保存。
    • 事务成功后,立即发送消息到Kafka。
  2. 库存服务:
    • 监听Kafka消息,处理库存扣减。
    • 处理结果(成功或失败)通过Kafka反馈给订单服务。
  3. 本地消息表:
    • 存储待发送的消息,包括消息ID、内容、状态等信息。
  4. 消息恢复系统:
    • 定期检查本地消息表,重新发送失败的消息。
    • 监听库存服务的处理结果,更新本地消息状态。

项目亮点:

  1. 高可用性: 即使在网络故障或服务宕机的情况下,也能保证消息最终被成功处理。
  2. 数据一致性: 通过本地事务保证订单创建和消息发送的原子性,再通过消息重试机制保证最终一致性。
  3. 解耦性: 订单服务和库存服务通过消息进行异步通信,降低了系统耦合度。
  4. 可靠性: 使用本地消息表作为消息队列的可靠存储,避免了消息丢失的风险。
  5. 扩展性: 该方案易于扩展,可以方便地增加新的微服务而不影响现有服务。
  6. 性能: 采用异步处理方式,提高了系统的整体吞吐量。

通过这种方式,我们实现了在分布式系统中保证数据最终一致性的目标,同时保持了系统的高可用性和可扩展性。这种方案特别适用于对实时性要求不是特别高,但对数据一致性有较高要求的业务场景。


系列文章

  1. IT Governance Framework:IT治理框架
  2. 12306亿级流量架构分析(史上最全)
  3. 京东内部Redis性能优化最佳实践
  4. 金融级多数据中心灾备互联
  5. TOGAF业务架构-CSDN博客
  6. 如何建设金融数据中心-CSDN博客

互联网Java架构师-CSDN博客


资料下载和预览地址:

  • 链接: https://pan.baidu.com/s/1LFyFlsIHCv46DBQRfMGP9A 提取码: kx6b 


http://www.kler.cn/a/512245.html

相关文章:

  • 深度学习在语音识别中的应用
  • Numpy基础01(Jupyter基本用法/Ndarray创建与基本操作)
  • Jenkins-pipeline Jenkinsfile说明
  • JWT在线解密/JWT在线解码 - 加菲工具
  • C# 解析 HTML 实战指南
  • 力扣 有效的括号
  • JAVAweb学习日记(五) SpringBootWeb
  • Vue+Element-ui 中 使用el-table 设置表格单元格 (el -table-column) 保留空格和换行
  • ASP .NET Core 学习(.NET9)配置接口访问路由
  • 从CentOS到龙蜥:企业级Linux迁移实践记录(容器与应用)
  • 25届合肥工业大学自动化考研复试攻略
  • 大型语言模型(LLM)在算法设计中的系统性综述
  • MySQL 数据库的备份和恢复(Linux)
  • Spring Boot安全加固:基于Spring Security的权限管理
  • vue2 - Day05 - VueX
  • WOA-CNN-GRU-Attention、CNN-GRU-Attention、WOA-CNN-GRU、CNN-GRU四模型对比多变量时序预测
  • qml OpacityMask详解
  • 简单prometheus+grafana+pushgateway采集GPU利用率和交换机流量
  • uniapp(小程序、app、微信公众号、H5)预览下载文件(pdf)
  • ORACLE 12C开启EM EXPRESS过程
  • 架构优化指南:五大场景下如何发现隐藏的耦合?
  • 自动化爬虫运行过程中,有没有办法提高爬虫的抓取效率?
  • 介绍 HTTP 请求如何实现跨域
  • Oracle之RMAN备份异机恢复(单机到单机)
  • doris 2.1 Queries Acceleration-Hints 学习笔记
  • 头歌实训作业 算法设计与分析-贪心算法(第1关:部分背包问题)