【线程池】springboot线程池的底层设计原理
【线程池】springboot线程池的底层设计原理
- 【一】Java 线程池基础
- 【1】ThreadPoolExecutor 核心参数
- 【2】ThreadPoolExecutor 工作流程
- 【二】Spring Boot 中的线程池配置
- 【1】配置类方式
- 【2】配置文件方式
- 【三】Spring Boot 线程池的使用
- 【1】启用异步支持
- 【2】异步方法示例
- 【四】Spring Boot 线程池的底层调度机制
- 【五】线程池监控和调优
- 【六】常见底层原理问题
- 【1】线程池里的线程是怎么创建的?
- 【2】线程池里的线程是怎么阻塞的?
- 【3】线程池里的线程是怎么唤醒的?
- 【4】线程池里的线程是怎么回收的?
- 【5】线程池如何实现线程复用?
- 【6】线程池如何知道一个线程的任务已经执行完成
在 Spring Boot 中,线程池是一个重要的组件,用于管理和调度线程,提高应用程序的性能和资源利用率。下面将详细汇总 Spring Boot 线程池的底层原理。
【一】Java 线程池基础
Spring Boot 的线程池底层基于 Java 的 java.util.concurrent 包中的线程池实现,主要涉及到 ThreadPoolExecutor 类。理解 ThreadPoolExecutor 的工作原理是掌握 Spring Boot 线程池的基础。
【1】ThreadPoolExecutor 核心参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
(1)corePoolSize:核心线程数,线程池初始化时创建的线程数量,当有新任务提交时,会优先使用核心线程来执行任务。
(2)maximumPoolSize:最大线程数,线程池允许创建的最大线程数量。当核心线程都在执行任务,且任务队列已满时,会创建新的线程直到达到最大线程数。
(3)keepAliveTime:线程空闲时间,当线程空闲时间超过该值时,非核心线程会被销毁。
(4)unit:空闲时间的时间单位,如 TimeUnit.SECONDS。
(5)workQueue:任务队列,用于存储等待执行的任务。常见的任务队列有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue 等。
(6)threadFactory:线程工厂,用于创建线程。可以自定义线程工厂来设置线程的名称、优先级等属性。
(7)handler:拒绝策略,当任务队列已满且线程数达到最大线程数时,新提交的任务会触发拒绝策略。常见的拒绝策略有 AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy 等。
【2】ThreadPoolExecutor 工作流程
(1)当有新任务提交时,首先检查核心线程数是否已满。如果核心线程数未满,则创建新的核心线程来执行任务。
(2)如果核心线程数已满,将任务放入任务队列中等待执行。
如果任务队列已满,检查线程数是否达到最大线程数。如果未达到最大线程数,则创建新的(3)非核心线程来执行任务。
(4)如果线程数达到最大线程数,且任务队列已满,新提交的任务会触发拒绝策略。
【二】Spring Boot 中的线程池配置
在 Spring Boot 中,可以通过配置类或配置文件来创建和配置线程池。
【1】配置类方式
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
public class ThreadPoolConfig {
@Bean("customThreadPool")
public Executor customThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); // 核心线程数
executor.setMaxPoolSize(10); // 最大线程数
executor.setQueueCapacity(20); // 任务队列容量
executor.setKeepAliveSeconds(60); // 线程空闲时间
executor.setThreadNamePrefix("custom-thread-"); // 线程名称前缀
executor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略
executor.initialize();
return executor;
}
}
【2】配置文件方式
在 application.properties 或 application.yml 中配置线程池属性:
spring.task.execution.pool.core-size=5
spring.task.execution.pool.max-size=10
spring.task.execution.pool.queue-capacity=20
spring.task.execution.pool.keep-alive=60s
spring.task.execution.thread-name-prefix=custom-thread-
【三】Spring Boot 线程池的使用
在 Spring Boot 中,可以通过 @Async 注解来异步执行方法,使用线程池来管理这些异步任务。
【1】启用异步支持
在主应用类上添加 @EnableAsync 注解来启用异步支持:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
【2】异步方法示例
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
public class AsyncService {
@Async("customThreadPool")
public void asyncMethod() {
// 异步执行的方法逻辑
System.out.println("异步方法执行,线程名称:" + Thread.currentThread().getName());
}
}
【四】Spring Boot 线程池的底层调度机制
Spring Boot 的线程池调度主要依赖于 ThreadPoolTaskExecutor 类,它是对 ThreadPoolExecutor 的封装。当调用 @Async 注解的方法时,Spring 会将任务提交给线程池进行处理。
(1)任务提交
ThreadPoolTaskExecutor 提供了 execute 和 submit 方法来提交任务:
execute 方法用于提交 Runnable 任务,没有返回值。
submit 方法可以提交 Runnable 或 Callable 任务,并且可以返回一个 Future 对象,用于获取任务的执行结果。
(2)任务调度
当任务提交到线程池后,线程池会根据 ThreadPoolExecutor 的工作流程进行任务调度。如果核心线程有空闲,会优先使用核心线程执行任务;如果核心线程已满,任务会被放入任务队列;如果任务队列已满,会创建新的非核心线程;如果线程数达到最大线程数,会触发拒绝策略。
【五】线程池监控和调优
在 Spring Boot 中,可以通过 Actuator 来监控线程池的状态,例如线程池的活跃线程数、任务队列大小等。同时,可以根据监控结果对线程池的参数进行调优,以提高应用程序的性能。
(1)线程池监控和调优
在 Spring Boot 中,可以通过 Actuator 来监控线程池的状态,例如线程池的活跃线程数、任务队列大小等。同时,可以根据监控结果对线程池的参数进行调优,以提高应用程序的性能。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
(2)监控线程池
通过访问 /actuator/metrics 端点可以查看线程池的相关指标,例如 executor.active 表示活跃线程数,executor.completed 表示已完成的任务数等。
综上所述,Spring Boot 线程池的底层原理基于 Java 的 ThreadPoolExecutor,通过配置和使用 ThreadPoolTaskExecutor 来实现任务的异步执行和调度。同时,可以通过 Actuator 对线程池进行监控和调优,以提高应用程序的性能和稳定性。
【六】常见底层原理问题
【1】线程池里的线程是怎么创建的?
当有新任务提交到线程池时,线程池会根据当前线程数量和配置参数决定是否创建新线程。若当前线程数小于核心线程数(corePoolSize),会直接创建新的核心线程;若核心线程已满,则将任务放入任务队列;若任务队列已满且线程数小于最大线程数(maximumPoolSize),会创建非核心线程。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取线程池的控制状态(包含线程数和运行状态)
int c = ctl.get();
// 如果当前线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 尝试创建新线程执行任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池在运行且任务能放入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检查线程池状态,如果已停止则移除任务并拒绝
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程数为 0,创建一个无初始任务的线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果任务队列已满,尝试创建非核心线程
else if (!addWorker(command, false))
// 创建失败则拒绝任务
reject(command);
}
addWorker方法,这个方法负责创建新的工作线程。Worker类是继承AQS,同时实现了Runnable,每个Worker持有一个线程,在run方法里不断从队列里取任务执行。
// 线程创建(addWorker)
private boolean addWorker(Runnable firstTask, boolean core) {
// 创建 Worker 对象(包含 Thread 和初始任务)
Worker w = new Worker(firstTask);
Thread t = w.thread;
workers.add(w); // 加入线程集合
t.start(); // 启动线程
}
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this); // 核心执行循环
}
}
// 任务执行循环
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null) {
try {
task.run(); // 执行任务
} finally {
task = null; // 清空任务引用
}
}
processWorkerExit(w, false); // 线程回收处理
}
【2】线程池里的线程是怎么阻塞的?
当线程执行完一个任务后,会通过getTask()方法从任务队列中获取下一个任务。若任务队列为空,阻塞队列的take()会阻塞线程,线程会进入阻塞状态等待新任务。而如果线程池正在关闭,可能就会返回null,导致Worker退出循环,线程结束。
ThreadPoolExecutor 中的线程通过 workQueue.take() 或 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方法获取任务,这两个方法在队列为空时会使线程阻塞。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许线程被中断
boolean completedAbruptly = true;
try {
// 循环获取任务并执行
while (task != null || (task = getTask()) != null) {
w.lock();
// 检查线程池状态,确保线程在需要时被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务前的钩子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务后的钩子方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 处理线程退出逻辑
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // 记录上次 poll 是否超时
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查线程池状态和任务队列情况
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断线程是否需要超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根据是否需要超时控制选择获取任务的方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// 响应中断信号
timedOut = false;
}
}
}
当队列为空时,线程通过workQueue.take()阻塞
【3】线程池里的线程是怎么唤醒的?
当有新任务提交到任务队列时,会唤醒处于阻塞状态的线程。任务队列(如 LinkedBlockingQueue)使用 ReentrantLock 和 Condition 来实现线程的阻塞和唤醒机制。新任务加入队列时,会调用 Condition 的 signal() 或 signalAll() 方法唤醒等待的线程。
以 LinkedBlockingQueue 的 offer 方法为例:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 如果队列已满,返回 false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 再次检查队列是否未满
if (count.get() < capacity) {
// 将任务加入队列
enqueue(node);
c = count.getAndIncrement();
// 如果队列还有空间,唤醒等待放入任务的线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// 如果之前队列为空,唤醒等待获取任务的线程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 唤醒等待获取任务的线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
当队列为空时,线程通过workQueue.take()阻塞
【4】线程池里的线程是怎么回收的?
线程池根据线程的空闲时间和配置参数回收线程。当线程空闲时间超过 keepAliveTime,且线程数大于核心线程数时,非核心线程会被回收。在 getTask 方法中,若使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方法获取任务超时,线程会退出 runWorker 循环,最终被回收。
在 getTask 方法中:
private Runnable getTask() {
boolean timedOut = false; // 记录上次 poll 是否超时
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查线程池状态和任务队列情况
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断线程是否需要超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 减少线程计数
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根据是否需要超时控制选择获取任务的方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
【5】线程池如何实现线程复用?
线程池实现线程复用的核心在于避免频繁地创建和销毁线程,而是让线程在完成一个任务后继续去执行其他任务。下面详细介绍其底层原理:
(1)线程的创建和管理
线程池在初始化时会根据配置创建一定数量的核心线程。这些线程在创建后不会立即销毁,而是进入等待状态,随时准备执行新的任务。线程池使用一个内部的数据结构(通常是一个集合)来管理这些线程。
(2)任务队列
线程池中有一个任务队列,用于存储待执行的任务。当有新任务提交时,如果核心线程都在忙碌,任务会被放入任务队列中等待执行。
(3)线程的执行逻辑
每个线程在启动后会进入一个循环,不断从任务队列中获取任务并执行。当任务队列为空时,线程会进入阻塞状态,等待新任务的到来。一旦有新任务被添加到队列中,线程会被唤醒并继续执行任务。
(4)线程的复用
线程在完成一个任务后,不会终止,而是继续从任务队列中获取下一个任务。这样,同一个线程可以执行多个不同的任务,实现了线程的复用。
【6】线程池如何知道一个线程的任务已经执行完成
(1)基于任务执行逻辑
线程池中的线程在执行任务时,通常会遵循特定的执行逻辑。一般来说,线程会从任务队列中取出一个任务并调用其 run 方法(对于 Runnable 任务)或 call 方法(对于 Callable 任务),当这个方法正常返回或者抛出异常结束时,就意味着该任务执行完成,FutureTask会标记任务为完成,并调用done()方法。可以通过Future的isDone()方法来判断状态。
(2)线程池的状态管理
线程池内部会对线程和任务的状态进行管理。每个线程在执行任务时,线程池会记录该线程的状态,当任务执行完成后,线程池会更新相应的状态信息。
原理分析
1-任务队列操作:线程从任务队列中取出任务执行,当任务完成后,线程池会知道该任务已经从队列中移除并执行完毕。
2-线程状态标记:线程池可以通过内部的状态标记来记录线程是处于忙碌状态(正在执行任务)还是空闲状态(任务执行完成)。
(3)回调机制
有些线程池实现会使用回调机制来通知任务执行完成。例如,Future 接口和 CompletableFuture 类就提供了这样的功能。
bmit 方法返回一个 Future 对象,通过调用 isDone 方法可以检查任务是否执行完成。当 isDone 方法返回 true 时,就表示任务已经执行完成,可以通过 get 方法获取任务的执行结果。
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
protected void done() {
// 任务完成时回调(可用于异步通知)
}
public boolean isDone() {
return state != NEW; // 任何非NEW状态都表示完成
}
}
通过FutureTask的状态机跟踪任务生命周期
isDone()方法检查state是否为非NEW状态