【JavaEE初阶 — 多线程】生产消费模型 阻塞队列
1. 阻塞队列
(1) 阻塞队列
1. 概念
阻塞队列是一种特殊的队列,也遵守"先进先出"的原则;
阻塞队列能是一种线程安全的数据结构,主要用来阻塞队列的插入和获取操作:
- 当队列满了的时候,插入操作会被阻塞,直到队列有空位。
- 当队列为空的时候,获取操作会被阻塞,直到队列有值。
常用在实现生产者和消费者场景,在笔试题中比较常见。
2. 如何创建一个阻塞队列
- 阻塞队列 BlockingQueue 继承 Queue;
- 在 Java 标准库中内置了阻塞队列,如果我们需要在一些程序中使用阻塞队列,直接使用标准库中的即可.
- BlockingQueue是一个 interface 接口,真正实现的类是LinkedBlockingQueue.
3 查看 put() 和 take() 的阻塞效果
- 入队列,出队列操作,对比 queue 的 offer() 和 poll(),阻塞队列更常用的是put(),take();
- put(),take() 具有阻塞功能;
(1) 查看 take() 的阻塞功能
- 如果队列满了,使用 offer() 会报错,put() 则不会;
- 因为使用 put() 有阻塞效果,等到别的线程调用 take() 出一个满队列的元素后,put() 的阻塞状态才会结束;
- 如果没有 put(),直接 take(),此时 take() 的就是一个空队列;
- 再次执行程序,执行结果什么也没有打印,说明是程序执行 take() 时被阻塞了;
我们通过jconsole来查看当前线程状态:
当前线程状态是waiting,说明该线程进入了没有时间的等待。
(2) 查看 put() 的阻塞功能
因为 LinkedBlockingQueue 是基于链表实现的阻塞队列,是可以选择无界(不传 capacity)和有界(传 capacity)的;
如果不传 capacity,就没那么好验证 put() 和 take() 的阻塞功能,并且在实际开发中,一般建议大家设置 capacity ,避免因为队列过大,导致内存被耗尽,产生“内存超出范围”这样的异常;
所以我们给 LinkedBlockingQueue 传参,并写出如下代码,来验证 put() 的阻塞功能:
运行程序
从当前执行结果来看,通过打印日志可以发现,程序被阻塞在了最后一个put(),没有打印后面的日志;可以再次 jconsole 查看线程状态:
总结:
- take() 和 put() 是Java标准库中,对于阻塞队列,带有阻塞功能的API;通过上述例子,我们已经初步演示了 take() 和 put() 的阻塞功能;
- 值得一提,BlockingQueue也有offer, poll, peek等方法,但是这些方法不带有阻塞特性.
2. 生产消费模型
(1) 生产消费模型的概念
- 生产者消费者模式是是一种典型的编码技巧,通过一个容器,来解决生产者和消费者的强耦合问题。
- 生产者和消费者不会直接进行数据交互(通讯),而是通过阻塞队列进行数据交互(通讯);
- 生产者生产完数据之后,不用等待消费者处理,而是直接扔给阻塞队列,消费者也不会找生产者要数据,而是直接从阻塞队列里取数据。
(2) 阻塞队列运用场景
- 上游服务器A:入口服务器,干的活更简单,单个请求消耗的资源数少;
- 下游服务器B:通常承担更重的任务量,复杂的计算/存储工作,单个请求消耗的资源数更多;
- 队列服务器:主要用于存储与转发,针对单个请求,做的事少,可以抗很多的请求量;
生产消费模型的优点:
解耦合,降低代码耦合度
- A B两个服务器,如果直接进行数据交互,后续单独对A或者B的数据进行修改,大概率就会影响到另一个服务器的正常运转。
- 使用阻塞队列作为交易平台,在修改服务器的数据时,由于阻塞队列中的结构固定,两个服务器之间的耦合度就降低,因此能节约后期修改代码的成本;
- 削峰削谷:
- 在服务器中,波峰就是请求量高的时候,波谷就是请求量低的时候;
- 如果 A 和 B 直接进行交互,当上游服务器A经历波峰,将大量请求传给下游服务器B 的时候,B就有可能挂掉;
通过队列进行交互,A处理的请求的数量和速度,和B处理的请求的数量和速度就不一样了;A处理请求的数量和速度,取决于外面用户的访问量,而B服务器则根据自身来决定;
哪怕A已经是惊涛骇浪了,B依然波澜不惊,根据自己能够承担的处理请求速度,慢条斯理地消费阻塞队列中的数据;
运用生产消费模型,即使遇到突发的流量峰值,B服务器也不会轻易挂掉。
由于流量峰值一般是突发的,时间也比较短;趁着峰值过去了后,B会利用波谷的时间,消费之前波峰积压的数据;
(3) 生产消费模型代码演示
生产消费模型初步演示
创建阻塞队列
生产者线程
消费者线程
上述代码逻辑
- producer 会因为 while(true),而不断生产数字n,并且 n 随着循环次数增加而增加,并且会不断把生产的数字通过 put(n),入队列;
- consumer 也会因为 while(true),不断通过 take(),从队列中取出 producer 生产的数字 n,并且打印当前 n 的值;
程序运行结果
结论
- 我们通过打印日志,可以发现,两个线程的执行速度旗鼓相当,生产者线程的生产速度很快,消费也很快,并没有阻塞效果;
- 上述情况在开发中是一个典型的情况,虽然生产消费模型也会产生阻塞,但是只要我们协调好生产和消费的速度,两个线程的速度相差不大,那么程序都会一直高效地运行。
查看模型阻塞效果
上述的 producer 和 consumer 两个线程的速度旗鼓相当,直接运行,很难观察到阻塞效果;
我们可以手动协调生产者线程和消费者线程入队列和出队列的速度,来查看阻塞队列产生的阻塞效果:
观察队列为空的阻塞效果
我们让 producer 在每次生产元素时,休眠 1 秒,而 consumer 保持原来的消费速度;
阻塞队列中的元素被消耗的速度远远大于生产的速度,进而阻塞队列对 consumer 产生阻塞效果
程序运行结果
通过降低 producer 调用 put() 的速度,进而让阻塞队列对 consumer 的 take() 产生阻塞效果;
观察队列为满的阻塞效果
上述是通过降低 producer 的生产速度,来查看阻塞队列对消费者的阻塞效果;
现在,我们来降低 consumer 的消费速度,来观察阻塞队列对 producer 的阻塞效果:
程序运行结果
- 在减低 consumer 的消费速度后,程序会在执行后的一秒,从只调度 producer ,到 producer 和 consumer 并发调度,并且程序只在执行的第一秒,producer 就把阻塞队列填满了;
- 之后因为队列满了,所以只能 consumer 消费一个元素,生产者才能再生产一个元素;
所以这个案例,是通过减低 consumer 的调用 take(),来让阻塞队列对 producer 调用 put() 的操作产生阻塞效果。
3. 阻塞队列的模拟实现
(1) 实现原理
通过基于数组,实现循环队列,在循环队列的基础上,实现阻塞功能:
(2) 模拟实现
1. 基于数组,先模拟实现一个普通队列
上面不小心把 MyBlockingQueue 设置成内部类了,修改一下:
2. 成员变量
通过定义 front 和 tail,来记录队守和队尾在队列中的相关信息,确定入队列和出队列在队列中的相对位置,并且定义 size ,记录队列中的元素个数;
3. 循环队列原理
- tail 指针指向要 put() 元素的空间,front 指针指向要 take 的区间;
- tail 指针指向一个空的下标元素后,会对这个空间 put() 一个元素,然后 tail++;
- front 指针指向一个的下标元素后,会 take() 这个空间的元素,然后 front++;
- 如果 front 和 tail 大于 data.length,则令这两个指针重新置为0;
- 当 front 和 tail 指向同一个元素时,要么队列为空,要么队列为满,put 和 take 两个方法一定会有一个被阻塞;
- 所以不用担心一个空间的元素被连续 put()两次 (入队列的 elem 还没出队列就被新的elem 覆盖),或者连续 take() (elem已经出队列,还对空的空间进行出队列操作) 两次;
- 一定是 take() 和 put() 两个反复穿插操作同一个空间。
4. 实现 put() 方法
- 如果队列满了,进入阻塞状态,否则将 tail 指向的数组空间置为要插入的元素 elem;
- 如果 tail 走出 data 范围,把 tail 重新置为0;
- 每 put 一个元素,元素个数 size++;
5. 实现 take() 方法
- 如果队列为空,进入阻塞状态,否则接收 front 指向的数组元素并返回;
- 返回元素后,后续该元素会被 tail 指针覆盖;
- 如果 front 走出 data 范围,把 front 重新置为0;
- 每 take 一个元素,元素个数 size--;
6. 完善 put() 和 take()
(1) 确保线程安全
在多线程调用 put() 和 take() 的情况下,put() 和 take() 的操作并不是原子的,并且都包含了多步写操作,为了保证线程安全,我们需要对两个线程进行上锁:
哪个线程调用 put() 或者 take(),就对哪个线程上锁,确保该线程完整地执行完 put() 或者 take();
(2) 完善阻塞功能
- 阻塞队列只会针对 put() 或者 take() 其中一个方法进行阻塞,如果执行其中一个方法的线程进入阻塞,只能通过其他线程调用另一个方法来唤醒;
- 一个线程 t 在执行take() 时,因为队列为空,执行wait方法,陷入阻塞状态,只能等执行put() 的线程来唤醒;
- 如果此时没有执行 put() 线程,但是有别的线程调用 t . interrupt(),从而结束正在 wait() 的 t;
- 如果出现这种情况,我们当前这个代码,是直接向外抛异常 InterruptedException;但是如果我们是通过 try catch 来捕获,并且解决异常的情况又有不同:
- 如果我们是通过 throw 抛出 wait() 相应的异常 ,那么 wait() 如果被 interrupt() 打断等待状态,就会抛出异常;
- 如果是通过 try catch 来处理 wait() 相应的异常,这样的操作会捕获并且处理 wait() 被 interrupt() 打断而抛出的异常;
- 我们的预期结果,就是让 wait() 被打断而不是被唤醒的时候抛出异常,但是 try catch 把异常给捕获了,因此 wait() 在被打断就不会抛出异常了;并且不管队列是否为空或者满,直接打破阻塞状态而执行下面的逻辑;
标准库建议:
- 如果要通过特定的条件,来使当前线程进入等待状态,那么建议这个特点的条件的判断操作,应该放在 while 循环中,而不是放在 if 中;
- if 只会判断一次,在判断一次成功执行 wait() 后,可能 wait() 被打破不是因为不满足 if 中的条件,而是被别的线程调用 interrupt() 类似的方法,而非法终止 wait() ;
- 通过在 wait() 外面嵌套一层 while ,而不是嵌套 if,可以避免 wait() 非法唤醒;
7. 模拟生产消费模型
生产速度等于消费速度,producer 和 consumer 都不会被阻塞;
生产速度小于消费速度,阻塞 consummer
生产速度大于消费速度,阻塞 producer
8. 模拟实现阻塞队列完整代码
package Thread;
class MyBlockingQueue {
private String[] data = null;
private int front = 0; //队首
private int tail = 0; //队尾
private int size = 0; //元素个数
public MyBlockingQueue(int capacity) {
data = new String[capacity];
}
public void put(String elem) throws InterruptedException {
synchronized (this) {
while (size == data.length) {
//队列满了,需要阻塞
this.wait();
}
data[tail] = elem;
tail++;
if (tail >= data.length) tail = 0;
size++;
this.notify();
}
}
public String take() throws InterruptedException {
synchronized (this) {
while (size == 0) {
//队列为空,需要阻塞
this.wait();
}
String ret = data[front];
front++;
if (front >= data.length) front = 0;
size--;
this.notify();
return ret;
}
}
}
public class Demo26 {
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(1000);
Thread producer = new Thread(() -> {
int n = 0;
while (true){
try {
queue.put(n + "");
System.out.println("生产元素" + n);
n++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"producer");
Thread consumer = new Thread(() -> {
String ret = null;
while (true){
try {
ret = queue.take();
System.out.println("消费元素" + ret);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"consumer");
producer.start();
consumer.start();
}
}
4. 常见的阻塞队列
1. ArrayBlockingQueue
- 一个有界队列,底层基于数组实现。
- 需要在初始化时指定队列的大小,队列满时,生产者会被阻塞,队列空时,消费者会被阻塞。
2. LinkedBlockingQueue
- 基于链表的阻塞队列,允许可选的界限(有界或无界)。
- 无界模式下可以不断添加元素,直到耗尽系统资源。
- 有界模式则类似于ArrayBlockingQueue,但吞吐量通常较高。
3. PriorityBlockingQueue
- 一个无界的优先级队列,元素按照自然顺序,或者指定的比较器顺序进行排序。
- 与其他阻塞队列不同的是,PriorityBlockingQueue不保证元素的FIFO 顺序。
4. DelayQueue
- 一个无界队列,队列中的元素必须实现 Delayed 接口,只有当元素的延迟时间到期时,才能被取出。常用于延迟任务调度。
5. SynchronousQueue
- 一个没有内部容量的队列,每个插入操作必须等待对应的移除操作,反之亦然。常用于在线程之间的直接传递任务,而不是存储任务。
6. LinkedTransferQueue
- LinkedTransferQueue,相对于其他阻塞队列,从名字来看它有 Transfer 功能,其实也不是什么神奇功能;
- 一般阻塞队列都是将元素入队,然后消费者从队列中获取元素。LinkedTransferQueue 的transfer 是元素入队的时候看看是否已经有消费者在等了,如果有在等了直接给消费者即可,所以就是这里少了一层,没有锁操作。
5. 不同阻塞队列的使用场景
ArrayBlockingQueue 和 LinkedBlockingQueue 使用场景
- ArrayBlockingQueue 和 LinkedBlockingQueue 常用于典型的生产者-消费者场景。
- 例如:任务处理系统中,生产者生成任务,消费者从队列中取出任务并执行。
ArrayList 和 LinkedList 的区别
1. 底层数据结构不同
- ArrayList:基于动态数组实现,元素在内存中连续存储。
- LinkedList:基于双向链表实现,元素通过节点链接,内存中不需要连续存储。
2. 性能区别
(1) ArrayList:
- 随机访问速度快,查找元素的时间复杂度为0(1)。
- 插入和删除操作慢,尤其是在中间插入或删除时,时间复杂度为0(n),因为需要移动后续元素。
(2) LinkedList:
- 随机访问速度慢,查找元素的时间复杂度为0(n)。
- 插入和删除操作快,尤其是在头尾插入或删除时,时间复杂度为0(1)。
PriorityBlockingQueue 使用场景
PriorityBlockingQueue 更适合处理带有优先级的任务场景;
例如:任务调度系统。
DelayQueue 使用场景
- DelayQueue 适用于需要延迟处理的任务;
- 例如:缓存失效策略、定时任务调度等。
SynchronousQueue 使用场景
- SynchronousQueue 适合在线程间直接传递数据,而不希望数据被存储在队列中。
- 例如:ThreadPoolExecutor 的直接交接模式中使用 SynchronousQueue 来传递任务。
ArrayBlockingQueue 和 LinkedBlockingQueue 区别
ArrayBlockingQueue 和 LinkedBlockingQueue,分别是基于 数组 和 链表 的有界阻塞队列。
两者原理都是基于 ReentrantLock 和 Condition ;
ArrayBlockingQueue 基于数组,内部实现只用了一把锁,可以指定公平或者非公平锁;
LinkedBlockingQueue 基于链表,内部实现用了两把锁,take 一把、put一把,所以入队和出队这两个操作是可以并行的,从这里看并发度应该比 ArrayBlockingQueue 高。