当前位置: 首页 > article >正文

JUC并发—11.线程池源码分析

大纲

1.线程池的优势和JUC提供的线程池

2.ThreadPoolExecutor和Excutors创建的线程池

3.如何设计一个线程池

4.ThreadPoolExecutor线程池的执行流程

5.ThreadPoolExecutor的源码分析

6.如何合理设置线程池参数 + 定制线程池

1.线程池的优势和JUC提供的线程池

(1)为什么使用线程池

(2)线程池的优势

(3)JUC提供的线程池

(1)为什么使用线程池

线程的创建涉及内存分配、CPU资源使用、系统调用从用户态切换到内核态,这些都是比较耗时的操作,当这些操作频率非常高时,会严重影响性能。为了解决这个问题,Java提供了线程池技术。

线程池运用的是一种池化技术,池化技术就是提前创建好大量的资源保存在某个容器中,在需要使用时可直接从容器中获取资源,用完之后会进行回收以便下次使用。

池化技术是一种比较常见的设计思想,在请求量大时能很好优化应用的性能,减少频繁创建和销毁资源的性能开销。常见的池化技术实现有:线程池、数据库连接池、对象池、内存池等,需要用到池化技术的场景对应的资源都比较"费时且昂贵"。

(2)线程池的优势

合理使用线程池,可以带来很多好处:

一.减少频繁创建和销毁线程的性能开销

二.重复利用线程,避免对每个任务都创建线程,可以提高响应速度

三.合理设置线程池的大小,可以避免因为线程池过大影响性能

(3)JUC提供的线程池

可以通过如下两种方式来创建线程池:

一.ThreadPoolExecutor线程池的具体实现类

二.Executors的工厂方法

Executors中常用的线程池分4种:

一.Fixed线程池里有固定数量的线程

二.Single线程池里只有一个线程

三.Cached线程池里的线程数量不固定,但线程空闲的时候会被释放

四.Scheduled线程池里有固定数量的线程,可以延期执行和周期执行

2.ThreadPoolExecutor和Excutors创建的线程池

(1)ThreadPoolExecutor构造方法的参数

(2)Excutors创建的线程池的实现

(1)ThreadPoolExecutor构造方法的参数

一.核心线程数corePoolSize

二.最大线程数maximumPoolSize

三.线程存活时间keepAliveTime

四.线程存活时间的单位TimeUnit unit

五.阻塞队列workQueue

用来存放待处理任务的阻塞队列BlockingQueueworkQueue。

六.线程工厂threadFactory

用来创建线程池中的线程的线程工厂ThreadFactory threadFactory。

七.拒绝策略handler

当线程池处于满负荷状态时,无法处理后续进来的任务便会采用拒绝策略。

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    public ThreadPoolExecutor(
        int corePoolSize,//核心线程数
        int maximumPoolSize,//最大线程数
        long keepAliveTime,//线程存活时间
        TimeUnit unit,//线程存活时间的单位
        BlockingQueue<Runnable> workQueue,//阻塞队列,用来存放待处理的任务
        ThreadFactory threadFactory,//线程工厂,用来创建线程池中的线程
        RejectedExecutionHandler handler//拒绝策略,当线程池处于满负荷状态时,无法处理后续进来的任务便会采用拒绝策略
    ) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) { 
            throw new IllegalArgumentException();
        }
        if (workQueue == null || threadFactory == null || handler == null) {
            throw new NullPointerException();
        }
        this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    ...
}

(2)Excutors创建的线程池的实现

一.newFixedThreadPool()方法

二.newSingleThreadExecutor()方法

三.newCachedThreadPool()方法

四.newScheduledThreadPool()方法

一.newFixedThreadPool()方法

该方法提供的是一个有固定线程数量的线程池,最多创建nThreads个线程。

该方法在构建ThreadPoolExecutor时,核心线程数和最大线程数都是nThreads。在某些需要限制线程数量的场景中,可采用newFixedThreadPool()方法创建一个Fixed线程池。

Fixed线程池会通过固定数量的线程,配合一个无界队列来处理提交的任务。

public class Executors {
    ...
    //Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.  
    //At any point, at most nThreads threads will be active processing tasks.
    //If additional tasks are submitted when all threads are active, 
    //they will wait in the queue until a thread is available.
    //If any thread terminates due to a failure during execution prior to shutdown, 
    //a new one will take its place if needed to execute subsequent tasks.
    //The threads in the pool will exist until it is explicitly ExecutorService#shutdown shutdown.
    //Fixed线程池会通过固定数量的线程,配合一个无界队列来处理提交的任务;
    //无论什么时候最多只能有指定数量的线程来处理任务;
    //如果线程池里所有的线程都在繁忙地处理任务,此时再次提交任务时,就只能把任务压入无界队列中等待;
    //如果线程池里的某线程挂掉了,此时会启动一个新的线程加入到线程池中;
    //线程池里的线程会一直存活,等待处理新提交过来的任务,直到关闭线程池;
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(
            nThreads,//核心线程数
            nThreads,//最大线程数
            0L,//线程存活时间
            TimeUnit.MILLISECONDS,//线程存活时间的单位
            new LinkedBlockingQueue<Runnable>()//阻塞队列,用来存放待处理的任务
        );
    }
    ...
}

二.newSingleThreadExecutor()方法

该方法提供只有一个线程的线程池,这意味着所有任务只会由一个线程来执行,因此可以保证任务执行的顺序。

Single线程池会通过一个线程,配合一个无界队列来处理提交的任务。

public class Executors {
    ...
    //Creates an Executor that uses a single worker thread operating off an unbounded queue. 
    //(Note however that if this single thread terminates due to a failure during execution prior to shutdown, 
    //a new one will take its place if needed to execute subsequent tasks.)
    //Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time. 
    //Unlike the otherwise equivalent
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService(
            new ThreadPoolExecutor(
                1,//核心线程数
                1,//最大线程数
                0L,//线程存活时间
                TimeUnit.MILLISECONDS,//线程存活时间的单位
                new LinkedBlockingQueue<Runnable>()//阻塞队列,用来存放待处理的任务
            )
        );
    }
    ...
}

三.newCachedThreadPool()方法

该方法提供一个可以缓存线程的线程池,核心线程数0,最大线程数MAX_VALUE,阻塞队列SynchronousQueue。

SynchronousQueue是一个没有存储容器的阻塞队列,一个生产者对其进行的插入操作在消费者消费之前会被阻塞。所以提交一个任务到Cached线程池时,线程池会分配一个线程来处理任务。

当任务量较多时,newCachedThreadPool()方法会创建多个线程来处理。当任务量下降时,并不会马上回收这些新创建的线程,而是会缓存起来。缓存起来的线程在60秒后还处于空闲状态才会被回收。因此newCachedThreadPool()方法适合处理突发流量。

public class Executors {
    ...
    //Creates a thread pool that creates new threads as needed, 
    //but will reuse previously constructed threads when they are available.
    //These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.
    //Calls to execute will reuse previously constructed threads if available. 
    //If no existing thread is available, a new thread will be created and added to the pool. 
    //Threads that have not been used for sixty seconds are terminated and removed from the cache. 
    //Thus, a pool that remains idle for long enough will not consume any resources. 
    //Note that pools with similar properties but different details (for example, timeout parameters)
    //may be created using ThreadPoolExecutor constructors.
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(
            0,//核心线程数
            Integer.MAX_VALUE,//最大线程数
            60L,//线程存活时间
            TimeUnit.SECONDS,//线程存活时间的单位
            new SynchronousQueue<Runnable>()//阻塞队列,用来存放待处理的任务
        );
    }
    ...
}

四.newScheduledThreadPool()方法

该方法会创建一个执行定时任务或者周期性任务的线程池,核心线程数为传入的corePoolSize,最大线程数为MAX_VALUE。该方法比较适合用来实现定时任务,比如心跳检测、定时轮询等。

public class Executors {
    ...
    //Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    //Creates a new ScheduledThreadPoolExecutor with the given core pool size.
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
    }
    ...
}

3.如何设计一个线程池

(1)如何让线程可以重复使用

(2)需要构建生产者消费者模型

(3)需要考虑拒绝策略

(4)需要回收非核心线程

(1)如何让线程可以重复使用

对于线程来说,本身的调度和执行并不由开发者控制,而且线程是当Thread中的run()方法结束后自动销毁完成回收的,所以应该如何让线程可重复使用?

可以在run()方法中增加一个while(true)循环,只要run()方法一直运行,那么线程就不会被回收。虽然while(true)可以让线程不被销毁,但是线程一直运行会占用CPU资源。

我们希望线程池中线程的运行机制是:有任务的时候执行任务,没有任务的时候不需要做无效的运转。所以可以让线程在没有任务的时候阻塞,这样就不会占用CPU资源了。

因此我们对于线程池中的线程复用的要求是:如果有任务时,线程池中的线程会处理这个任务。如果没有任务,就让线程池中的线程进行阻塞。

(2)需要构建生产者消费者模型

为了构建生产者消费者模型,可以使用阻塞队列。通过线程池的execute(task)方法提交任务时,任务会被放入阻塞队列。线程池的线程在循环执行时,会不断从阻塞队列中获取任务来执行。所以当阻塞队列中没有任务时,线程池的线程就会被阻塞住。

注意:ConcurrentLinkedQueue是无界队列,LinkedBlockingQueue / ArrayBlockingQueue是有界队列,LinkedBlockingQueue的阻塞效果是队列满放数据和队列空取数据都会阻塞。

public class ThreadPool {
    public void execute(Task task) {
        blockingQueued.put(task);//将任务提交到阻塞队列 
    }
    
    public class WorkThread extends Thread {
        @Override
        public void run() {
            while(true) {
                Task task = blockingQueued.take();//从阻塞队列中获取任务        
            }    
        }
    }
}

(3)需要考虑拒绝策略

如果生产者的请求非常多,阻塞队列可能满了。此时要么提高消费者的消费能力,要么限流生产者降低生产者的生产频率。对应于线程池就是,要么增加线程数量,要么拒绝处理满了之后的任务。

所以当阻塞队列满了之后,一般会先考虑扩容,也就是增加线程数量。如果扩容之后,生产者的请求量依然很大,那只能再采取拒绝策略来处理。

(4)需要回收非核心线程

当引入线程扩容机制解决请求任务过多的问题后,生产者的请求量开始减少。此时线程池中就不需要那么多线程来处理任务了,新增的线程最好进行回收。

线程的回收就是让线程跳出while循环,当run()方法执行完毕后会自动销毁。所以可以让线程池的线程从阻塞队列中获取任务时,如果等待一段时间后还没获取到任务,说明当前线程池处于空闲状态。这也就意味着这个线程也没有必要继续等待了,于是直接退出即可。

4.ThreadPoolExecutor线程池的执行流程

当调用execute(Runnable command)方法往线程池提交一个任务后,线程池首先会判断的核心线程是否已经初始化。

如果核心线程还没有初始化,则创建核心线程并启动该线程,该线程会从阻塞队列workQueue中获取任务并执行。然后再将command任务添加到阻塞队列workQueue中。

如果阻塞队列workQueue满了,则尝试创建非核心线程并启动。这些非核心线程也会从阻塞队列workQueue中获取任务并执行。

如果线程池中总的线程数达到阈值且阻塞队列已经满了,则执行拒绝策略。

5.ThreadPoolExecutor的源码分析

(1)线程池的成员变量

(2)线程池状态和线程数量的存储

(3)线程池的状态机及变更

(4)线程池的execute()方法

(5)线程池的addWorker()方法

(6)线程池的工作线程Worker

(7)线程池的runWorker()方法

(8)线程池的getTask()方法

(9)线程池的processWorkerExit()方法

(10)线程池的拒绝策略

(1)线程池的成员变量

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    //用来存储线程池的状态和线程数量的原子变量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    //因为Integer.SIZE = 32,所以COUNT_BITS = 29,用29位来表示线程数量
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //表示线程池容量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    //线程池的状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    //阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    
    //重入锁
    private final ReentrantLock mainLock = new ReentrantLock();

    //存储工作线程的集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    private final Condition termination = mainLock.newCondition();

    //记录线程池在整个生命周期中出现过的最大线程个数
    private int largestPoolSize;

    //已完成任务数
    private long completedTaskCount;
    
    //线程工厂,用来创建工作线程
    private volatile ThreadFactory threadFactory;

    //线程拒绝策略
    private volatile RejectedExecutionHandler handler;

    //空闲线程的存活时间
    private volatile long keepAliveTime;

    //是否允许核心线程
    private volatile boolean allowCoreThreadTimeOut;

    //核心线程数
    private volatile int corePoolSize;

    //最大线程数
    private volatile int maximumPoolSize;
    ...
}

(2)线程池状态和线程数量的存储

一.ctl原子变量

线程池采用了一个32位的整数来存储线程池的状态和线程数量。其中高3位表示线程池的状态,低29位表示线程的数量。

这个32位的整数由ThreadPoolExecutor的ctl原子变量来表示,ctl原子变量的初始状态为RUNNING,初始线程数量为0。

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    //用来存储线程池的状态和线程数量的原子变量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    //从整型变量c中获取线程池的状态
    private static int runStateOf(int c) {
        return c & ~CAPACITY;
    }
    
    //从整型变量c中获取当前的线程数量
    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }
    
    //用来更新线程池中的ctl原子变量,也就是更新线程池的状态和线程数量
    private static int ctlOf(int rs, int wc) {
        return rs | wc;
    }
    ...
}

二.线程容量的计算

COUNT_BITS表示用来统计数量的位数,Integer.SIZE - 3 = 29。

CAPACITY表示线程池容量,大小为 (1 << 29) - 1,即表示一个线程池最多能够创建的线程数量。

//1的32位二进制表示是:0000 0000 0000 0000 0000 0000 0000 0001
//1左移29位后的32位二进制表示是:0010 0000 0000 0000 0000 0000 0000 0000
//1左移29位后再减1得到的二进制是:0001 1111 1111 1111 1111 1111 1111 1111
//1左移29位后再减1得到的二进制转为10进制是:536870911

三.线程池状态的计算

线程池的状态一共有5种,用ctl原子变量的高3位来表示不同的状态。

//-1的二进制各个位上全部是1,因为负数涉及符号位,负数通过原码->反码->补码得到
//所以:-1 << COUNT_BITS = -1 << 29 = 1110 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING    = -1 << COUNT_BITS;
//0的二进制也是0,所以:0 << COUNT_BITS = 0 << 29 = 0000 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//1的二进制是1,所以:1 << COUNT_BITS = 1 << 29 = 0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;
//2的二进制是10,所以:2 << COUNT_BITS = 2 << 29 = 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;
//3的二进制是11,所以:3 << COUNT_BITS = 3 << 29 = 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;

(3)线程池的状态机及变更

线程池的5种状态说明如下:

一.运行状态RUNNING

线程池此时可以接收新的任务,可以处理阻塞队列中的任务。

二.关闭状态SHUTDOWN

线程池此时不再接收新的任务,但是可以继续处理阻塞队列中的任务。

三.停止状态STOP

不再接收新的任务,也不处理阻塞队列的任务,同时中断正在处理的任务。

四.过渡状态TIDYING

该状态意味着所有的任务都执行完了,且线程池中已没有有效的工作线程,该状态下会调用terminated()方法进入TERMINATED状态。

五.终止状态TERMINATED

调用terminated()方法完成后的状态。

(4)线程池的execute()方法

向线程池提交一个任务是通过execute()方法的三个步骤来完成的。

步骤一:

首先根据ctl当前的值来判断当前线程数量是否小于核心线程数量,主要用来解决线程池中核心线程未初始化的问题。

如果是,则调用addWorker()方法创建一个核心线程并启动,同时把当前任务command传递进新创建的核心线程直接执行。如果否,则说明核心线程已经初始化了。

步骤二:

当前线程数量大于等于核心线程数量,核心线程已初始化,那么就把任务command添加到阻塞队列workQueue中。

步骤三:

如果向阻塞队列添加任务失败,说明阻塞队列workQueue已满,那么就调用addWorker()方法来创建一个非核心线程。

如果通过addWorker()方法创建一个非核心线程也失败了,则说明当前线程池中的线程总数已达到最大线程数量,此时需要调用reject()方法执行拒绝策略。

第一种情况:

如果线程池的线程数量 < corePoolSize,就创建一个新的线程并执行任务。刚开始时线程池里的线程数量一定是小于corePoolSize指定的数量,此时每提交一个任务就会创建一个新的线程放到线程池里去。

第二种情况:

如果线程池的线程数量 >= corePoolSize,就不会创建新的线程和执行任务,此时唯一做的事情就是通过offer()方法把任务提交到阻塞队列里进行排队。LinkedBlockingQueue的offer()是非阻塞的,而put()和take()则是阻塞的。队列满了offer()不会阻塞等待其他线程take()一个元素,而是直接返回false。但LinkedBlockingQueue默认下是近乎无界的,此时offer()只会返回true。

第三种情况:

如果线程池的线程数量 >= corePoolSize + 阻塞队列已满即尝试入队失败了,此时就会再次尝试根据最大线程数来创建新的非核心线程。如果创建非核心线程失败最大线程数也满了,就会执行拒绝策略降级处理。超过corePoolSize数量的线程,在keepAliveTime时间范围内空闲会被回收。

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    //Executes the given task sometime in the future.
    //The task may execute in a new thread or in an existing pooled thread.
    //If the task cannot be submitted for execution, 
    //either because this executor has been shutdown or because its capacity has been reached,
    //the task is handled by the current RejectedExecutionHandler.
    //向线程池提交一个任务
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
     
        //Proceed in 3 steps:
        //1. If fewer than corePoolSize threads are running, 
        //try to start a new thread with the given command as its first task.  
        //The call to addWorker atomically checks runState and workerCount, 
        //and so prevents false alarms that would add threads when it shouldn't, by returning false.
      
        //2. If a task can be successfully queued, 
        //then we still need to double-check whether we should have added a thread
        //(because existing ones died since last checking) or that the pool shut down since entry into this method. 
        //So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
      
        //3. If we cannot queue task, then we try to add a new thread.  
        //If it fails, we know we are shut down or saturated and so reject the task.
        int c = ctl.get();
        //首先根据ctl当前的值来判断当前线程数量是否小于核心线程数量,主要用来解决线程池中核心线程未初始化的问题
        if (workerCountOf(c) < corePoolSize) {
            //如果小于,则调用addWorker()方法创建一个核心线程并启动,同时把当前任务command传递进去直接执行
            if (addWorker(command, true)) {
                return;
            }
            c = ctl.get();
        }
        //如果核心线程已经初始化,则把任务command添加到阻塞队列workQueue中
        //LinkedBlockingQueue的offer()方法是非阻塞的,而put()和take()方法则是阻塞的
        //如果该阻塞队列满了,不会阻塞等待其他线程take()一个元素,而是直接返回false
        //但LinkedBlockingQueue默认下是近乎无界的,此时可认为offer()入队时,永远返回true
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) {
                addWorker(null, false);
            }
        } else if (!addWorker(command, false)) {//如果添加失败,说明队列已满,则调用addWorker()创建非核心线程
            //如果通过addWorker()方法创建非核心线程失败,则说明当前线程池中的线程总数已达到最大线程数量
            //此时需要调用reject()方法执行拒绝策略
            reject(command);
        }
    }
    
    //从整型变量c中获取当前的线程数量
    private static int workerCountOf(int c)  {
        return c & CAPACITY;
    }
    ...
}

(5)线程池的addWorker()方法

addWorker()方法会创建一个线程并启动,该线程会不断从阻塞队列workQueue中获取任务来执行。该方法有两个参数,firstTask表示要执行的任务,core表示是否是核心线程。

addWorker()方法主要工作分为三部分:

第一部分:判断是否需要创建新的Worker

第二部分:通过CAS + 自旋更新ctl中的线程数量

第三部分:创建并启动工作线程

核心就是第三部分,也就是初始化一个Worker并把firstTask传进去。

在初始化Worker的时候,首先会通过线程工厂同步创建一个新的线程。接着将新创建的线程添加到线程集合workers中,因为这里用到独占锁,所以线程集合workers使用非线程安全的HashSet即可。然后通过Thread的start()方法启动这个新创建的线程,如果线程启动失败,则要从集合中删除新增的线程,并回退增加的线程数。

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    //Checks if a new worker can be added with respect to current pool state and the given bound (either core or maximum). 
    //If so, the worker count is adjusted accordingly, 
    //and, if possible, a new worker is created and started, running firstTask as its first task. 
    //This method returns false if the pool is stopped or eligible to shut down. 
    //It also returns false if the thread factory fails to create a thread when asked.  
    //If the thread creation fails, either due to the thread factory returning null, 
    //or due to an exception (typically OutOfMemoryError in Thread.start()), we roll back cleanly.
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //第一部分:根据线程池状态和阻塞队列来判断是否需要创建新的Worker
            int c = ctl.get();
            int rs = runStateOf(c);


            //Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) {
                return false;
            }
            //第一部分结束
        
            //第二部分:通过CAS+自旋更新ctl中的线程数量
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
                    return false;
                }
                if (compareAndIncrementWorkerCount(c)) {
                    break retry;
                }
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) {
                    continue retry;
                }
                //else CAS failed due to workerCount change; retry inner loop
            }
            //第二部分结束
        }
      
        //第三部分:创建并启动工作线程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //初始化一个Worker并把firstTask传进去,在Worker初始化后,会同步创建一个新的线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //增加全局锁,因为线程池在关闭时会抢占这把锁
                //这里加锁可以避免在创建工作线程时其他线程关闭线程池
                mainLock.lock();
                try {
                    //Recheck while holding lock.
                    //Back out on ThreadFactory failure or if shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
            
                    //判断是否允许添加工作线程
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) {// precheck that t is startable
                            throw new IllegalThreadStateException();
                        }
                        //因为前面已经使用了独占锁,所以workers集合使用非线程安全的HashSet即可
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize) {
                            largestPoolSize = s;
                        }
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //通过Thread的start()方法启动这个新创建的线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted) {
                //如果线程启动失败,则要从集合中删除新增的线程,并回退增加的线程数
                addWorkerFailed(w);
            }
        }
        return workerStarted;
    }
    ...
}

如果当前线程数量 < corePoolSize,执行如下:

一.自旋 + CAS设置线程数量

二.创建Worker(AQS + 线程工厂theadFactory)

三.加入线程集合workers(独占锁 + HashSet)

四.start()方法启动线程执行Worker的run()方法

run()方法又会触发执行runWorker()方法,最终执行提交的Runnable任务。

(6)线程池的工作线程Worker

Worker是ThreadPoolExecutor类中具体的工作线程。首先Worker实现了Runnable接口,这意味着Worker也是一个线程。

然后在其构造方法中会通过getThreadFactory().newThread(this),将实现了Runnable接口的Worker传递进去来创建一个线程,这样新创建的线程的引用就会指向传递进去的Worker实例。

所以在addWorker()方法中通过t.start()启动线程时,就会触发执行Worker的run()方法去调用runWorker(this)方法。

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    //Class Worker mainly maintains interrupt control state for threads running tasks, along with other minor bookkeeping.
    //This class opportunistically extends AbstractQueuedSynchronizer to simplify acquiring and releasing a lock surrounding each task execution.
    //This protects against interrupts that are intended to wake up a worker thread waiting for a task from instead interrupting a task being run. 
    //We implement a simple non-reentrant mutual exclusion lock rather than use ReentrantLock 
    //because we do not want worker tasks to be able to reacquire the lock when they invoke pool control methods like setCorePoolSize.  
    //Additionally, to suppress interrupts until the thread actually starts running tasks, 
    //we initialize lock state to a negative value, and clear it upon start (in runWorker).
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        //Thread this worker is running in.  Null if factory fails.
        final Thread thread;
        //Initial task to run.  Possibly null.
        Runnable firstTask;

        //Creates with given first task and thread from ThreadFactory.
        //@param firstTask the first task (null if none)
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //通过getThreadFactory().newThread()方法将实现了Runnable接口的Worker传递进去来创建一个线程
            //这样线程的引用就会指向传递进去的Worker实例
            //所以在addWorker()方法中通过t.start()方法启动线程时,就会触发执行Worker的run()方法
            this.thread = getThreadFactory().newThread(this);
        }

        //Delegates main run loop to outer runWorker
        public void run() {
            runWorker(this);
        }
        ...
    }
    ...
}

(7)线程池的runWorker()方法

runWorker()是工作线程Worker启动后执行的方法,它的主要功能就是不断从阻塞队列中获取任务并执行。只要获取的task任务不为空就循环调用getTask()方法从阻塞队列中获取任务。

如果获取的task任务不为空,就可以调用task任务的run()方法来执行任务。如果获取的task任务为空,则执行processWorkerExit()处理工作线程的回收。

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    //Main worker run loop. 
    //Repeatedly gets tasks from queue and executes them, while coping with a number of issues:
    
    //1.We may start out with an initial task, in which case we don't need to get the first one. 
    //Otherwise, as long as pool is running, we get tasks from getTask. 
    //If it returns null then the worker exits due to changed pool state or configuration parameters.
    //Other exits result from exception throws in external code, in which case completedAbruptly holds, 
    //which usually leads processWorkerExit to replace this thread.
    
    //2.Before running any task, the lock is acquired to prevent other pool interrupts while the task is executing, 
    //and then we ensure that unless pool is stopping, this thread does not have its interrupt set.
    
    //3.Each task run is preceded by a call to beforeExecute, which might throw an exception, 
    //in which case we cause thread to die (breaking loop with completedAbruptly true) without processing the task.
    
    //4.Assuming beforeExecute completes normally, we run the task, 
    //gathering any of its thrown exceptions to send to afterExecute.
    //We separately handle RuntimeException, Error (both of which the specs guarantee that we trap) and arbitrary Throwables.
    //Because we cannot rethrow Throwables within Runnable.run, 
    //we wrap them within Errors on the way out (to the thread's UncaughtExceptionHandler).  
    //Any thrown exception also conservatively causes thread to die.
    
    //5.After task.run completes, we call afterExecute, which may also throw an exception, which will also cause thread to die.
    //According to JLS Sec 14.20, this exception is the one that will be in effect even if task.run throws.
    
    //The net effect of the exception mechanics is that afterExecute and 
    //the thread's UncaughtExceptionHandler have as accurate information as we can provide about any problems encountered by user code.
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //只要task不为空就一直循环,getTask()会从阻塞队列中获取任务
            while (task != null || (task = getTask()) != null) {
                //由于Worker继承了AQS,所以w.lock()表示当前Worker要开始执行任务了
                w.lock();
                //If pool is stopping, ensure thread is interrupted; if not, ensure thread is not interrupted.
                //This requires a recheck in second case to deal with shutdownNow race while clearing interrupt
                //用来判断是否应该中断当前线程
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) { 
                    wt.interrupt();//中断当前线程
                }
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //通过调用task任务的run()方法来执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //处理工作线程的回收
            processWorkerExit(w, completedAbruptly);
        }
    }
    ...
}

(8)线程池的getTask()方法

getTask()方法用来从阻塞队列中获取任务,它会返回一个Runnable对象。这个Runnable对象表示的是:由线程池的execute()方法通过workQueue.offer()提交到阻塞队列中的任务。getTask()方法的核心是通过workQueue.take()方法从阻塞队列中获取任务。

如果允许线程获取任务超时后退出,那么只需满足以下两个条件之一即可:

一.配置项(allowCoreThreadTimedOut)为true

二.当前线程数量超过了corePoolSize

allowCoreThreadTimedOut默认是false,也就是不允许核心线程因为获取不到任务就超时退出。如果allowCoreThreadTimedOut设置为true,那么会导致线程池中的核心线程超过一定时间获取不到任务就会自动退出。

超过corePoolSize数量的线程,在keepAliveTime时间范围内空闲会被回收,这是通过getTask()方法在超过keepAliveTime时间范围内都没能获取到任务来实现的。

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    //Performs blocking or timed wait for a task, depending on current configuration settings, 
    //or returns null if this worker must exit because of any of:
    //1.There are more than maximumPoolSize workers (due to a call to setMaximumPoolSize).
    //2.The pool is stopped.
    //3.The pool is shutdown and the queue is empty.
    //4.This worker timed out waiting for a task, and timed-out workers are subject to termination 
    //(that is, allowCoreThreadTimeOut || workerCount > corePoolSize) both before and after the timed wait, 
    //and if the queue is non-empty, this worker is not the last thread in the pool.
    //@return task, or null if the worker must exit, in which case workerCount is decremented
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {//自旋
            int c = ctl.get();
            int rs = runStateOf(c);

            //Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);

            //Are workers subject to culling?
            //是否允许线程超时后退出
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c)) {
                    return null;
                }
                continue;
            }
           
            try {
                //通过workQueue.take()方法从阻塞队列中获取任务
                //如果timed为true,即允许线程超时后退出,那么就使用poll()超时获取则返回null来结束线程的run()方法
                //如果timed为false,即不允许线程超时后退出,那么就使用take()方法阻塞式获取队列中的任务,此时不会结束线程的run()方法 
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null) {
                    return r;
                }
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    ...
}

(9)线程池的processWorkerExit()方法

线程的回收只需让run()方法的逻辑执行结束即可。

processWorkerExit()方法的逻辑是:

首先记录总的完成任务数,然后把当前线程从集合中移除,接着尝试结束线程池,最后判断是否需要再增加一个工作线程来处理任务。

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    //Performs cleanup and bookkeeping for a dying worker.
    //Called only from worker threads. Unless completedAbruptly is set,
    //assumes that workerCount has already been adjusted to account for exit.
    //This method removes thread from worker set, 
    //and possibly terminates the pool or replaces the worker 
    //if either it exited due to user task exception or 
    //if fewer than corePoolSize workers are running or queue is non-empty but there are no workers.
    //@param w the worker
    //@param completedAbruptly if the worker died due to user exception
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) {//If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //记录总的完成任务数
            completedTaskCount += w.completedTasks;
            //把当前线程从集合中移除
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
      
        //尝试结束线程池
        tryTerminate();

        int c = ctl.get();
        //如果线程池还处于RUNNING或SHUTDOWN状态,则需要判断是否需要再增加一个工作线程来处理线程池中的任务
        if (runStateLessThan(c, STOP)) {
            //completedAbruptly = true,说明执行的任务时出现了异常
            if (!completedAbruptly) {
                //min代表的是核心线程数量
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //没有工作线程,阻塞队列不为空
                if (min == 0 && ! workQueue.isEmpty()) {
                    min = 1;
                }
                //如果工作线程的数量大于等于线程池的最小线程数量
                if (workerCountOf(c) >= min) {
                    return; // replacement not needed
                }
            }
            //如果执行的任务时出现了异常则添加工作线程
            //如果工作线程数量小于最小线程数量则添加工作线程
            addWorker(null, false);
        }
    }
    ...
}

(10)线程池的拒绝策略

在执行线程池的execute()方法向线程池提交一个任务时,如果阻塞队列和工作线程都满了,那么该任务只能通过拒绝策略来降级处理。

ThreadPoolExecutor提供了4种拒绝策略:

一.AbortPolicy策略

这是ThreadPoolExecutor默认使用的拒绝策略,这种策略就是简单抛出一个RejectedExecutionHandler异常。这种策略适合用在一些关键业务上,如果这些业务不能承载更大的并发量,那么可以通过抛出的异常及时发现问题并做出相关处理。

二.CallerRunsPolicy策略

只要线程池没有被关闭,就由提交任务的线程执行任务的run()方法来执行,这种策略相当于保证了所有的任务都必须执行完成。

三.DiscardPolicy策略

直接把任务丢弃,不做任何处理,这种策略使得系统无法发现具体的问题,建议用在不重要的业务上。

四.DiscardOldestPolicy策略

如果线程池没有调用shutdown()方法,则通过workQueue.poll()把阻塞队列头部也就是等待最久的任务丢弃,然后把当前任务通过execute()方法提交到阻塞队列中。

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    //Handler called when saturated or shutdown in execute.
    private volatile RejectedExecutionHandler handler;
    //The default rejected execution handler
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
    
    //向线程池提交一个任务
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        int c = ctl.get();
        //首先根据ctl当前的值来判断当前线程数量是否小于核心线程数量,主要用来解决线程池中核心线程未初始化的问题
        if (workerCountOf(c) < corePoolSize) {
            //如果小于,则调用addWorker()方法创建一个核心线程并启动,同时把当前任务command传递进去直接执行
            if (addWorker(command, true)) {
                return;
            }
            c = ctl.get();
        }
        //如果核心线程已经初始化,则把任务command添加到阻塞队列workQueue中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) {
                addWorker(null, false);
            }
        } else if (!addWorker(command, false)) {//如果添加失败,说明队列已满,则调用addWorker()创建非核心线程
            //如果通过addWorker()方法创建非核心线程失败,则说明当前线程池中的线程总数已达到最大线程数量
            //此时需要调用reject()方法执行拒绝策略
            reject(command);
        }
    }
    
    //Invokes the rejected execution handler for the given command.
    //Package-protected for use by ScheduledThreadPoolExecutor.
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

    ...
    
    //A handler for rejected tasks that throws a RejectedExecutionException.
    //这是ThreadPoolExecutor默认使用的拒绝策略
    //这种策略就是简单抛出一个RejectedExecutionHandler异常
    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() {
        }

        //Always throws RejectedExecutionException.
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
        }
    }
    
    //A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method,
    //unless the executor has been shut down, in which case the task is discarded.
    //只要线程池没有被关闭,就由提交任务的线程执行任务的run()方法来执行
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() {
        }

        //Executes task r in the caller's thread, 
        //unless the executor has been shut down, in which case the task is discarded.
        //@param r the runnable task requested to be executed
        //@param e the executor attempting to execute this task
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    
    //A handler for rejected tasks that silently discards the rejected task.
    //直接把任务丢弃,不做任何处理
    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() {
        }

        //Does nothing, which has the effect of discarding task r.
        //@param r the runnable task requested to be executed
        //@param e the executor attempting to execute this task
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

        }
    }

    //A handler for rejected tasks that discards the oldest unhandled request and then retries execute, 
    //unless the executor is shut down, in which case the task is discarded.
    //通过workQueue.poll()把阻塞队列头部也就是等待最久的任务丢弃
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() {
        }

        //Obtains and ignores the next task that the executor would otherwise execute, 
        //if one is immediately available, and then retries execution of task r, 
        //unless the executor is shut down, in which case task r is instead discarded.
        //@param r the runnable task requested to be executed
        //@param e the executor attempting to execute this task
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}

注意:使用无界队列的线程池(如Fixed线程池),不会使用拒绝策略。

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(
            nThreads,//核心线程数
            nThreads,//最大线程数
            0L,//线程存活时间
            TimeUnit.MILLISECONDS,//线程存活时间的单位
            new LinkedBlockingQueue<Runnable>()//阻塞队列,用来存放待处理的任务
        );
    }
    
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) {
                return;
            }
            c = ctl.get();
        }
        //通过LinkedBLockingQueue的offer()方法添加任务到无界的阻塞队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) {
                addWorker(null, false);
            }
        }
        //Fixed线程池不会执行到这里
        else if (!addWorker(command, false)) {
            reject(command);
        }
    }
    ...
}

6.如何合理设置线程池参数 + 定制线程池

(1)线程池的核心参数

(2)如何设置线程池的大小

(3)如何动态设置线程池参数

(4)定制线程池的注意事项

(1)线程池的核心参数

构建线程池时的核心参数其实就是线程数量和队列类型及长度:corePoolSize、MaximumPoolSize、workQueue类型、workQueue长度。

如果最大线程数设置过大,可能会创建大量线程导致不必要的上下文切换;

如果最大线程数设置过小,可能会频繁触发线程池的拒绝策略影响运行;

如果阻塞队列为无界队列,会导致线程池的非核心线程无法被创建,从而导致最大线程数量的设置是失效的,造成大量任务堆积在阻塞队列中。如果这些任务涉及上下游请求,那么就会造成大量请求超时失败。

(2)如何设置线程池的大小

要看当前线程池中执行的任务是属于IO密集型还是CPU密集型。

一.IO密集型

就是线程需要频繁和磁盘或者远程网络通信的场景,这种场景中磁盘的耗时和网络通信的耗时较大。这意味着线程处于阻塞期间,不会占用CPU资源,所以线程数量超过CPU核心数并不会造成问题。

二.CPU密集型

就是对CPU的利用率较高的场景,比如循环、递归、逻辑运算等。这种场景下线程数量设置越少,就越能减少CPU的上下文频繁切换。

如果N表示CPU的核心数量,那么:

对于CPU密集型,线程池大小可以设置为N + 1;

对于IO密集型,线程池大小可以设置为2N + 1;

(3)如何动态设置线程池参数

一.线程池的setMaximumPoolSize()方法可以动态设置最大线程数量;

二.线程池的setCorePoolSize()方法可以动态设置核心线程数量;

三.线程池没有动态设置队列大小的方法,但可以继承LinkedBlockingQueue实现一个队列,提供方法修改其大小。然后根据该队列去创建线程池,这样就能实现动态修改阻塞队列大小了;

(4)定制线程池的注意事项

当线程池的线程数少于corePoolSize,会自动创建核心线程。这是在执行execute()方法提交任务时,由addWorker()方法创建的。线程创建后,会通过阻塞队列的take()方法阻塞式地获取任务,所以这些线程(也就是核心线程)是不会自动退出的。

当线程池的线程数达到corePoolSize后,execute()提交的任务会直接入队。如果阻塞队列是有界队列且队列满了,入队失败,就尝试创建非核心线程。但是要注意此时线程池里的线程数量最多不能超过maximumPoolSize,而且非核心线程在一定时间内获取不到任务,就会自动退出释放线程。

当阻塞队列满了+线程数达到maximumPoolSize,就执行拒绝策略来降级。拒绝策略有4种:抛出异常、提交线程自己执行、丢弃任务、丢弃头部任务。


http://www.kler.cn/a/558852.html

相关文章:

  • 通过Hive小文件合并(CombineHiveInputFormat)减少80%的Map任务数
  • w803|联盛德|WM IoT SDK2.X测试|window11|VSCODE|(4):IDE配置
  • 解决Open WebU无法显示基于OpenAI API接口的推理内容的问题
  • 【开源项目】分布式文本多语言翻译存储平台
  • 关于Python的一些基础知识(太阳太阳,持续更新)
  • 【DeepSeek-R1】写了个DeepSeek-R1本地软件,欢迎参与测试
  • 使用ESP-IDF来驱动INMP441全向麦克风
  • Redis 如何实现消息队列?
  • Python|OpenCV-实现人物眨眼检测(21)
  • 《从GPT-4到“东数西算”:AI算力的全球格局与技术趋势》
  • 五、Three.js顶点UV坐标、纹理贴图
  • 算法与数据结构(旋转链表)
  • (四)趣学设计模式 之 原型模式!
  • YOLOv12:以注意力为中心的物体检测
  • Servlet 国际化
  • Python pip 缓存清理:全面方法与操作指南
  • 失眠治愈手册(二):问题优化
  • 一文2000字从0到1用Jmeter全流程性能测试实战
  • 为 Power Automate 注册 Adobe PDF Services
  • 深度学习训练camp:第R4周: Pytorch实现:LSTM-火灾温度预测