ScheduledThreadPoolExecutor 延迟任务执行原理以及小顶堆的应用(源码)
目录
- 主要特点
- 源码解读
- (1)初始化
- (2)创建工作线程并启动
- (3)线程调度和执行任务
- 主要流程
- 任务封装 `ScheduledFutureTask`
- 延时工作队列 DelayedWorkQueue
- 初始化
- 添加任务 offer
- 获取任务 poll / take
- 小顶堆是什么?
- 概念
- 堆的典型操作
- 常用计算公式
- 典型应用场景
ScheduledThreadPoolExecutor 支持以下任务,
(1)延迟执行;
(2)固定速率(scheduleAtFixedRate
);
(3)固定延迟(scheduleWithFixedDelay
);
主要特点
(1)基于 ThreadPoolExecutor
扩展,复用线程资源;
(2)工作队列使用 延迟队列**(DelayedWorkQueue
)**:基于二叉堆(最小堆)的无界优先级队列,按任务到期时间排序;
(2)任务封装**(ScheduledFutureTask
)**:
封装 Runnable
或 Callable
,记录下次执行时间(time
)、周期(period
)等。
重写 run()
方法,任务重新入队,实现周期性任务的重调度逻辑**;**
本文将对源码进行解读,指导 ScheduledThreadPoolExecutor 线程池的使用,
由于 ScheduledThreadPoolExecutor 是基于 ThreadPoolExecutor 线程池扩展实现的,核心线程调度和执行逻辑依然由 ThreadPoolExecutor 实现,需要先了解 ThreadPoolExecutor 的运行机制:Java线程池底层是怎么创建和运行的?(源码阅读)
源码解读
(1)初始化
这里使用 Executors 创建 ScheduledThreadPoolExecutor 线程池,指定 核心线程数 corePoolSize
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// java.util.concurrent.Executors
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
进入构造函数,注意此处最大线程数默认为 Integer.MAX_VALUE,该参数实际无作用。
因为工作队列使用的是 DelayedWorkQueue 无界队列,是一个基于二叉堆结构的****优先级队列,用于管理定时任务,可以自动扩容不会被填满,因此线程数会始终保持 corePoolSize
。keepAliveTime
也默认设置为0即可。
// ScheduledThreadPoolExecutor.class
// 构造函数
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
// 使用的工作队列是内部类 ScheduledThreadPoolExecutor.DelayedWorkQueue
new DelayedWorkQueue());
}
// 实际是进入父类 java.util.concurrent.ThreadPoolExecutor 构造器,进行线程池初始化
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
(2)创建工作线程并启动
execute/submit
提交任务,主要流程:
(1)将任务ScheduledFutureTask
加入工作队列 DelayedWorkQueue 中;
(2)若工作线程数小于 corePoolSize
,创建新工作线程并启动;(ensurePrestart
方法)
// 重写了execute方法, 实际调用 schedule 方法
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
// 重写了submit方法, 实际调用 schedule 方法
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
// 封装 Runnable 任务并提交工作队列
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// Runnable 包装为 ScheduledThreadPoolExecutor.ScheduledFutureTask 任务,无返回值
RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
// 提交延时任务
delayedExecute(t);
return t;
}
// 封装 Callable 任务并提交工作队列
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
// Callable 包装为 ScheduledThreadPoolExecutor.ScheduledFutureTask 任务,带返回值
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
// 提交延时任务
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
// 若线程池已关闭, 执行拒绝策略 - 父类 ThreadPoolExecutor 的方法
reject(task);
else {
// 将任务加入工作队列
super.getQueue().add(task);
// ..
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 小于核心线程数,创建新的工作线程
ensurePrestart();
}
}
父类 java.util.concurrent.ThreadPoolExecutor
中用到的方法,
// 根据ctl判断非运行状态
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
// 执行拒绝策略
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
// 小于核心线程数,新增工作线程
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
(3)线程调度和执行任务
主要流程
(1)工作线程仍然是调用 ThreadPoolExecutor.runWorker
方法实现任务调度执行,详见 Java线程 ThreadPoolExecutor
源码部分 -(3);
(2)区别在于工作队列为 DelayedWorkQueue ,获取任务需要达到 执行时间 ScheduledFutureTask.time
;
(3)任务ScheduledFutureTask
若为周期性任务,执行完后,通过运行周期ScheduledFutureTask.period
, 计算并设置下次执行时间ScheduledFutureTask.time
并重新加入工作队列,实现周期性执行。
任务封装 ScheduledFutureTask
首先看看,内部任务类 ScheduledThreadPoolExecutor.ScheduledFutureTask
。
用于封装需要定时或周期性执行的任务。它实现了 RunnableScheduledFuture
接口,结合了 Runnable
(可执行任务)和 Future
(异步计算结果)的特性,同时支持任务调度逻辑。
// 内部类 ScheduledFutureTask,表示定时任务
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
// 触发时间 nanos(纳秒)
private long time;
// 任务提交时的序号,用于在多个任务具有相同 time 时定义执行顺序(先进先出),在下面CompareTo方法中有用
private final long sequenceNumber;
// 任务的周期时间(纳秒)
private final long period;
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;
// 任务在 DelayedWorkQueue 堆数组中的下标位置,后续进/出 工作队列时会更新
int heapIndex;
// 构造器,上面schedule()方法中,包装Runnable/Callable时使用到了
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// run() 实现
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
// (1)非周期一次性任务:直接执行
ScheduledFutureTask.super.run();
// (2)周期性任务:执行任务并重置任务状态,设置下次执行时间并重新加入工作队列
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次执行的时间
setNextRunTime();
// 任务重新加入工作队列
reExecutePeriodic(outerTask);
}
}
/**
* 是否是周期性任务
* Returns {@code true} if this is a periodic (not a one-shot) action.
*/
public boolean isPeriodic() {
return period != 0;
}
/**
* 设置下次执行时间
*/
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
// 实现 Delayed 接口, 返回当前任务剩余延迟时长
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
// 实现任务优先级比较方法:
// (1)首先按 time 排序(执行时间早的任务优先)。
// (2)如果 time 相同,则按 sequenceNumber 排序(先提交的任务优先)
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
}
延时工作队列 DelayedWorkQueue
初始化
数据结构:基于数组的小顶堆。(小顶堆 知识点温习 见文末)
通过最小堆****管理任务顺序,使得任务按照执行时间先后执行,插入/删除的 ***O(***log n) 复杂度 保证了高吞吐场景下的性能;
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// 数组存储,初始长度 16,表示一个 “基于数组的最小堆”
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
private Thread leader = null;
// 当队列为空时,工作线程通过 available.await() 等待;
// 添加新任务时,通过 available.signal() 唤醒等待线程
private final Condition available = lock.newCondition();
...
}
添加任务 offer
/**
* 添加任务 (核心)
**/
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 容量不足, 扩容 50%
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
// 首个元素直接加入
queue[0] = e;
setIndex(e, 0);
} else {
// 根据 ScheduledFutureTask.compareTo 大小关系构造小顶堆
siftUp(i, e);
}
// 若更新了堆顶元素 queue[0] ,唤醒 available 上所有等待执行任务的工作线程Worker
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
/**
* 扩容 50%
*/
private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}
/**
* 更新 ScheduledFutureTask.heapIndex , 存储在堆数组的下标
*/
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}
/**
* 根据任务的CompareTo优先级关系,构造小顶堆。即优先执行的任务越靠前。
*/
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
// 获取父节点下标 = (k - 1) / 2
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
// 若大于父节点,符合小顶堆特点,直接返回
if (key.compareTo(e) >= 0)
break;
// 若小于父节点,替换父节点,继续构造小顶堆
queue[k] = e;
setIndex(e, k);
k = parent;
}
// 设置下标位置到 ScheduledFutureTask 任务的 heapIndex 中
queue[k] = key;
setIndex(key, k);
}
获取任务 poll / take
poll(long timeout, TimeUnit unit) 非阻塞获取任务,允许超时;
/**
* 限时 获取堆顶元素
**/
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
// 等待超时时间 nanos
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 获取堆顶任务 queue[0]
RunnableScheduledFuture<?> first = queue[0];
// 当前没有任务,阻塞等待超时时间 - available 条件变量阻塞
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
// 当前有任务
long delay = first.getDelay(NANOSECONDS);
// 任务到达执行时间 time,finishPoll 取出堆顶任务后调整堆结构
if (delay <= 0)
return finishPoll(first);
// 任务还未到执行时间time,
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
// 超时时间不足以等待任务剩余延迟时间 delay
// 或者有其他工作线程 leader 在等待执行堆顶任务
// 直接等待超时时间结束
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
// 当前工作线程设置为leader,阻塞等待剩余延迟时间 delay,准备执行任务
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
// 等待结束,移除 leader 身份标记
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 堆顶任务不为空,且没有 leader 工作线程在等待执行,唤醒所有工作线程
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
/**
* Performs common bookkeeping for poll and take: Replaces
* first element with last and sifts it down. Call only when
* holding lock.
* @param f the task to remove and return
*/
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// 更新 size - 1
int s = --size;
// 拿出最后一个元素 x(优先级最低-即执行时间time最晚的任务)
RunnableScheduledFuture<?> x = queue[s];
// 将最后一个元素的位置置空
queue[s] = null;
// 剩余元素数量不为0,重新调整堆
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
// 取出堆顶任务后,重新调整小顶堆
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
// k的左子节点下标 = (k * 2) + 1
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
// k的右子节点下标 = (k * 2) + 2
int right = child + 1;
// 取出左右子节点中较小任务
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
// 若 待插入任务 key 小于左右子节点,表示找到了该插入的位置,直接退出
if (key.compareTo(c) <= 0)
break;
// 否则将较小子节点移动到父节点,继续往下查找
queue[k] = c;
setIndex(c, k);
k = child;
}
// 设置下标位置到 ScheduledFutureTask 任务的 heapIndex 中
queue[k] = key;
setIndex(key, k);
}
// 非等待获取堆顶任务
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return finishPoll(first);
} finally {
lock.unlock();
}
}
take () 阻塞获取堆顶任务(直到任务到期或线程中断)
// 阻塞获取堆顶任务(直到任务到期或线程中断)
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
// 没有任务,阻塞等待,直到堆顶 queue[0] 更新了新任务被唤醒
available.await();
else {
// 到达了任务执行时间,直接返回任务
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
// leader 不为空,证明有其他工作线程在等待执行了,继续阻塞
if (leader != null)
available.await();
else {
// 没有leader,当前工作线程作为leader,限时阻塞等待delay时间,准备执行任务
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
小顶堆是什么?
概念
(1) 什么是堆?堆是一种完全二叉树。
(2) 什么是完全二叉树?完全二叉树表示除最后一层外,其余各层的节点数都达到最大值(即满的)的二叉树。
1
/ \
2 3
/ \ /
4 5 6 <-- 最后一层可以不满,但必须都靠左。可用数组表示: [1,2,3,4,5,6]
1
/ \
2 3
/ \ \
4 5 6 <-- 最后一层非都靠左,不是完全二叉树
完全二叉树的节点按层次顺序紧密排列,适合用数组高效存储和遍历。
(3) 什么是最大堆? 最小堆?
- 最大堆(大顶堆):每个父节点的值 ≥ 其子节点的值(根节点是最大值)。
- 最小堆**(小顶堆)**:每个父节点的值 ≤ 其子节点的值(根节点是最小值)。
- 与二叉搜索树(BST)的区别:
- BST:左子树节点 < 父节点 < 右子树节点,用于快速查找。
- 堆:仅保证父节点与子节点的大小关系,不维护左右子树的顺序。堆的用途是快速获取极值(最大值或最小值)。
堆的典型操作
(1)插入(Insert):将新元素放到末尾,通过“上浮”(与父节点比较并交换)调整位置。
(2)删除根节点(Extract-Max/Min):移除根节点,将末尾元素移到根位置,通过“下沉”(与子节点比较并交换)调整。
(3)时间复杂度:插入和删除均为 **O(**log n),获取极值(即根节点)为 O(1)。
常用计算公式
(1)高度公式:h=logN + 1,其中 N 是节点总数;
(2)完全二叉树可以用 数组 高效存储(起始下标0):
- 计算子节点索引:若父节点索引为 i,则左子节点为 2i+1,右子节点为 2i+2;
- 计算父节点索引:若子节点索引为 k,父节点索引为 (k-1)/2;
典型应用场景
(1)优先队列:任务调度、Dijkstra最短路径算法。
(2)堆排序:通过反复提取极值实现排序(时间复杂度 O(n log n))。
(3)动态极值维护:如实时统计流数据中的Top K元素。