Java并发10 - ForkJoin并发框架
ForkJoin并发框架
ForkJoin只适用于大型任务且能将任务切分的场景,因此使用场景不多
了解一下即可,太过庞大复杂
文章目录
- ForkJoin并发框架
- 一:ForkJoin框架概述
- 二:ForkJoin框架原理
- 1:ForkJoin框架原理
- 2:成员分析
- 2.1:执行者线程
- 2.2:任务实体
- 2.3:线程池
- 2.3.1:ForkJoinPool构造器
- 2.3.2:ForkJoinPool内部成员
- 2.3.3:内部类WorkQueue工作队列
- 3:invoke任务提交原理
- 4:任务工作原理
- 4.1、工作线程创建及注册原理
- 4.2:工作线程注销原理
- 5:执行任务/工作窃取实现过程
- 6:任务取消实现原理
- 7:线程池的关闭实现
- 三:总结
一:ForkJoin框架概述
ForkJoinPool是一个建立在分治思想上的产物,其采用任务“大拆小”的方式以及工作窃取算法实现并行处理任务。
通俗来说,ForkJoin框架的作用主要是为了实现将大型复杂任务进行递归的分解,直到任务小到指定阈值时才开始执行,从而递归的返回各个小任务的结果汇集成一个大任务的结果,依次类推最终得出最初提交的那个大型复杂任务的结果,这和方法的递归调用思想是一样的。
当然ForkJoinPool线程池为了提高任务的并行度和吞吐量做了非常多而且复杂的设计实现,其中最著名的就是任务窃取机制。
但ForkJoinPool更适合于处理一些大型任务,因此,ForkJoinPool的适用范围不大,仅限于某些密集且能被分解成多个子任务的任务,同时这些子任务运行的结果可以合并成最终结果。
ForkJoin框架主体由三部分组成:
- ForkJoinWorkerThread:任务的执行者,具体的线程实体
- ForkJoinTask:需要执行的任务实体
- ForkJoinPool:管理执行者线程的管理池
ForkJoinPool提交任务的方式也有三种:
- execute():可提交Runnbale类型的任务
- submit():可提交Callable类型的任务
- invoke():可提交ForkJoinTask类型的任务,但ForkJoinTask存在三个子类:
- RecursiveAction:无返回值型ForkJoinTask任务
- RecursiveTask:有返回值型ForkJoinTask任务
- CountedCompleter:任务执行完成后可以触发钩子回调函数的任务
package com.cui.commonboot.myjuc;
import lombok.AllArgsConstructor;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* <p>
* 功能描述:fork join pool test
* </p>
*
* @author cui haida
* @date 2024/01/04/10:07
*/
public class ForkJoinPoolTest {
public static void main(String[] args) {
testFor();
testForkJoin();
}
private static void testFor() {
Instant startTime = Instant.now();
ArrayList<Integer> list = new ArrayList<>();
for (int id = 1; id <= 1000 * 10000; id++) {
// todo: get data by id from database
list.add(id);
}
Instant endTime = Instant.now();
System.out.println("for method need time is: " + Duration.between(startTime, endTime).toMillis() + "ms");
}
private static void testForkJoin() {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Instant startTime = Instant.now();
List<Integer> invoke = forkJoinPool.invoke(new IdByFindUpdate(1, 1000 * 10000));
Instant endTime = Instant.now();
System.out.println("fork/join method need time is: " + Duration.between(startTime, endTime).toMillis() + "ms");
}
}
@AllArgsConstructor
class IdByFindUpdate extends RecursiveTask<List<Integer>> {
private Integer startID;
private Integer endID;
private static final Integer THRESHOLD = 10000; // threshold value
/**
* 递归计算任务
* @return 计算结果
*/
@Override
protected List<Integer> compute() {
int taskSize = endID - startID;
List<Integer> list = new ArrayList<>();
// if the task size less than threshold value, direct handler all task
if (taskSize <= THRESHOLD) {
for (int id = startID; id <= endID; id++) {
// todo: get data by id from database
list.add(id);
}
return list;
}
// task fork split
IdByFindUpdate leftTask = new IdByFindUpdate(startID, (startID + endID) / 2);
leftTask.fork();
IdByFindUpdate rightTask = new IdByFindUpdate(((startID + endID) / 2) + 1, endID);
rightTask.fork();
// task join
list.addAll(leftTask.join());
list.addAll(rightTask.join());
return list;
}
}
很明显,ForkJoin的执行速度比普通的for循环速度快上三倍左右
但是值得一提的是:如果任务的量级太小,ForkJoin的处理速度反而比不上普通的For循环。
这是因为ForkJoin框架在拆分任务fork阶段于合并结果join阶段需要时间,并且开启多条线程处理任务,CPU切换也需要时间
所以当一个任务fork/join阶段以及CPU切换的时间开销大于原本任务的执行时间时,这种情况下则没有必要使用ForkJoin框架。
ForkJoin的执行时间跟机器硬件配置以及拆分临界值/阈值的设定也有关系,拆分的阈值并不是越小越好,因为阈值越小时,一个任务拆分的小任务也就会越多,而拆分、合并阶段都是需要时间的,所以阈值需要根据机器的具体硬件设施和任务的类型进行合理的计算,这样才能保证任务执行时能够到达最佳状态。
二:ForkJoin框架原理
1:ForkJoin框架原理
在前面提到过,ForkJoin框架是建立在分治思想上的产物,而向FoorkJoinPool中传递一个任务时,任务的执行流程大体如下:
提交的任务会被分割成一个个小的左/右任务,当分割到最小时,会分别执行每个小的任务
执行完成后,会将每个左/右任务的结果进行,从而合并出父级任务的结果,依次类推,直至最终计算出整个任务的最终结果
工作窃取
在ForkJoin框架同样存在这样的情况,某条线程已经执行完成了分配给自己的任务后,有些线程却还在执行并且堆积着很多任务
那么这条已经处理完自己任务的线程则会去“窃取”其他线程的任务执行。【咱们是一个小组的,我干完了就帮你干】
2:成员分析
ForkJoin框架是由三部分组成,分别为:执行者线程、任务实体以及线程池。
2.1:执行者线程
ForkJoinWorkerThread继承了Thread线程类,作为Thread的子类,但是却并没有对线程的调度、执行做改变,只是仅仅增加了一些额外功能。
ForkJoinWorkerThread线程被创建出来后都交由ForkJoinPool线程池管理,并且设置为了守护线程
ForkJoinWorkerThread线程创建出来之后都是被注册到FrokJoinPool线程池,由这些线程来执行用户提交的任务,所以ForkJoinWorkerThread也被称为任务的执行者。
ForkJoinPool线程池与之前的线程池有一点区别在于:之前的线程池中,总共只有一个任务队列,而ForkJoinPool中,每个ForkJoinWorkerThread线程在创建时,都会为它分配一个任务队列。
同时为了实现工作窃取机制,该队列被设计为双向队列,线程执行自身队列中的任务时,采用LIFO的方式获取任务,当其他线程窃取任务时,采用FIFO的方式获取任务。
ForkJoinWorkerThread线程的主要工作为执行自身队列中的任务,其次是窃取其他线程队列中的任务执行。
public class ForkJoinWorkerThread extends Thread {
final ForkJoinPool pool; // 当前线程所属的线程池
final ForkJoinPool.WorkQueue workQueue; // 当前线程的双向任务队列
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// 调用Thread父类的构造函数创建线程实体对象
// 在这里是先暂时使用aFJT作为线程名称,当外部传递线程名称时会替换
super("aForkJoinWorkerThread");
// 当前设置线程池
this.pool = pool;
// 向ForkJoinPool线程池中注册当前线程,为当前线程分配任务队列
this.workQueue = pool.registerWorker(this);
}
// ForkJoinWorkerThread类 → run方法
public void run() {
// 如果队列中有任务
if (workQueue.array == null) {
// 定义异常对象,方便后续记录异常
Throwable exception = null;
try {
// 执行前置钩子函数(预留方法,内部未实现)
onStart();
// 执行工作队列中的任务
pool.runWorker(workQueue);
} catch (Throwable ex) {
// 记录捕获的异常信息
exception = ex;
} finally {
try {
// 对外写出捕获的异常信息
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
// 调用 deregisterWorker 方法进行清理
pool.deregisterWorker(this, exception);
}
}
}
}
// 省略其他代码.....
}
2.2:任务实体
ForkJoinTask与FutrueTask一样,是Futrue接口的子类
ForkJoinTask是一种可以将任务进行递归分解执行,从而提高执行并行度的任务类型,执行结束后也可以支持结果返回。
但ForkJoinTask仅是一个抽象类,子类有三个:
- RecursiveAction:无返回值型ForkJoinTask任务
- RecursiveTask:有返回值型ForkJoinTask任务
- CountedCompleter:任务执行完成后可以触发钩子回调函数的任务
ForkJoinTask的作用就是根据任务的分解实现,将任务进行拆分,以及等待子任务的执行结果合并成父任务的结果。
ForkJoinTask内部存在一个整数类型的成员status
- 该成员高16位记录任务的执行状态,如:如NORMAL、CANCELLED或EXCEPTIONAL
- 低16位预留用于记录用户自定义的任务标签。
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
// 表示任务的执行状态,总共有如下几种值
volatile int status;
// 获取任务状态的掩码,后续用于位计算,判断任务是否正常执行结束
static final int DONE_MASK = 0xf0000000;
// 表示任务正常执行结束
static final int NORMAL = 0xf0000000;
// 表示任务被取消
static final int CANCELLED = 0xc0000000;
// 表示任务出现异常结束
static final int EXCEPTIONAL = 0x80000000;
// 表示当前任务被别的任务依赖,在结束前会通知其他任务join结果
static final int SIGNAL = 0x00010000;
// 低16位掩码,预留占位(short mask)
// setForkJoinTaskTag方法中应用了该成员,但这个方法没实现/应用
static final int SMASK = 0x0000ffff;
// 异常哈希链表数组(异常哈希表,类似于hashmap1.8之前的实现)
// 因为任务拆分之后会很多,异常信息要么都没有,要么都会出现
// 所以不直接记录在ForkJoinTask对象中,而是采用哈希表结构存储弱引用类型的节点
// 注意这些都是 static 类属性,所有的ForkJoinTask共用的
private static final ExceptionNode[] exceptionTable;
private static final ReentrantLock exceptionTableLock;
// 在ForkJoinTask的node被GC回收之后,相应的异常节点对象的引用队列
private static final ReferenceQueue<Object> exceptionTableRefQueue;
/**
* 固定容量的exceptionTable(代表数组长度为32,下标存储链表头节点)
*/
private static final int EXCEPTION_MAP_CAPACITY = 32;
// 内部节点类:异常数组存储的元素:
// 数组是固定长度,这样方便外部访问
// 但是为了保证内存可用性,所以是弱引用类型
// 因为不能确定任务的最后一个join何时完成,所以在下次GC发生时会被回收
// 在GC回收后,这些异常信息会被转存到exceptionTableRefQueue队列
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
final Throwable ex;
ExceptionNode next;
final long thrower; // 抛出异常的线程id
final int hashCode; // 在弱引用消失之前存储hashCode
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
// //在ForkJoinTask被GC回收之后,会将该节点加入队列exceptionTableRefQueue
super(task, exceptionTableRefQueue);
this.ex = ex;
this.next = next;
this.thrower = Thread.currentThread().getId();
this.hashCode = System.identityHashCode(task);
}
}
/* 抽象方法:用于拓展 */
// 任务执行完成后返回结果,未完成返回null
public abstract V getRawResult();
// 强制性的给定返回结果
protected abstract void setRawResult(V value);
// 执行任务
// 如果执行过程抛出异常则记录捕获的异常并更改任务状态为EXCEPTIONAL
// 如果执行正常结束,设置任务状态为NORMAL正常结束状态
// 如果当前是子任务,设置为SIGNAL状态并通知其他需要join该任务的线程
protected abstract boolean exec();
/* 实现Future接口的方法 */
// 阻塞等待任务执行结果
public final V get();
// 在给定时间内等待返回结果,超出给定时间则中断线程
public final V get(long timeout, TimeUnit unit);
// 阻塞非工作线程直至任务结束或者中断(该过程可能会发生窃取动作),返回任务的status值
private int externalInterruptibleAwaitDone();
// 尝试取消任务,成功返回true,反之false
public boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否已执行结束
public final boolean isDone();
// 判断任务是否被取消
public final boolean isCancelled();
/* 一些重要的方法 */
// 执行任务的方法
final int doExec();
// 修改任务状态的方法
private int setcompletion (int completion);
// 取消任务的方法
public boolean cancel(boolean mayInterruptIfRunning);
// 将新创建的子任务放入当前线程的任务(工作)队列
public final ForkJoinTask<V> fork();
// 将当前线程阻塞,直到对应的子任务完成运行并返回执行结果
public final V join();
// 获取任务执行状态,如果还未结束,当前线程获取任务帮助执行
private int doJoin();
// 执行任务,正常结束则返回结果,异常结束则报告异常
public final V invoke();
// 使用当前线程执行任务
private int doInvoke();
// 阻塞线程直至任务执行结束,如果未执行完成,外部线程尝试帮助执行
private int externalAwaitDone();
// 同时执行两个任务,第一个任务由当前线程执行,第二个交由工作线程执行
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2);
// 执行多个任务,入参为任意个任务对象,除开第一个任务,其他交由工作线程执行
public static void invokeAll(ForkJoinTask<?>... tasks);
// 入参为Collection集合,可以支持返回结果
public static <T extends ForkJoinTask<?>>
Collection<T> invokeAll(Collection<T> tasks);
/* 异常相关的方法 */
// 记录异常信息以及设置任务状态
final int recordExceptionalCompletion(Throwable ex);
// 删除异常结点并清理状态
private void clearExceptionalCompletion();
// 删除哈希表中过期的异常信息引用
private static void expungeStaleExceptions();
// 获取任务异常判断与当前线程堆栈关系是否相关,
// 不相关则构建一个相同类型的异常,作为记录
// 这样做的原因是为了提供准确的堆栈跟踪
private Throwable getThrowableException();
}
ForkJoinTask内部成员主要由两部分构成,一个是表示任务状态的int成员:status,其他的成员则都是跟任务异常信息记录相关的。
不过ForkJoinTask内部有关异常信息记录的成员都是static关键字修饰的,也就代表着这些成员是所有ForkJoinTask对象共享的
ForkJoinTask使用类似与HashMap的实现结构:固定长度32的数组+单向链表实现了一个哈希表结构,用于记录所有ForkJoinTask执行过程中出现的异常,所有异常信息都会被封装成ExceptionNode节点加入哈希表中存储,但是ExceptionNode节点是一种弱引用的实现,当程序下次GC发生时会被GC机制回收,GC时这些已捕获的异常则会被转移到exceptionTableRefQueue队列中存储。
成员status代表任务的执行状态,成员类型为int,从最大程度上减少了内存占用,为了保证原子性,该成员使用了volatile修饰以及操作时都是CAS操作。
当任务未结束时,status都会大于0,任务执行结束后,status都会小于0。
ForkJoinTask中的所有方法也可以分为三大类:基于status状态成员操作以及维护方法 & 执行任务以及等待完成方法 & 附加对外报告结果的用户级方法
而其中的核心便是join和fork方法
// ForkJoinTask类 → fork方法
public final ForkJoinTask<V> fork() {
Thread t;
// 判断当前执行的线程是否为池中的工作线程
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
// 如果是的则直接将任务压入当前线程的任务队列
((ForkJoinWorkerThread)t).workQueue.push(this);
else
// 如果不是则压入common池中的某个工作线程的任务队列中
ForkJoinPool.common.externalPush(this);
// 返回当前ForkJoinTask对象,方便递归拆分
return this;
}
// ForkJoinTask类 → join方法
public final V join() {
int s;
// 判断任务执行状态如果是非正常结束状态
if ((s = doJoin() & DONE_MASK) != NORMAL)
// 抛出相关的异常堆栈信息
reportException(s);
// 正常执行结束则返回执行结果
return getRawResult();
}
// ForkJoinTask类 → doJoin方法
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
// status<0则直接返回status值
return (s = status) < 0 ? s :
// 判断当前线程是否为池中的工作线程
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
// 是则取出线程任务队列中的当前task执行,执行完成返回status值
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
// 执行未完成则调用awaitJoin方法等待执行完成
wt.pool.awaitJoin(w, this, 0L) :
// 不是则调用externalAwaitDone()方法阻塞挂起当前线程
externalAwaitDone();
}
ForkJoinTask的内部类一共有五个,分别是:
// 用于记录任务执行过程中抛出的异常信息,是ForkJoinTask的弱引用
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {}
// 用于封装Runable类型任务的适配器,可以通过构造器设置返回集
static final class AdaptedRunnable<T> extends ForkJoinTask<T> implements RunnableFuture<T> {}
// 用于封装Runable类型任务的适配器,抽象方法实现
// getRawResult()方法:直接返回null
// setRawResult()方法:空实现
// exec()方法:直接调用的run()方法
static final class AdaptedRunnableAction extends ForkJoinTask<Void> implements RunnableFuture<Void> {}
// 同上,只不过能抛出异常
static final class RunnableExecuteAction extends ForkJoinTask<Void> {}
// 用于封装Callable类型任务的适配器,抽象方法实现
// getRawResult()方法:返回call方法的执行结果
// setRawResult()方法:设置Callable执行后的返回值
// exec()方法:调用的call()方法
static final class AdaptedCallable<T> extends ForkJoinTask<T> implements RunnableFuture<T> {}
2.3:线程池
ForkJoinPool也是实现了ExecutorService的线程池
但ForkJoinPool不同于其他类型的线程池,因为其内部实现了工作窃取机制
所有线程在执行完自己的任务之后都会尝试窃取其他线程的任务执行,只有当窃取不到任务的情况下才会发生阻塞等待工作。
ForkJoinPool主要是为了执行ForkJoinTask而存在的,是整个ForkJoin框架的核心,负责整个框架的核心管理、检查监控与资源调度。
关于@Contended和伪共享参考大佬笔记
2.3.1:ForkJoinPool构造器
// 构造器1:使用默认的参数配置创建
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
// 构造器2:可指定并行度
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
// 构造器3:可指定并行度、线程工厂、异常策略以及调度模式
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
// 私有全参构造函数:提供给内部其他三个构造器调用
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
ForkJoinPool对外提供了三个构造器,都是基于内部的私有构造完成的,所以直接分析最后一个全参的私有构造器,该构造器共有五个参数:
- parallelism并行度:默认为CPU核数,最小为1。相当于工作线程数,但会有些不同
- factory线程工厂:用于创建ForkJoinWorkerThread线程
- handler异常捕获策略:默认为null,执行任务出现异常从中被抛出时,就会被handler捕获
- mode调度模式:对应前三个构造中的asyncMode参数,默认为0,也就是false
- false:使用LIFO_QUEUE成员,mode=0,使用先进后出的模式调度工作
- true:使用FIFO_QUEUE成员,mode=1<<16,使用先进先出的模式调度工作
- workerNamePrefix工作名称前缀:工作线程的名称前缀,有默认值,不需要传递该参数
创建ForkJoinPool线程池除开通过构造函数的方式之外,在JDK1.8中还提供了一个静态方法:commonPool()
该方法可以通过指定系统参数的方式(System.setProperty(?,?))定义“并行度、线程工厂和异常处理策略”
但是该方法是一个静态方法,调用commonPool()方法获取到的ForkJoinPool对象是整个程序通用的。
2.3.2:ForkJoinPool内部成员
// 线程池的ctl控制变量(与上篇中分析的ctl性质相同)
volatile long ctl;
// 线程池的运行状态,值为常量中对应的值
volatile int runState;
// 将并行度和mode参数放到了一个int中,便于后续通过位操作计算
final int config;
// 随机种子,与SEED_INCREMENT魔数配合使用
int indexSeed;
// 组成WorkQueue数组,是线程池的核心数据结构
volatile WorkQueue[] workQueues;
// 创建线程的线程工厂
final ForkJoinWorkerThreadFactory factory;
// 任务在执行过程中出现抛出异常时的处理策略,类似于之前线程池的拒绝策略
final UncaughtExceptionHandler ueh;
// 创建线程时,线程名称的前缀
final String workerNamePrefix;
// 任务窃取的原子计数器
volatile AtomicLong stealCounter;
// 默认创建工作线程的工厂类
public static final ForkJoinWorkerThreadFactory
defaultForkJoinWorkerThreadFactory;
// 线程修改许可,用于检测代码是否具备修改线程状态的权限
private static final RuntimePermission modifyThreadPermission;
// 通用的ForkJoinPool线程池,用于commonPool()方法
static final ForkJoinPool common;
// 通用线程池的并行数
static final int commonParallelism;
// 通用线程池的最大线程数
private static int commonMaxSpares;
// 用于记录已创建的ForkJoinPool线程池的个数
private static int poolNumberSequence;
// 当线程执行完成自己的任务且池中没有活跃线程时,用于计算阻塞时间,默认2s
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L;
// 平衡计数,通过IDLE_TIMEOUT会减去TIMEOUT_SLOP,
// 主要为了平衡系统定时器唤醒时带来的延时时间,默认20ms
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;
// 通用线程池默认的最大线程数 256
private static final int DEFAULT_COMMON_MAX_SPARES = 256;
/**
* 自旋次数:阻塞之前旋转等待的次数,目前使用的是随机旋转
* 在awaitRunStateLock、awaitWork以及awaitRunstateLock方法中使用,
* 当前设置为零,以减少自旋带来的CPU开销
* 如果大于零,则SPINS的值必须为2的幂,至少为 4
*/
private static final int SPINS = 0;
// 这个是产生随机性的魔数,用于扫描的时候进行计算(与ThreadLocal类似)
private static final int SEED_INCREMENT = 0x9e3779b9;
// runState的状态:处于SHUTDOWN时值必须为负数,其他状态只要是2的次幂即可
// 锁定状态:线程池被某条线程获取了锁
private static final int RSLOCK = 1;
// 信号状态:线程阻塞前需要设置RSIGNAL,告诉其他线程在释放锁时要叫醒我
private static final int RSIGNAL = 1 << 1;
// 启动状态:表示线程池正常,可以创建线程且接受任务处理
private static final int STARTED = 1 << 2;
// 停止状态:线程池已停止,不能创建线程且不接受新任务,同时会取消未处理的任务
private static final int STOP = 1 << 29;
// 死亡状态:表示线程池内所有任务已取消,所有工作线程已销毁
private static final int TERMINATED = 1 << 30;
// 关闭状态:尝试关闭线程池,不再接受新的任务,但依旧处理已接受的任务
private static final int SHUTDOWN = 1 << 31;
// CTL变量的一些掩码
// 低32位的掩码
private static final long SP_MASK = 0xffffffffL;
// 高32位的掩码
private static final long UC_MASK = ~SP_MASK;
// 有效(活跃/存活)计数:正在处理任务的活跃线程数
// 高16位的偏移量,用高16位记录活跃线程数
private static final int AC_SHIFT = 48;
// 高16位:活跃线程的计数单元,高16位+1
private static final long AC_UNIT = 0x0001L << AC_SHIFT;
// 高16位:活跃线程数的掩码
private static final long AC_MASK = 0xffffL << AC_SHIFT;
// 总计数:整个池中存在的所有线程数量
// 总线程数量的偏移量,使用高32位中的低16位记录
private static final int TC_SHIFT = 32;
// 总线程数的计数单元
private static final long TC_UNIT = 0x0001L << TC_SHIFT;
// 总线程数的掩码
private static final long TC_MASK = 0xffffL << TC_SHIFT;
// 最大总线程数的掩码,用于判断线程数量是否已达上限
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);
// 低16位掩码,表示workQueue在数组中的最大索引值
static final int SMASK = 0xffff;
// 最大工作线程数
static final int MAX_CAP = 0x7fff;
// 偶数掩码,第一个bit为0,任何值与它进行 与 计算,结果都为偶数
static final int EVENMASK = 0xfffe;
// 用于计算偶数值下标,SQMASK值为126
// 0~126之间只存在64个偶数,所以偶数位的槽数只有64个
static final int SQMASK = 0x007e;
// Masks and units for WorkQueue.scanState and ctl sp subfield
// 用于检测工作线程是否在执行任务的掩码
static final int SCANNING = 1;
// 负数,用于workQueue.scanState,与scanState进行位或可将scanState变成负数,
// 表示工作线程扫描不到任务,进入不活跃状态,将可能被阻塞
static final int INACTIVE = 1 << 31;
// 版本计数,用于workQueue.scanState,解决ABA问题
static final int SS_SEQ = 1 << 16;
// Mode bits for ForkJoinPool.config and WorkQueue.config
// 获取队列的工作调度模式
static final int MODE_MASK = 0xffff << 16;
// 表示先进后出模式
static final int LIFO_QUEUE = 0;
// 表示先进先出模式
static final int FIFO_QUEUE = 1 << 16;
// 表示共享模式
static final int SHARED_QUEUE = 1 << 31;
// Unsafe类对象:用于直接操作内存
private static final sun.misc.Unsafe U;
// ForkJoinTask[]数组的基准偏移量
// 使用这个值+元素的大小可以直接定位一个内存位置
private static final int ABASE;
// ForkJoinTask[]数组两个元素之间的间距的幂 → log(间距) 底数为2
private static final int ASHIFT;
// ctl成员的内存偏移量地址
private static final long CTL;
// runState成员的内存偏移量地址
private static final long RUNSTATE;
// stealCounter成员的内存偏移量地址
private static final long STEALCOUNTER;
// parkBlocker成员的内存偏移量地址
private static final long PARKBLOCKER;
// WorkQueue队列的top元素偏移量地址
private static final long QTOP;
// WorkQueue队列的qlock偏移量地址
private static final long QLOCK;
// WorkQueue队列的scanState偏移量地址
private static final long QSCANSTATE;
// WorkQueue队列的parker偏移量地址
private static final long QPARKER;
// WorkQueue队列的CurrentSteal偏移量地址
private static final long QCURRENTSTEAL;
// WorkQueue队列的CurrentJoin偏移量地址
private static final long QCURRENTJOIN;
总的可以分为四类:
- ForkJoinPool线程池运行过程中的成员结构
- 通用线程池common以及所有线程池对象的默认配置
- 线程池运行状态以及ctl成员的位存储记录
- 直接操作内存的Unsafe相关成员及内存偏移量
这里只需要强调最为核心的成员ctl, 它是一个long类型的成员,占位8byte/64bit,ctl不是只存储一个数据,而是同时记录四个
64Bit被拆分为四个16位的子字段,分别记录:
- 1-16bit/AC:记录池中的活跃线程数
- 17-32bit/TC:记录池中总线程数
- 33-48bit/SS:记录WorkQueue状态,第一位表示active还是inactive,其余15位表示版本号(避免ABA问题)
- 49-64bit/ID:记录第一个WorkQueue在数组中的下标,和其他worker通过字段stackPred组成的一个Treiber堆栈
- 低32位可以直接获取,如SP=(int)ctl,如果为负则代表存在空闲worker
2.3.3:内部类WorkQueue工作队列
WorkQueue是整个Fork/Join框架的桥接者,每个执行者ForkJoinWorkerThread对象中存在各自的工作队列
ForkJoinTask被存储在工作队列中,而ForkJoinPool使用一个WorkQueue数组管理协调所有执行者线程的队列。
// 使用@Contended防止伪共享
@sun.misc.Contended
static final class WorkQueue {
// 队列可存放任务数的初始容量:8192
// 初始化时分配这么大容量的原因是为了减少哈希冲突导致的扩容
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
// 队列可存放任务数的最大容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// 队列的扫描状态。高十六位用于版本计数,低16位用于记录扫描状态
// 偶数位的WorkQueue,该值为负数时表示不活跃
// 奇数位的WorkQueue,该值一般为workQueues数组中的下标值,表示当前线程在执行
// 如果scanState为负数,代表线程没有找到任务执行,被挂起了,处于不活跃状态
// 如果scanState是奇数,代表线程在寻找任务过程中,如果变成了偶数,代表线程在执行任务
volatile int scanState;
// 前一个栈顶的ctl值
int stackPred;
// 窃取任务的数量统计
int nsteals;
// 用于记录随机选择窃取任务,被窃取任务workQueue在数组中的下标值
int hint;
// 记录当前队列在数组中的下标和工作模式
// 高十六位记录工作模式,低十六位记录数组下标
int config;
// 锁标识(类似于AQS是state锁标识)
// 为1表示队列被锁定,为0表示未锁定
// 小于0代表当前队列注销或线程池关闭(terminate状态时为-1)
volatile int qlock;
// 下一个pool操作的索引值(栈底/队列头部)
volatile int base;
// 下一个push操作的索引值(栈顶/队列尾部)
int top;
// 存放任务的数组,初始化时不会分配空间,采用懒加载形式初始化空间
ForkJoinTask<?>[] array;
// 所属线程池的引用指向
final ForkJoinPool pool;
// 当前队列所属线程的引用,如果为外部提交任务的共享队列则为null
final ForkJoinWorkerThread owner;
// 线程在执行过程中如果被挂起阻塞,该成员保存被挂起的线程,否则为空
volatile Thread parker;
// 等待join合并的任务
volatile ForkJoinTask<?> currentJoin;
// 窃取过来的任务
volatile ForkJoinTask<?> currentSteal;
// 构造器
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
this.pool = pool;
this.owner = owner;
// 开始的时候都指向栈顶
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
/* 重点方法 */
// 添加方法:将一个任务添加进队列中
// 注意:索引不是通过数组下标计算的,而是通过计算内存偏移量定位
final void push(ForkJoinTask<?> task);
// 扩容方法:队列元素数量达到容量时,扩容两倍并移动元素到新数组
final ForkJoinTask<?>[] growArray();
// 获取方法:从栈顶(LIFO)弹出一个任务
final ForkJoinTask<?> pop();
// 获取方法:从栈底(FIFO)弹出一个任务
final ForkJoinTask<?> poll();
}
WorkQueue内部采用一个数组存储所有分配的任务,线程执行时会从该队列中获取任务,如果数组为空,那么则会尝试窃取其他线程的任务。
因此,可以得出结论:
- 在ForkJoinPool中存在一个由WorkQueue构成的数组成员workQueues
- 在每个WorkQueue中又存在一个ForkJoinTask构成的数组成员array
所以Fork/Join框架中存储任务的结构如下:
重点:
- workQueues数组的容量必须为2的整次幂。下标为偶数的用于存储外部提交的任务,奇数位置存储内部fork出的子任务
- 偶数位置的任务属于共享任务,由工作线程竞争获取,模式为FIFO
- 奇数位置的任务属于某个工作线程,一般是fork产生的子任务
- 工作线程在处理完自身任务时会窃取其他线程的任务,窃取方式为FIFO
- 工作线程执行自己队列中任务的模式默认为LIFO(可以改成FIFO,不推荐)
3:invoke任务提交原理
execute()、submit()方法最终实现都是相同的,只是入口不同。
// ForkJoinPool类 → invoke()方法
public <T> T invoke(ForkJoinTask<T> task) {
// 如果任务为空,抛出空指针异常
if (task == null)
throw new NullPointerException();
// 如果不为空则提交任务执行
externalPush(task);
// 等待任务执行结果返回
return task.join();
}
// ForkJoinPool类 → externalPush()方法
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
// 获取线程的探针哈希值以及线程池运行状态
int r = ThreadLocalRandom.getProbe();
int rs = runState;
// 判断线程池是否具备了任务提交的环境
// 如果工作队列数组已经初始化
// 并且数组以及数组中偶数位的工作队列不为空
// 并且线程池状态正常
// 并且获取队列锁成功
// 满足条件则开始提交任务
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
// 判断队列中的任务数组是否初始化并且数组是否还有空位
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
// 通过计算内存偏移量得到任务要被的存储索引
int j = ((am & s) << ASHIFT) + ABASE;
// 通过Unsafe类将任务写入到数组中
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
// 如果队列任务很多
if (n <= 1)
// 唤醒或者新启一条线程帮忙处理
signalWork(ws, q);
return;
}
// 释放队列锁
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
// 提交执行
externalSubmit(task);
}
// ForkJoinPool类 → externalSubmit()方法
private void externalSubmit(ForkJoinTask<?> task) {
int r;
// 如果当前提交任务的线程的探针哈希值为0,
// 则初始化当前线程的探针哈希值
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
// 开启死循环直至成功提交任务为止
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
// 定义竞争标识
boolean move = false;
// 如果runState小于0代表为负数,代表线程池已经要关闭了
if ((rs = runState) < 0) {
// 尝试关闭线程池
tryTerminate(false, false); // help terminate
// 线程池关闭后同时抛出异常
throw new RejectedExecutionException();
}
// 如果线程池还未初始化,先对线程池进行初始化操作
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
// 获取池锁,没获取锁的线程则会自旋或者阻塞挂起
rs = lockRunState();
try {
// 再次检测是否已初始化
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// 获取并行数
int p = config & SMASK;
// 通过如下计算得到最接近2次幂的值
// 找到之后对该值 * 2倍
// 原理:将p中最高位的那个1以后的位都设置为1,
// 最后加1得到最接近的二次幂的值
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
// 释放锁,并更改运行状态为STARTED
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
// r:随机值,m:工作队列的容量减1,SQMASK:偶数位最大的64个的掩码
// r&m计算出了下标,位与SQMASK之后会变成一个<=126的偶数
// 如果随机出来的偶数位下标位置队列不为空
else if ((q = ws[k = r & m & SQMASK]) != null) {
// 先获取队列锁
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
// 获取队列的任务数组
ForkJoinTask<?>[] a = q.array;
// 记录队列原本的栈顶/队列尾部的数组下标
int s = q.top;
// 提交标识
boolean submitted = false;
try {
// 如果数组不为空并且数组中还有空位
// (a.length > s+1-q.base如果不成立则代表空位不足)
// 队列元素数量达到容量,没有空位时调用growArray进行扩容
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
// 通过计算内存偏移量得到栈顶/队列尾部位置
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
// 将新的任务放在栈顶/队列尾部位置
U.putOrderedObject(a, j, task);
// 更新栈顶/队列尾部
U.putOrderedInt(q, QTOP, s + 1);
// 提交标识改为true
submitted = true;
}
} finally {
// 释放队列锁
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
// 如果任务已经提交到了工作队列
if (submitted) {
// 创建新的线程处理,如果线程数已满,唤醒线程处理
signalWork(ws, q);
return;
}
}
// 能执行到这里则代表前面没有获取到锁,该位置的队列有其他线程在操作
// 将竞争标识改为true
move = true;
}
// 如果随机出来的偶数下标位置的队列为空
// 那么则在该位置上新建工作队列,然后将任务放进去
else if (((rs = runState) & RSLOCK) == 0) {
// 新建一个工作队列,第二个参数是所属线程
// 现在创建的第二个参数为null,因为偶数位的队列是共享的
q = new WorkQueue(this, null);
// 队列记录一下前面的随机值
q.hint = r;
// k是前面计算出的偶数位置索引,SHARED_QUEUE是共享队列模式
// 使用高16位存储队列模式,低16位存储数组索引
q.config = k | SHARED_QUEUE;
// 扫描状态为失活状态(负数,因为共享队列
// 不属于任何一个工作线程,它不需要标记工作线程状态)
q.scanState = INACTIVE;
// 获取池锁
rs = lockRunState();
// 将新创建的工作队列放入数组中
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q;
// 释放池锁
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true;
// 如果计算出的偶数位置有其他线程在操作,为了减少竞争,
// 获取下一个随机值,重新定位一个新的位置处理
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
流程非常的复杂,总体来说,一共10步:
- 判断任务是否为空,为空抛出异常,不为空则开始提交任务
- 调用externalPush()方法尝试快速提交任务
- 快速提交条件:探针哈希值已初始化、池中队列数组已初始化、随机的偶数位置队列不为空、线程池已初始化并状态正常、能成功获取队列锁
- 如上条件全部成立则快速提交任务,提交成功直接返回,结束执行
- 如果不成立则调用externalSubmit()提交任务,流程如下述步骤
- 初始化线程的探针哈希值并开启死循环提交任务(这个循环会直至提交成功才终止)
- 检查线程池状态是否正常,如果状态为关闭状态则拒绝任务,抛出异常
- 检查线程池是否已启动,如果还未启动则获取池锁,初始化线程池
- 如果通过探针值+队列数组容量减一的值+掩码计算出的偶数位队列不为空 -> 尝试获取队列锁:
- 成功:将任务添加到计算出的偶数位队列的任务数组中,如过数组长度不够则先扩容,任务添加成功后新建或唤醒一条线程,然后返回
- 失败:代表有其他线程在操作这个偶数位置的队列,将move标识改为true
- 如果计算出的偶数位置队列还未初始化,那么则先尝试获取池锁
- 成功:在该位置上创建一个共享队列,最后再释放池锁
- 失败:代表有其他线程也在操作池中的队列数组,将move标识改为true
- 如果move竞争标识为true,代表本次操作存在线程竞争,为了减少竞争,重新获取一个新的探针哈希值,计算出一个新的偶数位进行操作
- 当任务第一次执行没有添加成功时,会继续重复这些步骤,直至任务成功入列,例如第一次添加失败的情况:
- 队列存在其他线程操作没有获取到队列锁
- 计算出的偶数索引位的队列为空,第一次执行会先初始化队列
- 任务成功提交到工作队列后,join等待任务执行完成,返回结果合并
4:任务工作原理
任务的执行都是通过调用signalWork()执行的,这个方法会新创建一条线程处理任务,当线程数量已经达到线程池的最大线程数时,会尝试唤醒一条线程
// ForkJoinPool类 → signalWork()方法
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
// ctl初始化时为负数,如果ctl<0则代表有任务需要处理
while ((c = ctl) < 0L) {
// sp==0代表不存在空闲的线程
// 前面分析成员构成的时候提到过:
// 低32位可以直接获取,如SP=(int)ctl,
// 如果sp为负则代表存在空闲worker
if ((sp = (int)c) == 0) {
// 如果池中线程数量还未达到最大线程数
if ((c & ADD_WORKER) != 0L)
// 创建一条新的线程来处理工作
tryAddWorker(c);
// 新建线程完成后退出循环并返回
break;
}
// 下面这三个判断都是在检测线程池状态是否正常
// 因为signalWork只能框架内部调用,所以传入的队列不可能为空,
// 除非是处于unstarted/terminated状态,代表线程池即将关闭,
// 尝试中断未执行任务,直接清空了任务,所以此时直接中断执行
if (ws == null)
break;
if (ws.length <= (i = sp & SMASK))
break;
if ((v = ws[i]) == null)
break;
// 下述代码是获取所有阻塞线程链中的top线程并唤醒它
// 但是在唤醒之前需要先把top线程的stackPerd标识放在ctl中
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
// 利用CAS机制修改ctl值
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs;
if ((p = v.parker) != null)
// 唤醒线程
U.unpark(p);
// 唤醒后退出
break;
}
// 如果队列为空或者没有task,退出执行
if (q != null && q.base == q.top)
break;
}
}
// ForkJoinPool类 → tryAddWorker()方法
private void tryAddWorker(long c) {
// 定义新增标识
boolean add = false;
do {
// 添加活跃线程数和总线程数
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
// 如果ctl值没有被其他线程修改
if (ctl == c) {
int rs, stop;
// 获取锁并检测线程池状态是否正常
if ((stop = (rs = lockRunState()) & STOP) == 0)
// 只有当线程池没有停止才可以创建线程
add = U.compareAndSwapLong(this, CTL, c, nc);
// 释放池锁
unlockRunState(rs, rs & ~RSLOCK);
// 如果线程池状态已经stop,那么则退出执行
if (stop != 0)
break;
// 如果没有stop
if (add) {
// 则新建线程
createWorker();
// 退出
break;
}
}
// ADD_WORKER的第48位是1,和ctl位与运算是为了检查总线程是否已满
// (int)c == 0代表池中不存在空闲线程数
// 只有当总线程数未满时以及池中不存在空闲线程数才会创建线程
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
- 如果有任务需要处理并且池中目前不存在空闲线程并且池中线程还未满,调用tryAddWorker()方法尝试创建线程
- 获取池锁更改ctl值并检测线程池的状态是否正常,正常则调用createWorker()创建线程
- tryAddWorker()是一个自旋方法,在池中线程数未满且没有出现空闲线程的情况下,会一直循环至成功创建线程或者池关闭
- 如果池中存在空闲线程或者线程数已满,那么则会尝试唤醒阻塞链上的第一条线程
4.1、工作线程创建及注册原理
// ForkJoinPool类 → createWorker()方法
private boolean createWorker() {
// 获取池中的线程工厂
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
// 通过线程工厂的newThread方法创建一条新线程
if (fac != null && (wt = fac.newThread(this)) != null) {
// 创建成功后返回true
wt.start();
return true;
}
} catch (Throwable rex) {
// 如果出现异常则记录异常信息
ex = rex;
}
// 然后注销线程以及将之前tryAddWorker()方法中修改的ctl值改回去
deregisterWorker(wt, ex);
return false;
}
// DefaultForkJoinWorkerThreadFactory类 → newThread()方法
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
// 直接创建了一条工作线程
return new ForkJoinWorkerThread(pool);
}
首先会获取池中采用的线程工厂,然后通过线程工厂创建一条ForkJoinWorkerThread工作线程
回到最开始的ForkJoin中的ForkJoinWorkerThread构造函数:
// ForkJoinWorkerThread类 → 构造函数
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// 调用Thread父类的构造函数创建线程实体对象
// 在这里是先暂时使用aFJT作为线程名称,当外部传递线程名称时会替换
super("aForkJoinWorkerThread");
// 当前设置线程池
this.pool = pool;
// 向ForkJoinPool线程池中注册当前线程,为当前线程分配任务队列
this.workQueue = pool.registerWorker(this);
}
// ForkJoinPool类 → registerWorker()方法
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
// 异常策略
UncaughtExceptionHandler handler;
// 设置为守护线程
wt.setDaemon(true);
// 为创建的线程设置异常汇报策略
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
// 为创建的线程分配任务队列
WorkQueue w = new WorkQueue(this, wt);
int i = 0; // 池中队列数组的索引
int mode = config & MODE_MASK; // 获取队列模式
int rs = lockRunState(); // 获取池锁
try {
WorkQueue[] ws; int n; // skip if no array
// 如果池中队列数组不为空并且已经初始化
if ((ws = workQueues) != null && (n = ws.length) > 0) {
// 获取用于计算数组下标随机的索引种子
int s = indexSeed += SEED_INCREMENT;
int m = n - 1; // 获取队列数组的最大索引值
// 计算出一个奇数位索引
// 与1位或,就是将第1个bit位设为1,此时这个数必然是奇数
// 与m位与,为了保证得到的值是在m以内奇数下标值
i = ((s << 1) | 1) & m;
// 如果计算出的位置不为空则代表已经有队列了,
// 代表此时发生了碰撞冲突,那么此时则需要换个位置
if (ws[i] != null) {
int probes = 0;
// 计算步长,步长是一个不能为2以及2次幂值的偶数
// 为了保证计算出的值不为2的次幂值,会在最后进行+2操作
// 后续会用原本的索引值+步长得到一个新的奇数索引值
// 奇数+偶数=奇数,所以不需要担心会成为偶数位索引
// 这里计算步长是通过长度n来计算的,
// 因为步长大一些,避免冲突的概念就会小一些
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
// 如果新计算出的奇数位索引位置依旧不为空
while (ws[i = (i + step) & m] != null) {
// 从下标0开始遍历整个数组
if (++probes >= n) {
// 如果所有奇数位值都不为空,代表数组满了,
// 那么扩容两倍,扩容后重新再遍历一次新数组
// 直至找出为空的奇数位下标
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
// 记录这个随机出来的索引种子
w.hint = s; // use as random seed
// 将前面计算得到的奇数位索引值以及工作模式记录在config
w.config = i | mode;
// 扫描状态队列在数组中的下标,为正数表示正在扫描任务状态
w.scanState = i; // publication fence
// 将前面创建的队列放在队列数组的i位置上
ws[i] = w;
}
} finally {
// 释放池锁
unlockRunState(rs, rs & ~RSLOCK);
}
// 在这里再设置线程的名称
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
工作线程的创建与注册原理:
- 将线程设置为守护线程,同时为新线程创建工作队列和设置异常处理策略
- 尝试获取池锁成功后,先获取一个随机生成的用于计算数组下标的索引种子,然后通过种子和数组最大下标计算出一个奇数索引值
- 如果计算出的奇数位值不为空,则通过偶数掩码+数组最大下标计算出一个偶数步长,然后通过这个步长循环整个数组找一个空的位置,如果找完了整个数组还是没有奇数空位,则对数组发生两倍扩容,然后再次依照步长遍历新数组找空位,直至找到奇数空位为止
- 为队列设置hint、config、scanState值并将队列放到计算出的奇数位置上
- 释放池锁并设置工作线程名字
4.2:工作线程注销原理
// ForkJoinPool类 → deregisterWorker()方法
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
// 如果工作线程以及它的工作队列不为空
if (wt != null && (w = wt.workQueue) != null) {
WorkQueue[] ws;
// 获取队列在池中数组的下标
int idx = w.config & SMASK;
// 获取池锁
int rs = lockRunState();
// 移除队列数组中idx位置的队列
if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
ws[idx] = null;
// 释放池锁
unlockRunState(rs, rs & ~RSLOCK);
}
long c;
// 在CTL成员中减去一个线程数
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(SP_MASK & c))));
if (w != null) {
// 标识这个队列已停止工作
w.qlock = -1;
// 将当前工作队列的偷取任务数加到ForkJoinPool#stealCounter中
w.transferStealCount(this);
// 取消队列中剩余的任务
w.cancelAll();
}
for (;;) {
WorkQueue[] ws; int m, sp;
// 如果线程池是要关闭了,那么直接退出
if (tryTerminate(false, false) || w == null || w.array == null ||
(runState & STOP) != 0 || (ws = workQueues) == null ||
(m = ws.length - 1) < 0)
break;
// 如果线程池不是要关闭,那么先通过ctl看看有没有阻塞的线程
if ((sp = (int)(c = ctl)) != 0) {
// 如果有则唤醒它来代替被销毁的线程工作
if (tryRelease(c, ws[sp & m], AC_UNIT))
break;
}
// 如果池中不存在阻塞挂起的线程,则先判断池内线程是否已满
else if (ex != null && (c & ADD_WORKER) != 0L) {
// 如果没满则新建一条线程代替被销毁的线程工作
tryAddWorker(c);
break;
}
// 如果池运行正常,不存在线程阻塞,线程数已满
else
// 那么直接退出
break;
}
if (ex == null)
// 清理异常哈希表中当前线程的异常节点信息
ForkJoinTask.helpExpungeStaleExceptions();
else
// 抛出异常
ForkJoinTask.rethrow(ex);
}
- 获取池锁之后将工作线程的任务队列从数组中移除,移除后释放池锁
- 将偷窃的任务数加到stealCounter成员,然后取消自身队列中的所有任务
- 判断当前线程池的情况,判断当前销毁线程是否是因为线程池要关闭了:
- 如果是:直接退出
- 如果不是:再判断池中是否存在挂起阻塞的线程
- 存在:唤醒阻塞线程来代替被销毁的线程工作
- 不存在:判断池中线程是否已满
- 没满:新建一条线程代替被销毁的线程工作
- 满了:直接退出
- 清除异常哈希表中当前线程的异常节点信息,然后抛出异常
总的来说,在销毁线程时,会先注销已注册的工作队列,注销之后会根据情况选择唤醒或新建一条线程来补偿线程池。
5:执行任务/工作窃取实现过程
先回到之前的createWorker()方法:
// ForkJoinPool类 → createWorker()方法
private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
if (fac != null && (wt = fac.newThread(this)) != null) {
// 创建成功则调用start()方法执行
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
// 如果创建过程出现异常则注销线程
deregisterWorker(wt, ex);
return false;
}
可以看到,创建线程成功后则会开始调用start()方法执行任务,最终会找到run()方法执行它:
// ForkJoinWorkerThread类 → run()方法
public void run() {
// 如果任务数组不为空
if (workQueue.array == null) {
Throwable exception = null;
try {
// 钩子函数,用于拓展,这里是空实现
onStart();
// 使用线程池的runWorker方法执行队列任务
pool.runWorker(workQueue);
} catch (Throwable ex) {
// 如果执行过程中出现异常则先记录
exception = ex;
} finally {
try {
// 钩子函数,报告异常
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
// 如果执行出现异常,注销线程
pool.deregisterWorker(this, exception);
}
}
}
}
// ForkJoinPool类 → runWorker()方法
final void runWorker(WorkQueue w) {
// 初始化任务数组,任务数组一开始是没有初始化的
// 这个方法是初始化或两倍扩容数组
w.growArray();
// 获取注册队列时记录的用于计算索引的随机种子
int seed = w.hint;
// 如果种子为0,那么则改为1,避免使用0
int r = (seed == 0) ? 1 : seed;
// 死循环
for (ForkJoinTask<?> t;;) {
// 扫描任务:在池的队列数组中随机选择工作队列,获取任务执行
if ((t = scan(w, r)) != null)
// 如果获取到任务则执行
w.runTask(t);
// 没有扫描到任务则尝试自旋或挂起阻塞
else if (!awaitWork(w, r))
break;
// 每次执行完后修改随机值,换个队列获取任务
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
// ForkJoinPool类 → scan()方法
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
// 如果队列数组不为空并且任务队列已经初始化且不为空
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
// 获取当前队列scanState,最开始为队列在数组中的下标
int ss = w.scanState;
// r&m:随机得到一个数组中的下标,oldSum/checkSum:比较效验和的标识
// 开启循环
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
// 如果随机出的下标位置队列不为空
if ((q = ws[k]) != null) {
// 判断队列中有没有任务
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) {
// FIFO模式,通过内存偏移量计算出栈底/队头位置
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
// 获取栈底的任务
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
q.base == b) {
// 如果工作线程处于活跃状态
if (ss >= 0) {
// 尝试利用CAS机制抢占线程(可能存在多个线程)
if (U.compareAndSwapObject(a, i, t, null)) {
// 抢占任务成功后将栈底挪一个位置
// 方便其他线程继续获取任务
q.base = b + 1;
// 如果队列中还剩有其他任务
if (n < -1) // signal others
// 新建或唤醒一条线程继续处理
signalWork(ws, q);
return t;
}
}
// 如果当前线程未激活,处于阻塞状态
else if (oldSum == 0 && // try to activate
w.scanState < 0)
// 唤醒线程
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
// 更新一次scanState值(因为前面可能唤醒了线程)
if (ss < 0) // refresh
ss = w.scanState;
// 获取一个新的随机值,用于随机下一个索引位置
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
// 根据新的随机种子计算出一个新的下标索引
origin = k = r & m; // move and rescan
// 效验和的标识复位
oldSum = checkSum = 0;
// 结束本次循环,继续下次循环
continue;
}
// 如果没有获取到任务,checkSum+1,表示遍历完了一个位置
// 用于效验
checkSum += b;
}
// k=(k+1)&m代表去队列数组的下个位置继续查找下个坑位的队列,
// 如果 ==origin 了,代表已经遍历了所有的队列
if ((k = (k + 1) & m) == origin) { // continue until stable
// 如果工作线程还处于活跃状态并且扫描完成整个队列后,
// 效验和 还未发生改变,那代表着没有新的任务提交到线程池
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
// 如果活跃状态变为了<0,代表已经处于不活跃状态
// 那么则退出扫描,返回null,回到runWorker()阻塞线程
if (ss < 0 || w.qlock < 0) // already inactive
break;
// 灭活操作(灭活后的线程被称为失活状态):
// 先将当前scanState变为负数
int ns = ss | INACTIVE; // try to inactivate
// 在ctl中减去一个活跃线程数,
// 并且将失活的ss保存到ctl的低三十二位
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
// 用工作线程的stackPred成员保存上一个失活线程的
// scanState,从而形成一个阻塞栈,ctl的低32位保存栈顶
w.stackPred = (int)c; // hold prev stack top
// 更新当前工作线程的scanState
U.putInt(w, QSCANSTATE, ns);
// 使用cas机制更新ctl值
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
// 如果更新失败则退出回滚,继续扫描任务,因为cas过程
// 中,导致失败的原因就一个:ctl值发生了
// 改变,这可能是有新任务提交进来了之后,唤醒或
// 添加了一条线程
w.scanState = ss; // back out
}
// 检查标识复位
checkSum = 0;
}
}
}
// 如果未扫描到任务则直接返回null,并在外边的runWorker()发生阻塞
return null;
}
流程十分的复杂,共有八个步骤
- 线程start()启动之后会找到run()方法,然后会开始去竞争线程池中的共享任务
- 初始化线程的工作队列,同时获取注册队列时计算索引的随机种子
- 开启循环扫描,通过随机种子计算出池中队列数组中的一个下标索引
- 判断随机出来的索引位置是否为空:
- 不为空:判断队列中是否存在任务:
- 存在:判断当前线程状态是否为活跃状态:
- 是:通过cas机制,以FIFO的模式取出栈底/队头的任务,如果还剩余任务则新建或唤醒一条新的线程继续处理,然后返回获取到的任务
- 否:先将ctl中记录的失活线程唤醒,随机计算一个新的位置,跳出本次循环,继续下次循环
- 不存在:更新scanState并随机计算一个新的位置,跳出本次循环,继续下次循环
- 如果存在任务但是没有获取到任务,代表没有有其他线程抢了任务,checkSum+1,随机计算一个新的位置,跳出本次循环,继续下次循环
- 存在:判断当前线程状态是否为活跃状态:
- 为空:代表该位置不存在队列,找到下一个位置,依次类推…
- 不为空:判断队列中是否存在任务:
- 如果队列为空时会找到下一个位置,然后接着重复第4步
- 如果遍历完所有队列还是没有获取到任务,并且扫描期间也没有新的任务提交到线程池时,先判断工作线程的活跃状态:
- 失活(不活跃)状态:直接退出循环回到runWorker()方法自旋或挂起阻塞
- 活跃状态:进行灭活操作,利用cas机制减去ctl中一个活跃线程数,同时将当前线程的scanState值记录到ctl的低32位做为栈顶,使用stackPred保存上一条失活线程的scanState值,从而形成一个阻塞栈
- 如果灭活操作失败,则代表ctl发生了改变,代表有新任务提交进了线程池,则取消灭活操作
- 线程如果是处于活跃状态,在扫描一圈没有获取到任务之后,会再重新扫描一次所有队列,在第二遍扫描中线程是有机会被重新“复活(唤醒)”的
- 当线程第二圈扫描后,依旧未获取到任务,那么当前线程会退出循环,返回空
- 扫描完毕后回到runWorker()方法,在该方法中会判断扫描结果是否为空:
- 非空:调用runTask()执行扫描获取到的任务
- 为空:调用awaitWork()自旋或挂起阻塞线程,直至有新任务提交后唤醒
- 如果获取任务成功,在执行过程中出现异常则报告异常信息并注销线程
在扫描的实现中,其实也是包含了任务窃取的实现的,因为在扫描的过程中是不会区分偶数队列和奇数队列,而且将所有队列都进行扫描,只要有任务就获取执行,而获取任务的方式是通过FIFO方式进行的,代表着共享队列中的任务获取以及工作窃取是通过获取队列头部/栈底的元素实现。
而线程在执行自己工作队列中的任务时,是通过LIFO的模式进行的,是从队列尾部/栈顶获取任务执行
这样做的好处是可以避免工作窃取和本地执行时的CAS竞争。
6:任务取消实现原理
任务取消的cancel方法是实现于Future接口的,逻辑比较简单
// ForkJoinTask类 → cancel()方法
public boolean cancel(boolean mayInterruptIfRunning) {
// 尝试将任务状态修改为CANCELLED,成功返回true,失败返回false
return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}
// ForkJoinTask类 → setCompletion()方法
private int setCompletion(int completion) {
// 开启自旋(死循环)
for (int s;;) {
// 如果任务已经完成,则直接返回执行后的状态
if ((s = status) < 0)
return s;
// 如果还未完成则尝试通过cas机制修改状态为入参:completion状态
if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
if ((s >>> 16) != 0)
synchronized (this) { notifyAll(); }
return completion;
}
}
}
任务取消只能发生在任务还未被执行的情况下,如果任务已经完成则会直接返回执行状态。
如果任务还未执行,则会尝试使用自旋+CAS机制修改任务状态为CANCELLED状态,成功则代表任务取消成功。
7:线程池的关闭实现
一般在正常关闭线程池时,都会通过shundown方法来停止线程池
// ForkJoinPool类 → shutdown()方法
public void shutdown() {
// 检查权限
checkPermission();
// 关闭线程池
tryTerminate(false, true);
}
// ForkJoinPool类 → checkPermission()方法
private static void checkPermission() {
// 获取权限管理器
SecurityManager security = System.getSecurityManager();
// 检测当前线程是否具备关闭线程池的权限
if (security != null)
security.checkPermission(modifyThreadPermission);
}
// ForkJoinPool类 → tryTerminate()方法
private boolean tryTerminate(boolean now, boolean enable) {
int rs;
// 如果是common公开池,不能关闭,common的关闭和Java程序绑定
if (this == common) // cannot shut down
return false;
// 如果线程池还在运行,那么检测enable是否为true,如果是false则退出
if ((rs = runState) >= 0) {
if (!enable)
return false;
rs = lockRunState(); // enter SHUTDOWN phase
// 如果线程池是要关闭,首先把运行状态改为 SHUTDOWN 标记
unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
}
// 如果线程池还不是stop停止状态(rs&stop==1表示处于stop状态)
if ((rs & STOP) == 0) {
// 如果now入参为false会进入如下逻辑
if (!now) { // check quiescence
// 遍历整个工作队列数组
for (long oldSum = 0L;;) { // repeat until stable
WorkQueue[] ws; WorkQueue w; int m, b; long c;
// 以目前的ctl值作为初始效验和
long checkSum = ctl;
// 检测池内活跃线程数,如果>0则不能直接置为stop状态
if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
return false; // still active workers
// 如果工作队列全部被注销了则可以设置为stop状态
if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
break; // check queues
// 开启循环
for (int i = 0; i <= m; ++i) {
// 循环每个工作队列
if ((w = ws[i]) != null) {
// 如果队列中还存在任务,且当前队列处于活跃状态
if ((b = w.base) != w.top || w.scanState >= 0 ||
w.currentSteal != null) {
// 唤醒空闲的线程帮助执行还未处理的任务
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
return false; // arrange for recheck
}
// 以栈底作为校验和
checkSum += b;
// 将偶数位队列中的任务全部取消(外部提交的任务)
if ((i & 1) == 0)
w.qlock = -1; // try to disable external
}
}
// 循环数组两次后,效验和都一致,代表任务都空了,
// 同时也没有新的线程被创建出来,那么可以设置stop状态了
if (oldSum == (oldSum = checkSum))
break;
}
}
// 如果线程池还未stop,那么则设置为stop状态
if ((runState & STOP) == 0) {
rs = lockRunState(); // enter STOP phase
unlockRunState(rs, (rs & ~RSLOCK) | STOP);
}
}
int pass = 0; // 3 passes to help terminate
for (long oldSum = 0L;;) { // or until done or stable
WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
long checkSum = ctl;
// 所有队列全部已经空了或所有线程都注销了
if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
(ws = workQueues) == null || (m = ws.length - 1) <= 0) {
// 如果线程池是还不是TERMINATED状态
if ((runState & TERMINATED) == 0) {
rs = lockRunState(); // done
// 先将线程池状态改为TERMINATED状态
unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
synchronized (this) { notifyAll(); } // for awaitTermination
}
break;
}
// 开启循环
for (int i = 0; i <= m; ++i) {
// 处理每个队列
if ((w = ws[i]) != null) {
checkSum += w.base;
w.qlock = -1; // try to disable
if (pass > 0) {
// 取消每个队列中的所有任务
w.cancelAll(); // clear queue
// 中断执行线程,唤醒所有被挂起的线程
if (pass > 1 && (wt = w.owner) != null) {
if (!wt.isInterrupted()) {
try { // unblock join
wt.interrupt();
} catch (Throwable ignore) {
}
}
if (w.scanState < 0)
U.unpark(wt); // wake up
}
}
}
}
// 如果两次效验和不一致,赋值上一次的效验和
if (checkSum != oldSum) { // unstable
oldSum = checkSum;
pass = 0;
}
// 线程池状态稳定了
// 所有任务被取消,执行线程被中断,挂起线程被唤醒中断了
else if (pass > 3 && pass > m) // can't further help
break;
// 如果有线程因为失活被挂起
else if (++pass > 1) { // try to dequeue
long c; int j = 0, sp; // bound attempts
// 根据ctl中记录的阻塞链唤醒所有线程
while (j++ <= m && (sp = (int)(c = ctl)) != 0)
tryRelease(c, ws[sp & m], AC_UNIT);
}
}
return true;
}
线程池关闭的实现逻辑也比较简单,首先会将线程池标记为SHUTDOWN状态,然后根据情况进行下一步处理,如果线程池中没啥活跃线程了,同时任务也不多了,将状态改为STOP状态,在STOP状态中会处理四件事:
- ①将所有活跃的队列状态改为注销状态,w.qlock=-1
- ②取消整个线程池中所有还未执行的任务
- ③唤醒所有因为失活挂起阻塞的线程
- ④尝试中断所有执行的活跃线程,唤醒scanState<0的线程,确保一些还没来得及挂起的线程也能被中断
最后当所有线程都被中断了,并且未执行的任务都被取消了,那么会把状态改为TERMINATED状态,线程池关闭完成。
三:总结
ForkJoin分支合并框架几乎是整个JUC包源码中最难的部分,因为整个框架比较庞大,分析起来也比较复杂
ForkJoin分支合并框架思想非常优秀,完全的落地了分治以及工作窃取思想,整个框架中的各个成员各司其职却有配合紧密,内部采用了一个队列数组以奇/偶位存储内外任务,双端队列的方式实现工作与窃取思想。其内部实现涉及了很多的位运算知识,所以看源码实现会有些吃劲
最后的总结:
- 创建池ForkJoinPool,初始化并行数=cpu逻辑核心数,池中没有队列,没有线程
- 外部向线程池提交一个任务:pool.submit(task)
- 初始化队列数组,容量:2 * Max { 并行数, 2 ^ n }
- 创建一个共享队列,容量为2^13,随机放在队列数组的某一个偶数索引位
- 外部提交的任务存入这个共享队列,位置值为2^12处
- 再创建一条线程,并为其分配一个队列,容量为2^13,随机放在数组中某个奇数索引位
- 线程启动执行
- 随机一个位置,线程从此位置开始遍历所有队列,最终扫描到前面提交的任务,并将其从所在的队列取出
- 线程执行处理任务,首先拆分出两个子任务
- 如果用invokeAll提交,一个子任务执行,另一个压入队列
- 如果用fork提交,则两个都压入工作队列
- 提交的子任务触发创建新的线程并分配新的工作队列,同样放在奇数位置
- 提交的子任务可能仍然被当前线程执行,但也有可能被其它线程窃取
- 线程在子任务处join合并,join期间会帮助窃取者处理任务,窃取它的任务执行
- 优先偷窃取者队列栈底的任务
- 如果窃取者队列为空,则会根据窃取链去找窃取者的窃取者偷任务…
- 如果整个池内都没任务了,则进入阻塞,阻塞前会根据情况补偿活跃线程
- 提交的子任务不管被哪条线程执行,仍可能会重复上述拆分/提交/窃取/阻塞步骤
- 当任务被拆分的足够细,达到了拆分阈值时,才会真正的开始执行这些子任务
- 处理完成会和拆分任务时一样,递归一层一层返回结果
- 直至最终所有子任务全部都执行结束,从而合并所有子结果,得到最终结果
- 如果外部没有再提交任务,所有线程扫描不到会被灭活,会进入失活(inactive)状态
- 一直没有任务时,线程池会削减线程数,直至最终所有线程销毁,所有奇数索引位的队列被注销,ForkJoinPool中只剩下一个最初创建的在偶数索引位的队列,以便于再次接受外部提交的任务,然后再从头开始重复所有步骤…