分布式锁,rediss,redisson,看门狗,可重入,可重试
目录
- 什么是分布式锁
- redisson的分布式锁的特点
- 可重入的实现原理
- 可重试的实现原理
- 超时续期(看门狗机制)
什么是分布式锁
分布式锁是一种锁机制,用户解决同一个共享资源被多个线程并发访问的问题,使用分布式锁可以避免并发安全,数据不一致的情况。
redisson的分布式锁的特点
- 可重入:同一个线程可以多次获取同一把锁
- 可重试:如果设置了等待时间,可以在这一段时间尝试从新获取锁
- 高可用:程序不易崩溃,时时刻刻都保证较高的可用性
- 自动续期:看门狗机制,如果没有设置过期时间,会一直续期,知道锁被释放
- 互斥性:同一把锁只有一个线程可以获取
可重入的实现原理
redisson的分布式锁,采用了hash结构来存储数据,大key表示锁的名称,小key表示被哪一个锁持有,小key的value记录着这一个锁被同一个线程获取了几次。
获取锁的大概的流程是这样的:先判断先要获取的锁是否存在,如果不存 在,则获取锁,设置超时时间,返回,如果存在,判断锁是不是自己的,如果不是,则获取失败,如果是,增加锁的持有次数,重置超时时间,返回。
获取锁流程图:
释放锁的流程:判断锁是不是自己的,如果不是,说明锁已经被释放,如果是,判断现在还有多少个地方使用到,如果只有自己,直接释放,如果还有别的地方使用到,减少持有的次数,重置过期时间,通知其他线程已经释放锁的消息。
释放锁流程图:
rediss源码的lua脚本:
获取锁:
if (redis.call('exists', KEYS[1]) == 0) then -- 判断锁是否存在,进入if表示不存在
redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 锁的持有数量加一
redis.call('pexpire', KEYS[1], ARGV[1]); -- 设置过期时间
return nil; -- 返回获取成功
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 锁已经存在,判断是不是自己的,进入if表示是自己的
redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 锁的持有数量加一
redis.call('pexpire', KEYS[1], ARGV[1]); -- 重置过期时间
return nil; -- 返回获取成功
end;
return redis.call('pttl', KEYS[1]); -- 锁存在并且不是自己的,返回获取失败
释放锁:
local key = KEYS[1] -- "myLock" 锁的名称
local pubsub_key = KEYS[2] -- "lock_channel" 发布订阅的通道名称
local release_message = ARGV[1] -- "lock_released" 发布的消息
local expire_time = ARGV[2] -- 30000 锁的过期时间
local threadId = ARGV[3] -- "thread123" 持有锁的线程
if (redis.call('hexists', key, threadId) == 0) then -- 判定要释放锁是否存在
return nil; -- 不存在表示无需释放返回nil
end;
local counter = redis.call('hincrby', key, threadId, -1); -- 判断持有数量加一之后剩余数量是否大于0
if (counter > 0) then
redis.call('pexpire', key, expire_time); -- 大于0表示还有其他地方是由锁
return 0; -- 返回0表示为完全释放
else
redis.call('del', key); -- 小于0表示完全释放,删除锁
redis.call('publish', pubsub_key, release_message); -- 同时订阅释放锁消息的线程
return 1; -- 返回1表示完全释放
end; -- 默认返回值,一般不会使用到
可重试的实现原理
大概流程是这样子的:先尝试获取锁,如果获取成功直接返回,如果获取失败,尝试重新获取,但是不会马上获取,先判断等待是否超时,如果超时返回失败,如果未超时,订阅释放锁的消息,如果在等待的时间还是订阅不到消息,则获取失败,取消订阅,返回失败,如果在等待期间获取到消息,判断是否超时,如果超时,尝试获取锁,获取成功返回,获取失败,继续订阅消息,判断是否超时,尝试重新获取,如此反复,知道获取成功或则超时。
流程图:
redisson源码
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 尝试获取锁,方法返回两种结果,如果是null表示获取成功,如果是ttl(锁剩余时间),表示获取失败
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 如果成功,返回true
if (ttl == null) {
return true;
}
// 如果失败,尝试再次获取
// 如果等待时间已过,返回false
time -= System.currentTimeMillis() - current;
if (time <= 0) {
// 执行获取锁失败后的逻辑
acquireFailed(waitTime, unit, threadId);
return false;
}
// 不会立刻尝试获取,刚刚获取不到,立马获取大概率获取不到
current = System.currentTimeMillis();
// 调用异步方法,订阅消息,如果有锁释放,尝试获取
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 调用await等待异步方法,两种结果,一种是等不到订阅信息,取消等于放回false,一种是收到订阅信息,继续执行
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
// 超时,订阅不到消息,取消订阅,返回false
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
// 订阅到了消息
try {
// 判断是否超时
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 循环尝试获取
while (true) {
long currentTime = System.currentTimeMillis();
// 尝试获取
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
// 成功获取
return true;
}
// 获取失败,判断是否超时
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 阻塞等待订阅信息,两种结果,一种超时放回false,一种接收到,返回true,
// 但是这里的放回值没有获取,只是在这里阻塞一段时间
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 如果锁过期时间小于等待时间,以过期时间为标准
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 如果锁过期时间大约等于等待时间,以等待为标准
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 判断是否超时
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 取消订阅
unsubscribe(subscribeFuture, threadId);
}
}
超时续期(看门狗机制)
如果一个锁没有设置过期时间,则在释放锁之前需要一直存在,如果不设置过期时间,可能在释放锁之前,服务宕机了,到时锁没有得到释放,所以才有设置过期时间,之后一直续期,即使服务宕机了,也会自己释放。
大概流程是这样子的,如果需要使用续期机制,在线程第一次获取锁的时候,开启看门狗,也就是定时任务,默认的过期时间是30秒,定时任务每隔10秒重置为30秒,再继续开启一个定时任务。在锁释放的时候,需要删除锁,关闭定时任务。
流程图:
rediss源码
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) { // 不需要看门狗走这里
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 需要开门狗走下面
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),// 默认过期时间30秒
TimeUnit.MILLISECONDS, threadId,
RedisCommands.EVAL_LONG
);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId); // 开启看门狗
}
});
return ttlRemainingFuture;
}
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry(); // 里面可以存放一个定时任务和锁
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); // 如果map里面有这个锁,返回null,没有则返回ExpirationEntry
if (oldEntry != null) {
oldEntry.addThreadId(threadId); // 已经有过,线程持有次数加一
} else {
entry.addThreadId(threadId); // 线程持有次数加一
renewExpiration(); // 开启看门狗
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId); // 重置过期时间
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration(); // 在开启一个定时任务
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}