当前位置: 首页 > article >正文

分布式锁,rediss,redisson,看门狗,可重入,可重试

目录

  • 什么是分布式锁
  • redisson的分布式锁的特点
  • 可重入的实现原理
  • 可重试的实现原理
  • 超时续期(看门狗机制)

什么是分布式锁

分布式锁是一种锁机制,用户解决同一个共享资源被多个线程并发访问的问题,使用分布式锁可以避免并发安全,数据不一致的情况。

redisson的分布式锁的特点

  • 可重入:同一个线程可以多次获取同一把锁
  • 可重试:如果设置了等待时间,可以在这一段时间尝试从新获取锁
  • 高可用:程序不易崩溃,时时刻刻都保证较高的可用性
  • 自动续期:看门狗机制,如果没有设置过期时间,会一直续期,知道锁被释放
  • 互斥性:同一把锁只有一个线程可以获取

可重入的实现原理

redisson的分布式锁,采用了hash结构来存储数据,大key表示锁的名称,小key表示被哪一个锁持有,小key的value记录着这一个锁被同一个线程获取了几次。

获取锁的大概的流程是这样的:先判断先要获取的锁是否存在,如果不存 在,则获取锁,设置超时时间,返回,如果存在,判断锁是不是自己的,如果不是,则获取失败,如果是,增加锁的持有次数,重置超时时间,返回。

获取锁流程图:

在这里插入图片描述

释放锁的流程:判断锁是不是自己的,如果不是,说明锁已经被释放,如果是,判断现在还有多少个地方使用到,如果只有自己,直接释放,如果还有别的地方使用到,减少持有的次数,重置过期时间,通知其他线程已经释放锁的消息。

释放锁流程图:

在这里插入图片描述

rediss源码的lua脚本:

获取锁:

if (redis.call('exists', KEYS[1]) == 0) then 				--  判断锁是否存在,进入if表示不存在
    redis.call('hincrby', KEYS[1], ARGV[2], 1); 			--  锁的持有数量加一
    redis.call('pexpire', KEYS[1], ARGV[1]); 				--  设置过期时间
    return nil; 										 --  返回获取成功
end; 

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 		--  锁已经存在,判断是不是自己的,进入if表示是自己的
    redis.call('hincrby', KEYS[1], ARGV[2], 1); 			--  锁的持有数量加一
    redis.call('pexpire', KEYS[1], ARGV[1]); 				--  重置过期时间
    return nil; 										--  返回获取成功
end; 

return redis.call('pttl', KEYS[1]);						--  锁存在并且不是自己的,返回获取失败

释放锁:

local key = KEYS[1]          -- "myLock" 锁的名称
local pubsub_key = KEYS[2]   -- "lock_channel" 发布订阅的通道名称
local release_message = ARGV[1] -- "lock_released" 发布的消息
local expire_time = ARGV[2]    -- 30000 锁的过期时间
local threadId = ARGV[3]       -- "thread123" 持有锁的线程

if (redis.call('hexists', key, threadId) == 0) then   -- 判定要释放锁是否存在
    return nil;										-- 不存在表示无需释放返回nil
end; 
local counter = redis.call('hincrby', key, threadId, -1); -- 判断持有数量加一之后剩余数量是否大于0
if (counter > 0) then 
    redis.call('pexpire', key, expire_time); 			-- 大于0表示还有其他地方是由锁
    return 0; 										-- 返回0表示为完全释放
else 
    redis.call('del', key); 							-- 小于0表示完全释放,删除锁
    redis.call('publish', pubsub_key, release_message);  -- 同时订阅释放锁消息的线程
    return 1;  -- 返回1表示完全释放
end; -- 默认返回值,一般不会使用到

可重试的实现原理

大概流程是这样子的:先尝试获取锁,如果获取成功直接返回,如果获取失败,尝试重新获取,但是不会马上获取,先判断等待是否超时,如果超时返回失败,如果未超时,订阅释放锁的消息,如果在等待的时间还是订阅不到消息,则获取失败,取消订阅,返回失败,如果在等待期间获取到消息,判断是否超时,如果超时,尝试获取锁,获取成功返回,获取失败,继续订阅消息,判断是否超时,尝试重新获取,如此反复,知道获取成功或则超时。

流程图:

在这里插入图片描述

redisson源码

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    // 尝试获取锁,方法返回两种结果,如果是null表示获取成功,如果是ttl(锁剩余时间),表示获取失败
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // 如果成功,返回true
    if (ttl == null) {
        return true;
    }
    // 如果失败,尝试再次获取
    // 如果等待时间已过,返回false
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        // 执行获取锁失败后的逻辑
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    // 不会立刻尝试获取,刚刚获取不到,立马获取大概率获取不到
    current = System.currentTimeMillis();
    // 调用异步方法,订阅消息,如果有锁释放,尝试获取
    RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    // 调用await等待异步方法,两种结果,一种是等不到订阅信息,取消等于放回false,一种是收到订阅信息,继续执行
    if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
        // 超时,订阅不到消息,取消订阅,返回false
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.onComplete((res, e) -> {
                if (e == null) {
                    unsubscribe(subscribeFuture, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    // 订阅到了消息
    try {
        // 判断是否超时
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        // 循环尝试获取
        while (true) {
            long currentTime = System.currentTimeMillis();
            // 尝试获取
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);

            if (ttl == null) {
                // 成功获取
                return true;
            }
            // 获取失败,判断是否超时
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            // 阻塞等待订阅信息,两种结果,一种超时放回false,一种接收到,返回true,
            // 但是这里的放回值没有获取,只是在这里阻塞一段时间
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                // 如果锁过期时间小于等待时间,以过期时间为标准
                subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                // 如果锁过期时间大约等于等待时间,以等待为标准
                subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }
            // 判断是否超时
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        // 取消订阅
        unsubscribe(subscribeFuture, threadId);
    }
}

超时续期(看门狗机制)

如果一个锁没有设置过期时间,则在释放锁之前需要一直存在,如果不设置过期时间,可能在释放锁之前,服务宕机了,到时锁没有得到释放,所以才有设置过期时间,之后一直续期,即使服务宕机了,也会自己释放。

大概流程是这样子的,如果需要使用续期机制,在线程第一次获取锁的时候,开启看门狗,也就是定时任务,默认的过期时间是30秒,定时任务每隔10秒重置为30秒,再继续开启一个定时任务。在锁释放的时候,需要删除锁,关闭定时任务。

流程图:

在这里插入图片描述

rediss源码

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) { // 不需要看门狗走这里
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 需要开门狗走下面
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
        waitTime,
	   commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),// 默认过期时间30秒
        TimeUnit.MILLISECONDS, threadId, 
        RedisCommands.EVAL_LONG
    );
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId); // 开启看门狗
        }
    });
    return ttlRemainingFuture;
}


private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();  // 里面可以存放一个定时任务和锁
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); // 如果map里面有这个锁,返回null,没有则返回ExpirationEntry
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId); // 已经有过,线程持有次数加一
    } else {
        entry.addThreadId(threadId); // 线程持有次数加一
        renewExpiration(); // 开启看门狗
    }
}


private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            RFuture<Boolean> future = renewExpirationAsync(threadId); // 重置过期时间
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }

                if (res) {
                    // reschedule itself
                    renewExpiration(); // 在开启一个定时任务
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return 0;",
                          Collections.singletonList(getName()),
                          internalLockLeaseTime, getLockName(threadId));
}

http://www.kler.cn/a/613884.html

相关文章:

  • 【实战ES】实战 Elasticsearch:快速上手与深度实践-2.2.1 Bulk API的正确使用与错误处理
  • Open GL ES ->模型矩阵、视图矩阵、投影矩阵等变换矩阵数学推导以及方法接口说明
  • 信息学奥赛一本通 1514:【例 2】最大半连通子图 | 洛谷 P2272 [ZJOI2007] 最大半连通子图
  • Emacs 折腾日记(二十)——修改emacs的一些默认行为
  • 【C++项目实战】:基于正倒排索引的Boost搜索引擎(1)
  • s1: Simple test-time scaling 【论文阅读笔记】
  • PPTP、L2TP 和 IPSec
  • PyTorch 分布式训练(Distributed Data Parallel, DDP)简介
  • 在IDEA中快速注释所有console.log
  • Taro创建微信小程序项目 第一步搭建项目
  • 掌握!Postman 设置 Bearer Token 的完整指南
  • 3d pose 指标和数据集
  • 【tips】微信小程序wxs 注意
  • WHAT - 程序员英语之美式发音学习系列(五)
  • 【华三】华三模拟器HCL防火墙、AC和交换机的Web登入
  • 06-SpringBoot3入门-常见注解(简介)
  • 基于HTML5和CSS3实现3D旋转相册效果
  • 力扣hot100二刷——动态规划
  • uni-app踩坑记录【图片先压缩再上传】
  • Oracle 数据库同步至 GaussDB问题及解决方案