【JavaEE】生产者消费者模式
作者主页:paper jie_博客
本文作者:大家好,我是paper jie,感谢你阅读本文,欢迎一建三连哦。
本文于《JavaEE》专栏,本专栏是针对于大学生,编程小白精心打造的。笔者用重金(时间和精力)打造,将MySQL基础知识一网打尽,希望可以帮到读者们哦。
其他专栏:《MySQL》《C语言》《javaSE》《数据结构》等
内容分享:本期将会分享设计模式中的生产者消费者模式
目录
什么是阻塞队列
什么是生产者消费者模式
生产者消费者模式的特点
Java标准库中的阻塞队列
自定义实现一个阻塞队列
普通队列
实现线程安全
正解
实现阻塞
正解
基于自己实现的阻塞队列实现一个简单的生产者消费者模型
什么是阻塞队列
阻塞队列它也是队列,遵守着先进先出的原则.且它是一种线程安全的数据结构.它有两个特性:
1. 当队列满的时候,继续入队就会进行堵塞,直到有其他线程从队列中拿走元素才会解除堵塞.
2. 当队列为空的时候,继续出队就会进行堵塞,直到有其他线程从队列中插入元素后才会解除堵塞.
我们的阻塞队列最经典的使用场景就是生产者消费者模式.
举个栗子:
在我们家中,捏饺子一般有两个步骤: 1. 捏饺子皮, 2.包饺子. 假设有三个人.一个滑稽捏饺子皮,其他两个滑稽包饺子. 滑稽捏好饺子皮厚后就会放到板子上,而其他的滑稽就不用直接去捏饺子皮的滑稽手上拿饺子皮,而是去板子上拿即可.这样他们直接就只需要关注这个板子即可. 而这个板子就充当了我们的阻塞队列.
什么是生产者消费者模式
生产者消费者模式就是基于一个中间容器来解决它们之间的强耦合性问题.而这个中间容器就是阻塞队列.加入阻塞队列后,生产者与消费者之间就不会直接联系,而是通过通过阻塞队列来进行数据传输.这样生产者生产数据就不用知道是谁来处理他的数据,不用等待,直接交给阻塞队列即可.而消费者也不会去找生产者要数据,而是去阻塞队列里拿.
生产者消费者模式的特点
1. 通过阻塞队列可以降低生产者与消费者之间的耦合性
假设有三个服务器ABC,BC是将数据处理到,A是接受它们的数据.如果在没有加入阻塞队列的情况下.可能就会发生: 当B或者C挂了,这可能就会导致A也会挂,它们之间是强耦合的关系.因为B或者C的操作中需要涉及到一些关于A的操作.而A的操作也会涉及到一些关于B或C的操作.
但是当加入阻塞队列后就会发生不一样的结果. C,B处理好的结果只需要放到阻塞对列中,而A也只需要去阻塞队列中拿即可.这样B,C的操作对于A的影响就会很小,从而降低了他们之间的耦合性.
2. 削峰填谷
这里就是加入阻塞队列起到一个平衡生产者与消费者之间的处理能力, 加入阻塞队列就可以防止当生产者一下生产出大量数据,而消费者一时间消费不了而导致挂了的问题.
举个栗子:
假设一个场景: 客户端发出请求,服务A接受请求,然后将请求交给BC处理器进行逻辑处理. 在正常情况下处理器是可以及时处理的.但是在一些特殊的时候会有一些突发峰值.外界客户端的请求非常的多.A接受这些请求一下子全部交给B,C服务器来处理,它们一下子可能就会支撑不住. 因为B,C需要就行逻辑处理业务,需要的资源开销就会比较大.如果一下给它大量的请求进行处理,处理器的资源可能就会超过它的上限而导致机器挂了.
但是在加入阻塞队列后就不用担心这种情况了. 就算客户端有大量的请求,A接受后也是传送给阻塞队列,再由B,C去阻塞队列中拿数据处理来慢慢消化.这就算数据再多,只要A服务器,阻塞队列不挂(阻塞队列和A服务器抗压能力很强,它们只需要进行存储数据和传送数据),BC也可以按正常速度进行处理.
Java标准库中的阻塞队列
在Java标准库中也提供了阻塞队列,如果我们需要使用阻塞队列,只需要使用标准库中的即可. 库中的阻塞队列叫BlockingQueue,它是一个interface接口,它实现的的类有:
ArrayBlockingQueue
LinkedBlockingQueue
priorityBlockingQueue
里面的put和offer方法就是入队方法,但是put是带有阻塞的功能. take也是出队方法,但它也带有阻塞的功能.
public class ThreadDemo1 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
queue.put("aaa");
System.out.println(queue.take());
System.out.println(queue.take());
}
}
自定义实现一个阻塞队列
这里我们准备实现一个基于数组的阻塞队列,也就是环形队列.这里队列里面我们需要一个数组,计数器和两个头尾指针,put和take方法. 这里我们分三步来实现这个阻塞队列.
1) 普通队列
2) 实现线程安全
3) 实现阻塞功能
普通队列
class MyarrayBlockingQueue {
private int elems[] = null;
private int size = 0;
private int head = 0;
private int tail = 0;
public MyarrayBlockingQueue(int capactiy) {
elems = new int[capactiy];
}
public void put(int value) {
//判断队列满没满
if(size == elems.length) {
//阻塞
return;
}
//添加元素
elems[tail] = value;
tail++;
//判断尾指针是不是需要循环到0位置
if(tail == elems.length) {
tail = 0;
}
size++;
}
public int take() {
int elem = 0;
//判断队列为不为空
if(size == 0) {
//阻塞
return elem;
}
//出队
elem = elems[head];
head++;
if(head == elems.length) {
head = 0;
}
size--;
return elem;
}
}
实现线程安全
实现线程安全,我们就需要加锁.但是当我们下面这个代码这样加锁时,就会出现问题.
public void put(int value) {
//判断队列满没满
if(size == elems.length) {
//阻塞
return;
}
synchronized (this) {
//添加元素
elems[tail] = value;
tail++;
//判断尾指针是不是需要循环到0位置
if(tail == elems.length) {
tail = 0;
}
size++;
}
}
public int take() {
int elem = 0;
//判断队列为不为空
if(size == 0) {
//阻塞
return elem;
}
synchronized(this) {
//出队
elem = elems[head];
head++;
if(head == elems.length) {
head = 0;
}
size--;
return elem;
}
}
我们发现如果当有两个线程t1,t2同时使用put或者take方法.假设当前有99个元素,容量为100.当t1执行到if(size == elems.length)后被调度走了,再轮到t2执行.当t2执行完后,队列的元素语已经满了.但是当轮到t1执行时因为上一次的if判断它还会再入队一个元素,这就会出现size为101的问题.
正解
我们将if判断条件也放到锁中就可以了.
public void put(int value) {
synchronized (this) {
//判断队列满没满
if(size == elems.length) {
//阻塞
return;
}
//添加元素
elems[tail] = value;
tail++;
//判断尾指针是不是需要循环到0位置
if(tail == elems.length) {
tail = 0;
}
size++;
}
}
public int take() {
int elem = 0;
synchronized(this) {
//判断队列为不为空
if(size == 0) {
//阻塞
return elem;
}
//出队
elem = elems[head];
head++;
if(head == elems.length) {
head = 0;
}
size--;
return elem;
}
}
实现阻塞
这里需要实现阻塞就要使用我们的wait和notify方法.
当队列满时,使用put就会执行wait进入阻塞,只有当使用take调用notify队列才会解除堵塞.
当队列为空时,使用take就会执行wait进入堵塞,只有当使用put调用notify队列才会解除堵塞.
public void put(int value) throws InterruptedException {
synchronized (this) {
//判断队列满没满
if(size == elems.length) {
this.wait();
return;
}
//添加元素
elems[tail] = value;
tail++;
//判断尾指针是不是需要循环到0位置
if(tail == elems.length) {
tail = 0;
}
size++;
this.notify();
}
}
public int take() throws InterruptedException {
int elem = 0;
synchronized(this) {
//判断队列为不为空
if(size == 0) {
this.wait();
return elem;
}
//出队
elem = elems[head];
head++;
if(head == elems.length) {
head = 0;
}
size--;
this.notify();
return elem;
}
}
但是这样又会出现一个问题. 假设有三个线程t1,t2,t3 t1和t2调用put. t3调用take. 且这个队列已经满了. 这里就会有一种情况: t1和t2调用put发现满了就都会在wait那里堵塞,且解锁. 这时t3就执行take方法出队了一个且调用了notify方法唤醒了t1. 则t1也就向下执行入队了一个,此时队列是满了.但是!!!t1的notify方法就可能会唤醒t2.而t2就会直接向下执行又入队一个,但队列是满的,这就出现问题了.
正解
我们可以在if判断那里将if改成while,这样就算被notify唤醒了,也会再次判断队列是不是满/空.才会选择是不是执行还是继续堵塞.
public void put(int value) throws InterruptedException {
synchronized (this) {
//判断队列满没满
while(size == elems.length) {
this.wait();
return;
}
//添加元素
elems[tail] = value;
tail++;
//判断尾指针是不是需要循环到0位置
if(tail == elems.length) {
tail = 0;
}
size++;
this.notify();
}
}
public int take() throws InterruptedException {
int elem = 0;
synchronized(this) {
//判断队列为不为空
while(size == 0) {
this.wait();
return elem;
}
//出队
elem = elems[head];
head++;
if(head == elems.length) {
head = 0;
}
size--;
this.notify();
return elem;
}
}
基于自己实现的阻塞队列实现一个简单的生产者消费者模型
public class ThreadDemo8 {
public static void main(String[] args) {
block queue = new block(1000);
Thread t1 = new Thread(() -> {
int count = 0;
while(true) {
try {
queue.put(count);
System.out.println("生产者: " + count);
Thread.sleep(1000);
count++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread t2 = new Thread(() -> {
while(true) {
try {
System.out.println("消费者: " + queue.take());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}