CompletableFuture-详解使用及源码解析
背景
上一篇文章我们看了FutureTask,分析了他的问题,异步编程并不方便。
- 问题1: FutureTask获取执行结果前,主线程需要通过get()方法一直阻塞等待子线程执行完成call方法,才可以拿到返回结果
- 问题2:如果不通过get挂起线程,通过while循环,不停的判断任务的状态是否结束,结束后,再拿结果。如果任务长时间没有执行完毕,CPU会一直调度查看任务状态的方法,浪费CPU资源。
CompletableFuture在一定程度上提高了各种异步非阻塞的方案,并且响应式变成,代码编写效果上,效率更高。
1、相关API
1.1 等在前置结果再执行当前任务的API
-
supplyAsync(Supplier supplier) 异步执行任务,有返回结果
-
runAsync(Runnable runnable) 异步执行任务,无返回结果
在不指定线程池的前提下,两个异步任务都交给ForkJoinPool去执行,而ForkJoinPool内部是守护线程,守护线程是主线程结束后就结束了。
-
thenApply(Function fn) 等待前一个任务结束,拿到前一个方法的结果并处理然后返回结果。使用和前一个任务想同的线程
thenApplyAsync(Function fn) 和上面的一样,但是使用不同的线程执行
thenApplyAsync(Function fn, Executor executor) 和上面一样,这里需要自定义线程池
CompletableFuture中的大部分方法都有三个重载,(不带Async、带Async、带Async和线程池)
-
thenAccept(Consumer action) 等待前一个任务结束,拿到前一个方法的结果并处理,没有返回值
-
thenRun(Runnable action) 等待前一个任务结束,再处理,不需要前面的结果,没有返回值
1.2 等待多个并行任务完成后,并拿到结果再执行当前任务API
-
thenCombine(CompletionStage other, BiFunction fn) 等待前两个并行任务完成后,拿到结果再执行当前任务,并返回结果
thenCombineAsync(CompletionStage other, BiFunction fn) 等待前两个并行任务完成后,拿到结果,并在新线程执行当前任务,并返回结果
thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor) 等待前两个并行任务完成后,拿到结果,并在新线程执行当前任务(自定义线程池),并返回结果
-
thenAcceptBoth(CompletionStage other, BiConsumer action) 等待前两个并行任务结束,拿到结果并处理,没有返回值
thenAcceptBothAsync(CompletionStage other, BiConsumer action)
thenAcceptBothAsync(CompletionStage other, BiConsumer action, Executor executor)
-
runAfterBoth(CompletionStage other, Runnable action) 让两个并行任务结束,再执行,不需要前面的结果,没有返回值
runAfterBothAsync
1.3 两个任务一起执行,有一个任务返回结果后,拿到结果就可以处理API
-
applyToEither(CompletionStage other, Function fn) 等待前两个并行任务执行,任意一个有返回结果,拿到结果执行,并返回结果
Async方法同样如上
-
acceptEither(CompletionStage other, Consumer action) 等待前两个并行任务执行,任意一个有返回结果,拿到结果执行,无返回
-
runAfterEither(CompletionStage other, Runnable action) 等待前两个并行任务执行结束,再执行,无返回
1.4 等前置任务执行完成,再处理,后续返回结果为CompletableStage
-
thenCompose(Function> fn) 等待前置任务执行完成,拿到结果并执行,返回CompletableStage
thenApply(Function fn) 用Apply执行就够了,等同于Apply
1.5 异常和其他处理
- exceptionally(Function fn) 异常处理
- whenComplete(BiConsumer action) 可以拿到上一个任务的返回结果和异常,当前处理不会返回结果
- handle(BiFunction fn) 可以拿到上一个任务的返回结果和异常,同时当前处理可以返回结果
1.6 总结
上面的是常用API,CompletableFuture这么多API很难记,但是有规律可循:
- Async结尾的是异步执行的API,通常有带线程池和不带线程池的版本
- run开头的是无参方法,没有返回值
- supply开头的是有参方法,有返回值
- 以Accept开头或者结尾的方法,有参数,没有返回值
- 以Apply开头或者结尾的方法,有参数,有返回值
- 带有either后缀的方法,表示谁先完成就消费谁
2、如何使用?
基于上面API做应用案例
2.1 一个简单案例
小连要回家干饭, 小严做饭, 小连看电视, 等小严做完, 小连干饭。
我们把能同时执行的并发执行,必须分开的顺序执行,设计如下:
/**
* 小连要回家干饭, 小严做饭, 小连看电视, 等小严做完, 小连干饭
*
* main线程 小连
* 异步线程 小严
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
print("小连回家干饭");
CompletableFuture<String> future= CompletableFuture.supplyAsync(()->{
print("小严做饭");
sleep(2);
print("小严做完");
return "锅包肉";
});
print("小连看电视");
print("小连干饭,"+future.join());
}
输出结果:
可以看到默认线程池是ForkJoinPool
main: 小连回家干饭
main: 小连看电视
ForkJoinPool.commonPool-worker-1: 小严做饭
ForkJoinPool.commonPool-worker-1: 小严做完
main: 小连干饭,锅包肉
2.2 稍微复杂的案例
小连要回家干饭, 小严炒菜, 小李焖饭, 小连看电视, 等小严小李做完, 小陈端菜和饭给小连, 小连干饭。
我们把能同时执行的并发执行,必须分开的顺序执行,设计如下:
/**
* 小连要回家干饭, 小严炒菜, 小李焖饭, 小连看电视, 等小严小李做完, 小陈端菜和饭给小连, 小连干饭 *
* main线程 小连
* 异步线程 小严, 小李,小陈
*
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
print("小连回家干饭");
CompletableFuture<String> future= CompletableFuture.supplyAsync(()->{
print("小严做菜");
sleep(2);
print("小严做完");
return "锅包肉";
}, executor).thenCombineAsync(CompletableFuture.supplyAsync(()->{
print("小李焖饭");
sleep(3);
print("饭好了");
return "大米饭";
}, executor), (r1, r2)->{
print("饭菜好了,小李端菜");
sleep(1);
return r1+", "+r2;
}, executor);
print("小连看电视");
print("小连干饭,"+future.join());
executor.shutdown();
}
执行结果:
这里指定了线程池
main: 小连回家干饭
pool-1-thread-1: 小严做菜
pool-1-thread-2: 小李焖饭
main: 小连看电视
pool-1-thread-1: 小严做完
pool-1-thread-2: 饭好了
pool-1-thread-3: 饭菜好了,小李端菜
main: 小连干饭,锅包肉, 大米饭
3、源码分析
3.1 runAsync源码
3.1.1 从runAsync进入,来到asyncRunStage方法
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
//声明当前任务的CompletableFuture对象
//任务执行和后续任务的触发是两个操作,这里的d是为了触发后续任务的执行
CompletableFuture<Void> d = new CompletableFuture<Void>();
//将任务和CompletableFuture封装到一起,作为Async对象
//将Async交给线程池执行
e.execute(new AsyncRun(d, f));
return d;
}
3.1.2 进入new AsyncRun
封装任务和CompletableFuture,作为Async对象,将Async交给线程池执行
static final class AsyncRun extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<Void> dep; Runnable fn;
//存储CompletableFuture以及当前任务
AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return false; }
public void run() {
CompletableFuture<Void> d; Runnable f;
//将成员变量做临时存储
if ((d = dep) != null && (f = fn) != null) {
// help gc
dep = null; fn = null;
//当前任务是否已经有返回结果
if (d.result == null) {
try {
//线程池异步执行任务
f.run();
//当前Runnable是没有返回结果的,所以直接封装一个null值
d.completeNull();
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
//执行后续任务
d.postComplete();
}
}
}
3.1.3 completeXXX系列
completeXXX系列方法都差不多,如下,还有completeThrowable等等
//不需要返回值的时候封装null
final boolean completeNull() {
//CAS设置result值
return UNSAFE.compareAndSwapObject(this, RESULT, null, NIL);
}
//需要返回值的时候,封装结果
final boolean completeValue(T t) {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
(t == null) ? NIL : t);
}
3.1.4 postComplete后续任务的触发方式
当前任务执行完毕后,触发的后续处理。即触发后续任务执行。
final void postComplete() {
/*
* On each step, variable f holds current dependents to pop
* and run. It is extended along only one path at a time,
* pushing others to avoid unbounded recursion.
*/
//h 是栈顶
CompletableFuture<?> f = this; Completion h;
//f.stack存储后续任务的栈
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
//栈结构中有后续需要处理的任务,进入while循环,没循环一次,h指针会后移。
//casStack将h后移,栈顶出栈
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
//执行栈顶任务
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
3.2 thenRun源码
后续任务触发的方式有两种:
- 一种是基于前置任务执行完毕,执行postComplete方法触发
- 另一种是后续任务在压栈之前和之后,会尝试执行后续任务,只要前置任务执行结束快,后续任务就可以直接执行,不需要前置任务触发
3.2.1 thenRun系列
几个方法如下,都是走的uniRunStage,同步执行的线程池参数是null。
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
3.2.2 从thenRun进入uniRunStage
//e 线程池执行器,如果是Async异步调用,会传递线程池
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
//如果传递了线程池,代表异步执行,直接走if代码块执行
//如果没有传递线程池,先执行d.uniRun同步执行,如果d.uniRun返回false,继续向下
if (e != null || !d.uniRun(this, f, null)) {
//如果前置任务没有执行完成,那就压栈
//将线程池、后续任务、前置任务,封装成c
UniRun<T> c = new UniRun<T>(e, d, this, f);
//将封装好的c压栈
//不确保一定压到栈中
//在这个位置,可能出现前置任务已经执行完毕,导致无法压到栈中
push(c);
//尝试执行后续任务
c.tryFire(SYNC);
}
return d;
}
final void push(UniCompletion<?,?> c) {
if (c != null) {
//result是前置任务的结果
//只有前置任务还没有执行完成,才能将封装好的UniRun对象压栈
while (result == null && !tryPushStack(c))
lazySetNext(c, null); // clear on failure
}
}
3.2.3 uniRun方法,参数c==null表示同步执行,否则异步执行(进入claim执行)
//尝试执行后续任务,a是前置任务, f:后续任务
final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
Object r; Throwable x;
//如果前置任务没有执行完,直接走后面的后续任务
//只看第二个判断,如果前置任务没有执行完成,直接返回false
if (a == null || (r = a.result) == null || f == null)
return false;
//说明前置任务已经完成,要执行后续任务,但是要先判断后续任务执行了么
if (result == null) {
//后续任务还没有执行
//如果前置任务异常结束,那么后续任务不需要执行了
if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
completeThrowable(x, r);
else
//前置任务正常结束,尝试执行后继任务
try {
//c==null,同步执行,否则同步执行c.claim
if (c != null && !c.claim())
//异步执行完毕
return false;
//同步执行
f.run();
completeNull(); //已分析,见3.1.3
} catch (Throwable ex) {
completeThrowable(ex);//已分析,见3.1.3
}
}
return true;
}
abstract static class UniCompletion<T,V> extends Completion {
//异步执行任务
final boolean claim() {
Executor e = executor;
//判断当前任务标记,是否执行
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
if (e == null)
//线程池为null,代表同步执行,直接返回false
return true;
//异步执行,使用线程池执行即可
executor = null; // disable
e.execute(this);
}
return false;
}
}
3.2.4 UniRun类(和uniRun方法没关系)
这个类将线程池、后续任务、前置任务、后续具体任务(Runable实现)封装,此类继承了UniCompletion
//后续任务执行,以及将前置任务封装成UniRun对象
static final class UniRun<T> extends UniCompletion<T,Void> {
Runnable fn;
UniRun(Executor executor, CompletableFuture<Void> dep,
CompletableFuture<T> src, Runnable fn) {
super(executor, dep, src); this.fn = fn;
}
//dep 后续任务
//src 前置任务
//fn 后续具体任务(Runable实现)
final CompletableFuture<Void> tryFire(int mode) {
//d 后续任务, a 前置任务
CompletableFuture<Void> d; CompletableFuture<T> a;
if ((d = dep) == null ||
//mode>0是同步
!d.uniRun(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}