当前位置: 首页 > article >正文

【源码解析】聊聊线程池 实现原理与源码深度解析(一)

一、Java 线程池 实现原理与源码深度解析

架构

在这里插入图片描述
总揽线程池设计,其实可以发现都是符合顶层的接口设计,中间抽象类,最终是实际工作类

使用示例

public class MyRunnable implements Runnable{

    @Override
    public void run() {
        System.out.println("我是要执行的任务,正在处理中");
    }

    public static void main(String[] args) {
        Runnable runnable = new MyRunnable();
        Runnable runnable2 = new MyRunnable();

        ExecutorService threadPool = Executors.newFixedThreadPool(1);
        threadPool.execute(runnable);
        threadPool.execute(runnable2);

        try {
            threadPool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Executor接口

public interface Executor {

    void execute(Runnable command);
}

只定义了一个接口方法,核心就是处理具体的工作任务。

ExecutorService

public interface ExecutorService extends Executor {

    // 关闭线程池,已提交的任务继续执行,不接受继续提交新任务
    void shutdown();

    // 关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务
    // 它和前面的方法相比,加了一个单词“now”,区别在于它会去停止当前正在进行的任务
    List<Runnable> shutdownNow();

    // 线程池是否已关闭
    boolean isShutdown();

    // 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回true
    // 这个方法必须在调用shutdown或shutdownNow方法之后调用才会返回true
    boolean isTerminated();

    // 等待所有任务完成,并设置超时时间
    // 我们这么理解,实际应用中是,先调用 shutdown 或 shutdownNow,
    // 然后再调这个方法等待所有的线程真正地完成,返回值意味着有没有超时
    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;

    // 提交一个 Callable 任务
    <T> Future<T> submit(Callable<T> task);

    // 提交一个 Runnable 任务,第二个参数将会放到 Future 中,作为返回值,
    // 因为 Runnable 的 run 方法本身并不返回任何东西
    <T> Future<T> submit(Runnable task, T result);

    // 提交一个 Runnable 任务
    Future<?> submit(Runnable task);

    ......
}

AbstractExecutorService

其实java中框架的特点都是设计一个顶层接口,然后有基于顶层接口设计相关的抽象基础类,实现一些基础的功能。AbstractExecutorService最核心的方法就是newTaskFor() 将任务包装成一个FutureTask,FutureTask间接实现了Runnable类。但是最核心的方法,execute AbstractExecutorService是没有实现的。而是需要子类进行实现。

public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * 将任务包装成FutureTask任务。
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

   
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        //1.将任务包装成RunableFuture对象,由于RunnableFuture是实现Runable类,所以execute的参数是一个可拓展的类型
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        //2,交给具体的执行器进行实现
        execute(ftask);
        return ftask;
    }

   
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
}

ThreadPoolExecutor

我们知道通过Executors.newXXX 可以创建固定线程池、可缓存线程池等,但是最终指向的都是ThreadPoolExecutor类

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        //参数异常值处理,直接返回异常
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        //核心线程数
        this.corePoolSize = corePoolSize;
        //最大线程数
        //默认使用核心线程,超过直接,将任务存储到任务队列中,如果任务队列中也满了,查看是否
        //超过最大线程数,如果没有超过就new一个线程执行任务。
        //异常情况,
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        //线程工厂,一般推荐自定义的
        this.threadFactory = threadFactory;
        //拒绝策略
        this.handler = handler;
    }
  • corePoolSize

线程池中的核心线程数。

  • maximumPoolSize

最大线程数,线程池允许创建的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;当阻塞队列是无界队列, 则maximumPoolSize则不起作用, 因为无法提交至核心线程池的线程会一直持续地放入workQueue。

无界队列会一直存储任务,所以不会创建新的线程执行任务。

  • workQueue

用来保存等待被执行的任务的阻塞队列. 在JDK中提供了如下阻塞队列:

(1) ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
    (2) LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
    (3) SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
    (4) priorityBlockingQuene:具有优先级的无界阻塞队列;

  • keepAliveTime

空闲线程的保活时间,如果某线程的空闲时间超过这个值都没有任务给它做,那么可以被关闭了。注意这个值并不会对所有线程起作用,如果线程池中的线程数少于等于核心线程数 corePoolSize,那么这些线程不会因为空闲太长时间而被关闭,当然,也可以通过调用 allowCoreThreadTimeOut(true)使核心线程数内的线程也可以被回收;默认情况下,该参数只在线程数大于corePoolSize时才有用, 超过这个时间的空闲线程将被终止。

  • unit

keepAliveTime的单位

  • threadFactory

用于生成线程,一般我们可以用默认的就可以了。通常,我们可以通过它将我们的线程的名字设置得比较可读一些,如 Message-Thread-1, Message-Thread-2 类似这样。

  • handler

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

AbortPolicy:直接抛出异常,默认策略;
      CallerRunsPolicy:用调用者所在的线程来执行任务;
      DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
      DiscardPolicy:直接丢弃任务;
    当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

拒绝策略

    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    
    
     public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    
    
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }


        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
    
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }
      
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 这里 COUNT_BITS 设置为 29(32-3),意味着前三位用于存放线程状态,后29位用于存放线程数
private static final int COUNT_BITS = Integer.SIZE - 3;

// 000 11111111111111111111111111111
// 这里得到的是 29 个 1,也就是说线程池的最大线程数是 2^29-1=536870911
// 以我们现在计算机的实际情况,这个数量还是够用的
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 我们说了,线程池的状态存放在高 3 位中
// 运算结果为 111跟29个0:111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// 将整数 c 的低 29 位修改为 0,就得到了线程池的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 将整数 c 的高 3 为修改为 0,就得到了线程池中的线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

属性主要是通过描述线程池的状态,

  • RUNNING:这个没什么好说的,这是最正常的状态:接受新的任务,处理等待队列中的任务
  • SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务
  • STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
  • TIDYING:所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
  • TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个

在这里插入图片描述

Worker

将执行的任务封装成一个Woker,内部通过引用真正执行任务的线程和实际的任务。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

        /** Thread this worker is running in.  Null if factory fails. */
        // 真正执行任务的线程
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        // 实际的任务,
        Runnable firstTask;
        /** Per-thread task counter */
        // 记录此线程完成的任务数
        volatile long completedTasks;

        /**
         * 将运行任务封装成worker
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //调用线程工作创建一个新的线程,创建的线程到时候用来执行任务
            this.thread = getThreadFactory().newThread(this);
        }

        //任务实际执行的方法
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    }

execute

判断是否创建线程

    public void execute(Runnable command) {
        // 若任务为空,则抛 NPE,不能执行空任务
        if (command == null) {
            throw new NullPointerException();
        }
        // 表示线程数的状态和线程数的整数
        int c = ctl.get();
        // 若工作线程数小于核心线程数,则创建新的线程,并把当前任务 command 作为这个线程的第一个任务
        //一个新创建的线程池,里面没有线程
        if (workerCountOf(c) < corePoolSize) {
            //如果当前线程小于核心线程数,则创建核心线程数
            if (addWorker(command, true)) {
                return;
            }
            //添加核心线程数失败了,重新获取。可能存在同时多个线程任务创建任务,A,B,C
            c = ctl.get();
        }
        /**
         * 至此,有以下两种情况:
         * 1.当前工作线程数大于等于核心线程数 
         * 2.新建核心线程失败
         * 此时会尝试将任务添加到阻塞队列 workQueue
         */
        // 若线程池处于 RUNNING 状态,将任务添加到阻塞队列 workQueue 中
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次检查线程池标记
            int recheck = ctl.get();
            // 如果线程池已不处于 RUNNING 状态,那么移除已入队的任务,并且执行拒绝策略
            if (!isRunning(recheck) && remove(command)) {
                // 任务添加到阻塞队列失败,执行拒绝策略
                reject(command);
            }
            // 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
            else if (workerCountOf(recheck) == 0) {
                addWorker(null, false);
            }
        }
        /**
         * 至此,有以下两种情况:
         * 1.线程池处于非运行状态,线程池不再接受新的线程
         * 2.线程处于运行状态,但是阻塞队列已满,无法加入到阻塞队列
         * 此时会尝试以最大线程数为限制创建新的工作线程
         */
        else if (!addWorker(command, false)) {
            // 任务进入线程池失败,执行拒绝策略
            reject(command);
        }
    }

执行任务是通过threadPool.execute(runnable); 进行执行的,这里其实就是核心的流程。

addWorker

主要的作用就是将要执行的任务添加到set中

    //第一个参数是要执行的任务,第二个参数是是否是核心线程
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //经过判断处理,给工作线程数+1
        for (;;) {
            int c = ctl.get(); //线程个数
            int rs = runStateOf(c);//线程池运行状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                    !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
                //线程池状态stop或者其他状态
                return false;
            }

            for (;;) {
                int wc = workerCountOf(c);
                //1.目前线程个数超过总容量
                //2.大于核心线程或者大于最大线程数 返回true
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //使用CAS 添加线程数
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //重新获取状态
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        //
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //将任务封装成一个Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //加锁 获取线程池的全局锁,避免添加任务时,避免线程之间竞争,
                //避免中断当前执行的任务。
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //将工作线程保存在集合中 使用hashSet保存
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            //记录最大值
                            largestPoolSize = s;
                        workerAdded = true; //添加工作线程成功
                    }
                } finally {
                    mainLock.unlock();
                }
                // 如果成功添加了 Worker,就可以启动 Worker 了
                if (workerAdded) {
                    t.start(); //启动工作线程
                    workerStarted = true; //启动工作线程成功
                }
            }
        } finally {
            if (! workerStarted) //启动失败
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        // 真正执行任务的线程
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        // 实际的任务,
        Runnable firstTask;
        /** Per-thread task counter */
        // 记录此线程完成的任务数
        volatile long completedTasks;

        /**
         * 将运行任务封装成worker
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //调用线程工作创建一个新的线程,创建的线程到时候用来执行任务
            this.thread = getThreadFactory().newThread(this);
        }

        //任务实际执行的方法
        //因为上层是通过firstTask将要执行的任务传递进来的,所以具体执行就是run 
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
 }

runWorker

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //执行任务的地方 如果获取不到任务就阻塞
            //getTask()方法
            while (task != null || (task = getTask()) != null) {
                w.lock(); //加锁
                //判断当前线程是否大于stop 被拒
                //
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) {
                    wt.interrupt();
                }
                try {
                    //执行任务前的操作
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //这里其实就是执行Worker的run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //执行任务后的操作
                        afterExecute(task, thrown);
                    }
                } finally {
                    //任务执行完毕,接着从队列中拉去任务执行
                    task = null;
                    //计算完成的任务
                    w.completedTasks++;
                    //释放锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //getTask返回了null,非核心的指定时间没有获取到任务超过设置的时候,woker结束
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask

    //1,核心线程不会销毁,所以一直会等待获取任务
    //2.非核心超过了设置的等待时间
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

小结

在这里插入图片描述

在这里插入图片描述


http://www.kler.cn/a/155930.html

相关文章:

  • 【最新版】Stable Diffusion4.9(AI绘画)下载及安装教程(附软件安装包)!
  • linux设置主机名
  • 《MYSQL45讲》误删数据怎么办
  • 九州未来再度入选2024边缘计算TOP100
  • 万字长文解读深度学习——ViT、ViLT、DiT
  • 网络安全-蓝队基础
  • 从零构建属于自己的GPT系列3:模型训练2(训练函数解读、模型训练函数解读、代码逐行解读)
  • 算法复习,数据结构 ,算法特性,冒泡法动态演示,复杂度,辗转相除法*,寻找最大公因数
  • Win中Redis部署与配置
  • SCAU:判断点是否在圆上
  • QWebChannel 是 Qt 框架中用于在 Web 页面和 Qt 应用程序之间进行通信的类
  • 【doccano】文本标注工具——属性级情感分析标注自己的业务数据
  • 使用SLS日志服务采集Kong网关的日志
  • c语言编程题经典100例——(41~45例)
  • Android textView 显示: STRING_TOO_LARGE
  • 23.12.3日总结
  • 鸿蒙工具DevEco Studio调试Build task failed. Open the Run window to view details.
  • 讲一讲redis的使用
  • WordPress外贸站优化工具,WordPress外贸SEO优化方法
  • iOS Class Guard 成功了,但无法区分差异
  • ssm医药进出口交易系统源码和论文
  • 移除元素、合并两个有序数组(leetcode)
  • 人工智能(pytorch)搭建模型21-基于pytorch搭建卷积神经网络VoVNetV2模型,并利用简单数据进行快速训练
  • Stable Diffusion 系列教程 - 1 基础准备(针对新手)
  • 浅析SD-WAN技术如何加强企业网络安全
  • YOLOv8 区域计数 | 入侵检测 | 人员闯入