多线程篇(阻塞队列- BlockingQueue)(持续更新迭代)
目录
一、了解什么是阻塞队列之前,需要先知道队列
1. Queue(接口)
二、阻塞队列
1. 前言
2. 什么是阻塞队列
3. Java里面常见的阻塞队列
三、BlockingQueue(接口)
1. 前言
2. 简介
3. 特性
3.1. 队列类型
3.2. 队列数据结构
2. 简介
4. 核心功能
入队(放入数据)
出队(取出数据)
总结
四、常见BlockQueue
1. LinkdBlockQueue
生产者
消费者
测试代码
2. DelayQueue
3. PriorityBlockingQueue
4. SynchronousQueue
5. 等等等
五、知识小结
一、了解什么是阻塞队列之前,需要先知道队列
1. Queue(接口)
定义了队列的基本功能,添加、删除、查询。
满足FIFO(先进先出原则)。
public interface Queue<E> extends Collection<E> {
//添加一个元素,添加成功返回true, 如果队列满了,就会抛出异常
boolean add(E e);
//添加一个元素,添加成功返回true, 如果队列满了,返回false
boolean offer(E e);
//返回并删除队首元素,队列为空则抛出异常
E remove();
//返回并删除队首元素,队列为空则返回null
E poll();
//返回队首元素,但不移除,队列为空则抛出异常
E element();
//获取队首元素,但不移除,队列为空则返回null
E peek();
}
二、阻塞队列
1. 前言
JDK中提供了一系列场景的并发安全队列。
总的来说,按照实现方式的不同可分为阻塞队列和非阻塞队列。
阻塞队列使用锁实现,而非阻塞队列则使用 CAS 非阻塞算法实现。
2. 什么是阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入
和移除方法。
- 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是 从队列里
取元素的线程。
阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
在阻塞队列不可用时,这两个附加操作提供了4种处理方式,如表6-1所示。
- 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
- 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移 除方法,则是从队列里取出一个元素,如果没有则返回null。
- 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
- 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程 一段时间,如果超过了指定的时间,生产者线程就会退出。
这两个附加操作的4种处理方式不方便记忆,所以我找了一下这几个方法的规律。
put和take分别尾首含有字母t,offer和poll都含有字母o。
注意:如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永远不会被阻
塞,而且使用offer方法时,该方法永远返回true。
3. Java里面常见的阻塞队列
JDK 7 提供了 7 个阻塞队列,如下。
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向无界阻塞队列。
注意 - 是针对AQS这一块
三、BlockingQueue(接口)
1. 前言
在上面,我们已经了解到什么事阻塞队列。
阻塞队列,顾名思义,首先他是一个队列常用的队列主要有以下两种方式
(当然通过不同的方式可以衍生出不同类别的队列方式,比如DelayQueue)
- 先进先出(FIFO):先插入队列的元素也先出队列,类似排队的功能,从某种程度上来说,这种队列体现了公平性
- 先进后出(LIFO):后插入队列的元素,最先出队多线程环境,通过队列很容易实现数据共享,比如经典的生产者与消费者模型中,通过队列可以很方便的实现数据共享。
假设我们有若干个生产者线程,又有若干个消费者线程。假设生产者线程把准备好的数据要共享给
消费者线程,利用队列来共享数据,就可以很方便的解决他们之间数据共享问题。但如果生产者跟
消费者在某个时间段内,万一发生处理数据速度不匹配的情况呢?
理想情况下,假如生产者产出数据的速度大于消费者消费数据的速度,并且生产数据累积到一定程
度的时候,那生产者必须暂停等待一下(阻塞生产者线程),以便消费者线程把累积的数据处理完
毕。反之亦然。然而在Concurent 包发布之前,在多线程环境下,我们每个程序员都要自己去控制
这些细节,尤其还要兼顾效率和安全。这些会给我们的程序带来不少的复杂度。好在这是强大的
Concurent 包横空出世了,而他也给我们带来了强大的 BlockQueue,(在多线程领域,所谓阻塞,
会挂起线程(即阻塞),一旦满足条件,被挂起的线程,又会自动被唤醒)
- 当队列中没有数据的情况下,消费端的所有线程都会被阻塞,直到有数据放入队列
- 当队列中装满了数据的情况下,生产者端的所有线程都会被阻塞(挂起),直到队列中有空余的位置,线程被自动唤醒
这也是我们在多线程环境下,为什么使用BlockQueue 的原因,作为BlockQueue的使用者,我们
再也不需要关心什么时候阻塞线程,什么时候唤醒线程,因为一起 BlockQueue 都给你包办了,既
然BlockQueue 如此申通广大,让我们一起见识下他的方法。
2. 简介
在新增的 Concurent 包中,BlockQueue 很好解决了多线程中,如何高效传输数据的问题。
通过这些高效且线程安全的队列类,为我们搭建高质量的多线程程序带来极大的遍历。
在任意时刻只有一个线程可以进行take或者put操作,并且BlockingQueue提供了超时return null的
机制,在许BlockingQueue,是java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的
最有用的类,它的特性是多生产场景里都可以看到这个enq工具的身影。可以看到他是继承于JDK
的Queue。
接下来将会介绍BlockQueue家庭中的所有成员,包括他们的功能,一些经常使用的场景。
3. 特性
3.1. 队列类型
无限队列 (unbounded queue ) - 几乎可以无限增长
有限队列 ( bounded queue ) - 定义了最大容量
3.2. 队列数据结构
队列实质就是一种存储数据的结构,通常用链表或者数组实现。
一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列
主要操作:入队(EnQueue)与出队(Dequeue)
2. 简介
在上面,我们已经了解到什么事阻塞队列。
阻塞队列,顾名思义,首先他是一个队列常用的队列主要有以下两种方式
(当然通过不同的方式可以衍生出不同类别的队列方式,比如DelayQueue)
- 先进先出FIFO
先插入队列的元素也先出队列,类似排队的功能,从某种程度上来说,这种队列体现了公平性
- 先进后出(LIFO)后插入队列的元素,最先出队多线程环境,通过队列很容易实现数据共享,比如经典的生产者与消费者模型中,通过队列可以很方便的实现数据共享。
假设我们有若干个生产者线程,又有若干个消费者线程。假设生产者线程把准备好的数据要共享给消费者线程,利用队列来共享数据,就可以很方便的解决他们之间数据共享问题。但如果生产者跟消费者在某个时间段内,万一发生处理数据速度不匹配的情况呢?
理想情况下,假如生产者产出数据的速度大于消费者消费数据的速度,并且生产数据累积到一定程度的时候,那生产者必须暂停等待一下(阻塞生产者线程),以便消费者线程把累积的数据处理完毕。反之亦然。然而在Concurent 包发布之前,在多线程环境下,我们每个程序员都要自己去控制这些细节,尤其还要兼顾效率和安全。这些会给我们的程序带来不少的复杂度。好在这是强大的 Concurent 包横空出世了,而他也给我们带来了强大的 BlockQueue,(在多线程领域,所谓阻塞,会挂起线程(即阻塞),一旦满足条件,被挂起的线程,又会自动被唤醒)
- 如上图所示,当队列中没有数据的情况下,消费端的所有线程都会被阻塞,直到有数据放入队列
- 如上图所示,当队列中装满了数据的情况下,生产者端的所有线程都会被阻塞(挂起),直到队列中有空余的位置,线程被自动唤醒
这也是我们在多线程环境下,为什么使用BlockQueue 的原因,作为BlockQueue的使用者,我们再也不需要关心什么时候阻塞线程,什么时候唤醒线程,因为一起 BlockQueue 都给你包办了,既然BlockQueue 如此申通广大,让我们一起见识下他的方法。
4. 核心功能
入队(放入数据)
- offer(anObject):表示有可能的话,将Object 放入BlockQueue 中,如果BlockQueue 可以容纳,则返回true,否则返回false,(本方法不阻塞当前线程)。
- offer( E e,Long timeOut,TimeUnit unit):可以设置指定的时间,如果指定时间内还没有往队列中加入,则返回false.
- put(anObject):把anObject 加入到队列中,如果BlockQueue 中没有空间,则调用此方法的线程被阻塞直到 BlockQueue有空间在继续。
出队(取出数据)
- poll(time):取走BlockQueue 中排在首位的对象,若不能及时取出,则等待time 参数规定的时间,取不出时返回null.
- poll(long time ,TimeUnit unit):取出BlockQueue 中排在首位的对象,在指定时间内,一旦有数据就返回,超时还没有数据,则返回失败
- take():取出BlockQueue 中排在首位的数据,如果BlockQueue 中没有数据,则当前线程一直阻塞,直到有新的数据加入。
- drainTo():一次性的从BlockQueue中取出所有可用数据对象(还可以指定取出数据个数),通过该方法可以提升获取数据效率,不需要多次加锁和释放锁,
- peek():获取队首元素,但不移除,队列为空则返回null
总结
当队列满了无法添加元素,或者是队列空了无法移除元素时:
抛出异常:add、remove、element
返回结果但不抛出异常:offer、poll、peek
阻塞:put、take
四、常见BlockQueue
在了解BlockQueue 后,让我们来了解下他的大家庭都有哪些成员
- ArrayBlockQueue,基于数组阻塞队列实现,在ArrayBlockQueue 内,维护了一个定长数组,以便缓存数据对象,这是个常用的阻塞队列,除了定长的数组外,ArralyBlockQueue 还保存着两个定长的整形变量,分别标志着队列的头部跟尾部在数组中的位置。
- ArrayBlockQueue生产者放入数据消费者获取数据,都是共用一个锁对象,由此也意味着两者无法真正的并行运行,这点尤其不同意LinkBlockQueue,按照实现原理来分析,ArrayBlockQueue 完全可用分离锁,从而实现生产者与消费者操作的完全并行执行,之所以没有这么做,也许是因为,ArrayBlockQueue 数据写入与获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性,其性能上完全占不到便宜。ArrayBlockQueue和LinkedBlockQueue还有一个明显的不同之处在于,前着在插入和删除对象元素时,不会产生和销毁任何的对象实列,而后者则会产生一个额外的 node 对象,在长时间内需要高效并发的处理大批量系统中,其对GC的影响还是存在一定的区别,而在创建ArrayBlockQueue时,我们还可以控制对象的内部锁是否使用公平锁,默认采用非公平锁。
1. LinkdBlockQueue
- 基于链表的阻塞队列,同ArrayBlockQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返】回,只有当队列缓存区达到最大缓存容量时(LinkedBlockQueue可以通过构造函数指定该值),才会阻塞生产队列,直到消费者消费掉一份数据后,生产者线程才会被唤醒,反之对消费者这端的处理也基于同样的原理。LinkedBlockQueue 之所以能够高效的处理并发数据,还因为生产者和消费者端分别采用了独立的锁来控制数据同步,这也意味着,在高并发的情况下,生产者和消费者可以并行的操作队列中的数据,以此来提供提供的队列的并发性。
- 作为开发者,我们需要注意的是, 如果构造一个LinkedBlockQueue 队像,而没有指定其容量大小,LinkedBlockQueue 会默认一个类似无线大小的容量(Integer.MAX_VALUE),这样的话,一旦生产者生产者的速度大于消费者的速度,也许还没等到队列满,阻塞产生,系统的内存就有可能被消耗殆尽了,
- ArrayBlockQueue和 linkedBlockQueue 是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程的生产者和消费者问题,这两个类足以。
- 下面演示了如何使用BlockQueue,
生产者
package com.cj.demo.thread.queue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
private volatile boolean isRunning = true;//是否在运行标志
private BlockingQueue queue;//阻塞队列
private static AtomicInteger count = new AtomicInteger();//自动更新的值
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
private String threadName="";
//构造函数
public Producer(BlockingQueue queue,String threadName) {
this.queue = queue;
this.threadName = threadName;
}
@Override
public void run() {
String data = null;
Random r = new Random();
System.out.println("启动生产者线程!");
try {
while (isRunning) {
System.out.println("正在生产数据...");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一个随机数
data = "data:" + count.incrementAndGet();//以原子方式将count当前值加1
System.out.println(threadName+" 将数据:" + data + "放入队列...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//设定的等待时间为2s,如果超过2s还没加进去返回true
System.out.println("放入数据失败:" + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出生产者线程!");
}
}
public void stop() {
isRunning = false;
}
}
消费者
package com.cj.demo.thread.queue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Consumer implements Runnable {
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
//构造函数
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("启动消费者线程!");
Random r = new Random();
boolean isRunning = true;
try {
while (isRunning) {
System.out.println("正从队列获取数据...");
String data = queue.poll(2, TimeUnit.SECONDS);//有数据时直接从队列的队首取走,无数据时阻塞,在2s内有数据,取走,超过2s还没数据,返回失败
if (null != data) {
System.out.println("拿到数据:" + data);
System.out.println("正在消费数据:" + data);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
} else {
// 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
isRunning = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消费者线程!");
}
}
}
测试代码
package com.cj.demo.thread.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
// 声明一个容量为10的缓存队列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
//new了三个生产者和一个消费者
Producer producer1 = new Producer(queue,"thread1");
Producer producer2 = new Producer(queue,"thread2");
Producer producer3 = new Producer(queue,"thread3");
Consumer consumer = new Consumer(queue);
// 借助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 启动线程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer);
// 执行10s
Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(2000);
// 退出Executor
service.shutdown();
}
}
2. DelayQueue
DelayQueue 中的元素,只有当其指定的延迟时间到了,才能够从队列中获取元素。
DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会阻塞, 而只有获取
数据消费者才会阻塞。
3. PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是
PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的
时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有
的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
4. SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者
拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,
如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue
来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,
而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对
于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方
面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及
时响应性能可能会降低。
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。
公平模式和非公平模式的区别:
如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者
和消费者,从而体系整体的公平策略;
但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合
一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有
差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。
5. 等等等
五、知识小结
BlockingQueue不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理
了多线间的自动等待于唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。