【Java并发编程】信号量Semaphore详解
一、简介
Semaphore(信号量):是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
Semaphore 一般用于流量的控制,特别是公共资源有限的应用场景。例如数据库的连接,假设数据库的连接数上线为10个,多个线程并发操作数据库可以使用Semaphore来控制并发操作数据库的线程个数最多为10个。
Semaphore 是一个有效的流量控制工具,它基于 AQS 共享锁实现。我们常常用它来控制对有限资源的访问。
- 每次使用资源前,先申请一个信号量,如果资源数不够,就会阻塞等待;
- 每次释放资源后,就释放一个信号量。
二、源码
2.1 类总览
通过上面的类图可以看到,Semaphore 与 ReentrantLock 的内部类的结构相同,类内部总共存在 Sync、NonfairSync、FairSync 三个类, NonfairSync 与 FairSync 类继承自 Sync 类,其只有一个 tryAcquireShared() 方法,重写了AQS的该方法。Sync 类继承自 AbstractQueuedSynchronizer 抽象类。
与 CountDownLatch 类似,Semaphore 主要是通过 AQS 的共享锁机制实现的,因此它的核心属性只有一个 Sync。总体源码如下:
public class Semaphore implements java.io.Serializable {
//序列化版本号
private static final long serialVersionUID = -3222578661600680210L;
//同步队列
private final Sync sync;
//构造方法
//指定许可数,默认为非公平策略
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//指定许可数和是否公平策略
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//Semaphore提供了acquire方法来获取一个许可,会阻塞线程(有重载方法,可以指定获取许可的个数)
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1); //调用AQS的acquireSharedInterruptibly方法, 即共享式获取响应中断
}
//tryAcquire的意思是尝试获取许可,如果获取成功返回true,否则返回false,不会阻塞线程,而且不响应中断
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
//Semaphore提供release来释放许可
public void release() {
sync.releaseShared(1); //调用AQS的releaseShared方法,即释放共享式同步状态
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
//获取许可数目
final int getPermits() {
return getState();
}
//共享模式下非公平策略获取
//本质就是一个自旋方法,通过自旋+CAS来保证修改许可值的线程安全性,该方法返回的情况有如下两种情况:
// 信号量不够,直接返回,返回值为负数,表示获取失败;
// 信号量足够,且CAS操作成功,返回值为剩余许可值,获取成功。
final int nonfairTryAcquireShared(int acquires) {
for (;;) { //自旋
int available = getState(); //获取可用许可值
int remaining = available - acquires; //计算剩余的许可值
//如果剩余许可值小于0,说明许可不够用了,直接返回,否则CAS更新许可值,更新成功返回,否则继续自旋
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//共享模式下进行释放
//该方法也是一个自旋方法,通过自旋+CAS原子性地修改许可值
protected final boolean tryReleaseShared(int releases) {
for (;;) { //自旋
int current = getState(); //获取许可值
int next = current + releases; //计算释放后的许可值
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) //CAS修改许可值,成功则返回,失败则继续自旋
return true;
}
}
//根据指定的缩减量减小可用许可的数目
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
//获取并返回立即可用的所有许可数目
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
//采用非公平策略获取资源
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
//获取许可
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires); //共享模式下非公平策略获取
}
}
//采用公平策略获取资源
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
//获取许可
protected int tryAcquireShared(int acquires) {
for (;;) {
//获取共享锁之前,先调用hasQueuedPredecessors方法来判断队列中是否存在其他正在排队的节点,
// 如果是返回true,否则为false。因此当存在其他正在排队的节点,当前节点就无法获取许可,只能排队等待,这也是公平策略的体现。
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
2.2 核心方法
获取信号量的方法总共有四个:
释放信号量的方法有两个:
获取信号量四个方法中后面三个方法原理同 acquire() ,我们这里来分析一下 acquire() 和 release() 方法。
2.2.1 acquire() 方法
获取许可,会阻塞线程,响应中断。
// Semaphore
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
内部调用的是 AQS 的 acquireSharedInterruptibly() 方法, 即共享式获取响应中断,代码如下:
// AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
除了 tryAcquireShared() 方法由 AQS 子类实现,其他方法在 《AQS实现原理》中有讲解过,这里不再赘述。我们来分析一下子类实现的 tryAcquireShared() 方法,这里就要分公平和非公平策略两种情况了。
2.2.1.1 非公平策略下
非公平策略下的 tryAcquireShared() 方法:
// Semaphore#NonfairSync
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
内部调用 Sync#nonfairTryAcquireShared() 方法:
// Sync
final int nonfairTryAcquireShared(int acquires) {
//自旋
for (;;) {
//获取可用许可值
int available = getState();
//计算剩余的许可值
int remaining = available - acquires;
//如果剩余许可值小于0,说明许可不够用了,直接返回,否则CAS更新同步状态,更新成功返回,否则继续自旋
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
该方法本质就是一个自旋方法,通过自旋+CAS来保证修改许可值的线程安全性。方法返回的情况有如下两种情况
- 信号量不够,直接返回,返回值为负数,表示获取失败;
- 信号量足够,且CAS操作成功,返回值为剩余许可值,获取成功。
2.2.1.2 公平策略下
公平策略下的 tryAcquireShared() 方法如下:
// Semaphore#FairSync
protected int tryAcquireShared(int acquires) {
//自旋
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
我们看到它与非公平策略的唯一区别就是多了下面这个 if 代码:
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
......
}
}
// AbstractQueuedSynchronizer
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
即在获取共享锁之前,先调用 hasQueuedPredecessors() 方法来判断队列中是否存在其他正在排队的节点,如果是返回true,否则为false。因此当存在其他正在排队的节点,当前节点就无法获取许可,只能排队等待,这也是公平策略的体现。
2.2.2 release() 方法
Semaphore 提供 release() 方法来释放许可。我们继续分析 release() 方法,源码如下:
// Semaphore
public void release() {
sync.releaseShared(1);
}
//AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//如果释放锁成功,唤醒正在排队的节点
doReleaseShared();
return true;
}
return false;
}
//Semaphore#Sync
protected final boolean tryReleaseShared(int releases) {
//自旋
for (;;) {
//获取许可值
int current = getState();
//计算释放后的许可值
int next = current + releases;
//如果释放后比释放前的许可值还小,直接报Error
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS修改许可值,成功则返回,失败则继续自旋
if (compareAndSetState(current, next))
return true;
}
}
tryReleaseShared() 方法是一个自旋方法,通过自旋+CAS原子性地修改同步状态,逻辑很简单。
2.2.3 其余方法
获取信号量的方法有四个:
释放信号量的方法有两个:
其余获取和释放信号量的方法原理同上问,不再赘述。接下来看看其余的工具方法。
2.2.3.1 tryAcquire() 尝试获取许可
该方法一共有四种重载形式:
- tryAcquire() :尝试获取许可,如果获取成功返回true,否则返回false,不会阻塞线程,而且不响应中断。
- tryAcquire(int permits) :同上的基础上,可以指定获取许可的个数。
- tryAcquire(long timeout, TimeUnit unit) :指定超时时间,它调用AQS的tryAcquireSharedNanos() 方法,即共享式超时获取。
- tryAcquire(int permits, long timeout, TimeUnit unit) :可以指定获取许可的个数和超时时间。
//Semaphore
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
2.2.3.2 availablePermits() 获取可用许可数
源码如下:
//Semaphore
public int availablePermits() {
//获取可用许可数
return sync.getPermits();
}
//Sync
//获取可用许可数
final int getPermits() {
return getState();
}
2.2.3.3 drainPermits() 耗光信号量
将剩下的信号量一次性消耗光,并且返回所消耗的信号量。
//Semaphore
public int drainPermits() {
return sync.drainPermits();
}
//Sync
final int drainPermits() {
//自旋操作
for (;;) {
//获取信号量值
int current = getState();
//如果信号量为0,直接返回
//否则CAS修改为0,成功则返回,否则继续自旋
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
2.2.3.4 reducePermits() 减少信号量
reducePermits() 和 acquire() 方法相比都是减少信号量的值,但是 reducePermits() 不会导致任何线程阻塞,即只要传递的参数 reductions(减少的信号量的数量)大于0,操作就会成功。所以调用该方法可能会导致信号量最终为负数。
//Semaphore
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
//Sync
final void reducePermits(int reductions) {
//自旋
for (;;) {
//获取当前信号量值
int current = getState();
//计算剩余许可值
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
//CAS修改同步状态,成功则返回,失败则继续自旋
if (compareAndSetState(current, next))
return;
}
}
三、使用案例
这里以经典的停车作为案例。假设停车场有3个停车位,此时有5辆汽车需要进入停车场停车。
public static void main(String[] args) {
//定义semaphore实例,设置许可数为3,即停车位为3个
Semaphore semaphore = new Semaphore(3);
//创建五个线程,即有5辆汽车准备进入停车场停车
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "尝试进入停车场...");
//尝试获取许可
semaphore.acquire();
//模拟停车
long time = (long) (Math.random() * 10 + 1);
System.out.println(Thread.currentThread().getName() + "进入了停车场,停车" + time +
"秒...");
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "开始驶离停车场...");
//释放许可
semaphore.release();
System.out.println(Thread.currentThread().getName() + "离开了停车场!");
}
}, i + "号汽车").start();
}
}
//执行结果
1号汽车尝试进入停车场...
5号汽车尝试进入停车场...
4号汽车尝试进入停车场...
3号汽车尝试进入停车场...
2号汽车尝试进入停车场...
5号汽车进入了停车场,停车5秒...
1号汽车进入了停车场,停车8秒...
4号汽车进入了停车场,停车9秒...
5号汽车开始驶离停车场...
5号汽车离开了停车场!
3号汽车进入了停车场,停车10秒...
1号汽车开始驶离停车场...
1号汽车离开了停车场!
2号汽车进入了停车场,停车2秒...
4号汽车开始驶离停车场...
4号汽车离开了停车场!
2号汽车开始驶离停车场...
2号汽车离开了停车场!
3号汽车开始驶离停车场...
3号汽车离开了停车场!