当前位置: 首页 > article >正文

并发基础之线程池(Thread Pool)

目录

    • 前言
    • 何为线程池
    • 线程池优势
    • 创建线程池方式
      • 直接实例化ThreadPoolExecutor类
      • JUC Executors 创建线程池
    • 线程池挖掘
      • Executors简单介绍
      • ThreadPoolExecutor核心类
      • ThreadPoolExecutor 类构造参数含义
      • 线程池运行规则
      • 线程设置数量
    • 结语

前言

相信大家都知道当前的很多系统架构都要求高并发,所谓高并发(High Concurrency)就是系统通过设计满足多个请求并行的能力,如果非要通俗一点就是系统在单位时间要满足较高的QPS\TPS。那么,如何让系统满足这些高并发能力呢?满足高并发能力不仅仅是分布式解耦、读写分离、限流削峰、缓存、队列,当前还有我们代码编写层面的多线程运用,让单位时间尽可能快的完成业务功能以提升系统吞吐量,吞吐量上来了QPS/TPS自然会提升。所以,今天我们主要对并发基础之线程池简要说明。

何为线程池

线程池英文 Thread Pool,是一种线程处理形式,望文生义就是一个装满线程的池子。当我们需要处理任务时直接从线程池中抓取线程执行,从而减少创建线程开销,避免创建过多线程影响系统开销,也为了尽可能压榨资源提升系统运行效率。

线程池优势

使用线程池的优点我们可以总结为以下几点:
1、重复使用线程,避免频繁创建线程开销,提升系统性能;
2、提供定时调度、单线程、并发数量控制功能,方便实现具体业务场景;
3、灵活的并发线程数量控制,尽可能多的压榨资源提升系统效率,避免过多线程阻塞系统;
4、提供了线程监控功能,可以监控系统运行资源情况

创建线程池方式

直接实例化ThreadPoolExecutor类

直接实例化ThreadPoolExecutor类,传入自定义构造参数

private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        // 线程池核心池的大小
        1,
        // 线程池的最大线程数
        2,
        // 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间
        1,
        // 等待的时间单位
        TimeUnit.SECONDS,
        // 用来储存等待执行任务的队列
        new ArrayBlockingQueue<Runnable>(10),
        //线程工厂
        new ThreadPoolExecutor.DiscardOldestPolicy());

JUC Executors 创建线程池

Executors 类有很多创建线程池的构造方法,如:
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

其本质上还是调用的ThreadPoolExecutor 线程执行类的构造方法。

线程池挖掘

Executors简单介绍

Java JUC 包下Executors类提供了多种创建线程池的方法:
在这里插入图片描述

总的来说我们可以分为如下几种线程池类型:
1、Executors.newCachedThreadPool:创建一个可缓存的线程池,如果线程池的大小超过了需要,可以灵活回收空闲线程,如果没有可回收线程,则新建线程
2、Executors.newFixedThreadPool:创建一个定长的线程池,可以控制线程的最大并发数,超出的线程会在队列中等待
3、Executors.newScheduledThreadPool:创建一个定长的线程池,支持定时、周期性的任务执行
4、Executors.newSingleThreadExecutor:创建一个单线程化的线程池,使用一个唯一的工作线程执行任务,保证所有任务按照指定顺序(先入先出或者优先级)执行
5、Executors.newSingleThreadScheduledExecutor:创建一个单线程化的线程池,支持定时、周期性的任务执行
6、Executors.newWorkStealingPool:创建一个具有并行级别的work-stealing线程池

ThreadPoolExecutor核心类

上文已经讲述了我们创建线程池常见的几种方式,这些方式JUC下Executors都已经提供。那么,这些常用的方法是如何创建线程池的呢?我们先查看选择一个创建方式查看源码:

//Executors.newFixedThreadPool 创建定长线程池方式
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

查看源码可知,创建一个定长线程池的静态方法内部实例化了一个线程池执行类ThreadPoolExecutor,再次进入ThreadPoolExecutor类查看源码:

//ThreadPoolExecutor 线程池执行类的一个有参数构造方法
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
// ThreadPoolExecutor 线程池执行类内部构造方法,
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

查看源码可知,创建线程池是本质上是实例化了一个ThreadPoolExecutor 线程执行类,且传入了多个构造参数。ThreadPoolExecutor 线程执行类内部此时仅仅是将这些配置参数赋值给这些变量,已备后续线程池执行时候对线程的创建、销毁、调用等操作。

根据线程池的使用场景,我选用excute() 执行方法进行源码解读:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

如源码所示,可知excute()方法主要做了如下三件事:
1、如果运行线程少于corePoolSize 核心线程会尝试启动一个新线程执行任务,并在addWorker方法中检验runState和workerCount以防止报警。
2、如果任务可以成功排队,也要检查是否应该新建一个线程。因为可能在上次检查后已有线程死亡或者线程池关闭,这个时候就需要回滚排队重新创建一个线程执行任务。
3、如果任务不能排队,我们应该尝试新增一个线程。如果新增线程失败我们应该知道已经关闭或者饱和,此时就会用拒绝策略提示用户

当然还有其他的一些方法如:submit()提交任务、shutdown()关闭线程池、shutdownNow()立即关闭线程池。这些源码也较为简单,可以自行阅读。

ThreadPoolExecutor 类构造参数含义

corePoolSize:核心线程数量
maximumPoolSize: 最大线程数量
keepAliveTime:空闲线程存活时间,当线程数量大于corePoolSize核心线程数,且这些线程处于空闲状态,你们超过这个存活时间的线程将被销毁
TimeUnit:线程时间单位
BlockingQueue:线程阻塞队列,我们可以根据自身情况传入喜欢的阻塞队列
ThreadFactory: 线程创建工厂
RejectedExecutionHandler:线程池拒绝策略,线程池根据传入的配置参数在特定情况下会触发拒绝策略,目前常用的拒绝策略有:
1、直接抛出异常,这也是默认的策略,实现类为AbortPolicy;
2、用调用者所在的线程来执行任务,实现类为CallerRunsPolicy;
3、丢弃队列中最靠前的任务并执行当前任务,实现类为DiscardOldestPolicy;
4、直接丢弃当前任务,实现类为DiscardPolicy。

线程池运行规则

线程池执行任务运行规则如下:
1、如果运行线程数小于 corePoolSize 核心线程数,无论核心线程是否空闲都会新建一个线程执行
2、如果运行线程数大于、等于 corePoolSize 核心线程数,小于 maximumPoolSize 最大线程数,如果BlockingQueue 阻塞队列已满则新建一个线程执行,如果没有满则放入阻塞队列等待空闲线程执行
3、如果运行线程数大于maximumPoolSize,且BlockingQueue 阻塞队列已满则会执行RejectedExecutionHandler 异常策略,默认是直接抛出异常
4、如果运行线程数据大于 corePoolSize 核心线程数量,且存在空闲线程的情况,空闲线程会在 keepAliveTime 存活时间超时被踢掉,直至线程数等于 corePoolSize 核心线程数量

线程设置数量

1、对于CPU密集型任务,需要尽量的压榨CPU,一般建议线程数量为 nCPU + 1
2、对于IO密集型任务,一般建议线程数量为 2nCPU

结语

线程池的灵活运用是多线程开发以满足高并发场景的一大利器,在开发高并发功能业务时候,应当合理使用多线程。特别是应当理解 ThreadPoolExecutor 核心类源码设计,以便于我们创建出适宜的线程池。水能载舟亦能覆舟,良好多线程运用可以提升系统吞吐量,滥用多线程也会导致异常情况的发生。


http://www.kler.cn/a/1366.html

相关文章:

  • 谷歌开放语音命令数据集,助力初学者踏入音频识别领域
  • Three.js 渲染技术:打造逼真3D体验的幕后功臣
  • java1-相对路径与绝对路径
  • 2025年XR行业展望:超越虚拟,融合现实
  • springmvc前端传参,后端接收
  • 人工智能-机器学习之多元线性回归(项目实践一)
  • SpringBoot(微服务)注册分布式Consul
  • 第十四届蓝桥杯三月真题刷题训练——第 15 天
  • Linux环境C语言开发基础
  • 进程和线程的区别和联系
  • Linux分文件编程:静态库与动态库的生成和使用
  • Java二叉树的前中后序遍历
  • Hive 数据倾斜
  • 超详细的堆排序,进来看看吧。
  • ESP32设备驱动-LM35温度传感器驱动
  • 深入理解WebSocket协议
  • Ribbon负载均衡的原理(源码分析)
  • 网络编程1(网络背景知识)
  • 全面剖析OpenAI发布的GPT-4比其他GPT模型强在哪里
  • win10下使用docker运行部署nginx,mysql
  • java如何创建线程
  • JVM监控搭建
  • 改进YOLO系列 | CVPR2023最新 PConv | 提供 YOLOv5 / YOLOv7 / YOLOv7-tiny 模型 YAML 文件
  • day2 —— 判断字符串中的字符是否唯一
  • *p++,*(p++),*++p,(*p)++区别?
  • 蓝桥杯嵌入式--字符串比较在串口通信中的应用