Redis7——进阶篇(六)
前言:此篇文章系本人学习过程中记录下来的笔记,里面难免会有不少欠缺的地方,诚心期待大家多多给予指教。
基础篇:
- Redis(一)
- Redis(二)
- Redis(三)
- Redis(四)
- Redis(五)
- Redis(六)
- Redis(七)
- Redis(八)
进阶篇:
- Redis(九)
- Redis(十)
- Redis(十一)
- Redis(十二)
- Redis(十三)
接上期内容:上期完成了缓存穿透、预热、雪崩、击穿相关知识学习。下面学习分布式锁(重点),话不多说,直接发车。
一、什么是锁
(一)、单锁
- 定义:进程内的线程同步机制,确保共享资源在同一时间只能被一个线程访问。
- 实现方式:通过synchronized或者Lock接口实现。
- 应用场景:单进程内的并发控制(如内存缓存、本地队列)。
(二)、分布式锁
- 定义:跨多个进程 / 节点的锁机制,确保分布式系统中共享资源的互斥访问。
-
实现方式:redis(基于SETNX)或Redisson框架
- 应用场景:各种分布式系统。
二、分布式锁具备的条件
- 独占性:同一时间只能有一个节点持有锁。
- 高可用:锁服务不能因单点故障而失效。
- 防死锁:锁必须有过期时间,避免因节点崩溃导致锁永久无法释放。
- 不乱抢(公平性):锁的获取顺序应与请求顺序一致,不能释放别人的锁。
- 重入性:同一节点可多次获取同一锁而不阻塞。
三、分布式锁的功能
- 数据一致性保障:防止多节点同时修改共享数据(如库存扣减)。
- 操作原子性:确保跨节点的复杂操作(如 “查询 - 修改 - 写入”)不被打断。
- 并发限流:控制分布式系统中的请求并发量(如秒杀活动)
- 任务调度:保证同一任务只被一个节点执行(如分布式定时任务)。
四、分布式锁实操
(一)、版本一
1、搭建base工程
其他详细步骤略。最终目录:
InventoryController类:
@RestController
@Tag(name = "redis分布式锁测试")
public class InventoryController
{
@Resource
private InventoryService inventoryService;
@Tag(name = "扣减库存,一次扣减一个")
@GetMapping(value = "/inventory/sale")
public String sale()
{
return inventoryService.sale();
}
}
InventoryService类:
@Service
@Slf4j
public class InventoryService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String port;
private final Lock lock = new ReentrantLock();
public String sale() {
String retMessage = "";
lock.lock();
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
int inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} finally {
lock.unlock();
}
return retMessage + "\t" + "服务端口号:" + port;
}
}
2、演示单锁
在service层加上了Lock锁
3、测试单锁
访问http://localhost:8088/swagger-ui/index.html#/,进行测试。
测试结果:不管点击多快,不会出现库存扣除错误,出现超卖现象。
(二)、版本二
在单机版中Lock或者synchronized能确保资源只能被一个线程所持有。但是在分布式系统、高并发场景下,不知道还能不实现业务需求。
1、架构说明
将系统分布式部署后,通过Nginx做负载均衡,转发到对应的服务器上。
本地模拟分布式部署方式有两种:①、拷贝现有项目,修改端口。②、使用IDEA自带的虚拟服务,启动一个虚拟端口。(在之前的SpringCloud微服务篇有说到怎么启动虚拟端口—SpringCloud微服务(四)),推荐使用②
2、Nginx的安装
①、安装 EPEL仓库
Nginx 可以通过 EPEL 仓库进行安装,因此需要先安装该仓库。也可以使用其他方式,只要确保Nginx安装成功即可。
dnf install epel-release -y
②、安装Nginx
dnf install nginx -y
③、修改Nginx配置
配置文件目录:/etc/nginx/nginx.conf
upstream mynginx {
server 自己的iP和端口号 weight=1;
server 自己的iP和端口号 weight=1;
}
#在server下配置
location / {
proxy_pass http://mynginx;
index index.html index.htm;
}
④、启动Nginx服务
systemctl start nginx
systemctl status nginx
可能存在问题:
- 防火墙问题,导致Nginx无法正常运行。(systemctl stop firewalld)
- SELinux安全措施,处于强制模式。(getenforce 查看状态 Enforcing/Disabled/Permissive)
- setenforce 0 临时禁止
3、模拟测试
通过Nginx访问,你的Linux服务器地址IP,http:xxx.xxx.xxx.xxx/inventory/sale,使用JMeter进行压力测试。
4、存在问题
在高并发场景下,出现重复扣减现象,很明显synchronized或Lock无法保证业务的正常进行。
解决办法:使用分布式锁
(三)、版本三
在集群高并发场景下,单锁无法满足正常的业务逻辑。需要使用分布式锁,基于Redis中setnx命令实现。
1、修改代码
3.0版本
public String sale() {
String retMessage = "";
String key = "RedisLock";
String uuidValue = UUID.randomUUID() + ":" + Thread.currentThread().getId();
// redis锁
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);
if (Boolean.FALSE.equals(flag)) {
//暂停20毫秒后递归调用
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
sale();
} else {
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
int inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} finally {
stringRedisTemplate.delete(key);
}
}
return retMessage + "\t" + "服务端口号:" + port;
}
2、存在问题
从测试结果来看,没有出现重复扣减现象,基本符合业务需求。但是还有潜在的bug,递归调用容易出现StackOverflowError问题,需要改进。
3、优化代码
使用while循环来替代递归调用,避免栈溢出问题。
public String sale() {
String retMessage = "";
String key = "RedisLock";
String uuidValue = UUID.randomUUID() + ":" + Thread.currentThread().getId();
// redis锁
while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue))) {
//暂停20毫秒后,类似CAS自旋
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
int inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} finally {
stringRedisTemplate.delete(key);
}
return retMessage + "\t" + "服务端口号:" + port;
}
(四)、版本四
在版本三中,还存在潜在bug。如果部署了微服务的Java程序机器挂了,代码层面根本没有走到finally这块,没办法保证解锁(无过期时间该key一直存在),这个key没有被删除,需要加入一个过期时间来限定key。
1、修改代码
public String sale() {
String retMessage = "";
String key = "RedisLock";
String uuidValue = UUID.randomUUID() + ":" + Thread.currentThread().getId();
// redis锁
while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue))) {
//暂停20毫秒后,类似CAS自旋
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 设置锁的过期时间
stringRedisTemplate.expire(key,30L,TimeUnit.SECONDS);
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
int inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} finally {
stringRedisTemplate.delete(key);
}
return retMessage + "\t" + "服务端口号:" + port;
}
2、存在问题
设置key和设置key的过期时间不具备原子性,继续优化。
public String sale() {
String retMessage = "";
String key = "RedisLock";
String uuidValue = UUID.randomUUID() + ":" + Thread.currentThread().getId();
// redis锁
while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS))) {
//暂停20毫秒后,类似CAS自旋
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
int inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} finally {
stringRedisTemplate.delete(key);
}
return retMessage + "\t" + "服务端口号:" + port;
}
(五)、版本五
在分布式系统使用 Redis 分布式锁时,会出现并发问题。线程 A 获取锁并执行任务,但其业务处理时间超出锁的过期时间。锁过期后,线程 B 获取并创建新锁。此时线程 A 完成业务尝试解锁,因未做鉴权,无法识别锁的归属,误删线程 B 创建的锁,破坏锁的互斥性,可能导致数据不一致,该如何处理此问题?
1、修改代码
public String sale() {
String retMessage = "";
String key = "RedisLock";
String uuidValue = UUID.randomUUID() + ":" + Thread.currentThread().getId();
// redis锁
while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS))) {
//暂停20毫秒后,类似CAS自旋
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
int inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} finally {
// 判断加锁与解锁是不是同一个客户端,同一个才行,自己只能删除自己的锁,不误删他人的
if (Objects.equals(stringRedisTemplate.opsForValue().get(key), uuidValue)) {
stringRedisTemplate.delete(key);
}
}
return retMessage + "\t" + "服务端口号:" + port;
}
2、存在问题
上述代码虽然保证了不会误删别人的锁,但是finally块的判断+del删除操作不是原子性的,需要引入Lua脚本来保证操作原子性。
2.1、Lua脚本
Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。官网地址:Scripting with Lua | Docs
2.2、Lua语法
eval luascript numkeys [key [key ...]] [arg [arg ...]]
参数解释:
- luascript:这是要执行的Lua脚本的内容。Lua 脚本可以包含多个 Redis 命令,使用
redis.call()
或redis.pcall()
函数来调用这些命令。 - numkeys:表示脚本中会使用到的键名参数的数量。
- [key [key .....]]:是一系列的键名参数,数量由
numkeys
指定。在 Lua 脚本中,可以通过KEYS
数组来访问这些键名,例如KEYS[1]
表示第一个键名,KEYS[2]
表示第二个键名,依此类推。 - [arg [arg .....]]:是一系列的附加参数,在 Lua 脚本中,可以通过
ARGV
数组来访问这些参数,例如ARGV[1]
表示第一个附加参数,ARGV[2]
表示第二个附加参数,依此类推。
2.3、Lua练习
①、hello world Lua
②、set get练习
③、mset 练习
④、if else练习
先获取,在判断,判断相等删除,不相等返回false。C语言中用1代替true,0代表false。
(六)、版本六
在java程序中通过引入Lua脚本,来实现判断key和删除key的原子性操作。
1、修改代码
/**
* V6.0
*/
public String sale() {
String retMessage = "";
String key = "RedisLock";
String uuidValue = UUID.randomUUID() + ":" + Thread.currentThread().getId();
// redis锁
while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS))) {
//暂停20毫秒后,类似CAS自旋
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
int inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} finally {
// 判断加锁与解锁是不是同一个客户端,同一个才行,自己只能删除自己的锁,不误删他人的
// Lua脚本保证判断key和删除key的原子性
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) " +
"else " +
"return 0 " +
"end";
stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), List.of(key), uuidValue);
}
return retMessage + "\t" + "服务端口号:" + port;
}
2、存在问题
到此,版本六差不多已经实现了锁的前四个特性,还差一个锁的可重入性实现。
(七)、版本七
1、定义
可重入锁(递归锁),是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。
总结:一个线程中的多个流程可以获取同一把锁,持有这把同步锁可以再次进入。
2、可重入锁的种类
隐式锁和显示锁。
隐式锁是指通过 Java 语言的内置机制来实现同步的锁。使用时不需要手动调用获取锁和释放锁的方法,这些操作由 JVM 自动完成。synchronized关键字就是典型的隐式锁。
隐式锁:synchronized使用的锁,自动释放锁。抛出异常时,JVM 会自动释放锁,避免死锁。
显式锁是指通过 Java 代码手动调用方法来获取锁和释放锁的锁机制。java.util.concurrent.locks包下的Lock接口及其实现类(如ReentrantLock、ReentrantReadWriteLock等)就是显示锁的代表。
显示锁:Lock锁(ReentrantLock为列),需要手动调用 unlock() 方法释放锁。
3、设计可重入锁
①、synchronized锁机制
每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。进入同步方法时(相当于执行 monitorenter),如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么 Java 虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1。计数器为零代表锁已被释放。
②、redis如何实现
早期 Redis 用 setnx 命令实现锁,能满足低并发场景。随着并发量增大,该方式难以满足业务需求。参考 synchronized 锁机制,用 Redis 的 Hash 类型实现可重入锁。
4、实操
引入工厂设计模式来实现redis分布式锁,同时还满足JUC里面AQS对Lock锁的接口规范,加上使用Lua脚本来确保原子性。
最终目录:
DistributedLockFactory类:
/**
* 通过工厂模式,实现多种锁
*/
@Component
public class DistributedLockFactory {
@Resource
private StringRedisTemplate stringRedisTemplate;
private final String uuidValue;
public DistributedLockFactory() {
this.uuidValue = UUID.randomUUID().toString();
}
public Lock getDistributedLock(String lockType, String lockName) {
// JDK14 之后的写法
return switch (lockType == null ? "null" : lockType.toLowerCase()) {
case "redis" -> new RedisDistributedLock(stringRedisTemplate, lockName, uuidValue);
case "zookeeper" -> // TODO zookeeper 版本的分布式锁实现
null;
case "mysql" -> // TODO mysql 版本的分布式锁实现
null;
default -> null;
};
}
}
RedisDistributedLock类:
public class RedisDistributedLock implements Lock {
private final StringRedisTemplate stringRedisTemplate;
private final String lockName; // KEYS[1]
private final String uuidValue; // ARGV[1]
private Long expireTime = 30L; // ARGV[2]
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuidValue) {
this.lockName = lockName;
this.stringRedisTemplate = stringRedisTemplate;
this.uuidValue = uuidValue + ":" + Thread.currentThread().getId();
}
@Override
public void lock() {
tryLock();
}
@Override
public boolean tryLock() {
try {
//-1L 表示一个特殊的时间值,它通常被用来代表不进行等待,即尝试立即获取锁。
// 此方法会尝试立即获取锁,如果锁当前可用,则获取锁并返回 true;如果锁不可用,不会等待,而是立即返回 false。
// 如果time = 5,unit = TimeUnit.SECONDS 表示最多等待5秒,尝试在指定的时间内获取锁
//如果在指定时间内锁可用,则获取锁并返回 true;
// 如果在指定时间内锁一直不可用,超过时间后会返回 false;如果在等待过程中线程被中断,则会抛出 InterruptedException。
return tryLock(-1L, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
/**
* 加锁
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time != -1) {
this.expireTime = unit.toSeconds(time);
}
// 如果锁不存在或已经存在,设置获取锁的次数,并设置过期时间;反之则获取锁失败
String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " +
"redis.call('HINCRBY',KEYS[1],ARGV[1],1) " +
"redis.call('EXPIRE',KEYS[1],ARGV[2]) " +
"return 1 " +
"else " +
"return 0 " +
"end";
Boolean result = stringRedisTemplate.execute(RedisScript.of(script, Boolean.class), List.of(lockName), uuidValue, String.valueOf(expireTime));
// 重试获取锁
while (Boolean.FALSE.equals(result)) {
TimeUnit.MILLISECONDS.sleep(50);
}
return true;
}
/**
* 解锁
*/
@Override
public void unlock() {
// 如果锁不存在,返回nil;如果锁存在且锁的获取次数不等于0,就减一,直到等于0,就删除锁。
String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " +
"return nil " +
"elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +
"return redis.call('DEL',KEYS[1])" +
"else " +
"return 1 " +
"end";
// nil =false= 0 true = 1
// == 0 解锁失败
Boolean result = stringRedisTemplate.execute(RedisScript.of(script, Boolean.class), List.of(lockName), uuidValue);
if (result == null) {
throw new RuntimeException("This lock doesn't EXIST");
}
}
// redis分布锁不涉及暂不涉及
@Override
public void lockInterruptibly() throws InterruptedException {}
// redis分布锁不涉及暂不涉及
@Override
public Condition newCondition() {return null;}
}
InventoryService类:
@Resource
private DistributedLockFactory distributedLockFactory;
/**
* V7.0
*/
public String sale() {
String retMessage = "";
// 加锁
Lock lock = distributedLockFactory.getDistributedLock("redis", "redisLock");
lock.lock();
// 业务操作
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
int inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
this.testReEnter();
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} finally {
// 解锁
lock.unlock();
}
return retMessage + "\t" + "服务端口号:" + port;
}
private void testReEnter()
{
Lock redisLock = distributedLockFactory.getDistributedLock("redis","redisLock");
redisLock.lock();
try
{
System.out.println("################测试可重入锁#######");
}finally {
redisLock.unlock();
}
}
5、测试可重入性
启动项目,使用Jmeter进行压力测试。
没有出现重复扣减,锁的重入性也ok,测试通过。
(八)、版本八
如何确保redisLock过期时间大于业务执行时间的问题,即redis分布式锁如何实现自动续期?
1、修改代码
RedisDistributedLock类:
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time != -1) {
this.expireTime = unit.toSeconds(time);
}
// 如果锁不存在或已经存在,设置获取锁的次数,并设置过期时间;反之则获取锁失败
String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " +
"redis.call('HINCRBY',KEYS[1],ARGV[1],1) " +
"redis.call('EXPIRE',KEYS[1],ARGV[2]) " +
"return 1 " +
"else " +
"return 0 " +
"end";
System.out.println("lockName: " + lockName + "uuidValue: " + uuidValue);
// 重试获取锁
Boolean result = stringRedisTemplate.execute(RedisScript.of(script, Boolean.class), List.of(lockName), uuidValue, String.valueOf(expireTime));
while (Boolean.FALSE.equals(result)) {
TimeUnit.MILLISECONDS.sleep(50);
}
// 如果锁获取成功后,如果在规定的过期时间内还没有完成业务操作
// 那就需要重新设置key的过期时间
this.renewExpire();
return true;
}
private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
/**
* 自动续签
*/
private void renewExpire() {
String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " +
"return redis.call('EXPIRE',KEYS[1],ARGV[2]) " +
"else " +
"return 0 " +
"end";
// 单线程
// new Timer().schedule(new TimerTask() {
// @Override
// public void run() {
// // 递归调用,根据上一个定时任务执行结果来判断是否开启下一个任务
// if (Boolean.TRUE.equals(stringRedisTemplate.execute(RedisScript.of(script, Boolean.class),
// List.of(lockName), uuidValue, String.valueOf(expireTime)))) {
// renewExpire();
// }
// }
// }, (expireTime * 1000) / 3);
// 线程池,避免递归,防止栈溢出
scheduler.scheduleAtFixedRate(() -> {
if (Boolean.FALSE.equals(stringRedisTemplate.execute(RedisScript.of(script, Boolean.class),
List.of(lockName), uuidValue, String.valueOf(expireTime)))) {
// 续期失败,停止定时任务
scheduler.shutdown();
}
}, (expireTime * 1000) / 3, (expireTime * 1000) / 3, TimeUnit.MILLISECONDS);
}
InventoryService类:
public String sale() {
long b = System.currentTimeMillis();
System.out.println("开始时间=====" + b);
String retMessage = "";
// 加锁
Lock lock = distributedLockFactory.getDistributedLock("redis", "redisLock");
lock.lock();
// 业务操作
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
int inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
// 故意模拟业务操作时间超过锁过期时间场景
try { TimeUnit.SECONDS.sleep(120); } catch (InterruptedException e) { e.printStackTrace(); }
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} finally {
// 解锁
lock.unlock();
}
System.out.println("结束时间=====" + System.currentTimeMillis());
System.out.println("耗时====" + (System.currentTimeMillis() - b));
return retMessage + "\t" + "服务端口号:" + port;
}
2、校验测试
当过期时间超过三分之一后,业务还没结束,锁的过期时间会自动续上,直到业务完成,才会释放锁,自动续期测试通过。
五、 总结
起初,在单机环境下,使用 Java 的 synchronized 关键字实现锁机制能够稳定运行,它可以有效保证同一 JVM 内线程对共享资源的互斥访问。然而,当系统采用 Nginx 进行分布式部署时,synchronized 锁的局限性便暴露无遗,由于其作用范围仅限于单个 JVM 进程内,无法实现跨 JVM 的线程同步,难以满足分布式系统对数据一致性和并发控制的严格要求。
鉴于此,将目光转向 Redis,采用其中的 setnx 命令来实现分布式锁。通过一系列深入的研究与实践操作,逐步为该锁赋予了高可用性、防死锁以及避免误抢等重要特性。同时为了确保锁操作的原子性,避免在高并发场景下出现竞态条件,还引入了 Lua 脚本。Lua 脚本在 Redis 中可以原子性地执行多个命令,从而保证锁的获取和释放操作的一致性。
不过,随着对锁功能需求的进一步深入,发现基于 setnx 命令实现的锁存在一个显著的缺陷 —— 不支持可重入性。在某些业务场景中,一个线程在持有锁的情况下可能需要再次获取同一把锁,如果锁不具备可重入性,就会导致线程被阻塞,进而引发死锁问题。为了解决这一难题,又摒弃了原有的 setnx 实现方案,转而采用 Redis 的哈希(Hash)数据结构。利用 Redis 哈希的特性,能够方便地记录锁的持有者以及重入次数,从而实现了锁的可重入性。
为了防止锁在业务逻辑执行过程中因过期而提前释放,又借助 Java 并发工具包(JUC)中的调度任务来实现锁的自动续期功能。通过定时检查锁的状态并在必要时延长其过期时间,确保在业务操作完成之前锁不会意外释放。
经过上述学习与实践后,最终写出一款符合Java Lock 接口规范的分布式锁。这款锁不仅具备锁该有的特性,而且在性能和稳定性方面也有着较高的水平,能够很好地适应分布式系统复杂多变的应用场景。
ps:努力到底,让持续学习成为贯穿一生的坚守。学习笔记持续更新中。。。。