当前位置: 首页 > article >正文

谷粒商城のRabbitMQ高级篇最终一致性解决方案。

文章目录

  • 前言
  • 一、延迟&死信队列
    • 1、死信队列
    • 2、延迟队列
  • 二、库存解锁的分布式事务最终一致性
    • 2.1、队列架构设计
    • 2、业务设计
  • 三、订单关单的分布式事务最终一致性
    • 2.1、队列架构设计
    • 2、业务设计
  • 3、消息丢失、积压、重复消费解决方案
    • 3.1、消息丢失
    • 3.2、消息重复
    • 3.3、消息积压


前言

  本篇介绍RabbitMQ的延迟队列死信队列,以及利用这些实现订单关单,库存解锁的分布式事务最终一致性解决方案。
  对应视频P292-P300

一、延迟&死信队列

1、死信队列

  死信队列是一种处理"不合格"消息的机制,当消息在原队列中无法被正常消费时,会被转发到另一个队列(死信队列)供后续分析或处理。
  死信产生的原因:

  • 启动手动ACK时,消息被拒收,并且没有重新放回队列。
  • 队列设置了TTL,消息到期后仍然没有被消费。
  • 队列中的消息达到了最大长度,最早入队的消息会被移除。

  死信队列本质上就是普通队列,在构造时的Map<String, Object> arguments参数传递了x-dead-letter-exchange(死信交换机),x-dead-letter-routing-key(死信路由键),该队列中的消息一旦满足上面的条件,就会根据路由键将消息发到指定的交换机上。
在这里插入图片描述

2、延迟队列

  延迟队列用于将消息延迟一段时间后再投递到指定的目标队列。其实现方式通常是通过死信队列结合TTL过期时间。需要在定义死信队列时,Map<String, Object> arguments参数额外传递一个x-message-ttl(过期时间)。如果需要设计一套延迟队列,通常如下:
在这里插入图片描述

二、库存解锁的分布式事务最终一致性

  在前篇中提到,单体事务的@Transcational注解无法控制远程调用的服务的回滚,同时使用seata的AT模式对于下单这样的高并发场景性能损失大。根据BASE理论的最终一致性,可以有如下的方案:

2.1、队列架构设计

在这里插入图片描述
  需要定义两个队列,一个交换机,两个绑定关系:

@Configuration
public class MyMQConfig {

//    /**
//     * 队列,交换机是懒加载的,只有第一次监听消息发现不存在的时候才会创建
//     * @param message
//     * @param channel
//     */
//    @RabbitListener
//    public void listener(Message message, Channel channel){
//
//    }

    /**
     * 创建交换机
     * @return
     */
    @Bean
    public Exchange stockEventExchange() {
        return new TopicExchange("stock-event-exchange", true, false);
    }

    /**
     * 延迟队列 50分钟
     * @return
     */
    @Bean
    public Queue stockDelayQueue() {
        HashMap<String, Object> map = new HashMap<>();
        //死信交换机
        map.put("x-dead-letter-exchange", "stock-event-exchange");
        //路由键
        map.put("x-dead-letter-routing-key", "stock.release");
        //过期时间
        map.put("x-message-ttl", 120000);
        return new Queue("stock.delay.queue", true, false, false, map);
    }

    /**
     * 解锁库存消息队列
     * @return
     */
    @Bean
    public Queue stockReleaseQueue() {
        return new Queue("stock.release.stock.queue", true, false, false);
    }

    /**
     * 将延迟队列绑定到交换机
     * @return
     */
    @Bean
    public Binding delayQueueToExchange() {
        return new Binding("stock.delay.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.locked",
                null);
    }

    /**
     * 解锁库存消息队列绑定到交换机
     * @return
     */
    @Bean
    public Binding stockReleaseToExchange() {
        return new Binding(
                "stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.release.#",
                null
        );
    }
}

2、业务设计

  根据上面的绑定关系:

  1. 库存服务在锁定库存后,向stock-event-exchange中使用stock.locked路由键发送一条消息。
  2. 交换机根据路由键找到stock.delay.queue队列。
  3. stock.delay.queue队列中的消息在40分钟后到期,带着stock.release路由键重新回到交换机。
  4. 交换机根据路由键,将消息转发到stock.release.stock.queue队列。
  5. 库存服务监听stock.release.stock.queue队列。

  库存服务监听到stock.release.stock.queue队列的消息后,应该进行判断:

  • 40分钟前锁定的库存工作单是否还存在?如果不存在,说明是库存服务出现问题,库存和订单都进行了回滚,此时无需解锁库存。
  • 根据orderSn查询订单表,可能会有两种情况:
      1. 订单表不存在,说明是下单和扣库存之后又出现了异常,订单回滚了,库存没有回滚,需要解锁库存。
      2. 订单表存在,就要判定状态,如果状态是已取消才需要解锁库存。

  业务实现关键代码:
  类上加@RabbitListener(queues = "stock.release.stock.queue")注解,监听指定的队列。

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void stockNum(StockLockVO vo) {
        log.info("Seata全局事务id=================>{}", RootContext.getXID());
        String orderSn = vo.getOrderSn();
        List<CartItem> cartItems = vo.getCartItems();
        ArrayList<HasStock> hasStocks = new ArrayList<>();

        //锁定库存之前先创建一个工作单
        WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();
        wareOrderTaskEntity.setOrderSn(orderSn);
        wareOrderTaskDao.insert(wareOrderTaskEntity);

        for (CartItem cartItem : cartItems) {
            HasStock hasStock = new HasStock();
            Long skuId = cartItem.getSkuId();
            //找到这个skuId在哪个仓库有库存
            List<Long> wareIds = wareSkuDao.selectWareIdBySkuId(skuId);
            hasStock.setSkuId(skuId);
            hasStock.setWareIds(wareIds);
            hasStock.setCount(cartItem.getCount());
            hasStocks.add(hasStock);
        }

        for (HasStock hasStock : hasStocks) {
            boolean singleLocked = false;
            Long skuId = hasStock.getSkuId();
            Integer count = hasStock.getCount();
            List<Long> wareIds = hasStock.getWareIds();
            if (CollectionUtils.isEmpty(wareIds)) {
                throw new NoStockException(skuId);
            }
            //依次锁定库存,这个仓库库存锁定完了,还需要继续锁定,就锁定下一个仓库的
            for (Long wareId : wareIds) {
                int resCount = wareSkuDao.stockLock(skuId, wareId, count);
                if (resCount > 0) {
                    singleLocked = true;
                    //锁定成功,创建库存工作单详情
                    WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity();
                    wareOrderTaskDetailEntity.setWareId(wareId);
                    wareOrderTaskDetailEntity.setTaskId(wareOrderTaskEntity.getId());
                    wareOrderTaskDetailEntity.setSkuNum(hasStock.getCount());
                    wareOrderTaskDetailEntity.setSkuId(skuId);
                    wareOrderTaskDetailEntity.setLockStatus(1);
                    wareOrderTaskDetailDao.insert(wareOrderTaskDetailEntity);
                    //向消息队列发送延迟消息 用于后续判断是否需要解锁库存
                    StockLockedTO stockLockedTO = new StockLockedTO();
                    stockLockedTO.setId(wareOrderTaskEntity.getId());
                    stockLockedTO.setDetailId(wareOrderTaskDetailEntity.getId());
                    rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", stockLockedTO);
                    break;
                }
            }
            //某个skuId所有仓库都没有锁住
            if (!singleLocked) {
                throw new NoStockException(skuId);
            }
        }
    }

  监听stock.release.stock.queue队列,进行库存解锁的逻辑:

    /**
     * 监听stock.release.stock.queue队列,进行库存解锁
     */
    @RabbitHandler
    public void stockRelease(StockLockedTO stockLockedTO, Message message, Channel channel) throws IOException {
        log.info("订单已关闭,准备被动解锁库存");
        Long id = stockLockedTO.getId();
        Long detailId = stockLockedTO.getDetailId();
        //首先用上面两个id查询wareOrderTaskDao和wareOrderTaskDetailDao
        //如果两张表都不存在,说明是锁库存的时候就报错了,不需要进行处理,订单也跟着回滚了
        WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskDao.selectById(id);
        WareOrderTaskDetailEntity wareOrderTaskDetailEntity = wareOrderTaskDetailDao.selectById(detailId);
        if (!ObjectUtils.isEmpty(wareOrderTaskDetailEntity) && !ObjectUtils.isEmpty(wareOrderTaskEntity)) {
            //根据orderSn查询订单表(远程调用订单服务),如果订单表不存在,说明是下单和扣库存之后又出现了异常,订单回滚了,这里也要解锁库存
            OrderEntity order = null;
            try {
                order = orderRemotesServiceClient.getOrder(wareOrderTaskEntity.getOrderSn());
            } catch (Exception e) {
                //远程调用失败,拒收消息
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            }
            Long wareId = wareOrderTaskDetailEntity.getWareId();
            Long skuId = wareOrderTaskDetailEntity.getSkuId();
            Integer skuNum = wareOrderTaskDetailEntity.getSkuNum();
            if (ObjectUtils.isEmpty(order)) {
                //必须解锁库存
                this.doStockRelease(wareId, skuId, skuNum,detailId);
            }
            if (order.getStatus() == 4) {
                this.doStockRelease(wareId, skuId, skuNum,detailId);
            }
            //手动ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
        //  //如果两张表都不存在,说明是锁库存的时候就报错了,不需要进行处理,订单也跟着回滚了
        else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

三、订单关单的分布式事务最终一致性

2.1、队列架构设计

  需要定义两个队列,一个交换机,三个绑定关系在这里插入图片描述

  这一套的绑定关系中,与库存解锁最大的不同在于,订单关单后,还需要通过交换机主动给库存服务的stock.release.stock.queue
队列发送一条消息作为兜底,主动要求库存服务判断是否需要关单。
  原因在于,最初的设计中,订单延迟30分钟关单,库存服务延迟40分钟判断是否需要解锁库存。而订单服务因为各种原因,没能及时消费消息修改订单状态,此时库存解锁就先于关单执行,去查订单的状态还是新建状态,库存服务就不会解锁,并且将消息消费完成。订单服务在这个时候去改状态,库存就无法再次解锁了。

@Configuration
public class MyMQConfig {


    /**
     * 创建交换机
     * @return
     */
    @Bean
    public Exchange orderEventExchange() {
        return new TopicExchange("order-event-exchange", true, false);
    }

    /**
     * 定义死信队列,消息到达三十分钟后重新回到order-event-exchange交换机
     * @return
     */
    @Bean
    public Queue orderDelayQueue() {
        HashMap<String, Object> map = new HashMap<>();
        //死信交换机
        map.put("x-dead-letter-exchange", "order-event-exchange");
        //路由键
        map.put("x-dead-letter-routing-key", "order.release.order");
        //过期时间
        map.put("x-message-ttl", 60000);
        return new Queue("order.delay.queue", true, false, false,map);
    }

    /**
     * 定义订单释放队列
     * @return
     */
    @Bean
    public Queue orderReleaseOrderQueue() {
        return new Queue("order.release.order.queue", true, false, false);
    }

    /**
     * 将order.delay.queue队列和order-event-exchange交换机绑定,路由键order.create.order
     * @return
     */
    @Bean
    public Binding orderCreateOrderBinding() {
        return new Binding("order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",
                null);
    }

    /**
     * 将order.release.order.queue队列和order-event-exchange交换机绑定,路由键order.release.order
     * @return
     */
    @Bean
    public Binding orderReleaseOrderBinding() {
        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    }

    /**
     * 用于订单关单后,将消息通过order-event-exchange,路由键order.release.other.#,发送给stock.release.order.queue
     * @return
     */
    @Bean
    public Binding orderReleaseOtherOrderBinding() {
        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.other.#",
                null
                );
    }
}

2、业务设计

  根据上面的绑定关系:

  1. 订单服务下单完成后,向order-event-exchange中使用order.created.order路由键发送一条消息。
  2. 交换机根据路由键找到order.delay.queue队列。
  3. order.delay.queue队列中的消息在40分钟后到期,带着order.release.order路由键重新回到交换机。
  4. 交换机根据路由键,将消息转发到order.release.order.queue队列。
  5. 订单服务监听order.release.order.queue队列。
  6. 订单服务进行判断是否需要关单,如果需要关单,关单完成后再次向order-event-exchange使用order.release.other.#路由键发送一条消息。
  7. 库存服务监听stock.release.stock.queue队列,再次判断是否应该解锁库存。

  订单服务关键代码:
  类上加@RabbitListener(queues = "order.release.order.queue")注解,监听指定队列。

    /**
     * 提交订单
     * @param dto   前端传递的订单信息
     * @return
     */
    @Override
    @Transactional(rollbackFor = Exception.class)//这里的事务仅仅能保证订单数据入表,不能保证远程查库存
//    @GlobalTransactional
    public SubmitOrderResponseVO submitOrder(SubmitOrderDTO dto) {
			 //... 创建订单,锁定库存业务			

        //发送消息到order-event-exchange交换机 30分钟后从order.release.order.queue 队列中取消息,判断状态,是否需要关单
        this.releaseOrder(vo.getOrder());
        return vo;
    }

    private void releaseOrder(OrderEntity order) {
        rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order);
    }

    @Override
    public void doReleaseOrder(OrderEntity order) {
        Integer status = order.getStatus();
        //需要关单
        if (Objects.equals(status, OrderStatusEnum.CREATE_NEW.getCode())) {
            OrderEntity orderEntityForUpd = new OrderEntity();
            orderEntityForUpd.setId(order.getId());
            orderEntityForUpd.setStatus(OrderStatusEnum.CANCLED.getCode());
            updateById(orderEntityForUpd);
            //order-event-exchange发消息到 stock.release.order.queue 库存服务监听
            //为了防止一种极端情况,就是订单关单由于系统卡顿,一直无法进行关单,库存消息优先到期,判断状态一直解锁不了库存
            //所以在关单之后主动给库存服务发一个消息进行兜底
            rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", order);
        }
    }

  这里写一个类专门去监听消息:

@Slf4j
@Component
@RabbitListener(queues = "order.release.order.queue")
public class MyRabbitListener {

    @Autowired
    private OrderService orderService;

    @RabbitHandler
    public void releaseOrder(OrderEntity order, Message message, Channel channel) throws IOException {
        log.info("接收到关单请求,订单号:{}",order.getOrderSn());
        try {
            orderService.doReleaseOrder(order);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
}

  库存服务中,进行处理:

    /**
     * 监听 关单后 主动发送给队列的消息
     * @param order
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitHandler
    public void stockRelease(OrderEntity order, Message message, Channel channel) throws IOException {
        log.info("订单已关闭,准备主动解锁库存");
        try {
            //再次判断工作单详情表的状态,筛选出是1的订单进行关单
            String orderSn = order.getOrderSn();
            WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskDao.selectOne(new QueryWrapper<WareOrderTaskEntity>().eq("order_sn", orderSn));
            Long id = wareOrderTaskEntity.getId();
            List<WareOrderTaskDetailEntity> wareOrderTaskDetailEntities = wareOrderTaskDetailDao.selectList(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", id));
            List<WareOrderTaskDetailEntity> collect = wareOrderTaskDetailEntities.stream().filter(wareOrderTaskDetailEntity -> wareOrderTaskDetailEntity.getLockStatus() == 1).collect(Collectors.toList());
            if (!CollectionUtils.isEmpty(collect)) {
                for (WareOrderTaskDetailEntity wareOrderTaskDetailEntity : collect) {
                    //解锁
                    doStockRelease(wareOrderTaskDetailEntity.getWareId(), wareOrderTaskDetailEntity.getSkuId(), wareOrderTaskDetailEntity.getSkuNum(),id);
                }
            }
            //手动确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            //重新放回队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }

3、消息丢失、积压、重复消费解决方案

3.1、消息丢失

  关于消息丢失,在RabbitMQ基础篇中已经提到,可以通过回调函数,以及手动ACK的方式解决。除此之外,在进行业务代码编写时,还需要加入重试机制,利用try-catch捕获异常,拒绝签收并重新放回队列。但是不应该无限重试。合理的做法是创建一个消息日志表,记录消息的信息和状态,并且定期扫描数据库对未发送成功的消息进行重发。

3.2、消息重复

  可能会存在这样一种情况,消息已经成功消费,并执行了业务逻辑,在手动ACK的时候系统宕机或出现了异常(执行业务代码和手动ACK不是原子性操作,分为了两步。),这时消息会重新回到ready状态,重新投递。这样就类似于重复提交,可以在业务代码中加上状态位条件校验,或者每条消息附带一个唯一标识(messageId),在处理前检查该消息是否已经处理过,避免重复操作。

3.3、消息积压

  RabbitMQ队列中的消息堆积过多,消费者无法及时处理,导致系统性能下降或消息延迟。如果业务允许,可以对消息进行批量处理(Batch Processing),一次消费多条消息,减少每条消息的处理开销。同时对于不重要或时效性强的消息,如果积压严重,可以设置消息的TTL或使用死信队列,将过期或处理失败的消息转移或丢弃。也可以增加消费者实例来并行处理消息,缓解单个消费者处理慢的问题,或对于耗时较长的操作,可以将其改为异步处理,快速从队列中取出消息,避免队列积压。


下一篇:秒杀服务。


http://www.kler.cn/a/400603.html

相关文章:

  • ggplot2-scale_x_continuous()
  • 多线程4:线程池、并发、并行、综合案例-抢红包游戏
  • 智谱AI清影升级:引领AI视频进入音效新时代
  • 商业物联网详细指南:优势与挑战
  • 4.STM32之通信接口《精讲》之USART通信---实验串口发送程序
  • Redis知识分享(三)
  • Python技巧:查询模块的版本号的方法
  • 百度搜索AI探索版多线程批量生成TXT原创文章软件-可生成3种类型文章
  • C/C++精品项目之图床共享云存储(2):MySql连接池
  • 有序数组的平方(leetcode 977)
  • Redis自学之路—基础数据结构具体方法解析(五)
  • 【网络安全 | 甲方建设】双/多因素认证、TOTP原理及实现
  • 基于 MATLAB 的模拟退火算法详解及实现
  • 小鹏汽车嵌入式面试题及参考答案
  • MySQL索引原理之查询优化
  • Altenergy电力系统控制软件 status_zigbee SQL注入漏洞复现(CVE-2024-11305)
  • MATLAB绘制正四面体、正六面体
  • LSTM 和 LSTMCell
  • 无人机的动力系统节能——CKESC电调小课堂12
  • MySQL学习/复习6复合查询
  • 【揭秘】CSS in JS:用JS对象定义样式,告别类名冲突(4)
  • 字节跳动辞退103人
  • 111页PPT丨服装零售行业数字化时代的业务与IT转型规划
  • Jenkins关闭更新提醒和插件提醒
  • 36.矩阵格式的等差数列 C语言
  • 前端开发模块VUE-Element UI学习笔记