黑马点评6——优惠券秒杀—Redis消息队列实现异步秒杀
文章目录
- Redis消息队列实现异步秒杀
- 基于list结构模拟消息队列
- 基于pubSub的消息队列
- 基于Stream的单消费模式
- 基于Stream的消息队列-消费者组
- 基于Stream的消息队列实现异步秒杀
书接上回,基于JVM的消息队列实现的阻塞队列,引发的两个问题
Redis消息队列实现异步秒杀
基于list结构模拟消息队列
基于pubSub的消息队列
基于Stream的单消费模式
基于Stream的消息队列-消费者组
总结一下
基于Stream的消息队列实现异步秒杀
我们在redis中创建消息队列
XGROUP CREATE stream.orders g1 0 MKSTREAM
然后修改我们的阻塞队列的实现代码,修改成消息队列的实现方式。
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); // 单线程的线程池
@PostConstruct // 当前类初始化完毕后执行
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
// 这个任务必须在这个类初始化之后就得执行,用spring提供的注解实现PostConstruct实现
private class VoucherOrderHandler implements Runnable{ // 线程任务,通过内部类来实现
String queueName = "stream.orders";
@Override
public void run() { // 这里就不断的从阻塞队列里取,然后执行
while(true){
try{
// 1. 获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 2 判断消息获取是否成功
if(list == null || list.isEmpty()){
// 2.1 如果获取失败,说明没有消息,继续下一次循环
continue;
}
// 3. 解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 3. 如果获取成功,可以下单
handlerVoucherOrder(voucherOrder);
// 4. ACK 确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
}catch (Exception e){
log.error("处理订单异常",e);
handlePendingList();
}
}
}
private void handlePendingList() {
while(true){
try{
// 1. 获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
// 2 判断消息获取是否成功
if(list == null || list.isEmpty()){
// 2.1 如果获取失败,说明pending-list没有异常消息,继续下结束循环
break;
}
// 3. 解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 3. 如果获取成功,可以下单
handlerVoucherOrder(voucherOrder);
// 4. ACK 确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
}catch (Exception e){
log.error("处理pending-list订单异常",e);
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
}
private void handlerVoucherOrder(VoucherOrder voucherOrder) {
// 1.获取用户
Long userId = voucherOrder.getUserId();
// 2.创建锁对象
RLock lock = redissonClient.getLock("lock:oder:" + userId); // 使用Redisson获取锁
// 3.获取锁
boolean isLock = lock.tryLock();
// 4.判断是否获取锁成功
if(!isLock){
// 获取锁失败,返回错误信息或重试
log.error("不允许重复下单");
return;
}
try{
// 获取代理对象(事务)
proxy.createVoucherOrder(voucherOrder);
}finally{
// 释放锁
lock.unlock();
}
}
private IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) {
// 获取用户
Long userId = UserHolder.getUser().getId();
// 获取订单id
long orderId = redisIdWorker.nextId("order");
// 1. 执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(),
String.valueOf(orderId)
);
// 2. 判断结果是否为0
int r = result.intValue();
if(r != 0){
// 2.1 不为0, 代表没有购买资格
return Result.fail(r == 1 ? "库存不足": "不能重复下单");
}
// 3. 获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 3.返回订单id
return Result.ok(orderId);
}
/**
* 每一个请求过来,这个id对象都是一个全新的id对象,因为要是对userId加锁的话,对象变了锁就变了,那不行
* 我们希望id的值一样,所以用了toString(),但是toString()依旧不能保证是对对象的值加锁的
* toString底层是new 一个String数组,还是new了一个新对象,同一个用户id在不同的请求中过来,每次都new一个,还是不能把锁加载同一个用户上
* 于是用intern() ,intern()方法可以去字符串常量池中找字符串值一样的引用返回
* 这样一来,如果你的userId是5,不管你new了多少个字符串,只要值是一样的,返回的结果也一样。这样就可以锁住同一个用户
* 不同的用户不会被锁住
*//*
*//*synchronized (userId.toString().intern()) {
// 获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy(); // 拿到当前对象的代理对象,其实就是IVoucherOrderService这个接口的代理对象,返回的是Object,做个强转
return proxy.createVoucherOrder(voucherId); // 报错了是因为我们的接口中没有这个方法,那我们就在接口中创建一下这个方法
}*//*
// 创建锁对象-初版分布式锁
RLock lock = redissonClient.getLock("lock:oder:" + userId); // 使用Redisson获取锁
// 获取锁
boolean isLock = lock.tryLock();
// 判断是否获取锁成功
if(!isLock){
// 获取锁失败,返回错误信息或重试
return Result.fail("不允许重复下单!");
}
try{
// 获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy(); // 拿到当前对象的代理对象,其实就是IVoucherOrderService这个接口的代理对象,返回的是Object,做个强转
return proxy.createVoucherOrder(voucherId); // 报错了是因为我们的接口中没有这个方法,那我们就在接口中创建一下这个方法
}finally{
// 释放锁
lock.unlock();
}
}*/
/**
* 事务加在这,就失效了,为什么呢?
* 加载这是对createVoucherOrder函数加了事务,没有给seckillVoucher函数加事务,而seckillVoucher函数调用的时候
* createVoucherOrder(voucherId);
* 这样使用this调用的,这个this拿到的是当前的VoucherOrderServiceImpl对象
* 而不是VoucherOrderServiceImpl的代理对象
* 而事务要想生效,是spring对当前这个类做了动态代理,拿到代理对象做的事务处理
* 而我们当前的this是非代理对象,这就是事务失效的几种可能性之一
* 解决方法之一:
* AopContext.currentProxy()拿到代理对象来调用createVoucherOrder
*
* 当然这样解决还得做两件事:
* 1. 引入aspectj的依赖
* 2. 启动类添加注解@EnableAspectJAutoProxy(exposeProxy = true)暴露代理对象
*/
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder){
// 5. 一人一单
// Long userId = UserHolder.getUser().getId();
Long userId = voucherOrder.getUserId();
// 5.1 查询订单
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
// 5.2 判断是否存在
if(count > 0){
log.error("用户已经购买过一次!");
return;
}
// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock",0) // 把判断条件改成库存大于0就可以避免乐观锁的弊端
.update();
if(!success){
log.error("库存不足!");
return;
}
save(voucherOrder);
new ArrayList<>();
}
}