【JavaEE】多线程之阻塞队列(BlockingQueue)
目录
1.了解阻塞队列
2.生产者消费者模型又是什么?
2.1生产者消费者模型的优点
2.1.1降低服务器与服务器之间耦合度
2.1.2“削峰填谷”平衡消费者和生产的处理能力
3.标准库中的阻塞队列(BlockingQueue)
3.1基于标准库(BlockingQueue)实现一个简单的阻塞队列的应用
4.基于循环队列模拟是实现阻塞队列
4.1 put()方法
4.2 take()方法
4.3put方法和take方法是如何被唤醒
1.了解阻塞队列
阻塞队列是一种特殊的队列. 也遵守 "先进先出" 的原则.
阻塞队列顾名思义就是带有阻塞特性的队列的,它是如何运行的呢?
答: 1.当队列为空时,尝试出队列,就会发生阻塞,直到队列不空为止。
2.当队列为满时,尝试入队列,就会发送阻塞,直到队列不满为止
阻塞队列在 多线程中非常有用,它可以调节生产者和消费者之间的关系,当生产者生产的资源太多,以至于消费者无法完全消费完,那么可以让生产者阻塞,达到生产、消费平衡的作用。
2.生产者消费者模型又是什么?
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。
它将所有工作分工明确,参考下面这个例子:
有4个人我们一起包饺子,我们每个人都各顾各的,每个人都得做擀面皮和包饺子,如果这样的话,每个人的包饺子的效率都不是很高。若我们进行分工,1个人负责擀面皮,3个人负责包饺子,擀面皮的不管包饺子的是如何包,包饺子的不管擀面皮的是如何擀的。在第二种方式中擀面皮的就可以看作生产者,包饺子的就可以看作消费者。
2.1生产者消费者模型的优点
2.1.1降低服务器与服务器之间耦合度
看如下解读:开始时A服务器和B服务器通过阻塞对象进行交互,若B服务器挂了,那么我们可以迅速的做出反应,参考阻塞队列进行重新加载B服务器。还有就是若A服务器业务服务器,B服务器是实现业务的服务区,因为业务服务器需要时常更新代码,因此可以阻塞队列相当于一个缓存区。
2.1.2“削峰填谷”平衡消费者和生产的处理能力
比如在 "秒杀" (购买物品)场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.在这里就起到了削峰的效果。
“ 削峰填谷”我们可以联想到三峡大坝:
三峡可谓是“功在当代,利在千秋”,为何这样说?三峡大坝它可不仅仅只是发电用的,它还有其他的功能。当到达雨季时,三峡上游的水向下流的会比往常多,这时开始关闸蓄水,防止下游发生洪灾,这就是“削峰”,过了雨季,下游需要灌溉庄稼,但是下游水又少了,这时打开闸门进行放水,造福下游,这就是“填谷”。
在购物系统中,我们就可以将秒杀时突然猛增的用户请求比喻成雨季的雨水,我们通过阻塞队列进行蓄洪,当在平常时,没那么多的用户请求时,我们就是慢慢的解决用户的请求比喻成开闸放水。
3.标准库中的阻塞队列(BlockingQueue)
BlockingQueue的成员:
- ArrayBlockingQueue:基于数组的阻塞队列实现
- LinkedBlockingQueue:基于链表的阻塞队列
- PriorityBlockingQueue:基于优先队列的阻塞队列
- BlockingQueue:这个是实现阻塞队列的接口,继承与Queue
常用方法介绍:
方法 | 说明 |
boolean add(E e) | 将指定的元素插入到此队列中,如果可以立即执行此操作而不违反容量限制, 如果当前没有可用空间,则抛出IllegalStateException |
void put(E e) | 将指定的元素插入到此队列中,等待空格可用。 |
E take() | 检索并删除此队列的头,如有必要,等待元素可用。 |
boolean offer(E e) | 将指定的元素插入到此队列中,如果可以立即执行此操作,而不会违 反容量限制, true在成功时 false如果当前没有可用空间,则返回false。 |
boolean offer (E e, long timeout, TimeUnit unit) | 将指定的元素插入到此队列中,等待指定的等待时间(如有必要)才能使空间变 得可用。 |
E poll (long timeout, TimeUnit unit) | 检索并删除此队列的头,等待指定的等待时间(如有必要)使元素变为可用。 |
注意:在这个些方法中,只有put()方法和take()方法带有阻塞特性,其他的方法不具备阻塞特性。
3.1基于标准库(BlockingQueue)实现一个简单的阻塞队列的应用
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class testDemo1 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue=new LinkedBlockingDeque<>();
//生产者
Thread producer=new Thread(()->{
int i=0;
while (true) {
try {
System.out.println("生产:"+i);
queue.put(i);
i++;
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
//消费者
Thread customer=new Thread(()-> {
while (true) {
try {
int value=queue.take();
System.out.println("消费:"+value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
}
}
代码运行结果:
4.基于循环队列模拟是实现阻塞队列
这此次模拟实现中,仅模拟了put()方法,和take()方法。
若有对循环队列不了解的话,可以参考下方链接了解循环队列:【数据结构趣味多】循环队列https://blog.csdn.net/qq_65228171/article/details/128613550?spm=1001.2014.3001.5501
代码实现如下:
public class MyBlockingDeque {
private int[] items=new int[1000];
//头指针
volatile private int head;
//尾指针
volatile private int tail;
//队列中元素数量
volatile private int size;
synchronized public void put(int value) throws InterruptedException {
//如果队列已满,阻塞等待
while (size == items.length) {
this.wait();
}
items[tail]=value;
tail++;
//如果队列满了,从头再来(循坏队列思想)
if(tail==items.length) {
tail=0;
}
size++;
//唤醒take
this.notify();
}
synchronized public int take() throws InterruptedException {
while (size==0){
//等待放入元素,等待put唤醒
this.wait();
}
int value=items[head];
head++;
//如果头到了队尾,重新加载
if(head == items.length) {
head=0;
}
size--;
this.notify();
return value;
}
}
4.1 put()方法
put 方法是向队列中存放数据,因此第一步我们需要判断队列满没满,若队列满了的话,就让队列通过wait方法进入阻塞状态,等待唤醒。若队列没有满,就将数据放入到尾指针所指位置,让尾指针向后移动一位;当尾指针达到数组的长度时,我们就需要重置尾指针了。
synchronized public void put(int value) throws InterruptedException {
//如果队列已满,阻塞等待
while (size == items.length) {
this.wait();
}
items[tail]=value;
tail++;
//如果队列满了,从头再来(循坏队列思想)
if(tail==items.length) {
tail=0;
}
size++;
//唤醒take
this.notify();
}
4.2 take()方法
take 方法是在队列中拿取数据,有两种情况,若队列空,发生阻塞,直到put进来数据,若不为空,拿到数据,头指针向后走,如果头指针达到数组长度,重置头指针,最后返回value。
synchronized public int take() throws InterruptedException {
while (size==0){
//等待放入元素,等待put唤醒
this.wait();
}
int value=items[head];
head++;
//如果头到了队尾,重新加载
if(head == items.length) {
head=0;
}
size--;
//唤醒put
this.notify();
return value;
}
4.3put方法和take方法是如何被唤醒的
1.当存入数据时,队列为满的时候,队列就会通过wait方法使线程进入阻塞态,它什么时候被唤醒呢?队列被拿走一个数据时,被唤醒,这个唤醒是在take中唤醒的,take拿走一个数据队列不为满,唤醒put。
2. 当拿去数据时,队列为空的时候,队列会通过while判断,让线程进入阻塞态,当有新的数据被放入时,take就会被唤醒。
写在最后:
以上就是本文全部内容,如果对你有所帮助,希望能留下你的点赞+关注,我会更加努力的更新内容!非常感谢🧡🧡🧡
若本篇文章有错误的地方,欢迎大佬们指正!