分布式并发场景的核心问题与解决方案
文章目录
- 分布式并发场景的核心问题与解决方案
- 一、核心问题分析
- 1. 分布式事务问题
- 2. 数据一致性问题
- 3. 并发控制问题
- 4. 分布式锁失效问题
- 二、解决方案
- 1. 分布式事务解决方案
- 1.1 可靠消息最终一致性方案
- 1.2 TCC方案实现
- 2. 缓存一致性解决方案
- 2.1 延迟双删策略
- 2.2 Canal方案
- 3. 并发控制解决方案
- 3.1 基于Redis的原子操作
- 3.2 防重复提交
- 三、系统监控与告警
- 1. 分布式链路追踪
- 2. 监控指标收集
- 四、最佳实践建议
- 五、注意事项
分布式并发场景的核心问题与解决方案
一、核心问题分析
1. 分布式事务问题
在分布式环境下,一个业务操作可能横跨多个服务,比如创建订单时涉及:
- 订单服务:创建订单
- 库存服务:扣减库存
- 支付服务:冻结余额
- 积分服务:赠送积分
可能出现的问题:
- 部分服务成功,部分服务失败
- 网络超时导致事务状态不确定
- 服务宕机导致事务中断
2. 数据一致性问题
在分布式系统中,由于CAP理论的限制,我们通常需要在一致性和可用性之间做出选择。
典型场景:
- 主从数据库的数据同步延迟
- 分布式缓存的数据一致性
- 跨服务的数据依赖
3. 并发控制问题
多个节点同时处理请求时的并发控制:
- 超卖问题
- 重复下单
- 数据竞争
4. 分布式锁失效问题
- Redis主从切换导致锁失效
- 时钟不同步导致的锁判断错误
- 网络分区导致的锁状态不一致
二、解决方案
1. 分布式事务解决方案
1.1 可靠消息最终一致性方案
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderMapper orderMapper;
@Autowired
private MessageMapper messageMapper;
@Transactional(rollbackFor = Exception.class)
public void createOrder(OrderDTO orderDTO) {
// 1. 事务消息表记录
TransactionMessage message = new TransactionMessage();
message.setMessageId(UUID.randomUUID().toString());
message.setMessage(JSON.toJSONString(orderDTO));
message.setStatus(MessageStatus.PREPARING);
messageMapper.insert(message);
// 2. 创建订单
Order order = convertToOrder(orderDTO);
orderMapper.insert(order);
// 3. 发送事务消息
sendTransactionMessage(message);
}
private void sendTransactionMessage(TransactionMessage message) {
Message msg = MessageBuilder.withPayload(message)
.setHeader("messageId", message.getMessageId())
.build();
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"ORDER_TOPIC",
msg,
null
);
if (result.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) {
throw new BusinessException("发送事务消息失败");
}
}
}
消息消费者实现:
@Component
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumerGroup = "order-consumer-group"
)
public class OrderMessageListener implements RocketMQListener<Message> {
@Autowired
private StockService stockService;
@Autowired
private MessageMapper messageMapper;
@Override
public void onMessage(Message message) {
String messageId = message.getHeaders().get("messageId", String.class);
// 1. 检查消息是否已处理
if (messageMapper.checkProcessed(messageId)) {
return;
}
try {
// 2. 处理业务逻辑
TransactionMessage txMessage = JSON.parseObject(
new String((byte[]) message.getPayload()),
TransactionMessage.class
);
OrderDTO orderDTO = JSON.parseObject(
txMessage.getMessage(),
OrderDTO.class
);
// 3. 扣减库存
stockService.decreaseStock(orderDTO.getProductId(), orderDTO.getQuantity());
// 4. 更新消息状态
messageMapper.markAsProcessed(messageId);
} catch (Exception e) {
// 5. 失败处理
messageMapper.markAsFailed(messageId, e.getMessage());
// 根据业务需求决定是否抛出异常重试
throw e;
}
}
}
1.2 TCC方案实现
@Service
public class OrderTccServiceImpl implements OrderTccService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private StockTccService stockTccService;
@Autowired
private PaymentTccService paymentTccService;
@GlobalTransactional
public void createOrder(OrderDTO orderDTO) {
// 1. Try阶段
// 1.1 订单服务Try
Order order = prepareTryOrder(orderDTO);
// 1.2 库存服务Try
stockTccService.tryDecrease(
orderDTO.getProductId(),
orderDTO.getQuantity()
);
// 1.3 支付服务Try
paymentTccService.tryFreeze(
orderDTO.getUserId(),
orderDTO.getAmount()
);
}
// Try阶段的订单处理
private Order prepareTryOrder(OrderDTO orderDTO) {
Order order = convertToOrder(orderDTO);
order.setStatus(OrderStatus.TRY);
orderMapper.insert(order);
return order;
}
// Confirm阶段的订单处理
public void confirmOrder(BusinessActionContext context) {
String orderId = context.getActionContext("orderId").toString();
Order order = orderMapper.selectById(orderId);
order.setStatus(OrderStatus.CONFIRMED);
orderMapper.updateById(order);
}
// Cancel阶段的订单处理
public void cancelOrder(BusinessActionContext context) {
String orderId = context.getActionContext("orderId").toString();
Order order = orderMapper.selectById(orderId);
order.setStatus(OrderStatus.CANCELED);
orderMapper.updateById(order);
}
}
2. 缓存一致性解决方案
2.1 延迟双删策略
@Service
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductMapper productMapper;
@Autowired
private RedisTemplate<String, Product> redisTemplate;
@Autowired
private ThreadPoolExecutor threadPoolExecutor;
private static final String PRODUCT_CACHE_KEY = "product:";
private static final long DELAY_DELETE_TIME = 1000; // 1秒
@Transactional(rollbackFor = Exception.class)
public void updateProduct(Product product) {
// 1. 删除缓存
String cacheKey = PRODUCT_CACHE_KEY + product.getId();
redisTemplate.delete(cacheKey);
// 2. 更新数据库
productMapper.updateById(product);
// 3. 延迟双删
threadPoolExecutor.execute(() -> {
try {
Thread.sleep(DELAY_DELETE_TIME);
redisTemplate.delete(cacheKey);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("延迟双删失败", e);
}
});
}
}
2.2 Canal方案
@Component
public class ProductCanalClient {
@Autowired
private RedisTemplate<String, Product> redisTemplate;
@Listen(table = "product")
public void handleProductChange(CanalEntry.Entry entry) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange = entry.getRowChange();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
// 处理更新事件
handleProductUpdate(rowData);
} else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
// 处理删除事件
handleProductDelete(rowData);
}
}
}
}
private void handleProductUpdate(CanalEntry.RowData rowData) {
// 解析变更数据
Map<String, String> data = parseRowData(rowData.getAfterColumnsList());
String productId = data.get("id");
// 更新缓存
String cacheKey = "product:" + productId;
Product product = convertToProduct(data);
redisTemplate.opsForValue().set(cacheKey, product);
}
}
3. 并发控制解决方案
3.1 基于Redis的原子操作
@Service
public class StockService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String STOCK_KEY = "product:stock:";
public boolean decreaseStock(Long productId, Integer quantity) {
String key = STOCK_KEY + productId;
// Lua脚本保证原子性
String script = "local stock = redis.call('get', KEYS[1]) " +
"if stock and tonumber(stock) >= tonumber(ARGV[1]) then " +
" return redis.call('decrby', KEYS[1], ARGV[1]) " +
"end " +
"return -1";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);
Long result = redisTemplate.execute(
redisScript,
Collections.singletonList(key),
quantity.toString()
);
return result != null && result >= 0;
}
}
3.2 防重复提交
@Aspect
@Component
public class RepeatSubmitAspect {
@Autowired
private StringRedisTemplate redisTemplate;
@Around("@annotation(repeatSubmit)")
public Object around(ProceedingJoinPoint joinPoint, RepeatSubmit repeatSubmit) throws Throwable {
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder
.getRequestAttributes()).getRequest();
String token = request.getHeader("token");
String key = getRepeatSubmitKey(joinPoint, token);
// 使用Redis的setIfAbsent实现防重
boolean isNotRepeat = redisTemplate.opsForValue().setIfAbsent(
key,
"1",
repeatSubmit.interval(),
TimeUnit.MILLISECONDS
);
if (!isNotRepeat) {
throw new BusinessException("请勿重复提交");
}
return joinPoint.proceed();
}
}
三、系统监控与告警
1. 分布式链路追踪
@Configuration
public class SleuthConfig {
@Bean
public Sampler defaultSampler() {
return Sampler.ALWAYS_SAMPLE;
}
}
2. 监控指标收集
@Component
public class DistributedMetrics {
@Autowired
private MeterRegistry registry;
// 记录分布式锁获取情况
private Counter lockCounter;
private Timer lockTimer;
@PostConstruct
public void init() {
lockCounter = registry.counter("distributed.lock.acquire");
lockTimer = registry.timer("distributed.lock.time");
}
public void recordLockAcquire(String lockKey, boolean success) {
lockCounter.increment();
Tags tags = Tags.of(
"lock_key", lockKey,
"success", String.valueOf(success)
);
registry.counter("distributed.lock.acquire", tags).increment();
}
public void recordLockTime(String lockKey, long timeMillis) {
lockTimer.record(timeMillis, TimeUnit.MILLISECONDS);
}
}
四、最佳实践建议
-
业务设计层面:
- 尽量避免复杂分布式事务
- 考虑业务可补偿性
- 合理设计重试机制
-
技术选型层面:
- 优先考虑消息队列解耦
- 合理使用缓存
- 选择合适的分布式事务方案
-
监控运维层面:
- 完善的监控系统
- 合理的告警阈值
- 灾难恢复预案
-
性能优化层面:
- 合理的数据分片策略
- 避免长事务
- 批量处理优化
五、注意事项
-
数据库层面:
- 避免大事务
- 合理设计索引
- 注意死锁问题
-
缓存层面:
- 防止缓存雪崩
- 注意缓存穿透
- 合理设置过期时间
-
消息队列层面:
- 保证消息可靠性
- 处理重复消息
- 注意消息顺序性
-
分布式锁层面:
- 防止锁失效
- 避免死锁
- 合理设置超时时间