【Java 定时任务】小顶堆与时间轮算法介绍 + 多线程代码练习
前言
定时任务在实际的开发中特别常见,如电商平台 30 分钟后自动取消未支付的订单,以及定时发布朋友圈等,都需要借助定时任务来实现。
前期回顾:【Java 并发编程】线程池理解与使用
代码地址:吐司代码
目录
前言
Timer 定时器
小顶堆模拟实现 Timer 定时器
Timer 定时器的局限性
ScheduledThreadPoolExecutor 定时器
执行周期性任务:
执行定时性任务:
时间轮算法的简单介绍
时间轮的结构
HashedWheelTimer 定时器
HashedWheelTimer 使用
多线程代码练习
Timer 定时器
标准库中提供了⼀个 Timer 类。一个完整定时任务需要由 Timer、TimerTask 两个类来配合完成。Timer 是一种定时器工具,用来在一个前台线程计划执行指定任务,而 TimerTask 一个抽象类,它的子类代表一个可以被 Timer 计划的任务。
Timer 类的定时器的核⼼⽅法为 schedule:
schedule(TimerTask ,delay) |
schedule 包含两个参数:第⼀个参数指定即将要执⾏的任务代码,第⼆个参数指定多⻓时间之后执⾏ (单位为毫秒)。
代码案例:
class Demo{
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
System.out.println("Hello World");
}
},300);
ExecutorService service = new ThreadPoolExecutor(3, 6, 1,
TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 5; i++) {
service.execute(()->{
System.out.println(Thread.currentThread().getName()+"正在执行任务~");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
service.shutdown();
}
}
运行结果:
pool-1-thread-2正在执行任务~
pool-1-thread-1正在执行任务~
pool-1-thread-3正在执行任务~
Hello World
pool-1-thread-3正在执行任务~
pool-1-thread-2正在执行任务~
(程序运行没有结束...)
通过上述案例,我们可以明确知道,定时任务在我们执行主程序运行 300 毫秒后触发了。由于 Timer 创建的执行任务的线程是一个前台线程,会影响程序的结束,所以当所有任务都执行完后,程序也不会结束。
这里再举个栗子:
Timer timer1 = new Timer();
timer1.schedule(new TimerTask() {
public void run() {
System.out.println("Hello World1");
while(true){}
}
},300);
timer1.schedule(new TimerTask() {
public void run() {
System.out.println("Hello World2");
}
},400);
将第一个定时任务设为死循环,在创建第二个定时任务,那么第二个定时任务还能被执行到吗?观察一下打印结果:
pool-1-thread-3正在执行任务~
pool-1-thread-1正在执行任务~
pool-1-thread-2正在执行任务~
Hello World1
pool-1-thread-3正在执行任务~
pool-1-thread-1正在执行任务~
我们发现程序只输出了第一个定时任务的结果,这是为什么呢?Timer是单线程,其不会创建新线程。定时任务就被卡在 while 循环里了,线程就算想执行第二个定时任务,也无能为力。如果想让程序执行第二个定时任务,那么只需要让第二个定时任务的时间靠前即可。
timer1.schedule(new TimerTask() {
public void run() {
System.out.println("Hello World2");
}
},200);
这样两个定时任务都能被执行到了。Timer 的定时是不论任务的加入先后,而是看任务具体执行的时间,时间靠前的先执行。这主要得益于一种数据结构 -- 小顶堆。听到这个名字或许你会很陌生,那么它还有另外一个名字,优先级队列(ProprityQueue)。
小顶堆模拟实现 Timer 定时器
Timer 通过 ProprityQueue 优先级队列与 Event loop 单线程来实现定时功能。例如,现在加入了一个定时任务,那么就会触发 ProprityQueue 的重排序,这样就能保证最先执行的任务一定是排在最上面的。而 Event loop 只需要再检查首元素是否到了执行时间即可,到了时间就取出来执行,没到就测量还需要等待多长时间。比如,现在是 8:50 分,第一个任务需要 9:00 执行就 wait 5分钟。
模拟实现:
保证优先级队列中的每个元素是⼀个 MyTimerTask 对象 |
MyTimerTask 中带有⼀个时间属性,队⾸元素就是即将要执⾏的任务 |
同时当前工作线程在加入元素的需要扫描队⾸元素,查看队⾸元素是否需要执⾏,或者何时指执行 |
class MyTimerTask implements Comparable<MyTimerTask> {
private Runnable runnable;
// 记录当前时间
private long time;
public MyTimerTask(Runnable runnable, long delay) {
this.runnable = runnable;
// 当前毫秒级时间戳 + 定时时间
this.time = System.currentTimeMillis() + delay;
}
public void run(){
runnable.run();
}
public long getTime() {
return time;
}
@Override
public int compareTo(MyTimerTask o) {
return (int)(this.time-o.time);
}
}
class MyTimer {
// 锁对象
Object locker = new Object();
// 创建优先级阻塞队列,计时器的核心数据结构
private PriorityBlockingQueue<MyTimerTask> queue = new PriorityBlockingQueue<>();
public void schedule(Runnable runnable,long delay) {
// 创建任务
MyTimerTask myTask = new MyTimerTask(runnable,delay);
// 将任务放进计时器
queue.put(myTask);
synchronized (locker) {
// 叫醒线程
locker.notify();
}
}
public MyTimer() {
// 内置线程重复执行计时器中的任务
Thread thread = new Thread(() -> {
while (true) {
try {
synchronized (locker) {
// 取出计时器中最优先执行的任务
MyTimerTask myTask = queue.take();
// 判断是否到达执行时间
if (System.currentTimeMillis() >= myTask.getTime()) {
// 到达执行
myTask.run();
} else {
// 未到达再放回计时器
queue.put(myTask);
// 让线程等待相应时间
locker.wait(myTask.getTime() - System.currentTimeMillis());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}
}
代码测试:
class Test{
public static void main(String[] args) {
MyTimer timer1 = new MyTimer();
timer1.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("Hello World1");
}
},300);
MyTimer timer2 = new MyTimer();
timer2.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("Hello World2");
}
},1000);
ExecutorService service = new ThreadPoolExecutor(3, 6, 1,
TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>(4),
new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
return thread;
}
},
new ThreadPoolExecutor.AbortPolicy());
for (int i = 1; i <= 5; i++) {
service.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"正在工作~");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
// 关闭线程池
service.shutdown();
}
}
运行结果:
Thread-2正在工作~
Thread-3正在工作~
Thread-4正在工作~
Hello World1
Thread-2正在工作~
Thread-3正在工作~
Hello World2
Timer 定时器的局限性
单线程调度:Timer 内部使用单个线程来执行所有任务,因此如果一个 TimerTask 执行时间过长,会影响其他任务的执行,导致后续任务的延迟。适合简单任务,不适合需要高并发的场景。 |
不能捕获的异常:如果 TimerTask 中抛出了 RuntimeException 并未被捕获,则该 Timer 线程会意外终止,导致所有已安排但未执行的任务被取消。 |
ScheduledThreadPoolExecutor 定时器
上文中介绍的 Timer 其实目前并不推荐用户使用,它是存在不少设计缺陷的。 为了解决 Timer 的设计缺陷,Java 提供了功能更加丰富的定时器 ScheduledThreadPoolExecutor。
ThreadPoolExecutor 线程池中的子类就是 ScheduledThreadPoolExecutor,主要用于执行周期性任务与定时性任务。另外 ScheduledThreadPoolExecutor 中使用的延迟队列,与 Timer 一样是优先级队列,也就是基于小顶堆实现的。
ScheduledThreadPoolExecutor 的核心周期执行功能是 schedulewithfixeddelay 完成的
schedulewithfixeddelay(runnable command,
long initialdelay,
long delay,
timeunit unit)
command:即将要执⾏的任务代码。
initialDelay:首次执行任务前的延迟时间。
delay:一次执行终止和下一次执行开始之间的延迟。
unit:initialDelay和delay参数的时间单位。
代码案例:
以下的创建 scheduledExecutorService 对象,并创建了三个线程的线程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
执行周期性任务:
scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 周期任务 } }, 0, 1, TimeUnit.SECONDS);
这里的任务会每秒钟执行一次。第二个参数是初始延迟时间,第三个参数是执行周期,第四个参数是时间单位。
public static void main(String[] args){ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println(df.format(new Date())); System.out.println(Thread.currentThread().getName()+"正在执行任务~"); } }, 0, 1, TimeUnit.SECONDS); }
运行结果:
2024-10-20 18:22:38 pool-1-thread-1正在执行任务~ 2024-10-20 18:22:39 pool-1-thread-1正在执行任务~ 2024-10-20 18:22:40 pool-1-thread-2正在执行任务~ 2024-10-20 18:22:41 pool-1-thread-1正在执行任务~ 2024-10-20 18:22:42 pool-1-thread-3正在执行任务~ 2024-10-20 18:22:43 pool-1-thread-2正在执行任务~ 2024-10-20 18:22:44 pool-1-thread-1正在执行任务~
我们可以看到程序此时是按周期进行的,而且是三个线程抢占执行。那么怎么才能设置成定时任务呢?
ScheduledThreadPoolExecutor 的核心定时执行功能是 schedule 完成的
schedule(Task ,delay,unit)
Task:首次执行任务前的延迟时间。
delay:参数指定多⻓时间之后执⾏ 。
unit:initialDelay和delay参数的时间单位。
执行定时性任务:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.schedule(new Runnable() { @Override public void run() { // 定时任务 } }, 3, TimeUnit.SECONDS);
文里的任务会延迟 3 秒后执行。第二个参数是延迟时间,第三个参数是时间单位。
public static void main(String[] args){ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println(df.format(new Date())); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.schedule(new Runnable() { @Override public void run() { // 定时任务 System.out.println(df.format(new Date())); System.out.println(Thread.currentThread().getName()+"正在执行任务"); } }, 3, TimeUnit.SECONDS); }
运行结果:
2024-10-20 18:33:52 2024-10-20 18:33:55 pool-1-thread-1正在执行任务
底层源码:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
...
}
延迟工作队列(DelayedWorkQueue)是一种特殊的队列,用于在 Java 线程池中处理延时任务。它的核心是一个优先级队列,基于堆数据结构实现,能够确保队列中的任务按照预定的执行时间顺序执行。
所以 Timer、 ScheduledThreadPoolExecutor 都是基于优先级队列实现的,但是后者支持多线程不会影响其他程序运行,有利于高并发的场景。算是弥补了 Timer 的短板。往后的编程中建议更建议使用后者。
关于 Timer、 ScheduledThreadPoolExecutor 两者的区别考可以参考这个博主的文章:区别
性能问题:在数据量过于庞大的情况,Timer、 ScheduledThreadPoolExecutor 都会导致优先级队列过长,而每次加入任务,优先级队列的重排序都会对程序性能具有一定的影响。所以,一种新的时间轮算法,被用来实现时调度。
时间轮算法的简单介绍
时间轮的结构
时间轮简单来说就是一个环形的队列(底层一般基于数组实现),队列中的每一个元素(时间格)都可以存放一个定时任务列表。每个时间格大小可以是 100 ms 也可以是 1 ms。上图的时间轮有 6 个时间格,每个时间格大小是 100 ms,也就是每过 100 ms会顺时针移动一个时间格,走完一圈需要 600 ms。
再例如:下图是一个有 12 个时间格的时间轮,转完一圈需要 12 s。当我们需要新建一个 3s 后执行的定时任务,只需要将定时任务放在下标为 3 的时间格中即可。当我们需要新建一个 9s 后执行的定时任务,只需要将定时任务放在下标为 9 的时间格中即可。
但是我们要执行一个 13s 的任务时,该怎么办呢?
<1>方法一:加时间格,现在我们的时间格是 12 个,我们就可以增加到 24 个。那么 13s 的任务就有位置存放了。但是这很不灵活,而且对空间的浪费比较大,所以并不推荐。
<2>方法二:round 型时间轮,任务上记录个 round,遍历到了就将 rornd 减一,为 0 时取出执行。也就是说,我们可以把 13s 的任务存放在 1s 的位置,让 round = 1,当时间轮运行完一圈 rornd 减一恰好等于0。此时就可以执行 13s 的任务了。但是这种方法需要遍历所有的任务,效率较低。
<3>方法三:分层时间轮,使用多个不同时间维度的轮:天轮-记录几点执行,月轮-记录几号执行;月轮遍历到了,将任务取出放到天轮里面,即可实现几号几点执行。例如,我需要在星期三的 12点 让系统自动帮我点个外卖,那么就可以设计两个时间轮;一个星期时间轮,一个 1-24 小时时间轮,我们在星期时间轮里找到星期三这个时间格再把它放到 1-24 小时时间轮里,1-24 小时时间轮再继续去找 12 点的时间格。这样就能确定任务执行的具体时间。
工作原理:时间轮定时器最大的优势就是任务的新增和取消都是 O(1) 时间复杂度,而且只需要一个线程就可以驱动时间轮进行工作。每往时间轮提交一个延时任务,会判断该任务的执行时间要放在哪个时间格上。时间轮每移动一个时间格,就会执行当前时间格上的任务,一个时间格上的任务可能会有多个。
HashedWheelTimer 定时器
在 Java 中,因为 HashedWheelTimer 是基于时间轮的定时器。但 HashedWheelTimer 是一个粗略的定时器实现,之所以称之为粗略的实现是因为该时间轮并没有严格的准时执行定时任务,而是在每隔一个时间间隔之后的时间节点执行,并执行当前时间节点之前到期的定时任务
不过具体的定时任务的时间执行精度可以通过调节 HashedWheelTimer 构造方法的时间间隔的大小来进行调节,在大多数网络应用的情况下,由于 IO 延迟的存在,并不会严格要求具体的时间执行精度,所以默认的 100ms 时间间隔可以满足大多数的情况,不需要再花精力去调节该时间精度。
HashedWheelTimer 使用
// 构造一个 Timer 实例
Timer timer = new HashedWheelTimer();
// 提交一个任务,让它在 5s 后执行
Timeout timeout1 = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
System.out.println("5s 后执行该任务");
}
}, 5, TimeUnit.SECONDS);
// 再提交一个任务,让它在 10s 后执行
Timeout timeout2 = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
System.out.println("10s 后执行该任务");
}
}, 10, TimeUnit.SECONDS);
// 取消掉那个 5s 后执行的任务
if (!timeout1.isExpired()) {
timeout1.cancel();
}
// 原来那个 5s 后执行的任务,已经取消了。这里我们反悔了,我们要让这个任务在 3s 后执行
timer.newTimeout(timeout1.task(), 3, TimeUnit.SECONDS);
多线程代码练习
考虑下面这个使用 BlockingQueue 示例。有一台机器具有三个任务:一个制作吐司,一个给吐司抹黄油,另一个在抹过黄油的吐司上吐果酱。可以通过各个处理过程之间 BlockingQueue 来运行这个吐司制作程序:
class Toast{
public enum Status{做吐司,抹黄油,涂果酱}
private Status status = Status.做吐司;
private final int id;
public Toast(int idn){id = idn;}
public void butter(){
status = Status.抹黄油;
}
public void jam(){
status = Status.涂果酱;
}
public Status getStatus(){
return status;
}
public int getId(){
return id;
}
public String toString(){
return "Toast " + id;
}
}
class ToastQueue extends LinkedBlockingQueue<Toast>{}
class Toaster implements Runnable{
private ToastQueue toastQueue;
private int count = 0;
private Random rand = new Random();
public Toaster(ToastQueue tq){
toastQueue = tq;
}
@Override
public void run(){
try{
while (!Thread.interrupted()){
TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(100));
Toast t = new Toast(count++);
System.out.println(t+ " : " + t.getStatus());
toastQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println("吐司机器中断~");
}
System.out.println("停止制作吐司工作");
}
}
class Butterer implements Runnable{
private ToastQueue dryQueue ,butteredQueue;
public Butterer(ToastQueue dry , ToastQueue buttered){
dryQueue = dry;
butteredQueue = buttered;
}
@Override
public void run(){
try{
while (!Thread.interrupted()){
Toast t = dryQueue.take();
t.butter();
System.out.println(t+ " : " + t.getStatus());
butteredQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println("黄油机器中断~");
}
System.out.println("停止涂抹黄油工作");
}
}
class Jammer implements Runnable{
private ToastQueue butteredQueue,finishedQueue;
public Jammer(ToastQueue buttered,ToastQueue finished){
butteredQueue = buttered;
finishedQueue = finished;
}
@Override
public void run(){
try{
while (!Thread.interrupted()){
Toast t = butteredQueue.take();
t.jam();
System.out.println(t+ " : " + t.getStatus());
System.out.println(t+"号吐司制作完成");
finishedQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println("果酱机器中断~");
}
System.out.println("停止涂抹果酱工作");
}
}
class Eater implements Runnable{
private ToastQueue finishedQueue;
private static int counter = 0;
private static Random rand = new Random();
private int balance = rand.nextInt(1000)+100;
private static final int price = rand.nextInt(100)+50;
private Object lock = new Object();
private volatile boolean isMoney = false;
public Eater(ToastQueue finished){
this.finishedQueue = finished;
}
public void dealToast(){
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(()->{
while(!Thread.interrupted()){
synchronized (lock){
if(isMoney == true){
balance+=rand.nextInt(1000)+100;
isMoney = false;
lock.notify();
}
}
}
});
while(balance < price){
synchronized (lock){
System.out.println(Thread.currentThread().getName()+"顾客离开餐厅...");
try {
isMoney = true;
lock.wait();
System.out.println(Thread.currentThread().getName()+"顾客再次进入餐厅...");
} catch (InterruptedException e) {
System.out.println("wait 等待中断");
}
}
}
balance -= price;
System.out.println(Thread.currentThread().getName()+"顾客花费"+price+"元购买了一个吐司,还剩下"+balance+"元");
}
@Override
public void run(){
try{
while (!Thread.interrupted()){
Toast t = finishedQueue.take();
if(t.getId() != counter++ || t.getStatus() != Toast.Status.涂果酱){
System.out.println(">>>> 错误 : " + t);
System.exit(1);
}else{
dealToast();
System.out.println(Thread.currentThread().getName()+"顾客吃了"+t+"号吐司");
}
}
} catch (InterruptedException e) {
System.out.println("吃吐司终止~");
}
System.out.println("吃吐司结束");
}
}
class Main{
public static void main(String[] args) throws InterruptedException {
ToastQueue dryQueue = new ToastQueue(),
butteredQueue = new ToastQueue(),
finishedQueue = new ToastQueue();
ExecutorService exec = new ThreadPoolExecutor(6, 9, 3,
TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>(4),
new ThreadPoolExecutor.AbortPolicy());
exec.execute(new Toaster(dryQueue));
exec.execute(new Butterer(dryQueue,butteredQueue));
exec.execute(new Jammer(butteredQueue,finishedQueue));
Eater [] eaters = new Eater[3];
for(int i = 0;i<3;i++){
eaters[i] = new Eater(finishedQueue);
}
for(int i = 0;i<3;i++){
exec.execute(eaters[i]);
}
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
exec.shutdownNow();
}
},5000);
}
}
运行结果:
Toast 0 : 做吐司
Toast 0 : 抹黄油
Toast 0 : 涂果酱
Toast 0号吐司制作完成
pool-1-thread-4顾客花费101元购买了一个吐司,还剩下47元
pool-1-thread-4顾客吃了Toast 0号吐司
Toast 1 : 做吐司
Toast 1 : 抹黄油
Toast 1 : 涂果酱
Toast 1号吐司制作完成
pool-1-thread-5顾客花费101元购买了一个吐司,还剩下2元
pool-1-thread-5顾客吃了Toast 1号吐司
...
pool-1-thread-6顾客离开餐厅...
pool-1-thread-6顾客再次进入餐厅...
pool-1-thread-6顾客花费101元购买了一个吐司,还剩下787元
pool-1-thread-6顾客吃了Toast 29号吐司
吃吐司终止~
吃吐司结束
黄油机器中断~
吐司机器中断~
停止制作吐司工作
果酱机器中断~
停止涂抹果酱工作
吃吐司终止~
吃吐司结束
吃吐司终止~
吃吐司结束
停止涂抹黄油工作
Toast 是一个使用 enum 值的优秀示例。注意,这个示例中没有任何显式的同步(即使用Lock对象或synchronized关键字的同步),因为同步由队列(其内部是同步的)和系统的设计隐式地管理了,每片 Toast 在任何时刻都只由一个任务在操作。因为队列的阻塞,使得处理过程将被自动地挂起和恢复。你可以看到由 BlockingQueue 产生的简化十分明显。在使用显示的 wait() 和 notifyAll() 时存在的类和类之间的耦合被消除了,因为每个类都只和它的BlockingQueue通信。