如何设计一个能根据任务优先级来执行的线程池
- 不同的线程池会选用不同的阻塞队列作为任务队列,比如
FixedThreadPool
使用的是LinkedBlockingQueue
(有界队列),默认构造器初始的队列长度为Integer.MAX_VALUE
,由于队列永远不会被放满,因此FixedThreadPool
最多只能创建核心线程数的线程。- 假如需要实现一个优先级任务线程池的话,那可以考虑使用
PriorityBlockingQueue
(优先级阻塞队列)作为任务队列(ThreadPoolExecutor
的构造函数有一个workQueue
参数可以传入任务队列)。
要想让 PriorityBlockingQueue
实现对任务的排序,传入其中的任务必须是具备排序能力的,方式有两种:
实现 Comparable
接口
提交到线程池的任务实现 Comparable
接口,并重写 compareTo
方法来指定任务之间的优先级比较规则。
缺点:1.任务类必须实现 Comparable
接口,硬编码不够灵活。2.如果需要多种优先级规则,任务类代码会变得复杂。
import java.util.concurrent.*;
public class PriorityTask implements Runnable, Comparable<PriorityTask> {
private final int priority;
private final String name;
public PriorityTask(int priority, String name) {
this.priority = priority;
this.name = name;
}
@Override
public void run() {
System.out.println("Executing task: " + name + " with priority: " + priority);
}
@Override
public int compareTo(PriorityTask other) {
return Integer.compare(this.priority, other.priority); // 优先级值越小,优先级越高
}
}
public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>());
}
}
//使用示例
public class Main {
public static void main(String[] args) {
PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(2, 4, 1, TimeUnit.MINUTES);
executor.execute(new PriorityTask(10, "Low priority task"));
executor.execute(new PriorityTask(1, "High priority task"));
executor.execute(new PriorityTask(5, "Medium priority task"));
executor.shutdown();
}
}
Comparator
创建 PriorityBlockingQueue
时传入一个 Comparator
对象来指定任务之间的排序规则(推荐)。
import java.util.concurrent.*;
public class Task implements Runnable {
private final int priority;
private final String name;
public Task(int priority, String name) {
this.priority = priority;
this.name = name;
}
@Override
public void run() {
System.out.println("Executing task: " + name + " with priority: " + priority);
}
public int getPriority() {
return priority;
}
}
public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
new PriorityBlockingQueue<>(11, Comparator.comparingInt(Task::getPriority)));
}
}
//使用示例
public class Main {
public static void main(String[] args) {
PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(2, 4, 1, TimeUnit.MINUTES);
executor.execute(new Task(10, "Low priority task"));
executor.execute(new Task(1, "High priority task"));
executor.execute(new Task(5, "Medium priority task"));
executor.shutdown();
}
}