【Java多线程案例】实现阻塞队列
1. 阻塞队列简介
1.1 阻塞队列概念
阻塞队列:是一种特殊的队列,具有队列"先进先出"的特性,同时相较于普通队列,阻塞队列是线程安全
的,并且带有阻塞功能
,表现形式如下:
- 当队列满时,继续入队列就会阻塞,直到有其他线程从队列中取出元素
- 当队列空时,继续出队列就会阻塞,直到有其他线程往队列中插入元素
基于阻塞队列我们可以实现生产者消费者模型
,这在后端开发场景中是相当重要的!
1.2 生产者-消费者模型优势
基于阻塞队列实现的 生产者消费者模型 具有以下两大优势:
- 解耦合:
以搜狗搜索的服务器举例,用户输入搜索关键字 **美容,**客户端的请求到达搜狗的"入口服务器"时,会将请求转发到 广告服务器 和 大搜索服务器,此时广告服务器返回相关广告内容,大搜索服务器根据搜索算法匹配对应结果返回,如果按照这种方式通信,那么入口服务器需要编写两套代码分别同广告服务器和大搜索服务器进行交互,并且一个严重问题是如果其中广告服务器宕机了,会导致入口服务器无法正常工作进而影响大搜索服务器也无法正常工作!!
而引入阻塞队列后,入口服务器不需要知晓广告服务器和大搜索服务器的存在,只需要往阻塞队列中发送请求即可,而广告服务器和大搜索服务器也不需要知道入口服务器的存在,只需要从阻塞队列中取出请求处理完毕返回给阻塞队列即可,并且当其中大搜索服务器宕机时,不影响其他服务器以及入口服务器的正常运作!
- 削峰填谷:
如果没有阻塞队列,当遇到一些突发场景例如"双十一"大促等客户请求量激增的时候,入口服务器转发的请求量增多,压力就会变大,同理广告服务器和大搜索服务器处理过程复杂繁多,消耗的硬件资源就会激增,达到硬件瓶颈之后服务器就宕机了(直观现象就是客户端发送请求,服务器不会响应了)
而引入阻塞队列/消息队列之后,由于阻塞队列只负责存储相应的请求或者响应,无需额外的业务处理,因此抗压能力比广告服务器和大搜索服务器更强,当客户请求量激增的时候交由阻塞队列承受,而广告服务器和大搜索服务器只需要按照特定的速率进行读取并返回处理结果即可,就起到了 削峰填谷 的作用!
注意:此处的阻塞队列在现实场景中并不是一个单纯的数据结构,往往是一个基于阻塞队列的服务器程序,例如消息队列(MQ)
2. 标准库中的阻塞队列
2.1 基本介绍
Java标准库提供了现成的阻塞队列数据结构供开发者使用,即BlockingQueue
接口
BlockingQueue:该接口具有以下实现类:
- ArrayBlockingQueue:基于数组实现的阻塞队列
- LinkedBlockingQueue:基于链表实现的阻塞队列
- PriorityBlockingQueue:带有优先级的阻塞队列
BlockingQueue方法:该接口具有以下常用方法
- 带有阻塞功能:
put
:向队列中入元素,队列满则阻塞等待take
:向队列中取出元素,队列空则阻塞等待
- 不带有阻塞功能:
peek
:返回队头元素(不取出)poll
:返回队头元素(取出)offer
:向队列中插入元素
2.2 代码示例
/**
* 测试Java标准库提供的阻塞队列实现
*/
public class TestStandardBlockingQueue {
private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) {
// 生产者
Thread t1 = new Thread(() -> {
int i = 0;
while (true) {
try {
queue.put(i);
System.out.println("生产数据:" + i);
i++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
// 消费者
Thread t2 = new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
int ele = queue.take();
System.out.println("消费数据:" + ele);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}
运行效果:
我们在主线程中创建了两个线程,其中t1
线程作为生产者不断循环生产元素,而线程t2
作为消费者每隔1s消费一个数据,所以我们很快看到当生产数据个数达到容量capacity
时就会继续生产就会阻塞等待,直到消费者线程消费数据后才可以继续入队列,这样就实现了一个 生产者-消费者模型 !
3. 自定义实现阻塞队列
首先我们需要明确实现一个阻塞队列需要哪些步骤?
- 首先我们需要实现一个普通队列
- 使用锁机制将普通队列变成线程安全的
- 通过特殊机制让该队列能够带有"阻塞"功能
3.1 实现普通队列
相信大家如果学过 数据结构与算法 相关课程,应该对队列这种数据结构的实现并不陌生!实现队列有基于数组的也有基于链表的,我们此处采用基于数组实现的,基于数组实现的循环队列也有以下两种方式:
- 腾出一个空间用来判断队列空或者满
- 使用额外的变量
size
用来记录当前元素的个数
我们使用第二种方式实现,实现代码如下:
/**
* 自定义实现阻塞队列
*/
public class MyBlockingQueue {
private int head = 0; // 头指针
private int tail = 0; // 尾指针
private int size = 0; // 当前元素个数
private String[] array = null;
private int capacity; // 容量
public MyBlockingQueue(int capacity) {
this.capacity = capacity;
this.array = new String[capacity];
}
/**
* 入队列方法
*/
public void put(String elem) {
if (size == capacity) {
// 队列已经满了
return;
}
array[tail] = elem;
tail++;
if (tail >= capacity) {
tail = 0;
}
size++;
}
/**
* 出队列方法
*/
public String take() {
// 判断队列是否为空
if (size == 0) {
return null;
}
String topElem = array[head];
head++;
if (head >= capacity) {
head = 0;
}
size--;
return topElem;
}
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(3);
queue.put("11");
queue.put("22");
queue.put("33");
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}
}
3.2 引入锁机制实现线程安全
引入synchronized
关键字在原有队列实现的基础上实现线程安全,代码如下:
/**
* 自定义实现阻塞队列
*/
public class MyBlockingQueue {
private int head = 0; // 头指针
private int tail = 0; // 尾指针
private int size = 0; // 当前元素个数
private String[] array = null;
private int capacity; // 容量
private Object locker = new Object(); // 锁对象
public MyBlockingQueue(int capacity) {
this.capacity = capacity;
this.array = new String[capacity];
}
/**
* 入队列方法
*/
public void put(String elem) {
synchronized (locker) {
if (size == capacity) {
// 队列已经满了
return;
}
array[tail] = elem;
tail++;
if (tail >= capacity) {
tail = 0;
}
size++;
}
}
/**
* 出队列方法
*/
public String take() {
String topElem = "";
synchronized (locker) {
// 判断队列是否为空
if (size == 0) {
return null;
}
topElem = array[head];
head++;
if (head >= capacity) {
head = 0;
}
size--;
}
return topElem;
}
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(3);
queue.put("11");
queue.put("22");
queue.put("33");
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}
}
我们在put
、take
等关键方法上将 多个线程修改同一个变量 部分的操作进行加锁处理,实现线程安全!
3.3 加入阻塞功能
在普通队列的实现中,如果队列满或者空我们直接使用return
关键字返回,但是在多线程环境下我们希望实现阻塞等待的功能,这就可以使用Object类提供的wait/notify
这组方法实现阻塞与唤醒机制了!我们就需要考虑阻塞与唤醒的时机了!
何时阻塞:这个问题非常简单,当队列满时入队列操作就应该阻塞等待,而当队列为空时出队列操作就需要阻塞等待
何时唤醒:想必大家都可以想到,对于入队列操作来说,只要队列不满就可以被唤醒,而对于出队列操作来说,队列不为空就可以被唤醒,因此,只要有线程调用take
操作出队列,那么入队列的线程就可以被唤醒,而只要有线程调用put
操作入队列,那么出队列的线程就可以被唤醒
/**
* 自定义实现阻塞队列
*/
public class MyBlockingQueue {
private int head = 0; // 头指针
private int tail = 0; // 尾指针
private int size = 0; // 当前元素个数
private String[] array = null;
private int capacity; // 容量
private Object locker = new Object(); // 锁对象
public MyBlockingQueue(int capacity) {
this.capacity = capacity;
this.array = new String[capacity];
}
/**
* 入队列方法
*/
public void put(String elem) {
synchronized (locker) {
while (size == capacity) {
// 队列已经满了(进行阻塞)
try {
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
array[tail] = elem;
tail++;
if (tail >= capacity) {
tail = 0;
}
size++;
locker.notifyAll();
}
}
/**
* 出队列方法
*/
public String take() {
String topElem = "";
synchronized (locker) {
// 判断队列是否为空
while (size == 0) {
try {
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
topElem = array[head];
head++;
if (head >= capacity) {
head = 0;
}
size--;
locker.notifyAll();
}
return topElem;
}
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(10);
// 生产者
Thread producer = new Thread(() -> {
int i = 0;
while (true) {
queue.put(i + "");
System.out.println("生产元素:" + i);
i++;
}
});
// 消费者
Thread consumer = new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String elem = queue.take();
System.out.println("消费元素" + elem);
}
});
producer.start();
consumer.start();
}
}
我们使用wait/notify
这组操作实现了阻塞/唤醒功能,并且满足必须使用在synchronized
关键字内部的使用条件,这里有一个注意点
为什么我们将if判断条件改成了while循环呢???这是需要考虑清楚的!
如图所示:一开始由于队列满所以生产者1进入阻塞状态,释放锁,然后生产者2也进入阻塞状态释放锁,此时消费者消费一个元素后唤醒生产者1,然后生产者1生产一个元素后(记住此时队列已满)继续唤醒,但是此时唤醒的恰恰是 生产者2 ,生产者2继续执行生产元素,于是就出现问题,我们总结一下出现问题的原因:
notifyAll
是随机唤醒,无法指定唤醒线程,因此可能出现生产者唤醒生产者,消费者唤醒消费者的情况if
判定条件一经执行就无法继续判定,所以生产者2被唤醒后没有再次判断当前队列是否满
于是我们的应对策略就是使用while
循环,当线程被唤醒使重新判断,如果队列仍满,入队列操作继续阻塞,而队列仍空,出队列操作继续阻塞!Java标准也推荐我们使用 while 关键字和 wait 关键字一起使用!
4. 应用场景(实现生产者消费者模型)
我们继续基于我们自定义实现的阻塞队列再来实现 生产者-消费者模型
代码示例(主函数):
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(10);
// 生产者
Thread producer = new Thread(() -> {
int i = 0;
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
queue.put(i + "");
System.out.println("生产元素:" + i);
i++;
}
});
// 消费者
Thread consumer = new Thread(() -> {
while (true) {
String elem = queue.take();
System.out.println("消费元素" + elem);
}
});
producer.start();
consumer.start();
}
运行效果:
此时我们创建两个两个线程,producer
作为生产者线程每隔1s生产一个元素,consumer
作为消费者线程不断消费元素,此时我们看到的就是消费者消费很快,当阻塞队列空时就进入阻塞状态,直到生产者线程生产元素后才被唤醒继续执行!此时我们真正模拟实现了 阻塞队列 这样的数据结构!