二、Java 并发编程(5)
本章概要
- 线程上下文切换
- 线程上下文切换的流程
- 导致线程上下文切换的原因
- Java中的阻塞队列
- 阻塞队列的主要操作
- Java中阻塞队列的实现
2.7 线程上下文切换
CPU 利用时间片轮询来为每个任务都服务一定的时间,然后把当前任务的状态保存下来,继续服务下一个任务。任务的状态保存及再加载的过程叫做线程的上下文切换。
- 进程:指一个运行中的程序的实例。在一个进程内部可以有多个线程同时运行,并与创建它的进程共享同一地址空间(一段内存区域)和其它资源。
- 上下文:指线程切换时 CPU 寄存器和程序计数器锁保存的当前线程的信息。
- 寄存器:指 CPU 内部容量较小但速度很快的内存区域(与之对应的是 CPU 外部相对较慢的 RAM 主内存)。寄存器通过对常用值(通常是运算的中间值)的快速访问来加快计算程序运行的速度。
- 程序计数器:是一个专用的寄存器,用于表明指令序列中 CPU 正在执行的位置,存储的值为正在执行的指令的位置或者下一个将被执行的指令的位置。
2.7.1 线程上下文切换的流程
CPU为了能够执行多个线程,需要不停的切换执行的线程,这样才能使所有线程在一段时间内都有被执行的机会。CPU通过时间片分配算法来循环执行任务,当前任务执行一个时间片后切换到下一个任务。但是,在切换前会保存上一个任务的状态,以便下次切换回这个任务时,可以再加载这个任务的状态。所以任务从保存到再加载的过程就是一次上下文切换。
线程上下文切换的流程如下:
- 挂起一个线程,将这个线程在 CPU 中的状态(上下文信息)存储于内存的中。
- 在内存中检索下一个线程的上下文并将其在 CPU 的寄存器中恢复。
- 跳转到程序计数器所指向的位置(即跳转到线程被中断时的代码行)并恢复该线程。
2.7.2 导致线程上下文切换的原因
- 当前正在执行的任务完成,系统的 CPU 正常调度下一个任务。
- 当前正在执行的任务遇到 I/O 等阻塞操作,调度器挂起此任务,继续调度下一个任务。
- 多个任务并发抢占资源,当前任务没有抢到锁资源,被调度器挂起,继续调度下一个任务。
- 用户的代码挂起当前任务,比如线程执行 sleep 方法,让出 CPU (但仍持有锁)。
相关面试题:
- 什么是线程上下文切换?★★★★★
- 线程上下文切换的流程。★★★☆☆
2.8 Java 中的阻塞队列
队列是一种只允许在表的前端进行删除操作,而在表的后端进行插入操作的线性表。在阻塞队列中,线程阻塞有如下两种情况:
- 消费者阻塞:在队列为空时,消费者端的线程都会被自动阻塞(挂起),直到有数据放入队列,消费者线程才会被自动唤醒并消费数据。如下:
- 生产者阻塞:在队列已满且没有可用空间时,生产者端的线程会被自动阻塞(挂起),直到队列中有空的位置腾出,线程才会被自动唤醒并生产数据。如下:
2.8.1 阻塞队列的主要操作
阻塞队列的主要操作有插入操作和移除操作。插入操作有 add(e)、offer(e)、put(e)、offer(e,time,unit),移除操作有 remove()、poll()、take()、poll(time,unit),具体介绍如下:
- 插入操作
(1)boolean add(E e):将指定元素插入队列中,在成功时返回 true,如果没有可用的空间,则抛出 IllegalStateException 异常。如果该元素是 null,则抛出 空指针异常。 JDK 源码的实现如下(AbstractQueue.java):
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
(2)boolean offer(E e):将指定元素插入队列中,在成功时返回 true,如果没有可用的空间,则返回 false。JDK 源码的实现如下(ArrayBlockingQueue.java):
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
(3)boolean offer(E e, long timeout, TimeUnit unit):将指定的元素插入队列中,可以设定等待的时间,如果在设定的等待时间内仍不能向队列中加入元素,则返回 false。JDK 源码的实现如下(ArrayBlockingQueue.java):
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
(4)void put(E e) throws InterruptedException:将指定的元素插入队列中,如果队列已满,则阻塞、等待可用空间的释放,直到有可用空间释放且插入成功为止。JDK 源码的实现如下(ArrayBlockingQueue.java):
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
- 获取数据操作
(1)E poll():取走队头的对象,如果获取不到数据,则返回 null。 JDK 源码的实现如下(ArrayBlockingQueue.java):
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
(2)E poll(long timeout, TimeUnit unit) throws InterruptedException:取走队头的对象,如果在指定的时间内队列中有数据可获取,则返回队列中的数据,在等待超时并且没有数据可获取时,返回 null。JDK 源码的实现如下(ArrayBlockingQueue.java):
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
(3)E take():取走队头的对象,如果队列为空,则进入阻塞状态等待,直到队列有新的数据加入,再及时取出新加入的数据。JDK 源码的实现如下(ArrayBlockingQueue.java):
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
(4)drainTo(Collection<? super E> c, int maxElements):一次性从队列中批量获取所有可用的数据对象,同时可以指定获取数据的个数,通过该方法可以提高获取数据的效率,避免多次频繁操作引起的队列锁定。JDK 源码的实现如下(ArrayBlockingQueue.java):
public int drainTo(Collection<? super E> c, int maxElements) {
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = Math.min(maxElements, count);
int take = takeIndex;
int i = 0;
try {
while (i < n) {
@SuppressWarnings("unchecked")
E x = (E) items[take];
c.add(x);
items[take] = null;
if (++take == items.length)
take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal();
}
}
} finally {
lock.unlock();
}
}
2.8.2 Java 中阻塞队列的实现
Java 中的阻塞队列有 ArrayBlockingQueue,LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue、LinkedBlockingDeque。如下:
名称 | 说明 |
---|---|
ArrayBlockingQueue | 基于数组结构实现的有界阻塞队列 |
LinkedBlockingQueue | 基于链表结构实现的有界阻塞队列 |
PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 |
DelayQueue | 基于优先级队列实现的无界阻塞队列 |
SynchronousQueue | 用于控制互斥操作的阻塞队列 |
LinkedTransferQueue | 基于链表结构实现的无界阻塞队列 |
LinkedBlockingDeque | 基于链表结构实现的双向阻塞队列 |
- ArrayBlockingQueue
ArrayBlockingQueue 是基于数组实现的有界阻塞队列,按照先进先出原则对元素进行排序,在默认情况下不保证元素操作的公平性。
队列操作的公平性指在生产者线程或消费者线程发生阻塞后再次被唤醒时,按照阻塞的先后顺序操作队列,即先阻塞的生产者线程优先向队列中插入元素,先阻塞的消费者线程优先从队列中获取元素。
因为保证公平性会降低吞吐量,所以如果要处理数据没有先后顺序,则对其可以使用非公平处理的方式。可以通过以下代码创建一个公平或非公平的阻塞队列:
//大小为 1000 的公平队列
final ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
//大小为 1000 的非公平队列
final ArrayBlockingQueue unFairQueue = new ArrayBlockingQueue(1000,false);
- LinkedBlockingQueue
LinkedBlockingQueue 是基于链表实现的阻塞队列,同 ArrayBlockingQueue 类似,按照先进先出原则对元素进行排序。
LinkedBlockingQueue 对生产者端和消费者端分别采用了两个独立的锁来控制数据同步,我们可以将队列头的锁理解为写锁,将队列尾的锁理解为读锁,因此生产者和消费者可以基于各自独立的锁并行的操作队列的数据,LinkedBlockingQueue 的并发性较高。具体用法如下:
final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(100);
- PriorityBlockingQueue
PriorityBlockingQueue 是一个支持优先级的无界队列,元素在默认情况下采用自然顺序升序排列。可以通过 compareTo 方法来自定义元素的排序规则,或者在初始化 PriorityBlockingQueue 时指定构造参数 Comparator 来实现对元素的排序。
注意:如果两个元素的优先级相同,则不能保证该元素的存储和访问顺序。具体用法如下:
public class Test1 implements Comparable<Test1>{
private String id;
//排序字段 number
private Integer number;
//定义可排序的阻塞队列,根据 Test1 的 number 属性大小由小到大排序
final PriorityBlockingQueue<Test1> priorityBlockingQueue = new PriorityBlockingQueue<Test1>();
/**
* 自定义排序规则:将 number 字段作为排序字段
*/
public int compareTo(Test1 o) {
return this.number.compareTo(o.getNumber());
}
public Integer getNumber() {
return number;
}
public void setNumber(Integer number) {
this.number = number;
}
}
- DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列,队列底层使用 PriorityQueue 实现。
DelayQueue 中的元素必须实现 Delayed 接口,该接口定义了创建元素时该元素的延迟时间,在内部通过为每个元素的操作加锁来保障数据的一致性,只有在延迟时间到后才能从队列中提取元素。
我们可以将 DelayQueue 运用于如下场景中:
- 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素,则表示缓存元素的有效期到了。
- 定时任务调度:使用 DelayQueue 保存即将执行的任务和执行时间,一旦从 DelayQueue 中获取元素,就表示任务开始执行,Java 中的 timeQueue 就是使用 DelayQueue 实现的。
在具体使用时,延迟对象必须先实现 Delayed 类并重写 getDelay 方法和 compareTo 方法,才可以在延迟队列中使用:
public class TestDelayData implements Delayed {
//延迟对象的排序字段
private Integer number;
//设置队列延迟 5s 获取
private long delayTime = 50000;
public Integer getNumber() {
return number;
}
public void setNumber(Integer number) {
this.number = number;
}
public long getDelay(TimeUnit unit) {
return this.delayTime;
}
public int compareTo(Delayed o) {
TestDelayData testDelayData = (TestDelayData) o;
return this.number.compareTo(testDelayData.getNumber());
}
public static void main(String[] args) {
//创建延时队列
DelayQueue<TestDelayData> queue = new DelayQueue<TestDelayData>();
//实时添加数据
queue.add(new TestDelayData());
while (true) {
try {
//延迟 5s 才能获取数据
TestDelayData testDelayData = queue.take();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
- SynchronousQueue
SynchronousQueue 是一个不存储元素的阻塞队列。SynchronousQueue 中的每个 put 操作都必须等待一个 take 操作完成,否则不能继续向对队列中添加元素。
我们可以将 SynchronousQueue 看做“快递员”,负责把生产者线程的数据直接传递给消费者线程,非常适用于传递性场景,比如将一个线程中使用的数据传递给另一个线程使用。SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。
具体的使用方法如下:
public class TestSynchronousQueue {
/**
* 生产者线程
*/
static class Producter extends Thread {
SynchronousQueue<Integer> queue;
public Producter(SynchronousQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
int product = new Random().nextInt(1000);
//生产一个随机数作为数据放入队列
queue.put(product);
System.out.println("生产了一个数据:" + product);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(queue.isEmpty());
}
}
}
/**
* 消费者线程
*/
static class Customer extends Thread {
SynchronousQueue<Integer> queue;
public Customer(SynchronousQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
int data = queue.take();
System.out.println("消费了一个数据:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<Integer>();
new Producter(synchronousQueue).start();
new Customer(synchronousQueue).start();
}
}
- LinkedTransferQueue
LinkedTransferQueue 是基于链表结构实现的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 transfer、tryTransfer 和 tryTransfer(E e,long time,TimeUnit unit) 方法。
- transfer 方法:如果当前消费者正在等待接收元素,transfer 方法就会直接把生产者传入的元素投递给消费者并返回 true。如果没有消费者在等待就收元素,transfer 方法就会将元素存放在队尾(tail)节点,直到该元素被消费后才返回。
- tryTransfer 方法:首先尝试能否将生产者传入的元素直接传给消费者,如果没有消费者等待接收元素,则返回 false。和 transfer 方法的区别是,无论消费者是否接收元素,tryTransfer 方法都立即返回,而 transfer 方法必须等到元素被消费后才返回。
- tryTransfer(E e,long time,TimeUnit unit) 方法:首先尝试把生产者传入的元素直接传给消费者,如果没有消费者,则等待指定时间,在超时后如果元素还没有被消费,则返回 false,否则返回 true。
- LinkedBlockingDeque
LinkedBlockingDeque 是基于链表结构实现的双向阻塞队列,可以在队列两端分别执行插入和移除元素操作。这样,在多线程同时操作队列时,可以减少一半的锁资源竞争,提高队列的操作效率。
LinkedBlockingDeque 相比其他阻塞队列,多了 addFirst 、addLast、offerFirst、offerLast、peekFirst、peekLast 等方法。以 First 结尾的方法表示在队头执行插入(add)、获取(peek)、移除(offer)操作;以 Last 结尾的方法表示在队尾执行插入、获取、移除操作。
在初始化 LinkedBlockingDeque 时,可以设置队列的大小以防止内存溢出,双向阻塞队列也常被用于工作窃取模式。
相关面试题:
- 什么是阻塞队列?阻塞队列的原理是什么?★★★★★
- Java 中的阻塞队列有哪些?★★★☆☆
- 如何使用 阻塞队列实现生产者-消费者模型?★★★☆☆
- 当阻塞队列为空时,如果某线程调用 take 取得队头的元素,则会发生什么?★★★☆☆
- 阻塞队列的线程安全是如何实现的?★★★☆☆
- 如何使用数组实现一个简单的阻塞队列?★★☆☆☆
- 阻塞队列的主要操作有哪些?★★☆☆☆
- ArrayBlockingQueue 的公平性和非公平性指的是什么?★☆☆☆☆
- 阻塞队列和非阻塞队列的区别是什么?★☆☆☆☆
- 阻塞队列的有界和无界指的是什么?★★☆☆☆