当前位置: 首页 > article >正文

读写分离架构下的一致性挑战

读写分离架构下的一致性挑战

    • 什么是读写分离架构
    • 读写分离架构的一致性挑战
      • 主从复制延迟
      • 事务不一致
    • 主从切换导致的数据丢失
    • 跨表/跨库操作的一致性问题
    • 缓存与数据库的一致性问题
    • 查询路由策略不当导致的问题
    • 全局二级索引的一致性问题
    • 历史查询与实时数据的一致性
    • 分布式锁与读写分离的冲突
    • 解决策略
      • 读写一致性路由
    • 延迟双删策略
    • 异步确认机制
      • 版本号或时间戳
      • 事件溯源
      • 半同步复制
      • 分布式事务
      • TCC模式(Try-Confirm-Cancel)
      • 业务补偿

什么是读写分离架构

读写分离架构是一种常见的数据库架构,它将数据库分为读数据库和写数据库.
读数据库用于读取数据,写数据库用于写入数据.
通过将读操作和写操作分别路由到不同的数据库实例,以提高系统的整体性能和可扩展性。

读写分离架构的一致性挑战

读写分离架构下的一致性挑战主要体现在以下几个方面

主从复制延迟

问题描述
主库的写入操作需要一定时间才能同步到从库,这个时间窗口内,从库的数据是不完整或过时的。
具体表现

// 用户更新个人信息
@Transactional
public void updateUserProfile(Long userId, UserProfile profile) {
    // 写入主库
    userRepository.save(profile);  // 立即写入主库
    
    // 用户立即查询自己的资料(可能路由到从库)
    UserProfile updatedProfile = userQueryService.getUserProfile(userId);  // 可能读取到旧数据
    
    // 用户看到的可能仍是更新前的数据,造成困惑
}

影响

  • 用户刚提交的数据在查询时看不到,导致用户体验差
  • 业务逻辑可能基于过时数据做出错误决策
  • 数据不一致可能导致应用层面的错误

事务不一致

问题描述
在分布式事务中,无法保证主库和从库的事务边界一致,可能导致从库读取到部分提交的事务数据。
具体表现

@Transactional
public void transferMoney(Long fromAccount, Long toAccount, BigDecimal amount) {
    // 在主库执行
    accountRepository.deduct(fromAccount, amount);  // 第一步:扣款
    accountRepository.add(toAccount, amount);       // 第二步:入账
    
    // 如果此时从库只同步了第一步,尚未同步第二步
    // 其他服务查询账户余额(从从库读取)
    BigDecimal fromBalance = accountQueryService.getBalance(fromAccount);  // 已减少
    BigDecimal toBalance = accountQueryService.getBalance(toAccount);      // 尚未增加
    
    // 系统中暂时出现了资金总量不平衡的情况
}
影响
- 可能读取到不符合业务约束的中间状态数据
- 统计和报表可能不准确
- 违反业务规则和数据完整性约束

### 读写不一致问题
问题描述
用户写入数据后立即读取,可能读不到刚写入的数据,造成用户困惑。
具体表现
```java
// 用户下单后查看订单
@Transactional
public OrderResponse placeOrder(OrderRequest request) {
    // 创建订单(写主库)
    Order order = orderService.createOrder(request);
    
    // 用户立即查询订单列表(读从库)
    List<Order> recentOrders = orderQueryService.getRecentOrders(request.getUserId());
    
    // 刚创建的订单可能不在列表中,因为从库尚未同步
    return new OrderResponse(order.getId(), recentOrders);
}

影响

  • 用户体验差,产生数据丢失的错觉
  • 可能导致用户重复操作(如重复下单)
  • 增加客服和技术支持负担

主从切换导致的数据丢失

问题描述
当主库发生故障,从库提升为新主库时,如果从库数据不完整,可能导致数据永久丢失。
具体表现

// 假设系统正在处理大量订单
for (Order order : batchOrders) {
    orderRepository.save(order);  // 写入主库
}

// 此时主库崩溃,系统执行主从切换
// 如果部分订单尚未同步到从库,这些订单数据将永久丢失

影响

  • 数据永久丢失,无法恢复
  • 业务连续性受到严重影响
  • 可能导致财务和法律问题

跨表/跨库操作的一致性问题

问题描述
当业务操作涉及多个表或多个数据库时,由于复制延迟不同,可能导致数据之间的关联关系暂时不一致。
具体表现

@Transactional
public void createUserWithWallet(User user) {
    // 创建用户(表1)
    userRepository.save(user);
    
    // 创建钱包(表2)
    Wallet wallet = new Wallet(user.getId(), BigDecimal.ZERO);
    walletRepository.save(wallet);
    
    // 如果从库同步了用户表但尚未同步钱包表
    // 其他服务查询用户和钱包信息
    User queriedUser = userQueryService.getUser(user.getId());        // 能查到
    Wallet queriedWallet = walletQueryService.getWallet(user.getId()); // 可能查不到
    
    // 出现了用户存在但钱包不存在的不一致状态
}

影响

  • 关联数据暂时不一致,可能导致业务逻辑错误
  • 外键和引用完整性约束在从库上暂时被破坏
  • 复杂查询可能返回不完整或不一致的结果

缓存与数据库的一致性问题

问题描述
在读写分离架构中引入缓存层,可能导致缓存、主库和从库三者之间的数据不一致。
具体表现

// 更新商品价格
@Transactional
public void updateProductPrice(Long productId, BigDecimal newPrice) {
    // 更新数据库
    productRepository.updatePrice(productId, newPrice);  // 写入主库
    
    // 删除缓存
    cacheService.delete("product:" + productId);  // 使缓存失效
    
    // 此时如果有请求查询商品
    Product product = productService.getProduct(productId);
    
    // 可能发生的情况:
    // 1. 缓存已失效,从从库读取,但从库尚未同步,返回旧价格
    // 2. 从库同步了新价格,但缓存重建后,主库又有了新的更新
}

影响

  • 多层数据不一致,问题排查困难
  • 缓存策略可能加剧一致性问题
  • 系统复杂度显著增加

查询路由策略不当导致的问题

问题描述
不恰当的读写分离路由策略可能导致关键业务查询到不一致的数据。
具体表现

// 支付处理流程
@Transactional
public PaymentResult processPayment(Payment payment) {
    // 保存支付记录
    paymentRepository.save(payment);  // 写入主库
    
    // 检查账户余额是否足够(关键查询)
    Account account = accountQueryService.getAccount(payment.getAccountId());  // 可能路由到从库
    
    if (account.getBalance().compareTo(payment.getAmount()) >= 0) {
        // 执行扣款
        accountRepository.deduct(payment.getAccountId(), payment.getAmount());
        return PaymentResult.success();
    } else {
        return PaymentResult.insufficientFunds();
    }
    
    // 如果余额查询路由到从库,且从库数据滞后
    // 可能基于过时的余额数据做出错误决策
}

影响

  • 业务决策基于过时数据,导致逻辑错误
  • 可能违反业务规则(如透支)
  • 数据完整性受到威胁

全局二级索引的一致性问题

问题描述
当使用分库分表架构并维护全局二级索引时,索引数据与主数据之间可能存在不一致。
具体表现

// 更新用户手机号
@Transactional
public void updateUserPhone(Long userId, String newPhone) {
    // 更新用户表
    userRepository.updatePhone(userId, newPhone);  // 写入主库
    
    // 更新手机号索引表(可能在不同的库)
    phoneIndexRepository.update(userId, newPhone);  // 写入索引库
    
    // 如果索引库的同步延迟与主库不同
    // 可能出现通过ID能查到新手机号,但通过手机号查询返回旧用户ID的情况
}

影响

  • 索引查询结果与直接查询结果不一致
  • 唯一性约束可能暂时被破坏
  • 复杂的分布式事务处理

历史查询与实时数据的一致性

问题描述
需要查询历史数据与当前状态的业务场景可能受到读写分离延迟的严重影响。
具体表现

// 订单履行过程
public void fulfillOrder(Long orderId) {
    // 更新订单状态为"已发货"
    orderRepository.updateStatus(orderId, "SHIPPED");  // 写入主库
    
    // 查询订单历史状态变更记录
    List<OrderStatusChange> history = orderHistoryService.getStatusHistory(orderId);  // 可能从从库读取
    
    // 生成发货通知
    String lastStatus = history.isEmpty() ? null : history.get(history.size() - 1).getStatus();
    if (!"SHIPPED".equals(lastStatus)) {
        // 由于从库延迟,可能看不到最新的"已发货"状态
        notificationService.sendShippingNotification(orderId);  // 可能重复发送通知
    }
}

影响

  • 业务流程可能基于不完整的历史记录做出错误决策
  • 状态机转换可能不符合预期
  • 审计和合规风险

分布式锁与读写分离的冲突

问题描述
使用分布式锁控制并发访问时,如果锁的检查和数据读取分别路由到不同的库,可能导致锁失效。
具体表现

// 使用数据库实现分布式锁
public boolean acquireLock(String lockKey, String owner, int timeoutSeconds) {
    try {
        // 尝试插入锁记录
        lockRepository.insert(lockKey, owner, timeoutSeconds);  // 写入主库
        return true;
    } catch (DuplicateKeyException e) {
        // 锁已存在,检查是否过期
        Lock lock = lockRepository.findByKey(lockKey);  // 可能从从库读取
        
        // 如果锁记录尚未同步到从库,会误认为锁不存在
        if (lock == null || lock.isExpired()) {
            lockRepository.update(lockKey, owner, timeoutSeconds);  // 更新或创建锁
            return true;
        }
        return false;
    }
}

影响

  • 分布式锁可能失效,导致并发控制问题
  • 可能出现多个客户端同时获得锁的情况
  • 数据竞争和覆盖风险增加

解决策略

针对读写分离架构下的一致性挑战,常见的解决策略包括:

  1. 读写一致性路由:根据业务需求,将需要强一致性的读请求路由到主库
  2. 延迟双删策略:更新数据后延迟一段时间再次删除缓存,减少不一致窗口
  3. 异步确认机制:写操作后异步确认数据已同步到从库
  4. 版本号或时间戳:使用版本号或时间戳标记数据版本,客户端可检测和处理过时数据
  5. 事件溯源:通过事件流重建状态,减少对实时一致性的依赖
  6. 半同步复制:确保至少一个从库接收到数据后才确认写入成功
  7. 分布式事务:对关键业务使用分布式事务确保一致性
  8. 业务补偿:设计业务流程以容忍暂时的不一致,并提供补偿机制

在实际应用中,通常需要根据业务特性和一致性要求,综合运用多种策略来解决读写分离架构下的一致性挑战。

读写一致性路由

@Service
public class DatabaseRoutingService {
    private final DataSource masterDataSource;
    private final DataSource slaveDataSource;
    
    // 线程本地变量,标记当前线程是否需要强一致性读
    private static final ThreadLocal<Boolean> FORCE_MASTER_READ = ThreadLocal.withInitial(() -> false);
    
    public static void setForceMasterRead(boolean forceMaster) {
        FORCE_MASTER_READ.set(forceMaster);
    }
    
    public static void clearForceMasterRead() {
        FORCE_MASTER_READ.remove();
    }
    
    public Connection getConnection(boolean isWrite) {
        // 写操作或强制主库读取时,使用主库
        if (isWrite || FORCE_MASTER_READ.get()) {
            return masterDataSource.getConnection();
        } else {
            return slaveDataSource.getConnection();
        }
    }
}

原理:通过线程本地变量(ThreadLocal)控制数据源路由,在需要强一致性的场景下将读请求路由到主库。
适用场景:
用户更新自己的资料后立即查看
支付后立即查询订单状态
任何需要立即看到写入结果的业务场景
优点:
实现简单,无需修改数据库架构
可以按需选择性地应用,不影响其他查询
对应用层透明,业务代码无需关心路由细节
缺点:
增加主库负载,可能影响写性能
需要谨慎管理ThreadLocal变量,避免内存泄漏
在分布式系统中,跨服务调用需要传递路由标记
实现细节:
使用AOP切面简化配置,通过注解标记需要主库读取的方法
可以设置超时机制,在写操作后的一段时间内自动路由到主库
在分布式系统中,可以通过请求头或上下文传递路由信息

延迟双删策略

@Service
public class ProductService {
    private final ProductRepository productRepository;
    private final RedisTemplate<String, Product> redisTemplate;
    private final ThreadPoolExecutor executor;
    
    @Transactional
    public void updateProduct(Product product) {
        String cacheKey = "product:" + product.getId();
        
        // 1. 先删除缓存
        redisTemplate.delete(cacheKey);
        
        // 2. 更新数据库
        productRepository.save(product);
        
        // 3. 延迟一段时间后再次删除缓存
        executor.execute(() -> {
            try {
                // 延迟500毫秒,大于主从复制的平均延迟时间
                Thread.sleep(500);
                redisTemplate.delete(cacheKey);
                log.info("Delayed cache deletion for product: {}", product.getId());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("Delayed deletion interrupted", e);
            }
        });
    }
}


原理:解决缓存与数据库不一致问题的策略,通过在更新数据库前后两次删除缓存,确保缓存最终一致性。
工作流程:
第一次删除缓存:防止旧缓存被读取
更新数据库:写入新数据
延迟删除缓存:等待主从复制完成后再次删除,防止从库读取到旧数据后重建缓存
延迟时间设置:
延迟时间应大于主从复制的平均延迟
可以通过监控系统动态调整延迟时间
典型值为几百毫秒到几秒不等,取决于系统特性
优点:
实现简单,无需复杂的分布式协调
能有效解决读写分离环境下的缓存一致性问题
对业务代码侵入性小,可以通过AOP实现
缺点:
在高并发场景下可能仍有一致性窗口
增加系统复杂度和资源消耗
延迟时间设置需要经验和测试

异步确认机制

实现方式

@Service
public class OrderService {
    private final OrderRepository orderRepository;
    private final SlaveStatusChecker slaveStatusChecker;
    private final NotificationService notificationService;
    
    @Transactional
    public OrderResult createOrder(Order order) {
        // 保存订单到主库
        Order savedOrder = orderRepository.save(order);
        
        // 异步等待从库同步完成
        CompletableFuture.runAsync(() -> {
            try {
                // 等待从库同步,最多等待2秒
                boolean synced = slaveStatusChecker.waitForSync(
                    "orders", savedOrder.getId(), 2, TimeUnit.SECONDS);
                
                if (synced) {
                    // 从库已同步,发送通知
                    notificationService.sendOrderConfirmation(savedOrder);
                } else {
                    // 从库同步超时,记录任务稍后重试
                    notificationService.scheduleDelayedNotification(savedOrder);
                }
            } catch (Exception e) {
                log.error("Error waiting for slave sync", e);
            }
        });
        
        return new OrderResult(savedOrder.getId(), "Order created successfully");
    }
}

原理:在写操作完成后,异步等待并确认数据已同步到从库,然后再执行依赖于该数据的后续操作。

实现细节:
使用轮询方式检查从库是否已同步特定记录
设置最大等待时间,避免无限等待
对于超时情况,提供降级策略(如延迟通知)

同步检测方法:
基于时间戳或版本号比较主从库数据
使用数据库复制状态信息(如MySQL的SHOW SLAVE STATUS)
在记录中添加唯一标识符,检查从库是否可查询到

适用场景:
订单确认邮件发送
支付成功后的业务流程
需要确保数据可见性的异步处理

优点:
不阻塞主请求流程,用户体验好
可以根据业务重要性设置不同的等待策略
提供了数据一致性和系统性能之间的平衡

缺点:
增加系统复杂度
需要额外的数据库查询来检查同步状态
在极端情况下可能导致处理延迟

版本号或时间戳

实现方式

@Entity
@Table(name = "products")
public class Product {
    @Id
    private Long id;
    private String name;
    private BigDecimal price;
    private int stock;
    
    @Version
    private int version;
    
    @Column(name = "updated_at")
    private Timestamp updatedAt;
    
    @PreUpdate
    @PrePersist
    public void updateTimestamp() {
        this.updatedAt = new Timestamp(System.currentTimeMillis());
    }
}

@Service
public class ProductService {
    // 客户端提供版本号的更新方法
    @Transactional
    public UpdateResult updateProductWithVersion(Product product, int expectedVersion) {
        try {
            // 查询当前产品
            Product currentProduct = productRepository.findById(product.getId())
                .orElseThrow(() -> new ProductNotFoundException());
            
            // 检查版本号
            if (currentProduct.getVersion() != expectedVersion) {
                return new UpdateResult(false, "Product has been modified by another user");
            }
            
            // 更新产品
            currentProduct.setName(product.getName());
            currentProduct.setPrice(product.getPrice());
            currentProduct.setStock(product.getStock());
            
            productRepository.save(currentProduct);
            return new UpdateResult(true, "Product updated successfully");
            
        } catch (ObjectOptimisticLockingFailureException e) {
            // 乐观锁异常,说明在更新过程中版本发生变化
            return new UpdateResult(false, "Concurrent update detected");
        }
    }
}

原理:使用版本号或时间戳标记数据版本,在读取和更新时检查版本信息,以检测并发修改和数据过时。

版本号实现:
使用@Version注解自动管理版本号
每次更新时版本号自增
更新时检查版本号是否匹配,不匹配则拒绝更新

时间戳实现:
使用@PreUpdate和@PrePersist自动更新时间戳
客户端记录读取数据时的时间戳
服务端返回数据时包含当前时间戳,客户端可比较判断数据是否过时

乐观锁机制:
假设并发冲突较少,先执行操作
在提交时检查版本是否变化
发现冲突时回滚或提供冲突解决机制

适用场景:
协同编辑系统
库存管理
任何需要检测并发修改的场景

优点:
无需使用数据库锁,并发性能好
可以检测并处理并发修改冲突
客户端可以感知数据是否过时

缺点:
在高并发场景下可能导致大量更新失败
需要在应用层处理冲突
增加了客户端复杂度

事件溯源

实现方式

// 事件存储
@Service
public class EventStore {
    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;
    
    public void saveEvent(DomainEvent event) {
        try {
            String eventData = objectMapper.writeValueAsString(event);
            jdbcTemplate.update(
                "INSERT INTO events (event_id, aggregate_id, event_type, event_data, timestamp) " +
                "VALUES (?, ?, ?, ?, ?)",
                event.getEventId().toString(),
                event.getAggregateId(),
                event.getClass().getSimpleName(),
                eventData,
                new Timestamp(event.getTimestamp())
            );
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize event", e);
        }
    }
    
    public List<DomainEvent> getEventsForAggregate(String aggregateId) {
        return jdbcTemplate.query(
            "SELECT event_type, event_data FROM events " +
            "WHERE aggregate_id = ? ORDER BY timestamp ASC",
            (rs, rowNum) -> {
                try {
                    String eventType = rs.getString("event_type");
                    String eventData = rs.getString("event_data");
                    Class<?> eventClass = Class.forName("com.example.events." + eventType);
                    return (DomainEvent) objectMapper.readValue(eventData, eventClass);
                } catch (Exception e) {
                    throw new RuntimeException("Failed to deserialize event", e);
                }
            },
            aggregateId
        );
    }
}

// 订单服务
@Service
public class OrderService {
    private final EventStore eventStore;
    private final OrderProjection orderProjection;
    
    @Transactional
    public String createOrder(CreateOrderCommand command) {
        // 生成订单ID
        String orderId = UUID.randomUUID().toString();
        
        // 创建订单创建事件
        OrderCreatedEvent event = new OrderCreatedEvent(
            orderId,
            command.getCustomerId(),
            command.getItems(),
            calculateTotal(command.getItems())
        );
        
        // 保存事件
        eventStore.saveEvent(event);
        
        // 返回订单ID
        return orderId;
    }
    
    public Order getOrder(String orderId) {
        // 从事件流重建订单状态
        List<DomainEvent> events = eventStore.getEventsForAggregate(orderId);
        return orderProjection.projectOrder(events);
    }
}

原理:不直接存储实体的当前状态,而是存储导致状态变化的所有事件,通过重放事件流来重建实体状态。

核心组件:
事件:系统中发生的所有状态变更都表示为不可变的事件
事件存储:持久化事件的仓库,通常按聚合ID组织
投影:将事件流转换为实体当前状态的过程

工作流程:
写操作:将业务操作转换为事件并持久化
读操作:从事件存储获取事件流,通过投影重建实体状态

一致性优势:
事件是事实的记录,不会过时
读操作可以选择重放到特定时间点的状态
不依赖于主从复制,避免了复制延迟问题

适用场景:
审计要求严格的系统
需要时间旅行能力的应用(查看历史状态)
复杂的业务领域,需要捕获所有状态变化

优点:
完整的审计跟踪
可以重建任意时间点的系统状态
解耦了写模型和读模型

缺点:
学习曲线陡峭
查询复杂度增加
存储空间需求增加

半同步复制

配置MySQL半同步复制

-- 在主库上启用半同步复制插件
INSTALL PLUGIN rpl_semi_sync_master SONAME 'semisync_master.so';

-- 配置主库参数
SET GLOBAL rpl_semi_sync_master_enabled = 1;
SET GLOBAL rpl_semi_sync_master_timeout = 10000; -- 10秒超时

-- 在从库上启用半同步复制插件
INSTALL PLUGIN rpl_semi_sync_slave SONAME 'semisync_slave.so';

-- 配置从库参数
SET GLOBAL rpl_semi_sync_slave_enabled = 1;

解析
原理:半同步复制要求至少一个从库确认接收到事务日志后,主库才向客户端确认事务提交成功。

工作流程:
客户端向主库提交事务
主库执行事务并生成二进制日志
主库等待至少一个从库确认接收到日志
主库向客户端确认事务提交成功
从库异步应用接收到的日志

超时机制:
设置rpl_semi_sync_master_timeout参数控制等待超时
如果超时未收到从库确认,会自动降级为异步复制
超时后事务仍然提交,但可能存在数据不一致风险

一致性保证:
确保至少一个从库接收到所有提交的事务
减少主库崩溃时的数据丢失风险
不保证从库已应用事务,只保证已接收

性能影响:
增加事务提交延迟,通常为网络往返时间
在网络不稳定时可能导致性能波动
相比异步复制,吞吐量略有下降

监控要点:
半同步状态(是否启用、是否降级)
从库连接数量
半同步等待时间
超时次数

分布式事务

使用XA事务

@Configuration
public class XADataSourceConfig {
    @Bean
    public DataSource dataSource() throws Exception {
        // 创建AtomikosDataSourceBean作为XA数据源
        AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();
        dataSource.setUniqueResourceName("xaDataSource");
        dataSource.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
        
        Properties props = new Properties();
        props.setProperty("URL", "jdbc:mysql://master-db:3306/mydb");
        props.setProperty("user", "dbuser");
        props.setProperty("password", "dbpass");
        
        dataSource.setXaProperties(props);
        dataSource.setPoolSize(5);
        
        return dataSource;
    }
    
    @Bean
    public JtaTransactionManager transactionManager() throws Exception {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        
        UserTransactionImp userTransaction = new UserTransactionImp();
        userTransaction.setTransactionTimeout(300);
        
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
        jtaTransactionManager.setTransactionManager(userTransactionManager);
        jtaTransactionManager.setUserTransaction(userTransaction);
        jtaTransactionManager.setAllowCustomIsolationLevels(true);
        
        return jtaTransactionManager;
    }
}

原理:XA是一个两阶段提交协议,用于协调多个资源管理器(如数据库)参与的分布式事务。

两阶段提交流程:
准备阶段:事务管理器要求所有资源管理器准备提交事务
提交阶段:如果所有资源管理器都准备好,事务管理器指示它们提交事务;否则指示回滚

实现组件:
事务管理器:协调整个分布式事务(如Atomikos、Narayana)
XA资源:支持XA协议的资源(如MySQL的XA数据源)
JTA API:Java事务API,提供标准接口

一致性保证:
确保所有参与者要么全部提交,要么全部回滚
提供ACID属性跨多个资源
解决读写分离环境下的分布式事务问题

优点:
强一致性保证
对应用透明,使用标准事务注解
广泛支持(大多数关系型数据库)

缺点:
性能开销大,特别是在高并发场景
长时间持有资源锁,可能导致阻塞
协调者单点故障风险

适用场景:
金融交易
需要强一致性的关键业务操作
涉及多个数据源的事务

TCC模式(Try-Confirm-Cancel)

实现方式

@Service
public class OrderTccService {
    private final OrderRepository orderRepository;
    private final InventoryFeignClient inventoryClient;
    private final PaymentFeignClient paymentClient;
    
    @Transactional
    public String createOrder(OrderRequest request) {
        // 生成订单ID
        String orderId = UUID.randomUUID().toString();
        
        // Try阶段 - 预留资源
        try {
            // 创建订单(处于PENDING状态)
            Order order = new Order();
            order.setId(orderId);
            order.setCustomerId(request.getCustomerId());
            order.setItems(request.getItems());
            order.setTotalAmount(calculateTotal(request.getItems()));
            order.setStatus("PENDING");
            orderRepository.save(order);
            
            // 预留库存
            InventoryReservationResponse invResponse = inventoryClient.reserveInventory(
                new InventoryReservationRequest(orderId, request.getItems())
            );
            
            if (!invResponse.isSuccess()) {
                throw new RuntimeException("Failed to reserve inventory: " + invResponse.getMessage());
            }
            
            // 预授权支付
            PaymentAuthorizationResponse payResponse = paymentClient.authorizePayment(
                new PaymentAuthorizationRequest(
                    orderId, request.getCustomerId(), order.getTotalAmount())
            );
            
            if (!payResponse.isSuccess()) {
                throw new RuntimeException("Failed to authorize payment: " + payResponse.getMessage());
            }
            
            // 所有Try阶段成功,提交本地事务
            return orderId;
            
        } catch (Exception e) {
            // Try阶段失败,触发Cancel操作
            try {
                // 取消库存预留
                inventoryClient.cancelReservation(orderId);
                
                // 取消支付预授权
                paymentClient.cancelAuthorization(orderId);
                
                // 取消订单
                orderRepository.updateStatus(orderId, "CANCELLED");
            } catch (Exception ex) {
                log.error("Error during compensation", ex);
            }
            
            throw new RuntimeException("Order creation failed", e);
        }
    }
}

原理:TCC是一种补偿型事务模式,将分布式事务拆分为三个阶段:Try(尝试)、Confirm(确认)、Cancel(取消)。

三个阶段:
Try:资源检查和预留,但不实际执行业务操作
Confirm:确认执行业务操作,使预留的资源生效
Cancel:取消预留,释放资源

工作流程:
协调者调用所有参与者的Try接口
如果所有Try成功,调用所有Confirm接口
如果任何Try失败,调用已成功参与者的Cancel接口

一致性保证:
Try阶段预留资源,确保后续操作可行
Confirm/Cancel阶段必须保证幂等性,可重试
最终一致性,可能短暂不一致但最终会达到一致状态

与XA对比:
无需资源锁定,性能更好
更灵活,可以针对业务定制
实现复杂度更高,需要编写三个接口

适用场景:
高性能要求的分布式事务
长事务场景
跨异构系统的事务
实现挑战:

幂等性设计
悬挂事务处理(Confirm/Cancel先于Try到达)
空回滚处理(Try未执行但需要Cancel)

业务补偿

实现方式

@Service
public class OrderCompensationService {
    private final OrderRepository orderRepository;
    private final InventoryRepository inventoryRepository;
    private final PaymentRepository paymentRepository;
    private final NotificationService notificationService;
    
    @Scheduled(fixedRate = 300000) // 每5分钟执行一次
    public void detectAndCompensateInconsistencies() {
        // 查找可能存在不一致的订单
        List<Order> ordersToCheck = orderRepository.findByStatusAndCreatedAtBefore(
            "PROCESSING", 
            new Date(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(30))
        );
        
        for (Order order : ordersToCheck) {
            try {
                compensateIfNeeded(order);
            } catch (Exception e) {
                log.error("Error compensating order " + order.getId(), e);
            }
        }
    }
    
    private void compensateIfNeeded(Order order) {
        // 检查库存扣减状态
        boolean inventoryDeducted = inventoryRepository.isDeductedForOrder(order.getId());
        
        // 检查支付状态
        Payment payment = paymentRepository.findByOrderId(order.getId());
        boolean paymentProcessed = payment != null && "COMPLETED".equals(payment.getStatus());
        
        if (inventoryDeducted && paymentProcessed) {
            // 一切正常,更新订单状态为已完成
            order.setStatus("COMPLETED");
            orderRepository.save(order);
            
            // 发送订单确认通知
            notificationService.sendOrderConfirmation(order);
            
        } else if (inventoryDeducted && !paymentProcessed) {
            // 库存已扣减但支付未完成
            if (payment == null || "FAILED".equals(payment.getStatus())) {
                // 支付失败,需要恢复库存
                inventoryRepository.restoreInventory(order.getId());
                
                // 更新订单状态
                order.setStatus("PAYMENT_FAILED");
                orderRepository.save(order);
                
                // 通知客户支付失败
                notificationService.sendPaymentFailedNotification(order);
            }
        }
        // 其他情况处理...
    }
}

原理:业务补偿是一种基于最终一致性的策略,通过定期检测和修复数据不一致来确保系统最终达到一致状态。

核心组件:
补偿逻辑:针对不同类型的不一致状态执行相应的修复操作
状态跟踪:记录业务实体的处理状态和历史
告警机制:对无法自动修复的问题发出告警

工作流程:
定期扫描处于中间状态或可能不一致的数据
检查相关系统的状态,判断是否存在不一致
根据检测结果执行相应的补偿操作
记录补偿结果,必要时发送通知

补偿策略设计:
前向补偿:继续完成未完成的操作(如支付成功但订单未更新)
反向补偿:回滚已完成的操作(如订单取消但库存未恢复)
替代补偿:执行替代操作(如自动退款、发送道歉优惠券)

优点:
不阻塞主流程,用户体验好
适应性强,可处理各种异常情况
可以处理跨系统、长时间运行的业务流程

缺点:
实现复杂度高,需要考虑各种边缘情况
存在短暂的不一致窗口
需要仔细设计补偿逻辑,避免引入新问题

适用场景:
电商订单处理
支付系统
跨系统业务流程
长时间运行的工作流


http://www.kler.cn/a/573221.html

相关文章:

  • Unity3D主程入职90天工作计划
  • PHP 将图片url,写入到文件夹中,导出到zip下载到桌面
  • SpringBoot Actuator
  • 4. Prometheus监控数据持久化
  • Spring Boot中对接Twilio以实现发送验证码和验证短信码
  • Redis 篇
  • flask 安装后不能识别
  • C# 类型转换
  • 2025中企出海解决方案:工博科技联合SAP构建AI赋能的全球化管理平台
  • vscode远程ssh链接服务器
  • 迷你世界脚本背包接口:Backpack
  • [内网安全] Windows 网络认证 — 基于挑战响应认证的 NTLM 协议
  • Python Pandas实现GROUP BY WITH CUBE和WITH ROLLUP的分类汇总功能
  • 【极客时间】浏览器工作原理与实践-2 宏观视角下的浏览器 (6讲) - 2.5 渲染流程(上):HTML、CSS和JavaScript,是如何变成页面的?
  • 辉视融合服务器方案:为小酒店行业铺垫未来智能化布局
  • 极限入门题解析
  • oracle服务器通过进程查找对应的sql语句
  • 软件工程----软件可靠性建模与管理
  • 代码随想录算法训练营 | 图论 | 孤岛总面积、沉没孤岛
  • ⭐算法OJ⭐跳跃游戏【动态规划 + 单调队列】(C++实现)Jump Game 系列 VI