当前位置: 首页 > article >正文

【线程池】springboot线程池的底层设计原理

【线程池】springboot线程池的底层设计原理

  • 【一】Java 线程池基础
    • 【1】ThreadPoolExecutor 核心参数
    • 【2】ThreadPoolExecutor 工作流程
  • 【二】Spring Boot 中的线程池配置
    • 【1】配置类方式
    • 【2】配置文件方式
  • 【三】Spring Boot 线程池的使用
    • 【1】启用异步支持
    • 【2】异步方法示例
  • 【四】Spring Boot 线程池的底层调度机制
  • 【五】线程池监控和调优
  • 【六】常见底层原理问题
    • 【1】线程池里的线程是怎么创建的?
    • 【2】线程池里的线程是怎么阻塞的?
    • 【3】线程池里的线程是怎么唤醒的?
    • 【4】线程池里的线程是怎么回收的?
    • 【5】线程池如何实现线程复用?
    • 【6】线程池如何知道一个线程的任务已经执行完成

在 Spring Boot 中,线程池是一个重要的组件,用于管理和调度线程,提高应用程序的性能和资源利用率。下面将详细汇总 Spring Boot 线程池的底层原理。

【一】Java 线程池基础

Spring Boot 的线程池底层基于 Java 的 java.util.concurrent 包中的线程池实现,主要涉及到 ThreadPoolExecutor 类。理解 ThreadPoolExecutor 的工作原理是掌握 Spring Boot 线程池的基础。

【1】ThreadPoolExecutor 核心参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

(1)corePoolSize:核心线程数,线程池初始化时创建的线程数量,当有新任务提交时,会优先使用核心线程来执行任务。
(2)maximumPoolSize:最大线程数,线程池允许创建的最大线程数量。当核心线程都在执行任务,且任务队列已满时,会创建新的线程直到达到最大线程数。
(3)keepAliveTime:线程空闲时间,当线程空闲时间超过该值时,非核心线程会被销毁。
(4)unit:空闲时间的时间单位,如 TimeUnit.SECONDS。
(5)workQueue:任务队列,用于存储等待执行的任务。常见的任务队列有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue 等。
(6)threadFactory:线程工厂,用于创建线程。可以自定义线程工厂来设置线程的名称、优先级等属性。
(7)handler:拒绝策略,当任务队列已满且线程数达到最大线程数时,新提交的任务会触发拒绝策略。常见的拒绝策略有 AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy 等。

【2】ThreadPoolExecutor 工作流程

(1)当有新任务提交时,首先检查核心线程数是否已满。如果核心线程数未满,则创建新的核心线程来执行任务。
(2)如果核心线程数已满,将任务放入任务队列中等待执行。
如果任务队列已满,检查线程数是否达到最大线程数。如果未达到最大线程数,则创建新的(3)非核心线程来执行任务。
(4)如果线程数达到最大线程数,且任务队列已满,新提交的任务会触发拒绝策略。

【二】Spring Boot 中的线程池配置

在 Spring Boot 中,可以通过配置类或配置文件来创建和配置线程池。

【1】配置类方式

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
public class ThreadPoolConfig {

    @Bean("customThreadPool")
    public Executor customThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5); // 核心线程数
        executor.setMaxPoolSize(10); // 最大线程数
        executor.setQueueCapacity(20); // 任务队列容量
        executor.setKeepAliveSeconds(60); // 线程空闲时间
        executor.setThreadNamePrefix("custom-thread-"); // 线程名称前缀
        executor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略
        executor.initialize();
        return executor;
    }
}

【2】配置文件方式

在 application.properties 或 application.yml 中配置线程池属性:

spring.task.execution.pool.core-size=5
spring.task.execution.pool.max-size=10
spring.task.execution.pool.queue-capacity=20
spring.task.execution.pool.keep-alive=60s
spring.task.execution.thread-name-prefix=custom-thread-

【三】Spring Boot 线程池的使用

在 Spring Boot 中,可以通过 @Async 注解来异步执行方法,使用线程池来管理这些异步任务。

【1】启用异步支持

在主应用类上添加 @EnableAsync 注解来启用异步支持:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

【2】异步方法示例

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncService {

    @Async("customThreadPool")
    public void asyncMethod() {
        // 异步执行的方法逻辑
        System.out.println("异步方法执行,线程名称:" + Thread.currentThread().getName());
    }
}

【四】Spring Boot 线程池的底层调度机制

Spring Boot 的线程池调度主要依赖于 ThreadPoolTaskExecutor 类,它是对 ThreadPoolExecutor 的封装。当调用 @Async 注解的方法时,Spring 会将任务提交给线程池进行处理。

(1)任务提交
ThreadPoolTaskExecutor 提供了 execute 和 submit 方法来提交任务:
execute 方法用于提交 Runnable 任务,没有返回值。
submit 方法可以提交 Runnable 或 Callable 任务,并且可以返回一个 Future 对象,用于获取任务的执行结果。

(2)任务调度
当任务提交到线程池后,线程池会根据 ThreadPoolExecutor 的工作流程进行任务调度。如果核心线程有空闲,会优先使用核心线程执行任务;如果核心线程已满,任务会被放入任务队列;如果任务队列已满,会创建新的非核心线程;如果线程数达到最大线程数,会触发拒绝策略。

【五】线程池监控和调优

在 Spring Boot 中,可以通过 Actuator 来监控线程池的状态,例如线程池的活跃线程数、任务队列大小等。同时,可以根据监控结果对线程池的参数进行调优,以提高应用程序的性能。

(1)线程池监控和调优
在 Spring Boot 中,可以通过 Actuator 来监控线程池的状态,例如线程池的活跃线程数、任务队列大小等。同时,可以根据监控结果对线程池的参数进行调优,以提高应用程序的性能。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

(2)监控线程池
通过访问 /actuator/metrics 端点可以查看线程池的相关指标,例如 executor.active 表示活跃线程数,executor.completed 表示已完成的任务数等。
综上所述,Spring Boot 线程池的底层原理基于 Java 的 ThreadPoolExecutor,通过配置和使用 ThreadPoolTaskExecutor 来实现任务的异步执行和调度。同时,可以通过 Actuator 对线程池进行监控和调优,以提高应用程序的性能和稳定性。

【六】常见底层原理问题

【1】线程池里的线程是怎么创建的?

当有新任务提交到线程池时,线程池会根据当前线程数量和配置参数决定是否创建新线程。若当前线程数小于核心线程数(corePoolSize),会直接创建新的核心线程;若核心线程已满,则将任务放入任务队列;若任务队列已满且线程数小于最大线程数(maximumPoolSize),会创建非核心线程。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 获取线程池的控制状态(包含线程数和运行状态)
    int c = ctl.get();
    // 如果当前线程数小于核心线程数
    if (workerCountOf(c) < corePoolSize) { 
        // 尝试创建新线程执行任务
        if (addWorker(command, true)) 
            return;
        c = ctl.get();
    }
    // 如果线程池在运行且任务能放入队列
    if (isRunning(c) && workQueue.offer(command)) { 
        int recheck = ctl.get();
        // 再次检查线程池状态,如果已停止则移除任务并拒绝
        if (! isRunning(recheck) && remove(command)) 
            reject(command);
        // 如果线程数为 0,创建一个无初始任务的线程
        else if (workerCountOf(recheck) == 0) 
            addWorker(null, false);
    }
    // 如果任务队列已满,尝试创建非核心线程
    else if (!addWorker(command, false)) 
        // 创建失败则拒绝任务
        reject(command); 
}

addWorker方法,这个方法负责创建新的工作线程。Worker类是继承AQS,同时实现了Runnable,每个Worker持有一个线程,在run方法里不断从队列里取任务执行。

// 线程创建(addWorker)
private boolean addWorker(Runnable firstTask, boolean core) {
    // 创建 Worker 对象(包含 Thread 和初始任务)
    Worker w = new Worker(firstTask);
    Thread t = w.thread;
    
    workers.add(w); // 加入线程集合
    t.start();      // 启动线程
}

private final class Worker extends AbstractQueuedSynchronizer 
    implements Runnable {
    final Thread thread;
    Runnable firstTask;
    
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    
    public void run() {
        runWorker(this); // 核心执行循环
    }
}
// 任务执行循环
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    
    while (task != null || (task = getTask()) != null) {
        try {
            task.run(); // 执行任务
        } finally {
            task = null; // 清空任务引用
        }
    }
    processWorkerExit(w, false); // 线程回收处理
}

【2】线程池里的线程是怎么阻塞的?

当线程执行完一个任务后,会通过getTask()方法从任务队列中获取下一个任务。若任务队列为空,阻塞队列的take()会阻塞线程,线程会进入阻塞状态等待新任务。而如果线程池正在关闭,可能就会返回null,导致Worker退出循环,线程结束。

ThreadPoolExecutor 中的线程通过 workQueue.take() 或 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方法获取任务,这两个方法在队列为空时会使线程阻塞。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许线程被中断
    boolean completedAbruptly = true;
    try {
        // 循环获取任务并执行
        while (task != null || (task = getTask()) != null) { 
            w.lock();
            // 检查线程池状态,确保线程在需要时被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 执行任务前的钩子方法
                beforeExecute(wt, task); 
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run(); 
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行任务后的钩子方法
                    afterExecute(task, thrown); 
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 处理线程退出逻辑
        processWorkerExit(w, completedAbruptly); 
    }
}

private Runnable getTask() {
    boolean timedOut = false; // 记录上次 poll 是否超时

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 检查线程池状态和任务队列情况
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 判断线程是否需要超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 根据是否需要超时控制选择获取任务的方法
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 响应中断信号
            timedOut = false;
        }
    }
}

当队列为空时,线程通过workQueue.take()阻塞

【3】线程池里的线程是怎么唤醒的?

当有新任务提交到任务队列时,会唤醒处于阻塞状态的线程。任务队列(如 LinkedBlockingQueue)使用 ReentrantLock 和 Condition 来实现线程的阻塞和唤醒机制。新任务加入队列时,会调用 Condition 的 signal() 或 signalAll() 方法唤醒等待的线程。

以 LinkedBlockingQueue 的 offer 方法为例:

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 如果队列已满,返回 false
    if (count.get() == capacity) 
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 再次检查队列是否未满
        if (count.get() < capacity) { 
            // 将任务加入队列
            enqueue(node); 
            c = count.getAndIncrement();
            // 如果队列还有空间,唤醒等待放入任务的线程
            if (c + 1 < capacity) 
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // 如果之前队列为空,唤醒等待获取任务的线程
    if (c == 0) 
        signalNotEmpty();
    return c >= 0;
}

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 唤醒等待获取任务的线程
        notEmpty.signal(); 
    } finally {
        takeLock.unlock();
    }
}

当队列为空时,线程通过workQueue.take()阻塞

【4】线程池里的线程是怎么回收的?

线程池根据线程的空闲时间和配置参数回收线程。当线程空闲时间超过 keepAliveTime,且线程数大于核心线程数时,非核心线程会被回收。在 getTask 方法中,若使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方法获取任务超时,线程会退出 runWorker 循环,最终被回收。

在 getTask 方法中:

private Runnable getTask() {
    boolean timedOut = false; // 记录上次 poll 是否超时

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 检查线程池状态和任务队列情况
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 判断线程是否需要超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 减少线程计数
            if (compareAndDecrementWorkerCount(c)) 
                return null;
            continue;
        }

        try {
            // 根据是否需要超时控制选择获取任务的方法
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

【5】线程池如何实现线程复用?

线程池实现线程复用的核心在于避免频繁地创建和销毁线程,而是让线程在完成一个任务后继续去执行其他任务。下面详细介绍其底层原理:
(1)线程的创建和管理
线程池在初始化时会根据配置创建一定数量的核心线程。这些线程在创建后不会立即销毁,而是进入等待状态,随时准备执行新的任务。线程池使用一个内部的数据结构(通常是一个集合)来管理这些线程。
(2)任务队列
线程池中有一个任务队列,用于存储待执行的任务。当有新任务提交时,如果核心线程都在忙碌,任务会被放入任务队列中等待执行。
(3)线程的执行逻辑
每个线程在启动后会进入一个循环,不断从任务队列中获取任务并执行。当任务队列为空时,线程会进入阻塞状态,等待新任务的到来。一旦有新任务被添加到队列中,线程会被唤醒并继续执行任务。
(4)线程的复用
线程在完成一个任务后,不会终止,而是继续从任务队列中获取下一个任务。这样,同一个线程可以执行多个不同的任务,实现了线程的复用。

【6】线程池如何知道一个线程的任务已经执行完成

(1)基于任务执行逻辑
线程池中的线程在执行任务时,通常会遵循特定的执行逻辑。一般来说,线程会从任务队列中取出一个任务并调用其 run 方法(对于 Runnable 任务)或 call 方法(对于 Callable 任务),当这个方法正常返回或者抛出异常结束时,就意味着该任务执行完成,FutureTask会标记任务为完成,并调用done()方法。可以通过Future的isDone()方法来判断状态。

(2)线程池的状态管理
线程池内部会对线程和任务的状态进行管理。每个线程在执行任务时,线程池会记录该线程的状态,当任务执行完成后,线程池会更新相应的状态信息。

原理分析
1-任务队列操作:线程从任务队列中取出任务执行,当任务完成后,线程池会知道该任务已经从队列中移除并执行完毕。
2-线程状态标记:线程池可以通过内部的状态标记来记录线程是处于忙碌状态(正在执行任务)还是空闲状态(任务执行完成)。

(3)回调机制
有些线程池实现会使用回调机制来通知任务执行完成。例如,Future 接口和 CompletableFuture 类就提供了这样的功能。

bmit 方法返回一个 Future 对象,通过调用 isDone 方法可以检查任务是否执行完成。当 isDone 方法返回 true 时,就表示任务已经执行完成,可以通过 get 方法获取任务的执行结果。

public class FutureTask<V> implements RunnableFuture<V> {
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    protected void done() { 
        // 任务完成时回调(可用于异步通知)
    }

    public boolean isDone() {
        return state != NEW; // 任何非NEW状态都表示完成
    }
}

通过FutureTask的状态机跟踪任务生命周期
isDone()方法检查state是否为非NEW状态


http://www.kler.cn/a/561948.html

相关文章:

  • React七Formik
  • 免费使用 DeepSeek API 教程及资源汇总
  • BigDecimal线上异常解决方案:避免科学计数法输出的坑
  • 【Uniapp-Vue3】导入uni-id用户体系
  • 《Keras 3 : 使用迁移学习进行关键点检测》:此文为AI自动翻译
  • 「爬虫实战分享:如何高效爬取某汽车官方销售排行榜」
  • Linux 基本开发工具的使用(yum、vim、gcc、g++、gdb、make/makefile)
  • 全市场大模型分类及对比分析报告
  • 深度学习相关名词功能总结
  • 使用 Containerd 通过 HTTP 协议拉取 Harbor 私有镜像仓库的镜像
  • Qt layout
  • 网络安全入门|HTTP慢速攻击的终极防御:零信任与AI对抗
  • C#实现本地AI聊天功能(Deepseek R1及其他模型)。
  • Android 键盘输入按确认或换行 直接触发提交
  • 用AI写游戏3——python实现坦克大战1
  • 网络原理--TCP的特性
  • 中国旅游行业年度报告2024
  • JAVA【微服务】Spring AI 使用详解
  • 【STL专题】优先级队列priority_queue的使用和模拟实现,巧妙利用仿函数解决优先级
  • 操作系统前置汇编知识学习第九天