当前位置: 首页 > article >正文

ScheduledThreadPoolExecutor 延迟任务执行原理以及小顶堆的应用(源码)

目录

  • 主要特点
  • 源码解读
    • (1)初始化
    • (2)创建工作线程并启动
    • (3)线程调度和执行任务
        • 主要流程
        • 任务封装 `ScheduledFutureTask`
        • 延时工作队列 DelayedWorkQueue
          • 初始化
          • 添加任务 offer
          • 获取任务 poll / take
  • 小顶堆是什么?
    • 概念
    • 堆的典型操作
    • 常用计算公式
    • 典型应用场景

ScheduledThreadPoolExecutor 支持以下任务,

(1)延迟执行;

(2)固定速率(scheduleAtFixedRate);

(3)固定延迟(scheduleWithFixedDelay);

主要特点

(1)基于 ThreadPoolExecutor 扩展,复用线程资源;

(2)工作队列使用 延迟队列**(DelayedWorkQueue)**:基于二叉堆(最小堆)的无界优先级队列,按任务到期时间排序;

(2)任务封装**(ScheduledFutureTask)**:

封装 RunnableCallable,记录下次执行时间(time)、周期(period)等。

重写 run() 方法,任务重新入队,实现周期性任务的重调度逻辑**;**

本文将对源码进行解读,指导 ScheduledThreadPoolExecutor 线程池的使用,

由于 ScheduledThreadPoolExecutor 是基于 ThreadPoolExecutor 线程池扩展实现的,核心线程调度和执行逻辑依然由 ThreadPoolExecutor 实现,需要先了解 ThreadPoolExecutor 的运行机制:Java线程池底层是怎么创建和运行的?(源码阅读)

源码解读

(1)初始化

这里使用 Executors 创建 ScheduledThreadPoolExecutor 线程池,指定 核心线程数 corePoolSize

private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

// java.util.concurrent.Executors
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

进入构造函数,注意此处最大线程数默认为 Integer.MAX_VALUE,该参数实际无作用。

因为工作队列使用的是 DelayedWorkQueue 无界队列,是一个基于二叉堆结构的****优先级队列,用于管理定时任务,可以自动扩容不会被填满,因此线程数会始终保持 corePoolSizekeepAliveTime 也默认设置为0即可。

// ScheduledThreadPoolExecutor.class

// 构造函数
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
           // 使用的工作队列是内部类 ScheduledThreadPoolExecutor.DelayedWorkQueue
          new DelayedWorkQueue());
}

// 实际是进入父类 java.util.concurrent.ThreadPoolExecutor 构造器,进行线程池初始化
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

(2)创建工作线程并启动

execute/submit 提交任务,主要流程:

(1)将任务ScheduledFutureTask加入工作队列 DelayedWorkQueue 中;

(2)若工作线程数小于 corePoolSize ,创建新工作线程并启动;(ensurePrestart方法)

// 重写了execute方法, 实际调用 schedule 方法
public void execute(Runnable command) {
    schedule(command, 0, NANOSECONDS);
}

// 重写了submit方法, 实际调用 schedule 方法
public <T> Future<T> submit(Callable<T> task) {
    return schedule(task, 0, NANOSECONDS);
}

// 封装 Runnable 任务并提交工作队列
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    // Runnable 包装为 ScheduledThreadPoolExecutor.ScheduledFutureTask 任务,无返回值
    RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
    // 提交延时任务
    delayedExecute(t);
    return t;
}

// 封装 Callable 任务并提交工作队列
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    // Callable 包装为 ScheduledThreadPoolExecutor.ScheduledFutureTask 任务,带返回值
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    // 提交延时任务
    delayedExecute(t);
    return t;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        // 若线程池已关闭, 执行拒绝策略 - 父类 ThreadPoolExecutor 的方法
        reject(task);
    else {
        // 将任务加入工作队列
        super.getQueue().add(task);
        // ..
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 小于核心线程数,创建新的工作线程
            ensurePrestart();
    }
}

父类 java.util.concurrent.ThreadPoolExecutor 中用到的方法,

// 根据ctl判断非运行状态
public boolean isShutdown() {
    return ! isRunning(ctl.get());
}

// 执行拒绝策略
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

// 小于核心线程数,新增工作线程
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

(3)线程调度和执行任务

主要流程

(1)工作线程仍然是调用 ThreadPoolExecutor.runWorker 方法实现任务调度执行,详见 Java线程 ThreadPoolExecutor 源码部分 -(3);

(2)区别在于工作队列为 DelayedWorkQueue ,获取任务需要达到 执行时间 ScheduledFutureTask.time;

(3)任务ScheduledFutureTask若为周期性任务,执行完后,通过运行周期ScheduledFutureTask.period, 计算并设置下次执行时间ScheduledFutureTask.time并重新加入工作队列,实现周期性执行。

任务封装 ScheduledFutureTask

首先看看,内部任务类 ScheduledThreadPoolExecutor.ScheduledFutureTask

用于封装需要定时或周期性执行的任务。它实现了 RunnableScheduledFuture 接口,结合了 Runnable(可执行任务)和 Future(异步计算结果)的特性,同时支持任务调度逻辑。

// 内部类 ScheduledFutureTask,表示定时任务
private class ScheduledFutureTask<V>
    extends FutureTask<V> implements RunnableScheduledFuture<V> {
    
    // 触发时间 nanos(纳秒)
    private long time;

    // 任务提交时的序号,用于在多个任务具有相同 time 时定义执行顺序(先进先出),在下面CompareTo方法中有用
    private final long sequenceNumber;

    // 任务的周期时间(纳秒)
    private final long period;

    /** The actual task to be re-enqueued by reExecutePeriodic */
    RunnableScheduledFuture<V> outerTask = this;

    // 任务在 DelayedWorkQueue 堆数组中的下标位置,后续进/出 工作队列时会更新
    int heapIndex;

    // 构造器,上面schedule()方法中,包装Runnable/Callable时使用到了
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    // run() 实现
    public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            // (1)非周期一次性任务:直接执行
            ScheduledFutureTask.super.run();
        // (2)周期性任务:执行任务并重置任务状态,设置下次执行时间并重新加入工作队列
        else if (ScheduledFutureTask.super.runAndReset()) {
            // 设置下次执行的时间
            setNextRunTime();
            // 任务重新加入工作队列
            reExecutePeriodic(outerTask);
        }
    }

    /**
     * 是否是周期性任务
     * Returns {@code true} if this is a periodic (not a one-shot) action.
     */
    public boolean isPeriodic() {
        return period != 0;
    }

    /**
     * 设置下次执行时间
    */
    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = triggerTime(-p);
    }

    // 实现 Delayed 接口, 返回当前任务剩余延迟时长
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), NANOSECONDS);
    }

    // 实现任务优先级比较方法:
    //     (1)首先按 time 排序(执行时间早的任务优先)。
    //     (2)如果 time 相同,则按 sequenceNumber 排序(先提交的任务优先)
    public int compareTo(Delayed other) {
        if (other == this) // compare zero if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }
}
延时工作队列 DelayedWorkQueue
初始化

数据结构:基于数组的小顶堆。(小顶堆 知识点温习 见文末)

通过最小堆****管理任务顺序,使得任务按照执行时间先后执行,插入/删除的 ***O(***log n) 复杂度 保证了高吞吐场景下的性能;

static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable> {

    // 数组存储,初始长度 16,表示一个 “基于数组的最小堆”
    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private final ReentrantLock lock = new ReentrantLock();
    private int size = 0;

    
    private Thread leader = null;

    // 当队列为空时,工作线程通过 available.await() 等待;
    // 添加新任务时,通过 available.signal() 唤醒等待线程
    private final Condition available = lock.newCondition();
    
    ...
}
添加任务 offer
/**
     * 添加任务 (核心)
    **/
    public boolean offer(Runnable x) {
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = size;
            // 容量不足, 扩容 50%
            if (i >= queue.length)
                grow();
            size = i + 1;
            if (i == 0) {
                // 首个元素直接加入
                queue[0] = e;
                setIndex(e, 0);
            } else {
                // 根据 ScheduledFutureTask.compareTo 大小关系构造小顶堆
                siftUp(i, e);
            }
            // 若更新了堆顶元素 queue[0] ,唤醒 available 上所有等待执行任务的工作线程Worker
            if (queue[0] == e) {
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

    public boolean add(Runnable e) {
        return offer(e);
    }

    public boolean offer(Runnable e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
         * 扩容 50%
         */
    private void grow() {
        int oldCapacity = queue.length;
        int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
        if (newCapacity < 0) // overflow
            newCapacity = Integer.MAX_VALUE;
        queue = Arrays.copyOf(queue, newCapacity);
    }

    /**
     * 更新 ScheduledFutureTask.heapIndex , 存储在堆数组的下标
    */
    private void setIndex(RunnableScheduledFuture<?> f, int idx) {
        if (f instanceof ScheduledFutureTask)
            ((ScheduledFutureTask)f).heapIndex = idx;
    }

    /**
     * 根据任务的CompareTo优先级关系,构造小顶堆。即优先执行的任务越靠前。
    */
    private void siftUp(int k, RunnableScheduledFuture<?> key) {
        while (k > 0) {
            // 获取父节点下标 = (k - 1) / 2
            int parent = (k - 1) >>> 1;
            RunnableScheduledFuture<?> e = queue[parent];
            // 若大于父节点,符合小顶堆特点,直接返回
            if (key.compareTo(e) >= 0)
                break;
            // 若小于父节点,替换父节点,继续构造小顶堆
            queue[k] = e;
            setIndex(e, k);
            k = parent;
        } 
        // 设置下标位置到 ScheduledFutureTask 任务的 heapIndex 中 
        queue[k] = key;
        setIndex(key, k);
    }
获取任务 poll / take

poll(long timeout, TimeUnit unit) 非阻塞获取任务,允许超时;

    /**
     * 限时 获取堆顶元素
    **/
    public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
        throws InterruptedException {
        // 等待超时时间 nanos
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 获取堆顶任务 queue[0]
                RunnableScheduledFuture<?> first = queue[0];
                // 当前没有任务,阻塞等待超时时间 - available 条件变量阻塞
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    // 当前有任务
                    long delay = first.getDelay(NANOSECONDS);
                    
                    // 任务到达执行时间 time,finishPoll 取出堆顶任务后调整堆结构
                    if (delay <= 0)
                        return finishPoll(first);
                    
                    // 任务还未到执行时间time,    
                    if (nanos <= 0)
                        return null;
                    first = null; // don't retain ref while waiting
                    // 超时时间不足以等待任务剩余延迟时间 delay
                    // 或者有其他工作线程 leader 在等待执行堆顶任务
                    // 直接等待超时时间结束
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        // 当前工作线程设置为leader,阻塞等待剩余延迟时间 delay,准备执行任务
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            // 等待结束,移除 leader 身份标记
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 堆顶任务不为空,且没有 leader 工作线程在等待执行,唤醒所有工作线程
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }
    
    /**
         * Performs common bookkeeping for poll and take: Replaces
         * first element with last and sifts it down.  Call only when
         * holding lock.
         * @param f the task to remove and return
         */
    private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
        // 更新 size - 1
        int s = --size;
        // 拿出最后一个元素 x(优先级最低-即执行时间time最晚的任务)
        RunnableScheduledFuture<?> x = queue[s];
        // 将最后一个元素的位置置空
        queue[s] = null;
        // 剩余元素数量不为0,重新调整堆
        if (s != 0)
            siftDown(0, x);
        setIndex(f, -1);
        return f;
    }
    
    // 取出堆顶任务后,重新调整小顶堆
    private void siftDown(int k, RunnableScheduledFuture<?> key) {
        int half = size >>> 1;
        while (k < half) {
            // k的左子节点下标 = (k * 2) + 1
            int child = (k << 1) + 1;
            RunnableScheduledFuture<?> c = queue[child];
            // k的右子节点下标 = (k * 2) + 2
            int right = child + 1;
            // 取出左右子节点中较小任务
            if (right < size && c.compareTo(queue[right]) > 0)
                c = queue[child = right];
            // 若 待插入任务 key 小于左右子节点,表示找到了该插入的位置,直接退出
            if (key.compareTo(c) <= 0)
                break;
            // 否则将较小子节点移动到父节点,继续往下查找
            queue[k] = c;
            setIndex(c, k);
            k = child;
        }
        // 设置下标位置到 ScheduledFutureTask 任务的 heapIndex 中 
        queue[k] = key;
        setIndex(key, k);
    }
    
    // 非等待获取堆顶任务
    public RunnableScheduledFuture<?> poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return finishPoll(first);
        } finally {
            lock.unlock();
        }
    }

take () 阻塞获取堆顶任务(直到任务到期或线程中断)

    // 阻塞获取堆顶任务(直到任务到期或线程中断)
    public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture<?> first = queue[0];
                if (first == null)
                    // 没有任务,阻塞等待,直到堆顶 queue[0] 更新了新任务被唤醒
                    available.await();
                else {
                    // 到达了任务执行时间,直接返回任务
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
                    // leader 不为空,证明有其他工作线程在等待执行了,继续阻塞
                    if (leader != null)
                        available.await();
                    else {
                        // 没有leader,当前工作线程作为leader,限时阻塞等待delay时间,准备执行任务
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }

小顶堆是什么?

概念

(1) 什么是堆?堆是一种完全二叉树。

(2) 什么是完全二叉树?完全二叉树表示除最后一层外,其余各层的节点数都达到最大值(即满的)的二叉树。

       1
     /   \
    2     3        
   / \   /    
  4  5  6            <-- 最后一层可以不满,但必须都靠左。可用数组表示: [123456]
  
       1
     /   \
    2     3        
   / \      \  
  4  5       6      <-- 最后一层非都靠左,不是完全二叉树

完全二叉树的节点按层次顺序紧密排列,适合用数组高效存储和遍历

(3) 什么是最大堆? 最小堆?

  • 最大堆(大顶堆):每个父节点的值 ≥ 其子节点的值(根节点是最大值)。
  • 最小堆**(小顶堆)**:每个父节点的值 ≤ 其子节点的值(根节点是最小值)。
  • 二叉搜索树BST)的区别:
    • BST:左子树节点 < 父节点 < 右子树节点,用于快速查找。
    • :仅保证父节点与子节点的大小关系,不维护左右子树的顺序。堆的用途是快速获取极值(最大值或最小值)。

堆的典型操作

(1)插入(Insert):将新元素放到末尾,通过“上浮”(与父节点比较并交换)调整位置。

(2)删除根节点(Extract-Max/Min):移除根节点,将末尾元素移到根位置,通过“下沉”(与子节点比较并交换)调整。

(3)时间复杂度:插入和删除均为 **O(**log n),获取极值(即根节点)为 O(1)

常用计算公式

(1)高度公式:h=logN + 1,其中 N 是节点总数;

(2)完全二叉树可以用 数组 高效存储(起始下标0):

  • 计算子节点索引:若父节点索引为 i,则左子节点为 2i+1,右子节点为 2i+2;
  • 计算父节点索引:若子节点索引为 k,父节点索引为 (k-1)/2;

典型应用场景

(1)优先队列:任务调度、Dijkstra最短路径算法。

(2)堆排序:通过反复提取极值实现排序(时间复杂度 O(n log n))。

(3)动态极值维护:如实时统计流数据中的Top K元素。


http://www.kler.cn/a/598088.html

相关文章:

  • 运维智能体的可行性研究
  • 图解AUTOSAR_SWS_IPDUMultiplexer
  • 多个内容滑动轮播图【前端】
  • 算法训练营第二十二天 | 回溯算法(四)
  • A2O MAY首支单曲《Under My Skin(A2O)》成功打入美国“MediaBase主流电台榜单”,中国女团首次登榜
  • C#控制台应用程序学习——3.23
  • nacos-actuator漏洞
  • 国产芯片解析:沁恒USB PD无线充电功率芯片新势力:CH271与CH275
  • 【Go】Go语言结构体笔记
  • 自定义mavlink 生成wireshark wlua插件错误(已解决)
  • HTTP长连接与短连接的前世今生
  • 2025年01月02日浙江鼎永前端面试
  • 大模型RLHF训练-PPO算法详解:Proximal Policy Optimization Algorithms
  • Linux shell脚本2-test条件测试语句:文件类型、字符串是否相等、数字大小比较、多重条件判断,测试语句验证
  • Xss Game1-8关通关
  • IM 基于 WebRtc 视频通信功能
  • Mongodb分片模式部署
  • CATIA二次开发:基于牛顿迭代法的参数化衰减球体生成系统
  • 【Flask公网部署】采用Nginx+gunicorn解决Flask框架静态资源无法加载的问题
  • ECMAScript、DOM和BOM是个啥(通俗地来讲)