【分布式锁解决超卖问题】setnx实现
目录
使用场景的描述
并发安全问题
悲观锁与乐观锁问题
一人一单的问题
服务器负载均衡问题
分布式锁
分布式锁的实现
获取锁
释放锁
实现思路
误删情况的分析
解决误删的方法
代码优化
分布式锁的原子性分析
文章代码地址:分布式锁1.0
使用场景的描述
今天的主人公是我们的滑稽老铁。这天领导给滑稽安排了一个秒杀功能的任务,说:“马上就要双十一了,你去把优惠卷的功能实现一下吧。”
并发安全问题
滑稽老铁经过分析需求之后呢,觉得很简单:只需要根据当前的优惠卷的库存判断当前的库存是否大于0;如果大于0,就扣减库存,否则秒杀失败。于是他就很快的写出了以下的代码:
@Transactional
public Result seckillVoucher(Long voucherId) {
// 查询优惠卷
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 判断秒杀是否开始
// isAfter()方法用于检查作为参数传递的日期是否在此LocalDateTime实例之后
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){
// 尚未开始
return Result.fail("优惠卷秒杀活动尚未开始");
}
// 判断秒杀是否结束
// 如果调用该方法的日期在传入的日期之前,则返回true;否则返回false
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
// 已经结束
return Result.fail("优惠卷秒杀活动已经结束");
}
// 判断库存是否充足
if(seckillVoucher.getStock()<1){
// 库存不足
return Result.fail("优惠卷库存不足");
}
// 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.update();
// 判断扣减有没有成功
if (!success){
return Result.fail("优惠卷库存不足");
}
// 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
// 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 返回订单
return Result.ok(orderId);
}
等到开发完后,滑稽老铁自信满满的将程序上线;毫无一问,他狠狠挨了领导的吊,为什么呢?我们现在来看:
使用Apache JMeter创建200个线程来访问这个接口,理论上我们的异常率应该是 50%,但是这里却是 46.50% 。
这说明滑稽老铁的程序肯定在哪里出了问题。我们打开数据库来看,发现优惠卷的数量竟然出现了 “负数”。本该是一张优惠卷却出现被好几个用户抢到的局面,这就是所谓的 “超卖”。
很快这位滑稽老铁开始快速的分析问题:
(1) 很显然,上次他只考虑到了以下这一种情况,并没有考虑线程并发问题。
(2) 经过分析,滑稽老铁开始反思上一次的错误,假设当优惠卷的库存只剩下1,此时有多个线程进入查询状态,并且查询的结构都表示 “还有库存”,结果该库存被一个线程抢先了,那么其他的线程因为没有继续判断是否还有库存,所以都对库存进行了扣减导致了超卖。
悲观锁与乐观锁问题
于是滑稽老铁想到了用加锁的方式去保证高并发下的线程安全问题,此时他再次犯难了。因为加锁意味着将程序变为串行,这是十分影响性能的一件事。
悲观锁 | 添加同步锁,让线程串行执行 |
优点 | 简单粗暴 |
缺点 | 性能一般 |
乐观锁 | 不加锁,在更新时判断是否有其它线程在修改 |
优点 | 性能好 |
缺点 | 存在成功率低的问题 |
总结:悲观锁适用于插入数据,乐观锁适用于插入数据。
经过权衡, 他决定使用乐观锁的方式来优化程序。于是他开发出了以下代码:
@Transactional
public Result seckillVoucher(Long voucherId) {
// 查询优惠卷
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 判断秒杀是否开始
// isAfter()方法用于检查作为参数传递的日期是否在此LocalDateTime实例之后
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){
// 尚未开始
return Result.fail("优惠卷秒杀活动尚未开始");
}
// 判断秒杀是否结束
// 如果调用该方法的日期在传入的日期之前,则返回true;否则返回false
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
// 已经结束
return Result.fail("优惠卷秒杀活动已经结束");
}
// 判断库存是否充足
if(seckillVoucher.getStock()<1){
// 库存不足
return Result.fail("优惠卷库存不足");
}
// 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
// 判断扣减有没有成功
if (!success){
return Result.fail("优惠卷库存不足");
}
// 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
// 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 返回订单
return Result.ok(orderId);
}
}
此时的代码就已经解决了超卖问题,异常与数据库的数据都证明他的代码很完美。
乐观锁的策略就是判断之前查询得到的数据是否有被修改过:假设当前库存为1,此时有多个线程进入查询,当一个线程抢先执行了扣减操作,那么其他线程发现库存的值已经被修改过了就不会去再修改,而是重新查询库存,在判断扣减。
一人一单的问题
滑稽老铁根据需求再添加了一人限购一次的逻辑并优化了程序,就再次上线了。为什么要设计一人一单的逻辑呢?主要是为了防止黄牛屯货而设计的,目的是为了更好的引流。
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
private static final String KEY_PREFIX = "lock:";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
long threadId = Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX+name,threadId+"",timeoutSec, TimeUnit.SECONDS);
// 防止自动拆箱
return Boolean.TRUE.equals(success);
}
@Override
public void unLock() {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
@Override
public Result seckillVoucher(Long voucherId) {
// 查询优惠卷
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 判断秒杀是否开始
// isAfter()方法用于检查作为参数传递的日期是否在此LocalDateTime实例之后
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){
// 尚未开始
return Result.fail("优惠卷秒杀活动尚未开始");
}
// 判断秒杀是否结束
// 如果调用该方法的日期在传入的日期之前,则返回true;否则返回false
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
// 已经结束
return Result.fail("优惠卷秒杀活动已经结束");
}
// 判断库存是否充足
if(seckillVoucher.getStock()<1){
// 库存不足
return Result.fail("优惠卷库存不足");
}
Long userId = UserHolder.getUser().getId();
// userId.toString().intern() 去字符串常量池寻找相同的字符串作为锁对象
synchronized (userId.toString().intern()){
// 获取代理对象
IVoucherOrderService proxy=(IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId){
// 一人一单
Long userId = UserHolder.getUser().getId();
// 查询该用户购买的订单数量
int count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherId)
.count();
if (count > 0) {
// 该用户已经购买
return Result.fail("一人只限购一单");
}
// 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId).gt("stock",0)
.update();
// 判断扣减有没有成功
if (!success){
return Result.fail("优惠卷库存不足");
}
// 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 用户id
voucherOrder.setUserId(userId);
// 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 返回订单
return Result.ok(orderId);
}
}
服务器负载均衡问题
滑稽老铁只把这个秒杀功能部署在一台服务器上,起初一切正常;随着人数的增多,服务器的性能已经达到了瓶颈。但是他一点也不慌,因为它学过一个叫 nginx 负载均衡的技术,他将服务器水平扩展,通过 nginx 进行分布式集群部署。这样虽然吞吐量上来了,但是程序又出现了超卖的问题。
我们这里可以根据 idea 调试一下:
使用 idea 开启两个 tomcat 服务,并在 postman 中发送两次不同的请求。
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
因为我们部署了多个Tomcat,每个Tomcat都有一个属于自己的jvm,那么假设在服务器的Tomcat内部有2个线程。由于这两个线程都是用的同一个jvm,所以他们的锁的对象都是同一个,是可以实现互斥的。
但是由于这里有两个Tomcat,又有2个线程,但是他们的jvm由于服务器不同而不同,他们的锁对象不是同一个,所以B服务器里面的线程没办法和A服务器的线程产生互斥。这就是集群环境下单机锁失效的原因。
在这种情况下,就需要分布式锁来解决这个问题。
分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
根据以上滑稽老铁的问题,我们来分析:
只要服务器与服务器之间也使用一把锁锁住,才能保证服务器负载均衡下的高并发问题。一台服务器拿到锁,那么就由这台服务器内部的线程去竞争这把锁,竞争到锁的线程去执行相应的业务,其他线程\服务器阻塞等待;直到锁释放,其他的线程\服务器才能获取锁执行业务。
那么使用哪一把锁来作为服务器之间的锁呢? -- 分布式锁。
分布式锁的实现
使用 Redis 实现分布式锁的方案常见的有以下三种方法:
本章我们就使用 Redis 来实现我们的分布式锁。
使用 Redis 来实现分布式锁,通常是通过 SETNX 和 EXPIRE 命令来实现。SETNX 用于设置一个键值对,如果键不存在,则操作成功;EXPIRE 设置键的过期时间,以防止死锁。这种方法的优点是性能高,但实现相对复杂,需要考虑超时和原子性问题。
实现分布式锁时需要实现的两个基本方法:
获取锁 |
---|
互斥:确保只能有一个线程获取锁 |
非阻塞:尝试一次,成功返回 true,失败返回 false |
释放锁 |
---|
手动释放 |
超时释放:获取锁时添加一个超时时间 |
实现思路
我们利用 redis 的 SETNX 方法,当有多个线程进入时,我们就利用该方法,第一个线程进入时,redis 中就有这个key 了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁的线程,等待一定时间后重试即可。为了防止死锁的情况,我们可以通过 EXPIRE 来设置过期时间。
利用 SETNX 方法进行加锁,同时增加过期时间,防止死锁,此方法可以保证加锁和增加过期时间具有原子性:
private static final String KEY_PREFIX="lock:"
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = Thread.currentThread().getId()
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
释放锁,防止删除别人的锁:
public void unlock() {
//通过del删除锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
修改业务代码:
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
//创建锁对象(新增代码)
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
//获取锁对象
boolean isLock = lock.tryLock(1200);
//加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
//释放锁
lock.unlock();
}
}
误删情况的分析
以上的代码仍有不完美的地方:
持有锁的线程1在锁的内部出现了阻塞,导致他的锁过期被自动释放了,此时线程2过来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除。那么此时线程3就拿到锁,有可能会被线程2删除锁;反复如此,线程安全问题不可避免。这就是误删别人锁的情况。
解决误删的方法
解决方案就是在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果属于自己,则不进行锁的删除,假设还是上边的情况,线程1卡顿,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1反应过来,然后删除锁,但是线程1,一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程2走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁。
代码优化
加锁
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
解锁
public void unlock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
分布式锁的原子性分析
上面的 SETNX 和 EXPIRE 实现分布式锁的方式是不安全,两条命令非原子性的,并不能保证一致性,可以通过一些第三方框架或者自己通过 Lua 脚本实现原子操作,下面会通过代码分析分布式锁来实现。
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
我们的 RedisTemplate 中,可以利用 execute 方法去执行 lua 脚本:
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
public void unlock() {
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
总结:
利用 SETNX EXPIRE 获取锁,并设置过期时间,保存线程标识 |
---|
释放锁时先判断线程标识是否与自己一致,一致则删除锁 |
特性 -> |
利用 SETNX 满足互斥性 |
利用 EXPIRE 保证故障时锁依然能释放,避免死锁,提高安全性 |
利用 Redis 集群保证高可用和高并发特性 |