当前位置: 首页 > article >正文

Redisson分布式锁的源码解读

之前秒杀项目中就用到了这个 Redisson 分布式锁 👇,这篇就一起来看看源码吧!

图片

tryLock 加锁 流程

图片

// RedissonLock.java
@Override
public boolean tryLock() {
    return get(tryLockAsync());
}

@Override
public RFuture<Boolean> tryLockAsync() {
    return tryLockAsync(Thread.currentThread().getId());
}

@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
    return tryAcquireOnceAsync(-1, -1, null, threadId);
}

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Boolean> acquiredFuture;
    // 续租时间:锁的过期时间(没有设置的话就用默认的 internalLockLeaseTime 看门狗时间)
    if (leaseTime > 0) {
        acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
        acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                           TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }

    CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
        // lock acquired
        if (acquired) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 没配置过期时间就执行这里
                scheduleExpirationRenewal(threadId);
            }
        }
        return acquired;
    });
    return new CompletableFutureWrapper<>(f);
}

代码很长,主要看 tryLockInnerAsyncscheduleExpirationRenewal 方法。

前置知识

图片

图片

// EVAL 命令,用于在 Redis 服务器端执行 Lua 脚本。
RedisStrictCommand<Boolean> EVAL_NULL_BOOLEAN = new RedisStrictCommand<Boolean>("EVAL", new BooleanNullReplayConvertor());

// BooleanNullReplayConvertor 判断是不是 NULL。
public class BooleanNullReplayConvertor implements Convertor<Boolean> {
    @Override
    public Boolean convert(Object obj) {        return obj == null;     }
}

tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    // getRawName 即 锁的名称
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                          // 锁不存在,添加 hash 数据,可重入次数加一,毫秒级别过期时间,返回 null
                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          // 锁存在,可重入次数加一,毫秒级别过期时间,返回 null
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          // 锁被别人占有, 返回毫秒级别过期时间
                          "return redis.call('pttl', KEYS[1]);",
                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

ARGV[1] 过期时间

ARGV[2] 即 getLockName(threadId) ,这里是 redisson 客户端id + 这个线程 ID , 如下 👇

图片

scheduleExpirationRenewal (看门狗机制)

上面加锁完,就来到这段代码。

没有设置过期时间的话,默认给你设置 30 s 过期,并每隔 10s 自动续期,确保锁不会在使用过程中过期。

同时,防止客户端宕机,留下死锁。

图片

// RedissonBaseLock.java

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            // 看这里 
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 延时任务,10s 续期一次。
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
     
            // 续期操作
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
        // 三分之一时间,30s /3= 10s
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}

// 续期脚本
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return 0;",
                          Collections.singletonList(getRawName()),
                          internalLockLeaseTime, getLockName(threadId));
}

get

上面的加锁操作,最终返回的是 return new CompletableFutureWrapper<>(f);  这个异步操作。

还记得上面的 BooleanNullReplayConvertor 吗,当 eval 执行加锁脚本时,成功会返回 null,并在这里转成 True 。

@Override
public <V> V get(RFuture<V> future) {
    if (Thread.currentThread().getName().startsWith("redisson-netty")) {
        throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
    }

    try {
        return future.toCompletableFuture().get();
    } catch (InterruptedException e) {
        future.cancel(true);
        Thread.currentThread().interrupt();
        throw new RedisException(e);
    } catch (ExecutionException e) {
        throw convertException(e);
    }
}

那么,加锁的部分到这里就结束, 解锁 的就简单过一下 👇

unlock 解锁

图片

// RedissonLock.java
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          // 不存在,直接返回 null
                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                          "return nil;" +
                          "end; " +
                          // 减一
                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                          // 大于0,设置毫秒级过期时间,并返回0
                          "if (counter > 0) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                          "return 0; " +
                          // 删除锁,并向指定channel发布 0 这个消息,并返回1
                          "else " +
                          "redis.call('del', KEYS[1]); " +
                          "redis.call('publish', KEYS[2], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          // 返回 null
                          "return nil;",
                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

KEYS[1] 为锁名,KEYS[2] channel 名 👇

图片

ARGV[1] 为0 👇, ARGV[2] 过期时间,ARGV[3] 为 redisson 客户端id + 这个线程 ID

图片

解锁后,取消续期任务。

图片

结尾

通过源码,我们了解到上文提到的 redisson 框架的几个特点:自动续期可重入锁lua脚本


http://www.kler.cn/a/449101.html

相关文章:

  • 【机器学习】探索机器学习与人工智能:驱动未来创新的关键技术
  • GitCode 光引计划投稿|JavaVision:引领全能视觉智能识别新纪元
  • ShardingSphere-Proxy 连接实战:从 Golang 原生 SQL 到 GORM 的应用
  • unity webgl部署到iis报错
  • 财会〔2024〕22号发布,全面提高管理会计数字化、智能化水平,泛微·齐业成来助力
  • Ubuntu硬盘分区及挂载(命令行)
  • panddleocr-文本检测+文本方向分类+文本识别整体流程
  • JavaAgent技术应用和原理:JVM持久化监控
  • ubuntu18.04连接不上网络问题
  • Spring Boot与Django对比:哪个更适合做为Web服务器框架?
  • 32岁前端干了8年,是继续做前端开发,还是转其它工作
  • 图像处理中的图像配准方法
  • 详解js柯里化原理及用法,探究柯里化在Redux Selector 的场景模拟、构建复杂的数据流管道、优化深度嵌套函数中的精妙应用
  • 【PyQt5 02】基本功能(示例)
  • 作业:循环比赛日程表 与 取余运算(mod)
  • TensorFlow和Keras的区别和关系
  • GitCode 光引计划投稿|智能制造一体化低代码平台 Skyeye云
  • /etc/fstab 文件学习systemd与该文件关系
  • 开发整合笔记
  • 华为IPD流程6大阶段370个流程活动详解_第二阶段:计划阶段 — 86个活动
  • 基于Springboot的数字科技风险报告管理系统
  • 百度热力图数据处理,可直接用于论文
  • 层次聚类算法的研究
  • 江苏计算机专转本 技能Mysql知识点总结(三)
  • CTF-WEB php-Session 文件利用 [第一届国城杯 n0ob_un4er 赛后学习笔记]
  • open Feign服务抽取