ReentrantLock底层原理、源码解析
ReentrantLock 的底层原理涉及到 Java 线程调度、锁的实现机制以及 AQS(AbstractQueuedSynchronizer)框架。
1. AQS 框架
ReentrantLock 是基于 AQS 实现的。AQS 是一个用于构建锁和其他同步组件的基础框架,它使用了一个 FIFO 队列来管理线程的等待顺序。
-
State 变量:AQS 使用一个
volatile int state
变量来记录锁的状态。对于 ReentrantLock 来说,这个变量表示当前锁被持有的次数。 -
CLH 队列:AQS 内部维护了一个 CLH(Craig, Landin, and Hagersten)队列,这是一个双向队列,用于管理等待获取锁的线程。
2. 锁的获取与释放
获取锁 (lock()
)
-
非公平模式:
-
线程首先尝试通过 CAS(Compare And Swap)操作直接获取锁(即修改
state
变量)。如果成功,则继续执行。 -
如果失败,检查当前线程是否已经持有锁(可重入性),如果是,则增加
state
计数。 -
如果仍然失败,将线程加入 CLH 队列并阻塞。
-
-
公平模式:
-
线程会先检查队列中是否有其他线程在等待,如果有,则直接进入队列等待,不会尝试直接获取锁。
-
如果没有其他线程在等待,则尝试通过 CAS 操作获取锁。
-
释放锁 (unlock()
)
-
当线程调用
unlock()
方法时,会减少state
计数。 -
如果
state
计数变为 0,表示锁完全释放,唤醒队列中的下一个线程。 -
如果
state
计数不为 0,说明当前线程仍然是锁的持有者,可以继续执行。
3. 可重入性
-
ReentrantLock 支持可重入特性,即同一个线程可以多次获取同一把锁而不会发生死锁。
-
每次获取锁时,
state
计数器加1;每次释放锁时,计数器减1;当计数器为0时,表示锁完全释放。
4. 条件变量
-
ReentrantLock 还支持条件变量(Condition),允许线程在某个条件下等待或唤醒其他线程。
-
条件变量内部也是基于 AQS 实现的,使用了 AQS 的 ConditionObject 类。
源码(解析在注释中):
import java.util.concurrent.TimeUnit;
import java.util.Collection;
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
// 继承AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
/**
* 不公平尝试获取锁
* @param acquires
* @return
*/
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { // 如果当前无线程持有锁,进行cas尝试上锁
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 如果该线程本来就持有锁,state增加,表示锁的重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* state 减 release
* @param releases
* @return
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 释放完全 持有线程设置为空
free = true;
setExclusiveOwnerThread(null);
}
// 设置state 为 减去 releases后的值
setState(c);
return free;
}
/**
* 判断持有锁线程是否为当前线程
*
*/
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
/**
*
* 获取当前持有持有锁的线程,无则返回null
*/
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
/**
* 获取state(重入数量),只有持有锁的线程可以获取
*/
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
/**
* 判断是否有线程上锁
*/
final boolean isLocked() {
return getState() != 0;
}
}
/**
* 非公平锁
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 当前线程cas与aqs等待队列头的线程竞争, cas竞争失败会进入等待队列
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 尝试获取锁,或者重入
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* 公平锁
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* 直接放入等待队列,排队上锁
*/
final void lock() {
acquire(1);
}
/**
* 尝试获取锁,或者重入
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 如果当前线程无需排队(前面没有在等待的线程)且 cas获取锁成功,设置持有线程为当前线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 重入 state + acquires
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
// 构造方法,默认不公平
public ReentrantLock() {
sync = new NonfairSync();
}
// 指定是否公平
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// 后面就不再一一解释了,都是sync的方法。
}