Java并发05 - AQS共享模式的数据结构
AQS共享模式的数据结构
文章目录
- AQS共享模式的数据结构
- 一:Semaphore信号量
- 1:信号量概述
- 2:公平锁和非公平锁的实现原理
- 2.1:非公平获取锁 nonfair acquire()
- 2.2:公平获取锁 fair acquire()
- 2.3:释放锁 release()
- 3:ReetrantLock与Semaphore的区别
- 二:CountDownLatch计数递减门阀
- 1:概述和两大方法
- 2:两种用法:多等一和一等多
- 2.1:多等一:检验并发和死锁
- 2.2:一等多:保证依赖关系
- 3:CountDownLatch的实现原理
- 3.1:await()原理
- 3.2:countDown()原理
- 3.3:和CyclicBarrier区别
- 三:ReentrantReadWriteLock可重入读写锁
- 1:读写锁简介
- 2:写锁的获取和释放
- 3:读锁的获取和释放
- 4:实例演示
共享模式与独占模式区别在于:共享模式下允许多条线程同时获取锁资源,而独占模式中,在同一时刻只允许一条线程持有锁资源。
一:Semaphore信号量
1:信号量概述
Semaphore信号量是java.util.concurrent(JUC)包下的一个并发工具类
可以用来控制同一时刻访问临界资源(共享资源)的线程数,以确保访问临界资源的线程能够正确、合理的使用公共资源。
而其内部则于ReetrantLock一样,都是通过直接或间接的调用AQS框架的方法实现。
在Semaphore中存在一个“许可”的概念:在初始化Semaphore信号量需要为这个许可传入一个数值,该数值表示表示同一时刻可以访问临界资源的最大线程数,也被称为许可集。
一条线程想要访问临界资源则需要先执行acquire()获取一个许可,如果线程在获取时许可集已经被分配完了,那么该线程则会进入阻塞等待状态,直至有其他持有许可的线程释放后才有可能获取到许可。
当线程访问完成临界资源后则需要执行release()方法释放已获取的许可。
下面是Semaphore信号量提供的一些主要方法:
// 调用该方法后线程会从许可集中尝试获取一个许可
public void acquire()
// 线程调用该方法时会释放已获取的许可
public void release()
// Semaphore构造方法:permits→许可集数量
Semaphore(int permits)
// Semaphore构造方法:permits→许可集数量,fair→公平与非公平
Semaphore(int permits, boolean fair)
// 从信号量中获取许可,该方法不响应中断
void acquireUninterruptibly()
// 返回当前信号量中未被获取的许可数
int availablePermits()
// 获取并返回当前信号量中立即未被获取的所有许可
int drainPermits()
// 返回等待获取许可的所有线程Collection集合
protected Collection<Thread> getQueuedThreads();
// 返回等待获取许可的线程估计数量
int getQueueLength()
// 查询是否有线程正在等待获取当前信号量中的许可
boolean hasQueuedThreads()
// 返回当前信号量的公平类型,如为公平锁返回true,非公平锁为false
boolean isFair()
// 获取当前信号量中一个许可,当没有许可可用时直接返回false不阻塞线程
boolean tryAcquire()
// 在给定时间内获取当前信号量中一个许可,超时还未获取成功则返回false
boolean tryAcquire(long timeout, TimeUnit unit)
下面举一个三个许可(初始化了三个许可,也就意味着在同一时刻允许三条线程同时访问临界资源)的demo
package com.cui.commonboot.myjuc;
import java.util.concurrent.*;
/**
* <p>
* 功能描述:信号量测试
* </p>
*
* @author cui haida
* @date 2023/12/28/20:40
*/
public class SemaphoreTest {
public static void main(String[] args) {
// 自定义一个线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
4 * 2, 40, // 核心线程和最大线程
30, TimeUnit.SECONDS, // 临时线程无连接时的存活时间
new LinkedBlockingDeque<Runnable>(1024 * 10), // 阻塞队列数据结构和容量
Executors.defaultThreadFactory(), // 默认线程工厂
new ThreadPoolExecutor.AbortPolicy() // 饱和策略
);
// 设置信号量同一时刻最大线程数为3
int permits = 3;
final Semaphore semaphore = new Semaphore(permits);
// 模拟一百个对象进行请求
for (int index = 0; index < 100; index++) {
final int serial = index;
threadPool.execute(()->{
try {
// 使用acquire()获取许可
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "线程成功获取许可!请求序号: " + serial);
// 模拟数据库IO
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 临界资源访问结束后释放许可
semaphore.release();
}
});
}
}
}
Semaphore信号量里的“许可”概念与前面我们文章中分析的互斥锁的“同步状态标识”有着异曲同工之妙。
创建信号量对象时,只给许可集分配一个数量即可生成独占锁:
final Semaphore semaphore = new Semaphore(1);
2:公平锁和非公平锁的实现原理
在创建Semaphore对象时,也和ReetrantLock一样手动选择公平锁和非公平锁:
public Semaphore(int permits) {
sync = new NonfairSync(permits); // 默认创建的是非公平锁类型
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
获取锁(许可)的方法tryAcquireShared(arg)分别由两个子类FairSync和NofairSync实现,因为公平锁和非公平锁的加锁方式毕竟存在些许不同
而释放锁tryReleaseShared(arg)的逻辑则交由Sync实现,因为释放操作都是相同的,因此放在父类Sync中实现自然是最好的方式。
2.1:非公平获取锁 nonfair acquire()
static final class NonfairSync extends Sync {
// 构造函数:将给定的许可数permits传给父类同步状态标识state
NonfairSync(int permits) {
super(permits);
}
// 释放锁的方法实现则是直接调用父类Sync的释放锁方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
// 父类 - Sync类构造函数
Sync(int permits) {
setState(permits); // 调用AQS内部的set方法
}
// AQS(AbstractQueuedSynchronizer)同步器
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer {
// 同步状态标识
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
// 对state变量进行CAS操作
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
从上述分析中可知,Semaphore对象创建时传入的许可数permits,实则其实最终是在对AQS内部的state进行初始化。
初始化完成后,state代表着当前信号量对象的可用许可数。(state=p count)
Semaphore获取许可的方法acquire()的具体实现,源码如下:
// Semaphore类 → acquire()方法
public void acquire() throws InterruptedException {
// Sync类继承AQS,此处直接调用AQS内部的acquireSharedInterruptibly()方法
sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer类 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 判断是否出现线程中断信号(标志)
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 如果tryAcquireShared(arg)执行结果不小于0,则线程获取同步状态成功
if (tryAcquireShared(arg) < 0)
// 未获取成功加入同步队列阻塞等待
doAcquireSharedInterruptibly(arg);
}
// Semaphore类 → NofairSync内部类 → tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
// 调用了父类Sync中的实现方法
return nonfairTryAcquireShared(acquires);
}
// Syn类 → nonfairTryAcquireShared()方法
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
// 开启自旋死循环
for (;;) {
int available = getState();
int remaining = available - acquires;
// 判断信号量中可用许可数是否已<0或者CAS执行是否成功
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
}
// 在doAcquireSharedInterruptibly(arg)方法中总共做了三件事:
// 一、创建一个状态为Node.SHARED共享模式的节点,并通过addWaiter()加入队列
// 二、加入成功后开启自旋,判断前驱节点是否为head,是则尝试获取同步状态标识,
// 获取成功后,将自己设置为head节点,如果可用许可数大于0则唤醒后继节点的线程
// 三、如果前驱节点不为head的节点以及前驱节点为head节点但获取同步状态失败的节点
// 则调用shouldParkAfterFailedAcquire(p,node)判断前驱节点的状态是否为SIGNAL状态(一般shouldParkAfterFailedAcquire(p,node)中的for循环至少需要执行两次以上才会返回ture
// 第一次把前驱节点设置为SIGNAL状态,第二次检测到SIGNAL状态)
// 如果是则调用parkAndCheckInterrupt()挂起当前线程并返回线程中断状态
// AbstractQueuedSynchronizer类 → doAcquireSharedInterruptibly()方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 创建节点状态为Node.SHARED共享模式的节点并将其加入同步队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 开启自旋操作
for (;;) {
final Node p = node.predecessor();
// 判断前驱节点是否为head
if (p == head) {
// 尝试获取同步状态state
int r = tryAcquireShared(arg);
// 如果r不小于0说明获取同步状态成功
if (r >= 0) {
// 将当前线程结点设置为头节点并唤醒后继节点线程
setHeadAndPropagate(node, r);
p.next = null; // 置空方便GC
failed = false;
return;
}
}
// 调整同步队列中node节点的状态并判断是否应该被挂起
// 并判断是否存在中断信号,如果需要中断直接抛出异常结束执行
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 结束该节点线程的请求
cancelAcquire(node);
}
}
// AbstractQueuedSynchronizer类 → setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 获取同步队列中原本的head头节点
setHead(node); // 将传入的node节点设置为头节点
/*
* propagate=剩余可用许可数,h=旧的head节点
* h==null,(h=head)==null:
* 非空判断的标准写法,避免原本head以及新的头节点node为空
* 如果当前信号量对象中剩余可用许可数大于0或者
* 原本头节点h或者新的头节点node不是结束状态则唤醒后继节点线程
*
* 写两个if的原因在于避免造成不必要的唤醒
* 因为很有可能唤醒了后续
节点的线程之后,还没有线程释放许可/锁,从而导致再次陷入阻塞
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 避免传入的node为同步队列的唯一节点,
// 因为队列中如果只存在node一个节点,那么后驱节点s必然为空
if (s == null || s.isShared())
doReleaseShared(); // 唤醒后继节点
}
}
整体逻辑如下
如上分析的是可响应线程中断请求的获取许可方式,而Semaphore中也实现了一套不可中断式的获取方法
可响应线程中断的方法在每次操作之前会先检测线程中断信号,如果线程需要中断操作,则直接抛出异常强制中断线程的执行。
反之不可响应线程中断的方法不会检测线程中断信号,而且不会抛出异常强制中断。
// Semaphore类 → acquireUninterruptibly()方法
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// AbstractQueuedSynchronizer类 → acquireShared()方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// AbstractQueuedSynchronizer类 → doAcquireShared()方法
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 在前面的可中断式获取锁方法中此处是直接抛出异常强制中断线程的
// 而在不可中断式的获取方法中,这里是没有抛出异常中断线程的
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.2:公平获取锁 fair acquire()
前面一步步进来的就不赘述了,直接看FairSync.tryAcquireShared()方法
// Semaphore类 → FairSync公平锁类
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
// Semaphore类 → FairSync内部类 → tryAcquireShared()子类实现
protected int tryAcquireShared(int acquires) {
for (;;) {
// 不同点:先判断队列中是否存在节点后再执行获取锁操作
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
}
// 判断队列中是否存在节点
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
可以非常明显的看出唯一不同点在于:公平锁的模式下获取锁,会先调用hasQueuedPredecessors()
方法判断同步队列中是否存在节点
- 如果存在则直接返回-1回到
acquireSharedInterruptibly()
方法if(tryAcquireShared(arg)<0)
判断,调用doAcquireSharedInterruptibly(arg)
方法将当前线程封装成Node.SHARED
共享节点加入同步队列等待。 - 如果队列中不存在节点则尝试直接获取锁/许可。
2.3:释放锁 release()
使用Semaphore时释放锁则调用的是Semaphore.release()方法
调用该方法之后线程持有的许可会被释放,同时permits/state加一。源码如下:
// Semaphore类 → release()方法
public void release() {
sync.releaseShared(1);
}
// AbstractQueuedSynchronizer类 → releaseShared(arg)方法
public final boolean releaseShared(int arg) {
// 调用子类Semaphore中tryReleaseShared()方法实现
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Semaphore类 → Sync子类 → tryReleaseShared()方法
// 释放锁/许可的方法逻辑相对来说比较简单,对AQS中的state加一释放获取的同步状态。
// 不过值得注意的是:AQS独占模式实现中,释放锁的逻辑中是没有保证线程安全的
// 因为独占模式的释放锁逻辑永远只会存在一条线程同时操作。
// 而在共享模式中,可能会存在多条线程同时释放许可/锁资源,所以在此处使用了CAS+自旋的方式保证线程安全问题。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取AQS中当前同步状态state值
int current = getState();
// 对当前的state值进行增加操作
int next = current + releases;
// 不可能出现,除非传入的releases为负数
if (next < current)
throw new Error("Maximum permit count exceeded");
// CAS更新state值为增加之后的next值
if (compareAndSetState(current, next)) // CAS+自旋的方式保证线程安全问题
return true;
}
}
// 如果上面的方法tryReleaseShared返回true,将会执行doReleaseShared唤醒后继节点线程。
// AbstractQueuedSynchronizer类 → doReleaseShared()方法
private void doReleaseShared() {
/*
* 为了防止释放过程中有其他线程进入队列,这里必须开启自旋
* 如果头节点设置失败则重新检测继续循环
*/
for (;;) {
// 获取队列head头节点
Node h = head;
// 如果头节点不为空并且队列中还存在其他节点
if (h != null && h != tail) {
// 获取头节点的节点状态
int ws = h.waitStatus;
// 如果节点状态为SIGNAL等待唤醒状态则代表
if (ws == Node.SIGNAL) {
// 尝试cas修改节点状态值为0
// 失败则继续下次循环
// 成功则唤醒头节点的后继节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h); // 唤醒后继节点线程
}
// 节点状态为0时尝试将节点状态修改为PROPAGATE传播状态
// 失败则跳出循环继续下次循环
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 如果当前队列头节点发生变化继续循环,反之则终止自旋
if (h == head)
break;
}
}
// AbstractQueuedSynchronizer类 → unparkSuccessor()方法
// 参数:传入需要唤醒后继节点的节点
private void unparkSuccessor(Node node) {
// 获取node节点的线程状态
int ws = node.waitStatus;
if (ws < 0)
// 设置head节点为0
compareAndSetWaitStatus(node, ws, 0);
// 获取后继节点
Node s = node.next;
// 如果后继节点为空或线程状态已经结束
if (s == null || s.waitStatus > 0) {
s = null;
// 遍历整个队列拿到可唤醒的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒后继节点线程
LockSupport.unpark(s.thread);
}
至此,释放许可逻辑结束,只需要更新state值后调用doReleaseShared()方法唤醒后继节点线程即可。
在理解doReleaseShared()方法时需要额外注意:调用doReleaseShared()方法的线程会存在两种:
- 一是释放共享锁/许可数的线程。调用release()方法释放许可时必然调用它唤醒后继线程
- 二是刚获取到共享锁/许可数的线程。一定情况下,在满足“超长判断”的任意条件时也会调用它唤醒后继线程
// 这个超长的判断张这样
// AbstractQueuedSynchronizer类 → setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 获取同步队列中原本的head头节点
setHead(node); // 将传入的node节点设置为头节点
/*
* propagate=剩余可用许可数,h=旧的head节点
* h==null,(h=head)==null:
* 非空判断的标准写法,避免原本head以及新的头节点node为空
* 如果当前信号量对象中剩余可用许可数大于0或者
* 原本头节点h或者新的头节点node不是结束状态则唤醒后继节点线程
*
* 写两个if的原因在于避免造成不必要的唤醒,因为很有可能唤醒了后续
* 节点的线程之后,还没有线程释放许可/锁,从而导致再次陷入阻塞
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 避免传入的node为同步队列的唯一节点,
// 因为队列中如果只存在node一个节点,那么后驱节点s必然为空
if (s == null || s.isShared())
doReleaseShared(); // 唤醒后继节点
}
}
// 超长判断:该判断的作用在于面对各种特殊情况能够时保证及时获取锁
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 避免传入的node为同步队列的唯一节点,
// 因为队列中如果只存在node一个节点,那么后驱节点s必然为空
if (s == null || s.isShared())
doReleaseShared(); // 唤醒后继节点
}
3:ReetrantLock与Semaphore的区别
对比项 | ReetrantLock | Semaphore |
---|---|---|
实现模式 | 独占模式 | 共享模式 |
获取锁方法 | tryAcquire() | tryAcquireShared() |
释放锁方法 | tryRelease() | tryReleaseShared() |
是否支持重入 | ==支持 == | 不支持 |
线程中断 | 支持 | 支持 |
Condition | ==支持 == | 不支持 |
队列数量 | 一个同步+多个等待 | 单个同步 |
节点类型 | Node.EXCLUSIVE | Node.SHARED |
二:CountDownLatch计数递减门阀
1:概述和两大方法
可以将这个CountDownLatch想象成为大门
- 初始化时会给出关闭大门的数量
- await方法就是会检查大门是不是都开放了,如果没有,将堵住所有的线程进入
- countDown方法就是将一个大门开放
在CountDownLatch初始化时和Semaphore一样,我们需要传入一个数字count作为最大线程数
这个参数同样会间接的赋值给AQS内部的state同步状态标识。一般我们会调用它的两个方法:await()与countDown():
await()
:调用await()方法的线程会被封装成共享节点加入同步队列阻塞等待,直至state=0时才会唤醒同步队列中所有的线程countDown()
:调用countDown()方法的线程会对state减一
2:两种用法:多等一和一等多
2.1:多等一:检验并发和死锁
多等一:初始化count=1,多条线程await()阻塞,一条线程调用countDown()唤醒所有阻塞线程
// 可以将这个想象成为大门,初始化一个关闭的大门
final CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 1; i <= 3; i++) {
new Thread(() -> {
try {
System.out.println("线程:" + Thread.currentThread().getName() + "....阻塞等待!");
countDownLatch.await(); // 线程会被封装成共享节点加入同步队列阻塞等待
// 可以在此处调用需要并发测试的方法或接口
System.out.println("线程:" + Thread.currentThread().getName() + "....开始执行!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T" + i).start();
}
Thread.sleep(1000);
countDownLatch.countDown(); // 在这里才开放大门, 前面所有被阻塞的线程可以通过,以此模拟高并发,死锁等情况
/*
程序开始运行:
线程:T2....阻塞等待!
线程:T1....阻塞等待!
线程:T3....阻塞等待!
程序运行一秒后(三条线程几乎同时执行):
线程:T2....开始执行!
线程:T1....开始执行!
线程:T3....开始执行!
*/
2.2:一等多:保证依赖关系
一等多:初始化count=x,多线程countDown()对count进行减一,一条线程await()阻塞,当count=0时阻塞的线程开始执行
CountDownLatch countDownLatch = new CountDownLatch(3);
Thread thread1 = new Thread(() -> {
methodA();
countDownLatch.countDown();
});
Thread thread2 = new Thread(() -> {
methodB();
countDownLatch.countDown();
});
Thread thread3 = new Thread(() -> {
methodC();
countDownLatch.countDown();
});
thread1.start();
thread2.start();
thread3.start();
countDownLatch.await(); // 保证t1, t2, t3都执行完了,才能执行main方法
🎉 在实际开发过程中,往往很多并发任务都存在前后依赖关系,如详情页需要调用多个接口完成数据聚合、并行执行获取到数据后需要进行结果合并、多个操作完成后需要进行数据检查等等,而这些场景下我们可以使用一等多的用法
3:CountDownLatch的实现原理
CountDownLatch也是基于AQS共享模式实现的,与Semaphore一样,会将传入的count间接的赋值给AQS内部的state同步状态标识。
3.1:await()原理
private final Sync sync;
// CountDownLatch构造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 对其内部Sync对象进行初始化
this.sync = new Sync(count);
}
// CountDownLatch类 → Sync内部类
private static final class Sync extends AbstractQueuedSynchronizer {
// Sync构造函数:对AQS内部的state进行赋值
Sync(int count) {
setState(count);
}
// 调用await()方法最终会调用到这里
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
}
// CountDownLatch类 → await()方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer类 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 最终调用到CountDownLatch内部Sync类的tryAcquireShared()方法
// 此时如果下面的tryAcquireShared发现state == 0,可以得到1,不满足条件,将会放行
// 如果发现state != 0, 得到 -1,将走下面的阻塞逻辑【大门没有完全释放完成】
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// CountDownLatch类 → Sync内部类 → tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
3.2:countDown()原理
// CountDownLatch类 → countDown()方法
public void countDown() {
sync.releaseShared(1);
}
// AbstractQueuedSynchronizer类 → releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// AbstractQueuedSynchronizer类 → 模板方法:tryReleaseShared()
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// CountDownLatch类 → Sync内部类 → tryReleaseShared()方法
// 调用countDown()方法最终会调用到这里
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
3.3:和CyclicBarrier区别
可以发现,CountDownLatch和前面说的独占锁CyclicBarrier在功能上非常相似,二者具体对比如下:
对比项 | CountDownLatch | CyclicBarrier |
---|---|---|
实现模式 | 共享模式 | 独占模式 |
计数方式 | 减法 | 减法 |
复用支持 | 不可复用 | 计数可置0 |
重置支持 | 不可重置 | 可重置 |
设计重点 | 一等多 | 多等多 |
三:ReentrantReadWriteLock可重入读写锁
1:读写锁简介
读写锁的内部包含两把锁:一把是读(操作)锁,是一种共享锁;另一把是写(操作)锁,是一种独占锁
在没有写锁的时候,读锁可以被多个线程同时持有,写锁被一个线程持有,其他的线程不能再持有写锁,抢占写锁会阻塞,抢占读锁也会阻塞
读写互斥原则:读读相容,读写互斥,写写互斥
解决线程安全问题使用ReentrantLock就可以,但是ReentrantLock是独占锁,某时只有一个线程可以获取该锁
而实际中会有写少读多的场景,因此就需要读写锁ReentrantReadWriteLock
// JUC包中的读写锁接口为ReadWriteLock
public interface ReadWriteLock {
Lock readLock();//返回读锁
Lock writeLock();//返回写锁
}
AQS中只维护了 一个state状态,而ReentrantReadWriteLock则需要维护读状态和写状态
所以ReentrantReadWriteLock用state的高16位表示读状态,也就是获取到读锁的次数;使用低16位表示获取到写锁的线程的可重入次数,这样就可以用一个state变量,表示两种状态
static final int SHARED_SHIFT = 16;
//共享锁(读锁)状态单位值 65536 1<<16 2^16
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//共享锁线程最大个数 65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//排它锁(写锁)掩码,二进制,15 个 1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//返回读锁线程数
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//返回写锁可重入个数
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
//firstReader 用来记录第一个获取到读锁的线程
private transient Thread firstReader = null;
//firstReaderHoldCount 则记录第 一个获取到读锁的线程获取读锁的可重入次数
private transient int firstReaderHoldCount;
//cachedHoldCounter 用来记录最后 一个获取读锁的线程获取读锁 的可重入次数
private transient HoldCounter cachedHoldCounter;
/*
readHolds是ThreadLocal变量
用来存放除去第一个获取读锁线程外的其他线程获取读锁的可重入次数。
ThreadLocaHoldCounter 继承了ThreadLocal,返回 一个 HoldCounter 对象
*/
private transient ThreadLocalHoldCounter readHolds;
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
2:写锁的获取和释放
ReentrantReadWriteLock中写锁使用WriteLock来实现
- 写锁是可重入锁
- 如果当前没有线程获取到读锁和写锁,则当前线程可以获取到写锁然后返回
- 如果当前己经有线程获取到读锁和写锁,则当前请求写锁的线程会被阻塞挂起
void lock() -> 获取写锁,不会对中断相应
public static class WriteLock implements Lock, java.io.Serializable {
public void lock() {
sync.acquire(1);
}
}
//AbstractQueuedSynchronizer类
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//Sync类
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
//c!=0说明读锁或者写锁已经被某线程获取
if (c != 0) { // 如果当前 AQS 状态值不为 0 则说明 当前己经有线程获取到了读锁或者写锁
// (Note: if c != 0 and w == 0 then shared count != 0)
// w=O说明已经有线程获取了读锁
// w!=O 并且当前线程不是写锁拥有者则返回false
if (w == 0 || current != getExclusiveOwnerThread())// getExclusiveOwnerThread判断自己是不是写锁拥有者
return false;
// 说明当前线程获取了写锁,判断可重入次数
if (w + exclusiveCount(acquires) > MAX_COUNT)// 判断该线程的可重入次数是不是超过了最大值
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);//设置可重入次数
return true;
}
//第一个写线程获取写锁
// AQS 的状态值等于 0则说明目前没有线程获取到读锁和写锁,让第一个写线程获取写锁,尝试cas
// 对于非公平锁来说总是返回false, 因此会进行CAS尝试获取写锁
// 获取成功则设置当前锁的持有者为当前线程并返回true ,否则返回false
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
// 设置为自己已经成功的持有写锁
setExclusiveOwnerThread(current);
return true;
}
// NonfairSync类
// writerShouldBlock方法发非公平锁实现,也是就是所非公平锁这部分始终返回false
final boolean writerShouldBlock() {
return false; // writers can always barge
}
//FairSync类
// writerShouldBlock方法发公平锁实现
// 公平锁下还是hasQueuedPredecessors方法判断有没有前驱结点,因为公平锁下要求FIFO
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
void lockInterruptibly() -> 加写锁,并会对中断进行相应
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
类似于lock()方法,它的不同之处在于,它会对中断进行响应,也就是当其他线程调用了该线程的interrupt方法中断了当前线程时, 当前线程会抛出异常
boolean tryLock() -> 尝试获取写锁非公平实现
//WriteLock类
public boolean tryLock( ) {
return sync.tryWriteLock();
}
//Sync类
final boolean tryWriteLock() {
// 拿到当前线程
Thread current = Thread.currentThread();
// 拿到state
int c = getState();
// 有人持有锁资源,判断持锁人是不是自己,不是自己返回false
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
// 在当前的state的基础 + 1
if (!compareAndSetState(c, c + 1))
return false;
// 标记为自己持有锁
setExclusiveOwnerThread(current);
return true;
}
- 如果当前没有其他线程持有写锁或者读锁,则当前线程获取写锁会成功 ,然后返回true 。
- 如果当前己经有其他线程持有写锁或者读锁则该方法直接返回 false,且当前线程并不会被阻塞
- 如果当前线程已经持有了该写锁则 简单增加 AQS 的状态值后直接返回true 。
void unlock() -> 尝试释放锁
//WriteLock类
public void unlock() {
sync.release(1);
}
//AQS类
public final boolean release(int arg) {
//调用ReentrantReadWriteLock中sync类实现的tryRelease方法
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)//激活阻塞队列里面的一个线程
unparkSuccessor(h);
return true;
}
return false;
}
//Sync类
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())//看是否是写锁拥有者调用的unlock
throw new IllegalMonitorStateException();
//获取可重入值,这里没有考虑高16位,因为获取写锁时读锁状态值肯定为0
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)//如采写锁可重入值为0则释放锁,否则只是简单地更新状态值
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
- 如果当前线程持有该锁,调用该方法会让该线程对该线程持有的 AQS状态值减1
- 如果减去1后当前状态值为0则当前线程会释放该锁 ,否则仅仅减1而己
- 如果当前线程没有持有该锁而调用了该方法则 会抛出异常
3:读锁的获取和释放
ReentrantReadWriteLock 中的读锁是使用 ReadLock 来实现的
void lock()
//ReadLock类
public void lock() {
// 看见这个shared就是共享锁那一套,类比一下信号量Semaphore
sync.acquireShared(1);
}
//AQS类
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
//java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryAcquireShared(int)
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();//获取当前状态值 ---------------- 1
// 是否有其他线程获取到了写锁,如果是则直接返回-1 ----------- 2
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)//判断是否写锁被占用
return -1;
int r = sharedCount(c);//获取读锁计数 -------------- 3
//尝试获取锁 ,多个读线程只有一个会成功,不成功的进入fullTryAcquireShared进行重试 ---------- 4
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// 第一个线程获取读锁 -------------------- 5
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//如果当前线程是第一个获取读锁的线程 --------------------- 6
firstReaderHoldCount++;
} else {
//记录最后一个获取读锁的线程或记录其他线程读锁的可重入数
HoldCounter rh = cachedHoldCounter; // 记录最后一个获取到读锁的线程和该线程获取读锁的可重入数 ------------- 7
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);// 有线程正在获取写锁 ---------- 8
}
boolean tryLock() 尝试获取读锁
public boolean tryLock() {
return sync.tryReadLock();
}
final boolean tryReadLock() {
// 拿到当前线程
Thread current = Thread.currentThread();
// 无限循环
for (;;) {
int c = getState(); // 拿到state
// 当前己经有其他线程持有 写锁则该方法直接返回 false,但当前线程并不会被阻塞
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c); // 拿到读锁的个数 r
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 尝试通过自旋拿到读锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// 第一个线程获取读锁
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 如果当前的线程是第一个获取读锁的线程
firstReaderHoldCount++;
} else {
// 记录最后一个获取读锁的线程或记录其他线程读锁的可重入数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
- 如果当前没有其他线程持有写锁,则当前线程获取读锁会成功,然后返回 true
- 如果当前己经有其他线程持有 写锁则该方法直接返回 false,但当前线程并不会被阻塞
- 如果当前线程己经持有了该读锁则简单增加 AQS 的状态值高 16 位后直接返回 true
unlock() -> 释放读锁
//ReadLock类
public void unlock() {
sync.releaseShared(1);
}
//AbstractQueuedSynchronizer.releaseShared(int)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared(); // 释放一个由于获取写锁而被阻塞的线程
return true;
}
return false;
}
//ReentrantReadWriteLock.Sync.tryReleaseShared(int)
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState(); // 首先获取当前 AQS 状态值并将其保存到变量c
int nextc = c - SHARED_UNIT; // 变量c被减去一个读计数单位
if (compareAndSetState(c, nextc)) // 使用CAS操作更新AQS状态值
// 如果更新成功则查看当前 AQS 状态值是否为 0(nextc == 0)
// 为 0 则说明当前己经没有读线程占用读锁,则tryReleaseShared 返回 true
// 然后会调用 doReleaseShared 方法释放一个由于获取写锁而被阻塞的线程
return nextc == 0;
}
}
4:实例演示
package com.cui.commonboot.myjuc;
import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* <p>
* 功能描述:
* </p>
*
* @author cui haida
* @date 2023/12/30/9:45
*/
public class Test09 {
// 线程不安全的list
private static ArrayList<String> array = new ArrayList<>();
// 声明一个可重入的读写锁
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 再次基础上创建读锁和写锁
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
// add
public void add(String e) {
writeLock.lock();
try {
array.add(e);
} finally {
writeLock.unlock();
}
}
// remove
public void remove(String e) {
writeLock.lock();
try {
array.remove(e);
} finally {
writeLock.unlock();
}
}
// get
public String get(int index) {
readLock.lock();
try {
return array.get(index);
} finally {
readLock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
Test09 obj = new Test09();
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++)
obj.add("str" + j);
}).start();
}
Thread.sleep(1000);
System.out.println(obj.array);
}
}