当前位置: 首页 > article >正文

Java多线程与高并发专题——生产/消费者模式

引入

我们的很多技术设计思路都来源于生活,生产/消费者模式也不例外。在现实世界中,我们把生产商品的一方称为生产者,把消费商品的一方称为消费者,有时生产者的生产速度特别快,但消费者的消费速度跟不上,就是所谓的“产能过剩”,相反,有时消费者的消费速度大于生产者的生产速度,也就是所谓的“供不应求”。

什么是生产/消费者模式?

生产/消费者模式是一种经典的程序设计模式。

其设计思路正如其名,通过两类角色,即生产者和消费者来实现。生产者负责生成数据或任务,并将其放入一个共享的缓冲区中;消费者则从缓冲区中获取数据或任务并进行处理。生产者和消费者之间通过缓冲区进行解耦,它们不需要直接相互通信或了解对方的具体工作细节,只需要关注缓冲区的状态和操作。

其核心优势:

  1. 解耦:生产者和消费者之间相互独立,它们只需要关注自己的生产和消费行为,而不需要了解对方的具体实现细节。这样可以降低系统的耦合度,提高系统的可维护性和可扩展性。
  2. 提高性能:生产者和消费者可以并行工作,生产者可以在消费者处理数据的同时继续生产数据,从而提高系统的整体性能。
  3. 缓冲作用:缓冲区可以起到缓冲数据的作用,当生产者生产数据的速度大于消费者消费数据的速度时,缓冲区可以暂存数据,避免数据丢失;当消费者消费数据的速度大于生产者生产数据的速度时,缓冲区可以提供数据,保证消费者不会因为没有数据而等待。

在java并发编程时,要实现生产/消费者模式,通常需要在两者之间增加一个阻塞队列作为媒介,有了媒介之后就相当于有了一个缓冲,平衡了两者的能力,整体的设计如上图所示,最上面是阻塞队列,左侧是生产者线程,生产者在生产数据后将数据存放在阻塞队列中,右侧是消费者线程,消费者获取阻塞队列中的数据。而中间分别代表生产者消费者之间互相通信的过程,因为无论阻塞队列是满还是空都可能会产生阻塞,阻塞之后就需要在合适的时机去唤醒被阻塞的线程。

那么什么时候阻塞线程需要被唤醒呢?

主要有两种情况:

  1. 当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。
  2. 如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产。

相关重要方法的使用注意事项

我们通过下面几个问题去梳理 wait/notify/notifyAll 方法的使用注意事项。

为什么 wait 方法必须在 synchronized 保护的同步代码中使用?

wait 方法的源码注释如下:

    /**
     * Causes the current thread to wait until another thread invokes the
     * {@link java.lang.Object#notify()} method or the
     * {@link java.lang.Object#notifyAll()} method for this object.
     * In other words, this method behaves exactly as if it simply
     * performs the call {@code wait(0)}.
     * <p>
     * The current thread must own this object's monitor. The thread
     * releases ownership of this monitor and waits until another thread
     * notifies threads waiting on this object's monitor to wake up
     * either through a call to the {@code notify} method or the
     * {@code notifyAll} method. The thread then waits until it can
     * re-obtain ownership of the monitor and resumes execution.
     * <p>
     * As in the one argument version, interrupts and spurious wakeups are
     * possible, and this method should always be used in a loop:
     * <pre>
     *     synchronized (obj) {
     *         while (&lt;condition does not hold&gt;)
     *             obj.wait();
     *         ... // Perform action appropriate to condition
     *     }
     * </pre>
     * This method should only be called by a thread that is the owner
     * of this object's monitor. See the {@code notify} method for a
     * description of the ways in which a thread can become the owner of
     * a monitor.
     *
     * @throws  IllegalMonitorStateException  if the current thread is not
     *               the owner of the object's monitor.
     * @throws  InterruptedException if any thread interrupted the
     *             current thread before or while the current thread
     *             was waiting for a notification.  The <i>interrupted
     *             status</i> of the current thread is cleared when
     *             this exception is thrown.
     * @see        java.lang.Object#notify()
     * @see        java.lang.Object#notifyAll()
     */
    public final void wait() throws InterruptedException {
        wait(0);
    }

翻译:

导致当前线程等待,直到另一个线程针对此对象调用 {@link java.lang.Object#notify ()} 方法或 {@link java.lang.Object#notifyAll ()} 方法。

换句话说,此方法的行为完全就像是简单地执行 {@code wait (0)} 调用。

当前线程必须拥有此对象的监视器。该线程释放此监视器的所有权并等待,直到另一个线程通过调用 {@code notify} 方法或 {@code notifyAll} 方法来通知在此对象的监视器上等待的线程唤醒。

然后,该线程等待,直到它可以重新获得监视器的所有权并继续执行。

与单参数版本一样,可能会出现中断和虚假唤醒,因此此方法应始终在循环中使用:

synchronized (obj) {
    while (<条件不成立>)
            obj.wait();
             ... // 执行与条件相应的操作
}

此方法仅应由作为此对象监视器所有者的线程调用。有关线程如何成为监视器所有者的方式,请参阅 {@code notify} 方法的说明。

@throws IllegalMonitorStateException 如果当前线程不是此对象监视器的所有者。
@throws InterruptedException 如果在当前线程等待通知之前或期间,有任何线程中断了当前线程。当抛出此异常时,当前线程的 <i>中断状态</i> 会被清除。

其中提到,在使用 wait 方法时,必须把 wait 方法写在 synchronized 保护的 while 代码块中,并始终判断执行条件是否满足,如果满足就往下继续执行,如果不满足就执行 wait 方法,而在执行 wait 方法之前,必须先持有对象的 monitor 锁,也就是通常所说的 synchronized 锁。

设计成这样主要是为了保证线程通信的正确性和数据的一致性:

  1. 实现线程间的正确通信和协作
    wait方法的作用是让当前线程暂停执行,并释放它所持有的对象锁,进入等待状态,直到其他线程调用同一对象的notifynotifyAll方法来唤醒它。这种线程间的协作需要在一个可靠的同步机制下进行,以确保线程之间能够正确地传递信号和协调执行顺序。
    如果wait方法不在synchronized代码中使用,那么就无法保证在调用wait方法时,当前线程对相关对象的操作是原子性的和可见的,可能会出现数据不一致的情况。例如,多个线程同时访问共享资源,一个线程在修改共享资源后调用wait,但其他线程可能看不到这个修改,导致程序出现错误的结果。

  2. 确保可见性和原子性
    synchronized关键字提供了内存可见性和原子性的保证。当一个线程进入synchronized代码块或方法时,它会获取对象的锁,这会导致线程从主内存中重新读取共享变量的值,并且在执行完synchronized代码块或方法后,会将修改后的变量值刷新回主内存。
    这样,当一个线程在synchronized代码中调用wait方法时,它对共享变量所做的任何修改都会被其他线程可见,并且在等待期间,其他线程无法访问被锁定的对象,从而保证了数据的一致性和完整性。如果没有synchronized的保护,线程对共享变量的修改可能不会及时被其他线程看到,就会破坏线程之间的通信机制。

  3. 避免非法的监视器状态异常
    Java 的线程模型中,每个对象都有一个与之关联的监视器(monitor)。waitnotifynotifyAll方法都是基于对象的监视器来实现线程间的通信的。只有当线程获得了对象的监视器锁(即进入了synchronized代码块或方法),它才有权调用wait方法。
    如果在没有获取监视器锁的情况下调用wait方法,JVM 会抛出IllegalMonitorStateException异常,这是为了保证线程对对象监视器的操作是合法和有序的。

关于“虚假唤醒”(spurious wakeup)的问题,线程可能在既没有被notify/notifyAll,也没有被中断或者超时的情况下被唤醒,这种唤醒是我们不希望看到的。虽然在实际生产中,虚假唤醒发生的概率很小,但是程序依然需要保证在发生虚假唤醒的时候的正确性,所以就需要采用while循环的结构。

这样即便被虚假唤醒了,也会再次检查while里面的条件,如果不满足条件,就会继续wait,也就消除了虚假唤醒的风险。

为什么 wait/notify/notifyAll 被定义在 Object 类中,而 sleep 定义在 Thread 类中?

主要有两点原因:

  1. 因为 Java 中每个对象都有一把称之为 monitor 监视器的锁,由于每个对象都可以上锁,这就要求在对象头中有一个用来保存锁信息的位置。这个锁是对象级别的,而非线程级别的,wait/notify/notifyAll 也都是锁级别的操作,它们的锁属于对象,所以把它们定义在 Object 类中是最合适,因为 Object 类是所有对象的父类。
  2. 因为如果把 wait/notify/notifyAll 方法定义在 Thread 类中,会带来很大的局限性,比如一个线程可能持有多把锁,以便实现相互配合的复杂逻辑,假设此时 wait 方法定义在 Thread 类中,如何实现让一个线程持有多把锁呢?又如何明确线程等待的是哪把锁呢?既然我们是让当前线程去等待某个对象的锁,自然应该通过操作对象来实现,而不是操作线程。

wait/notify 和 sleep 方法的异同?

相同点:

  1. 它们都可以让线程阻塞。

  2. 它们都可以响应 interrupt 中断:在等待的过程中如果收到中断信号,都可以进行响应,并抛出InterruptedException 异常。

不同点:

  1. wait 方法必须在 synchronized 保护的代码中使用,而 sleep 方法并没有这个要求。

  2. 在同步代码中执行 sleep 方法时,并不会释放 monitor 锁,但执行 wait 方法时会主动释放monitor 锁。

  3. sleep 方法中会要求必须定义一个时间,时间到期后会主动恢复,而对于没有参数的 wait 方法而言,意味着永久等待,直到被中断或被唤醒才能恢复,它并不会主动恢复。

  4. wait/notify 是 Object 类的方法,而 sleep 是 Thread 类的方法。

实现生产/消费者模式

如何用 BlockingQueue 实现?

为了便于阅读,先不纠结一些语法细节,代码里省略了 try/catch 检测等,以下便是利用 BlockingQueue 实现生产者消费者模式的示例代码:

public static void main(String[] args) {
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
    Runnable producer = () -> {
        while (true) {
            queue.put(new Object());
        }
    };
    new Thread(producer).start();
    new Thread(producer).start();
    Runnable consumer = () -> {
        while (true) {
            queue.take();
        }
    };
    new Thread(consumer).start();
    new Thread(consumer).start();
}

如代码所示,首先,创建了一个 ArrayBlockingQueue 类型的 BlockingQueue,命名为 queue 并将它的容量设置为10;其次,创建一个简单的生产者,while(true) 循环体中的queue.put() 负责往队列添加数据;然后,创建两个生产者线程并启动;同样消费者也非常简单,while(true) 循环体中的 queue.take() 负责消费数据,同时创建两个消费者线程并启动。

虽然代码非常简单,但实际上 ArrayBlockingQueue 已经在背后完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。

如何用 Condition 实现?

BlockingQueue 实现生产者消费者模式看似简单,背后却暗藏玄机,在掌握这种方法的基础上仍需 要掌握更复杂的实现方法,我们接下来看如何在掌握了 BlockingQueue 的基础上利用 Condition 实现生产者消费者模式。它们背后的实现原理非常相似,相当于我们自己实现一个简易版的 BlockingQueue:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyBlockingQueueForCondition {
    private Queue queue;
    private int max = 16;
    private ReentrantLock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();
    public MyBlockingQueueForCondition(int size){
        this.max = size;
        queue = new LinkedList();
    }
    public void put(Object o) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == max) {
                notFull.await();
            }
            queue.add(o);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == 0) {
                notEmpty.await();
            }
            Object item = queue.remove();
            notFull.signalAll();
            return item;
        } finally {
            lock.unlock();
        }
    }
}

如代码所示,首先,定义了一个队列变量queue并设置最大容量为16;其次,定义了一个ReentrantLock类型的Lock锁,并在Lock锁的基础上创建两个Condition,一个是notEmpty,另一个是notFull,分别代表队列没有空和没有满的条件;最后,声明了put和take这两个核心方法。

因为生产者消费者模式通常是面对多线程的场景,需要一定的同步措施保障线程安全,所以在put方法中先将Lock锁上,然后,在while的条件里检测queue是不是已经满了,如果已经满了,则调用notFull的await()阻塞生产者线程并释放Lock,如果没有满,则往队列放入数据并利用notEmpty.signalAll()通知正在等待的所有消费者并唤醒它们。最后在finally中利用lock.unlock()方法解锁,把unlock方法放在finally中是一个基本原则,否则可能会产生无法释放锁的情况。

下面再来看 take 方法,take 方法实际上是与 put 方法相互对应的,同样是通过 while 检查队列是否为 空,如果为空,消费者开始等待,如果不为空则从队列中获取数据并通知生产者队列有空余位置,最后 在 finally 中解锁。

这里需要注意,我们在 take() 方法中使用 while( queue.size() == 0 ) 检查队列状态,而不能用 if( queue.size() == 0 )。

为什么呢?因为生产者消费者往往是多线程的,我们假设有两个消费者,第一个消费者线程获取数据时,发现队列为空,便进入等待状态;因为第一个线程在等待时会释放 Lock 锁,所以第二个消费者可以进入并执行 if( queue.size() == 0 ),也发现队列为空,于是第二个线程也进入等待;而此时,如果生产者生产了一个数据,便会唤醒两个消费者线程,而两个线程中只有一个线程可以拿到锁,并执行 queue.remove 操作,另外一个线程因为没有拿到锁而卡在被唤醒的地方,而第一个线程执行完操作后会在 finally 中通过 unlock 解锁,而此时第二个线程便可以拿到被第一个线程释放的锁,继续执行操作,也会去调用 queue.remove 操作,然而这个时候队列已经为空了,所以会抛出 NoSuchElementException 异常,这不符合我们的逻辑。而如果用 while 做检查,当第一个消费者被唤醒得到锁并移除数据之后,第二个线程在执行 remove 前仍会进行 while 检查,发现此时依然满足 queue.size() == 0 的条件,就会继续执行 await 方法,避免了获取的数据为 null 或抛出异常的情况。

如何用 wait/notify 实现?

最后我们再来看看使用 wait/notify 实现生产者消费者模式的方法,实际上实现原理和Condition 是非常类似的,它们是兄弟关系:

import java.util.LinkedList;

class MyBlockingQueue {
    private int maxSize;
    private LinkedList<Object> storage;

    public MyBlockingQueue(int size) {
        this.maxSize = size;
        storage = new LinkedList<>();
    }

    public synchronized void put() throws InterruptedException {
        while (storage.size() == maxSize) {
            wait();
        }
        storage.add(new Object());
        notifyAll();
    }

    public synchronized void take() throws InterruptedException {
        while (storage.size() == 0) {
            wait();
        }
        System.out.println(storage.remove());
        notifyAll();
    }
}

如代码所示,最主要的部分仍是 take 与 put 方法,我们先来看 put 方法,put 方法被 synchronized 保护,while 检查队列是否为满,如果不满就往里放入数据并通过 notifyAll() 唤醒其他线程。同样,take 方法也被 synchronized 修饰,while 检查队列是否为空,如果不为空就获取数据并唤醒其他线程。

总结

第一种 BlockingQueue 模式实现比较简单,但其背后的实现原理在第二种、第三种实现方法中得以体现,第二种、第三种实现方法本质上是我们自己实现了 BlockingQueue 的一些核心逻辑,供生产者与消费者使用。


http://www.kler.cn/a/530364.html

相关文章:

  • 如何使用 DeepSeek 和 Dexscreener 构建免费的 AI 加密交易机器人?
  • 强化学习、深度学习、深度强化学习的区别是什么?
  • 海外问卷调查之渠道查,企业经营的指南针
  • Elasticsearch的索引生命周期管理
  • SpringBoot 整合 SpringMVC:SpringMVC的注解管理
  • 【llm对话系统】大模型 Llama 源码分析之归一化方法 RMS Norm
  • XML DOM 节点树
  • ROS应用之AMCL 多机器人支持
  • Python-基于PyQt5,wordcloud,pillow,numpy,os,sys等的智能词云生成器(最终版)
  • C++编程语言:抽象机制:泛型编程(Bjarne Stroustrup)
  • 汇编语言运行环境搭建及简单使用
  • 沙皮狗为什么禁养?
  • 第39天:WEB攻防-通用漏洞_CSRF_SSRF_协议玩法_内网探针_漏洞利用
  • ubuntu 下使用deepseek
  • C# 装箱和拆箱(以及 as ,is)
  • gitea - fatal: Authentication failed
  • 水质数据监控大屏,保护水资源,共筑绿水青山
  • MySQL不适合创建索引的11种情况
  • Linux mpstat 命令使用详解
  • CodeGPT使用本地部署DeepSeek Coder
  • 菜单映射的工具函数整合
  • 数据结构---线性表
  • Linux网络 | 理解运营商与网段划分、理解NAT技术和分片
  • 开源智慧园区管理系统对比其他十种管理软件的优势与应用前景分析
  • 专业的定制版软件,一键操作,无限使用
  • 在React中使用redux