Java定时任务实现方案(五)——时间轮
时间轮
这篇笔记,我们要来介绍实现Java定时任务的第五个方案,使用时间轮,以及该方案的优点和缺点。
时间轮是一种高效的定时任务调度算法,特别适用于大量定时任务的场景。时间轮的定时任务实现,可以使用DelayQueue作为基础。
在使用时间轮算法之前,我要来简单介绍一下时间轮的一些概念,便于大家理解。
我们可以把时间轮想象成一个时钟,这个时钟被划分为12个格子,每个格子代表一段时间间隔,我们假设是1000ms(1s),每个格子里存放着这个时间段内需要执行的所有定时任务。时钟上有一根指针,当指针指向哪个格子时,格子内的定时任务就可以开始执行或者准备执行了,每过一个时间间隔,指针就向前移动一格,执行下一个时间段的定时任务。
在我们上面的举例当中,12个格子叫做时间槽,时间轮可以被划分为多个固定大小的时间槽,每一个时间槽代表一个时间段;时钟上的指针,用来指示当前需要执行定时任务的时间槽;
我们日常中的时钟,是有三个指针的,我们的时间轮也可以拓展成多级时间轮,支持更长时间的定时任务调度。
实现
1.单个时间槽的实现
因为我们要使用DelayQueue作为基础实现时间轮,所以我们首先要有一个实现了Delay接口的类来承接我们的单个定时任务,如果对如何使用DelayQueue不了解的,可以去看一下我的另一篇关于使用DelayQueue实现定时任务的小作文哦。
private static class TimerTask implements Delayed {
private final Runnable task;
private final long expiration;
public TimerTask(Runnable task, long expiration) {
this.task = task;
this.expiration = expiration;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expiration - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.expiration, ((TimerTask) o).expiration);
}
public void run() {
task.run();
}
}
接着,因为我们的时间轮是处理一个时间槽内的一批定时任务,所以我们还需要一个存储一个时间槽内所有定时任务的集合类,或者说一个逻辑上的时间槽任务类。
这个逻辑时间槽任务类,我们可以把时间槽也当成一个定时任务,存放在时间轮中,因此,也要实现Delay接口,任务执行的时间,就是这个时间槽的表示的时间段的起始时间。
在这个时间槽类中,我们其实就是使用DelayQueue来存取我们单个的定时任务,说白了,就是将DelayQueue实现定时任务的方法进行封装,我们要对外暴露添加任务和执行任务的方法,为了能够实现时间槽的复用,当时间槽中的定时任务清空之后,我们要重置这个时间槽的时间。
/**
* TimerTaskList类实现了Delayed接口,用于管理一组具有延迟执行需求的任务
*
*/
private static class TimerTaskList implements Delayed {
// 任务的过期时间,即任务应该被执行的时间点
private long expiration;
private List<TimerTask> tasks;
// 使用DelayQueue来存储具有延迟执行需求的TimerTask对象
private DelayQueue<TimerTask> queue = new DelayQueue<>();
// ExecutorService用于执行任务,它是在类初始化时通过构造函数传入的
private final ExecutorService executorService;
/**
* 构造函数,初始化ExecutorService
*
* @param executorService 用于执行任务的线程池
*/
public TimerTaskList(ExecutorService executorService) {
this.executorService = executorService;
this.tasks = new ArrayList<>();
}
/**
* 向队列中添加一个新的TimerTask任务
*
* @param task 要添加的TimerTask对象
*/
public void addTask(TimerTask task) {
tasks.add(task);
queue.offer(task);
}
/**
* 设置任务的过期时间
* 只有当expiration尚未设置(即值为0)时,才更新expiration值
*
* @param expiration 任务的过期时间
* @return 如果expiration成功设置,则返回true;否则返回false
*/
public boolean setExpiration(long expiration) {
if (this.expiration == 0) {
this.expiration = expiration;
return true;
}
return false;
}
/**
* 清除所有任务并重置过期时间
*
* 本方法旨在清除所有当前持有的任务,并将过期时间重置为0
* 这在需要重新初始化或清理资源时特别有用
*/
public void clearTasks(){
// 清除所有任务
tasks.clear();
// 重置过期时间为0,表示没有过期时间
expiration = 0;
}
public List<TimerTask> getTasks(){
return tasks;
}
/**
* 执行所有任务
*
* 此方法遍历任务列表,并依次执行每个任务的方法run
* 在所有任务执行完毕后,调用clearTasks方法清除任务列表
*/
public void executeTasks(){
// 遍历任务列表
for(TimerTask task:tasks){
// 执行任务的run方法
task.run();
}
// 所有任务执行完毕后,清除任务列表
clearTasks();
}
/**
* 执行队列中的所有任务
* 如果队列不为空,则通过executorService执行每个任务
* 在所有任务执行完毕后,清除expiration值
*/
public void run() {
if (!queue.isEmpty()) {
executorService.execute(() -> {
while (!queue.isEmpty()) {
try {
queue.take().run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
clearExpiration();
});
}
}
/**
* 获取当前设置的过期时间
*
* @return 当前的expiration值
*/
public long getExpiration() {
return expiration;
}
/**
* 清除过期时间设置,将expiration重置为0
*/
public void clearExpiration() {
expiration = 0;
}
/**
* 实现Delayed接口的getDelay方法
* 计算当前时间与过期时间之间的差值,以确定延迟时间
*
* @param unit 时间单位
* @return 剩余的延迟时间,以指定的时间单位表示
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expiration - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 实现Delayed接口的compareTo方法
* 用于比较两个TimerTaskList对象的过期时间
*
* @param o 另一个Delayed对象
* @return 如果当前对象的过期时间小于、等于或大于参数对象的过期时间,则分别返回负数、零或正数
*/
@Override
public int compareTo(Delayed o) {
return Long.compare(this.expiration, ((TimerTaskList) o).expiration);
}
}
2.将时间槽组成时间轮
完成了单个时间槽的实现之后,剩下的就简单很多了,将上面的两个类作为时间轮类的内部类,将时间槽作为时间轮的一个定时任务来组成我们的时间轮。根据我们前面对于时间轮的描述,时间轮其实就是多个时间槽围成一圈变成了一个时间轮。我们在学数据结构的时候,学过循环队列,把循环队列中的元素换成我们的时间槽,再加上一个指针指向时间槽,就真正构成了一个单层的时间轮了。注意,这里的指针得使用原子类来保证并发安全,因为我们的时间轮可能被多个线程同时使用。
如果时间轮的每一个时间槽存的也是一个时间轮,那么多构成了多级时间轮,对于多级时间轮,我们只需要在最低级的时间轮中放置定时任务,不需要放置子轮,对于高级的时间轮,我们只需要放置子轮,不需要放置定时任务。
我们的时间轮要对外提供启动时间轮的方法,添加定时任务到多级时间轮中的方法,更新移动指针的方法,以及执行时间轮中一批定时任务的方法。
public class TimingWheel {
// 每个时间槽的时间间隔,单位毫秒
private static final int TICK_DURATION = 1000;
// 时间轮的大小,即每个时间轮包含的时间槽数量
private static final int WHEEL_SIZE = 20;
// 子时间轮列表,用于处理超过当前时间轮处理能力的任务
private final List<TimingWheel> subWheels;
// 当前时间轮的级别,从0开始,级别越高,表示处理的时间跨度越大
private final int level;
// 最大时间轮级别,用于确定时间轮的深度
private final int maxLevel;
// 共享的延迟队列,用于存储所有到期的任务列表
private final DelayQueue<TimerTaskList> sharedQueue;
// 时间槽数组,用于存储任务列表
private final TimerTaskList[] buckets = new TimerTaskList[WHEEL_SIZE];
// 时间轮的当前刻度,使用原子长整型确保线程安全
private final AtomicLong tick = new AtomicLong(0);
// 任务执行线程池
private final ExecutorService executorService;
/**
* 构造函数,初始化时间轮
*
* @param maxLevel 最大时间轮级别,用于确定时间轮的深度
*/
public TimingWheel(int maxLevel){
this.level = maxLevel;
this.maxLevel = maxLevel;
this.subWheels = new ArrayList<>();
this.sharedQueue = new DelayQueue<>();
this.executorService = Executors.newFixedThreadPool(WHEEL_SIZE+1);
if(maxLevel <= 0){
for(int i = 0;i < WHEEL_SIZE;i++){
buckets[i] = new TimerTaskList(this.executorService);
}
}else{
for(int i = 0; i < WHEEL_SIZE;i++){
subWheels.add(new TimingWheel(maxLevel - 1, maxLevel,this.sharedQueue,this.executorService));
}
}
}
/**
* 私有构造函数,用于创建子时间轮
*
* @param level 当前时间轮的级别
* @param maxLevel 最大时间轮级别
* @param sharedQueue 共享的延迟队列
* @param executorService 任务执行线程池
*/
private TimingWheel(int level, int maxLevel,DelayQueue<TimerTaskList> sharedQueue,ExecutorService executorService) {
this.level = level;
this.maxLevel = maxLevel;
this.subWheels = new ArrayList<>();
this.sharedQueue = sharedQueue;
this.executorService = executorService;
if (level > 0) {
for (int i = 0; i < WHEEL_SIZE; i++) {
subWheels.add(new TimingWheel(level - 1, maxLevel,this.sharedQueue,this.executorService));
}
}else{
for (int i = 0; i < WHEEL_SIZE; i++) {
buckets[i] = new TimerTaskList(this.executorService);
}
}
}
/**
* 启动时间轮,开始处理任务
*/
public void start() {
executorService.execute(() -> {
while (true) {
try {
TimerTaskList bucket = sharedQueue.take();
long ticks = (bucket.getExpiration() - System.currentTimeMillis())/ TICK_DURATION;
if (ticks > tick.get()) {
Thread.sleep((ticks - tick.get()) * TICK_DURATION);
}
processTasks(bucket);
// 更新时间轮指针
tick.set(ticks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
/**
* 添加任务到时间轮
*
* @param task 要添加的任务
* @param delay 延迟时间,单位毫秒
*/
public void addTask(Runnable task, long delay) {
long currentTime = System.currentTimeMillis();
long expiration = currentTime + delay;
//计算定时任务要放在哪一个时间槽中
//下一层的时钟长度(如果level为0,那就是一个槽的时间长度)
long ts = TICK_DURATION * (long)Math.pow(WHEEL_SIZE,level);
//总时钟步数
int ticks = (int) ((delay) /ts);
int bucketIndex = ticks % WHEEL_SIZE;
TimerTaskList bucket = buckets[bucketIndex];
if (level > 0) {
// 修正传递给子时间轮的延迟时间
subWheels.get(bucketIndex).addTask(task, delay);
} else {
bucket.addTask(new TimerTask(task,expiration));
if (bucket.setExpiration(expiration)) {
sharedQueue.offer(bucket);
}
}
}
/**
* 处理任务列表中的任务
*
* @param bucket 任务列表
*/
private void processTasks(TimerTaskList bucket) {
bucket.run();
// bucket.executeTasks();
if (level < maxLevel) {
for (TimingWheel subWheel : subWheels) {
subWheel.advanceClock();
}
}
}
/**
* 推动时间轮前进
*/
public void advanceClock() {
tick.incrementAndGet();
for (TimingWheel subWheel : subWheels) {
subWheel.advanceClock();
}
}
/**
* TimerTask类,表示一个具有延迟执行需求的任务
*/
private static class TimerTask implements Delayed {
private final Runnable task;
private final long expiration;
public TimerTask(Runnable task, long expiration) {
this.task = task;
this.expiration = expiration;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expiration - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.expiration, ((TimerTask) o).expiration);
}
public void run() {
task.run();
}
}
/**
* TimerTaskList类实现了Delayed接口,用于管理一组具有延迟执行需求的任务
* 它使用DelayQueue来存储这些任务,并在满足执行条件时通过ExecutorService来执行它们
*/
private static class TimerTaskList implements Delayed {
// 任务的过期时间,即任务应该被执行的时间点
private long expiration;
private List<TimerTask> tasks;
// 使用DelayQueue来存储具有延迟执行需求的TimerTask对象
private DelayQueue<TimerTask> queue = new DelayQueue<>();
// ExecutorService用于执行任务,它是在类初始化时通过构造函数传入的
private final ExecutorService executorService;
/**
* 构造函数,初始化ExecutorService
*
* @param executorService 用于执行任务的线程池
*/
public TimerTaskList(ExecutorService executorService) {
this.executorService = executorService;
this.tasks = new ArrayList<>();
}
/**
* 向队列中添加一个新的TimerTask任务
*
* @param task 要添加的TimerTask对象
*/
public void addTask(TimerTask task) {
tasks.add(task);
queue.offer(task);
}
/**
* 设置任务的过期时间
* 只有当expiration尚未设置(即值为0)时,才更新expiration值
*
* @param expiration 任务的过期时间
* @return 如果expiration成功设置,则返回true;否则返回false
*/
public boolean setExpiration(long expiration) {
if (this.expiration == 0) {
this.expiration = expiration;
return true;
}
return false;
}
/**
* 清除所有任务并重置过期时间
*
* 本方法旨在清除所有当前持有的任务,并将过期时间重置为0
* 这在需要重新初始化或清理资源时特别有用
*/
public void clearTasks(){
// 清除所有任务
tasks.clear();
// 重置过期时间为0,表示没有过期时间
expiration = 0;
}
public List<TimerTask> getTasks(){
return tasks;
}
/**
* 执行所有任务
*
* 此方法遍历任务列表,并依次执行每个任务的方法run
* 在所有任务执行完毕后,调用clearTasks方法清除任务列表
*/
public void executeTasks(){
// 遍历任务列表
for(TimerTask task:tasks){
// 执行任务的run方法
task.run();
}
// 所有任务执行完毕后,清除任务列表
clearTasks();
}
/**
* 执行队列中的所有任务
* 如果队列不为空,则通过executorService执行每个任务
* 在所有任务执行完毕后,清除expiration值
*/
public void run() {
if (!queue.isEmpty()) {
executorService.execute(() -> {
while (!queue.isEmpty()) {
try {
queue.take().run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
clearExpiration();
});
}
}
/**
* 获取当前设置的过期时间
*
* @return 当前的expiration值
*/
public long getExpiration() {
return expiration;
}
/**
* 清除过期时间设置,将expiration重置为0
*/
public void clearExpiration() {
expiration = 0;
}
/**
* 实现Delayed接口的getDelay方法
* 计算当前时间与过期时间之间的差值,以确定延迟时间
*
* @param unit 时间单位
* @return 剩余的延迟时间,以指定的时间单位表示
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expiration - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 实现Delayed接口的compareTo方法
* 用于比较两个TimerTaskList对象的过期时间
*
* @param o 另一个Delayed对象
* @return 如果当前对象的过期时间小于、等于或大于参数对象的过期时间,则分别返回负数、零或正数
*/
@Override
public int compareTo(Delayed o) {
return Long.compare(this.expiration, ((TimerTaskList) o).expiration);
}
}
}
优点
1.高效的时间管理
时间轮将时间划分为固定大小的时间槽,每个时间槽代表一个时间段,通过指针逐个扫描这些时间槽,可以高效地管理和调度定时任务,避免了频繁的线程唤醒和上下文切换。
2.低延迟和高吞吐量
由于时间轮采用的是批量处理到期任务的方式,因此可以在较低的延迟下出来大量的定时任务,提高系统的吞吐量。
3.扩展性强
时间轮可以通过多级时间轮的设计来支持更长的延迟时间,子时间轮可以处理更长时间的任务,从而使得整个系统能够灵活应对不同延迟需求的任务
4.简单易懂
时间轮的结构和工作原理相对简单,易于理解和实现,这使得我们可以快速上手,并且在调试和维护的时候也更方便
缺点
1.固定的时间槽大小
时间轮的时间槽大小是固定的,这可能导致某些场景下的精度不足。如果时间槽设置得太小,会增加内存占用;如果设置得太大,则可能影响定时任务的精确度。
2.多级时间轮的复杂性
为了处理更长的延迟时间,可以采用多级时间轮的设计,但是这种设计会增加系统的复杂性。
3.任务堆积问题
当大量任务集中在同一个时间槽内时,可能会导致任务堆积,进而影响任务的执行效率和响应时间。
4.时钟漂移
在分布式系统中,不同节点的时钟可能存在偏差,这会影响时间轮的准确性。