当前位置: 首页 > article >正文

黑马点评6——优惠券秒杀—Redis消息队列实现异步秒杀

文章目录

  • Redis消息队列实现异步秒杀
    • 基于list结构模拟消息队列
    • 基于pubSub的消息队列
    • 基于Stream的单消费模式
    • 基于Stream的消息队列-消费者组
    • 基于Stream的消息队列实现异步秒杀

书接上回,基于JVM的消息队列实现的阻塞队列,引发的两个问题
在这里插入图片描述

Redis消息队列实现异步秒杀

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

基于list结构模拟消息队列

在这里插入图片描述
在这里插入图片描述

基于pubSub的消息队列

在这里插入图片描述
在这里插入图片描述

基于Stream的单消费模式

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

基于Stream的消息队列-消费者组

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
总结一下
在这里插入图片描述

基于Stream的消息队列实现异步秒杀

在这里插入图片描述
我们在redis中创建消息队列

 XGROUP CREATE stream.orders g1 0 MKSTREAM

然后修改我们的阻塞队列的实现代码,修改成消息队列的实现方式。

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private RedissonClient redissonClient;
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();  // 单线程的线程池

    @PostConstruct // 当前类初始化完毕后执行
    private void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }
    // 这个任务必须在这个类初始化之后就得执行,用spring提供的注解实现PostConstruct实现
    private class VoucherOrderHandler implements Runnable{  // 线程任务,通过内部类来实现
        String queueName = "stream.orders";
        @Override
        public void run() {  // 这里就不断的从阻塞队列里取,然后执行
            while(true){
                try{
                    // 1. 获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    // 2 判断消息获取是否成功
                    if(list == null || list.isEmpty()){
                        // 2.1 如果获取失败,说明没有消息,继续下一次循环
                        continue;
                    }
                    // 3. 解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    // 3. 如果获取成功,可以下单
                    handlerVoucherOrder(voucherOrder);
                    // 4. ACK 确认 SACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
                }catch (Exception e){
                    log.error("处理订单异常",e);
                    handlePendingList();
                }
            }
        }

        private void handlePendingList() {
            while(true){
                try{
                    // 1. 获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    // 2 判断消息获取是否成功
                    if(list == null || list.isEmpty()){
                        // 2.1 如果获取失败,说明pending-list没有异常消息,继续下结束循环
                        break;
                    }
                    // 3. 解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    // 3. 如果获取成功,可以下单
                    handlerVoucherOrder(voucherOrder);
                    // 4. ACK 确认 SACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
                }catch (Exception e){
                    log.error("处理pending-list订单异常",e);
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    }

    private void handlerVoucherOrder(VoucherOrder voucherOrder) {
        // 1.获取用户
        Long userId = voucherOrder.getUserId();
        // 2.创建锁对象
        RLock lock = redissonClient.getLock("lock:oder:" + userId);  // 使用Redisson获取锁
        // 3.获取锁
        boolean isLock = lock.tryLock();
        // 4.判断是否获取锁成功
        if(!isLock){
            // 获取锁失败,返回错误信息或重试
            log.error("不允许重复下单");
            return;
        }
        try{
            // 获取代理对象(事务)
            proxy.createVoucherOrder(voucherOrder);
        }finally{
            // 释放锁
            lock.unlock();
        }
    }
    private IVoucherOrderService proxy;

    @Override
    public Result seckillVoucher(Long voucherId) {
        // 获取用户
        Long userId = UserHolder.getUser().getId();
        // 获取订单id
        long orderId = redisIdWorker.nextId("order");
        // 1. 执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),
                userId.toString(),
                String.valueOf(orderId)
        );
        // 2. 判断结果是否为0
        int r = result.intValue();
        if(r != 0){
            // 2.1 不为0, 代表没有购买资格
            return Result.fail(r == 1 ? "库存不足": "不能重复下单");
        }
        // 3. 获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        // 3.返回订单id
        return Result.ok(orderId);
    }



    /**
         *          每一个请求过来,这个id对象都是一个全新的id对象,因为要是对userId加锁的话,对象变了锁就变了,那不行
         *          我们希望id的值一样,所以用了toString(),但是toString()依旧不能保证是对对象的值加锁的
         *          toString底层是new 一个String数组,还是new了一个新对象,同一个用户id在不同的请求中过来,每次都new一个,还是不能把锁加载同一个用户上
         *          于是用intern() ,intern()方法可以去字符串常量池中找字符串值一样的引用返回
         *          这样一来,如果你的userId是5,不管你new了多少个字符串,只要值是一样的,返回的结果也一样。这样就可以锁住同一个用户
         *          不同的用户不会被锁住
         *//*

        *//*synchronized (userId.toString().intern()) {
            // 获取代理对象(事务)
            IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();  // 拿到当前对象的代理对象,其实就是IVoucherOrderService这个接口的代理对象,返回的是Object,做个强转
            return proxy.createVoucherOrder(voucherId);  // 报错了是因为我们的接口中没有这个方法,那我们就在接口中创建一下这个方法
        }*//*
        // 创建锁对象-初版分布式锁
        RLock lock = redissonClient.getLock("lock:oder:" + userId);  // 使用Redisson获取锁
        // 获取锁
        boolean isLock = lock.tryLock();
        // 判断是否获取锁成功
        if(!isLock){
            // 获取锁失败,返回错误信息或重试
            return Result.fail("不允许重复下单!");
        }
        try{
// 获取代理对象(事务)
            IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();  // 拿到当前对象的代理对象,其实就是IVoucherOrderService这个接口的代理对象,返回的是Object,做个强转
            return proxy.createVoucherOrder(voucherId);  // 报错了是因为我们的接口中没有这个方法,那我们就在接口中创建一下这个方法
        }finally{
            // 释放锁
            lock.unlock();
        }
    }*/

    /**
     * 事务加在这,就失效了,为什么呢?
     * 加载这是对createVoucherOrder函数加了事务,没有给seckillVoucher函数加事务,而seckillVoucher函数调用的时候
     * createVoucherOrder(voucherId);
     * 这样使用this调用的,这个this拿到的是当前的VoucherOrderServiceImpl对象
     * 而不是VoucherOrderServiceImpl的代理对象
     * 而事务要想生效,是spring对当前这个类做了动态代理,拿到代理对象做的事务处理
     * 而我们当前的this是非代理对象,这就是事务失效的几种可能性之一
     * 解决方法之一:
     * AopContext.currentProxy()拿到代理对象来调用createVoucherOrder
     *
     * 当然这样解决还得做两件事:
     * 1. 引入aspectj的依赖
     * 2. 启动类添加注解@EnableAspectJAutoProxy(exposeProxy = true)暴露代理对象
     */
    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder){
        // 5. 一人一单
//        Long userId = UserHolder.getUser().getId();
        Long userId = voucherOrder.getUserId();
        // 5.1 查询订单
        Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        // 5.2 判断是否存在
        if(count > 0){
            log.error("用户已经购买过一次!");
            return;
        }

        // 6.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherOrder.getVoucherId())
                .gt("stock",0)   // 把判断条件改成库存大于0就可以避免乐观锁的弊端
                .update();

        if(!success){
            log.error("库存不足!");
            return;
        }
        save(voucherOrder);
        new ArrayList<>();

    }
}


http://www.kler.cn/a/293146.html

相关文章:

  • SQL 中 BETWEEN AND 用于字符串的理解
  • 安全见闻1-5
  • 设计模式之工厂模式,但是宝可梦
  • 【计算机网络】TCP网络程序
  • 1.7 JS性能优化
  • HTTP常见的请求头有哪些?都有什么作用?在 Web 应用中使用这些请求头?
  • 智联云采 SRM2.0 autologin 身份认证绕过漏洞复现
  • Spring、SpringMVC、SpringBoot都是什么,有什么区别
  • 如何使用事件流相关操作
  • Maven聚合与继承
  • 11、Django Admin启用对计算字段的过滤
  • 大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
  • Java反射机制讲解
  • C++set与map容器
  • 10Python的Pandas:样式Style
  • 数据访问:JPA
  • Django ORM - 如何单独使用 Django 数据库
  • AutosarMCAL开发——基于EB Gpt驱动
  • 【王树森】BERT:预训练Transformer模型(个人向笔记)
  • 2024 年高教社杯全国大学生数学建模竞赛题目-C 题 农作物的种植策略
  • 【Cesium实体创建】
  • HTML:charset讲解
  • Elasticsearch 再次开源
  • 开源云原生数据库PolarDB PostgreSQL 15兼容版本正式发布
  • 计算机视觉中,什么是上下文信息(contextual information)?
  • uuid uuid uuid