java 线程池Executor框架
Executor框架
- Executor框架构成
- 异步计算的结果涉及的接口或类
- 多线程实现涉及的接口或类
- 多线程的管理(线程池涉及的接口或类)
- Executor接口
- ExecutorService接口
- AbstractExecutorService抽象类
- ScheduledExecutorService接口
- ThreadPoolExecutor类(线程池核心类)
- ScheduledThreadPoolExecutor类
Executor框架简介:多线程管理所用到的线程池所对应的类 接口组合在一起的所形成的这么一个所谓框架, Executor框架实现的就是线程池的功能
Executor框架构成
Executor框架包括3大部分:
①任务。也就是工作单元,包括被执行任务需要实现的接口:Runnable接口或者Callable接口;
②任务的执行。也就是把任务分派给多个线程的执行机制,包括Executor接口及继承自Executor接口的ExecutorService接口
③异步计算的结果。包括Future接口及实现了Future接口的FutureTask类
Executor框架成员关系图简示:
异步计算的结果涉及的接口或类
Future接口 FutureTask类和CompletableFuture<T>详情介绍详情链接
多线程实现涉及的接口或类
Callable接口详情介绍点击链接
Thread类使用详情介绍点击链接
多线程的管理(线程池涉及的接口或类)
Executor框架的使用
Executor接口
Executor接口是线程池的顶级接口,只有一个方法execute,用于执行Runnable任务
public interface Executor {
void execute(Runnable command);
}
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)execute()方法中,由Executor框架完成线程的调配和任务的执行部分。
如ExecutorService接口就增加了一些能力:(
1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
2)提供了管控线程池的方法,比如停止线程池的运行
ExecutorService接口
由于ExecutorService接口是继承的Executor接口,所以自动拥有了execute方法
并扩展 Executor接口,添加任务生命周期管理(如 shutdown())、异步任务提交(submit())等功能方法
public interface ExecutorService extends Executor {
//关闭线程池:停止接收新的任务,但是允许已经提交的任务继续执行直到它们完成
void shutdown();
//关闭线程池:会尝试停止所有正在执行的任务,并返回一个包含未执行任务的列表
List<Runnable> shutdownNow();
/**检查线程池是否已经开始关闭过程 也就是是否执行了 shutdown 或 shutdownNow 方法,
*需要注意的是,如果调用 isShutdown() 方法的返回结果为 true,不代表线程池此时已经彻底关闭了,
*这仅仅代表线程池开始了关闭的流程。也就是说,此时可能线程中依然有线程在执行任务,
*队列里也可能有等待被执行的任务
*/
boolean isShutdown();
/**
* isTerminated()方法用于判断ExecutorService是否已经终止。
*isTerminated()方法的返回值是一个布尔值,如ExecutorService已经终止,则返回true
*需要注意的是,除非先调用shutdown或shutdownNow,否则isTerminated()方法永远不返回true
*/
boolean isTerminated();
/**
用于判断 等待所有任务在工作队列中是否执行完毕。
它接收两个参数:一个是等待时间(long 类型),另一个是时间单位(TimeUnit 枚举类型)。
如果在指定时间内所有任务都完成了执行,该方法将返回 true;
如果在指定时间内还有任务未完成,该方法将返回 false
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交一个Callable任务,该任务有返回值。
* 返回的Future对象的get()方法将返回Callable任务的返回值
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一个Runnable任务,并指定一个结果T result。
* 即使Runnable任务没有返回值,也可以通过Future对象的get()方法获取到这个结果
* 注意Runnable任务是没有返回值,想要获取结果是通过submit方法,你可以传递一个结果引用T result
* 因为这个引用可以在任务执行过程中被修改,从而在主线程中获取到任务的执行结果。
* 这是因为submit方法实际上返回了一个Future对象,该对象可以用来获取任务的执行结果
*/
<T> Future<T> submit(Runnable task, T result);
//提交一个Runnable任务,该任务没有返回值。返回的Future对象的get()方法将返回null
Future<?> submit(Runnable task);
//invokeAll方法是一个用于 批量提交多个任务 并等待所有任务完成,
//并返回一个包含每个任务对应 Future 的列表
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
//是一个用于批量提交多个任务并控制超时时间,返回一个包含所有任务结果的 Future 列表
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
//取得第一个完成任务的结果值,当第一个任务执行完成后,会调用interrupt方法将其他的任务中断取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
//invokeAny() 方法用于并发执行多个任务并返回第一个成功完成的任务结果(或在超时前返回)
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
总结
1.关闭线程池方法(shutdownNow(),shutdown()方法)
2.关闭线程池方法配合使用的方法
a.isTerminated()(用于判断ExecutorService是否已经终止)
b.awaitTermination()(用于判断 等待所有任务在工作队列中是否执行完毕)
3.submit() 和 自动继承的execute(Runnable a)方法区别
相同点是都可以启动线程,
不同点
①submit()可以接收Runnable或Callable类型的参数 ,execute(Runnable a)只能接收Runnable类型的参数
②submit()可以返回Future对象,从而可以获取执行结果或者需要捕获执行过程中的异常,而execute()方法无返回值
AbstractExecutorService抽象类
AbstractExecutorService抽象类提供了ExecutorService接口部分方法的默认实现,并新增了部分方法,这是一个抽象类,但是该类中没有一个抽象方法,为何还要定义成一个抽象类
抽象类只能被继承,不可以实现,并且抽象类不可以实例化,这些都是抽象类的特性
AbstractExecutorService 被定义为抽象类的原因可以从以下几个方面来理解:
1.防止实例化:抽象类的一个重要特性是不能被实例化。AbstractExecutorService 作为一个抽象类,确保了它不能被直接实例化,而是必须通过其子类来实现具体的功能。这有助于保证 ExecutorService 接口的正确实现,避免直接使用不完整的实现。
2.提供模板方法:AbstractExecutorService 提供了一些默认的实现方法,这些方法可以被子类直接使用或重写。这种设计模式被称为模板方法模式,它允许在抽象类中定义算法的骨架,而将一些步骤的实现延迟到子类中。这种方式提高了代码的复用性和可维护性。
3.强制子类实现特定方法:虽然 AbstractExecutorService 中没有显式的抽象方法,但它可能包含了一些需要子类实现的方法。通过将这些方法定义为抽象方法,可以强制子类提供具体的实现,从而确保子类的行为符合预期。
4.设计意图:从设计角度来看,AbstractExecutorService 作为一个抽象类,表明它是一个不完整的实现,需要通过子类来完成。这种设计意图可以通过将类定义为抽象类来明确传达给开发者
public abstract class AbstractExecutorService implements ExecutorService {
//将Runnable任务包装为RunnableFuture<T>
//参数runnable:原始任务 value:任务完成后返回的结果
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
// 将 Callable 任务包装为 RunnableFuture<T>
//参数callable:原始任务 返回值:RunnableFuture<T> 实例
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
//提交 Runnable 任务并指定返回结果。
public Future<?> submit(Runnable task) {
..................................................
}
//提交 Callable 任务,返回 Future 对象
public <T> Future<T> submit(Runnable task, T result) {
...................................................
}
//提交 Runnable 任务,返回 Future<?>(结果为 null)
public <T> Future<T> submit(Callable<T> task) {
....................................................
}
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
........................................................
}
//返回第一个成功完成的任务结果
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
......................................................................
}
//带超时的 invokeAny
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
//提交所有任务并阻塞等待全部完成
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
..................................................................
}
//提交所有任务并带超时等待
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
...........................................................
}
}
总结
1.除了以上方法继承ExecutorService接口提供了默认实现方法,其余的还是继承ExecutorService接口接口方法的特征,并且新增了newTaskFor方法
2.顶级接口Executor接口的execute方法也通过继承-实现等操作到子类这里,具体实现还是由继承 AbstractExecutorService的子类去进行重写实现
// 继承 AbstractExecutorService类重写该方法就这样的
@Override
public void execute(Runnable command) {}
ScheduledExecutorService接口
ScheduledExecutorService接口继承了ExecutorService 接口,并且新增了关于任务的延迟执行、固定频率执行和固定延迟执行等方法
public interface ScheduledExecutorService extends ExecutorService {
//单次延迟执行任务:提交一个 Runnable 任务,在指定延迟后 单次执行
//ScheduledFuture<?>,用于跟踪任务状态或取消任务
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
//提交一个 Callable 任务,在指定延迟后 单次执行,并返回计算结果
//参数command:待执行的任务 delay:延迟时间 unit:时间单位(如 TimeUnit.SECONDS
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
//提交一个 Runnable 任务,按固定频率 周期性执行
//参数initialDelay:首次执行的延迟时间 period:连续两次任务开始时间的间隔
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//提交一个Runnable任务,按固定延迟周期性执行
//initialDelay:首次执行的延迟时间。delay:上一次任务结束 到下一次任务开始的时间间隔
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
代码示例
public class ScheduledTaskExample {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// 单次延迟任务
scheduler.schedule(
() -> System.out.println("单次延迟任务"),
2, TimeUnit.SECONDS
);
// 固定频率任务
ScheduledFuture<?> fixedRateFuture = scheduler.scheduleAtFixedRate(
() -> System.out.println("固定频率任务 - 时间: " + System.currentTimeMillis()),
1, 3, TimeUnit.SECONDS
);
//如果任务抛出未捕获的异常,周期性任务会终止,后续执行不再继续,需在任务内部处理异常
scheduler.scheduleAtFixedRate(() -> {
try {
// 业务逻辑
} catch (Exception e) {
// 记录日志或恢复状态
}}, 1, 5, TimeUnit.SECONDS);
// 固定延迟任务
ScheduledFuture<?> fixedDelayFuture = scheduler.scheduleWithFixedDelay(
() -> System.out.println("固定延迟任务 - 时间: " + System.currentTimeMillis()),
1, 3, TimeUnit.SECONDS
);
// 运行 10 秒后关闭
Thread.sleep(10_000);
fixedRateFuture.cancel(true);//Future的cancel取消方法
fixedDelayFuture.cancel(true);
scheduler.shutdown();//调用shutdown()或shutdownNow()关闭线程池,否则JVM不会退出
}
}
ThreadPoolExecutor类(线程池核心类)
可以说ThreadPoolExecutor 就是线程池,实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务
ThreadPoolExecutor类(线程池)介绍使用点击链接
主要就三点
1.ThreadPoolExecutor类线程池的生命周期维护管理
2.ThreadPoolExecutor类线程池对于任务的分配管理
3.ThreadPoolExecutor类线程池对于线程池中的线程生命周期管理
ScheduledThreadPoolExecutor类
1. 核心继承与接口
ScheduledThreadPoolExecutor是Java并发包中的一个线程池实现,用于执行定时或周期性任务。它继承自ThreadPoolExecutor,所以应该具备基本的线程池功能,同时扩展了定时或延迟任务的能力
public class ScheduledThreadPoolExecutor
//继承自ThreadPoolExecutor:复用线程池的基础功能(如线程管理、任务队列)。
extends ThreadPoolExecutor
//实现ScheduledExecutorService:提供定时任务的调度接口(如 schedule, scheduleAtFixedRate)
implements ScheduledExecutorService {
}
虽然继承自ThreadPoolExecutor:复用线程池的基础功能,但是有区别
ScheduledThreadPoolExecutor的构造函数最多只有三个参数
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
少了maximumPoolSize,keepAliveTime,TimeUnit,BlockingQueue workQueue四个参数,原因在于ScheduledThreadPoolExecutor的内部类DelayedWorkQueue队列
2. 核心数据结构
任务队列:DelayedWorkQueue
DelayedWorkQueue专为 ScheduledThreadPoolExecutor 优化使用的类
DelayedWorkQueued介绍点击链接
①DelayedWorkQueue是初始化为16且动态扩容的,是无界的
所以线程池非核心线程不会触发,只会在核心线程满了进入队列然后队列动态扩容,所以关于非核心线程的三个参数都没了,因为无界队列,因此需要主要OOM内存溢出问题
②DelayedWorkQueue是一个基于堆的优先级队列:内部使用数组实现小顶堆,按任务的触发时间(time 字段)排序。我们定时周期或延迟执行的任务会封装进入DelayedWorkQueue,通过delayedExecute(sft)方法提交到队列,所以参数BlockingQueue workQueue也没有了
任务封装:ScheduledFutureTask
private class ScheduledFutureTask<V>
extends FutureTask<V>
implements RunnableScheduledFuture<V> {
private long time; // 任务触发时间(纳秒)
private final long period; // 周期(正数:固定速率;负数:固定延迟)
//...
}
触发时间排序:通过 compareTo 方法实现堆排序。
周期模式:
- 固定速率(Fixed Rate):基于初始调度时间计算下一次触发时间。
- 固定延迟(Fixed Delay):基于任务实际完成时间计算下一次触发时间。
3.调度逻辑
①任务提交schedule 方法:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
// 包装任务为 ScheduledFutureTask
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<>(command, null, triggerTime(delay, unit));
// 提交到队列
delayedExecute(sft);
return sft;
}
②delayedExecute 方法
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果线程池已关闭,执行拒绝策略。
if (isShutdown())
reject(task);
else {
//将任务添加到 DelayedWorkQueue。
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
4.周期性任务处理
run 方法重写(在 ScheduledFutureTask 中):
public void run() {
if (!isPeriodic()) {
super.run(); // 一次性任务直接执行
} else if (runAndReset()) { // 周期性任务执行后重置状态
// 计算下一次触发时间
setNextRunTime();
// 重新加入队列
reExecutePeriodic(outerTask);
}
}
固定速率:time += period(基于初始时间)。
固定延迟:time = now() + period(基于当前时间)。
5. 线程池配置
- 核心线程数固定:默认不回收核心线程(allowCoreThreadTimeOut 默认为 false),确保及时处理定时任务。
- 最大线程数无效:由于使用无界队列,线程数不会超过核心线程数(除非设置 allowCoreThreadTimeOut == true)
总结关键问题与注意事项
任务堆积与 OOM
风险:DelayedWorkQueue 是无界的,大量提交任务会导致队列膨胀。
解决方案:监控任务提交速率,或自定义有界队列(需处理拒绝策略)。
任务取消与删除
remove 方法:直接从队列删除任务,但时间复杂度为 O(n)(堆结构导致低效)。
purge 方法:清理所有已取消的任务,减少内存占用。(父类ThreadPoolExecutor里面的方法)
异常处理
静默吞噬异常:任务执行抛异常后,后续周期任务不再执行(需在任务内捕获异常)