当前位置: 首页 > article >正文

什么不建议通过 `Executors` 构建线程池?

为什么不建议通过 Executors 构建线程池?

在Java并发编程中,使用线程池来管理并发任务是非常常见的。java.util.concurrent.Executors 提供了一系列方便的静态方法来创建线程池,但这些方法创建的线程池却存在一些潜在的问题。本文将探讨这些问题,并给出正确的创建线程池的方法。

1. 引言

Java并发编程的一个重要方面就是如何有效地管理和调度线程。java.util.concurrent.Executor 接口和它的实现类 ExecutorService 是Java并发框架的核心组件之一。java.util.concurrent.Executors 类则提供了创建 ExecutorService 的工厂方法。虽然这些方法提供了极大的便利性,但它们也有自己的局限性和缺陷。

2. Executors 的问题

2.1 不可预测的队列行为

2.1.1 newFixedThreadPool

Executors.newFixedThreadPool(int nThreads) 创建一个固定大小的线程池。当线程池中的所有线程都在执行任务时,新的任务将被放入一个无界队列(LinkedBlockingQueue)中等待执行。如果队列满了,提交新任务时将抛出 RejectedExecutionException

反例
import java.util.concurrent.*;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executor.execute(() -> {
                System.out.println("Task " + index + " is running");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}
输出结果
Task 0 is running
Task 1 is running
Task 2 is running
Task 3 is running
Task 4 is running
Task 5 is running
Task 6 is running
Task 7 is running
Task 8 is running
Task 9 is running

在这个例子中,如果提交的任务数量超过线程池处理能力,队列可能会无限增长,最终耗尽系统资源。

2.2 缺乏资源限制

2.2.1 newCachedThreadPool

Executors.newCachedThreadPool() 创建一个可根据需要创建新线程的线程池。该线程池在空闲线程存活时间超过60秒后会回收线程。如果任务持续不断地到达,线程池将不断创建新线程,直至系统资源耗尽。

反例
import java.util.concurrent.*;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            final int index = i;
            executor.execute(() -> {
                System.out.println("Task " + index + " is running");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}
输出结果
Task 0 is running
Task 1 is running
Task 2 is running
... (持续输出,直到系统资源耗尽)

这个例子展示了如果任务到来速度过快,线程池可能会消耗大量的系统资源。

2.3 单一任务失败影响全局

2.3.1 newSingleThreadExecutor

Executors.newSingleThreadExecutor() 创建一个单线程的执行器。如果这个唯一的线程因任何原因(如抛出未捕获的异常)而退出,整个线程池将无法继续执行任务。

反例
import java.util.concurrent.*;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int index = i;
            executor.execute(() -> {
                System.out.println("Task " + index + " is running");
                if (index == 2) {
                    throw new RuntimeException("Task failed!");
                }
            });
        }
        executor.shutdown();
    }
}
输出结果
Task 0 is running
Task 1 is running
Task 2 is running
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: Task failed!
    at SingleThreadExecutorExample.lambda$main$0(SingleThreadExecutorExample.java:13)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

这个例子中,一旦任务2抛出异常,线程池将不再执行后续任务。

3. 正确创建线程池的方式

为了克服上述问题,推荐显式地创建 ThreadPoolExecutor,并设置合理的参数。

3.1 自定义线程池配置

import java.util.concurrent.*;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        int corePoolSize = 4;
        int maximumPoolSize = 8;
        long keepAliveTime = 1L;
        TimeUnit unit = TimeUnit.MINUTES;
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100);
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();

        ExecutorService executor = new ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            unit,
            workQueue,
            handler
        );

        for (int i = 0; i < 10; i++) {
            final int index = i;
            executor.execute(() -> {
                System.out.println("Task " + index + " is running");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}
输出结果
Task 0 is running
Task 1 is running
Task 2 is running
Task 3 is running
Task 4 is running
Task 5 is running
Task 6 is running
Task 7 is running
Task 8 is running
Task 9 is running

在这个例子中,我们显式地设置了线程池的大小、队列的大小以及拒绝策略,使得线程池能够更好地适应不同的负载。

3.2 选择合适的拒绝策略

当线程池达到最大容量时,需要有一个机制来处理新来的任务。Java提供了几种内置的拒绝策略:

  • AbortPolicy:抛出 RejectedExecutionException 异常。
  • CallerRunsPolicy:调用者的线程执行任务。
  • DiscardPolicy:丢弃任务,不做任何处理。
  • DiscardOldestPolicy:丢弃队列中最旧的任务,并尝试重新提交新任务。
示例
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
// 或者
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();

4. 扩展知识

4.1 线程工厂

ThreadFactory 可以用来创建线程,并允许你自定义线程的名字、优先级等属性。

import java.util.concurrent.*;

public class NamedThreadFactory implements ThreadFactory {
    private final AtomicInteger mThreadNum = new AtomicInteger(1);
    private final ThreadGroup mGroup;
    private final String mPrefix;
    private final boolean mDaemon;

    public NamedThreadFactory(String prefix, boolean daemon) {
        SecurityManager s = System.getSecurityManager();
        mGroup = (s != null) ? s.getThreadGroup() :
            Thread.currentThread().getThreadGroup();
        mPrefix = prefix;
        mDaemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(mGroup, r,
            mPrefix + mThreadNum.getAndIncrement(),
            0);
        t.setDaemon(mDaemon);
        return t;
    }
}

// 使用自定义线程工厂创建线程池
NamedThreadFactory factory = new NamedThreadFactory("CustomPool-", false);
ExecutorService executor = new ThreadPoolExecutor(
    4, 8, 1L, TimeUnit.MINUTES,
    new LinkedBlockingQueue<>(100), factory, new ThreadPoolExecutor.CallerRunsPolicy()
);

4.2 线程监控

通过 ThreadPoolExecutor 的方法可以监控线程池的状态:

  • getActiveCount():获取正在执行任务的线程数。
  • getCompletedTaskCount():获取已完成的任务总数。
  • getLargestPoolSize():获取曾经创建过的最大线程数。
  • getPoolSize():获取当前线程池中的线程数量。
  • getQueue():获取线程池使用的队列。
System.out.println("Active threads: " + ((ThreadPoolExecutor) executor).getActiveCount());
System.out.println("Completed tasks: " + ((ThreadPoolExecutor) executor).getCompletedTaskCount());
System.out.println("Largest pool size: " + ((ThreadPoolExecutor) executor).getLargestPoolSize());

5. 总结

尽管 Executors 提供的方法简化了线程池的创建过程,但它们缺乏灵活性和控制力,可能导致系统资源耗尽或其他不可预见的问题。通过显式地创建 ThreadPoolExecutor 并合理配置其参数,我们可以更好地控制线程池的行为,从而提高系统的稳定性和性能。

6. 参考链接

  • Creating and Managing a Thread Pool

通过遵循这些最佳实践,我们可以更好地管理和控制我们的线程池,从而提高应用程序的稳定性和性能。


http://www.kler.cn/a/302244.html

相关文章:

  • 软件测试 —— 性能测试(jmeter)
  • C#,入门教程(04)——Visual Studio 2022 数据编程实例:随机数与组合
  • 【Knife4j与Swagger的区别是什么?】
  • 小米Vela操作系统开源:AIoT时代的全新引擎
  • flume系列之:flume落cos
  • Java Web开发高级——单元测试与集成测试
  • 抓包工具检测手把手教学 - 某招聘网站
  • 7-6 列出连通集
  • pyqt自定义文本编辑器
  • TCP通信实现
  • 2024 天池云原生编程挑战赛决赛名单公布,9 月 20 日开启终极答辩
  • 【从0开始在CentOS 9中安装redis】
  • Windows编译Hikari-LLVM15[llvm-18.1.8rel]并集成到Android Studio NDK
  • openVX加速-常见问题:适用场景、AI加速、安装方式等
  • 模板(C++)
  • Java中的List与Set转换
  • jantic/DeOldify部署(图片上色)附带Dockerfile和镜像
  • Linux下的系统接口(实时更新)
  • 人工智能安全治理框架导图
  • 【泰克生物】酵母单杂交技术在基因调控研究中的应用
  • 数据结构——查找算法
  • 240908-结合DBGPT与Ollama实现RAG本地知识检索增强
  • OpenCV结构分析与形状描述符(23)确定一个点是否位于多边形内的函数pointPolygonTest()的使用
  • 单链表的查找与长度计算
  • PyCharm与Anaconda超详细安装配置教程
  • 高效Flutter应用开发:GetX状态管理实战技巧