当前位置: 首页 > article >正文

java 线程池Executor框架


Executor框架

  • Executor框架构成
  • 异步计算的结果涉及的接口或类
  • 多线程实现涉及的接口或类
  • 多线程的管理(线程池涉及的接口或类)
    • Executor接口
    • ExecutorService接口
    • AbstractExecutorService抽象类
    • ScheduledExecutorService接口
    • ThreadPoolExecutor类(线程池核心类)
    • ScheduledThreadPoolExecutor类

Executor框架简介:多线程管理所用到的线程池所对应的类 接口组合在一起的所形成的这么一个所谓框架, Executor框架实现的就是线程池的功能

Executor框架构成

Executor框架包括3大部分:

①任务。也就是工作单元,包括被执行任务需要实现的接口:Runnable接口或者Callable接口

②任务的执行。也就是把任务分派给多个线程的执行机制,包括Executor接口及继承自Executor接口的ExecutorService接口

③异步计算的结果。包括Future接口及实现了Future接口的FutureTask类

Executor框架成员关系图简示:
在这里插入图片描述

异步计算的结果涉及的接口或类

Future接口 FutureTask类和CompletableFuture<T>详情介绍详情链接

多线程实现涉及的接口或类

Callable接口详情介绍点击链接
Thread类使用详情介绍点击链接

多线程的管理(线程池涉及的接口或类)

Executor框架的使用
在这里插入图片描述

Executor接口

Executor接口是线程池的顶级接口,只有一个方法execute,用于执行Runnable任务

public interface Executor {
    void execute(Runnable command);
}

ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)execute()方法中,由Executor框架完成线程的调配和任务的执行部分。
如ExecutorService接口就增加了一些能力:(
1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
2)提供了管控线程池的方法,比如停止线程池的运行

ExecutorService接口

由于ExecutorService接口是继承的Executor接口,所以自动拥有了execute方法
并扩展 Executor接口,添加任务生命周期管理(如 shutdown())、异步任务提交(submit())等功能方法

public interface ExecutorService extends Executor {

    //关闭线程池:停止接收新的任务,但是允许已经提交的任务继续执行直到它们完成
    void shutdown();

    //关闭线程池:会尝试停止所有正在执行的任务,并返回一个包含未执行任务的列表
    List<Runnable> shutdownNow();

    /**检查线程池是否已经开始关闭过程 也就是是否执行了 shutdown 或 shutdownNow 方法,
    *需要注意的是,如果调用 isShutdown() 方法的返回结果为 true,不代表线程池此时已经彻底关闭了,
    *这仅仅代表线程池开始了关闭的流程。也就是说,此时可能线程中依然有线程在执行任务,
    *队列里也可能有等待被执行的任务
    */
    boolean isShutdown();

    /**
     * isTerminated()方法用于判断ExecutorService是否已经终止。
	  *isTerminated()方法的返回值是一个布尔值,如ExecutorService已经终止,则返回true
	  *需要注意的是,除非先调用shutdown或shutdownNow,否则isTerminated()方法永远不返回true
     */
    boolean isTerminated();

    /**
	用于判断 等待所有任务在工作队列中是否执行完毕。
	它接收两个参数:一个是等待时间(long 类型),另一个是时间单位(TimeUnit 枚举类型)。
	如果在指定时间内所有任务都完成了执行,该方法将返回 true;
	如果在指定时间内还有任务未完成,该方法将返回 false
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一个Callable任务,该任务有返回值。
     * 返回的Future对象的get()方法将返回Callable任务的返回值
     */
    <T> Future<T> submit(Callable<T> task);

    /**
   * 提交一个Runnable任务,并指定一个结果T result。
   * 即使Runnable任务没有返回值,也可以通过Future对象的get()方法获取到这个结果
   * 注意Runnable任务是没有返回值,想要获取结果是通过submit方法,你可以传递一个结果引用T result
   * 因为这个引用可以在任务执行过程中被修改,从而在主线程中获取到任务的执行结果。
   * 这是因为submit方法实际上返回了一个Future对象,该对象可以用来获取任务的执行结果
     */
    <T> Future<T> submit(Runnable task, T result);

    //提交一个Runnable任务,该任务没有返回值。返回的Future对象的get()方法将返回null
    Future<?> submit(Runnable task);

    //invokeAll方法是一个用于 批量提交多个任务 并等待所有任务完成,
    //并返回一个包含每个任务对应 Future 的列表
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    //是一个用于批量提交多个任务并控制超时时间,返回一个包含所有任务结果的 Future 列表
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    //取得第一个完成任务的结果值,当第一个任务执行完成后,会调用interrupt方法将其他的任务中断取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    //invokeAny() 方法用于并发执行多个任务并返回第一个成功完成的任务结果(或在超时前返回)
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

总结
1.关闭线程池方法(shutdownNow(),shutdown()方法)

2.关闭线程池方法配合使用的方法
a.isTerminated()(用于判断ExecutorService是否已经终止)
b.awaitTermination()(用于判断 等待所有任务在工作队列中是否执行完毕)

3.submit() 和 自动继承的execute(Runnable a)方法区别
相同点是都可以启动线程,
不同点
①submit()可以接收Runnable或Callable类型的参数 ,execute(Runnable a)只能接收Runnable类型的参数
②submit()可以返回Future对象,从而可以获取执行结果或者需要捕获执行过程中的异常,而execute()方法无返回值

AbstractExecutorService抽象类

AbstractExecutorService抽象类提供了ExecutorService接口部分方法的默认实现,并新增了部分方法,这是一个抽象类,但是该类中没有一个抽象方法,为何还要定义成一个抽象类
抽象类只能被继承,不可以实现,并且抽象类不可以实例化,这些都是抽象类的特性
AbstractExecutorService 被定义为抽象类的原因可以从以下几个方面来理解:
1.防止实例化:抽象类的一个重要特性是不能被实例化。AbstractExecutorService 作为一个抽象类,确保了它不能被直接实例化,而是必须通过其子类来实现具体的功能。这有助于保证 ExecutorService 接口的正确实现,避免直接使用不完整的实现。
2.提供模板方法:AbstractExecutorService 提供了一些默认的实现方法,这些方法可以被子类直接使用或重写。这种设计模式被称为模板方法模式,它允许在抽象类中定义算法的骨架,而将一些步骤的实现延迟到子类中。这种方式提高了代码的复用性和可维护性。
3.强制子类实现特定方法:虽然 AbstractExecutorService 中没有显式的抽象方法,但它可能包含了一些需要子类实现的方法。通过将这些方法定义为抽象方法,可以强制子类提供具体的实现,从而确保子类的行为符合预期。
4.设计意图:从设计角度来看,AbstractExecutorService 作为一个抽象类,表明它是一个不完整的实现,需要通过子类来完成。这种设计意图可以通过将类定义为抽象类来明确传达给开发者

public abstract class AbstractExecutorService implements ExecutorService {

	//将Runnable任务包装为RunnableFuture<T> 
	//参数runnable:原始任务 value:任务完成后返回的结果
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

	// 将 Callable 任务包装为 RunnableFuture<T>
	//参数callable:原始任务  返回值:RunnableFuture<T> 实例
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    //提交 Runnable 任务并指定返回结果。
    public Future<?> submit(Runnable task) {
      ..................................................
    }

	//提交 Callable 任务,返回 Future 对象
    public <T> Future<T> submit(Runnable task, T result) {
       ...................................................
    }

	//提交 Runnable 任务,返回 Future<?>(结果为 null)
    public <T> Future<T> submit(Callable<T> task) {
      ....................................................
    }

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
		........................................................
    }

	//返回第一个成功完成的任务结果
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
     ......................................................................
    }

   //带超时的 invokeAny
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

	//提交所有任务并阻塞等待全部完成
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    	..................................................................
    }

	//提交所有任务并带超时等待
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        ...........................................................
      }

}

总结
1.除了以上方法继承ExecutorService接口提供了默认实现方法,其余的还是继承ExecutorService接口接口方法的特征,并且新增了newTaskFor方法
2.顶级接口Executor接口的execute方法也通过继承-实现等操作到子类这里,具体实现还是由继承 AbstractExecutorService的子类去进行重写实现

	// 继承 AbstractExecutorService类重写该方法就这样的
	@Override
    public void execute(Runnable command) {}

ScheduledExecutorService接口

ScheduledExecutorService接口继承了ExecutorService 接口,并且新增了关于任务的延迟执行、固定频率执行和固定延迟执行等方法

public interface ScheduledExecutorService extends ExecutorService {

    //单次延迟执行任务:提交一个 Runnable 任务,在指定延迟后 单次执行
    //ScheduledFuture<?>,用于跟踪任务状态或取消任务
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    //提交一个 Callable 任务,在指定延迟后 单次执行,并返回计算结果
    //参数command:待执行的任务 delay:延迟时间 unit:时间单位(如 TimeUnit.SECONDS
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    //提交一个 Runnable 任务,按固定频率 周期性执行
    //参数initialDelay:首次执行的延迟时间 period:连续两次任务开始时间的间隔
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    //提交一个Runnable任务,按固定延迟周期性执行
    //initialDelay:首次执行的延迟时间。delay:上一次任务结束 到下一次任务开始的时间间隔
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

代码示例

public class ScheduledTaskExample {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // 单次延迟任务
        scheduler.schedule(
            () -> System.out.println("单次延迟任务"),
            2, TimeUnit.SECONDS
        );

        // 固定频率任务
        ScheduledFuture<?> fixedRateFuture = scheduler.scheduleAtFixedRate(
          () -> System.out.println("固定频率任务 - 时间: " + System.currentTimeMillis()),
            1, 3, TimeUnit.SECONDS
        );
        //如果任务抛出未捕获的异常,周期性任务会终止,后续执行不再继续,需在任务内部处理异常
        scheduler.scheduleAtFixedRate(() -> {
    	try {
        // 业务逻辑
    	} catch (Exception e) {
        // 记录日志或恢复状态
         }}, 1, 5, TimeUnit.SECONDS);

        // 固定延迟任务
        ScheduledFuture<?> fixedDelayFuture = scheduler.scheduleWithFixedDelay(
          () -> System.out.println("固定延迟任务 - 时间: " + System.currentTimeMillis()),
            1, 3, TimeUnit.SECONDS
        );

        // 运行 10 秒后关闭
        Thread.sleep(10_000);
        fixedRateFuture.cancel(true);//Future的cancel取消方法
        fixedDelayFuture.cancel(true);
        scheduler.shutdown();//调用shutdown()或shutdownNow()关闭线程池,否则JVM不会退出
    }
}

ThreadPoolExecutor类(线程池核心类)

可以说ThreadPoolExecutor 就是线程池,实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务
在这里插入图片描述
ThreadPoolExecutor类(线程池)介绍使用点击链接
主要就三点
1.ThreadPoolExecutor类线程池的生命周期维护管理
2.ThreadPoolExecutor类线程池对于任务的分配管理
3.ThreadPoolExecutor类线程池对于线程池中的线程生命周期管理

ScheduledThreadPoolExecutor类

1. 核心继承与接口
ScheduledThreadPoolExecutor是Java并发包中的一个线程池实现,用于执行定时或周期性任务。它继承自ThreadPoolExecutor,所以应该具备基本的线程池功能,同时扩展了定时或延迟任务的能力

public class ScheduledThreadPoolExecutor
	//继承自ThreadPoolExecutor:复用线程池的基础功能(如线程管理、任务队列)。
        extends ThreadPoolExecutor
    //实现ScheduledExecutorService:提供定时任务的调度接口(如 schedule, scheduleAtFixedRate)    
        implements ScheduledExecutorService {
}

虽然继承自ThreadPoolExecutor:复用线程池的基础功能,但是有区别
ScheduledThreadPoolExecutor的构造函数最多只有三个参数

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

少了maximumPoolSize,keepAliveTime,TimeUnit,BlockingQueue workQueue四个参数,原因在于ScheduledThreadPoolExecutor的内部类DelayedWorkQueue队列

2. 核心数据结构
任务队列:DelayedWorkQueue
DelayedWorkQueue专为 ScheduledThreadPoolExecutor 优化使用的类
DelayedWorkQueued介绍点击链接
①DelayedWorkQueue是初始化为16且动态扩容的,是无界的
所以线程池非核心线程不会触发,只会在核心线程满了进入队列然后队列动态扩容,所以关于非核心线程的三个参数都没了,因为无界队列,因此需要主要OOM内存溢出问题
②DelayedWorkQueue是一个基于堆的优先级队列:内部使用数组实现小顶堆,按任务的触发时间(time 字段)排序。我们定时周期或延迟执行的任务会封装进入DelayedWorkQueue,通过delayedExecute(sft)方法提交到队列,所以参数BlockingQueue workQueue也没有了

任务封装:ScheduledFutureTask

private class ScheduledFutureTask<V> 
    extends FutureTask<V> 
    implements RunnableScheduledFuture<V> {
    private long time;          // 任务触发时间(纳秒)
    private final long period;  // 周期(正数:固定速率;负数:固定延迟)
    //...
}

触发时间排序:通过 compareTo 方法实现堆排序。
周期模式:

  • 固定速率(Fixed Rate):基于初始调度时间计算下一次触发时间。
  • 固定延迟(Fixed Delay):基于任务实际完成时间计算下一次触发时间。

3.调度逻辑
①任务提交schedule 方法

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    // 包装任务为 ScheduledFutureTask
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<>(command, null, 	triggerTime(delay, unit));
    // 提交到队列
    delayedExecute(sft);
    return sft;
}

②delayedExecute 方法

    private void delayedExecute(RunnableScheduledFuture<?> task) {
   		 //如果线程池已关闭,执行拒绝策略。
        if (isShutdown())
            reject(task);
        else {
        	//将任务添加到 DelayedWorkQueue。
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

4.周期性任务处理
run 方法重写(在 ScheduledFutureTask 中):

public void run() {
    if (!isPeriodic()) {
        super.run(); // 一次性任务直接执行
    } else if (runAndReset()) { // 周期性任务执行后重置状态
        // 计算下一次触发时间
        setNextRunTime();
        // 重新加入队列
        reExecutePeriodic(outerTask);
    }
}

固定速率:time += period(基于初始时间)。
固定延迟:time = now() + period(基于当前时间)。

5. 线程池配置

  • 核心线程数固定:默认不回收核心线程(allowCoreThreadTimeOut 默认为 false),确保及时处理定时任务。
  • 最大线程数无效:由于使用无界队列,线程数不会超过核心线程数(除非设置 allowCoreThreadTimeOut == true)

总结关键问题与注意事项
任务堆积与 OOM
风险:DelayedWorkQueue 是无界的,大量提交任务会导致队列膨胀。
解决方案:监控任务提交速率,或自定义有界队列(需处理拒绝策略)。

任务取消与删除
remove 方法:直接从队列删除任务,但时间复杂度为 O(n)(堆结构导致低效)。
purge 方法:清理所有已取消的任务,减少内存占用。(父类ThreadPoolExecutor里面的方法)

异常处理
静默吞噬异常:任务执行抛异常后,后续周期任务不再执行(需在任务内捕获异常)


http://www.kler.cn/a/588465.html

相关文章:

  • 深入解析 Vue 3 Teleport:原理、应用与最佳实践
  • 使用Inno Setup将Unity程序打成一个安装包
  • Native层逆向:ARM汇编与JNI调用分析
  • node.js-WebScoket心跳机制(服务器定时发送数据,检测连接状态,重连)
  • 游戏成瘾与学习动力激发策略研究——自我效能理论
  • 深入理解Linux网络随笔(七):容器网络虚拟化--Veth设备对
  • 基于javaweb的SSM+Maven网上选课管理系统设计与实现(源码+文档+部署讲解)
  • JavaScript性能优化的12种方式
  • Function 和 Consumer函数式接口
  • Ubuntu docker镜像恢复至原始文件
  • React使用路由表
  • 使用GoldenGate完成SQLserver到Oracle的数据实时同步
  • Django项目之订单管理part3
  • Markdig:强大的 .NET Markdown 解析器详解
  • 【AI时代移动端安全开发实战:从基础防护到智能应用】
  • 责任链模式:优雅处理请求的设计艺术
  • k8s 网络基础解析
  • 织梦dedecmsV5.7提示信息提示框美化(带安装教程和效果展示)
  • python中print函数的flush如何使用
  • kubernetes|云原生|部署单master的kubernetes 1.25.5版本集群完全记录(使用contained 运行时)