使用Redis实现分布式锁,基于原本单体系统进行业务改造
一、单体系统下,使用锁机制实现秒杀功能,并限制一人一单功能
1.流程图:
2.代码实现:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIDWorker redisIDWorker;
/**
* 下单秒杀券
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券信息
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
//2.判断当前时间是否在优惠券的开始时间和结束时间内
if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("优惠券秒杀尚未开始");
}
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("优惠券秒杀已经结束");
}
//3.判断库存是否充足
if (seckillVoucher.getStock() < 1){
return Result.fail("库存不足");
}
//加上锁
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()){//事务提交后才释放锁,避免事务未提交产生的线程安全问题
//拿到代理对象-这样事务才会生效
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
//4.一人一单
//4.1 查询数据库中是否有当前用户购买此秒杀券的记录
Long userId = UserHolder.getUser().getId();
long count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){
return Result.fail("此用户已经购买过一次了,不能重复购买");
}
//5.扣减库存
boolean isDiscount = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)//乐观锁:使用stock代替版本号- 减库存之前判断库存是否充足,防止超卖
.update();
if(!isDiscount){
return Result.fail("库存不足");
}
//6.创建订单并保存
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIDWorker.nextId("order");
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(userId);
voucherOrder.setId(orderId);
save(voucherOrder);
//7.返回订单ID
return Result.ok(orderId);
}
}
3.存在的并发安全问题
这里我们是使用用户ID作为锁对象的,但是在分布式系统下,有多台JVM,其内存空间是各自独立的,此时虽然用户ID的值是一样的,但是其userId.toString().intern()方法返回的对象只能保证在同一个JVM上是相同的,不同的JVM使用serId.toString().intern()方法返回的对象是不同的。
因此,对于不同JVM的线程,当前使用的方式并不能解决线程安全问题,也即,这种方法无法适配分布式系统。
4.解决方案-引入分布式锁
由于引发的问题是因为 :
在不同的JVM中获得到的锁对象是不同的,因此只要让它们获得的锁对象是同一个,那就可以解决这个问题。
4.1分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
4.2分布式锁的实现
二、使用Redis实现分布式锁
1.原理:
实现分布式锁时需要实现的两个基本方法:
(1)获取锁:
互斥:确保只能有一个线程获取锁
非阻塞:尝试一次,成功返回true,失败返回false
(2)释放锁:
手动释放
超时释放:设置过期时间
2.流程图:
3.代码改造:
1.ILock接口:(后续实现的锁都要实现这个接口)
public interface ILock {
/**
* 尝试获取锁
* @param timeOutSec
* @return true代表获取锁成功,false代表获取锁失败
*/
boolean tryLock(Long timeOutSec);
/**
* 释放锁
*/
void unlock();
}
2.SimpleRedisLock:
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String name;
private static final String KEY_PREFIX = "lock:";
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(Long timeOutSec) {
//使用String 类型来实现锁,如果key存在,则无法设置新的值
//value 为当前线程ID,为后面释放锁做准备
String threadId = Thread.currentThread().getId() + "";
Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeOutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(isLock);
}
@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
3.Service代码改造:
将 seckillVoucher 中的:
//加上锁
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()){//事务提交后才释放锁,避免事务未提交产生的线程安全问题
//拿到代理对象-这样事务才会生效
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
修改为:
//加上锁
Long userId = UserHolder.getUser().getId();
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
boolean isLock = lock.tryLock(10L);
if(!isLock){
return Result.fail("请勿重复下单");
}
//拿到代理对象-这样事务才会生效
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
lock.unlock();
}
这样就实现了redis分布式锁在秒杀业务中的应用。
4.当前存在的问题-误删问题:
如上图,线程1在成功获取到锁,执行业务的时候发生阻塞,可能会由于超时释放锁。
此时线程2成功获取到锁,也在执行自己的业务,在线程2执行业务的时候,线程1又开始运行,在线程1执行完成之后会去释放这个锁。
此时如果线程3去获取锁,也能获取成功。
.....
这就是当前代码存在的线程不安全问题。
5.解决方案-改进Redis的分布式锁
5.1 解决误删除-step1:删前判断
需求:修改之前的分布式锁实现,满足:
1.在获取锁时存入线程标示(可以用UUID表示)
2.在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致 如果一致则释放锁 如果不一致则不释放锁
5.1.1 流程图
5.1.2 改造代码:
修改SimpleRedisLock 如下:
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String name;
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";//用于区分不同jvm的线程
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(Long timeOutSec) {
//使用String 类型来实现锁,如果key存在,则无法设置新的值
//value 为当前线程ID,为后面释放锁做准备
String threadId = ID_PREFIX + Thread.currentThread().getId() + "";//这样每个线程都有唯一key了
Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeOutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(isLock);
}
@Override
public void unlock() {
//获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId() + "";//这样每个线程都有唯一key了
//获取redis中的值
String key = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if(threadId.equals(key)){//一致的话,删除锁
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
}
可以发现,在释放锁(unlock)代码中,判断一致和删除锁是非原子性的,那么也会引发线程安全问题:
可以看到,线程1在判断一致之后阻塞,在线程1阻塞期间由于超时,锁被释放。
线程2此时获取锁成功,在执行业务的时候,线程1阻塞结束,将锁删除,那还是会存在误删的情况。
这里引入lua脚本来解决原子性问题
5.2 解决误删除-step2:使用Lua脚本实现原子性
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
(1)unlock.lua:
-- 比较线程标识与锁中的标识是否一致
if(redis.call('get',KEYS[1])== ARGV[1])
then
-- 删除锁
return redis.call('del',KEYS[1])
end
return 0
(2)修改 SimpleRedisLock如下:
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String name;
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";//用于区分不同jvm的线程
private static final DefaultRedisScript<Long>UNLOCK_SCRIPT;
static {
//类加载的时候初始化,不用重复初始化
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));//设置lua脚本位置
UNLOCK_SCRIPT.setResultType(Long.class);//设置返回类型
}
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(Long timeOutSec) {
//使用String 类型来实现锁,如果key存在,则无法设置新的值
//value 为当前线程ID,为后面释放锁做准备
String threadId = ID_PREFIX + Thread.currentThread().getId() + "";//这样每个线程都有唯一key了
Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeOutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(isLock);
}
/**
* 使用Lua脚本保证原子性
*/
@Override
public void unlock() {
String threadId = ID_PREFIX + Thread.currentThread().getId() + "";//这样每个线程都有唯一key了
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
threadId);
}
// @Override
// public void unlock() {
// //获取线程标识
// String threadId = ID_PREFIX + Thread.currentThread().getId() + "";//这样每个线程都有唯一key了
// //获取redis中的值
// String key = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// if(threadId.equals(key)){//一致的话,删除锁
// stringRedisTemplate.delete(KEY_PREFIX+name);
// }
//
// }
}
此时这个使用Redis实现的分布式锁就可以满足大部分业务需求了,不过还是存在一些问题:
1.不可重入
2.不可重试
3.超时释放存在的安全隐患
4.集群时,主从节点之间存在的不一致性
这些问题我们可以直接采用成熟的,已实现的框架来帮助我们解决问题,如Redisson,后续也会出一篇关于Redisson的文章。