当前位置: 首页 > article >正文

Saga 模式实战 Demo

以下是一个基于 Apache Camel 的完整 Saga 模式实战 Demo,包含订单创建、库存扣减、支付扣款和物流发送四个服务,结合补偿机制实现最终一致性。代码已充分注释,可直接运行。


环境准备

  1. 依赖添加
    pom.xml 中添加 Apache Camel 和 ActiveMQ 依赖:

    <dependencies>
        <!-- Apache Camel 核心 -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>3.14.0</version>
        </dependency>
        <!-- Camel Saga 组件 -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-saga</artifactId>
            <version>3.14.0</version>
        </dependency>
        <!-- ActiveMQ 集成 -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-activemq</artifactId>
            <version>3.14.0</version>
        </dependency>
        <!-- SLF4J 日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
    </dependencies>
    
  2. ActiveMQ 配置
    application.yml 中配置 ActiveMQ 连接:

    camel:
      activemq:
        broker-url: tcp://localhost:61616
        connection-factory:
          pooling:
            enabled: true
            max-connections: 10
      components:
        activemq:
          component-name: activemq
          broker-url: tcp://localhost:61616
    

业务代码实现

1. 订单服务(OrderService)
@Service
public class OrderService {

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private LogisticsService logisticsService;

    @Autowired
    private CamelContext camelContext;

    // Saga 起始点:创建订单
    public void createOrder(OrderRequest request) {
        camelContext.createProducerTemplate().sendBody("direct:startOrder", request);
    }
}

2. 服务实现类
InventoryService(库存服务)
@Service
public class InventoryService {

    // 扣减库存(正向操作)
    public void deductStock(String sku) {
        // 模拟库存扣减(实际应查询数据库)
        System.out.println("扣减库存 SKU: " + sku + ",剩余库存: 99");
        // 故意模拟 30% 失败率
        if (Math.random() < 0.3) {
            throw new RuntimeException("库存不足,SKU: " + sku);
        }
    }

    // 恢复库存(补偿操作)
    public void restoreStock(String sku) {
        System.out.println("恢复库存 SKU: " + sku + ",剩余库存: 100");
    }
}
PaymentService(支付服务)
@Service
public class PaymentService {

    // 扣款(正向操作)
    public void chargePayment(BigDecimal amount) {
        System.out.println("扣款金额: " + amount + ",成功");
        // 模拟 20% 失败率
        if (Math.random() < 0.2) {
            throw new RuntimeException("支付失败,金额: " + amount);
        }
    }

    // 退款(补偿操作)
    public void refundPayment(BigDecimal amount) {
        System.out.println("退款金额: " + amount + ",成功");
    }
}
LogisticsService(物流服务)
@Service
public class LogisticsService {

    // 发送物流(正向操作)
    public void sendLogistics(Order order) {
        System.out.println("物流已发送,订单号: " + order.getId());
    }
}

3. Camel 路由配置

src/main/resources/routes.xml 中定义 Saga 路由:

<?xml version="1.0" encoding="UTF-8"?>
<routes xmlns="http://camel.apache.org/schema/spring">
    <!-- Saga 路由定义 -->
    <route id="orderSaga" errorHandler="errorHandler">
        <from uri="direct:startOrder"/>
        
        <!-- Step 1: 扣减库存 -->
        <Saga>
            <serviceCall>
                <method>inventoryService.deductStock</method>
                <arguments>
                    <argument>${header.sku}</argument>
                </arguments>
            </serviceCall>
            <compensationMethod>inventoryService.restoreStock</compensationMethod>
        </Saga>
        <to uri="direct:generateOrder"/>
        
        <!-- Step 2: 生成订单 -->
        <Saga>
            <serviceCall>
                <method>orderDAO.insertOrder</method>
                <arguments>
                    <argument>${body.order}</argument>
                </arguments>
            </serviceCall>
            <compensationMethod>orderDAO.deleteOrder</compensationMethod>
        </Saga>
        <to uri="direct:chargePayment"/>
        
        <!-- Step 3: 扣款 -->
        <Saga>
            <serviceCall>
                <method>paymentService.chargePayment</method>
                <arguments>
                    <argument>${header.amount}</argument>
                </arguments>
            </serviceCall>
            <compensationMethod>paymentService.refundPayment</compensationMethod>
        </Saga>
        <to uri="direct:sendLogistics"/>
        
        <!-- Step 4: 发送物流 -->
        <Saga>
            <serviceCall>
                <method>logisticsService.sendLogistics</method>
                <arguments>
                    <argument>${body.order}</argument>
                </arguments>
            </serviceCall>
            <!-- 无需补偿操作 -->
        </Saga>
    </route>

    <!-- 错误处理 -->
    <errorHandler id="errorHandler">
        <onException>
            <exception>java.lang.RuntimeException</exception>
            <handled>true</handled>
            <log message="发生错误: ${exception.message}"/>
            <!-- 触发全链路补偿 -->
            <redeliveryPolicy>
                <maximumRedeliveries>3</maximumRedeliveries>
                <delay>5000</delay>
            </redeliveryPolicy>
        </onException>
    </errorHandler>
</routes>

4. 配置与启动

启动类配置

在 Spring Boot 启动类中启用 Camel 路由:

@SpringBootApplication
public class SagaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SagaDemoApplication.class, args);
    }

    @Bean
    public RouteBuilder routeBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // 加载 XML 路由
                from("file:src/main/resources/routes.xml").to("camelContext:routes");
            }
        };
    }
}

5. 测试与验证

1. 正常流程

发送请求:

OrderRequest request = new OrderRequest(
    "user123",
    new BigDecimal("100.00"),
    "SKU_123"
);

orderService.createOrder(request);

输出日志

扣减库存 SKU: SKU_123,剩余库存: 99
扣款金额: 100.00,成功
物流已发送,订单号: 1
2. 模拟支付失败

修改 PaymentService.chargePayment 方法,强制抛出异常:

public void chargePayment(BigDecimal amount) {
    throw new RuntimeException("支付失败!");
}

再次发送请求,观察补偿流程:

扣减库存 SKU: SKU_123,剩余库存: 99
退款金额: 100.00,成功
恢复库存 SKU: SKU_123,剩余库存: 100

关键原理与优势

1. Saga 路由定义

<Saga> 标签:标记事务边界,自动绑定正向服务和补偿方法。
serviceCall:指定具体服务方法及参数(通过 ${header}${body} 传递上下文)。
compensationMethod:定义失败时的反向补偿操作。

2. 消息持久化

ActiveMQ 配置:通过 camel-activemq 的持久化配置,确保事件在服务重启后不丢失。

3. 错误重试

<redeliveryPolicy>:配置最大重试次数和延迟时间,避免无限重试。


常见问题与解决

1. 补偿操作未执行

检查点
• 确保补偿方法签名与正向方法完全一致。
• 验证服务类是否被 Spring 管理(添加 @Service 注解)。

2. 消息重复消费

解决方案
• 在事件中添加全局唯一 ID(如 UUID),通过 exchange.getMessage().getMessageId() 获取。
• 使用 Redis 或数据库记录已处理消息。

3. 服务调用超时

解决方案
• 在 serviceCall 中配置超时时间:
xml <serviceCall> <method>...</method> <timeout>5000</timeout> <!-- 5秒超时 --> </serviceCall>


总结

通过 Apache Camel 的 Saga 组件,我们无需手动编写复杂的补偿逻辑,即可实现分布式事务的最终一致性保障。其核心优势在于:
声明式路由:通过 XML 或 DSL 定义事务流程,清晰直观。
自动补偿:失败时自动触发逆向操作,减少开发成本。
高可用性:结合 ActiveMQ 实现事件持久化和重试机制。

适用场景
• 微服务架构下的长事务(如电商订单、金融交易)。
• 需要灵活补偿逻辑的复杂业务场景。

进阶方向
• 结合 Seata 框架实现更强大的分布式事务管理。
• 添加监控和报警(如 Prometheus + Grafana)实时跟踪 Saga 执行状态。


http://www.kler.cn/a/594953.html

相关文章:

  • Compose 实践与探索十五 —— 自定义触摸
  • Prometheus Exporter系列-Postgres_Exporter一键部署
  • Java 大视界 -- Java 大数据分布式计算中的通信优化与网络拓扑设计(145)
  • Python 单例模式的 5 种实现方式:深入解析与最佳实践
  • 如何给商品一键换色?图生生AI,告别繁琐修图
  • 【Dify平台】Function Call 模式模式和ReAct模型有什么不同?
  • Compose 实践与探索十六 —— 与传统的 View 系统混用
  • Q2 电商订单数据分析优化
  • QT Quick(C++)跨平台应用程序项目实战教程 3 — 项目基本设置(窗体尺寸、中文标题、窗体图标、可执行程序图标)
  • uniapp整合SQLite(Android)
  • 集成学习(下):Stacking集成方法
  • MANISKILL3:GPU 并行机器人模拟和渲染,用于通用的具身AI
  • 贪心算法(10)(java)跳跃游戏
  • hive的基础函数>>集合函数, 条件函数, 类型转换函数
  • GEO:在AI时代抢占DeepSeekC位?
  • 深度学习项目--基于ResNet和DenseNet混合架构网络论文的复现(pytorch实现)
  • 【大模型理论篇】CogVLM:多模态预训练语言模型
  • python3.13.2安装详细步骤(附安装包)
  • 【设计模式】C++ 单例模式总结与最佳实践
  • OO_Unit1