当前位置: 首页 > article >正文

【JavaEE初阶 — 多线程】生产消费模型 阻塞队列

   c96f743646e841f8bb30b2d242197f2f.gif

ddb5ae16fc92401ea95b48766cb03d96.jpeg692a78aa0ec843629a817408c97a8b84.gif


    1. 阻塞队列     


   (1) 阻塞队列   


  1. 概念    


阻塞队列是一种特殊的队列,也遵守"先进先出"的原则;
阻塞队列能是一种线程安全的数据结构,主要用来阻塞队列的插入和获取操作:

  • 当队列满了的时候,插入操作会被阻塞,直到队列有空位。
  • 当队列为空的时候,获取操作会被阻塞,直到队列有值。

常用在实现生产者和消费者场景,在笔试题中比较常见。


   2. 如何创建一个阻塞队列      


  • 阻塞队列 BlockingQueue 继承 Queue;

  • 在 Java 标准库中内置了阻塞队列,如果我们需要在一些程序中使用阻塞队列,直接使用标准库中的即可.
  • BlockingQueue是一个 interface 接口,真正实现的类是LinkedBlockingQueue.


   3 查看 put() 和 take() 的阻塞效果  


  • 入队列,出队列操作,对比 queue 的 offer() 和 poll(),阻塞队列更常用的是put(),take();
  • put(),take() 具有阻塞功能;


    (1) 查看 take() 的阻塞功能     
  • 如果队列满了,使用 offer() 会报错,put() 则不会;
  • 因为使用 put() 有阻塞效果,等到别的线程调用 take() 出一个满队列的元素后,put() 的阻塞状态才会结束;

  • 如果没有 put(),直接 take(),此时 take() 的就是一个空队列;
  • 再次执行程序,执行结果什么也没有打印,说明是程序执行 take() 时被阻塞了;

我们通过jconsole来查看当前线程状态:

当前线程状态是waiting,说明该线程进入了没有时间的等待。


   (2) 查看 put() 的阻塞功能     

因为 LinkedBlockingQueue 是基于链表实现的阻塞队列,是可以选择无界(不传 capacity)和有界(传 capacity)的; 

如果不传 capacity,就没那么好验证 put() 和 take() 的阻塞功能,并且在实际开发中,一般建议大家设置 capacity ,避免因为队列过大,导致内存被耗尽,产生“内存超出范围”这样的异常;

所以我们给  LinkedBlockingQueue 传参,并写出如下代码,来验证 put() 的阻塞功能:

运行程序

 从当前执行结果来看,通过打印日志可以发现,程序被阻塞在了最后一个put(),没有打印后面的日志;可以再次 jconsole 查看线程状态:


   总结: 

  • take() 和 put() 是Java标准库中,对于阻塞队列,带有阻塞功能的API;通过上述例子,我们已经初步演示了 take() 和 put() 的阻塞功能;
  • 值得一提,BlockingQueue也有offer, poll, peek等方法,但是这些方法不带有阻塞特性.

    2. 生产消费模型   


   (1) 生产消费模型的概念   


  • 生产者消费者模式是是一种典型的编码技巧,通过一个容器,来解决生产者和消费者的强耦合问题。
  • 生产者和消费者不会直接进行数据交互(通讯),而是通过阻塞队列进行数据交互(通讯);
  • 生产者生产完数据之后,不用等待消费者处理,而是直接扔给阻塞队列,消费者也不会找生产者要数据,而是直接从阻塞队列里取数据。

     (2) 阻塞队列运用场景    


  • 上游服务器A:入口服务器,干的活更简单,单个请求消耗的资源数少;
  • 下游服务器B:通常承担更重的任务量,复杂的计算/存储工作,单个请求消耗的资源数更多;
  • 队列服务器:主要用于存储与转发,针对单个请求,做的事少,可以抗很多的请求量;

生产消费模型的优点:


     解耦合,降低代码耦合度    

  1. A B两个服务器,如果直接进行数据交互,后续单独对A或者B的数据进行修改,大概率就会影响到另一个服务器的正常运转。
  2. 使用阻塞队列作为交易平台,在修改服务器的数据时,由于阻塞队列中的结构固定,两个服务器之间的耦合度就降低,因此能节约后期修改代码的成本

  • 削峰削谷:

  1. 在服务器中,波峰就是请求量高的时候,波谷就是请求量低的时候;
  2. 如果 A 和 B 直接进行交互,当上游服务器A经历波峰,将大量请求传给下游服务器B 的时候,B就有可能挂掉;
  3. 通过队列进行交互,A处理的请求的数量和速度,和B处理的请求的数量和速度就不一样了A处理请求的数量和速度,取决于外面用户的访问量,而B服务器则根据自身来决定;

  4. 哪怕A已经是惊涛骇浪了,B依然波澜不惊,根据自己能够承担的处理请求速度,慢条斯理地消费阻塞队列中的数据;

  5. 运用生产消费模型,即使遇到突发的流量峰值,B服务器也不会轻易挂掉。

  6. 由于流量峰值一般是突发的,时间也比较短;趁着峰值过去了后,B会利用波谷的时间,消费之前波峰积压的数据;


   (3) 生产消费模型代码演示    


    生产消费模型初步演示    


   创建阻塞队列   


   生产者线程   

   消费者线程   


    上述代码逻辑    

  • producer 会因为 while(true),而不断生产数字n,并且 n 随着循环次数增加而增加,并且会不断把生产的数字通过 put(n),入队列;
  • consumer 也会因为 while(true),不断通过 take(),从队列中取出 producer 生产的数字 n,并且打印当前 n 的值;

程序运行结果

    结论     


  • 我们通过打印日志,可以发现,两个线程的执行速度旗鼓相当,生产者线程的生产速度很快,消费也很快,并没有阻塞效果;
  • 上述情况在开发中是一个典型的情况,虽然生产消费模型也会产生阻塞,但是只要我们协调好生产和消费的速度,两个线程的速度相差不大,那么程序都会一直高效地运行

   查看模型阻塞效果    


上述的 producer 和 consumer 两个线程的速度旗鼓相当,直接运行,很难观察到阻塞效果;

我们可以手动协调生产者线程和消费者线程入队列和出队列的速度,来查看阻塞队列产生的阻塞效果:


   观察队列为空的阻塞效果    

我们让 producer 在每次生产元素时,休眠 1 秒,而 consumer 保持原来的消费速度;

阻塞队列中的元素被消耗的速度远远大于生产的速度,进而阻塞队列对 consumer 产生阻塞效果 

   程序运行结果   

通过降低 producer 调用 put() 的速度,进而让阻塞队列对 consumer 的 take() 产生阻塞效果;


    观察队列为满的阻塞效果    

上述是通过降低 producer 的生产速度,来查看阻塞队列对消费者的阻塞效果;

现在,我们来降低 consumer 的消费速度,来观察阻塞队列对 producer 的阻塞效果:

    程序运行结果    


  • 在减低 consumer 的消费速度后,程序会在执行后的一秒,从只调度 producer ,到 producer 和 consumer 并发调度,并且程序只在执行的第一秒,producer 就把阻塞队列填满了;
  • 之后因为队列满了,所以只能 consumer 消费一个元素,生产者才能再生产一个元素;

所以这个案例,是通过减低 consumer 的调用 take(),来让阻塞队列对 producer 调用 put() 的操作产生阻塞效果。


    3. 阻塞队列的模拟实现   


   (1) 实现原理     


 通过基于数组,实现循环队列,在循环队列的基础上,实现阻塞功能:


   (2) 模拟实现     


    1. 基于数组,先模拟实现一个普通队列    


上面不小心把 MyBlockingQueue 设置成内部类了,修改一下:

   2. 成员变量    


通过定义 front 和 tail,来记录队守和队尾在队列中的相关信息,确定入队列和出队列在队列中的相对位置,并且定义 size ,记录队列中的元素个数;

    3. 循环队列原理    

  • tail 指针指向要 put() 元素的空间,front 指针指向要 take 的区间;
  • tail 指针指向一个空的下标元素后,会对这个空间 put() 一个元素,然后 tail++; 
  • front 指针指向一个的下标元素后,会 take() 这个空间的元素,然后 front++; 
  • 如果 front 和 tail 大于 data.length,则令这两个指针重新置为0;

  • 当 front 和 tail 指向同一个元素时,要么队列为空,要么队列为满,put 和 take 两个方法一定会有一个被阻塞;
  • 所以不用担心一个空间的元素被连续 put()两次 (入队列的 elem 还没出队列就被新的elem 覆盖),或者连续 take() (elem已经出队列,还对空的空间进行出队列操作) 两次;
  • 一定是 take() 和 put() 两个反复穿插操作同一个空间。

   4. 实现 put() 方法   


  • 如果队列满了,进入阻塞状态,否则将 tail 指向的数组空间置为要插入的元素 elem;
  • 如果 tail 走出 data 范围,把 tail 重新置为0;
  • 每 put 一个元素,元素个数 size++;


   5. 实现 take() 方法   


  • 如果队列为空,进入阻塞状态,否则接收 front 指向的数组元素并返回;
  • 返回元素后,后续该元素会被 tail 指针覆盖;
  • 如果 front 走出 data 范围,把 front 重新置为0;
  • 每 take 一个元素,元素个数 size--;


   6. 完善 put() 和 take()   


      (1) 确保线程安全     

在多线程调用 put() 和 take() 的情况下,put() 和 take() 的操作并不是原子的,并且都包含了多步写操作,为了保证线程安全,我们需要对两个线程进行上锁:

哪个线程调用 put() 或者 take(),就对哪个线程上锁,确保该线程完整地执行完  put() 或者 take();


    (2) 完善阻塞功能     

  • 阻塞队列只会针对 put() 或者 take() 其中一个方法进行阻塞,如果执行其中一个方法的线程进入阻塞,只能通过其他线程调用另一个方法来唤醒;

  • 一个线程 t 在执行take() 时,因为队列为空,执行wait方法,陷入阻塞状态,只能等执行put() 的线程来唤醒;
  • 如果此时没有执行 put() 线程,但是有别的线程调用 t . interrupt(),从而结束正在 wait() 的 t;

  • 如果出现这种情况,我们当前这个代码,是直接向外抛异常 InterruptedException;但是如果我们是通过 try catch 来捕获,并且解决异常的情况又有不同:

  • 如果我们是通过 throw 抛出 wait() 相应的异常 ,那么 wait() 如果被 interrupt() 打断等待状态,就会抛出异常;
  • 如果是通过 try catch 来处理 wait() 相应的异常,这样的操作会捕获并且处理 wait() 被 interrupt() 打断而抛出的异常;
  • 我们的预期结果,就是让 wait() 被打断而不是被唤醒的时候抛出异常,但是 try catch 把异常给捕获了,因此 wait() 在被打断就不会抛出异常了并且不管队列是否为空或者满,直接打破阻塞状态而执行下面的逻辑;

标准库建议:

  • 如果要通过特定的条件,来使当前线程进入等待状态,那么建议这个特点的条件的判断操作,应该放在 while 循环中,而不是放在 if 中;
  • if 只会判断一次,在判断一次成功执行 wait() 后,可能 wait() 被打破不是因为不满足 if 中的条件,而是被别的线程调用 interrupt() 类似的方法,而非法终止 wait() ;
  • 通过在 wait() 外面嵌套一层 while ,而不是嵌套 if,可以避免 wait() 非法唤醒;


     7. 模拟生产消费模型     


生产速度等于消费速度,producer 和 consumer  都不会被阻塞;

生产速度小于消费速度,阻塞 consummer 

生产速度大于消费速度,阻塞 producer


     8. 模拟实现阻塞队列完整代码    

package Thread;

class MyBlockingQueue {

    private String[] data = null;

    private int front = 0; //队首

    private int tail = 0; //队尾

    private int size = 0; //元素个数

    public MyBlockingQueue(int capacity) {
        data = new String[capacity];
    }

    public void put(String elem) throws InterruptedException {
        synchronized (this) {
            while (size == data.length) {
                //队列满了,需要阻塞
                this.wait();
            }
                data[tail] = elem;
                tail++;
                if (tail >= data.length) tail = 0;
                size++;
                this.notify();

        }
    }

    public String take() throws InterruptedException {
        synchronized (this) {
            while (size == 0) {
                //队列为空,需要阻塞
                this.wait();
            }
            String ret = data[front];
            front++;
            if (front >= data.length) front = 0;
            size--;
            this.notify();
            return ret;
        }
    }
}

public class Demo26 {

    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue(1000);
        Thread producer = new Thread(() -> {
           int n = 0;
           while (true){
               try {
                   queue.put(n + "");
                   System.out.println("生产元素" + n);
                   n++;
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
        },"producer");
        Thread consumer = new Thread(() -> {
            String ret = null;
            while (true){
                try {
                    ret = queue.take();
                    System.out.println("消费元素" + ret);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
           }
        },"consumer");
        producer.start();
        consumer.start();
    }
}

   4. 常见的阻塞队列   



   1. ArrayBlockingQueue   


  • 一个有界队列,底层基于数组实现
  • 需要在初始化时指定队列的大小,队列满时,生产者会被阻塞,队列空时,消费者会被阻塞。

   2. LinkedBlockingQueue   


  • 基于链表的阻塞队列,允许可选的界限(有界或无界)

  • 无界模式下可以不断添加元素,直到耗尽系统资源。
  • 有界模式则类似于ArrayBlockingQueue,但吞吐量通常较高。

   3. PriorityBlockingQueue   


  • 一个无界的优先级队列,元素按照自然顺序,或者指定的比较器顺序进行排序。
  • 与其他阻塞队列不同的是,PriorityBlockingQueue不保证元素的FIFO 顺序。

   4. DelayQueue    


  • 一个无界队列,队列中的元素必须实现 Delayed 接口,只有当元素的延迟时间到期时,才能被取出。常用于延迟任务调度。

   5. SynchronousQueue   


  • 一个没有内部容量的队列,每个插入操作必须等待对应的移除操作,反之亦然。常用于在线程之间的直接传递任务,而不是存储任务

   6. LinkedTransferQueue   


  • LinkedTransferQueue,相对于其他阻塞队列,从名字来看它有 Transfer 功能,其实也不是什么神奇功能;
  • 一般阻塞队列都是将元素入队,然后消费者从队列中获取元素。LinkedTransferQueue 的transfer 是元素入队的时候看看是否已经有消费者在等了,如果有在等了直接给消费者即可,所以就是这里少了一层,没有锁操作。

   5. 不同阻塞队列的使用场景   


     ArrayBlockingQueue 和 LinkedBlockingQueue 使用场景   


  • ArrayBlockingQueue 和 LinkedBlockingQueue 常用于典型的生产者-消费者场景。
  • 例如:任务处理系统中,生产者生成任务,消费者从队列中取出任务并执行。

    ArrayList 和 LinkedList 的区别    


   1. 底层数据结构不同   


  • ArrayList:基于动态数组实现,元素在内存中连续存储。
  • LinkedList:基于双向链表实现,元素通过节点链接,内存中不需要连续存储。

   2. 性能区别    


   (1)  ArrayList: 

  • 随机访问速度快,查找元素的时间复杂度为0(1)。
  • 插入和删除操作慢,尤其是在中间插入或删除时,时间复杂度为0(n),因为需要移动后续元素。

   (2) LinkedList: 

  • 随机访问速度慢,查找元素的时间复杂度为0(n)。
  • 插入和删除操作快,尤其是在头尾插入或删除时,时间复杂度为0(1)。

     PriorityBlockingQueue 使用场景    


PriorityBlockingQueue 更适合处理带有优先级的任务场景;

例如:任务调度系统。


   DelayQueue 使用场景    


  • DelayQueue 适用于需要延迟处理的任务;
  • 例如:缓存失效策略、定时任务调度等。

   SynchronousQueue 使用场景    


  • SynchronousQueue 适合在线程间直接传递数据,而不希望数据被存储在队列中。
  • 例如:ThreadPoolExecutor 的直接交接模式中使用 SynchronousQueue 来传递任务。

   ArrayBlockingQueue 和 LinkedBlockingQueue 区别   


 ArrayBlockingQueue 和 LinkedBlockingQueue,分别是基于 数组链表 的有界阻塞队列。

两者原理都是基于 ReentrantLock Condition

ArrayBlockingQueue 基于数组,内部实现只用了一把锁,可以指定公平或者非公平锁

LinkedBlockingQueue 基于链表,内部实现用了两把锁,take 一把、put一把,所以入队和出队这两个操作是可以并行的,从这里看并发度应该比 ArrayBlockingQueue 高。


   c96f743646e841f8bb30b2d242197f2f.gif

692a78aa0ec843629a817408c97a8b84.gif


http://www.kler.cn/a/394212.html

相关文章:

  • opc da 服务器数据 转 opc ua项目案例
  • 数据结构之线性表之顺序表
  • 分布式协同 - 分布式事务_2PC 3PC解决方案
  • Y3地图制作1:水果缤纷乐、密室逃脱
  • 在vscode的ESP-IDF中使用自定义组件
  • stm32定时器输出比较----驱动步进电机
  • 基于Java的企业资产管理系统
  • Springboot 日志处理(非常详细)
  • 从opencv-python入门opencv--图像处理之图像滤波
  • golang HTTP基础
  • 【计网】实现reactor反应堆模型 --- 多线程方案优化 ,OTOL方案
  • C++算法练习-day39——654.最大二叉树
  • flutter下拉刷新上拉加载的简单实现方式三
  • 实习冲刺第二十一天
  • 手机怎么玩steam游戏?随时随地远程串流玩steam游戏教程
  • 【JavaWeb】JavaWeb入门之XML详解
  • 【MATLAB】目标检测初探
  • eBay接受支付宝付款!卖家如何接住新流量?
  • Node.js版本管理工具nvm使用教程
  • Scala的Array
  • 2.6 以太网扩展技术
  • 实验6记录网络与故障排除
  • 大模型时代,呼叫中心的呼入机器人系统如何建设?
  • 【蓝牙协议栈】【BLE】【IAS】蓝牙立即警报服务
  • Flink Job更新和恢复
  • 生产模式打包