从tryLock()源码来出发,解析Redisson的重试机制和看门狗机制
参考:bilibili
源码基于 redisson-3.27.2
tryLock() 只尝试获取一次
tryLock(waitTime,leaseTime..):
- 先尝试获取一次
- 失败,订阅释放锁的消息
- 在waitTime时间内得到订阅消息,在do..while循环内重复尝试获取锁
- 在这些过程中,是一直不断的在判断waitTime是否到期
---------------------------------------------------------------------
- 获取到锁
- leaseTime = -1,执行看门狗机制
- 创建一个ExpiratinEntry对象,存储了获取锁的线程ID
- 将该ExpiratinEntry对象存放到EXPIRATION_RENEWAL_MAP这个静态的concurrentHashMap中
- 开启递归延迟任务自动续约,每10s 续约 30s
---------------------------------------------------------------------
unlockAsync0(long threadId):
- 解决重入的问题,每释放一次锁,threadIds-Map 中该threadId 的 value--
- 判断是否是最后一次释放锁
- 取消ExpirationEntry中的Timeout定时任务
- 将该ExpirationEntry 从 EXPIRATION_RENEWAL_MAP中移除
tryLock
public boolean tryLock() {
return (Boolean)this.get(thi.tryLockAsync());
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 转换为毫秒
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 调用tryAcquire 获得ttl
// 若ttl 为null 代表获取锁成功
// tryAcquire 最终调用的是 tryAcquireAsync() 可见该方法详解
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
// 获取锁成功
if (ttl == null) {
return true;
} else {
// waitTime - 获取锁消耗的时间
time -= System.currentTimeMillis() - current;
// 消耗时间 > 等待时间 ,返回获取失败
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
// 还存在等待时间,通过subscribe()订阅别人释放锁的信号
// 在unlockInnerAsync()方法中,可以看到释放锁后发布消息的代码
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
try {
// 订阅任务在此等待,但并不是无限等待,而是在剩余等待时间内等待
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException var21) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
// 取消订阅
this.unsubscribe(res, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException var22) {
this.acquireFailed(waitTime, unit, threadId);
return false;
}
// 等待时间内等到了释放锁的消息
// 在do while 循环内,在还有剩余时间下 不断的尝试获取锁
boolean var14;
try {
time -= System.currentTimeMillis() - current;
if (time > 0L) {
boolean var16;
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
this.acquireFailed(waitTime, unit, threadId);
var14 = false;
} finally {
this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
}
return var14;
}
}
}
tryAcquireAsync-获取锁 && 看门狗机制的方法
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture ttlRemainingFuture;
// 通过leaseTime 判断是否走看门狗机制,默认不传leaseTime,使用其默认值-1,也就是走else路径
if (leaseTime > 0L) {
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// tryLockInnerAsync()是真正执行获取锁的方法,代码见下
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
// 看门狗机制的实现
CompletionStage<Long> s = this.handleNoSync(threadId, ttlRemainingFuture);
RFuture<Long> ttlRemainingFuture = new CompletableFutureWrapper(s);
CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {
// ttlRemaining == null 意味着获取锁成功
if (ttlRemaining == null) {
if (leaseTime > 0L) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// leaseTime = -1 调用scheduleExpirationRenewal()开启看门狗机制
this.scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper(f);
}
tryLockInnerAsync-lua脚本执行获取锁的方法
// 该方法就是执行lua的最终方法,获取锁成功,返回null,获取锁失败,返回锁的剩余有效时间
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return this.commandExecutor.syncedEval(this.getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0)
or (redis.call('hexists', KEYS[1], ARGV[2]) == 1))
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getRawName()),
new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}
scheduleExpirationRenewal-看门狗机制实现的方法
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
// putIfAbsent(),存在,返回旧的ExpirationEntry ,不存在,返回null
// EXPIRATION_RENEWAL_MAP,类的静态变量,一个concurrentHash类型,存储了所有获得锁的实例
ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
// 该方法是记录该线程ID出现的次数,即可以理解为,该线程重入锁的次数
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
// 开启续约
this.renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
this.cancelExpirationRenewal(threadId);
}
}
}
}
renewExpiration-看门狗续约的实现方法
private void renewExpiration() {
ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.getServiceManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
// renewExpirationAsync() 刷新锁的有效期
CompletionStage<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);
// whenComplete 是 CompletionStage 的一个方法,用于在异步操作完成后执行一个回调函数
// res是异步操作的结果,Boolean类型,e是操作过程中抛出的异常
future.whenComplete((res, e) -> {
if (e != null) {
RedissonBaseLock.log.error("Can't update lock {} expiration", RedissonBaseLock.this.getRawName(), e);
RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());
} else {
if (res) {
// 递归调用自己
RedissonBaseLock.this.renewExpiration();
} else {
RedissonBaseLock.this.cancelExpirationRenewal((Long)null);
}
}
});
}
}
}
// 该任务延时 internalLockLeaseTime / 3L 执行,即10s后执行
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
renewExpirationAsync-续约的lua脚本方法
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;",
Collections.singletonList(this.getRawName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
unlockAsync0-释放锁方法
private RFuture<Void> unlockAsync0(long threadId) {
//调用释放锁的lua方法
CompletionStage<Boolean> future = this.unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
// 取消看门狗续约的方法
this.cancelExpirationRenewal(threadId);
if (e != null) {
if (e instanceof CompletionException) {
throw (CompletionException)e;
} else {
throw new CompletionException(e);
}
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
throw new CompletionException(cause);
} else {
return null;
}
});
return new CompletableFutureWrapper(f);
}
unlockInnerAsync-释放锁的lua脚本方法
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else redis.call('del', KEYS[1]);
// 发布释放锁的消息
redis.call('publish', KEYS[2], ARGV[1]);
return 1; end;
return nil;",
Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
}
cancelExpirationRenewal-取消看门狗续约的方法
protected void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (task != null) {
if (threadId != null) {
// 将threadIds Map中对应的threadID-key 的 value值 -1
task.removeThreadId(threadId);
}
// 如果是最后一次解锁 || threadIdS Map 直接为空 意味着是彻底释放掉锁了,在这取消看门狗的续约
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
// 取消定时任务
timeout.cancel();
}
// EXPIRATION_RENEWAL_MAP中移除该ExpirationEntry
EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
}
}
}