谷粒商城のRabbitMQ高级篇最终一致性解决方案。
文章目录
- 前言
- 一、延迟&死信队列
- 1、死信队列
- 2、延迟队列
- 二、库存解锁的分布式事务最终一致性
- 2.1、队列架构设计
- 2、业务设计
- 三、订单关单的分布式事务最终一致性
- 2.1、队列架构设计
- 2、业务设计
- 3、消息丢失、积压、重复消费解决方案
- 3.1、消息丢失
- 3.2、消息重复
- 3.3、消息积压
前言
本篇介绍RabbitMQ的延迟队列
,死信队列
,以及利用这些实现订单关单,库存解锁的分布式事务最终一致性解决方案。
对应视频P292-P300
一、延迟&死信队列
1、死信队列
死信队列是一种处理"不合格"消息的机制,当消息在原队列中无法被正常消费时,会被转发到另一个队列(死信队列)供后续分析或处理。
死信产生的原因:
- 启动手动ACK时,消息被拒收,并且没有重新放回队列。
- 队列设置了TTL,消息到期后仍然没有被消费。
- 队列中的消息达到了最大长度,最早入队的消息会被移除。
死信队列本质上就是普通队列,在构造时的Map<String, Object> arguments
参数传递了x-dead-letter-exchange
(死信交换机),x-dead-letter-routing-key
(死信路由键),该队列中的消息一旦满足上面的条件,就会根据路由键将消息发到指定的交换机上。
2、延迟队列
延迟队列用于将消息延迟一段时间后再投递到指定的目标队列。其实现方式通常是通过死信队列
结合TTL过期时间。需要在定义死信队列时,Map<String, Object> arguments
参数额外传递一个x-message-ttl
(过期时间)。如果需要设计一套延迟队列,通常如下:
二、库存解锁的分布式事务最终一致性
在前篇中提到,单体事务的@Transcational
注解无法控制远程调用的服务的回滚,同时使用seata的AT模式对于下单这样的高并发场景性能损失大。根据BASE理论的最终一致性,可以有如下的方案:
2.1、队列架构设计
需要定义两个队列,一个交换机,两个绑定关系:
@Configuration
public class MyMQConfig {
// /**
// * 队列,交换机是懒加载的,只有第一次监听消息发现不存在的时候才会创建
// * @param message
// * @param channel
// */
// @RabbitListener
// public void listener(Message message, Channel channel){
//
// }
/**
* 创建交换机
* @return
*/
@Bean
public Exchange stockEventExchange() {
return new TopicExchange("stock-event-exchange", true, false);
}
/**
* 延迟队列 50分钟
* @return
*/
@Bean
public Queue stockDelayQueue() {
HashMap<String, Object> map = new HashMap<>();
//死信交换机
map.put("x-dead-letter-exchange", "stock-event-exchange");
//路由键
map.put("x-dead-letter-routing-key", "stock.release");
//过期时间
map.put("x-message-ttl", 120000);
return new Queue("stock.delay.queue", true, false, false, map);
}
/**
* 解锁库存消息队列
* @return
*/
@Bean
public Queue stockReleaseQueue() {
return new Queue("stock.release.stock.queue", true, false, false);
}
/**
* 将延迟队列绑定到交换机
* @return
*/
@Bean
public Binding delayQueueToExchange() {
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}
/**
* 解锁库存消息队列绑定到交换机
* @return
*/
@Bean
public Binding stockReleaseToExchange() {
return new Binding(
"stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null
);
}
}
2、业务设计
根据上面的绑定关系:
- 库存服务在锁定库存后,向
stock-event-exchange
中使用stock.locked
路由键发送一条消息。 - 交换机根据路由键找到
stock.delay.queue
队列。 stock.delay.queue
队列中的消息在40分钟后到期,带着stock.release
路由键重新回到交换机。- 交换机根据路由键,将消息转发到
stock.release.stock.queue
队列。 - 库存服务监听
stock.release.stock.queue
队列。
库存服务监听到stock.release.stock.queue
队列的消息后,应该进行判断:
- 40分钟前锁定的库存工作单是否还存在?如果不存在,说明是库存服务出现问题,库存和订单都进行了回滚,此时无需解锁库存。
- 根据orderSn查询订单表,可能会有两种情况:
1. 订单表不存在,说明是下单和扣库存之后又出现了异常,订单回滚了,库存没有回滚,需要解锁库存。
2. 订单表存在,就要判定状态,如果状态是已取消
才需要解锁库存。
业务实现关键代码:
类上加@RabbitListener(queues = "stock.release.stock.queue")
注解,监听指定的队列。
@Override
@Transactional(rollbackFor = Exception.class)
public void stockNum(StockLockVO vo) {
log.info("Seata全局事务id=================>{}", RootContext.getXID());
String orderSn = vo.getOrderSn();
List<CartItem> cartItems = vo.getCartItems();
ArrayList<HasStock> hasStocks = new ArrayList<>();
//锁定库存之前先创建一个工作单
WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();
wareOrderTaskEntity.setOrderSn(orderSn);
wareOrderTaskDao.insert(wareOrderTaskEntity);
for (CartItem cartItem : cartItems) {
HasStock hasStock = new HasStock();
Long skuId = cartItem.getSkuId();
//找到这个skuId在哪个仓库有库存
List<Long> wareIds = wareSkuDao.selectWareIdBySkuId(skuId);
hasStock.setSkuId(skuId);
hasStock.setWareIds(wareIds);
hasStock.setCount(cartItem.getCount());
hasStocks.add(hasStock);
}
for (HasStock hasStock : hasStocks) {
boolean singleLocked = false;
Long skuId = hasStock.getSkuId();
Integer count = hasStock.getCount();
List<Long> wareIds = hasStock.getWareIds();
if (CollectionUtils.isEmpty(wareIds)) {
throw new NoStockException(skuId);
}
//依次锁定库存,这个仓库库存锁定完了,还需要继续锁定,就锁定下一个仓库的
for (Long wareId : wareIds) {
int resCount = wareSkuDao.stockLock(skuId, wareId, count);
if (resCount > 0) {
singleLocked = true;
//锁定成功,创建库存工作单详情
WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity();
wareOrderTaskDetailEntity.setWareId(wareId);
wareOrderTaskDetailEntity.setTaskId(wareOrderTaskEntity.getId());
wareOrderTaskDetailEntity.setSkuNum(hasStock.getCount());
wareOrderTaskDetailEntity.setSkuId(skuId);
wareOrderTaskDetailEntity.setLockStatus(1);
wareOrderTaskDetailDao.insert(wareOrderTaskDetailEntity);
//向消息队列发送延迟消息 用于后续判断是否需要解锁库存
StockLockedTO stockLockedTO = new StockLockedTO();
stockLockedTO.setId(wareOrderTaskEntity.getId());
stockLockedTO.setDetailId(wareOrderTaskDetailEntity.getId());
rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", stockLockedTO);
break;
}
}
//某个skuId所有仓库都没有锁住
if (!singleLocked) {
throw new NoStockException(skuId);
}
}
}
监听stock.release.stock.queue
队列,进行库存解锁的逻辑:
/**
* 监听stock.release.stock.queue队列,进行库存解锁
*/
@RabbitHandler
public void stockRelease(StockLockedTO stockLockedTO, Message message, Channel channel) throws IOException {
log.info("订单已关闭,准备被动解锁库存");
Long id = stockLockedTO.getId();
Long detailId = stockLockedTO.getDetailId();
//首先用上面两个id查询wareOrderTaskDao和wareOrderTaskDetailDao
//如果两张表都不存在,说明是锁库存的时候就报错了,不需要进行处理,订单也跟着回滚了
WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskDao.selectById(id);
WareOrderTaskDetailEntity wareOrderTaskDetailEntity = wareOrderTaskDetailDao.selectById(detailId);
if (!ObjectUtils.isEmpty(wareOrderTaskDetailEntity) && !ObjectUtils.isEmpty(wareOrderTaskEntity)) {
//根据orderSn查询订单表(远程调用订单服务),如果订单表不存在,说明是下单和扣库存之后又出现了异常,订单回滚了,这里也要解锁库存
OrderEntity order = null;
try {
order = orderRemotesServiceClient.getOrder(wareOrderTaskEntity.getOrderSn());
} catch (Exception e) {
//远程调用失败,拒收消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
Long wareId = wareOrderTaskDetailEntity.getWareId();
Long skuId = wareOrderTaskDetailEntity.getSkuId();
Integer skuNum = wareOrderTaskDetailEntity.getSkuNum();
if (ObjectUtils.isEmpty(order)) {
//必须解锁库存
this.doStockRelease(wareId, skuId, skuNum,detailId);
}
if (order.getStatus() == 4) {
this.doStockRelease(wareId, skuId, skuNum,detailId);
}
//手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
// //如果两张表都不存在,说明是锁库存的时候就报错了,不需要进行处理,订单也跟着回滚了
else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
三、订单关单的分布式事务最终一致性
2.1、队列架构设计
需要定义两个队列,一个交换机,三个绑定关系
这一套的绑定关系中,与库存解锁最大的不同在于,订单关单后,还需要通过交换机主动给库存服务的stock.release.stock.queue
队列发送一条消息作为兜底,主动要求库存服务判断是否需要关单。
原因在于,最初的设计中,订单延迟30分钟关单,库存服务延迟40分钟判断是否需要解锁库存。而订单服务因为各种原因,没能及时消费消息修改订单状态,此时库存解锁就先于关单执行,去查订单的状态还是新建状态,库存服务就不会解锁,并且将消息消费完成。订单服务在这个时候去改状态,库存就无法再次解锁了。
@Configuration
public class MyMQConfig {
/**
* 创建交换机
* @return
*/
@Bean
public Exchange orderEventExchange() {
return new TopicExchange("order-event-exchange", true, false);
}
/**
* 定义死信队列,消息到达三十分钟后重新回到order-event-exchange交换机
* @return
*/
@Bean
public Queue orderDelayQueue() {
HashMap<String, Object> map = new HashMap<>();
//死信交换机
map.put("x-dead-letter-exchange", "order-event-exchange");
//路由键
map.put("x-dead-letter-routing-key", "order.release.order");
//过期时间
map.put("x-message-ttl", 60000);
return new Queue("order.delay.queue", true, false, false,map);
}
/**
* 定义订单释放队列
* @return
*/
@Bean
public Queue orderReleaseOrderQueue() {
return new Queue("order.release.order.queue", true, false, false);
}
/**
* 将order.delay.queue队列和order-event-exchange交换机绑定,路由键order.create.order
* @return
*/
@Bean
public Binding orderCreateOrderBinding() {
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
/**
* 将order.release.order.queue队列和order-event-exchange交换机绑定,路由键order.release.order
* @return
*/
@Bean
public Binding orderReleaseOrderBinding() {
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
/**
* 用于订单关单后,将消息通过order-event-exchange,路由键order.release.other.#,发送给stock.release.order.queue
* @return
*/
@Bean
public Binding orderReleaseOtherOrderBinding() {
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null
);
}
}
2、业务设计
根据上面的绑定关系:
- 订单服务下单完成后,向
order-event-exchange
中使用order.created.order
路由键发送一条消息。 - 交换机根据路由键找到
order.delay.queue
队列。 order.delay.queue
队列中的消息在40分钟后到期,带着order.release.order
路由键重新回到交换机。- 交换机根据路由键,将消息转发到
order.release.order.queue
队列。 - 订单服务监听
order.release.order.queue
队列。 - 订单服务进行判断是否需要关单,如果需要关单,关单完成后再次向
order-event-exchange
使用order.release.other.#
路由键发送一条消息。 - 库存服务监听
stock.release.stock.queue
队列,再次判断是否应该解锁库存。
订单服务关键代码:
类上加@RabbitListener(queues = "order.release.order.queue")
注解,监听指定队列。
/**
* 提交订单
* @param dto 前端传递的订单信息
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class)//这里的事务仅仅能保证订单数据入表,不能保证远程查库存
// @GlobalTransactional
public SubmitOrderResponseVO submitOrder(SubmitOrderDTO dto) {
//... 创建订单,锁定库存业务
//发送消息到order-event-exchange交换机 30分钟后从order.release.order.queue 队列中取消息,判断状态,是否需要关单
this.releaseOrder(vo.getOrder());
return vo;
}
private void releaseOrder(OrderEntity order) {
rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order);
}
@Override
public void doReleaseOrder(OrderEntity order) {
Integer status = order.getStatus();
//需要关单
if (Objects.equals(status, OrderStatusEnum.CREATE_NEW.getCode())) {
OrderEntity orderEntityForUpd = new OrderEntity();
orderEntityForUpd.setId(order.getId());
orderEntityForUpd.setStatus(OrderStatusEnum.CANCLED.getCode());
updateById(orderEntityForUpd);
//order-event-exchange发消息到 stock.release.order.queue 库存服务监听
//为了防止一种极端情况,就是订单关单由于系统卡顿,一直无法进行关单,库存消息优先到期,判断状态一直解锁不了库存
//所以在关单之后主动给库存服务发一个消息进行兜底
rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", order);
}
}
这里写一个类专门去监听消息:
@Slf4j
@Component
@RabbitListener(queues = "order.release.order.queue")
public class MyRabbitListener {
@Autowired
private OrderService orderService;
@RabbitHandler
public void releaseOrder(OrderEntity order, Message message, Channel channel) throws IOException {
log.info("接收到关单请求,订单号:{}",order.getOrderSn());
try {
orderService.doReleaseOrder(order);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
库存服务中,进行处理:
/**
* 监听 关单后 主动发送给队列的消息
* @param order
* @param message
* @param channel
* @throws IOException
*/
@RabbitHandler
public void stockRelease(OrderEntity order, Message message, Channel channel) throws IOException {
log.info("订单已关闭,准备主动解锁库存");
try {
//再次判断工作单详情表的状态,筛选出是1的订单进行关单
String orderSn = order.getOrderSn();
WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskDao.selectOne(new QueryWrapper<WareOrderTaskEntity>().eq("order_sn", orderSn));
Long id = wareOrderTaskEntity.getId();
List<WareOrderTaskDetailEntity> wareOrderTaskDetailEntities = wareOrderTaskDetailDao.selectList(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", id));
List<WareOrderTaskDetailEntity> collect = wareOrderTaskDetailEntities.stream().filter(wareOrderTaskDetailEntity -> wareOrderTaskDetailEntity.getLockStatus() == 1).collect(Collectors.toList());
if (!CollectionUtils.isEmpty(collect)) {
for (WareOrderTaskDetailEntity wareOrderTaskDetailEntity : collect) {
//解锁
doStockRelease(wareOrderTaskDetailEntity.getWareId(), wareOrderTaskDetailEntity.getSkuId(), wareOrderTaskDetailEntity.getSkuNum(),id);
}
}
//手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//重新放回队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
3、消息丢失、积压、重复消费解决方案
3.1、消息丢失
关于消息丢失,在RabbitMQ基础篇中已经提到,可以通过回调函数,以及手动ACK的方式解决。除此之外,在进行业务代码编写时,还需要加入重试机制,利用try-catch捕获异常,拒绝签收并重新放回队列。但是不应该无限重试。合理的做法是创建一个消息日志表,记录消息的信息和状态,并且定期扫描数据库对未发送成功的消息进行重发。
3.2、消息重复
可能会存在这样一种情况,消息已经成功消费,并执行了业务逻辑,在手动ACK的时候系统宕机或出现了异常(执行业务代码和手动ACK不是原子性操作,分为了两步。),这时消息会重新回到ready状态,重新投递。这样就类似于重复提交,可以在业务代码中加上状态位条件校验,或者每条消息附带一个唯一标识(messageId),在处理前检查该消息是否已经处理过,避免重复操作。
3.3、消息积压
RabbitMQ队列中的消息堆积过多,消费者无法及时处理,导致系统性能下降或消息延迟。如果业务允许,可以对消息进行批量处理(Batch Processing),一次消费多条消息,减少每条消息的处理开销。同时对于不重要或时效性强的消息,如果积压严重,可以设置消息的TTL或使用死信队列,将过期或处理失败的消息转移或丢弃。也可以增加消费者实例来并行处理消息,缓解单个消费者处理慢的问题,或对于耗时较长的操作,可以将其改为异步处理,快速从队列中取出消息,避免队列积压。
下一篇:秒杀服务。