当前位置: 首页 > article >正文

Redisson实现分布式锁

原文链接,对本文进行了总结记录

1.Redisson入门

概念

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格。通俗来将,就是在 Redis 基础上实现的分布式工具集合。点击访问项目地址。

引入依赖
<!--redisson-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.3</version>
</dependency>
添加配置

redisson 支持单点、主从、哨兵、集群等部署方式:

/**
 * redisson 配置
 */
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        //单点
        Config config = new Config();
        //地址及密码
        String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + "");
        config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
        // 选择数据库0
        config.useSingleServer().setDatabase(0);
        return (Redisson)Redisson.create(config);

        //主从
//        Config config = new Config();
//        config.useMasterSlaveServers()
//            .setMasterAddress("redis://127.0.0.1:6379").setPassword("123456")
//            .addSlaveAddress("redis://127.0.0.1:6389")
//            .addSlaveAddress("redis://127.0.0.1:6399");
//        return Redisson.create(config);

        //哨兵
//        Config config = new Config();
//        config.useSentinelServers()
//            .setMasterName("myMaster")
//            .addSentinelAddress("redis://127.0.0.1:6379", "redis://127.0.0.1:6389")
//            .addSentinelAddress("redis://127.0.0.1:6399");
//        return Redisson.create(config);

        //集群
//        Config config = new Config();
//        config.useClusterServers()
//                //cluster state scan interval in milliseconds
//            .setScanInterval(2000)
//            .addNodeAddress("redis://127.0.0.1:6379", "redis://127.0.0.1:6389")
//            .addNodeAddress("redis://127.0.0.1:6399");
//        return Redisson.create(config);
    }
}
实现分布式锁
@Autowired
private RedissonClient redissonClient;

@RequestMapping("/test")
public  void test() throws InterruptedException {
    //获取锁对象
    RLock lock = redissonClient.getLock("lock");
    //尝试加锁,参数依次为:获取锁的最大等待时间(期间会重试)、锁自动释放时间、时间单位
    //注意:如果指定锁自动释放时间,不管业务有没有执行完,锁都不会自动延期,即没有 watch dog 机制(业务没执行完,续费30s)。
    boolean isLock = lock.tryLock(1, 2, TimeUnit.SECONDS);
    try {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        if (isLock) {
            System.out.println(format.format(System.currentTimeMillis()) + "获取分布式锁成功");
            Thread.sleep(1000);
            System.out.println(format.format(System.currentTimeMillis()) + "业务完成");
        } else {
            System.out.println(format.format(System.currentTimeMillis()) + "获取分布式锁失败");
        }
    } catch (Exception e) {
        throw new RuntimeException("业务异常");
    } finally {
        // 锁的value是当前线程id且锁还是锁定状态
        if (lock.isHeldByCurrentThread() && lock.isLocked()) {
            // 释放锁
            System.out.println("解锁");
            lock.unlock();
        }
    }
}

分布式锁的使用分成以下 3 步:

  1. 获取锁:根据唯一的 key 去 redis 获取锁。
  2. 加锁:拿到锁后在指定的等待时间内不断尝试对其加锁,超过等待时间则加锁失败。
  3. 解锁:分成两种情形:
    • 第一如果在加锁的时候指定了自动释放时间,那么在此时间范围内业务提前完成的话就在 finally 手动释放锁,而如果自动释放时间到了,业务没有完成也会自动释放锁,所以指定自动释放时间需要做非常仔细的考量;
    • 第二就是没有指定自动释放时间,由于 redisson 有 watch dog (看门狗)机制,watch dog 默认的 releaseTime 是 30s,给锁加上 30s 的自动释放时间,并且每隔 releaseTime / 3 即 10 s 去检查业务是否完成,如果没有完成重置 releaseTime 为 30 s, 即锁的续约,所以一个业务严重阻塞的话会造成系统资源的极大浪费。所以分布式锁是没有完美的解决方案的。

使用jmeter测试:

3 个线程同时访问,控制台打印结果如下:

//第一个线程加锁成功
2023-09-17 15:33:19获取分布式锁成功   
2023-09-17 15:33:20业务完成
//第一个线程释放锁     
解锁
//第二个线程加锁成功    
2023-09-17 15:33:20获取分布式锁成功
//第三个线程加锁失败,第二个线程已占有锁且已过等待时间 20 - 19 = 1    
2023-09-17 15:33:20获取分布式锁失败 
2023-09-17 15:33:21业务完成
//第二个线程释放锁    
解锁

首先第 1 个线程在 19 - 20 秒的时间范围内加锁,2、3 线程处于阻塞状态,

在 20 秒 1 线程释放锁后 2 线程刚好在等待时间的临界点加锁成功,3 线程就没那么好运了,在临界点抢不过 2 线程,加锁失败。

21 秒 2 线程完成业务释放锁。

从测试结果可以看到Redisson分布式锁的特点

  1. 独占性:1 线程加锁成功后是 2、3 线程处于阻塞状态无法加锁。
  2. 超时:指定 2 秒的自动释放时间,由于 key 存放在 redis,即使服务宕机,redis 也会自动删除 key 。
  3. 高可用:1 线程和 2 线程加锁成功后能够良好的解锁(这里配置了单点,真正的高可用一般需要哨兵或集群)
Redisson可重入

可重入即同一个线程能否多次获得同一个锁

测试一下便知

/**
     * 重入方法1
     *
     * @throws InterruptedException
     */
@RequestMapping("/reentrant")
public void reentrant1() throws InterruptedException {
    //获取锁
    RLock lock = redissonClient.getLock("reentrant");
    //加锁,参数:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
    boolean isLock = lock.tryLock(10, 25, TimeUnit.SECONDS);
    try {
        if (isLock) {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            System.out.println(format.format(System.currentTimeMillis()) + "获取分布式锁1成功");
            Thread.sleep(15000);
            //调用方法2
            reentrant2();
            System.out.println(format.format(System.currentTimeMillis()) + "业务1完成");
        }
    } catch (Exception e) {
        throw new RuntimeException("业务异常");
    } finally {
        //当前线程未解锁
        if (lock.isHeldByCurrentThread() && lock.isLocked()) {
            //释放锁
            System.out.println("分布式锁1解锁");
            lock.unlock();
        }
    }
}

/**
     * 重入方法2
     *
     * @throws InterruptedException
     */
public void reentrant2() throws InterruptedException {
    //获取锁
    RLock lock = redissonClient.getLock("reentrant");
    //加锁,参数:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
    boolean isLock = lock.tryLock(5, 25, TimeUnit.SECONDS);
    try {
        if (isLock) {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            System.out.println(format.format(System.currentTimeMillis()) + "获取分布式锁2成功");
            Thread.sleep(10000);
            System.out.println(format.format(System.currentTimeMillis()) + "业务2完成");
        }
    } catch (Exception e) {
        throw new RuntimeException("业务异常");
    } finally {
        //当前线程未解锁
        if (lock.isHeldByCurrentThread() && lock.isLocked()) {
            //释放锁
            System.out.println("分布式锁2解锁");
            lock.unlock();
        }
    }
}

这里在方法 1 中调用方法 2,并且都尝试获取同一把锁。

测试结果如下:

//方法1加锁
2023-09-17 17:16:01获取分布式锁1成功
//方法2获取同一把锁并加锁    
2023-09-17 17:16:16获取分布式锁2成功
2023-09-17 17:16:26业务2完成
//方法2释放锁    
分布式锁2解锁 
2023-09-17 17:16:26业务1完成
//方法1释放锁       
分布式锁1解锁

根据上面的打印结果,能够推测出 Redisson 是拥有可重入的特性的!!!

方法 1 加锁时, value 为 1
在这里插入图片描述

方法 2 再次加锁,value 为 2

在这里插入图片描述

释放锁的过程则相反,方法 2 释放锁时 value - 1, 方法 1 再次释放锁 value = 0,直接删除锁

具体流程

Redisson 实现可重入采用 hash 的结构,在 key 的位置记录锁的名称,field 的位置记录线程id, value 的位置则记录锁的重入次数。

加锁时,如果线程标识是自己,则锁的重入次数加 1,并重置锁的有效期。

释放锁时,重入次数减 1,并判断是否为 0,如果为 0 直接删除,否则重置锁的有效期。

感觉和ReentrantLock的加锁解锁的思想相似,都是根据线程去做重入。

2.源码解读—RedissonLock.lock()方法

源码解读

lock方法

public void lock() {
        try {
            // lock方法默认leaseTime时间为-1(每隔10秒检查业务是否执行完成,如果没有完成会被赋值30秒,即看门狗机制),waitTime没有表示无限等待
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
		// 获取当前线程id
        long threadId = Thread.currentThread().getId();
        // 尝试获取锁(当返回为null时表示获取锁成功),否则返回锁的剩余时间,核心逻辑见下面
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return; // 成功获取锁,返回
        }

	    // 订阅释放锁的消息(如果有线程释放了锁就会发一个消息过来),释放锁时的 publish 命令就是发布消息通知,subscribe 订阅的就是它发布的通知。
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }
				
        try {
		        // 循环获取锁,直到获取成功
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break; // 获取锁成功,返回
                }

				// 获取锁失败,锁的剩余有效时间大于0
                // waiting for message
                if (ttl >= 0) {
                    try {
		                // 此段代码相当一TimeUnit.SECONDS.sleep(ttl);用于减缓下次获取的时间
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
		        // 获取锁成功,取消订阅
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

tryAcquire方法

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }

tryAcquireAsync方法

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
				// leaseTime锁释放时间,通过上面路径释放时间为-1,所以不会走这里
        if (leaseTime != -1) {
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        // 当锁释放时间为-1时,默认续期30秒,不会永久不过期  -->核心逻辑见下面
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        // 接口回调
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

			// 如果剩余有效期为null即获取锁成功,执行scheduleExpirationRenewal方法流程
            if (ttlRemaining == null) {
		         // 获取锁成功才会执行看门狗逻辑
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

tryLockInnerAsync方法

尝试获取锁,没有锁就创建一个hsah结构的锁,设置过期时间;如果有锁就判断,hash的key值是不是当前线程id,是的话就给value加一(锁重入实现),设置过期时间;否则返回锁的过期剩余时间

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
				        // 判断锁是否存在,0表示不存在
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //hincrby 设置hash并指定属性为线程id的值为1
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " + //设置过期时间
                        "return nil; " + //返回null,表示获取锁成功
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 如果锁已经存在,判断threadId是否是自己,1表示存在
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // hincrby 设置hash并指定属性为线程id的值+1, 锁的可重入逻辑实现
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 设置过期时间
                        "return nil; " + //返回null,表示获取锁成功
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);", // 返回key的过期剩余时间,表示获取锁失败
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

scheduleExpirationRenewal方法,看门狗逻辑实现

watch dog 是指加锁时没有指定锁的自动释放时间时,则默认给锁添加一个 30s 的自动释放时间,并且每隔 (30s / 3) 即 10s 去进行锁的续约,即每隔 10s 锁的自动释放时间会被重置为 30s, 直至业务完成。

// 修饰符为static final,即RedissonLock 类的所有实例都可以看到这个map
// 一个 Lock 类会创建出很多锁的实例,每一个锁的实例都有自己的名字(entryName), 在 map 中有唯一的 key 和 唯一的 entry。
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();

private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        // 把新建的entry放进map里面,key为String 类型(id + 锁名称)
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId); // 已经设置过业务线程
        } else {
		        // 第一次调用
            entry.addThreadId(threadId);
            renewExpiration();
        }
    }

第一次调用时,entry 不存在,所以使用 putIfAbsent 可以放进去;多次调用时,entry 是存在的,putIfAbsent 就会失效,返回旧的 entry,因此就能够保证不管锁被重入几次,拿到的永远是同一个 entry。所以,这里的 map 的作用就是保证同一个锁拿到的永远是同一个 entry

然后将线程ID放入 entry,第一次调用时还会执行 renewExpiration (更新有效期)方法。

renewExpiration方法

看门狗核心逻辑

// 看门狗续期时间,单位毫秒
private long lockWatchdogTimeout = 30 * 1000;

private void renewExpiration() {
				// 先从 map 中得到 entry
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        // 创建一个定时任务,每隔(internalLockLeaseTime / 3)10秒执行一次
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
		            // 取出entry
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                // entry为null,说明被移除了,不再执行任务(**cancelExpirationRenewal方法会移除**)
                if (ent == null) {
                    return;
                }
                // 取出entry中的当前线程id
                Long threadId = ent.getFirstThreadId();
                // 线程id为null,说明也被移除了,不再执行任务(**cancelExpirationRenewal方法会移除**)
                if (threadId == null) {
                    return;
                }
                
                // 给当前锁续期30s
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        // 如果续期成功,则递归调用当前方法再次执行续期逻辑,所以锁的有效期就会不断进行重置,永不过期(初始默认为30s,10s后又设置为30s ....)。
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        // 最后把任务放到entry中,因此entry中存放了两个数据:一个是当前线程ID,一个是定时任务。
        // 从这里就能看出为啥前面第一次调用时会执行renewExpiration,而后面就不会调用此方法,因为 oldEntry 中已经有了定时任务会递归执行renewExpiration方法,只需要把线程 ID 加进去即可。
        ee.setTimeout(task);
    }

renewExpirationAsync方法

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
				        // 判断当前线程是否持有锁
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
				                // 如果持有,则续上过期时间30s,返回1
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        // 否则返回0
                        "return 0;",
                Collections.singletonList(getName()),
                internalLockLeaseTime, getLockName(threadId));
    }

由上面分析可以看出锁的有效期被无限延续,那什么时候这个任务才会被取消呢?自然是在 unlock 的时候,在释放锁的时候会执行 cancelExpirationRenewal(取消更新任务) 方法

看门狗作用

使用Redis原生api实现的分布式锁,得自己设置一个锁过期时间,但是这个时间难以估算,万一任务还没执行完成,锁就被超时释放掉了,然后别的线程就可以去获取锁执行代码,造成线程安全问题;

所以有了看门狗机制,只要持有锁的线程一直在执行任务且未主动释放锁,那么锁的过期时间就会不断地被刷新,从而避免了在任务未完成前锁就因超时而自动释放。

存在的bug

3.源码解读—RedissonLock.unlock()方法

unlock方法

    public void unlock() {
        try {
		        // 解锁流程,传入当前线程id
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
    }

unlockAsync方法

public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        // 具体解锁逻辑,使用lua脚本实现
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.onComplete((opStatus, e) -> {
		        // 解锁成功才会执行取消看门狗续期任务逻辑
            cancelExpirationRenewal(threadId);

						// 解锁有异常的处理
            if (e != null) {
                result.tryFailure(e);
                return;
            }

						// 当前线程不持有锁,执行unlock方法的处理
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
						// unlockInnerAsync方法成功的处理
            result.trySuccess(null);
        });

        return result;
    }

unlockInnerAsync方法

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
				        // 判断锁是否自己持有
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
				                // 不持有,直接返回
                        "return nil;" +
                        "end; " +
                        // 是自己的锁,重入次数 - 1
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        // 可重入次数为否为 0
                        "if (counter > 0) then " +
                        // 大于0,不能释放锁,重置有效期,默认30s
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        // 等于0,删除锁
                        "redis.call('del', KEYS[1]); " +
                        // 发布一个解锁消息,其他线程监听到了这个消息,就会执行后续逻辑
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        // 返回1表示解锁成功
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

cancelExpirationRenewal方法
取消看门狗定时任务逻辑

void cancelExpirationRenewal(Long threadId) {
		// 回去当前锁对应的entry
        ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (task == null) {
            return;
        }
        
        // 移除entry中的线程id
        if (threadId != null) {
            task.removeThreadId(threadId);
        }

				
        if (threadId == null || task.hasNoThreads()) {
		     // 取消当前定时任务执行,这样就不会再次给锁续上默认的30s了
            Timeout timeout = task.getTimeout();
            if (timeout != null) {
                timeout.cancel();
            }
            // 最后移除entry
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
        }
    }

4.源码解读—RedissonLock.tryLock(long waitTime, TimeUnit unit)方法

public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
        return tryLock(waitTime, -1, unit);
 }
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
				// 获取锁的等待时间
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        // 前面分析过此方法不再分析
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        // 获取锁失败流程
        // 如果获取锁的时间超过了等待时间,则返回false,表示在等待时间内没有获取到锁
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        // 重新获取当前时间,前面的time已经减过了        
        current = System.currentTimeMillis();
        // 如果等待时间还有剩余,则订阅锁释放消息
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // 如果在锁等待时间内没有收到订阅消息,则返回false,表示在等待时间内没有获取到锁
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
		      // 此时接收到了锁释放消息,当前线程再去计算锁的剩余等待时间
            time -= System.currentTimeMillis() - current;
            // 如果剩余等待时间小于0,则返回false,表示在等待时间内没有获取到锁
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        
		        // 否则,还有剩余等待时间,则循环尝试获取锁
            while (true) {
                long currentTime = System.currentTimeMillis();
                // 尝试获取锁
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // 获取锁成功,返回true
                // lock acquired
                if (ttl == null) {
                    return true;
                }

				// 获取锁失败,再次计算锁的剩余等待时间,小于0返回false
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    // 此段代码相当一TimeUnit.SECONDS.sleep(ttl);用于减缓下次获取的时间
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
								
								// 再次计算锁剩余等待时间,小于0返回false
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

从上面逻辑可以看到:
tryLock 方法的第一个参数就是最长等待时长,获取锁失败后不会立即返回,而是在等待时间内不断进行尝试。若时间结束还没有获取成功,才会返回 false

5.重试与watchdog流程图

在这里插入图片描述


http://www.kler.cn/a/304928.html

相关文章:

  • 机器学习中的凸函数和梯度下降法
  • 业务幂等性技术架构体系之消息幂等深入剖析
  • STM32 FreeRTOS时间片调度---FreeRTOS任务相关API函数---FreeRTOS时间管理
  • Java 泛型及其优势
  • Facebook 隐私风波:互联网时代数据安全警钟
  • Vue 3前端与Python(Django)后端接口简单示例
  • conda根据配置文件自动切换环境-----模仿 rvm 的 .ruby-version机制
  • 新能源汽车安全问题如何解决?细看“保护罩”连接器的守护使命
  • 【华为OD流程】性格测试选项+注意事项
  • 机器学习和深度学习区别
  • Python3 SMTP发送邮件
  • SOMEIP_ETS_108: SD_Deregister_from_Eventgroup
  • 什么是 Grafana?
  • 【C++题解】1406. 石头剪刀布?
  • SQL server 日常运维命令
  • 【初阶数据结构】详解树和二叉树(一) - 预备知识(我真的很想进步)
  • TiDB 数据库核心原理与架构_Lesson 01 TiDB 数据库架构概述课程整理
  • 基于深度学习的农作物病害检测
  • Linux 调试器 GDB 使用指南
  • AI论文精读笔记-Generative Adversarial Nets(GAN)
  • ZYNQ LWIP(RAW API) TCP函数学习
  • css grid布局属性详解
  • Python Tkinter小程序
  • 每日学习一个数据结构-B+树
  • React中九大常用Hooks总结
  • golang学习笔记22——golang微服务中数据竞争问题及解决方案