当前位置: 首页 > article >正文

Java JUC(三) AQS与同步工具详解

Java JUC(三) AQS与同步工具详解

一. ReentrantLock 概述

ReentrantLockjava.util.concurrent.locks 包下的一个同步工具类,它实现了 Lock 接口,提供了一种相比synchronized关键字更灵活的锁机制。ReentrantLock 是一种独占式且可重入的锁,并且支持可中断、公平锁/非公平锁、超时等待、条件变量等高级特性。其特点如下:

  • 独占式: 一把锁在同一时间只能被一个线程所获取;
  • 可重入: 可重入意味着同一个线程如果已持有某个锁,则可以继续多次获得该锁(注意释放同样次之后才算完全释放成功);
  • 可中断: 在线程获取锁的等待过程中可以中断获取,放弃等待而转去执行其他逻辑;
  • 公平性: ReentrantLock支持公平锁和非公平锁(默认)两种模式。其中,公平锁会按照线程请求锁的顺序来分配锁(降低性能),而非公平锁允许线程抢占已等待的线程的锁(可能存在饥饿现象);
  • 条件变量: 通过 Condition接口的实现,允许线程在某些条件下等待或唤醒,即可以实现选择性通知;
TypeMethodDescription
/ReentrantLock()无参构造方法,默认为非公平锁
/ReentrantLock(boolean fair)带参构造方法,其中fair表示锁的公平性策略:
- true: 公平锁
- false: 非公平锁
voidlock()不可中断式获取锁。若当前锁已被其他线程持有,则阻塞等待;**注意:**该获锁过程不可被中断
voidlockInterruptibly() throws InterruptedException可中断式获取锁。若当前锁已被其他线程持有,则阻塞等待;**注意:**该获锁等待过程可被中断,抛出InterruptedException,并清除当前线程的中断状态
booleantryLock()尝试获取锁,该方法会立即返回。若获锁成功,则返回true,否则将返回false;**注意:**该方法会破坏公平锁配置,即在公平锁策略下,该方法也会立即尝试获取可用锁
booleantryLock(long timeout,TimeUnit unit) throws InterruptedException在给定时间timeout内尝试获取锁。若获锁成功,则返回true,否则将阻塞等待直到timeout过期,返回false注意:
- 获锁等待过程可被中断,抛出InterruptedException,并清除当前线程的中断状态
- 遵循公平锁配置策略,即在公平锁策略下,该方法会按顺序等待获取锁
voidunlock()当前线程尝试释放该锁。若当前线程未持有该锁,则抛出IllegalMonitorStateException异常
ConditionnewCondition()返回一个与当前Lock实例绑定的条件变量集合对象(默认返回AQS内部实现类ConditionObject),用于实现线程的条件等待/唤醒(详见后文)

1. 锁的基本使用

相比synchronized关键字来说,ReentrantLock属于显式锁,其锁机制都是针对Lock实例对象本身进行加锁,并且在使用过程中需要手动释放,即锁的获取与释放是成对出现的;除此之外,ReentrantLock属于JDK API层面实现的互斥锁,其通过方法调用实现锁功能,可以跨方法从而更加灵活。为了避免出现死锁问题,官方建议的开发方式如下:

// new lock object
ReentrantLock lock = new ReentrantLock();
lock.lock();  // block until condition holds
try {
	// ... method body
} finally {
	lock.unlock(); // release
}

问题背景: 假设当前有一个卖票系统,一共有100张票,有4个窗口同时售卖,请模拟该卖票过程,注意保证出票的正确性。

public class Test_01 {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        for (int i = 0; i < 4; i++){
            new Thread(new TicketSeller(lock), "Thread_" + i).start();
        }
    }
}
// 售票类实现
class TicketSeller implements Runnable{
    private static int ticketCount = 100;

    private ReentrantLock windowLock;

    public TicketSeller(ReentrantLock lock) {
        this.windowLock = lock;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(10);
            }catch(InterruptedException e){
                e.printStackTrace();
            }
            windowLock.lock();
            try{
                if(ticketCount > 0){
                    System.out.println(Thread.currentThread().getName() + ": sale and left " + --ticketCount);
                }else{
                    System.out.println(Thread.currentThread().getName() + ": sold out...");
                    break;
                }
            }finally {
                windowLock.unlock();
            }
        }
    }
}

2. 条件变量机制

synchronized关键字中,我们可以通过wait&notify实现线程的等待与唤醒,但在此场景下存在虚假唤醒问题,根本原因就是其等待/唤醒机制只支持单条件(等待线程未作区分,只能全部唤醒)。相比之下,ReentrantLock基于Condition接口也实现了相同的机制,并提供了更细的粒度和更高级的功能;每个Condition实例都对应一个条件队列,用于维护在该条件场景下等待通知的线程,并且ReentrantLock支持多条件变量,即一个ReentrantLock可以关联多个Condition实例。其常用方法如下:

TypeMethodDescription
voidawait() throws InterruptedException使当前线程阻塞等待(进入该条件队列),并释放与此条件变量所关联的锁。注意: 若在等待期间被中断,则抛出InterruptedException,并清除当前线程中断状态
booleanawait(long time,TimeUnit unit) throws InterruptedException使当前线程阻塞等待,并释放与此条件变量所关联的锁,直到被唤醒、被中断或等待time时间过期。其返回值表示:
- true: 在等待时间之内,条件被唤醒;
- false: 等待时间过期,条件未被唤醒;
注意: 若在等待期间被中断,则抛出InterruptedException,并清除当前线程中断状态
voidsignal()唤醒一个等待在Condition上的线程。如果有多个线程在此条件变量下等待,则选择任意一个线程唤醒;注意: 从等待方法返回前必须重新获得Condition相关联的锁
voidsignalAll()唤醒所有等待在Condition上的线程。如果有多个线程在此条件变量下等待,则全部唤醒 ;注意: 从等待方法返回前必须重新获得Condition相关联的锁

在使用时需要注意

  • 调用await相关方法前需要先获得对应条件变量所关联的锁,否则会抛出IllegalMonitorStateException异常;
  • 调用signal相关方法前需要先获得对应条件变量所关联的锁,否则会抛出IllegalMonitorStateException异常;
  • await线程被唤醒(或等待时间过期、被中断)后会重新参与锁的竞争,若成功拿到锁则将从await处恢复继续向下执行;
/**
 * 场景模拟:奶茶店和咖啡店共用一个窗口(window)出餐,等待顾客点单...
 *  - 奶茶店(teaWithMilk):顾客需要奶茶,则奶茶店开始工作;
 *  - 咖啡店(coffee):顾客需要咖啡,则咖啡店开始工作;
 */
public class Test {
    // 窗口锁(ReentrantLock实现)
    static final ReentrantLock window = new ReentrantLock();
    // 奶茶点单条件变量
    static Condition teaWithMilk = window.newCondition();
    // 咖啡点单条件变量
    static Condition coffee = window.newCondition();
    public static void main(String[] args) throws InterruptedException {
        // 奶茶店监控线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    window.lock();
                    try {
                        System.out.println("[奶茶店] 等待接单...");
                        try {
                            teaWithMilk.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("[奶茶店] 接到订单...");
                    } finally {
                        window.unlock();
                    }
                    System.out.println("[奶茶店] 开始工作...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("[奶茶店] 工作完成...");
                }
            }
        }).start();
        Thread.sleep(1000);
        // 咖啡店监控线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    window.lock();
                    try {
                        System.out.println("[咖啡店] 等待接单...");
                        try {
                            coffee.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("[咖啡店] 接到订单...");
                    } finally {
                        window.unlock();
                    }
                    System.out.println("[咖啡店] 开始工作...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("[咖啡店] 工作完成...");
                }
            }
        }).start();
        Thread.sleep(1000);
        // 顾客点单线程: main
        window.lock();
        try{
            System.out.println("[顾客] 点了咖啡!!");
            coffee.signal(); // 唤醒咖啡条件等待线程
        }finally {
            window.unlock();
        }
    }
}

二. 从 ReentrantLock 分析 AQS 的原理

1. AQS 框架

AQS 全称为 AbstractQueuedSynchronizer ,即抽象队列同步器;AQSjava.util.concurrent.locks 包下的一个抽象类,其为构建锁和同步器提供了一系列通用模板与框架的实现,大部分JUC包下的并发工具都是基于AQS来构建的,比如ReentrantLockSemaphoreCountDownLatch等。其核心源码如下:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    // 同步队列的节点
    static final class Node {...}
    // 指向同步队列头部
    private transient volatile Node head;
    // 指向同步队列尾部
    private transient volatile Node tail;
    // 同步状态
    private volatile int state;
    
    // 提供一系列并发、同步队列的基本操作方法
    // 比如: 挂起、取消、节点插入、节点替换等...

    // 交由子类实现的模板方法(钩子方法): 自定义同步器的核心实现目标
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }


    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }


    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }


    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
    
    // 条件变量 Condition 接口的内部实现类
    public class ConditionObject implements Condition {...}

}

由上可知,AQS内部实现了一个核心内部类Node,该内部类表示对等待获取锁的线程的封装节点;在AQS中,基于Node维护了一个双向链表(模拟同步队列),其中head节点指向同步队列的头部,而tail节点指向同步队列的尾部。Node类的核心源码如下:

	static final class Node {
        // 共享模式(共享锁、比如Semaphore)
        static final Node SHARED = new Node();
        // 独占模式(独占锁、比如ReentrantLock)
        static final Node EXCLUSIVE = null;

        // 标识节点线程获取锁的请求已取消、已结束
        static final int CANCELLED =  1;
        // 标识节点线程已准备就绪,等待被唤醒获取资源
        static final int SIGNAL    = -1;
        // 标识节点线程在条件变量Condition中等待
        static final int CONDITION = -2;
        // 在共享模式下启用: 标识获得的同步状态会被传播
        static final int PROPAGATE = -3;

        /**
         * waitStatus 标识节点线程在同步队列中的状态,共存在以下几种情况:
         * (1)SIGNAL: 被标记为SIGNAL的节点处于等待唤醒获取资源的状态,只要前驱节点释放锁就会通知该状态的后续节点线程执行
         * (2)CANCELLED: 在同步队列中等待超时、被中断的线程会进入取消状态,不再响应并会在遍历过程中被移除
         * (3)CONDITION: 标识当前节点线程在Condition下等待,被唤醒后将重新从等待队列转移到同步队列
         * (4)PROPAGATE: 与共享模式有关 
         * (5)0: 默认初始值状态,代表节点初始化
         */
        volatile int waitStatus;

        // 同步队列中的前驱节点
        volatile Node prev;

        // 同步队列中的后继节点
        volatile Node next;

        // 等待获取锁资源的线程
        volatile Thread thread;

        // Condition 等待队列中的后继节点(单向链表)
        Node nextWaiter;

        // 判断是否为共享模式
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        // 获取当前节点在同步队列中的前驱节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        // 省略构造方法...
    }

在了解Node的基本数据结构与状态之后,AQS还有一个核心状态变量即state,该全局变量用于表示同步锁的状态,其具体含义一般在子类实现中进行定义和维护(独占锁和共享锁不一样)。但不管独占模式还是共享模式,简单来说都是使用一个volatile的全局变量来表示资源同步状态(state),并通过CAS完成对state值的修改(修改成功则表示获锁成功),当持有锁的线程数量超过当前模式(独占模式一般限制为1)时,则通过内置的FIFO同步队列来完成资源获取线程的排队工作(通过LockSupport park/unpark方法实现挂起与唤醒)。其核心原理图如下:

在这里插入图片描述

综上,AQS采用了模板方法模式来构建同步框架,并提供了一系列并发操作的公共基础方法,支持共享模式和独占模式两种实现;但AQS并不负责对外提供具体的加锁/解锁逻辑,因为锁是千变万化的,AQS只关注基础组件、顶层模板这些总的概念,具体的锁逻辑将通过”钩子“的方式下放给子类实现。也就是说,独占模式只需要实现tryAcquire-tryRelease方法、共享模式只需要实现tryAcquireShared-tryReleaseShared方法,搭配AQS提供的框架和基础组件就能轻松实现自定义的同步工具。

2. ReentrantLock 源码分析

在这里插入图片描述

ReentrantLock的类结构图可以看出,ReentrantLock实现了Lock接口,其内部包含一个内部类Sync,该内部类继承了AQSAbstractQueuedSynchronizer),ReentrantLock大部分的锁操作都是通过Sync实现的。除此之外,ReentrantLock有公平锁和非公平锁两种模式,分别对应SyncFairSyncNonfairSync两个子类实现。

public class ReentrantLock implements Lock, java.io.Serializable {
    
    private final Sync sync;
    // 默认构造函数: 默认创建非公平锁
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    // 带参构造函数: true公平锁/false非公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    // 加锁操作
    public void lock() {
        sync.lock();
    }
    
    //...
}

接下来,本节将首先以ReentrantLock的非公平锁为例进行分析,然后再介绍公平锁与非公平锁的主要区别。

2.1 非公平锁lock加锁原理

非公平锁NonfairSync的源码如下:

    static final class NonfairSync extends Sync {
        // 加锁操作
        final void lock() {
            // CAS修改state状态以获取锁资源
            if (compareAndSetState(0, 1))
                // 成功则将独占锁线程设置为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 否则再次请求同步状态(AQS的模板方法)
                acquire(1);
        }
        // AQS 获锁的钩子方法实现
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

在调用lock方法获取锁的过程中,当前线程会先通过CAS操作尝试修改state0(表示无锁)到1(表示占有锁),若修改成功则将AQS中保存的独占线程exclusiveOwnerThread修改为当前线程;若失败则执行acquire(1)方法,该方法是AQS中的一个模板方法,其源码如下:

    public final void acquire(int arg) {
        // tryAcquire -> addWaiter -> acquireQueued
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

可以看到该方法首先调用了钩子方法tryAcquire,该方法是交由子类NonfairSync实现的,在上面的源码中我们已经给出了tryAcquire的实现代码,其直接调用了父类Sync中的nonfairTryAcquire方法,其源码如下:

abstract static class Sync extends AbstractQueuedSynchronizer {

	abstract void lock();
    // nonfairTryAcquire 方法实现
	final boolean nonfairTryAcquire(int acquires) {
        // 获取当前线程及同步队列状态state
		final Thread current = Thread.currentThread();
		int c = getState();
        // 若状态为0表示锁已释放: 重新尝试获取锁
		if (c == 0) {
            // CAS尝试修改state的值
			if (compareAndSetState(0, acquires)) {
                // 若成功则设置独占线程为当前线程
				setExclusiveOwnerThread(current);
				return true;
			}
		}
        // 若独占线程即当前线程,则属于重入锁
		else if (current == getExclusiveOwnerThread()) {
            // 修改state为重入值
			int nextc = c + acquires;
			if (nextc < 0) // overflow
				throw new Error("Maximum lock count exceeded");
			setState(nextc);
			return true;
		}
		return false;
	}
    
    //省略代码...
}

由上述代码可知,该方法首先再次判断锁是否已释放,这是为了避免之前持锁的线程在这段时间内又重新释放了锁,若state==0则会尝试再次CAS修改同步状态以获取锁资源;否则的话,则判断当前线程是否是锁重入的情况,若两个判断都不满足则返回false;到目前为止,我们回过头来想一下非公平锁的非公平性是在哪体现的?

很明显,在上述代码分析中,当有任何线程尝试获取锁时(调用lock方法),不论当前同步队列中是否已有线程排队等待,NonfairSynclock()方法以及SyncnonfairTryAcquire()方法都没有对同步队列中的等待情况进行判断,而是直接通过CAS尝试修改state的值来为当前线程直接占有锁;这就是非公平性的体现,抢占线程可以直接与等待线程竞争锁资源,而不用按照顺序加入队列

分析完这部分之后,我们再回到acquire(1)方法,若tryAcquire(arg)方法返回false即获取不到锁时会继续向下执行到addWaiter(Node.EXCLUSIVE)方法,该方法用于封装线程入队,其源码如下:

    private Node addWaiter(Node mode) {
        // 将请求占锁失败的线程封装为Node节点
        Node node = new Node(Thread.currentThread(), mode);
        
        Node pred = tail;
        // 若同步队列不为空,则尝试CAS在尾部插入当前节点(FIFO)
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 若队列为空或CAS插入节点失败则执行enq()方法处理入队
        enq(node);
        return node;
    }

接着,我们继续分析enq(node)方法的实现:

    private Node enq(final Node node) {
        // 开启自旋(循环)
        for (;;) {
            Node t = tail;
            // 若队列为空
            if (t == null) { // Must initialize
                // 则尝试CAS创建头节点(不存储数据)
                // 原因: 队列为空可能是因为其他线程非公平占有了锁(当前线程试过没抢到),因此这里需要先斩后奏,即再次创建头节点表示已占锁线程的占位,来维护同步队列
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 否则尝试CAS添加尾节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
        // 若CAS失败(可能有多线程并发操作),则不断自旋重试直到插入成功
    }

可以看到头节点head其实是不存储数据的,它只表示一个线程占位(占位了锁资源),因为位于头节点的线程肯定已经获取到了锁,头节点只存储后继节点指向,用于当前线程释放锁资源时唤醒后继节点;那么到此这个方法也就分析完成了,在节点入队成功之后会返回当前节点node,然后会继续执行到acquireQueued(addWaiter(Node.EXCLUSIVE),arg)方法,其源码如下:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false; //阻塞挂起标志
            // 开启自旋(循环)
            for (;;) {
                // 获取当前节点的前驱节点p
                final Node p = node.predecessor();
                // 若p是头节点则当前节点尝试获取锁资源
                if (p == head && tryAcquire(arg)) {
                    // 占锁成功则设置当前节点node为头节点: 头节点状态保持SIGNAL状态
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 若p不是头节点或获取锁资源失败,则判断是否阻塞挂起线程来等待
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 若最终无法获取锁,则取消该线程的请求
            if (failed)
                cancelAcquire(node);
        }
    }
	// 将传递的节点设置为同步队列的头节点
    private void setHead(Node node) {
        head = node;
        // 清空当前节点存储的线程信息
        node.thread = null;
        node.prev = null;
    }

acquireQueued方法中,线程会开启自旋,若发现(或被唤醒后发现)当前节点的前驱节点变为头节点,则说明当前节点能够尝试获取锁资源,并尝试通过tryAcquire方法获取同步状态;需要注意的是,head头节点表示当前占有锁的线程节点,只有当head节点对应的线程释放锁资源并唤醒后继节点时,后继节点线程才会自旋去尝试占有锁资源,因此:在同步队列中,只有前驱节点变为头节点时,当前节点才有资格尝试获取锁资源,其他时候都将被挂起等待,避免空转CPU

除此之外,若在自旋过程中,当前节点的前驱节点不是头节点或者节点尝试tryAcquire获取锁资源失败,则会执行shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt()逻辑;需要注意的是在前驱节点是头节点但尝试获取锁资源失败这种特殊情况发生时(比如非公平锁模式下被新到来的请求线程抢占),头节点head此时可能有两种状态:

  • waitStatus==0: 处于该状态一种情况是初始同步队列为空时,默认头节点状态初始化为0;另一种情况是锁释放时(见后文)被unparkSuccessor重置头节点状态;

  • waitStatus==SIGNAL: 处于该状态一种情况是waitStatus==0时更新状态后又自旋回来但仍未获取到锁(可能释放锁后被非公平抢占);另一种情况是释放锁时unparkSuccessor重置失败;

其源码如下:

	// 判断节点线程是否挂起等待
	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前驱节点的等待状态
        int ws = pred.waitStatus;
        // 若是SIGNAL状态,则说明前驱节点就绪,当前节点正常需要继续等待即返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        // 若等待状态>0则说明前驱节点是结束状态,需要遍历前驱节点直到找到非结束状态的有效节点
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {// 若等待状态<=0且非SIGNAL,则尝试将前驱节点设置为SIGNAL
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // false 则返回去继续自旋
        return false;
    }
    // 执行挂起阻塞操作 LockSupport.park
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

shouldParkAfterFailedAcquire方法的执行逻辑是判断前驱节点的等待状态waitStatus,用于挂起当前节点线程等待,结合上述分析,包括以下几种情况:

  • 前驱节点状态waitStatus==SIGNAL: 若前驱节点是头节点,则说明占锁线程还未执行结束,当前节点线程仍需挂起等待;若前驱节点不是头节点,则说明前驱节点就绪/拥有更高的优先级,下次执行还轮不到当前节点,所以也可以安全挂起,直接返回true
  • 前驱节点状态waitStatus>0: 说明前驱节点已处于结束/取消状态,应该从同步队列中移除,并遍历所有前驱节点直到找到非结束状态的有效节点作为前驱;
  • 前驱节点状态waitStatus<0且非SIGNAL: 前驱节点刚从Condition的条件等待队列被唤醒,从而转移到同步队列,需要转换为SIGNAL状态等待;
  • 前驱节点状态waitStatus==0:若前驱节点是头节点,则说明同步队列刚初始化(0)或锁刚被释放重置,锁资源可能未被其他线程持有,需判断能否占有锁(不管当前线程能否占有,该锁一定会被占有,都需要转换状态为SIGNAL);若前驱节点不是头节点,则说明该线程节点刚初始化并被插入队列,需要转换为SIGNAL状态;

综上,当shouldParkAfterFailedAcquire()方法返回true时会调用parkAndCheckInterrupt方法挂起线程等待被唤醒,返回false时则会继续自旋判断;至此,ReetrantLock内部间接依靠AQSFIFO同步队列,就完成了lock()加锁操作。

2.2 公平锁lock加锁原理

公平锁FairSync的源码如下:

    static final class FairSync extends Sync {
        // 加锁操作
        final void lock() {
            acquire(1);
        }

        // AQS 获锁的钩子方法实现
        protected final boolean tryAcquire(int acquires) {
            // 获取当前线程
            final Thread current = Thread.currentThread();
            // 获取同步状态
            int c = getState();
            // 若当前没有线程持有锁资源
            if (c == 0) {
                // 首先判断同步队列是否存在等待节点
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 锁重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

与非公平锁唯一不同的是:公平锁的tryAcquire实现中,在尝试修改state之前,会先调用hasQueuedPredecessors()判断AQS内部同步队列中是否已存在等待节点。如果存在,则说明在此之前,已经有线程提交了获取锁的请求,那么当前线程就会直接被封装成Node节点,追加到队尾等待。

2.3 释放锁原理

ReetrantLock显式锁需要手动释放锁资源,其unlock()方法直接调用了Sync中的release(1)方法,而该方法又是在其父类AQS中直接实现的,其源码如下:

    public final boolean release(int arg) {
        // 尝试释放锁
        if (tryRelease(arg)) {
            // 进入该代码块则说明锁已完全释放(state=0)
            // 获取头节点
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 唤醒head头节点的后继节点线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
	// 唤醒node的后继节点线程
    private void unparkSuccessor(Node node) {
        // 获取节点状态
        int ws = node.waitStatus;
        if (ws < 0) // 重置节点状态,允许失败
            compareAndSetWaitStatus(node, ws, 0);

        // 获取后继节点
        Node s = node.next;
        // 若后继节点为空或已结束
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 寻找后继可被唤醒的有效等待节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 执行线程唤醒(继续去自旋) LockSupport.unpark
        if (s != null)
            LockSupport.unpark(s.thread);
    }

release方法能否释放锁并唤醒后继节点线程依赖于tryRelease钩子方法,而该方法又下放到了Sync中实现,其源码如下:

       // ReentrantLock -> Sync -> tryRelease(1)
		protected final boolean tryRelease(int releases) {
            // 计算释放锁后的同步更新状态
            int c = getState() - releases;
            // 如果当前释放锁的线程不为持有锁的线程则抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 判断更新状态是否为0,如果是则说明已完全释放锁
            if (c == 0) {
                free = true;
                // 完全释放锁才清空当前占有线程
                setExclusiveOwnerThread(null);
            }
            // 更新state值
            setState(c);
            // 完全释放锁才返回true
            return free;
        }

注意:tryRelease方法执行完成返回true之后,就说明当前线程持有的锁已被释放(非公平锁中就已经可以被抢占了),后续unparkSuccessor方法只是进行一些善后工作,其中重置头节点状态的目的是表示逻辑上从持锁到无锁的转换,锁资源目前可能并没有线程持有,因此在后续线程唤醒后执行acquireQueued自旋时waitStatus==0状态会再一次判断并尝试获取锁,而修改为SIGNAL就表示占锁线程正在执行,其他线程需要挂起等待。至此,整个流程可以结合起来理解:s节点的线程被唤醒后,会继续执行acquireQueued()方法中的自旋,判断if (p == head && tryAcquire(arg))代码是否成立,从而执行判断操作。

三. 其他同步工具类

1. Semaphore

1.1 基本概述

Semaphorejava.util.concurrent包下的一种计数信号量,它同样也是基于AQS实现的同步工具类。相比ReentrantLock来说,它应该属于共享锁,即允许多个线程同时访问某个共享资源,但会限制同时访问特定资源的线程数量;Semaphore同样也支持公平模式和非公平模式两种方式,其构造方法如下:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore默认为非公平模式(抢占式),在构造信号量对象时都必须提供permits参数;permits可以理解为许可证数量,只有拿到许可证的线程才能执行,该参数限制了能同时获取或访问到共享资源的线程数量,其他超出线程都将阻塞等待。基于此,Semaphore通常用于实现资源有明确访问数量限制的场景,比如限流、池化等。Semaphore的常用方法介绍如下:

TypeMethodDescription
voidacquire() throws InterruptedException当前线程请求获取该信号量的一个许可证。若有可用许可证,则获得许可证并返回执行同步代码,同时可用许可证数量将减少一个;若没有可用许可证,则阻塞等待直到有许可被释放,或线程中断。注意: 若当前线程在等待过程中被中断,则会抛出InterruptedException,并清除当前线程的中断状态。
voidacquire(int permits) throws InterruptedException当前线程请求获取该信号量的permits个许可证。若有可用数量的许可证,则获得许可证并返回执行同步代码,同时可用许可证数量将减少permits个;若没有可用数量的许可证,则阻塞等待直到可用许可达到指定数量,或线程中断。**注意: **若当前线程在等待过程中被中断,则会抛出InterruptedException,并清除当前线程的中断状态。
voidrelease()释放该信号量的一个许可证,并使可用许可证数量增加一个。注意: 没有通过acquire()获取许可的线程甚至也可以直接调用release()来为信号量增加许可证数量,并且可用许可有可能会超出构造时限制的permits值,因此信号量的正确使用必须是通过应用程序中的编程约束来建立。
voidrelease(int permits)释放该信号量的permits个许可证,并使可用许可证数量增加permits个。注意:release()方法,信号量的正确使用必须是通过应用程序中的编程约束来建立。
booleantryAcquire()尝试获取该信号量的一个许可证,但该方法会立即返回。若有可用许可证,则获得许可证并返回true,同时可用许可证数量将减少一个;若没有可用许可证,则返回false注意: 该方法会破坏公平策略,对该方法的调用会进行抢占式获取(不管是否有线程在等待)。
booleantryAcquire(long timeout, TimeUnit unit) throws InterruptedException尝试在指定时间timeout内获取该信号量的一个许可证(遵循公平策略)。若有可用许可证,则获得许可证并返回true,同时可用许可证数量将减少一个;若没有可用许可证,则阻塞等待直到timeout过期,等待时间过期则返回false注意: 若当前线程在等待过程中被中断,则会抛出InterruptedException,并清除当前线程的中断状态。
intavailablePermits()获取当前信号量中的可用许可证数量。

可以看出,许可证是Semaphore的核心概念,Semaphore信号量对许可证的获取是强限制,但对许可证的释放是弱限制的,即请求线程在执行时必须获取acquire到指定数量的许可证,但在释放release时并不会对先前是否获取进行检查,因此可用许可有时可能会超出构造时限制的permits值。换句话说,构造时传入的permits参数只表示信号量的初始许可数量,并且许可证只决定了线程执行的门槛,但并不会对线程作全程限制;当前线程一旦获取到指定数量的许可便开始执行,即使中途释放许可也不会影响后续执行过程,这也就是为什么说信号量的正确使用必须是通过应用程序中的编程约束来建立。举例如下:

public class test {
    public static void main(String[] args) {
        // 初始化许可数量 = 5
        Semaphore semaphore = new Semaphore(5);

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + ": 等待许可...");
                semaphore.acquire(3); // acquire permits = 3
                System.out.println(Thread.currentThread().getName() + ": 拿到许可,剩余许可 = " + semaphore.availablePermits());
                Thread.sleep(3000);
                semaphore.release(); // release = 1
                System.out.println(Thread.currentThread().getName() + ": 释放许可...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"thread-1").start();

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + ": 等待许可...");
                semaphore.acquire(3);// acquire permits = 3
                System.out.println(Thread.currentThread().getName() + ": 拿到许可,剩余许可 = " + semaphore.availablePermits());
                Thread.sleep(3000);
                semaphore.release();// release = 1
                System.out.println(Thread.currentThread().getName() + ": 释放许可,剩余许可 = " + semaphore.availablePermits());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"thread-2").start();
    }
}
thread-1: 等待许可...
thread-1: 拿到许可,剩余许可 = 2
thread-2: 等待许可...
thread-1: 释放许可...
thread-2: 拿到许可,剩余许可 = 0
thread-2: 释放许可,剩余许可 = 1

前文说过,Semaphore通常用于实现资源有明确访问数量限制的场景,比如限流、池化等;此处通过Semaphore模拟一个请求限流的场景,其中限制最大并发数为3,实现代码如下:

public class test {
    public static void main(String[] args) {
        // 自定义线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2*2, 8,
                60, TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(1024),
                new ThreadPoolExecutor.AbortPolicy());
        // 流量控制: 限制最大并发数为3
        final Semaphore semaphore = new Semaphore(3);
        // 模拟10个客户端任务请求
        for(int index = 0;index < 10;index++){
            final int serial = index;
            threadPool.execute(() -> {
                try {
                    // 请求获取许可
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + ": 请求成功!访问编号 = " + serial);
                    // 模拟IO操作
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 释放许可
                    semaphore.release();
                }
            });
        }
        // 等待线程池执行结束关闭: 不再接受新任务提交,对已经提交了的任务不会产生影响
        threadPool.shutdown();
    }
}

1.2 原理分析

Semaphore的大部分方法都是基于内部类Sync实现的,而该类又继承了 AbstractQueuedSynchronizerAQS,并且Sync对应的还有两个子类 NonfairSync(非公平模式实现) 和 FairSync(公平模式实现)。在Semaphore中,AQSstate 被定义为 permits(许可证数量),对象创建时传入的参数permits实际是在对AQS内部的state进行初始化,初始化完成后state代表着当前信号量对象的可用许可数(state>0)。

以非公平模式为例,当线程调用Semaphore.acquire(arg)请求获取许可时,会首先判断remaining = getState() - arg是否大于0,如果是则代表还有满足可用的许可数,并尝试对state进行CAS操作使state=remaining,若CAS成功则代表获取许可成功;否则线程需要封装成Node节点并加入同步队列阻塞等待,直到许可释放被唤醒。

// Semaphore类 -> acquire()方法
public void acquire() throws InterruptedException {
      // Sync类继承AQS,此处直接调用AQS内部的acquireSharedInterruptibly()方法
      sync.acquireSharedInterruptibly(1);
  }

// AbstractQueuedSynchronizer类 -> acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 判断是否出现线程中断信号(标志)
    if (Thread.interrupted())
        throw new InterruptedException();
    // 如果tryAcquireShared(arg)执行结果不小于0,则线程获取同步状态成功
    if (tryAcquireShared(arg) < 0)
        // 未获取成功加入同步队列阻塞等待
        doAcquireSharedInterruptibly(arg);
}
// Semaphore类 -> NofairSync内部类 -> tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
    // 调用了父类Sync中的实现方法
    return nonfairTryAcquireShared(acquires);
}

// Syn类 -> nonfairTryAcquireShared()方法
abstract static class Sync extends AbstractQueuedSynchronizer {
    final int nonfairTryAcquireShared(int acquires) {
         // 开启自旋死循环
         for (;;) {
             int available = getState();
             int remaining = available - acquires;
             // 判断信号量中可用许可数是否已<0或者CAS执行是否成功
             if (remaining < 0 ||
                 compareAndSetState(available, remaining))
                 return remaining;
         }
     }
}

释放逻辑对比获取许可的逻辑相对来说要简单许多,只需要更新state值增加后调用doReleaseShared()方法唤醒后继节点线程即可;需要注意的是,而在共享模式中可能会存在多条线程同时释放许可/锁资源,所以在此处使用了CAS+自旋的方式保证线程安全问题。

2. CountDownLatch

2.1 基本概述

CountDownLatch同样是java.util.concurrent包下的基于AQS实现的同步工具类。类似于SemaphoreCountDownLatch在初始化时也会传入一个参数count来间接赋值给AQSstate,用于表示一个线程计数值;不过CountDownLatch并没有构建公平模式和非公平模式(内部Sync没有子类实现),其构造方法如下:

public CountDownLatch(int count) {
	if (count < 0) throw new IllegalArgumentException("count < 0");
	this.sync = new Sync(count);
}

CountDownLatch的主要作用是等待计数值count归零后,唤醒所有的等待线程。基于该特性,CountDownLatch常被用于控制多线程之间的等待与协作(多线程条件唤醒);相比join来说,CountDownLatch更加灵活且粒度更细,join是以线程执行结束为条件,而CountDownLatch是以方法的主动调用为条件。其常用方法如下:

TypeMethodDescription
voidawait() throws InterruptedException使当前线程阻塞等待,直到计数器count归零或线程被中断。若当前计数已为零,则此方法立即返回。注意: 若在等待过程中被中断,则会抛出InterruptedException,并清除当前线程的中断状态。
voidcountDown()使当前计数器count递减。如果新计数归零,则唤醒所有await()等待线程;注意: 若当前计数已为零,则无事发生。
longgetCount()获取当前计数器count的值。

需要注意的是CountDownLatch一次性的,即计数器的值count只能在构造方法中初始化,此外再没有任何设置值的方法,当 CountDownLatch 使用完毕后(计数归零)将不能重复被使用;若需要重置计数的版本,可以考虑使用CyclicBarrierCountDownLatch 的常用方法有两种:

  • 多等一:初始化count=1多条线程await()阻塞等待一条线程调用countDown()唤醒所有线程。比如模拟并发安全、死锁等;
  • 一等多:初始化count=N一条线程await()阻塞等待N条线程调用countDown()归零后唤醒。比如多接口调用的数据合并、多操作完成后的数据检查、主服务启动后等待多个组件加载完毕等(注意线程间的通信与数据传递需结合Future实现);
public class test {
    public static void main(String[] args) {
        // 模拟10人拼团活动
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        // 固定数量线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(50);
        // 拼团人员ID集合
        List<String> ids = new ArrayList<>();
        // 模拟30人开始抢单拼团
        for (int i = 0; i < 30; i++) {
            threadPool.execute(() -> {
                boolean orderSucess = false;
                System.out.println(Thread.currentThread().getName() + ": 请求拼团...");
                if (countDownLatch.getCount() > 0) {
                    synchronized (ids) {
                        if (countDownLatch.getCount() > 0) {
                            ids.add(Thread.currentThread().getName());
                            System.out.println(Thread.currentThread().getName() + ": 拼团成功!");
                            countDownLatch.countDown();
                            orderSucess = true;
                        }
                    }
                }
                if (!orderSucess) {
                    System.out.println(Thread.currentThread().getName() + ": 拼团失败!已无名额...");
                }
            });
        }
        // 订单生成线程
        new Thread(() -> {
            try {
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + ": 拼团结束, 订单已生成...");
                System.out.println(Thread.currentThread().getName() + ": 拼团人员id = " + ids);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }, "拼团").start();
        // 释放线程池
        threadPool.shutdown();
    }
}

2.2 原理分析

CountDownLatch的底层实现原理也非常简单,当线程调用 await() 的时候,如果 state 不为 0 则证明任务还没有执行结束,await() 就会进入阻塞等待,其源码如下:

// CountDownLatch -> await()
public void await() throws InterruptedException {
    // 调用内部类sync的acquireSharedInterruptibly方法
	sync.acquireSharedInterruptibly(1);
}
// CountDownLatch -> Sync -> acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 被中断抛出异常
	if (Thread.interrupted())
		throw new InterruptedException();
    // tryAcquireShared -> 判断是否阻塞等待
	if (tryAcquireShared(arg) < 0)
        // 自旋+阻塞(AQS实现)
		doAcquireSharedInterruptibly(arg);
}
// CountDownLatch -> Sync -> tryAcquireShared
protected int tryAcquireShared(int acquires) {
    // 判断当前state是否归零
	return (getState() == 0) ? 1 : -1;
}

当线程调用 countDown() 时,其实最终是调用了Sync中重写的tryReleaseShared方法,该方法以 CAS 的操作来减少 state;若更新后state 归零,则表示所有的计数任务线程都执行完毕,那么在 CountDownLatch 上等待的线程就会被AQSdoReleaseShared方法唤醒并继续向下执行。

// CountDownLatch -> countDown()
public void countDown() {
	sync.releaseShared(1);
}
// CountDownLatch -> Sync -> AQS -> releaseShared
public final boolean releaseShared(int arg) {
    // 判断递减后计数器是否归零
	if (tryReleaseShared(arg)) {
        // 唤醒所有等待线程(AQS实现)
		doReleaseShared();
		return true;
	}
	return false;
}
// CountDownLatch -> Sync -> tryReleaseShared
protected boolean tryReleaseShared(int releases) {
	// Decrement count; signal when transition to zero
	for (;;) {
        // 获取当前state
		int c = getState();
        // 若计数器归零则返回false,其他什么也不做
		if (c == 0)
			return false;
        // CAS更新state递减
		int nextc = c-1;
		if (compareAndSetState(c, nextc))
            // 若更新成功则判断新计数值是否归零
			return nextc == 0;
	}
}

3. CyclicBarrier

//


http://www.kler.cn/news/331124.html

相关文章:

  • 机器学习【教育领域及其平台搭建】
  • 用好AI告别灵感枯竭!如何用300个选题提示词打造病毒式内容?
  • Python笔记 - 函数、方法和类装饰器
  • react-问卷星项目(4)
  • Django一分钟:DRF模型序列化器处理关联关系的示例与注意事项
  • 高校实训产品:动漫和游戏场景AI实训平台建设方案
  • 《Spring Boot应用进阶:打造优雅的错误处理机制与全局异常拦截器》
  • 如何在 Flutter 中实现可拖动的底部弹出框
  • 滚雪球学MySQL[7.2讲]:MySQL安全策略详解:数据加密与SQL注入防护
  • 网安学习(js漏洞挖掘)
  • 登录功能开发 P167重点
  • 每日一练:零钱兑换
  • ScatterAdd算子实现简介
  • 【Android】【bug】ImageView设置scaleType不生效的问题
  • 毕业设计选题:基于ssm+vue+uniapp的教学辅助小程序
  • 十进制转十六进制 ← Python字符串
  • 【Spring】Spring Boot项目创建和目录介绍
  • 计算机毕业设计 基于Python的无人超市管理系统的设计与实现 Python+Django+Vue 前后端分离 附源码 讲解 文档
  • C#/.NET/.NET Core技术前沿周刊 | 第 7 期(2024年9.23-9.30)
  • Hive数仓操作(十)