JUC并发二
大纲
1.JUC中的Lock接口
2.如何实现具有阻塞或唤醒功能的锁
3.ReentractLock如何获取锁
4.AQS的acquire()方法和release()方法总结
5.ReentractReadWriteLock的基本原理
6.ReentractReadWriteLock如何竞争写锁
7.ReentractReadWriteLock如何竞争读锁
8.ReentractReadWriteLock的公平锁和非公平锁
9.Condition的说明和源码
10.等待多线程完成的CountDownLatch
11.控制并发线程数的Semaphore
12.同步屏障CyclicBarrier
13.ConcurrentHashMap的原理
14.并发安全的数组列表CopyOnWriteArrayList
15.并发安全的链表队列ConcurrentLinkedQueue
16.JUC的各种阻塞队列介绍
17.LinkedBlockingQueue的具体实现原理
18.基于两个队列实现的集群同步机制
19.锁优化总结
20.线程池介绍
21.如何设计一个线程池
22.ThreadPoolExecutor线程池的执行流程
23.如何合理设置线程池参数 + 定制线程池
24.ThreadLocal的设计原理
25.Disruptor的生产者源码
26.Disruptor的消费者源码
27.Disruptor的WaitStrategy等待策略
28.Disruptor的高性能原因
精通JDK集合和JUC并发包,研究过相关集合源码、锁原理、AQS源码、线程池源码等,深入阅读过Disruptor源码,对高并发的处理有一定的理解。
1.JUC中的Lock接口
(1)Lock接口的实现类ReentrantLock
ReentractLock是重入锁,属于排他锁,功能和synchronized类似。但是在实际中,其实比较少会使用ReentrantLock。因为ReentrantLock的实现及性能和syncrhonized差不多,所以一般推荐使用synchronized而不是ReentrantLock。
(2)Lock接口的实现类ReentrantReadWriteLock
ReentrantReadWriteLock是可重入的读写锁,ReentrantReadWriteLock维护了两个锁:一是ReadLock,二是WriteLock。ReadLock和WriteLock都分别实现了Lock接口。
(3)Lock接口的实现类StampedLock
ReentrantReadWriteLock锁有一个问题,如果当前有线程在调用get()方法,那么所有调用add()方法的线程必须要等调用get()方法的线程释放锁才能写,所以如果调用get()方法的线程非常多,那么就会导致写线程一直被阻塞。
StampedLock优化了ReentrantReadWriteLock的乐观锁,当有线程调用get()方法读取数据时,不会阻塞准备执行写操作的线程。
StampedLock的tryOptimisticRead()方法会返回一个stamp版本号,用来表示当前线程在读操作期间数据是否被修改过。
StampedLock提供了一个validate()方法来验证stamp版本号,如果线程在读取过程中没有其他线程对数据做修改,那么stamp的值不会变。
StampedLock使用了乐观锁的思想,避免了在读多写少场景中,大量线程占用读锁造成的写阻塞,在一定程度上提升了读写锁的并发性能。
ReentrantReadWriteLock采用的是悲观读策略。当第一个读线程获取锁后,第二个、第三个读线程还可以获取锁。这样可能会使得写线程一直拿不到锁,从而导致写线程饿死。所以其公平和非公平实现中,都会尽量避免这种情形。比如非公平锁的实现中,如果读线程在尝试获取锁时发现,AQS的等待队列中的头结点的后继结点是独占锁结点,那么读线程会阻塞。
StampedLock采用的是乐观读策略,类似于MVCC。读的时候不加锁,读出来发现数据被修改了,再升级为悲观锁。
2.如何实现具有阻塞或唤醒功能的锁
(1)需要一个共享变量标记锁的状态
AQS有一个int变量state用来记录锁的状态,通过CAS操作确保对state变量操作的线程安全。
(2)需要记录当前是哪个线程持有锁
AQS有一个Thread变量exclusiveOwnerThread用来记录持有锁的线程。当state = 0时,没有线程持有锁,此时exclusiveOwnerThread = null。当state = 1时,有一线程持有锁,此时exclusiveOwnerThread = 该线程。当state > 1,说明有一线程重入了锁。
(3)需要支持对线程进行阻塞和唤醒
AQS使用LockSupport工具类的park()方法和unpark()方法,通过Unsafe类提供的native方法实现阻塞和唤醒线程。
(4)需要一个无锁队列来维护阻塞线程
AQS通过一个双向链表和CAS实现了一个无锁的阻塞队列来维护阻塞的线程。
AQS中最关键的两部分是:Node等待队列和state变量。其中Node等待队列是一个双向链表,用来存放阻塞等待的线程,而state变量则用来在加锁和释放锁时标记锁的状态。
3.ReentractLock如何获取锁
(1)compareAndSetState()方法尝试加锁
ReentractLock是基于NonfairSync的lock()方法来实现加锁的。AQS里有一个核心的变量state,代表了锁的状态。在NonfairSync.lock()方法中,会通过CAS操作来设置state从0变为1。如果state原来是0,那么就代表此时还没有线程获取锁,当前线程执行AQS的compareAndSetState()方法便能成功将state设置为1。
所以AQS的compareAndSetState()方法相当于在尝试加锁。AQS的compareAndSetState()方法是基于Unsafe类来实现CAS操作的,Atomic原子类也是基于Unsafe来实现CAS操作的。
(2)setExclusiveOwnerThread()方法设置加锁线程为当前线程
如果执行compareAndSetState(0, 1)返回的是true,那么就说明加锁成功,于是执行AQS的setExclusiveOwnerThread()方法设置加锁线程为当前线程。
(3)线程重入锁时CAS操作失败
假如线程1加锁后又调用ReentrantLock.lock()方法,应如何实现可重入锁?此时state = 1,故执行AQS的compareAndSetState(0, 1)方法会返回false。所以首先通过CAS操作,尝试获取锁会失败,然后返回false,于是便会执行AQS的acquire(1)方法。
(4)Sync的nonfairTryAcquire()实现可重入锁
在AQS的acquire()方法中,首先会执行AQS的tryAcquire()方法尝试获取锁。但AQS的tryAcquire()方法是个保护方法,需要由子类重写。所以其实会执行继承自AQS子类Sync的NonfairSync的tryAcquire()方法,而该方法最终又执行回AQS子类Sync的nonfairTryAcquire()方法。
在AQS子类Sync的nonfairTryAcquire()方法中:首先判断state是否为0,如果是则表明此时已释放锁,可通过CAS来获取锁。否则判断持有锁的线程是否为当前线程,如果是则对state进行累加。也就是通过对state进行累加,实现持有锁的线程可以重入锁。
(5)加锁失败的线程的具体处理流程
首先在ReentrantLock内部类NonfairSync的lock()方法中,执行AQS的compareAndSetState()方法尝试获取锁是失败的。
于是执行AQS的acquire()方法 -> 执行NonfairSync的tryAcquire()方法。也就是执行继承自AQS的Sync的nonfairTryAcquire()方法,进行判断是否是重入锁 + 是否已释放锁。发现也是失败的,所以继承自Sync的NonfairSync的tryAcquire()方法返回false。
然后在AQS的acquire()方法中,if判断的第一个条件tryAcquire()便是false,所以接着会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。也就是先执行AQS的addWaiter()方法将当前线程加入等待队列,然后再去执行AQS的acquireQueued()方法将当前线程挂起阻塞等待。
ReentrantLock内部类NonfairSync的lock()方法总结:如果CAS操作失败,则说明有线程正在持有锁,此时会继续调用acquire(1)。然后通过NonfairSync的tryAcquire()方法尝试获取独占锁,也就是通过Sync的nonfairTryAcquire()方法尝试获取独占锁。如果NonfairSync的tryAcquire()方法返回false,说明锁已被占用。于是执行AQS的addWaiter()方法将当前线程封装成Node并添加到等待队列,接着执行AQS的acquireQueued()方法通过自旋尝试获取锁以及挂起线程。
(6)执行AQS的addWaiter()方法维护等待队列
在AQS的addWaiter()方法中:首先会将当前获取锁失败的线程封装为一个Node对象,然后判断等待队列(双向链表)的尾结点是否为空。如果尾结点不为空,则使用CAS操作 + 尾插法将Node对象插入等待队列中。如果尾结点为空或者尾结点不为空时CAS操作失败,则调用enq()方法通过自旋 + CAS构建等待队列或把Node对象插入等待队列。注意:等待队列的队头是个空的Node结点,新增一个结点时会从尾部插入。
(7)执行AQS的acquireQueued()方法挂起线程
执行完AQS的addWaiter()方法后便执行AQS的acquireQueued()方法。
在AQS的acquireQueued()方法中:首先会判断传入结点的上一个结点是否为等待队列的头结点。如果是,则再次调用NonfairSync的tryAcquire()方法尝试获取锁。如果获取锁成功,则将传入的Node结点从等待队列中移除。同时设置传入的Node结点为头结点,然后将该结点设置为空。从而确保等待队列的头结点是一个空的Node结点。注意:NonfairSync的tryAcquire()方法会判断是否重入锁 + 是否已释放锁。
在AQS的acquireQueued()方法中:如果首先进行的尝试获取锁失败了,那么就执行shouldParkAfterFailedAcquire()方法判断是否要将当前线程挂起。如果需要将当前线程挂起,则会调用parkAndCheckInterrupt()方法进行挂起,也就是通过调用LockSupport的park()方法挂起当前线程。注意:如果线程被中断,只会暂时设置interrupted为true。然后还要继续等待被唤醒获取锁,才能调用selfInterrupt()方法对线程中断。
AQS的acquireQueued()方法总结:如果当前结点的前驱结点不是队头结点或者当前线程尝试抢占锁失败,那么都会调用shouldParkAfterFailedAcquire()方法,修改当前线程结点的前驱结点的状态为SIGNAL + 决定是否应挂起当前线程。shouldParkAfterFailedAcquire()方法作用是检查当前结点的前驱结点状态。如果状态是SIGNAL,则可以挂起线程。如果状态是CANCELED,则要移除该前驱结点。如果状态是其他,则通过CAS操作修改该前驱结点的状态为SIGNAL。
(5)如何处理正常唤醒和中断唤醒
LockSupport的park操作,会挂起一个线程。LockSupport的unpark操作,会唤醒被挂起的线程。
被LockSupport.park()方法阻塞的线程被其他线程唤醒有两种情况。情况一:其他线程调用了LockSupport.unpark()方法,正常唤醒。情况二:其他线程调用了阻塞线程Thread的interrupt()方法,中断唤醒。
正是因为被LockSupport.park()方法阻塞的线程可能会被中断唤醒,所以AQS的acquireQueued()方法才写了一个for自旋。当阻塞的线程被唤醒后,如果发现自己的前驱结点是头结点,那么就去获取锁。如果获取不到锁,那么就再次阻塞自己,不断重复直到获取到锁为止。
被LockSupport.park()方法阻塞的线程不管是正常唤醒还是被中断唤醒,唤醒后都会通过Thread.interruptrd()方法来判断是否是中断唤醒。如果是中断唤醒,唤醒后不会立刻响应中断,而是再次获取锁,获取到锁后才能响应中断。
4.AQS的acquire()方法和release()方法总结
(1)acquire()方法获取锁的流程总结
首先调用AQS子类的tryAcquire()方法尝试获取锁(是否重入 + 是否释放锁)。如果获取成功,则说明是重入锁或CAS抢占释放的锁成功,于是退出返回。如果获取失败,则调用AQS的addWaiter()方法将当前线程封装成Node结点,并通过AQS的compareAndSetTail()方法将该Node结点添加到等待队列尾部。
然后将该Node结点传入AQS的acquireQueued()方法,通过自旋尝试获取锁。在AQS的acquireQueued()方法中,会判断该Node结点的前驱是否为头结点。如果不是,则挂起当前线程进行阻塞。如果是,则尝试获取锁。如果获取成功,则设置当前结点为头结点,然后退出返回。如果获取失败,则继续挂起当前线程进行阻塞。
当被阻塞线程,被其他线程中断唤醒或其对应结点的前驱结点释放了锁,那么就继续判断该线程对应结点的前驱结点是否成为头结点。
(2)release()方法释放锁的总结
AQS的release()方法主要做了两件事情:一是通过tryRelease()方法释放锁(递减state变量),二是通过unparkSuccessor()方法唤醒等待队列中的下一个线程。
由于是独占锁,只有持有锁的线程才有资格释放锁,所以tryRelease()方法修改state变量值时不需要使用CAS操作。
5.ReentractReadWriteLock的基本原理
(1)读锁和写锁关系
表面上读锁和写锁是两把锁,但实际上只是同一把锁的两个视图。读锁和写锁在初始化的时候会共用一个Sync,也就是同一把锁、两类线程。其中读线程和读线程不互斥,读线程和写线程互斥,写线程和写线程互斥。
(2)锁状态设计
和独占锁一样,读写锁也是使用state变量来表示锁的状态。只是将state变量拆成两半:高16位表示读锁状态,低16位表示写锁状态。
读写锁是通过位运算来快速确定读和写的状态的。假设当前state = s,则写状态等于s & ((1 << 16) - 1),读状态等于s >>> 16。当写状态增加1时,state = s + 1。当读状态加1时,state = s + (1 << 16)。
将一个int型的state变量拆成两半,而不是用两个int型变量分别表示读锁和写锁的状态,是因为无法用一次CAS同时操作两个int型变量。
1 << 0 等于 1,(1 << 0) - 1 = 0
1 << 1 等于 10,(1 << 1) - 1 = 01
1 << 2 等于 100,(1 << 2) - 1 = 011
1 << 4 等于 1000,(1 << 4) - 1 = 0111
1 << 8 等于 100000000,(1 << 8) - 1 = 011111111
1 << 16 等于 10000000000000000,(1 << 16) - 1 = 01111111111111111
//所以s & ((1 << 16) - 1)相当于将s的高16位全部抹去,只剩下低16位
//若s = 11111,则s >>> 2 = 00111
//所以s >>> 16,就是无符号补0右移16位
(3)写锁的获取与释放
写锁是一个可重入的排他锁,它只能被一个线程同时获取。如果当前线程已获取写锁,则增加写状态:s + 1。如果当前线程在获取写锁时,读锁已被获取或者自己不是已获写锁的线程,则进入等待状态。
(4)读锁的获取与释放
读锁是一个可重入的共享锁,它能被多个线程同时获取。在写状态为0时,读锁总会被成功获取,而所做的也只是增加读状态。如果当前线程已获取读锁,则增加读状态:state = s + (1 << 16)。如果当前线程在获取读锁时,写锁已被其他其他线程获取,则该线程进入等待状态。
6.ReentractReadWriteLock如