什么不建议通过 `Executors` 构建线程池?
为什么不建议通过 Executors
构建线程池?
在Java并发编程中,使用线程池来管理并发任务是非常常见的。java.util.concurrent.Executors
提供了一系列方便的静态方法来创建线程池,但这些方法创建的线程池却存在一些潜在的问题。本文将探讨这些问题,并给出正确的创建线程池的方法。
1. 引言
Java并发编程的一个重要方面就是如何有效地管理和调度线程。java.util.concurrent.Executor
接口和它的实现类 ExecutorService
是Java并发框架的核心组件之一。java.util.concurrent.Executors
类则提供了创建 ExecutorService
的工厂方法。虽然这些方法提供了极大的便利性,但它们也有自己的局限性和缺陷。
2. Executors
的问题
2.1 不可预测的队列行为
2.1.1 newFixedThreadPool
Executors.newFixedThreadPool(int nThreads)
创建一个固定大小的线程池。当线程池中的所有线程都在执行任务时,新的任务将被放入一个无界队列(LinkedBlockingQueue
)中等待执行。如果队列满了,提交新任务时将抛出 RejectedExecutionException
。
反例
import java.util.concurrent.*;
public class FixedThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
for (int i = 0; i < 10; i++) {
final int index = i;
executor.execute(() -> {
System.out.println("Task " + index + " is running");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
输出结果
Task 0 is running
Task 1 is running
Task 2 is running
Task 3 is running
Task 4 is running
Task 5 is running
Task 6 is running
Task 7 is running
Task 8 is running
Task 9 is running
在这个例子中,如果提交的任务数量超过线程池处理能力,队列可能会无限增长,最终耗尽系统资源。
2.2 缺乏资源限制
2.2.1 newCachedThreadPool
Executors.newCachedThreadPool()
创建一个可根据需要创建新线程的线程池。该线程池在空闲线程存活时间超过60秒后会回收线程。如果任务持续不断地到达,线程池将不断创建新线程,直至系统资源耗尽。
反例
import java.util.concurrent.*;
public class CachedThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
final int index = i;
executor.execute(() -> {
System.out.println("Task " + index + " is running");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
输出结果
Task 0 is running
Task 1 is running
Task 2 is running
... (持续输出,直到系统资源耗尽)
这个例子展示了如果任务到来速度过快,线程池可能会消耗大量的系统资源。
2.3 单一任务失败影响全局
2.3.1 newSingleThreadExecutor
Executors.newSingleThreadExecutor()
创建一个单线程的执行器。如果这个唯一的线程因任何原因(如抛出未捕获的异常)而退出,整个线程池将无法继续执行任务。
反例
import java.util.concurrent.*;
public class SingleThreadExecutorExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int index = i;
executor.execute(() -> {
System.out.println("Task " + index + " is running");
if (index == 2) {
throw new RuntimeException("Task failed!");
}
});
}
executor.shutdown();
}
}
输出结果
Task 0 is running
Task 1 is running
Task 2 is running
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: Task failed!
at SingleThreadExecutorExample.lambda$main$0(SingleThreadExecutorExample.java:13)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
这个例子中,一旦任务2抛出异常,线程池将不再执行后续任务。
3. 正确创建线程池的方式
为了克服上述问题,推荐显式地创建 ThreadPoolExecutor
,并设置合理的参数。
3.1 自定义线程池配置
import java.util.concurrent.*;
public class CustomThreadPoolExample {
public static void main(String[] args) {
int corePoolSize = 4;
int maximumPoolSize = 8;
long keepAliveTime = 1L;
TimeUnit unit = TimeUnit.MINUTES;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100);
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
handler
);
for (int i = 0; i < 10; i++) {
final int index = i;
executor.execute(() -> {
System.out.println("Task " + index + " is running");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
输出结果
Task 0 is running
Task 1 is running
Task 2 is running
Task 3 is running
Task 4 is running
Task 5 is running
Task 6 is running
Task 7 is running
Task 8 is running
Task 9 is running
在这个例子中,我们显式地设置了线程池的大小、队列的大小以及拒绝策略,使得线程池能够更好地适应不同的负载。
3.2 选择合适的拒绝策略
当线程池达到最大容量时,需要有一个机制来处理新来的任务。Java提供了几种内置的拒绝策略:
AbortPolicy
:抛出RejectedExecutionException
异常。CallerRunsPolicy
:调用者的线程执行任务。DiscardPolicy
:丢弃任务,不做任何处理。DiscardOldestPolicy
:丢弃队列中最旧的任务,并尝试重新提交新任务。
示例
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
// 或者
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
4. 扩展知识
4.1 线程工厂
ThreadFactory
可以用来创建线程,并允许你自定义线程的名字、优先级等属性。
import java.util.concurrent.*;
public class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
private final ThreadGroup mGroup;
private final String mPrefix;
private final boolean mDaemon;
public NamedThreadFactory(String prefix, boolean daemon) {
SecurityManager s = System.getSecurityManager();
mGroup = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
mPrefix = prefix;
mDaemon = daemon;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(mGroup, r,
mPrefix + mThreadNum.getAndIncrement(),
0);
t.setDaemon(mDaemon);
return t;
}
}
// 使用自定义线程工厂创建线程池
NamedThreadFactory factory = new NamedThreadFactory("CustomPool-", false);
ExecutorService executor = new ThreadPoolExecutor(
4, 8, 1L, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(100), factory, new ThreadPoolExecutor.CallerRunsPolicy()
);
4.2 线程监控
通过 ThreadPoolExecutor
的方法可以监控线程池的状态:
getActiveCount()
:获取正在执行任务的线程数。getCompletedTaskCount()
:获取已完成的任务总数。getLargestPoolSize()
:获取曾经创建过的最大线程数。getPoolSize()
:获取当前线程池中的线程数量。getQueue()
:获取线程池使用的队列。
System.out.println("Active threads: " + ((ThreadPoolExecutor) executor).getActiveCount());
System.out.println("Completed tasks: " + ((ThreadPoolExecutor) executor).getCompletedTaskCount());
System.out.println("Largest pool size: " + ((ThreadPoolExecutor) executor).getLargestPoolSize());
5. 总结
尽管 Executors
提供的方法简化了线程池的创建过程,但它们缺乏灵活性和控制力,可能导致系统资源耗尽或其他不可预见的问题。通过显式地创建 ThreadPoolExecutor
并合理配置其参数,我们可以更好地控制线程池的行为,从而提高系统的稳定性和性能。
6. 参考链接
- Creating and Managing a Thread Pool
通过遵循这些最佳实践,我们可以更好地管理和控制我们的线程池,从而提高应用程序的稳定性和性能。