Java多线程与高并发专题——生产/消费者模式
引入
我们的很多技术设计思路都来源于生活,生产/消费者模式也不例外。在现实世界中,我们把生产商品的一方称为生产者,把消费商品的一方称为消费者,有时生产者的生产速度特别快,但消费者的消费速度跟不上,就是所谓的“产能过剩”,相反,有时消费者的消费速度大于生产者的生产速度,也就是所谓的“供不应求”。
什么是生产/消费者模式?
生产/消费者模式是一种经典的程序设计模式。
其设计思路正如其名,通过两类角色,即生产者和消费者来实现。生产者负责生成数据或任务,并将其放入一个共享的缓冲区中;消费者则从缓冲区中获取数据或任务并进行处理。生产者和消费者之间通过缓冲区进行解耦,它们不需要直接相互通信或了解对方的具体工作细节,只需要关注缓冲区的状态和操作。
其核心优势:
- 解耦:生产者和消费者之间相互独立,它们只需要关注自己的生产和消费行为,而不需要了解对方的具体实现细节。这样可以降低系统的耦合度,提高系统的可维护性和可扩展性。
- 提高性能:生产者和消费者可以并行工作,生产者可以在消费者处理数据的同时继续生产数据,从而提高系统的整体性能。
- 缓冲作用:缓冲区可以起到缓冲数据的作用,当生产者生产数据的速度大于消费者消费数据的速度时,缓冲区可以暂存数据,避免数据丢失;当消费者消费数据的速度大于生产者生产数据的速度时,缓冲区可以提供数据,保证消费者不会因为没有数据而等待。
在java并发编程时,要实现生产/消费者模式,通常需要在两者之间增加一个阻塞队列作为媒介,有了媒介之后就相当于有了一个缓冲,平衡了两者的能力,整体的设计如上图所示,最上面是阻塞队列,左侧是生产者线程,生产者在生产数据后将数据存放在阻塞队列中,右侧是消费者线程,消费者获取阻塞队列中的数据。而中间分别代表生产者消费者之间互相通信的过程,因为无论阻塞队列是满还是空都可能会产生阻塞,阻塞之后就需要在合适的时机去唤醒被阻塞的线程。
那么什么时候阻塞线程需要被唤醒呢?
主要有两种情况:
- 当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。
- 如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产。
相关重要方法的使用注意事项
我们通过下面几个问题去梳理 wait/notify/notifyAll 方法的使用注意事项。
为什么 wait 方法必须在 synchronized 保护的同步代码中使用?
wait 方法的源码注释如下:
/**
* Causes the current thread to wait until another thread invokes the
* {@link java.lang.Object#notify()} method or the
* {@link java.lang.Object#notifyAll()} method for this object.
* In other words, this method behaves exactly as if it simply
* performs the call {@code wait(0)}.
* <p>
* The current thread must own this object's monitor. The thread
* releases ownership of this monitor and waits until another thread
* notifies threads waiting on this object's monitor to wake up
* either through a call to the {@code notify} method or the
* {@code notifyAll} method. The thread then waits until it can
* re-obtain ownership of the monitor and resumes execution.
* <p>
* As in the one argument version, interrupts and spurious wakeups are
* possible, and this method should always be used in a loop:
* <pre>
* synchronized (obj) {
* while (<condition does not hold>)
* obj.wait();
* ... // Perform action appropriate to condition
* }
* </pre>
* This method should only be called by a thread that is the owner
* of this object's monitor. See the {@code notify} method for a
* description of the ways in which a thread can become the owner of
* a monitor.
*
* @throws IllegalMonitorStateException if the current thread is not
* the owner of the object's monitor.
* @throws InterruptedException if any thread interrupted the
* current thread before or while the current thread
* was waiting for a notification. The <i>interrupted
* status</i> of the current thread is cleared when
* this exception is thrown.
* @see java.lang.Object#notify()
* @see java.lang.Object#notifyAll()
*/
public final void wait() throws InterruptedException {
wait(0);
}
翻译:
导致当前线程等待,直到另一个线程针对此对象调用 {@link java.lang.Object#notify ()} 方法或 {@link java.lang.Object#notifyAll ()} 方法。
换句话说,此方法的行为完全就像是简单地执行 {@code wait (0)} 调用。
当前线程必须拥有此对象的监视器。该线程释放此监视器的所有权并等待,直到另一个线程通过调用 {@code notify} 方法或 {@code notifyAll} 方法来通知在此对象的监视器上等待的线程唤醒。
然后,该线程等待,直到它可以重新获得监视器的所有权并继续执行。
与单参数版本一样,可能会出现中断和虚假唤醒,因此此方法应始终在循环中使用:
synchronized (obj) {
while (<条件不成立>)
obj.wait();
... // 执行与条件相应的操作
}此方法仅应由作为此对象监视器所有者的线程调用。有关线程如何成为监视器所有者的方式,请参阅 {@code notify} 方法的说明。
@throws IllegalMonitorStateException 如果当前线程不是此对象监视器的所有者。
@throws InterruptedException 如果在当前线程等待通知之前或期间,有任何线程中断了当前线程。当抛出此异常时,当前线程的 <i>中断状态</i> 会被清除。
其中提到,在使用 wait 方法时,必须把 wait 方法写在 synchronized 保护的 while 代码块中,并始终判断执行条件是否满足,如果满足就往下继续执行,如果不满足就执行 wait 方法,而在执行 wait 方法之前,必须先持有对象的 monitor 锁,也就是通常所说的 synchronized 锁。
设计成这样主要是为了保证线程通信的正确性和数据的一致性:
-
实现线程间的正确通信和协作
wait
方法的作用是让当前线程暂停执行,并释放它所持有的对象锁,进入等待状态,直到其他线程调用同一对象的notify
或notifyAll
方法来唤醒它。这种线程间的协作需要在一个可靠的同步机制下进行,以确保线程之间能够正确地传递信号和协调执行顺序。
如果wait
方法不在synchronized
代码中使用,那么就无法保证在调用wait
方法时,当前线程对相关对象的操作是原子性的和可见的,可能会出现数据不一致的情况。例如,多个线程同时访问共享资源,一个线程在修改共享资源后调用wait
,但其他线程可能看不到这个修改,导致程序出现错误的结果。 -
确保可见性和原子性
synchronized
关键字提供了内存可见性和原子性的保证。当一个线程进入synchronized
代码块或方法时,它会获取对象的锁,这会导致线程从主内存中重新读取共享变量的值,并且在执行完synchronized
代码块或方法后,会将修改后的变量值刷新回主内存。
这样,当一个线程在synchronized
代码中调用wait
方法时,它对共享变量所做的任何修改都会被其他线程可见,并且在等待期间,其他线程无法访问被锁定的对象,从而保证了数据的一致性和完整性。如果没有synchronized
的保护,线程对共享变量的修改可能不会及时被其他线程看到,就会破坏线程之间的通信机制。 -
避免非法的监视器状态异常
Java 的线程模型中,每个对象都有一个与之关联的监视器(monitor)。wait
、notify
和notifyAll
方法都是基于对象的监视器来实现线程间的通信的。只有当线程获得了对象的监视器锁(即进入了synchronized
代码块或方法),它才有权调用wait
方法。
如果在没有获取监视器锁的情况下调用wait
方法,JVM 会抛出IllegalMonitorStateException
异常,这是为了保证线程对对象监视器的操作是合法和有序的。
关于“虚假唤醒”(spurious wakeup)的问题,线程可能在既没有被notify/notifyAll,也没有被中断或者超时的情况下被唤醒,这种唤醒是我们不希望看到的。虽然在实际生产中,虚假唤醒发生的概率很小,但是程序依然需要保证在发生虚假唤醒的时候的正确性,所以就需要采用while循环的结构。
这样即便被虚假唤醒了,也会再次检查while里面的条件,如果不满足条件,就会继续wait,也就消除了虚假唤醒的风险。
为什么 wait/notify/notifyAll 被定义在 Object 类中,而 sleep 定义在 Thread 类中?
主要有两点原因:
- 因为 Java 中每个对象都有一把称之为 monitor 监视器的锁,由于每个对象都可以上锁,这就要求在对象头中有一个用来保存锁信息的位置。这个锁是对象级别的,而非线程级别的,wait/notify/notifyAll 也都是锁级别的操作,它们的锁属于对象,所以把它们定义在 Object 类中是最合适,因为 Object 类是所有对象的父类。
- 因为如果把 wait/notify/notifyAll 方法定义在 Thread 类中,会带来很大的局限性,比如一个线程可能持有多把锁,以便实现相互配合的复杂逻辑,假设此时 wait 方法定义在 Thread 类中,如何实现让一个线程持有多把锁呢?又如何明确线程等待的是哪把锁呢?既然我们是让当前线程去等待某个对象的锁,自然应该通过操作对象来实现,而不是操作线程。
wait/notify 和 sleep 方法的异同?
相同点:
-
它们都可以让线程阻塞。
-
它们都可以响应 interrupt 中断:在等待的过程中如果收到中断信号,都可以进行响应,并抛出InterruptedException 异常。
不同点:
-
wait 方法必须在 synchronized 保护的代码中使用,而 sleep 方法并没有这个要求。
-
在同步代码中执行 sleep 方法时,并不会释放 monitor 锁,但执行 wait 方法时会主动释放monitor 锁。
-
sleep 方法中会要求必须定义一个时间,时间到期后会主动恢复,而对于没有参数的 wait 方法而言,意味着永久等待,直到被中断或被唤醒才能恢复,它并不会主动恢复。
-
wait/notify 是 Object 类的方法,而 sleep 是 Thread 类的方法。
实现生产/消费者模式
如何用 BlockingQueue 实现?
为了便于阅读,先不纠结一些语法细节,代码里省略了 try/catch 检测等,以下便是利用 BlockingQueue 实现生产者消费者模式的示例代码:
public static void main(String[] args) {
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
Runnable producer = () -> {
while (true) {
queue.put(new Object());
}
};
new Thread(producer).start();
new Thread(producer).start();
Runnable consumer = () -> {
while (true) {
queue.take();
}
};
new Thread(consumer).start();
new Thread(consumer).start();
}
如代码所示,首先,创建了一个 ArrayBlockingQueue 类型的 BlockingQueue,命名为 queue 并将它的容量设置为10;其次,创建一个简单的生产者,while(true) 循环体中的queue.put() 负责往队列添加数据;然后,创建两个生产者线程并启动;同样消费者也非常简单,while(true) 循环体中的 queue.take() 负责消费数据,同时创建两个消费者线程并启动。
虽然代码非常简单,但实际上 ArrayBlockingQueue 已经在背后完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。
如何用 Condition 实现?
BlockingQueue 实现生产者消费者模式看似简单,背后却暗藏玄机,在掌握这种方法的基础上仍需 要掌握更复杂的实现方法,我们接下来看如何在掌握了 BlockingQueue 的基础上利用 Condition 实现生产者消费者模式。它们背后的实现原理非常相似,相当于我们自己实现一个简易版的 BlockingQueue:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MyBlockingQueueForCondition {
private Queue queue;
private int max = 16;
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public MyBlockingQueueForCondition(int size){
this.max = size;
queue = new LinkedList();
}
public void put(Object o) throws InterruptedException {
lock.lock();
try {
while (queue.size() == max) {
notFull.await();
}
queue.add(o);
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (queue.size() == 0) {
notEmpty.await();
}
Object item = queue.remove();
notFull.signalAll();
return item;
} finally {
lock.unlock();
}
}
}
如代码所示,首先,定义了一个队列变量queue并设置最大容量为16;其次,定义了一个ReentrantLock类型的Lock锁,并在Lock锁的基础上创建两个Condition,一个是notEmpty,另一个是notFull,分别代表队列没有空和没有满的条件;最后,声明了put和take这两个核心方法。
因为生产者消费者模式通常是面对多线程的场景,需要一定的同步措施保障线程安全,所以在put方法中先将Lock锁上,然后,在while的条件里检测queue是不是已经满了,如果已经满了,则调用notFull的await()阻塞生产者线程并释放Lock,如果没有满,则往队列放入数据并利用notEmpty.signalAll()通知正在等待的所有消费者并唤醒它们。最后在finally中利用lock.unlock()方法解锁,把unlock方法放在finally中是一个基本原则,否则可能会产生无法释放锁的情况。
下面再来看 take 方法,take 方法实际上是与 put 方法相互对应的,同样是通过 while 检查队列是否为 空,如果为空,消费者开始等待,如果不为空则从队列中获取数据并通知生产者队列有空余位置,最后 在 finally 中解锁。
这里需要注意,我们在 take() 方法中使用 while( queue.size() == 0 ) 检查队列状态,而不能用 if( queue.size() == 0 )。
为什么呢?因为生产者消费者往往是多线程的,我们假设有两个消费者,第一个消费者线程获取数据时,发现队列为空,便进入等待状态;因为第一个线程在等待时会释放 Lock 锁,所以第二个消费者可以进入并执行 if( queue.size() == 0 ),也发现队列为空,于是第二个线程也进入等待;而此时,如果生产者生产了一个数据,便会唤醒两个消费者线程,而两个线程中只有一个线程可以拿到锁,并执行 queue.remove 操作,另外一个线程因为没有拿到锁而卡在被唤醒的地方,而第一个线程执行完操作后会在 finally 中通过 unlock 解锁,而此时第二个线程便可以拿到被第一个线程释放的锁,继续执行操作,也会去调用 queue.remove 操作,然而这个时候队列已经为空了,所以会抛出 NoSuchElementException 异常,这不符合我们的逻辑。而如果用 while 做检查,当第一个消费者被唤醒得到锁并移除数据之后,第二个线程在执行 remove 前仍会进行 while 检查,发现此时依然满足 queue.size() == 0 的条件,就会继续执行 await 方法,避免了获取的数据为 null 或抛出异常的情况。
如何用 wait/notify 实现?
最后我们再来看看使用 wait/notify 实现生产者消费者模式的方法,实际上实现原理和Condition 是非常类似的,它们是兄弟关系:
import java.util.LinkedList;
class MyBlockingQueue {
private int maxSize;
private LinkedList<Object> storage;
public MyBlockingQueue(int size) {
this.maxSize = size;
storage = new LinkedList<>();
}
public synchronized void put() throws InterruptedException {
while (storage.size() == maxSize) {
wait();
}
storage.add(new Object());
notifyAll();
}
public synchronized void take() throws InterruptedException {
while (storage.size() == 0) {
wait();
}
System.out.println(storage.remove());
notifyAll();
}
}
如代码所示,最主要的部分仍是 take 与 put 方法,我们先来看 put 方法,put 方法被 synchronized 保护,while 检查队列是否为满,如果不满就往里放入数据并通过 notifyAll() 唤醒其他线程。同样,take 方法也被 synchronized 修饰,while 检查队列是否为空,如果不为空就获取数据并唤醒其他线程。
总结
第一种 BlockingQueue 模式实现比较简单,但其背后的实现原理在第二种、第三种实现方法中得以体现,第二种、第三种实现方法本质上是我们自己实现了 BlockingQueue 的一些核心逻辑,供生产者与消费者使用。