【并发编程】从AQS机制到同步工具类
AQS机制
Java 中常用的锁主要有两类,一种是 Synchronized 修饰的锁,被称为 Java 内置锁或监视器锁。另一种就是在 JUC 包中的各类同步器,包括 ReentrantLock(可重入锁)、Semaphore(信号量)、CountDownLatch 等。
所有的同步器都是基于AQS机制来构建的,而 AQS 类的核心数据结构是一种名为CLH锁的变体。
CLH锁
CLH锁是一种基于链表的自旋锁,它通过维护一个隐式的等待队列来实现线程的公平性和高效性。CLH锁的核心思想是每个线程在进入临界区时都会在队列尾部排队,并且自旋等待前驱节点的状态变化。CLH 锁的特点是,它将等待线程的状态信息保存在前驱节点中,而不是在本线程中,这样就避免了过多的缓存一致性流量。
隐式双向链表
加锁过程:
- 初始化:CLH 锁初始化时,
Tail
指向一个状态为false
的空节点。 - 线程入队:
- 线程尝试获取锁时,创建一个状态为
true
的新节点,表示正在等待锁。 - 线程通过 CAS 操作将新节点插入队列尾部,并更新
Tail
指针。
- 线程尝试获取锁时,创建一个状态为
- 轮询前驱节点状态:线程不断轮询其前驱节点的状态,直到前驱节点的状态变为
false
,表示可以获取锁。
解锁过程:
- 释放锁:线程完成临界区访问后,将当前节点的状态设置为
false
,表示释放锁。 - 后继节点获取锁:后继节点检测到前驱节点状态变化,获取锁并进入临界区。
AQS对CLH锁的改造
CLH锁存在缺点:
- 自旋操作,当锁持有时间长时会带来较大的 CPU 开销。
- 基本的 CLH 锁功能单一,不改造不能支持复杂的功能。
Java 的 AbstractQueuedSynchronizer(AQS)借鉴了 CLH 锁的思想,并在此基础上做了诸多改进,使其更适合构建高效、可扩展的同步器。以下是 AQS 对 CLH 锁所做的一些主要改造:
显式双向链表
AQS 使用了显式的双向链表来维护等待队列,而不是隐式的单向链表。这样改进的好处是,它允许 AQS 更方便地处理队列中的节点操作,比如取消、唤醒特定节点等。
AQS 加锁过程:
- 初始化:AQS 初始化时,等待队列为空,
head
和tail
指针均为null
。 - 线程入队:
- 线程尝试获取锁时,会检查当前锁的状态(
state
)。如果锁已被占用,线程会创建一个新的节点(Node
),表示自己需要等待锁。 - 线程通过 CAS 操作将新节点原子性地插入到等待队列的尾部,并更新
tail
指针指向新节点。 - 如果队列为空,当前节点会成为队列中的第一个节点。
- 线程尝试获取锁时,会检查当前锁的状态(
- 线程阻塞与等待:如果当前线程无法立即获取锁(
state
不为 0),线程会进入阻塞状态,调用LockSupport.park()
挂起自己,直到被唤醒为止。
AQS 解锁过程:
- 释放锁:持有锁的线程完成临界区的操作后,会调用
release(int arg)
方法将state
变量设置为 0,表示锁已释放。 - 唤醒后继节点:AQS 会调用
LockSupport.unpark(Thread)
来唤醒后继节点的线程,使其从park()
的阻塞状态中恢复。 - 后继节点获取锁:被唤醒的后继节点线程会重新尝试获取锁,通过
CAS
操作将state
从 0 更新为 1。如果成功获取锁,线程将进入临界区执行任务。
多种同步模式
AQS 提供了独占锁(exclusive)和共享锁(shared)两种模式。例如,ReentrantLock
使用的是独占模式,而 Semaphore
和 CountDownLatch
使用的是共享模式。
一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现钩子方法中的tryAcquire-tryRelease
或tryAcquireShared-tryReleaseShared
中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock
。
同步状态变量
AQS 使用一个 int
型变量(称为同步状态 state
)来表示锁的状态,而不是像 CLH 锁那样依赖前驱节点的布尔变量。AQS 通过 CAS 操作来修改这个状态,确保线程安全。
- ReentrantLock 的可重入性:
ReentrantLock
通过内部的state
变量表示锁的占用状态。初始state
为 0,表示未锁定。当线程 A 调用lock()
时,通过tryAcquire()
方法尝试获取锁并将state
加 1。若获取成功,线程 A 可以多次获取同一锁,state
会累加,体现可重入性。释放时,state
减 1,直到回到 0,锁才真正释放,其他线程才有机会获取锁。 - CountDownLatch 的倒计时:
CountDownLatch
使用state
变量表示剩余的倒计时数。初始state
为 N,表示 N 个子线程。每个子线程执行完任务后调用countDown()
,state
减 1。所有子线程执行完毕(state
变为 0)后,主线程被唤醒,继续执行后续操作。
实现同步器
- AQS 的设计:AQS 提供了一个基础的框架和队列管理功能,但具体的同步逻辑并没有在 AQS 中实现,而是留给具体的同步器来定义。这就是模板方法模式的典型应用:AQS 提供了模板方法,这些模板方法依赖于子类实现的钩子方法。
- 重写钩子方法:具体的同步器,如
ReentrantLock
、Semaphore
,会通过其内部定义的Sync
类(继承自 AQS)来重写这些钩子方法。这些重写的方法决定了锁的行为(如是否公平、是否可重入、许可的数量等)。
什么是钩子方法? 钩子方法是一种被声明在抽象类中的方法,一般使用
protected
关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。除了这些钩子方法,AQS类中其他方法都是final关键字修饰的,无法被重写。
//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int)
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryReleaseShared(int)
//该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively()
常见同步工具类
ReentrantLock
ReentrantLock是一种可重入的互斥锁,允许同一个线程在持有锁的情况下多次获取锁。它提供了更灵活的锁机制,可以显式地获取和释放锁,还支持公平锁和非公平锁的选择。通常用来实现线程间的同步,防止多个线程同时访问共享资源。
ReentrantLock
有一个内部类 Sync
,Sync
继承 AQS,添加锁和释放锁的大部分操作实际上都是在 Sync
中实现的。Sync
有公平锁 FairSync
和非公平锁 NonfairSync
两个子类。
公平锁/非公平锁
- 抽象类
Sync
继承自AbstractQueuedSynchronizer,实现了AQS的部分方法; NonfairSync
继承自Sync
,实现了Sync中的方法,主要用于非公平锁的获取;FairSync
继承自Sync
,实现了Sync中的方法,主要用于公平锁的获取。
abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
可以通过构造方法实现公平锁或非公平锁。
private final Sync sync;
// 默认构造方法,ReentrantLock默认是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 自己可选择使用公平锁还是非公平锁,传入true是公平锁,传入false是非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
公平锁和非公平锁只有两处不同:
-
公平锁: 在调用
lock()
后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。但是公平锁会先判断等待列中是否有处于等待状态的线程,如果有的话,就乖乖加入到等待线程中去排队,而不能直接插队获取锁。 -
非公平锁: 在调用
lock()
中的第一次CAS 失败后,调用的是nonfairTryAcquire()
非公平方法,如果发现锁这个时候被释放了(state == 0),非公平锁就会直接 CAS 抢锁,不会管当前等待队列中有没有等待线程。但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。
可中断锁
可中断锁与不可中断锁的区别在于:线程尝试获取锁操作失败后,在等待过程中,如果该线程被其他线程中断了,它是如何响应中断请求的。lock
方法会忽略中断请求,继续获取锁直到成功;而lockInterruptibly
则直接抛出中断异常来立即响应中断,由上层调用者处理中断。
lock()
适用于锁获取操作不受中断影响的情况,此时可以忽略中断请求正常执行加锁操作,因为该操作仅仅记录了中断状态(通过Thread.currentThread().interrupt()
操作,只是恢复了中断状态为true,并没有对中断进行响应)。- 如果要求被中断线程不能参与锁的竞争操作,则应该使用
lockInterruptibly
方法,一旦检测到中断请求,立即返回不再参与锁的竞争并且取消锁获取操作(即finally中的cancelAcquire操作)
可重入锁
可重入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁阻塞,该特性的首先需要解决以下两个问题:
- 线程再次获取锁: 所需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次获取成功;
- 锁的最终释放: 线程重复n次获取了锁,随后在第n次释放该锁后,其它线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前线程被重复获取的次数,而被释放时,计数自减,当计数为0时表示锁已经成功释放。
首先会通过compareAndSetState(int, int)
方法来尝试修改同步状态,如果修改成功则表示获取到了锁,然后调用setExclusiveOwnerThread(Thread)
方法来设置获取到锁的线程。
该方法继承自AbstractOwnableSynchronizer类,它的主要作用就是记录获取到独占锁的线程,AOS类的定义很简单:
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
// The current owner of exclusive mode synchronization.
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
CountDownLatch
CountDownLatch是一个计数器,它允许一个或多个线程等待其它线程完成操作后再继续执行,通常用来实现一个线程等待其它多个线程完成操作之后再继续执行的操作。
CountDownLatch内部维护了一个计数器,该计数器通过CountDownLatch的构造方法指定。当调用await()
方法时,它将一直阻塞,直到计数器变为0。当其它线程执行完指定的任务后,可以调用countDown()
方法将计数器减一。当计数器减为0,所有的线程将同时被唤醒,然后继续执行。
常用方法
- CountDownLatch(int count): CountDownLatch的构造方法,可通过count参数指定计数次数,但是要大于等于0,小于0会抛IIegalArgumentException异常。
- void await(): 如果计数器不等于0,会一直阻塞(在线程没被打断的情况下)。
- boolean await(long timeout,TimeUnit unit): 除非线程被中断,否则会一直阻塞,直至计数器减为0或超出指定时间timeout,当计数器为0返回true,当超过指定时间,返回false。
- void countDown(): 调用一次,计数器就减1,当等于0时,释放所有线程。如果计数器的初始值就是0,那么就当没有用CountDownLatch吧。
- long getCount(): 返回当前计数器的数量,可以用来测试和调试。
使用实例
定义线程任务,实现Runnable
接口
@AllArgsConstructor
public class CountWork implements Runnable {
private CountDownLatch countDownLatch;
@Override
public void run() {
System.out.println("执行任务");
countDownLatch.countDown();
}
}
定义测试类,使用for循环执行任务,知道任务结束完毕后打印结果。
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
CountWork countWork = new CountWork(countDownLatch);
for (int i = 0; i < 3; i++) {
new Thread(countWork).start();
}
countDownLatch.await();
System.out.println("所有任务执行完毕");
}
}
执行结果:
可以发现,当所有任务执行完毕后,才执行了测试类后续的打印任务。
但是如果使用构造函数创建了4个计数new CountDownLatch(4)
,但实际只有3个线程,则测试类阻塞,无法打印结果。
CyclicBarrier
CyclicBarrier是一个同步屏障,它允许多个线程相互等待,直到到达某个公共屏障点,才能继续执行。通常用来实现多个线程在同一个屏障处等待,然后再一起继续执行的操作。
CyclicBarrier也维护了一个类似计数器的变量,通过CyclicBarrier的构造函数指定,需要大于0,否则抛IllegalArgumenException异常。当线程到达屏障位置时,调用await()方法进行阻塞,直到所有线程到达屏障位置时,所有线程才会被释放,而屏障将会被重置为初始值以便下次使用。
常用方法
- CyclicBarrier(int parties): CyclicBarrier的构造方法,可通过parties参数指定需要到达屏障的线程个数,但是要大于0,否则会抛IllegalArgumentException异常。
- CyclicBarrier(int parties,Runnable barrierAction): 另一个构造方法,parties作用同上,barrierAction表示最后一个到达屏障点的线程要执行的逻辑。
- int await(): 表示线程到达屏障点,并等待其它线程到达,返回值表示当前线程在屏障中的位置(第几个到达的)。
- int await(long timeout,TimeUnit unit): 与await()类似,但是设置了超时时间,如果超过指定的时间后,仍然还有线程没有到达屏障点,则等待的线程会被唤醒并执行后续操作。
- void reset(): 重置屏障状态,即将屏障计数器重置为初始值。
- int getParties(): 获取需要同步的线程数量。
- int getNumberWaiting(): 获取当前正在等待的线程数量。
使用实例
定义线程执行的任务,当线程执行完打印任务后,阻塞等待其他线程。
@AllArgsConstructor
public class BarrierTask implements Runnable{
private CyclicBarrier barrier;
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 正在执行");
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
定义最终执行的业务逻辑
public class FinalTask implements Runnable{
@Override
public void run() {
System.out.println("所有线程执行完毕");
}
}
定义测试类,当所有线程执行到屏障后,触发最终的业务逻辑。
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new FinalTask());
BarrierTask barrierTask = new BarrierTask(cyclicBarrier);
for (int i = 1; i <= 4; i++) {
new Thread(barrierTask, "线程-" + i).start();
}
}
}
执行结果:
对比CountDownLatch
- CyclicBarrier维护线程的计数,而CounDownLatch维护任务的计数。
- 可重用性: 两者最明显的差异就是可重用性。CyclicBarrier所有线程都到达屏障后,计数会重置为初始值。而CountDownLatch永远不会重置。
Semaphore
Semaphore是一个计数信号量,它允许多个线程同时访问共享资源,并通过计数器来控制访问数量。它通常用来实现一个线程需要等待获取一个许可证才能访问共享资源,或者需要释放一个许可证才能完成的操作。
Semaphore维护了一个内部计数器(许可permits),主要有两个操作,分别对应Semaphore的acquire和release方法。acquire方法用于获取资源,当计数器大于0时,将计数器减1;当计数器等于0时,将线程阻塞。release方法用于释放资源,将计数器加1,并唤醒一个等待中的线程。
常用方法
- Semaphore(int permits): 构造方法,permits表示Semaphore中的许可数量,它决定了同时可以访问某个资源的线程数量。
- Semaphore(int permits,boolean fair): 构造方法,当fair为ture,设置为公平信号量。
- void acquire(): 获取一个许可,如果没有许可,则当前线程被阻塞,直到有许可。如果有许可该方法会将许可数量减1。
- void acquire(int permits): 获取指定数量的许可,获取成功同样将许可减去指定数量,失败阻塞。
- void release(): 释放一个许可,将许可数加1。如果有其他线程正在等待许可,则唤醒其中一个线程。
- void release(int n): 释放n个许可。
- int availablePermits(): 当前可用许可数。
使用实例
信号量的构造方法传入参数为5,设置六个进程获取这5个资源。
public class SemaphoreTest {
Semaphore park;
public SemaphoreTest(int permits) {
park = new Semaphore(permits);
}
public void enter() throws InterruptedException {
park.acquire();
System.out.println(Thread.currentThread().getName() + " 进入");
}
public void leave() {
park.release();
System.out.println(Thread.currentThread().getName() + " 驶出");
}
public static void main(String[] args) {
SemaphoreTest test = new SemaphoreTest(5);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
test.enter();
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
test.leave();
}, "车牌号" + i).start();
}
}
}
返回结果:
可以发现同一时刻只有五个线程获取到资源,当有资源释放时(车牌号5 驶出),其他线程才能获取资源(车牌号6 进入)。