Saga 模式实战 Demo
以下是一个基于 Apache Camel 的完整 Saga 模式实战 Demo,包含订单创建、库存扣减、支付扣款和物流发送四个服务,结合补偿机制实现最终一致性。代码已充分注释,可直接运行。
环境准备
-
依赖添加
在pom.xml
中添加 Apache Camel 和 ActiveMQ 依赖:<dependencies> <!-- Apache Camel 核心 --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>3.14.0</version> </dependency> <!-- Camel Saga 组件 --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-saga</artifactId> <version>3.14.0</version> </dependency> <!-- ActiveMQ 集成 --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-activemq</artifactId> <version>3.14.0</version> </dependency> <!-- SLF4J 日志 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.36</version> </dependency> </dependencies>
-
ActiveMQ 配置
在application.yml
中配置 ActiveMQ 连接:camel: activemq: broker-url: tcp://localhost:61616 connection-factory: pooling: enabled: true max-connections: 10 components: activemq: component-name: activemq broker-url: tcp://localhost:61616
业务代码实现
1. 订单服务(OrderService)
@Service
public class OrderService {
@Autowired
private InventoryService inventoryService;
@Autowired
private PaymentService paymentService;
@Autowired
private LogisticsService logisticsService;
@Autowired
private CamelContext camelContext;
// Saga 起始点:创建订单
public void createOrder(OrderRequest request) {
camelContext.createProducerTemplate().sendBody("direct:startOrder", request);
}
}
2. 服务实现类
InventoryService(库存服务)
@Service
public class InventoryService {
// 扣减库存(正向操作)
public void deductStock(String sku) {
// 模拟库存扣减(实际应查询数据库)
System.out.println("扣减库存 SKU: " + sku + ",剩余库存: 99");
// 故意模拟 30% 失败率
if (Math.random() < 0.3) {
throw new RuntimeException("库存不足,SKU: " + sku);
}
}
// 恢复库存(补偿操作)
public void restoreStock(String sku) {
System.out.println("恢复库存 SKU: " + sku + ",剩余库存: 100");
}
}
PaymentService(支付服务)
@Service
public class PaymentService {
// 扣款(正向操作)
public void chargePayment(BigDecimal amount) {
System.out.println("扣款金额: " + amount + ",成功");
// 模拟 20% 失败率
if (Math.random() < 0.2) {
throw new RuntimeException("支付失败,金额: " + amount);
}
}
// 退款(补偿操作)
public void refundPayment(BigDecimal amount) {
System.out.println("退款金额: " + amount + ",成功");
}
}
LogisticsService(物流服务)
@Service
public class LogisticsService {
// 发送物流(正向操作)
public void sendLogistics(Order order) {
System.out.println("物流已发送,订单号: " + order.getId());
}
}
3. Camel 路由配置
在 src/main/resources/routes.xml
中定义 Saga 路由:
<?xml version="1.0" encoding="UTF-8"?>
<routes xmlns="http://camel.apache.org/schema/spring">
<!-- Saga 路由定义 -->
<route id="orderSaga" errorHandler="errorHandler">
<from uri="direct:startOrder"/>
<!-- Step 1: 扣减库存 -->
<Saga>
<serviceCall>
<method>inventoryService.deductStock</method>
<arguments>
<argument>${header.sku}</argument>
</arguments>
</serviceCall>
<compensationMethod>inventoryService.restoreStock</compensationMethod>
</Saga>
<to uri="direct:generateOrder"/>
<!-- Step 2: 生成订单 -->
<Saga>
<serviceCall>
<method>orderDAO.insertOrder</method>
<arguments>
<argument>${body.order}</argument>
</arguments>
</serviceCall>
<compensationMethod>orderDAO.deleteOrder</compensationMethod>
</Saga>
<to uri="direct:chargePayment"/>
<!-- Step 3: 扣款 -->
<Saga>
<serviceCall>
<method>paymentService.chargePayment</method>
<arguments>
<argument>${header.amount}</argument>
</arguments>
</serviceCall>
<compensationMethod>paymentService.refundPayment</compensationMethod>
</Saga>
<to uri="direct:sendLogistics"/>
<!-- Step 4: 发送物流 -->
<Saga>
<serviceCall>
<method>logisticsService.sendLogistics</method>
<arguments>
<argument>${body.order}</argument>
</arguments>
</serviceCall>
<!-- 无需补偿操作 -->
</Saga>
</route>
<!-- 错误处理 -->
<errorHandler id="errorHandler">
<onException>
<exception>java.lang.RuntimeException</exception>
<handled>true</handled>
<log message="发生错误: ${exception.message}"/>
<!-- 触发全链路补偿 -->
<redeliveryPolicy>
<maximumRedeliveries>3</maximumRedeliveries>
<delay>5000</delay>
</redeliveryPolicy>
</onException>
</errorHandler>
</routes>
4. 配置与启动
启动类配置
在 Spring Boot 启动类中启用 Camel 路由:
@SpringBootApplication
public class SagaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SagaDemoApplication.class, args);
}
@Bean
public RouteBuilder routeBuilder() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
// 加载 XML 路由
from("file:src/main/resources/routes.xml").to("camelContext:routes");
}
};
}
}
5. 测试与验证
1. 正常流程
发送请求:
OrderRequest request = new OrderRequest(
"user123",
new BigDecimal("100.00"),
"SKU_123"
);
orderService.createOrder(request);
输出日志:
扣减库存 SKU: SKU_123,剩余库存: 99
扣款金额: 100.00,成功
物流已发送,订单号: 1
2. 模拟支付失败
修改 PaymentService.chargePayment
方法,强制抛出异常:
public void chargePayment(BigDecimal amount) {
throw new RuntimeException("支付失败!");
}
再次发送请求,观察补偿流程:
扣减库存 SKU: SKU_123,剩余库存: 99
退款金额: 100.00,成功
恢复库存 SKU: SKU_123,剩余库存: 100
关键原理与优势
1. Saga 路由定义
• <Saga>
标签:标记事务边界,自动绑定正向服务和补偿方法。
• serviceCall
:指定具体服务方法及参数(通过 ${header}
或 ${body}
传递上下文)。
• compensationMethod
:定义失败时的反向补偿操作。
2. 消息持久化
• ActiveMQ 配置:通过 camel-activemq
的持久化配置,确保事件在服务重启后不丢失。
3. 错误重试
• <redeliveryPolicy>
:配置最大重试次数和延迟时间,避免无限重试。
常见问题与解决
1. 补偿操作未执行
• 检查点:
• 确保补偿方法签名与正向方法完全一致。
• 验证服务类是否被 Spring 管理(添加 @Service
注解)。
2. 消息重复消费
• 解决方案:
• 在事件中添加全局唯一 ID(如 UUID),通过 exchange.getMessage().getMessageId()
获取。
• 使用 Redis 或数据库记录已处理消息。
3. 服务调用超时
• 解决方案:
• 在 serviceCall
中配置超时时间:
xml <serviceCall> <method>...</method> <timeout>5000</timeout> <!-- 5秒超时 --> </serviceCall>
总结
通过 Apache Camel 的 Saga 组件,我们无需手动编写复杂的补偿逻辑,即可实现分布式事务的最终一致性保障。其核心优势在于:
• 声明式路由:通过 XML 或 DSL 定义事务流程,清晰直观。
• 自动补偿:失败时自动触发逆向操作,减少开发成本。
• 高可用性:结合 ActiveMQ 实现事件持久化和重试机制。
适用场景:
• 微服务架构下的长事务(如电商订单、金融交易)。
• 需要灵活补偿逻辑的复杂业务场景。
进阶方向:
• 结合 Seata 框架实现更强大的分布式事务管理。
• 添加监控和报警(如 Prometheus + Grafana)实时跟踪 Saga 执行状态。