Java并发编程——AQS原理解析
文章目录
- 一. AQS概述
- 二. AQS工作原理
- 2.1 原理概览
- 2.2 AQS 对资源的共享方式
- 1. 独占式资源访问(Exclusive Mode)
- 2. 共享式资源访问(Shared Mode)
- 3. AQS中的模版模式应用
- 三. Semaphore(信号量)-允许多个线程同时访问
- 3.1 核心方法
- 3.2 内部实现
- 3.3 示例
- 四. CountDownLatch (倒计时器)
- 4.1 工作原理
- 4.2 两种典型用法
- 4.3 示例
- 4.4 CountDownLatch 的不足
- 五. CyclicBarrier(循环栅栏)
- 5.2 原理
- 5.2 应用场景
- 5.3 实例
- 5.4 CyclicBarrier源码分析
- 5.5 CyclicBarrier 和 CountDownLatch 的区别
一. AQS概述
AQS
的全称为(AbstractQueuedSynchronizer
),这个类在 java.util.concurrent.locks
包下面。
AQS(AbstractQueuedSynchronizer)
是 Java 并发库中非常核心的一个类,它为构建锁(如 ReentrantLock
)和其他同步工具(如 CountDownLatch
, Semaphore
等)提供了框架。AQS的设计理念是通过一个先进先出(FIFO)队列来管理线程的排队和同步操作。
- AQS 主要负责管理同步状态(即锁的持有和释放状态)和线程的排队,底层使用了
CAS
(Compare And Swap)来实现原子操作,从而提高并发性能。- AQS 并没有直接提供同步工具,而是提供了一些抽象方法,供我们在继承 AQS 时进行实现。
二. AQS工作原理
2.1 原理概览
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH
队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH
(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列
(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。
AQS(AbstractQueuedSynchronizer)原理图:
AQS
内部维护了一个 state
变量来表示同步状态
,通过内置的 FIFO
队列来完成获取资源线程的排队工作。AQS
使用 CAS
对该同步状态进行原子操作实现对其值的修改。
private volatile int state; //共享变量,使用volatile修饰保证线程可见性
状态信息通过 protected
类型的getState
,setState
,compareAndSetState
进行操作
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2.2 AQS 对资源的共享方式
AQS 提供了两种资源共享方式,分别是 独占式和共享式资源访问方式,这两种方式分别适用于不同的同步场景:
1. 独占式资源访问(Exclusive Mode)
独占式资源访问模式指的是同一时刻只有一个线程能够获取资源并执行任务。当资源被一个线程占用时,其他线程必须等待资源释放才能继续获取。这种模式适用于锁等需要线程独占资源的场景。
又可分为公平锁和非公平锁,
ReentrantLock
同时支持两种锁,下面以ReentrantLock
对这两种锁的定义做介绍:
- 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
- 非公平锁:当线程要获取锁时,先通过两次 CAS 操作去抢锁,如果没抢到,当前线程再加入到队列中等待唤醒。
下面来看 ReentrantLock 中相关的源代码:
ReentrantLock 默认采用非公平锁,因为考虑获得更好的性能,通过 boolean 来决定是否用公平锁(传入 true 用公平锁)。
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
public ReentrantLock() {
// 默认非公平锁
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
非公平锁的 lock
方法:
static final class NonfairSync extends Sync {
final void lock() {
// 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// AbstractQueuedSynchronizer.acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 这里没有对阻塞队列进行判断
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平锁的 lock
方法:
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
// AbstractQueuedSynchronizer.acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
总结
公平锁和非公平锁只有两处不同:
- 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
- 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。
公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。
相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。
Synchonized
是非公平锁,因为它采用非公平的锁获取机制。释放锁时,会唤醒等待线程,但不保证顺序,导致先到的线程可能被后续线程插队。
2. 共享式资源访问(Shared Mode)
共享式资源访问模式允许多个线程同时访问资源。在这种模式下,多个线程可以并发地使用资源,直到资源被占用的最大限制为止。适用于那些允许多个线程同时执行某些任务的场景,如信号量(Semaphore)、读写锁(ReadWriteLock)等。
3. AQS中的模版模式应用
在模板模式(Template Pattern)中,一个抽象类公开定义了执行它的方法的方式/模板。它的子类可以按需要重写方法实现,但调用将以抽象类中定义的方式进行。这种类型的设计模式属于行为型模式。
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
- 使用者继承
AbstractQueuedSynchronizer
并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state
的获取和释放) - 将
AQS
组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
AQS 提供的模板方法:
- 关键方法:
isHeldExclusively() //该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int) //独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int) //独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int) //共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int) //共享方式。尝试释放资源,成功则返回true,失败则返回false。
- 子类实现:
- 子类(如 ReentrantLock)继承 AQS,并重写上述方法以定义具体的同步逻辑。
- 默认情况下,这些方法会抛出
UnsupportedOperationException
,必须重写以实现功能。
- 特点:
- AQS中的其他方法通常是
final
,不可重写,确保算法框架的固定结构。 - 用户只需专注于资源的获取和释放逻辑,其余复杂管理(如线程排队、状态维护)由 AQS 处理。
示例:ReentrantLock中的应用
- state变量:用于表示资源的持有状态。初始化为 0,表示未锁定。
- 锁的获取与释放:
- 获取锁(
lock()
):通过tryAcquire()
独占获取。成功则state+1
,多个线程只能有一个成功。 - 释放锁(
unlock()
):通过tryRelease()
独占释放。state
减少,回零时释放资源给其他线程。
- 获取锁(
- 可重入特性:同一线程可以多次获取锁,
state
递增,释放时同样递减,确保最终能释放成功。
示例:CountDownLatch中的应用
- 初始化state:设置为N,表示需要完成的子线程数量。
- 子线程完成:每个子线程完成后调用
countDown()
,使用 CAS(比较并交换)操作将state
减 1。 - 主线程等待:直到
state
为 0,主线程被unpark()
,继续执行后续操作。
自定义同步器的选择
- 独占方式:当资源只能由一个线程使用时,实现
tryAcquire()
和tryRelease()
。- 共享方式:当资源可以被多个线程共享时(如读锁),实现
tryAcquireShared()
和tryReleaseShared()
。- 结合使用:AQS 支持同时实现独占和共享方式,如
ReentrantReadWriteLock
,其中读锁共享,写锁独占。
三. Semaphore(信号量)-允许多个线程同时访问
Semaphore
是 Java 并发包中用于控制同时访问某个资源的线程数量的工具。它通过许可证机制,允许多个线程(特定数量)同时访问共享资源,超过数量的线程会被阻塞,直到有许可证被释放。
3.1 核心方法
- acquire(): 获取一个许可证。如果没有可用许可证,线程会阻塞,直到有许可证被释放。
- release(): 释放一个许可证,增加可用许可证的数量,允许更多的线程继续执行。
- tryAcquire(): 尝试获取一个许可证,若获取失败则立即返回 false,不阻塞线程。
Semaphore 有两种模式,公平模式和非公平模式:
- 公平模式:线程按调用
acquire()
的顺序依次获取许可证,保证先进先得(FIFO)。- 非公平模式:默认模式,线程尝试直接获取许可证,可能引起“抢断”现象,导致某些线程长时间等待。
3.2 内部实现
- 基于
AbstractQueuedSynchronizer
(AQS),通过状态变量state
管理许可证数量。 acquire
方法减少state
,若不足则阻塞线程;release
方法增加state
,唤醒阻塞线程。
3.3 示例
public class SemaphoreExample1 {
private static final int threadCount = 550;
public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(300);
final Semaphore semaphore = new Semaphore(20);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {
try {
semaphore.acquire(); // 获取一个许可
test(threadnum);
semaphore.release(); // 释放一个许可
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);
}
}
四. CountDownLatch (倒计时器)
CountDownLatch
是 Java 并发工具包中的一个计数器工具,允许主线程等待多个线程完成任务后才继续执行。它通过一个计数器来实现同步,当计数器减到 0 时,主线程被唤醒。
4.1 工作原理
- 初始化计数器:使用构造方法传入一个初始值
count
,表示需要完成的任务数。 - 任务完成通知:每个任务完成后,调用
countDown()
方法,计数器减 1。 - 主线程等待:主线程调用
await()
方法阻塞,直到计数器为 0 方可继续执行。
4.2 两种典型用法
- 等待所有任务完成
- 应用场景:启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
- 某一线程在开始运行前等待 n 个线程执行完毕。将
CountDownLatch
的计数器初始化为 n :new CountDownLatch(n)
,每当一个任务线程执行完毕,就将计数器减 1countdownlatch.countDown()
,当计数器的值变为 0 时,在CountDownLatch
上await()
的线程就会被唤醒。
- 实现并行启动
- 应用场景:多个线程同时开始执行,比如并发测试。
初始化一个共享的CountDownLatch
对象,将其计数器初始化为 1 :new CountDownLatch(1)
,多个线程在开始执行任务前首先coundownlatch.await()
,当主线程调用countDown()
时,计数器变为 0,多个线程同时被唤醒。
4.3 示例
/**
*
* @
* @date 2025年2月17日
* @Description: CountDownLatch 使用方法示例
*/
public class CountDownLatchExample1 {
// 请求的数量
private static final int threadCount = 550;
public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {// Lambda 表达式的运用
try {
test(threadnum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
countDownLatch.countDown();// 表示一个请求已经被完成
}
});
}
countDownLatch.await();
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模拟请求的耗时操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模拟请求的耗时操作
}
}
上面的代码中,我们定义了请求的数量为 550,当这 550 个请求被处理完成之后,才会执行
System.out.println("finish");
。
与 CountDownLatch 的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()
方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
其他 N 个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务。这种通知机制是通过CountDownLatch.countDown()
方法来完成的;每调用一次这个方法,在构造函数中初始化的 count 值就减 1。所以当 N 个线程都调 用了这个方法,count 的值等于 0,然后主线程就能通过await()
方法,恢复执行自己的任务。
4.4 CountDownLatch 的不足
CountDownLatch
是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch
使用完毕后,它不能再次被使用。
五. CyclicBarrier(循环栅栏)
5.2 原理
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier
默认的构造方法是 CyclicBarrier(int parties)
,其参数表示屏障拦截的线程数量,每个线程调用await
方法告诉 CyclicBarrier
我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier
和CountDownLatch
非常类似,它也可以实现线程间的技术等待,但是它的功能比CountDownLatch
更加复杂和强大。主要应用场景和CountDownLatch
类似。CountDownLatch
的实现是基于AQS
的,而CycliBarrier
是基于ReentrantLock
(ReentrantLock 也属于 AQS 同步器)和Condition
的。
构造函数:
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
parties
就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。
5.2 应用场景
- 多线程数据处理:如分块处理大数据集,每个线程处理一部分,屏障点用于汇总结果。
- 分布式系统:多个节点需同步处理数据或状态更新。
- 模拟与测试:需多个线程协同执行特定阶段的场景。
5.3 实例
示例一:
/**
*
* @
* @date 2025年2月17日
* @Description: 测试 CyclicBarrier 类中带参数的 await() 方法
*/
public class CyclicBarrierExample2 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
try {
/**等待60秒,保证子线程完全执行结束*/
cyclicBarrier.await(60, TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("-----CyclicBarrierException------");
}
System.out.println("threadnum:" + threadnum + "is finish");
}
}
运行结果,如下:
可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await方法之后的方法才被执行。
另外,CyclicBarrier
还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction)
,用于在线程到达屏障时,优先执行barrierAction
,方便处理更复杂的业务场景。示例代码如下:
/**
*
* @
* @date 2025年2月17日
* @Description: 新建 CyclicBarrier 的时候指定一个 Runnable
*/
public class CyclicBarrierExample3 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
System.out.println("------当线程数达到之后,优先执行------");
});
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
cyclicBarrier.await();
System.out.println("threadnum:" + threadnum + "is finish");
}
}
运行结果,如下:
程序的输出将分为多个批次,每批次处理5个线程。每个批次会依次打印5个线程的“is ready”信息,随后由屏障触发打印一次提示信息,然后这5个线程依次打印“is finish”信息,如此反复,直到所有550个线程处理完毕。
5.4 CyclicBarrier源码分析
当调用 CyclicBarrier
对象调用 await()
方法时,实际上调用的是dowait(false, 0L)
方法。 await()
方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 parties
的值时,栅栏才会打开,线程才得以通过执行。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。
private int count;
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 锁住
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
// 如果线程中断了,抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// cout减1
int index = --count;
// 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行await 方法之后的条件
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 将 count 重置为 parties 属性的初始化值
// 唤醒之前等待的线程
// 下一波执行开始
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
总结:
CyclicBarrier
内部通过一个count
变量作为计数器,cout
的初始值为parties
属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减一。如果count
值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。
5.5 CyclicBarrier 和 CountDownLatch 的区别
特性 | CyclicBarrier | CountDownLatch |
---|---|---|
用途 | 多线程到达屏障点后一起继续 | 等待多个线程完成任务 |
使用方式 | 双向等待,多次使用 | 单向等待,只能使用一次 |
计数机制 | 可重置,支持循环使用 | 一次性,不可重置 |
屏障动作 | 支持自定义屏障动作 | 不支持屏障动作 |