当前位置: 首页 > article >正文

Redisson - 分布式锁和同步器

文章目录

  • 锁(Lock)
  • 公平锁(Fair Lock)
  • 联锁(MultiLock)
  • 红锁(RedLock) 【已废弃】
  • 读写锁(ReadWriteLock)
  • 信号量(Semaphore)
  • 可过期许可信号量(PermitExpirableSemaphore)
  • 倒计时门闩(CountDownLatch)
  • 自旋锁(Spin Lock)
  • 栅栏锁(Fenced Lock)
  • 官方文档

在这里插入图片描述


锁(Lock)

基于Redis或Valkey的分布式可重入锁对象(Java实现),实现了Lock接口。通过发布/订阅(pub/sub)通道通知所有Redisson实例中等待获取锁的其他线程。

如果获取锁的Redisson实例崩溃,该锁可能会永远处于已获取状态。为避免此问题,Redisson维护了一个锁看门狗,当持有锁的Redisson实例存活时,它会延长锁的过期时间。默认锁看门狗超时时间为30秒,可通过Config.lockWatchdogTimeout配置修改。

在获取锁时可通过leaseTime参数指定锁的持有时间。超过指定时间后,锁将自动释放。

RLock对象的行为遵循Java锁规范,即只有锁的持有线程可以解锁,否则会抛出IllegalMonitorStateException。其他情况可考虑使用RSemaphore对象。

代码示例:

RLock lock = redisson.getLock("myLock");

// 传统加锁方式
lock.lock();

// 加锁并10秒后自动解锁
lock.lock(10, TimeUnit.SECONDS);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

异步接口使用示例:

RLock lock = redisson.getLock("myLock");

long threadId = Thread.currentThread().getId();
RFuture<Void> lockFuture = lock.lockAsync(threadId);

// 加锁并10秒后自动解锁
RFuture<Void> lockFuture = lock.lockAsync(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
RFuture<Boolean> lockFuture = lock.tryLockAsync(100, 10, TimeUnit.SECONDS, threadId);

lockFuture.whenComplete((res, exception) -> {
    // ...
    lock.unlockAsync(threadId);
});

响应式接口使用示例:

RedissonReactiveClient redisson = redissonClient.reactive();
RLockReactive lock = redisson.getLock("myLock");

long threadId = Thread.currentThread().getId();
Mono<Void> lockMono = lock.lock(threadId);

// 加锁并10秒后自动解锁
Mono<Void> lockMono = lock.lock(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Mono<Boolean> lockMono = lock.tryLock(100, 10, TimeUnit.SECONDS, threadId);

lockMono.doOnSuccess(res -> {
   // ...
})
.doFinally(r -> lock.unlock(threadId).subscribe())
.subscribe();

RxJava3接口使用示例:

RedissonRxClient redisson = redissonClient.rxJava();
RLockRx lock = redisson.getLock("myLock");

long threadId = Thread.currentThread().getId();
Completable lockRes = lock.lock(threadId);

// 加锁并10秒后自动解锁
Completable lockRes = lock.lock(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Single<Boolean> lockRes = lock.tryLock(100, 10, TimeUnit.SECONDS, threadId);

lockRes.doOnSuccess(res -> {
   // ...
})
.doFinally(() -> lock.unlock(threadId).subscribe())
.subscribe();

公平锁(Fair Lock)

基于Redis或Valkey的分布式可重入公平锁(Java实现),实现了Lock接口。公平锁保证线程按请求顺序获取锁。所有等待线程会被队列化,若某线程异常终止,Redisson会等待其恢复5秒(例如5个线程异常终止,总延迟为25秒)。

锁看门狗机制与普通锁相同,支持leaseTime参数。解锁需由持有线程执行,否则抛出IllegalMonitorStateException

代码示例:

RLock lock = redisson.getFairLock("myLock");

// 传统加锁方式
lock.lock();

// 加锁并10秒后自动解锁
lock.lock(10, TimeUnit.SECONDS);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

异步接口使用示例:

RLock lock = redisson.getFairLock("myLock");

RFuture<Void> lockFuture = lock.lockAsync();

// 加锁并10秒后自动解锁
RFuture<Void> lockFuture = lock.lockAsync(10, TimeUnit.SECONDS);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
RFuture<Boolean> lockFuture = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

lockFuture.whenComplete((res, exception) -> {
    // ...
    lock.unlockAsync();
});

响应式接口使用示例:

RedissonReactiveClient redisson = redissonClient.reactive();
RLockReactive lock = redisson.getFairLock("myLock");

long threadId = Thread.currentThread().getId();
Mono<Void> lockMono = lock.lock(threadId);

// 加锁并10秒后自动解锁
Mono<Void> lockMono = lock.lock(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Mono<Boolean> lockMono = lock.tryLock(100, 10, TimeUnit.SECONDS, threadId);

lockMono.doOnSuccess(res -> {
   // ...
})
.doFinally(r -> lock.unlock(threadId).subscribe())
.subscribe();

RxJava3接口使用示例:

RedissonRxClient redisson = redissonClient.rxJava();
RLockRx lock = redisson.getFairLock("myLock");

long threadId = Thread.currentThread().getId();
Completable lockRes = lock.lock(threadId);

// 加锁并10秒后自动解锁
Completable lockRes = lock.lock(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Single<Boolean> lockRes = lock.tryLock(100, 10, TimeUnit.SECONDS, threadId);

lockRes.doOnSuccess(res -> {
   // ...
})
.doFinally(() -> lock.unlock(threadId).subscribe())
.subscribe();

联锁(MultiLock)

基于Redis或Valkey的分布式联锁对象,允许多个RLock对象(可属于不同Redisson实例)组合为单个锁。若持有联锁的Redisson实例崩溃,联锁可能永久处于已获取状态。锁看门狗机制与普通锁相同,支持leaseTime参数。解锁需由持有线程执行。

代码示例:

RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");

RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);

// 传统加锁方式
multiLock.lock();

// 加锁并10秒后自动解锁
multiLock.lock(10, TimeUnit.SECONDS);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
boolean res = multiLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       multiLock.unlock();
   }
}

异步接口使用示例:

RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");

RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);

long threadId = Thread.currentThread().getId();
RFuture<Void> lockFuture = multiLock.lockAsync(threadId);

// 加锁并10秒后自动解锁
RFuture<Void> lockFuture = multiLock.lockAsync(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
RFuture<Boolean> lockFuture = multiLock.tryLockAsync(100, 10, TimeUnit.SECONDS, threadId);

lockFuture.whenComplete((res, exception) -> {
    // ...
    multiLock.unlockAsync(threadId);
});

响应式接口使用示例:

RedissonReactiveClient anyRedisson = redissonClient.reactive();

RLockReactive lock1 = redisson1.getLock("lock1");
RLockReactive lock2 = redisson2.getLock("lock2");
RLockReactive lock3 = redisson3.getLock("lock3");

RLockReactive multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);

long threadId = Thread.currentThread().getId();
Mono<Void> lockMono = multiLock.lock(threadId);

// 加锁并10秒后自动解锁
Mono<Void> lockMono = multiLock.lock(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Mono<Boolean> lockMono = multiLock.tryLock(100, 10, TimeUnit.SECONDS, threadId);

lockMono.doOnSuccess(res -> {
   // ...
})
.doFinally(r -> multiLock.unlock(threadId).subscribe())
.subscribe();

RxJava3接口使用示例:

RedissonRxClient anyRedisson = redissonClient.rxJava();

RLockRx lock1 = redisson1.getLock("lock1");
RLockRx lock2 = redisson2.getLock("lock2");
RLockRx lock3 = redisson3.getLock("lock3");

RLockRx multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);

long threadId = Thread.currentThread().getId();
Completable lockRes = multiLock.lock(threadId);

// 加锁并10秒后自动解锁
Completable lockRes = multiLock.lock(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Single<Boolean> lockRes = multiLock.tryLock(100, 10, TimeUnit.SECONDS, threadId);

lockRes.doOnSuccess(res -> {
   // ...
})
.doFinally(() -> multiLock.unlock(threadId).subscribe())
.subscribe();

红锁(RedLock) 【已废弃】

此对象已弃用,请改用RLockRFencedLock

RedLock 的背景

  • 问题场景:传统单节点 Redis 锁(如 SETNX)在主从切换时可能失效(例如主节点崩溃,锁未同步到从节点,导致其他客户端重复获取锁)。
  • 解决方案:Redis 作者 Antirez 提出 RedLock 算法,要求客户端在多个独立的 Redis 节点上获取锁,通过多数派机制提高可靠性。

RedLock 实现原理

Redisson 的 RedissonRedLock 类实现了 RedLock 算法,核心步骤如下:

  1. 获取锁

    • N 个独立 Redis 节点 依次发起加锁请求(使用相同键和随机值)。
    • 加锁成功条件:
      • 成功获取锁的节点数 ≥ N/2 + 1(多数派)。
      • 总耗时 < 锁的失效时间(避免锁过期后仍被误用)。
  2. 释放锁

    • 向所有节点发起解锁请求(无论是否加锁成功),避免残留锁状态。

Code

// 1. 创建多个独立 Redis 节点的客户端
RedissonClient client1 = Redisson.create(new Config().useSingleServer().setAddress("redis://node1:6379"));
RedissonClient client2 = Redisson.create(new Config().useSingleServer().setAddress("redis://node2:6379"));
RedissonClient client3 = Redisson.create(new Config().useSingleServer().setAddress("redis://node3:6379"));

// 2. 从每个客户端获取 RLock 对象
RLock lock1 = client1.getLock("myLock");
RLock lock2 = client2.getLock("myLock");
RLock lock3 = client3.getLock("myLock");

// 3. 创建 RedLock
RLock redLock = new RedissonRedLock(lock1, lock2, lock3);

try {
    // 4. 尝试加锁(最多等待 10 秒,锁有效期 30 秒)
    boolean success = redLock.tryLock(10, 30, TimeUnit.SECONDS);
    if (success) {
        // 执行业务逻辑
    }
} finally {
    redLock.unlock(); // 5. 释放锁
}

使用注意事项

  • 节点独立性:所有 Redis 节点必须为独立主节点(无主从关系),避免数据同步问题。
  • 锁有效期(Lease Time):需合理设置,确保业务逻辑能在锁失效前完成。
  • 性能开销:跨节点通信增加延迟,适合对可靠性要求高的场景。
  • 网络分区风险:极端情况下仍可能锁失效,需结合业务幂等性设计。

缺陷

  • 无法应对NPC异常场景
    RedLock 的核心算法无法正确处理网络延迟(Network Delay)、进程暂停(Process Pause)和时钟漂移(Clock Drift)这三种异常情况。例如:当持有锁的客户端因进程暂停(如JVM的STW)导致锁超时释放时,其他客户端可能同时获取同一资源的锁,破坏互斥性。若Redis节点时钟不同步,锁可能提前失效,导致多个客户端同时持有锁。

  • ​学术界与社区的批评
    Redis作者Antirez提出的RedLock算法曾遭到分布式系统专家Martin Kleppmann的质疑。Martin指出,RedLock依赖的“多数派加锁成功”机制无法保证严格的互斥性,尤其在节点故障恢复或主从切换时,可能产生锁状态不一致的问题。

RedLock的替代方案

  • ​ 联锁(MultiLock)​:允许一次性锁定多个资源,但需显式管理所有锁实例。
  • 单节点锁+WatchDog:通过定期续期避免锁超时,简化实现并兼顾性能

读写锁(ReadWriteLock)

基于Redis或Valkey的分布式可重入读写锁(Java实现),实现了ReadWriteLock接口。读锁和写锁均实现RLock接口,允许多个读锁或单个写锁。

锁看门狗机制与普通锁相同,支持leaseTime参数。解锁需由持有线程执行,否则抛出IllegalMonitorStateException

并发规则矩阵

请求类型已持读锁已持写锁
请求读锁✅允许❌拒绝
请求写锁❌拒绝❌拒绝

代码示例:

RReadWriteLock rwlock = redisson.getReadWriteLock("myLock");

RLock readLock = rwlock.readLock();
// 或
RLock writeLock = rwlock.writeLock();

// 传统加锁方式
readLock.lock();

// 加锁并10秒后自动解锁
writeLock.lock(10, TimeUnit.SECONDS);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
boolean res = writeLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       writeLock.unlock();
   }
}

异步接口使用示例:

RReadWriteLock rwlock = redisson.getReadWriteLock("myLock");

long threadId = Thread.currentThread().getId();
RLock readLock = rwlock.readLock();
// 或
RLock writeLock = rwlock.writeLock();

RFuture<Void> lockFuture = readLock.lockAsync(threadId);

// 加锁并10秒后自动解锁
RFuture<Void> lockFuture = writeLock.lockAsync(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
RFuture<Boolean> lockFuture = writeLock.tryLockAsync(100, 10, TimeUnit.SECONDS, threadId);

lockFuture.whenComplete((res, exception) -> {
    // ...
    writeLock.unlockAsync(threadId);
});

响应式接口使用示例:

RedissonReactiveClient redisson = redissonClient.reactive();

RReadWriteLockReactive rwlock = redisson.getReadWriteLock("myLock");

RLockReactive readLock = rwlock.readLock();
// 或
RLockReactive writeLock = rwlock.writeLock();

long threadId = Thread.currentThread().getId();
Mono<Void> lockMono = writeLock.lock(threadId);

// 加锁并10秒后自动解锁
Mono<Void> lockMono = writeLock.lock(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Mono<Boolean> lockMono = writeLock.tryLock(100, 10, TimeUnit.SECONDS, threadId);

lockMono.doOnSuccess(res -> {
   // ...
})
.doFinally(r -> writeLock.unlock(threadId).subscribe())
.subscribe();

RxJava3接口使用示例:

RedissonRxClient redisson = redissonClient.rxJava();

RReadWriteLockRx rwlock = redisson.getReadWriteLock("myLock");

RLockRx readLock = rwlock.readLock();
// 或
RLockRx writeLock = rwlock.writeLock();

long threadId = Thread.currentThread().getId();
Completable lockRes = writeLock.lock(threadId);

// 加锁并10秒后自动解锁
Completable lockRes = writeLock.lock(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Single<Boolean> lockRes = writeLock.tryLock(100, 10, TimeUnit.SECONDS, threadId);

lockRes.doOnSuccess(res -> {
   // ...
})
.doFinally(() -> writeLock.unlock(threadId).subscribe())
.subscribe();

信号量(Semaphore)

基于Redis或Valkey的分布式信号量(Java实现),与Java Semaphore类似。需通过trySetPermits(permits)初始化许可数量。

代码示例:

RSemaphore semaphore = redisson.getSemaphore("mySemaphore");

// 获取一个许可
semaphore.acquire();

// 获取10个许可
semaphore.acquire(10);

// 尝试获取许可
boolean res = semaphore.tryAcquire();

// 尝试获取许可(最多等待15秒)
boolean res = semaphore.tryAcquire(15, TimeUnit.SECONDS);

// 尝试获取10个许可
boolean res = semaphore.tryAcquire(10);

// 尝试获取10个许可(最多等待15秒)
boolean res = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       semaphore.release();
   }
}

异步接口使用示例:

RSemaphore semaphore = redisson.getSemaphore("mySemaphore");

// 获取一个许可
RFuture<Void> acquireFuture = semaphore.acquireAsync();

// 获取10个许可
RFuture<Void> acquireFuture = semaphore.acquireAsync(10);

// 尝试获取许可
RFuture<Boolean> acquireFuture = semaphore.tryAcquireAsync();

// 尝试获取许可(最多等待15秒)
RFuture<Boolean> acquireFuture = semaphore.tryAcquireAsync(15, TimeUnit.SECONDS);

// 尝试获取10个许可
RFuture<Boolean> acquireFuture = semaphore.tryAcquireAsync(10);

// 尝试获取10个许可(最多等待15秒)
RFuture<Boolean> acquireFuture = semaphore.tryAcquireAsync(10, 15, TimeUnit.SECONDS);

acquireFuture.whenComplete((res, exception) -> {
    // ...
    semaphore.releaseAsync();
});

响应式接口使用示例:

RedissonReactiveClient redisson = redissonClient.reactive();

RSemaphoreReactive semaphore = redisson.getSemaphore("mySemaphore");

// 获取一个许可
Mono<Void> acquireMono = semaphore.acquire();

// 获取10个许可
Mono<Void> acquireMono = semaphore.acquire(10);

// 尝试获取许可
Mono<Boolean> acquireMono = semaphore.tryAcquire();

// 尝试获取许可(最多等待15秒)
Mono<Boolean> acquireMono = semaphore.tryAcquire(15, TimeUnit.SECONDS);

// 尝试获取10个许可
Mono<Boolean> acquireMono = semaphore.tryAcquire(10);

// 尝试获取10个许可(最多等待15秒)
Mono<Boolean> acquireMono = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);

acquireMono.doOnSuccess(res -> {
   // ...
})
.doFinally(r -> semaphore.release().subscribe())
.subscribe();

RxJava3接口使用示例:

RedissonRxClient redisson = redissonClient.rxJava();

RSemaphoreRx semaphore = redisson.getSemaphore("mySemaphore");

// 获取一个许可
Completable acquireRx = semaphore.acquire();

// 获取10个许可
Completable acquireRx = semaphore.acquire(10);

// 尝试获取许可
Single<Boolean> acquireRx = semaphore.tryAcquire();

// 尝试获取许可(最多等待15秒)
Single<Boolean> acquireRx = semaphore.tryAcquire(15, TimeUnit.SECONDS);

// 尝试获取10个许可
Single<Boolean> acquireRx = semaphore.tryAcquire(10);

// 尝试获取10个许可(最多等待15秒)
Single<Boolean> acquireRx = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);

acquireRx.doOnSuccess(res -> {
   // ...
})
.doFinally(() -> semaphore.release().subscribe())
.subscribe();

可过期许可信号量(PermitExpirableSemaphore)

基于Redis或Valkey的分布式信号量,支持为每个许可设置租约时间。每个许可通过唯一ID标识,需使用该ID释放。需通过trySetPermits(permits)初始化许可数量,并通过addPermits(permits)动态调整。

代码示例:

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
semaphore.trySetPermits(23);

// 获取许可
String id = semaphore.acquire();

// 获取许可(租约10秒)
String id = semaphore.acquire(10, TimeUnit.SECONDS);

// 尝试获取许可
String id = semaphore.tryAcquire();

// 尝试获取许可(最多等待15秒)
String id = semaphore.tryAcquire(15, TimeUnit.SECONDS);

// 尝试获取许可(租约15秒,最多等待10秒)
String id = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);
if (id != null) {
   try {
     ...
   } finally {
       semaphore.release(id);
   }
}

异步接口使用示例:

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");

RFuture<Boolean> setFuture = semaphore.trySetPermitsAsync(23);

// 获取许可
RFuture<String> acquireFuture = semaphore.acquireAsync();

// 获取许可(租约10秒)
RFuture<String> acquireFuture = semaphore.acquireAsync(10, TimeUnit.SECONDS);

// 尝试获取许可
RFuture<String> acquireFuture = semaphore.tryAcquireAsync();

// 尝试获取许可(最多等待15秒)
RFuture<String> acquireFuture = semaphore.tryAcquireAsync(15, TimeUnit.SECONDS);

// 尝试获取许可(租约15秒,最多等待10秒)
RFuture<String> acquireFuture = semaphore.tryAcquireAsync(10, 15, TimeUnit.SECONDS);
acquireFuture.whenComplete((id, exception) -> {
    // ...
    semaphore.releaseAsync(id);
});

响应式接口使用示例:

RedissonReactiveClient redisson = redissonClient.reactive();

RPermitExpirableSemaphoreReactive semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");

Mono<Boolean> setMono = semaphore.trySetPermits(23);

// 获取许可
Mono<String> acquireMono = semaphore.acquire();

// 获取许可(租约10秒)
Mono<String> acquireMono = semaphore.acquire(10, TimeUnit.SECONDS);

// 尝试获取许可
Mono<String> acquireMono = semaphore.tryAcquire();

// 尝试获取许可(最多等待15秒)
Mono<String> acquireMono = semaphore.tryAcquire(15, TimeUnit.SECONDS);

// 尝试获取许可(租约15秒,最多等待10秒)
Mono<String> acquireMono = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);

acquireMono.flatMap(id -> {
   // ...
   return semaphore.release(id);
}).subscribe();

RxJava3接口使用示例:

RedissonRxClient redisson = redissonClient.rxJava();

RPermitExpirableSemaphoreRx semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");

Single<Boolean> setRx = semaphore.trySetPermits(23);

// 获取许可
Single<String> acquireRx = semaphore.acquire();

// 获取许可(租约10秒)
Single<String> acquireRx = semaphore.acquire(10, TimeUnit.SECONDS);

// 尝试获取许可
Maybe<String> acquireRx = semaphore.tryAcquire();

// 尝试获取许可(最多等待15秒)
Maybe<String> acquireRx = semaphore.tryAcquire(15, TimeUnit.SECONDS);

// 尝试获取许可(租约15秒,最多等待10秒)
Maybe<String> acquireRx = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);

acquireRx.flatMap(id -> {
   // ...
   return semaphore.release(id);
}).subscribe();

倒计时门闩(CountDownLatch)

基于Redis或Valkey的分布式倒计时门闩(Java实现),需通过trySetCount(count)初始化计数器。

代码示例:

RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch");

// 初始化计数器为1
latch.trySetCount(1);
// 等待计数器归零
latch.await();

// 其他线程或JVM中减少计数
RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch");
latch.countDown();

异步接口使用示例:

RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch");

RFuture<Boolean> setFuture = latch.trySetCountAsync(1);
// 等待计数器归零
RFuture<Void> awaitFuture = latch.awaitAsync();

// 其他线程或JVM中减少计数
RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch");
RFuture<Void> countFuture = latch.countDownAsync();

响应式接口使用示例:

RedissonReactiveClient redisson = redissonClient.reactive();
RCountDownLatchReactive latch = redisson.getCountDownLatch("myCountDownLatch");

Mono<Boolean> setMono = latch.trySetCount(1);
// 等待计数器归零
Mono<Void> awaitMono = latch.await();

// 其他线程或JVM中减少计数
RCountDownLatchReactive latch = redisson.getCountDownLatch("myCountDownLatch");
Mono<Void> countMono = latch.countDown();

RxJava3接口使用示例:

RedissonRxClient redisson = redissonClient.rxJava();
RCountDownLatchRx latch = redisson.getCountDownLatch("myCountDownLatch");

Single<Boolean> setRx = latch.trySetCount(1);
// 等待计数器归零
Completable awaitRx = latch.await();

// 其他线程或JVM中减少计数
RCountDownLatchRx latch = redisson.getCountDownLatch("myCountDownLatch");
Completable countRx = latch.countDown();

自旋锁(Spin Lock)

基于Redis或Valkey的分布式可重入自旋锁(Java实现),采用指数退避策略避免因短时间内大量锁操作导致的网络吞吐量限制和Redis/Valkey CPU过载。锁看门狗机制与普通锁相同,支持leaseTime参数。解锁需由持有线程执行。

代码示例:

RLock lock = redisson.getSpinLock("myLock");

// 传统加锁方式
lock.lock();

// 加锁并10秒后自动解锁
lock.lock(10, TimeUnit.SECONDS);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

异步接口使用示例:

RLock lock = redisson.getSpinLock("myLock");

long threadId = Thread.currentThread().getId();
RFuture<Void> lockFuture = lock.lockAsync(threadId);

// 加锁并10秒后自动解锁
RFuture<Void> lockFuture = lock.lockAsync(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
RFuture<Boolean> lockFuture = lock.tryLockAsync(100, 10, TimeUnit.SECONDS, threadId);

lockFuture.whenComplete((res, exception) -> {
    // ...
    lock.unlockAsync(threadId);
});

响应式接口使用示例:

RedissonReactiveClient redisson = redissonClient.reactive();
RLockReactive lock = redisson.getSpinLock("myLock");

long threadId = Thread.currentThread().getId();
Mono<Void> lockMono = lock.lock(threadId);

// 加锁并10秒后自动解锁
Mono<Void> lockMono = lock.lock(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Mono<Boolean> lockMono = lock.tryLock(100, 10, TimeUnit.SECONDS, threadId);

lockMono.doOnSuccess(res -> {
   // ...
})
.doFinally(r -> lock.unlock(threadId).subscribe())
.subscribe();

RxJava3接口使用示例:

RedissonRxClient redisson = redissonClient.rxJava();
RLockRx lock = redisson.getSpinLock("myLock");

long threadId = Thread.currentThread().getId();
Completable lockRes = lock.lock(threadId);

// 加锁并10秒后自动解锁
Completable lockRes = lock.lock(10, TimeUnit.SECONDS, threadId);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Single<Boolean> lockRes = lock.tryLock(100, 10, TimeUnit.SECONDS, threadId);

lockRes.doOnSuccess(res -> {
   // ...
})
.doFinally(() -> lock.unlock(threadId).subscribe())
.subscribe();

栅栏锁(Fenced Lock)

基于Redis或Valkey的分布式可重入栅栏锁(Java实现),通过**防护令牌(Fencing Token)**避免因客户端长时间停顿(如GC暂停)导致的锁失效问题。加锁方法返回令牌,服务需校验令牌是否递增。

锁看门狗机制与普通锁相同,支持leaseTime参数。解锁需由持有线程执行。

代码示例:

RFencedLock lock = redisson.getFencedLock("myLock");

// 加锁并获取令牌
Long token = lock.lockAndGetToken();

// 加锁并10秒后自动解锁,返回令牌
token = lock.lockAndGetToken(10, TimeUnit.SECONDS);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁,返回令牌
Long token = lock.tryLockAndGetToken(100, 10, TimeUnit.SECONDS);
if (token != null) {
   try {
     // 校验令牌是否 >= 旧令牌
     ...
   } finally {
       lock.unlock();
   }
}

异步接口使用示例:

RFencedLock lock = redisson.getFencedLock("myLock");

RFuture<Long> lockFuture = lock.lockAndGetTokenAsync();

// 加锁并10秒后自动解锁,返回令牌
RFuture<Long> lockFuture = lock.lockAndGetTokenAsync(10, TimeUnit.SECONDS);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁,返回令牌
RFuture<Long> lockFuture = lock.tryLockAndGetTokenAsync(100, 10, TimeUnit.SECONDS);

long threadId = Thread.currentThread().getId();
lockFuture.whenComplete((token, exception) -> {
    if (token != null) {
        try {
          // 校验令牌是否 >= 旧令牌
          ...
        } finally {
          lock.unlockAsync(threadId);
        }
    }
});

响应式接口使用示例:

RedissonReactiveClient redisson = redissonClient.reactive();
RFencedLockReactive lock = redisson.getFencedLock("myLock");

long threadId = Thread.currentThread().getId();
Mono<Long> lockMono = lock.lockAndGetToken();

// 加锁并10秒后自动解锁,返回令牌
Mono<Long> lockMono = lock.lockAndGetToken(10, TimeUnit.SECONDS);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁,返回令牌
Mono<Long> lockMono = lock.tryLockAndGetToken(100, 10, TimeUnit.SECONDS);

lockMono.doOnSuccess(token -> {
    if (token != null) {
        try {
          // 校验令牌是否 >= 旧令牌
          ...
        } finally {
            lock.unlock(threadId).subscribe();
        }
    }
})
.subscribe();

RxJava3接口使用示例:

RedissonRxClient redisson = redissonClient.rxJava();
RFencedLockRx lock = redisson.getFencedLock("myLock");

Single<Long> lockRes = lock.lockAndGetToken();

// 加锁并10秒后自动解锁,返回令牌
Single<Long> lockRes = lock.lockAndGetToken(10, TimeUnit.SECONDS);

// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁,返回令牌
Single<Long> lockRes = lock.tryLock(100, 10, TimeUnit.SECONDS);

long threadId = Thread.currentThread().getId();
lockRes.doOnSuccess(token -> {
    if (token != null) {
        try {
          // 校验令牌是否 >= 旧令牌
          ...
        } finally {
            lock.unlock(threadId).subscribe();
        }
    }
})
.subscribe();

官方文档

https://redisson.pro/docs/data-and-services/locks-and-synchronizers/

在这里插入图片描述


http://www.kler.cn/a/612538.html

相关文章:

  • stm32-IIC
  • 数据驱动的教育革命:让学习更智能、更个性化
  • React Native集成到原生iOS应用中
  • 蓝桥杯经典题解:班级活动分组问题的深度解析与优化实现
  • Nodemation(n8n)+MCP系列:1、什么是n8n以及Windows系统下的安装指南
  • Qt开发:QVariant的使用
  • 机器学习——神经网络、感知机
  • 修改 docker0 网卡配置的详细步骤
  • Java全栈面试宝典:JVM与Spring核心模块深度解析
  • RISC-V AIA学习3---APLIC第三部分
  • 手机销售终端MPR+LTC项目项目总体方案P183(183页PPT)(文末有下载方式)
  • 自由学习记录(48)
  • 【设计模式】工厂模式详解-----简单工厂模式、工厂方法模式、抽象工厂模式
  • JS—Promise:3分钟掌握Promise
  • C++11QT复习(二)
  • kafka 如何保证消息不丢失,详细讲解
  • docker-Dify外接Fastgpt知识库
  • python dict转换成json格式
  • 【C++游戏引擎开发】《线性代数》(2):矩阵加减法与SIMD集成
  • PHP 应用SQL 注入符号拼接请求方法HTTP 头JSON编码类