Java 异步编程——常用线程池 ThreadPoolExecutor
文章目录
- ThreadPoolExecutor
- 核心参数详解
- 线程池的处理流程
- 线程池中使用的阻塞队列
- 4 种任务拒绝策略
- 4 种不同的线程池
- 线程池的五种状态
- Future 接口
ThreadPoolExecutor
java.uitl.concurrent.ThreadPoolExecutor 类是线程池中最核心的一个类。
线程池底层示例图:
ThreadPoolExecutor 的构造方法:
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int maximumPoolSize;
private final BlockingQueue<Runnable> workQueue;
private volatile RejectedExecutionHandler handler;
private volatile ThreadFactory threadFactory;
private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
核心参数详解
-
corePoolSize:线程池中的常驻核心线程数。核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在
创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务
,除非调用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法,这2个方法是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中; -
maximumPoolSize:线程池最大线程数(非核心线程数),表示在线程池中最多能创建多少个线程;
-
keepAliveTime:多余的空闲线程的存活时间。表示线程没有任务执行时最多保持多久时间会终止。当前线程池数量超过corePoolSize时,当空闲的时间达到keepAliveTime值时,多余的空闲线程会被直接销毁直到只剩下corePoolSize个线程为止。
默认情况下,只有当线程池中的线程数大于 corePoolSize 时,keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize,即当线程池中的线程数大于 corePoolSize 时,如果一个线程空闲的时间达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean) 方法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为0;
-
TimeUnit:时间单位,keepAliveTime 的单位。
-
ThreadFactory:线程工厂,主要用来创建线程,用于创建线程一般用默认的即可。
-
workQueue:一个阻塞队列,用来存储等待执行的任务(被提交但未被执行的任务),维护着等待执行的 Runnable 对象。
-
RejectedExecutionHandler:饱和策略,线程拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数。
线程池的处理流程
- 在创建了线程池后,开始等待请求。
- 当调用 execute() 方法添加一个请求任务时,线程池会做出如下判断:
- 当任务数量少于 corePoolSize 时,会自动创建 thread 来处理这些任务并加入线程池;
- 当添加任务数大于 corePoolSize 且少于 maximumPoolSize 时,不再创建线程,而是将这些任务放到阻塞队列中,等待被执行;
- 当阻塞队列满了之后,线程池中的线程数没有超过 maximumPoolSize 最大线程数,则新建一个线程处理任务并加入到线程池中,从而加速处理阻塞队列;
- 当阻塞队列满了之后,且添加任务大于 maximmPoolSize 时,根据饱和策略决定是否容许继续向线程池中添加任务,默认的饱和策略是 AbortPolicy(直接丢弃)。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 空闲的线程(非核心线程)会在到达 keepAliveTime 时间之后没有被使用的话就被回收。所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
提交优先级:任务先提交到核心线程,然后提交到队列中,最后提交到非核心线程。
执行优先级:先执行核心线程中任务,然后执行非核心线程中任务,最后执行队列中的任务。
线程池中使用的阻塞队列
在前面我们多次提到了任务缓存队列,即 workQueue,它用来存放等待执行的任务。
workQueue 的类型为 BlockingQueue<Runnable>,通常可以取下面三种类型:
- ArrayBlockingQueue:基于数组的先进先出有界阻塞队列,此队列创建时必须指定大小;
- LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE(65536),当大量请求任务时,容易造成内存耗尽。
- SynchronousQueue:是一个特殊的 BlockingQueue,它没有容量。这个队列只有在另一个线程在同步 remove 的时候才可以 put 成功,对应到线程池中,简单来说就是如果有线程池任务处理完了,调用 poll 或者 take 方法获取新的任务的时候,新提交的任务才会 put 成功,否则如果当前的线程都在忙着处理任务,那么就会 put 失败,也就会走扩容的逻辑(新建一个线程来执行新来的任务)。
阻塞队列(BlockingQueue)
-
BlockingQueue 继承了 Queue 接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。在新增的 Concurrent 包中(java.util.concurrent.BlockingQueue)。
public interface BlockingQueue<E> extends Queue<E> {...}
-
为什么出现 BlockingQueue
BlockingQueue 很好的解决了多线程高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
作为 BlockingQueue 的使用者,我们再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)。
-
阻塞队列的特点
阻塞队列与普通队列的区别在于:
- 当队列是空的时,从队列中获取元素的操作将会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。
- 当队列是满时,往队列里添加元素的操作会被阻塞。试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列。
-
阻塞队列提供了四种处理方法:
方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入方法 add(e) offer(e) put(e) offer(e,time,unit) 移除方法 remove() poll() take() poll(time,unit) 检查方法 element() peek() 不可用 不可用 -
异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出 IllegalStateException(“Queue full”) 异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常 。
-
返回特殊值:插入方法会返回是否成功,成功则返回 true。移除方法,则是从队列里拿出一个元素,如果没有则返回 null。
- 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
- 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
-
-
阻塞队列的成员
队列 有界性 锁 数据结构 ArrayBlockingQueue bounded(有界) 加锁 arrayList LinkedBlockingQueue optionally-bounded 加锁 linkedList PriorityBlockingQueue unbounded 加锁 heap DelayQueue unbounded 加锁 heap SynchronousQueue bounded 加锁 无 LinkedTransferQueue unbounded 加锁 heap LinkedBlockingDeque unbounded 无锁 heap -
ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】
-
ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
数组实现、有界、先进先出、队列不支持空元素、线程安全的阻塞循环队列;
对数据操作时,共用一把锁,所以不能同时读写操作;
enqueue()和dequeue()方法是入队和出队的核心方法,他们分别通知”队列非空”和”队列非满”,从而使阻塞中的入队和出队方法能够继续执行,以实现生产者消费者模式。
-
-
LinkedBlockingQueue:一个由链表结构组成的有界队列,此队列的长度为 Integer.MAX_VALUE。
-
此队列按照先进先出的顺序进行排序。
链表实现、先进先出、队列不支持空元素、线程安全的阻塞队列。
内部有两把锁,插入和取出各一把,互不干扰,所以能同时进行读写操作。
由于 FixedThreadPool 和 SingleThreadExecutor 的线程数是固定的,所以一般用容量无穷大的 LinkedBlockingQueue 队列来存任务。
-
-
PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
-
PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
数组实现、通过排序指定出对规则、不允许为null的元素插入、线程安全的无界队列。
可以指定内部元素的排序规则,依赖Comparator来确保不同元素的排序位置。
添加到PriorityBlockingQueue队列中的元素对应的Java类,通常需要实现Comparable接口或者是可以默认排序的对象(如数字、字符串),否则会抛出 ClassCastException。
-
-
DelayQueue: 一个实现 PriorityBlockingQueue 实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。(DelayQueue可以运用在以下应用场景:
-
缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
-
定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)
可实现延时、用于放置实现了Delayed接口的对象、不允许为null的元素插入、线程安全的无界队列。
-
-
SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool() 就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
-
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者。
容量为0,不存储任何元素,即没有数据缓存的空间。一对一,生产者和消费者缺一就阻塞。入队线程和出队线程必须一一匹配,否则任意先到达的线程会阻塞。比如ThreadA进行入队操作,在有其它线程执行出队操作之前,ThreadA会一直等待,反之亦然
SynchronousQueue 提供两种实现方式,分别是 栈 和 队列 的方式实现。这两种实现方式中,栈 是属于非公平的策略,队列 是属于公平策略。默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)。
在java线程池newCachedThreadPool中就使用了这种阻塞队列。
-
-
LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
-
LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。
-
4 种任务拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到 maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
- ThreadPoolExecutor.AbortPolicy(默认):丢弃任务并抛出 RejectedExecutionException 异常。
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,并将新任务加入对列,然后重新尝试执行任务(重复此过程)
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
自定义拒绝策略:实现 RejectedExecutionHandle 接口。
4 种不同的线程池
除了可以使用 ThreadPoolExecutor 自己根据实际情况创建线程池以外,Executor 框架也提供了线程池,可以通过工具类 Executors 来创建。
Executors 类提供了4种不同的线程池:newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor。(不过不建议使用 Executors 类来创建这些线程池,而是直接使用 ThreadPoolExecutor 自定义创建。)
-
newCachedThreadPool 创建一个可缓存线程池(CachedThreadPool),如果线程池长度超过处理需要,可灵活回收空闲线程;若无可回收,则新建线程。(使用 Executors 创建该线程池会使 CPU 100%)。
优点:如果当第二个任务开始,第一个任务已经执行结束,那么第二个任务会复用第一个任务创建的线程,并不会重新创建新的线程,提高了线程的复用率;否则可以创建线程加快任务处理。
不足:这种方式虽然可以根据业务场景自动的扩展线程数来处理我们的业务,但是最多需要多少个线程同时处理是我们无法控制的。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
-
newFixedThreadPool 创建一个固定大小的线程池(FixedThreadPool),可控制线程最大并发数,超出的线程会在队列中等待。(使用 Executors 创建该线程池会出现 OOM,因为使用 LinkedBlockingQueue)
优点:newFixedThreadPool 的线程数是可以进行控制的,因此我们可以通过控制最大线程来使我们的服务器达到最大的使用率,同时又可以保证及时流量突然增大也不会占用服务器过多的资源。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable()); }
-
newScheduledThreadPool 创建一个固定大小的线程池(ScheduledThreadPool),支持定时及周期性任务执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
-
newSingleThreadExecutor 创建一个单线程化的线程池(SingleThreadExecutor),它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIF0,LIFO,优先级)执行。(使用 Executors 创建该线程池会出现 OOM,因为使用 LinkedBlockingQueue)
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
线程池的五种状态
线程池的5种状态:Running、ShutDown、Stop、Tidying、Terminated。
RUNNING
- 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
- 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。
SHUTDOWN
- 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
- 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
STOP
- 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
- 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN) -> STOP。
TIDYING
- 状态说明:当所有的任务已终止,ctl 记录的”任务数量”为0,线程池会变为 TIDYING 状态。当线程池变为 TIDYING 状态时,会执行钩子函数 terminated()。terminated() 在 ThreadPoolExecutor 类中是空的,若用户想在线程池变为 TIDYING 时,进行相应的处理;可以通过重载 terminated() 函数来实现。
- 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
- 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
TERMINATED
- 状态说明:线程池彻底终止,就变成TERMINATED状态。
- 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
Future 接口
Future 接口和实现 Future 接口的 FutureTask 类用来表示异步计算的结果。当我们把 Runnable 接口或 Callable 接口的实现类提交(submit)给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 时,ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 会向我们返回一个FutureTask对象。
Future 的类图结构
Future 接口一般用于表示异步计算的结果。
- 异步工作机制,如果我们在主线程中直接写一个函数来执行任务,这是同步的任务,也就是说必须要等这个函数返回以后我们才能继续做接下的事情,但是如果这个函数返回的结果对接下来的任务并没有意义,那么我们等在这里是很浪费时间的,而 FutureTask 就提供了这么一个异步的返回结果的机制,当执行一个 FutureTask 的时候,我们可以接着做别的任务,在将来的某个时间,FutureTask 任务完成后会返回 FutureTask 对象来包装返回的结果,只要调用这个对象的 get() 方法即可获取返回值。
Future 接口主要包括5个方法。
- get() 方法:用于获取执行完后的结果。get() 是一个阻塞操作,即调用 get(),任务结束获取返回的结果;如果调用时工作还没有结束,则会阻塞线程,直到任务执行完毕。
- get(long timeout, TimeUnit unit):在规定等待时间任务结束,获取结果;超时则会抛出一个TimeoutException。
- cancel(boolean mayInterruptIfRunning)方法:选择是否中断正在运行的task。如果我们 cancel 之后,再次调用 get() 方法,则会抛出 CancellationException。
- isDone() 方法:判断当前方法是否完成。
- isCancel() 方法:判断当前方法是否取消。