Javaee:阻塞队列和生产者消费者模型
文章目录
- 什么是阻塞队列
- java中的主要阻塞队列
- 生产者消费者模型
- 阻塞队列发挥的作用
- 解耦合
- 削峰填谷
- 模拟实现阻塞队列
- put方法
- take方法
- 生产者消费者模型
什么是阻塞队列
阻塞队列是一种支持阻塞操作
的队列,在多线程中实现通线程之间的通信协调的特殊队列
java中的主要阻塞队列
java中的阻塞队列是一个接口BlockingQueue<>
带有实例化类有:
ArrayBlockingQueue(有界队列) |
---|
LinkedBlockingQueue(无界队列) |
ProrityBlockingQueue(FIFO队列) |
阻塞队列不仅继承了队列的所有方法,还提供了带有阻塞效果的put方法和take方法
put(E e):将元素插入队列,如果队列满,则等待直到队列有空间。 |
---|
take():从队列中取出并移除元素,如果队列为空,则等待直到队列有元素。 |
特点
线程安全
阻塞特性
队列为空,尝试出队列,出队列操作就会阻塞,阻塞到其他线程添加元素为止
队列为满,尝试进队列,进队列操作也会阻塞,阻塞到其他线程取走元素为止
以上是线程一往阻塞队列添加元素,线程二从阻塞队列中移除元素
理解有界和无界
有界:在实例化对象的时候,可以在构造方法传参,表示阻塞队列能容纳的最大元素个数
无界:没有设置固定大小的队列,表示可以存储的元素个数范围很大
生产者消费者模型
阻塞队列是生产者消费者模型的交易场所
举个生活中的例子
包饺子
奶奶负责擀饺子皮,奶奶把饺子皮放到一个容器里,我和妈妈把饺子皮从容器中拿出来包饺子。
饺子皮——>资源
奶奶——>生产者
我和妈妈——>消费者
容器——>交易场所(阻塞队列)
阻塞队列发挥的作用
解耦合
举例
服务器A直接与服务器B进行交互,一旦服务器A中的代码发生改变,服务器B越需要做出相应的改变,也就是他们之间的关联度高,耦合度高
解耦合——通过阻塞队列
服务器A不直接与服务器B进行交互,而是通过阻塞队列这个交易场所进行交互,降低了服务器之间的关联度
在生产者消费者模型中的应用
阻塞队列通过充当中间缓存,允许生产者先把数据放入队列,消费者在有空闲时再取出处理,这样就不会因为速度不一致而导致数据丢失或重复处理
削峰填谷
如果服务器A传给B的数据量激增,服务器B没有那么大的接收量,就会瞬间过载
削峰填谷——阻塞队列
在请求量突然增加的情况下,阻塞队列可以缓冲住大量的生产者请求,避免消费者瞬时过载。
当生产量较小时,消费者可以逐步消费队列中积累的数据,从而实现负载平衡和资源优化。
模拟实现阻塞队列
put方法
public void put(String elem) throws InterruptedException {
synchronized(locker) {//保证原子性
// 使用while循环来检查队列是否已满,以处理虚假唤醒和条件变化的情况
while (size >= data.length) {//if的话会出问题,使用wait需要搭配while进行二次判断,如果唤醒wait之后,size还是为0,那么执行下面的操作就会出问题
//如果队列已满,则当前线程等待,直到被其他线程唤醒
locker.wait();//抛一下异常
}
data[tail] = elem;
tail++;
if (tail >= data.length) {
tail = 0;//便于理解
//tail=(tail+1)%data.length;
}
size++;
locker.notify();//说明不为空就去唤醒take的wait
}
}
take方法
public String take() throws InterruptedException {
synchronized (locker) {//保证原子性
// 使用while循环来检查队列是否为空,以处理虚假唤醒和条件变化的情况
if (size == 0) {
locker.wait();// 如果队列为空,则当前线程等待,直到被其他线程唤醒
}
String ret = data[head];
head++;
if (head >= data.length) {
head = 0;
}
size--;
// 在取出元素并更新队列状态后,通知其他等待的线程
locker.notify();//相互唤醒,说明没满,就去唤醒put的wait
return ret;
}
}
生产者消费者模型
Thread producer=new Thread(()->{
int n=0;
try {
while(true){
queue.put(n+" ");
System.out.println("生产元素"+n);
n++;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"生产者");
Thread consumer=new Thread(()->{
while(true){
String n=null;
try {
n=queue.take();
System.out.println("消费元素"+n);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread consumer2=new Thread(()->{
while(true){
String n=null;
try {
n=queue.take();
System.out.println("消费元素"+n);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});