当前位置: 首页 > article >正文

redis-redission的加锁源码与看门狗机制

redission加锁方式

maven依赖

    <dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson</artifactId>
      <version>3.16.8</version>
    </dependency>

 

lock使用方式

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.concurrent.TimeUnit;

public class RedissionLock {

    private static final String KEY = "lock_test";
    private static final RedissonClient redisson;

    static {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://127.0.0.0:6379")
                .setUsername("***")
                .setPassword("****")
        ;
        redisson = Redisson.create(config);
    }


    public static void main(String[] args) {
        RLock lock = redisson.getLock(KEY);
        try {
            lock.lock();

            // 该方式不会启动看门狗
            lock.lock(20, TimeUnit.SECONDS);

        } finally {
            lock.unlock();
        }
    }


}

内部方法调用顺序

调用lock

    public void lock() {
        try {
            this.lock(-1L, (TimeUnit)null, false);
        } catch (InterruptedException var2) {
            throw new IllegalStateException();
        }
    }

   private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();

        // 加锁成功ttl为null
        Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);


        // 加锁不成功,会自旋尝试重新加锁
        if (ttl != null) {
        // ......
        }
           
    }

 lock(long leaseTime, TimeUnit unit, boolean interruptibly)方法的含义

leaseTime

含义:锁的持有时间,即锁的自动释放时间。单位由TimeUnit参数指定。

作用:如果leaseTime-1,表示锁不会自动释放,需要手动调用unlock()方法释放锁。

unit

含义:leaseTime的时间单位,例如TimeUnit.SECONDSTimeUnit.MILLISECONDS

作用:指定leaseTime的时间单位,确保锁的持有时间被正确解析。

interruptibly

含义:是否响应线程中断。

作用:如果为false,表示在尝试获取锁的过程中,线程不会响应中断,即使被中断也会继续尝试获取锁。如果为true,表示在尝试获取锁的过程中,如果当前线程被中断(调用了Thread.interrupt()),会抛出InterruptedException,并停止尝试获取锁。

 

tryAcquire

    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }

 

waitTime

含义:尝试获取锁的最大等待时间。

单位:由 TimeUnit 参数指定(如秒、毫秒等)。

作用:指定当前线程尝试获取锁的最大等待时间。如果在该时间内无法获取锁,则返回 false

leaseTime

类型:long

含义:锁的持有时间。

单位:由 TimeUnit 参数指定(如秒、毫秒等)。

作用:指定锁的自动释放时间。如果锁在 leaseTime 时间内未被显式释放,Redisson 会自动释放该锁。这可以防止锁被永久持有,从而避免死锁问题。

unit

含义:时间单位,用于指定 waitTimeleaseTime 的时间单位。

作用:确保时间参数的单位一致性。常见的值包括:

threadId

含义:当前线程的唯一标识符。

作用:用于标识当前线程,以便 Redisson 能够跟踪锁的持有者。在分布式环境中,每个线程的 threadId 是唯一的。

返回值

返回值含义

如果成功获取锁,返回 1

如果在指定的 waitTime 内未能获取锁,返回 null0

tryAcquireAsync

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime != -1L) {
            ttlRemainingFuture = this.<Long>tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            // 持有锁的时间为-1时,会设置默认的持有持有时间30秒
            ttlRemainingFuture = this.<Long>tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }

        // ttlRemaining 不为null,则表示锁已经被抢占,该值为剩余时间
        CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {
            if (ttlRemaining == null) {
                if (leaseTime != -1L) {
                    this.internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    // ttlRemaining 为null,表示加锁成功;
                    // 没有设置leaseTime,则触发看门狗进行自动续期
                    //(如果设置了,则不会触发看门狗)
                    this.scheduleExpirationRenewal(threadId);
                }
            }

            return ttlRemaining;
        });
        return new CompletableFutureWrapper(f);
    }

 返回值

返回值类型RFuture<Long>

返回值含义

如果锁被成功获取,返回值为 null

如果锁未被获取,返回值为锁的剩余过期时间(单位为毫秒)

tryLockInnerAsync

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return this.evalWriteAsync(
this.getRawName(), 
LongCodec.INSTANCE,
 command,
 "if (redis.call('exists', KEYS[1]) == 0) 
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
 redis.call('pexpire', KEYS[1], ARGV[1]);
 return nil; end; 
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) 
then redis.call('hincrby', KEYS[1], ARGV[2], 1); 
redis.call('pexpire', KEYS[1], ARGV[1]); 
return nil; end; 
return redis.call('pttl', KEYS[1]);", 
Collections.singletonList(this.getRawName()),
 new Object[]{unit.toMillis(leaseTime), 
this.getLockName(threadId)});
}

加锁,使用hash结构

看门狗

scheduleExpirationRenewal 

添加续期任务

    protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);

            try {
              // 续期
                this.renewExpiration();
            } finally {
              // 线程被打断,终止需求(避免线程意外死亡,锁被一致续期的情况)   
                if (Thread.currentThread().isInterrupted()) {
                    this.cancelExpirationRenewal(threadId);
                }

            }
        }

    }

    private void renewExpiration() {
        ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
        if (ee != null) {
            Timeout task = this.commandExecutor.getConnectionManager().newTimeout(
// 创建定时任务(定时任务的执行间隔为this.internalLockLeaseTime/3 )
new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                    ExpirationEntry ent = (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());
                    if (ent != null) {
                        Long threadId = ent.getFirstThreadId();
                        if (threadId != null) {
                            // 续期
                            RFuture<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);
                            future.whenComplete((res, e) -> {
                                if (e != null) {
                                    RedissonBaseLock.log.error("Can't update lock " + RedissonBaseLock.this.getRawName() + " expiration", e);
                                    RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());
                                } else {
                                    if (res) {
                                        // 续期成功,继续续期
                                        RedissonBaseLock.this.renewExpiration();
                                    } else {
                                        // 续期失败,取消续期
                                         RedissonBaseLock.this.cancelExpirationRenewal((Long)null);
                               
                                    }

                                }
                            });
                        }
                    }
                }
            }, 
// 在internalLockLeaseTime/3的时间间隔,发起续租(默认30秒,即10秒续租一次)
this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
            ee.setTimeout(task);
        }
    }
renewExpirationAsync续期
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return this.<Boolean>evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 

"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) 
then redis.call('pexpire', KEYS[1], ARGV[1]); 
return 1; end; return 0;", 

Collections.singletonList(this.getRawName()), this.internalLockLeaseTime, this.getLockName(threadId));
    }

lua含义:当key存在时,即设置超时时间

如何保证自动续期
  • 创建一个定时任务,使用定时任务管理器触发任务,进行续期
如何保证任务中断后,看门狗不会自动续期
  • 当任务线程被中断后,会取消看门狗任务(从map中删除任务)
  • 当续期失败后,会取消看门狗任务

建议

业务中尽量避免使用看门狗,评估业务耗时,使用自定义过期时间。因为看门狗太多时会消耗系统资源。


http://www.kler.cn/a/512860.html

相关文章:

  • 开源AI崛起:新模型逼近商业巨头
  • Java高频面试之SE-15
  • Docker 实现MySQL 主从复制
  • C# 的 NLog 库高级进阶
  • TypeScript - 利用GPT辅助学习
  • 代码随想录算法【Day27】
  • 【人工智能数学基础—微积分】深入详解梯度与梯度下降:掌握梯度下降法及其变种及模型参数的优化
  • 14天学习微服务-->第1天:微服务架构入门
  • Java锁 死锁及排查 JVM 工具 jconsole 工具 排查死锁
  • R语言的编程范式
  • cuda从零开始手搓PB神经网络
  • QT:多窗口设计(主窗口点击按钮打开子窗口)
  • 开源的Text-to-SQL工具WrenAI
  • SQL Server2022版详细安装教程(Windows)
  • 有线通信方式(I2C、UART、RS485)
  • 【Red Hat8】:搭建FTP服务器
  • springboot接入deepseek深度求索 java
  • vue3使用音频audio标签
  • 可视化平台建设技术方案,商业BI系统解决方案,大屏建设功能需求分析(word原件)
  • Datawhale组队学习笔记task2——leetcode面试题
  • 前〈和平精英〉技术策划进军AI游戏领域,获新投资
  • 【数据结构】搜索二叉树
  • 【有啥问啥】什么是端到端(End-to-End)?
  • 【AI大模型Agent探索】深入探索实践 Qwen-Agent 的 Function Calling
  • 【Linux】Linux入门(4)其他常用指令
  • 基于Docker的Kafka分布式集群