多线程案例——阻塞队列
目录
一、阻塞队列
1. 生产者消费者模型
(1)解耦合
(2)“削峰填谷”
2. 标准库中的阻塞队列
3. 自己实现一个阻塞队列(代码)
4. 自己实现生产者消费者模型(代码)
一、阻塞队列
阻塞队列同样是一个符合先进先出的队列,相比于普通队列,阻塞队列还有其他的功能:
- 1. 线程安全
- 2. 产生阻塞效果
- (1)如果队列为空,尝试出队列,就会出现阻塞,阻塞到队列不为空为止;
- (2)如果队列满了,尝试入队列,就会出现阻塞,阻塞到队列不为满为止。
基于上述特点的阻塞队列,我们就可以实现“生产者消费者模型”。(处理多线程问题的一个典型方式)。
1. 生产者消费者模型
生产者消费者模型:通过一个容器(阻塞队列),来解决生产者消费者之间的强耦合问题。(生产者生产商品,消费者购买商品)
开发实际生活中,使用的 阻塞队列 并不是一个简单的数据结构,而是一组专门的服务器程序。里面也不仅仅包含了阻塞队列的功能,还有其他的功能。这样的队列叫做 “消息队列” 。
(1)解耦合
假设现在有两个服务器 A 和 B ,A 作为入口服务器(直接接收用户的网络请求),B 作为应用服务器(给 A 提供服务)。
假设不使用 生产者消费者模型:
即生产者和消费者之间直接进行沟通,则具有 强耦合性 。
强耦合性 体现在:
当开发 A 代码的时候就得充分了解 B 提供的一些接口,开发 B 代码的时候也得充分了解到 A 是怎么调用的。当我们想把 B 换成 C 时,A 的代码就得发生较大的改动,并且如果 B 崩溃了,也可能导致 A 也崩溃。
因此强耦合性是一种不好的现象。 而生产者消费者模型刚好可以降低这里的耦合,即模块和模块之间的联系尽可能的少。
当 A 给 B 发送请求时,A 先将请求写到阻塞队列中,然后 B 再从阻塞队列中取出请求。当 B 返回结果时,先把结果放到阻塞队列中,然后 A 再从阻塞队列中取出结果。
- 请求:A 是生产者,B 是消费者
- 响应:A 是消费者,B 是生产者
- 阻塞队列:作为交易场所
使用了生产者消费者模型后,A 只需要关注如何和队列进行交互,不需要认识 B ,B 也是如此。当 B 崩溃时,对 A 没有影响。A 崩溃时, B 也没有影响。相应的,如果将 B 换做 C ,A 也完全感知不到。 就达到了解耦合的目的。
(2)“削峰填谷”
使用生产者消费者模型,能够对于请求进行“削峰填谷”。(类似水库 -> 当雨季时,可以利用水库正常为稻田送水;当旱季时,打开水库,同样可以保证为稻田正常送水)
还是使用上述 A 和 B 服务器的例子。假设我们没有使用生产者消费者模型:
削峰:
当有大量请求时,A 服务器作为入口服务器,计算量轻,不会造成太大影响。而因为强耦合性,B 也会突然暴涨,B 作为应用服务器,计算量比 A 大得多,可能就直接造成崩溃了。
如果使用了生产者消费者模型,遇到请求暴涨的情况:
A 的请求暴涨,导致了阻塞队列的暴涨,因为阻塞队列只是存储数据,因此没有太大影响。B 这边依然按照原来的速度消费数据,不会因为 A 的暴涨而引起暴涨。因此 B 就被保护起来了,不会因为这种请求暴涨而崩溃,即 削峰 。(让 B 感受不到)
填谷:
当 A 的请求很少,而 B 依然会按照原来的速度消费数据。虽然 A 的请求少,但是阻塞队列中还有挤压的数据,就能够让 B 正常运行。
2. 标准库中的阻塞队列
Java标准库中自带了阻塞队列,当我们编写代码时需要使用阻塞队列时,直接使用即可。
- BlockingQueue 是一个接口,真正实现的类时 LinkedBlockingQueue。(类似List,LinkedList和ArrayList)
- put:阻塞式出队列
- take:阻塞式出队列
- (也有 offer、poll、peek等普通队列有的方法,但是这些方法不带有阻塞特性)
public class Demo1 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
//阻塞式入队列,如果队列满,就会阻塞
queue.put("jiu");
//阻塞式出队列,如果队列空,就会阻塞
System.out.println(queue.take()); //结果为 jiu
}
}
3. 自己实现一个阻塞队列(代码)
要实现一个阻塞队列:一个普通队列 + 线程安全 + 阻塞功能 。
① 实现队列(先进先出)可以通过链表或者数组,这里使用数组。——> 循环队列(队首head 和 队尾 tail)
- 入队列:把新元素放到 tail 的位置,tail++
- 出队列:取出 head 位置的元素,head++
- 循环队列:当 head 或者 tail 到达数组末尾时,就需要从头开始(= 0),重新循环。
- 判空 / 满:要实现循环队列,这是一个重要问题。我们用 变量 size 记录元素的个数,进行判断。
② 实现线程安全。因为在方法中全是共享变量,所以可以直接对整个方法进行加锁,也可以对整个代码块进行加锁,同时专门设置一个锁对象locker。
③ 实现阻塞效果。利用 wait 和 notify 来实现。使用哪个对象加锁就要用哪个对象 wait
- 对于 put 来说,阻塞条件:队列为满。而 put 中的 wait 要由 take 来唤醒。(take 成功了队列就不满了)
- 对于 take 来说,阻塞条件:队列为空。而 take 中的 wait 要由 put 来唤醒。(put 成功了队列就不为空了)
如果有人等待,notify 可以唤醒;如果没人等待,notify 也没有任何影响。
//假设队列中存放的是整型元素
class MyBlockingQueue {
private int[] array = new int[1000];
private int size = 0;
private int head = 0;
private int tail = 0;
private Object locker = new Object();
//阻塞式入队列
public void put(int value) throws InterruptedException {
synchronized(locker) {
if (size == array.length) {
//说明队列满了,无法再入队列了
//所以进入阻塞状态
locker.wait();
}
//队列没有满,可以入队列
//1.把值放到 tail 的位置
array[tail] = value;
tail++;
//2.判断是否到队尾的情况(等价于 tail = tail % array.length)
if (tail >= array.length) {
tail = 0;
}
//3.size++
size++;
//入队列成功,队列非空,从阻塞队列中唤醒 take
locker.notify();
}
}
//阻塞式出队列
public int take() throws InterruptedException {
synchronized(locker) {
if (size == 0) {
//队列为空,没有元素可以出
//进入阻塞状态
locker.wait();
}
//队列中有元素,可以出队列
//1.取出 head 位置的值
int ans = array[head];
head++;
//2. 判断是否到队尾
if (head >= array.length) {
head = 0;
}
//3. size--
size--;
//出队列成功,队列不为满,从阻塞队列中唤醒 put
locker.notify();
return ans;
}
}
}
4. 自己实现生产者消费者模型(代码)
public class Demo2 {
//自己实现生产者消费者模型
public static MyBlockingQueue queue = new MyBlockingQueue();
public static void main(String[] args) {
Thread producer = new Thread(() -> {
int num = 0;
while(true) {
try {
queue.put(num);
System.out.println("生产了: " + num);
num++;
//让生产者生产慢一点,消费者就得跟着生产者的步伐
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
Thread customer = new Thread(() -> {
while(true) {
try {
int num = queue.take();
System.out.println("消费了: " + num);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
}
}