多线程-CompletableFuture
简介
CompletableFuture:异步任务编排工具。java 8中引入的一个类,位于juc包下,是Future的增强版。它可以让用户更好地构建和组合异步任务,避免回调地狱。
在CompletableFuture中,如果用户没有指定执行异步任务时的线程池,默认使用ForkJoinPool中的公共线程池。
使用案例
简单使用
几个入门案例,学习如何使用CompletableFuture提交异步任务并行接收返回值
提交异步任务 supplyAsync
提交一个异步任务,然后阻塞地获取它的返回值
@Test
public void test1() throws ExecutionException, InterruptedException {
// 提交异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[" + Thread.currentThread().getName() + "] " + "task1";
});
String s = future.get(); // 阻塞地获取异步任务的执行结果
System.out.println("s = " + s); // [ForkJoinPool.commonPool-worker-9] task1
}
阻塞地等待异步任务完成 join
join方法和get方法,都是阻塞地等待异步任务执行完成,然后获取返回值,只不过对于异常的处理不同,推荐使用get方法来获取返回值。
@Test
public void test2() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[" + Thread.currentThread().getName() + "] " + "task2";
});
System.out.println(future.get()); // [ForkJoinPool.commonPool-worker-9] task2
}
消费异步任务的结果 thenAccept
thenAccept函数接收一个Consumer类型的实例作为参数,它会消费异步任务的执行结果,然后返回一个CompletableFuture<Void>
,实际上没有必要处理它的返回值。
@Test
public void test8() throws InterruptedException {
// 异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[" + Thread.currentThread().getName() + "] " + "task2";
});
// 使用thenAccept,提供一个回调函数,消费异步任务的结果
future.join();
future.thenAccept(System.out::println); // [ForkJoinPool.commonPool-worker-9] task2
}
异步任务编排
到这里开始,开始涉及到异步任务编排,在一个异步任务之后启动另一个异步任务,或者在多个异步任务之后另一个异步任务。
具有依赖关系的异步任务,根据异步任务的依赖数量,可以把异步任务分为 零依赖、一元依赖、二元依赖和多元依赖。
一元依赖
thenApply
thenApply方法,接收一个Function作为参数,返回另一个future。一个异步任务依赖另一个异步任务
@Test
public void test3() {
// 异步任务1
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[" + Thread.currentThread().getName() + "] "
+ "[" + (System.currentTimeMillis() / 1000) + "]" + "任务1";
});
// 异步任务2
CompletableFuture<String> future1 = future.thenApply(s -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return s + "," + "[" + (System.currentTimeMillis() / 1000) + "]" + "任务2";
});
future1.join();
// 注意结果中异步任务1的时间和异步任务2的时间,说明异步任务2是在异步任务执行完之后触发的
// [ForkJoinPool.commonPool-worker-1] [1732975152]任务1,[1732975155]任务2
future1.thenAccept(System.out::println);
}
thenAccept方法接收一个Consumer,没有返回值,thenApply接收一个Function,有返回值
thenCompose
从语义上看,thenCompose方法接收一个异步任务作为参数,thenApply方法接收一个普通任务作为参数,选择 thenApply还是 thenCompose 取决于用户的需求,如果新任务是另一个异步任务,选择thenCompose,如果新任务只是消费上一个异步任务的结果,然后返回消费结果,选择thenApply。
@Test
public void test9() {
// 异步任务1
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[" + Thread.currentThread().getName() + "] " +
"[" + (System.currentTimeMillis() / 1000) + "]" + "任务1";
});
// 异步任务2
CompletableFuture<String> future1 = future.thenCompose(s -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return s + "\n" +
"[" + Thread.currentThread().getName() + "] " +
"[" + (System.currentTimeMillis() / 1000) + "]" + "任务2";
}));
future1.join();
//[ForkJoinPool.commonPool-worker-9] [1727663353]任务1
//[ForkJoinPool.commonPool-worker-9] [1727663356]任务2
future1.thenAccept(System.out::println);
}
二元依赖
thenCombine
融合当前异步任务(调用thenCombine的CF实例)和另一个异步任务(other)的结果,thenCombine() 会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。
@Test
public void test9() {
System.out.println("[" + Thread.currentThread().getName() + "] " +
"[" + System.currentTimeMillis() / 1000 + "]" + "任务0");
// 异步任务1
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[" + Thread.currentThread().getName() + "] " +
"[" + System.currentTimeMillis() / 1000 + "]" + "任务1";
});
// 异步任务2
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[" + Thread.currentThread().getName() + "] " +
"[" + System.currentTimeMillis() / 1000 + "]" + "任务2";
});
// 融合异步任务1、2的结果,生成任务3
CompletableFuture<String> future3 = future.thenCombine(future2,
(s1, s2) -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return s1 + "\n" + s2 + "\n" +
"[" + Thread.currentThread().getName() + "] " +
"[" + System.currentTimeMillis() / 1000 + "]" + "任务3";
});
future3.join();
// 任务1暂停5秒、任务2暂停2秒、任务3暂停3秒,任务3依赖任务1和任务2,可以看到,
// 任务1和任务2的执行是互不影响的,等到任务1、任务2都执行完成,任务3才执行
// [main] [1734753154]任务0
// [ForkJoinPool.commonPool-worker-1] [1734753159]任务1
// [ForkJoinPool.commonPool-worker-2] [1734753156]任务2
// [ForkJoinPool.commonPool-worker-1] [1734753162]任务3
future3.thenAccept(System.out::println);
}
多元依赖
allOf
allOf方法:多个异步任务都完成,才执行下一个异步任务
// allOf 方法接受一个 CompletableFuture 的可变参数列表,并返回一个新的 CompletableFuture,
// 当所有给定的 CompletableFuture 都完成时,新的 CompletableFuture 才会完成。这个方法通常
// 用于等待多个异步任务都完成。
@Test
public void test5() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[" + Thread.currentThread().getName() + "] " +
"[" + System.currentTimeMillis() / 1000 + "]" + "任务1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[" + Thread.currentThread().getName() + "] " +
"[" + System.currentTimeMillis() / 1000 + "]" + "任务2";
});
// allOf方法,创建一个新的future,它会等到前面两个异步任务完成后再执行
CompletableFuture<Void> future3 = CompletableFuture.allOf(future, future2);
CompletableFuture<String> future4 = future3.thenApply(v -> {
// 在allOf方法返回的异步任务中,它需要调用join方法,来获取之前异步任务的结果
String s1 = future.join();
String s2 = future2.join();
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return s1 + "\n" + s2 + "\n" +
"[" + Thread.currentThread().getName() + "] " +
"[" + System.currentTimeMillis() / 1000 + "]" + "任务3";
});
future4.join(); // 阻塞当前线程
future4.thenAccept(System.out::println);
}
anyOf
多个异步任务中只要有一个完成,就执行下一个异步任务
// anyOf方法
@Test
public void test6() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[" + Thread.currentThread().getName() + "] "
+ "[" + System.currentTimeMillis() / 1000 + "]" + "任务1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[" + Thread.currentThread().getName() + "] "
+ "[" + System.currentTimeMillis() / 1000 + "]" + "任务2";
});
CompletableFuture<Object> future3 = CompletableFuture.anyOf(future, future2);
future3.join();
// 任务2比任务1先结束,所以这里只获取到了任务2的结果
// [ForkJoinPool.commonPool-worker-2] [1727662633]任务2
future3.thenAccept(System.out::println);
}
异常处理
handle
handle 方法是一个双目方法,它接受一个 BiFunction 参数,该函数有两个参数:一个是异步任务的结果,另一个是异常对象。无论 CompletableFuture 正常完成还是异常完成,都会调用 handle 方法。
正常完成:
@Test
public void test7() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[" + Thread.currentThread().getName() + "] " +
"[" + System.currentTimeMillis() / 1000 + "]" + "任务1";
});
CompletableFuture<String> future2 = future.handle((s, e) -> {
if (e != null) {
return "Error: " + e.getMessage();
}
return s + " handle";
});
future2.join();
// [ForkJoinPool.commonPool-worker-9] [1727662978]任务1 handle
future2.thenAccept(System.out::println);
}
exceptionally
exceptionally 方法是一个单目方法,它接受一个 Function 参数,该函数只处理异常情况。当 CompletableFuture异常完成时,exceptionally 方法会被调用。
@Test
public void test8() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
int i = 10 / 0;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return null;
});
CompletableFuture<String> future2 = future.exceptionally(e -> "Error: " + e.getMessage());
future2.join();
// Error: java.lang.ArithmeticException: / by zero
future2.thenAccept(System.out::println);
}
API总结:
-
零依赖:supplyAsync,直接提交
-
一元依赖:thenApply、thenAccept、thenCompose
-
二元依赖:thenCombine
-
多元依赖:allOf、anyOf
使用时的注意事项
1、指定线程池:使用案例中的代码都没有指定线程池,但是实际使用过程中,最好指定线程池。
2、带有Async后缀的方法:CompletableFuture中提供的API,带有Async后缀的方法和普通方法,例如,thenApply和thenApplyAsync,thenApply使用和上一个任务一样的线程来执行异步任务,thenApplyAsync则使用新线程来执行异步任务,推荐使用带有Async后缀的方法。
源码分析
CompletableFuture使用一个栈来存储当前异步任务之后要触发的任务,栈使用的数据结构是单向链表。调用thenApply等方法编排异步任务时,实际上是向上一个任务的stack属性中注入当前任务,上一个任务结束后,会获取stack属性中的任务,然后继续执行,依次类推,直到stack属性为空
整体结构
CompletableFuture的继承体系
它实现了Future接口和CompletionStage接口:
- Future代表异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法
- CompletionStage代表异步计算的阶段,定义了在异步任务完成之后要触发的操作,这个操作可以是一个普通任务,也可以是一个异步任务
Future:
public interface Future<V> {
// 取消异步任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断异步任务是否取消
boolean isCancelled();
// 判断异步任务是否结束
boolean isDone();
// 获取异步任务的结果
V get() throws InterruptedException, ExecutionException;
// 获取异步任务的结果,并且指定超时时间
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
CompletionStage:代表异步计算的一个阶段,定义了在一个异步任务完成之后执行的操作,之前在入门案例中演示的thenApply方法、thenCompose方法,就是定义在这个接口中
public interface CompletionStage<T> {
/* 一元依赖的异步任务 */
// 创建一个新的异步任务,使用Function接口来消费异步任务的结果
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
// 创建一个新的异步任务,使用Consumer接口来消费异步任务的结果
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
// 创建一个新的异步任务
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
// ...
}
CompletionFuture中的属性
CompletionFuture中只有两个属性:result、stack,result存储当前异步任务的结果,stack存储下一个要被触发的异步任务,stack是一个单向链表
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
/* 下面两个成员变量都是通过cas算法来操作的,所有的方法都是在操作这两个变量,通过这两个变量实现了异步任务编排 */
volatile Object result; // 存储当前异步任务的结果
// 存储当前异步任务结束后需要触发的后续操作,它是一个单向链表,
// 相当于观察者模式中的观察者,当前的CompletableFuture实例就是被观察者(主题)
volatile Completion stack;
// 静态内部类,要执行的操作
abstract static class Completion extends ForkJoinTask<Void>
// AsynchronousCompletionTask,没有方法的接口,用于debug时标识一个异步任务
implements Runnable, AsynchronousCompletionTask {
volatile Completion next; // 单向链表,指向下一个异步任务
// 尝试触发异步任务
abstract CompletableFuture<?> tryFire(int mode);
}
}
stack属性的数据结构 Completion
Completion是CompletableFuture的静态内部类,可以把它理解为是异步任务要执行的操作,它存储了:
-
操作,也就是用户要异步执行的任务
-
操作对应的CompletableFuture实例
-
操作依赖的CompletableFuture实例
-
操作完成后需要触发的CompletableFuture实例
Completion比较复杂的地方在于它有多个子类,每个子类都有不同的功能,在前面使用案例中演示的每个API,它的内部都使用了不同的子类,代表不同的操作,所以这里介绍一下Completion和它的继承体系
Completion:顶层类,只定义了指向下一个异步任务的属性 next
// Completion,静态内部类,在继承体系中位于顶层
abstract static class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
volatile Completion next; // 下一个异步任务
}
// 一个没有方法的接口,用于标识某个被async方法创建的实例是异步任务,用于debug、监控
public static interface AsynchronousCompletionTask { }
有一个依赖的异步任务:UniCompletion、UniApply,UniCompletion是基类,存储了单个依赖的共有属性,UniApply是具体实现,是thenApply方法对应的内部类。
abstract static class UniCompletion<T,V> extends Completion {
Executor executor; // 线程池
// 当前异步任务对应的CompletableFuture实例
CompletableFuture<V> dep;
// 当前任务的依赖。具体而言,就是只有在src对应的异步任务执行完成后,才可以执行dep对应的异步任务,也就是当前任务,
// 执行异步任务的逻辑中会判断前一个异步任务是否完成,并且获取它的结果。
CompletableFuture<T> src;
UniCompletion(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src) {
this.executor = executor; this.dep = dep; this.src = src;
}
// 确保异步任务只执行一次,它会通过cas算法修改状态变量,
final boolean claim() {
Executor e = executor;
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { // 这个方法继承自ForkJoinTask
if (e == null)
return true;
executor = null; // disable
e.execute(this);
}
return false;
}
// 判断当前异步任务是否还存活:dep不为null,就是还存活
final boolean isLive() { return dep != null; }
}
// UniApply,UniCompletion的子类,定义了异步任务的执行逻辑,thenApply方法对应的内部类,
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn; // 要异步执行的任务
}
有两个依赖的异步任务:BiCompletion、BiApply,类似于上面提到的,BiCompletion是基类,存储了共有属性,BiApply是具体实现
// BiCompletion
abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {
CompletableFuture<U> snd; // 异步任务的第二个依赖
}
// BiApply
static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {
BiFunction<? super T,? super U,? extends V> fn; // 要异步执行的任务
}
AsyncSupply:不属于Completion的继承体系,存储没有依赖的单个异步任务
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; // 异步任务对应的CompletableFuture实例
Supplier<T> fn; // 异步任务要执行的计算
}
总结
在介绍CompletableFuture的工作机制之前,先总结一下它的数据结构:
CompletableFuture:代表一个异步任务,它存储了这个异步任务的执行结果、执行完之后要触发的下一个异步任务,同时提供了面向用户的API
- 它的属性 Object result、Completion stack
Completion:存储了一个异步任务执行过程中需要用到的全部信息,包括它的依赖、它要触发的下一个异步任务、异步任务要执行的计算、异步任务对应的CompletableFuture实例
-
它的属性:
-
Function function:异步任务要进行的计算
-
CompletableFuture dep:异步任务对应的CompletableFuture实例
-
CompletableFuture src:异步任务的第一个依赖
-
CompletableFuture snd:异步任务的第二个依赖,// 一元依赖的异步任务中没有这个属性
-
CompletableFuture next:要被触发的下一个异步任务
-
可以看到,CompletableFuture和Completion互相持有对方的实例。
在了解了CompletableFuture的基本结构之后,接下来的问题是:
- 两个有依赖关系的异步任务是如何被编排在一起的?
- 上一个异步任务结束之后如何触发下一个异步任务?
哪个线程执行异步任务?
先了解一下异步任务在哪个线程执行。
CompletableFuture使用一个线程池来执行异步任务。用户提交异步任务后,如果没有指定线程池,那么使用默认线程池。如果当前环境下可用cpu核心数大于1,使用的线程池是ForkJoinPool,否则每次提交任务它都会创建一个新的线程(ThreadPerTaskExecutor)
确认使用哪个线程池的源码:
- ForkJoinPool中公共线程池的并行度默认是CPU个数减1,用户也可以通过jvm参数指定
- 如果公共线程池的并行度小于等于0,使用单线程线程池
// 判断使用哪个线程池
private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
/* 一、ForkJoinPool中判断当前公共线程池的并行度: */
// 1. 首先从环境变量中获取一个指定的属性 parallelism
String pp = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
int parallelism = -1;
if (pp != null) {
parallelism = Integer.parseInt(pp);
}
// 2. 如果属性为空,获取当前环境下的CPU数个数,如果小于等于1个,那么公共线程池的并行度就是1,否则就是CPU数减1
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) {
parallelism = 1;
}
// 二、单线程线程池,它的实现特别简单,为每个任务创建一个新的线程
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}
工作机制
单个异步任务的执行
提交异步任务 supplyAsync方法
整体流程:
// 整体流程:这里省略了前面的方法调用
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
// 第一步:创建CompletableFuture实例,向线程池提交任务
CompletableFuture<U> d = new CompletableFuture<U>();
// 第二步:线程池执行异步任务,异步任务编排就实现在AsyncSupply中,这里专门针对supplyAsync的内部类
e.execute(new AsyncSupply<U>(d, f));
// 第三步:返回CompletableFuture实例
return d;
}
整体流程中的第二步:执行异步任务
// 要点1:AsyncSupply,用户调用supplyAsync方法时执行的内部类,异步任务编排的核心流程
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; // 当前CompletableFuture的实例
Supplier<T> fn; // 需要在线程池中执行的任务
// 参数1,当前CompletableFuture的实例,用于存储异步任务的返回值并且触发下一个异步任务
// 参数2,需要在线程池中执行的任务
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}
// 线程启动后会执行当前实例的run方法,因为它继承了Runnable接口
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
// 这里做了两个事情:
// 1. 执行异步任务,也就是Supplier接口中提供的函数
// 2. 将异步任务的返回值设置到当前CompletableFuture实例中
d.completeValue(f.get());
} catch (Throwable ex) {
// 如果出现异常,设置异常返回值
d.completeThrowable(ex);
}
}
// 触发下一个异步任务
d.postComplete();
}
}
}
可以看到,AsyncSupply类继承了Runnable接口,持有当前CompletableFuture实例和要异步执行的任务,在run方法中,执行任务,然后调用CompletableFuture实例中的completeValue方法,设置返回值。
返回值如何被设置的?通过cas操作,将任务的结果设置到当前CompletableFuture实例的result属性中,如果任务正常结束,正常设置结果,如果任务抛出异常,把异常封装到一个Exception中
// 这是上一步中调用的
// 分支一:设置返回值
final boolean completeValue(T t) {
// 通过cas操作来更新当前CompletableFuture实例中的result
return UNSAFE.compareAndSwapObject(this, RESULT, null,
(t == null) ? NIL : t);
}
// 分支二:如果出现异常,设置异常返回值
final boolean completeThrowable(Throwable x) {
// 也是通过cas操作,更新当前CompletableFuture实例中的result,只不过返回值是被包装到了AltResult中
return UNSAFE.compareAndSwapObject(this, RESULT, null,
encodeThrowable(x));
}
static AltResult encodeThrowable(Throwable x) {
return new AltResult((x instanceof CompletionException) ? x :
new CompletionException(x));
}
获取异步任务的返回值 get
如何获取异步任务的返回值?调用get方法或join方法,它们都会获取返回值。
整体流程:
// get方法
public T get() throws InterruptedException, ExecutionException {
Object r;
// 如果当前result实例等于null,调用waitingGet方法,阻塞地获取返回值,否则调用reportGet方法,
// 上报返回值
return reportGet((r = result) == null ? waitingGet(true) : r);
}
// waitingGet方法:阻塞地获取返回值,将返回值写到result变量中
private Object waitingGet(boolean interruptible) {
// 信号器,负责线程的阻塞和唤醒
Signaller q = null;
// 当前线程是否入队
boolean queued = false;
// 旋转次数
int spins = -1;
Object r;
// 判断:result实例是否等于null,如果等于null,需要阻塞地获取返回值
while ((r = result) == null) {
// 先自旋:spins,它是自旋,自旋次数是CPU个数的8次方以上,因为自旋次数减减的
// 之前,会生成一个随机数,生成的随机数大于等于0,自旋次数才会减1。如果自旋结束之后
// 还没有得到结果,会进入阻塞队列。
if (spins < 0)
spins = SPINS; // SPINS = (Runtime.getRuntime().availableProcessors() > 1 ? 1 << 8 : 0) 256次
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (q == null)
// 创建一个信号器,用于阻塞和唤醒线程
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
// 如果信号器还没有入队,将信号器设置到当前CompletableFuture实例的stack属性中
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
// 如果当前线程被打断,放弃阻塞,清理stack变量,返回null
q.thread = null;
cleanStack();
return null;
}
// 线程进入阻塞,底层调用LockSupport的park方法,等待异步任务执行完成之后唤醒当前线程
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
// 如果信号器不等于null
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
// 触发后续任务
postComplete();
// 返回异步任务的结果,result的变量是由其它线程设置的,当前线程只需要阻塞地获取它。
return r;
}
1、信号器入栈的方法:
// 将信号器设置到CompletableFuture的stack属性中
final boolean tryPushStack(Completion c) { // c是信号器实例
Completion h = stack; // stack,当前CompletableFuture实例的stack属性
/* lazySetNext方法加下面的cas操作,相当于将信号器入栈,信号器链接到头结点并且成为新的头结点 */
lazySetNext(c, h);
return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}
static void lazySetNext(Completion c, Completion next) {
UNSAFE.putOrderedObject(c, NEXT, next); // Completion的next属性指向下一个Completion,它们之间构成单向链表
}
2、阻塞当前线程的方法:
public static void managedBlock(ManagedBlocker blocker) throws InterruptedException {
ForkJoinPool p;
ForkJoinWorkerThread wt;
Thread t = Thread.currentThread();
if ((t instanceof ForkJoinWorkerThread) &&
// 省略代码
}
else {
// 阻塞当前线程。isReleasebale判断阻塞是否被释放,如果没有,继续进入阻塞状态
do {} while (!blocker.isReleasable() &&
!blocker.block());
}
}
// 阻塞当前线程,线程唤醒后,判断阻塞是否可以释放
public boolean block() {
if (isReleasable())
return true;
else if (deadline == 0L)
LockSupport.park(this);
else if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
return isReleasable();
}
// 阻塞是否可以释放:信号器中的线程实例为null、或者线程被打断、或者超过指定时间,都算阻塞结束
public boolean isReleasable() {
// 信号器中的线程实例为null
if (thread == null)
return true;
// 当前线程被打断
if (Thread.interrupted()) {
int i = interruptControl;
interruptControl = -1;
if (i > 0)
return true;
}
// 过了超时时间
if (deadline != 0L &&
(nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
thread = null;
return true;
}
return false;
}
3、上报异步任务的返回值:
// reportGet方法,处理异步任务的返回值
private static <T> T reportGet(Object r)
throws InterruptedException, ExecutionException {
// 如果返回值为空
if (r == null) // by convention below, null means interrupted
throw new InterruptedException();
// 如果返回一个异常实例
if (r instanceof AltResult) {
Throwable x, cause;
if ((x = ((AltResult)r).ex) == null)
return null;
if (x instanceof CancellationException)
throw (CancellationException)x;
if ((x instanceof CompletionException) &&
(cause = x.getCause()) != null)
x = cause;
throw new ExecutionException(x);
}
// 如果返回一个正常结果,将结果进行泛型转换,返回给调用者
@SuppressWarnings("unchecked") T t = (T) r;
return t;
}
获取返回值的时候,get方法中,会先判断有没有返回值,如果没有,自旋,自旋次数是CPU个数的8次方,如果还没有返回值,进入阻塞队列。
4、线程阻塞后在什么地方被唤醒?在postComplete方法中
// 参考异步任务的执行过程,执行完之后,会调用CompletableFuture的postComplete方法,
// 触发STACK属性中存储的下一个任务
final void postComplete() {
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null || // 判断stack属性中是否有下一个任务
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
if (f.casStack(h, t = h.next)) { // 下一个任务出栈,现在STACK属性中存储下下一个任务
if (t != null) { // 下下一个任务不为null
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
f = (d = h.tryFire(NESTED)) == null ? this : d; // 触发下一个任务的执行
}
}
}
// 下一个任务出栈的方法
final boolean casStack(Completion cmp, Completion val) {
return UNSAFE.compareAndSwapObject(this, STACK, cmp, val);
}
// 下一个任务的执行:在这里下一个任务是信号器 Signaller
final CompletableFuture<?> tryFire(int ignore) {
Thread w; // no need to atomically claim
if ((w = thread) != null) {
thread = null;
LockSupport.unpark(w); // 它会唤醒信号器中存储的线程
}
return null;
}
一元依赖
这里以thenApply为例。
整体流程:
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
// 第一步:执行异步任务,如果当前任务可以直接执行的话,就直接执行
if (e != null || !d.uniApply(this, f, null)) { // 参数this是上一个异步任务
// 第二步:当前任务无法直接执行,创建UniApply实例
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
// 第三步:将当前实例推入栈中,表示上一个异步任务执行完成后触发
push(c); // this是上一个异步任务,所以push方法是由上一个异步任务的实例调用的
// 尝试触发异步任务
c.tryFire(SYNC);
}
return d;
}
1、异步任务的执行逻辑
// uniApply方法:执行异步任务,参数a是上一个异步任务,需要判断上一个异步任务有没有执行完成并且获取它的执行结果
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) { // 参数c是当前异步任务实例
Object r; Throwable x;
// 如果上一个异步任务没有结果,返回false
if (a == null || (r = a.result) == null || f == null)
return false;
// 1、判断当前异步任务是否可以执行:如果当前异步任务没有结果
tryComplete: if (result == null) {
// 如果上一个异步任务是异常结束的
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete; // 退出循环,不执行当前异步任务
}
r = null;
}
try {
// 2、确保异步任务只会被执行一次
if (c != null && !c.claim())
return false;
// 3、执行异步任务,并且上报结果
@SuppressWarnings("unchecked") S s = (S) r;
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
2、如果异步任务不可以直接执行,需要把异步任务push到上一个异步任务的STACK属性中
// 将新的异步任务推入栈中,参数c是当前异步任务
final void push(UniCompletion<?,?> c) {
if (c != null) {
// 如果当前异步任务没有结束,将异步任务推入栈中,这里是推入栈顶
while (result == null && !tryPushStack(c))
// 失败时清除,将新异步任务的next值设为null
lazySetNext(c, null); // clear on failure
}
}
3、再次尝试触发异步任务
// 尝试触发异步任务
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
// 执行异步任务
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
// 执行成功后,属性置空
dep = null; src = null; fn = null;
// 如果当前异步任务执行成功,触发后续任务
return d.postFire(a, mode);
}
4、触发下一个异步任务
// 触发下一个异步任务,这里会触发前一个异步任务的后序任务和当前异步任务的后序任务
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
// 会判断前一个异步任务和当前异步任务有没有下一个任务。
// a是前一个任务
if (a != null && a.stack != null) {
if (a.result == null)
a.cleanStack();
else if (mode >= 0)
a.postComplete();
}
// this是当前任务
if (result != null && stack != null) {
if (mode < 0)
return this;
else
postComplete();
}
return null;
}
// postComplete
final void postComplete() {
CompletableFuture<?> f = this; Completion h;
// 如果当前异步任务有下一个任务
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
// 使用下下一个异步任务,代替下一个任务,赋值给stack变量
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
// 执行下一个异步任务,然后接收它的返回值
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
如何确保异步任务只执行一次
claim方法
final boolean claim() {
Executor e = executor;
// 这个方法来自ForkJoinTask,用cas算法来操作任务实例中的status字段,保证任务只会被执行一次
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
// 如果当前异步任务没有可用的线程池,返回true,由当前线程执行
if (e == null)
return true;
// 把异步任务提交给线程池执行
executor = null; // disable
e.execute(this);
}
return false;
}
二元依赖
这里以thenCombine方法为例
整体流程:
private <U,V> CompletableFuture<V> biApplyStage(
Executor e, CompletionStage<U> o,
BiFunction<? super T,? super U,? extends V> f) {
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
// biApply方法中的参数this、b,是当前异步任务依赖的前两个异步任务
if (e != null || !d.biApply(this, b, f, null)) {
BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
bipush(b, c); // this和b是依赖,c是要被触发的异步任务
c.tryFire(SYNC);
}
return d;
}
执行逻辑:
final <R,S> boolean biApply(CompletableFuture<R> a,
CompletableFuture<S> b,
BiFunction<? super R,? super S,? extends T> f,
BiApply<R,S,T> c) {
Object r, s; Throwable x;
// 关键逻辑,判断当前异步任务所依赖的两个异步任务是否完成,如果没有,退出
if (a == null || (r = a.result) == null ||
b == null || (s = b.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
if (s instanceof AltResult) {
if ((x = ((AltResult)s).ex) != null) {
completeThrowable(x, s);
break tryComplete;
}
s = null;
}
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked") R rr = (R) r;
@SuppressWarnings("unchecked") S ss = (S) s;
completeValue(f.apply(rr, ss));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
总结
CompletableFuture存储了计算逻辑,Completion存储了计算过程中需要用到的数据。
两个有依赖关系的异步任务是如何被编排在一起的?上一个异步任务结束之后如何触发下一个异步任务?通过CompletableFuture的stack属性,当前异步任务执行完之后,会获取stack属性中的值,这个值就是下一个需要计算的异步任务
参考:
https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html
https://www.cnblogs.com/Createsequence/p/16963895.html
https://javaguide.cn/java/concurrent/completablefuture-intro.html