【并发篇】CompletableFuture学习
CompletableFuture 异步编程
前言
我们异步执行一个任务时,一般是用线程池 Executor 去创建。
- 如果不需要有返回值,任务实现 Runnable 接口;
- 如果需要有返回值,任务实现 Callable 接口,调用 Executor 的 submit 方法,再使用 Future 获取即可。
如果多个线程存在依赖组合的话,我们怎么处理呢?
- 可使用同步组件 CountDownLatch、CyclicBarrier 等,但是比较麻烦。
- 其实有简单的方法,就是用 CompeletableFuture。
在现代的软件开发中,处理并发和异步任务变得越来越重要。Java 中的 CompletableFuture
类为我们提供了一种强大的方式来处理异步编程,让我们能够更有效地利用多核处理器和并行执行。
源码解析
源码:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}
从源码可以看出 CompletableFuture
同时实现了 Future
和 CompletionStage
接口。
CompletableFuture
除了提供了更为好用和强大的 Future
特性之外,还提供了函数式编程的能力。
Future 接口有 5 个方法:
boolean cancel(boolean mayInterruptIfRunning)
:尝试取消执行任务。boolean isCancelled()
:判断任务是否被取消。boolean isDone()
:判断任务是否已经被执行完成。get()
:等待任务执行完成并获取运算结果。get(long timeout, TimeUnit unit)
:多了一个超时时间。
/**
* 表示一个异步计算的结果。该接口提供方法来检查计算是否完成、等待计算完成以及获取计算结果。
* 一旦计算完成(无论是正常完成、异常终止还是被取消),则不能再次改变其状态。
*
* @param <V> 计算结果的类型;如果计算不返回结果,则使用 {@code Void} 类型。
*/
public interface Future<V> {
/**
* 尝试取消任务的执行。如果任务已经完成、已经被取消,或者由于某些原因无法取消,则此方法将不会产生任何效果。
* 如果调用时任务尚未开始,则该任务不应启动。如果任务已经开始,则 mayInterruptIfRunning 参数决定是否应该尝试中断正在运行的任务。
*
* @param mayInterruptIfRunning 如果为 true 并且任务已经开始执行,则会尝试中断任务。如果为 false,则任务允许继续运行直到完成。
* @return 如果任务在调用此方法之前未完成,则返回 true;否则返回 false。
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判断任务是否在它正常完成前被取消。注意,只有当任务确实被取消时才会返回 true。
*
* @return 如果任务被取消则返回 true;否则返回 false。
*/
boolean isCancelled();
/**
* 判断任务是否已经完成。注意,这可能是因为任务正常结束、异常结束或被取消。
*
* @return 如果任务已完成则返回 true;否则返回 false。
*/
boolean isDone();
/**
* 如果任务已完成,则返回任务的结果。如果任务尚未完成,则此方法将会阻塞,直到任务完成。
* 如果任务被取消,则抛出 CancellationException 异常;如果任务异常完成,则抛出 ExecutionException 异常。
*
* @return 任务的结果值。
* @throws CancellationException 如果任务被取消。
* @throws ExecutionException 如果计算过程中抛出了异常。
* @throws InterruptedException 如果当前线程在等待时被中断。
*/
V get() throws InterruptedException, ExecutionException;
/**
* 如果任务在指定的超时时间内完成,则返回任务的结果。如果任务尚未完成,则此方法将会阻塞,直到任务完成或超时发生为止。
* 如果任务被取消,则抛出 CancellationException 异常;如果任务异常完成,则抛出 ExecutionException 异常。
* 如果超时时间到达而任务仍未完成,则抛出 TimeoutException 异常。
*
* @param timeout 等待任务完成的最大时间。
* @param unit timeout 参数的时间单位。
* @return 任务的结果值。
* @throws CancellationException 如果任务被取消。
* @throws ExecutionException 如果计算过程中抛出了异常。
* @throws InterruptedException 如果当前线程在等待时被中断。
* @throws TimeoutException 如果在给定的超时时间内未能完成任务。
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
CompletionStage 接口
CompletionStage
接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。(大量使用了函数式编程)
什么是 CompletableFuture?
CompletableFuture
是 Java 8 引入的一个类,用于支持异步编程和操作多个异步任务。它是 Future
的扩展,提供了更多的功能和灵活性。通过 CompletableFuture
,我们可以将多个异步任务串行或并行执行,然后等待它们的完成结果。
使用步骤
一、创建 CompletableFuture
常见的有两种方法
- 通过 new 关键字
- 基于
CompletableFuture
自带的静态工厂方法:runAsync()
、supplyAsync()
。
1、new 关键字
通过 new 关键字创建 CompletableFuture
对象这种使用方式可以看作是将 CompletableFuture
当做 Future
来使用。
举例:
创建异步运算的载体
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
上面代码创建了一个结果值类型为 RpcResponse<Object>
的 CompletableFuture
,你可以把 resultFuture
看作是异步运算结果的载体。
传入运算结果
// complete() 方法只能调用一次,后续调用将被忽略。
resultFuture.complete(rpcResponse);
假设在未来的某个时刻,我们得到了最终的结果。这时,我们可以调用 complete()
方法为其传入结果,这表示 resultFuture
已经被完成了。
判断任务是否已经被完成
public boolean isDone() {
return result != null;
}
可以通过 isDone()
方法来检查是否已经完成。(Future 接口的方法)
等待任务执行完成并获取运算结果
rpcResponse = completableFuture.get();
可以通过调用 get()
方法来获取异步计算结果。调用 get()
方法的线程会阻塞直到 CompletableFuture
完成运算。(阻塞等待)
如果你已经知道计算的结果的话,可以使用静态方法
completedFuture()
来创建CompletableFuture
。
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!");
assertEquals("hello!", future.get());
completedFuture()
方法底层调用的是带参数的 new 方法,只不过,这个方法不对外暴露。
public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}
代码示例
/**
* new 关键字创建 CompletableFuture
* 利用 complete 方法手动完成 CompletableFuture
*/
@Test
void testNew() throws ExecutionException, InterruptedException {
// 1、创建一个新的未完成的 CompletableFuture
CompletableFuture<String> future = new CompletableFuture<>();
// 模拟异步操作完成后手动完成 CompletableFuture
String expectedResult = "Hello, World!";
future.complete(expectedResult);
// 测试是否成功完成并返回预期结果
assertEquals(expectedResult, future.get());
// 2、测试异常完成的情况
CompletableFuture<Void> failingFuture = new CompletableFuture<>();
RuntimeException expectedException = new RuntimeException("Oops!");
failingFuture.completeExceptionally(expectedException);
try {
failingFuture.get();
fail("Expected exception not thrown");
} catch (ExecutionException e) {
assertInstanceOf(RuntimeException.class, e.getCause());
assertEquals(expectedException.getMessage(), e.getCause().getMessage());
}
}
/**
* 静态方法 completedFuture 创建一个已完成的 CompletableFuture
* 底层用的也是 new
*/
@Test
void testCompletedFuture() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!");
assertEquals("hello!", future.get());
}
2、静态工厂方法
supplyAsync
执行 CompletableFuture 任务,支持返回值runAsync
执行 CompletableFuture 任务,没有返回值。因为runAsync()
方法接受的参数是Runnable
,这是一个函数式接口,不允许返回值。
supplyAsync 方法
// 使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 使用自定义线程池,根据supplier构建执行任务(推荐)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
supplyAsync()
方法接受的参数是 Supplier<U>
,是一个函数式接口,U
是返回结果值的类型。
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
使用场景:当你需要异步操作且关心返回结果的时候,可以使用
supplyAsync()
方法。
runAsync 方法
// 使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 使用自定义线程池,根据runnable构建执行任务(推荐)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
使用场景:当你需要异步操作且不关心返回结果的时候可以使用
runAsync()
方法。
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
代码示例
/**
* supplyAsync 和 runAsync 的区别
* supplyAsync 支持返回值
* runAsync 不支持返回值
*/
@Test
void testSupplyAsyncAndRunAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("hello runAsync!"));
// 控制台输出 "hello!"
runFuture.get();
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> "hello supplyAsync!");
// 控制台 不会 输出 "hello!"
supplyFuture.get();
// 进行断言,判断返回值是否为 "hello!",不通过就会抛出异常
assertEquals("hello supplyAsync!", supplyFuture.get());
}
/**
* 自定义线程池写法
*/
@Test
void testSupplyAsyncAndRunAsync2() {
// 自定义线程池
ExecutorService executor = Executors.newCachedThreadPool();
// runAsync的使用
CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run, cmty256"), executor);
// supplyAsync的使用
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("supply, cmty256");
return "cmty256";
}, executor);
System.out.println("=============================异步操作,输出顺序不定=============================");
// runAsync的future没有返回值,输出null
System.out.println(runFuture.join());
System.out.println("=============================异步操作,输出顺序不定=============================");
// supplyAsync的future,有返回值
System.out.println(supplyFuture.join());
executor.shutdown(); // 线程池需要关闭
}
二、简单任务异步回调
处理异步结算结果
当我们获取到异步计算的结果之后,还可以对其进行进一步的处理,比较常用的方法有下面几个:
thenRun()
/thenRunAsync()
thenAccept()
/thenAcceptAsync()
thenApply()
/thenApplyAsync()
whenComplete()
thenRun 和 thenRunAsync 有什么区别?
源码:
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
如果你执行第一个任务的时候,传入了一个自定义线程池:
- 调用
thenRun
方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。 - 调用
thenRunAsync
执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是 ForkJoin 线程池
tips: thenAccept 和 thenAcceptAsync,thenApply 和 thenApplyAsync 等,它们之间的区别也是这个。
thenRun / thenRunAsync
CompletableFuture 的 thenRun
方法,
- 通俗点讲就是,做完第一个任务后,再做第二个任务。
- 某个任务执行完成后,执行回调方法;
- 但是前后两个任务没有参数传递,第二个任务也没有返回值。
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
代码示例:
/**
* thenRun()方法
* <p>
* 做完第一个任务后,再做第二个任务
* 但是前后两个任务没有参数传递,第二个任务也没有返回值。
* </p>
*/
@Test
void testThenRun() throws ExecutionException, InterruptedException {
CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(
() -> {
System.out.println("先执行第一个CompletableFuture方法任务");
return "沉梦听雨";
}
);
CompletableFuture<Void> thenRunFuture = firstFuture.thenRun(() -> {
System.out.println("thenRun-接着执行第二个任务");
});
System.out.println("返回值:" + thenRunFuture.get());
// 输出
/*
先执行第一个CompletableFuture方法任务
thenRun-接着执行第二个任务
返回值:null
*/
}
thenAccept / thenAcceptAsync
CompletableFuture 的 thenAccept 方法表示,
- 第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,
- 但是回调方法是没有返回值的。
代码示例:
/**
* thenAccept()方法
* <p>
* 做完第一个任务后,再做第二个任务
* 可以接收入参,但是没有返回值。
* </p>
*/
@Test
void testThenAccept() throws ExecutionException, InterruptedException {
CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(
() -> {
System.out.println("第一个CompletableFuture方法任务");
return "沉梦听雨";
}
);
CompletableFuture<Void> thenAcceptFuture = firstFuture.thenAccept((a) -> {
if ("沉梦听雨".equals(a)) {
System.out.println("入参校验成功");
}
System.out.println("thenAccept-接着执行第二个任务");
});
System.out.println("返回值:" + thenAcceptFuture.get());
// 输出
/*
第一个CompletableFuture方法任务
入参校验成功
thenAccept-接着执行第二个任务
返回值:null
*/
}
thenApply / thenApplyAsync
thenApply()
方法接收一个 Function
实例,用它来处理结果。
// 沿用上一个任务的线程池
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
// 使用默认的 ForkJoinPool 线程池(不推荐)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(defaultExecutor(), fn);
}
// 使用自定义线程池(推荐)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
代码示例:
CompletableFuture 的 thenApply
方法表示,
- 第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,
- 并且回调方法是有返回值的。
/**
* thenApply()方法
* <p>
* 做完第一个任务后,再做第二个任务
* 可以接收入参,并且有返回值。
* </p>
*/
@Test
void testThenApply() throws ExecutionException, InterruptedException {
CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(
() -> {
System.out.println("第一个CompletableFuture方法任务");
return "cmty256";
}
);
CompletableFuture<String> thenApplyFuture = firstFuture.thenApply((a) -> {
if ("沉梦听雨".equals(a)) {
return "第一个任务的返回值";
}
return "thenApply-第二个任务的返回值";
});
System.out.println("返回值:" + thenApplyFuture.get());
// 输出
/*
第一个CompletableFuture方法任务
返回值:thenApply-第二个任务的返回值
*/
}
whenComplete
CompletableFuture 的 whenComplete
方法表示,
- 某个任务执行完成后,执行的回调方法,无返回值;
- 并且 whenComplete 方法返回的 CompletableFuture 的 result 是上个任务的结果。
/**
* whenComplete()方法
* <p>
* 两个任务在同一个线程中执行
* 第二个任务可以接收入参
* 第二个任务返回的是第一个任务的返回值
* </p>
*/
@Test
void testWhenComplete() throws ExecutionException, InterruptedException {
CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(
() -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "沉梦听雨";
}
);
CompletableFuture<String> whenCompleteFuture = firstFuture.whenComplete((a, throwable) -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
System.out.println("上个任务执行完啦,还把【" + a + "】传过来");
if ("沉梦听雨".equals(a)) {
System.out.println("入参校验成功");
}
System.out.println("whenComplete-接着执行第二个任务");
});
System.out.println("返回值:" + whenCompleteFuture.get());
// 输出
/*
当前线程名称:ForkJoinPool.commonPool-worker-19
当前线程名称:ForkJoinPool.commonPool-worker-19
上个任务执行完啦,还把【沉梦听雨】传过来
入参校验成功
whenComplete-接着执行第二个任务
返回值:沉梦听雨
*/
}
异常处理
异步操作可能会失败,CompletableFuture 允许我们使用 exceptionally()
或 handle()
方法来处理异步操作的异常。
handle
主要用于处理异步任务的结果或异常。
- 如果任务正常完成,则返回任务的结果;
- 如果任务抛出异常,则可以指定一个默认值或其他处理逻辑。
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(defaultExecutor(), fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
代码示例:
/**
* handle()方法
* <p>
* 该方法用于处理异步任务的结果或异常。
* - 如果任务正常完成,则返回任务的结果;
* - 如果任务抛出异常,则可以指定一个默认值或其他处理逻辑。
* 在此示例中,异步任务会抛出一个 RuntimeException,
* 而 handle() 方法会捕获该异常并返回一个默认字符串 "world!"。
* </p>
*/
@Test
void testHandle() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Computation error!");
// return "world!";
}).handle((res, ex) -> {
// res 代表返回的结果
// ex 的类型为 Throwable,代表抛出的异常
if (ex != null) {
// 异常被捕获: java.lang.RuntimeException: Computation error!
System.out.println("异常被捕获: " + ex.getMessage());
return "world!";
}
return (String) res;
// return (String) (res != null ? res : "world!");
});
assertEquals("world!", future.get());
}
exceptionally
- 主要用于处理异步任务中发生的异常
/**
* exceptionally() 方法
* <p>
* 该方法用于处理异步任务中发生的异常。
* 如果任务抛出异常,则可以指定一个默认值或其他处理逻辑。
* 在此示例中,异步任务会抛出一个 RuntimeException,
* 而 exceptionally() 方法会捕获该异常并返回一个默认字符串 "world!"。
* </p>
*/
@Test
void testExceptionally() throws ExecutionException, InterruptedException {
CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Computation error!");
}).exceptionally(ex -> {
// java.util.concurrent.CompletionException: java.lang.RuntimeException: Computation error!
System.out.println(ex.toString());
// 返回默认值 "world!"
return "world!";
});
assertEquals("world!", future.get());
}
completeExceptionally
设置 CompletableFuture 的结果就是异常
- 可以使用
completeExceptionally()
方法为其赋值。 - 当你需要手动控制 CompletableFuture 的状态,并且希望在某些条件下将其标记为异常完成时,可以使用
completeExceptionally()
。
/**
* completeExceptionally() 方法
* <p>
* 该方法用于手动将 CompletableFuture 标记为异常完成状态。
* 在此示例中,我们创建了一个 CompletableFuture 对象,并使用 completeExceptionally() 方法手动设置一个异常。
* 然后,调用 get() 方法会抛出该异常。
* </p>
*/
@Test
void testCompleteExceptionally() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// 手动设置 CompletableFuture 为异常完成状态
completableFuture.completeExceptionally(new RuntimeException("Calculation failed!"));
try {
// 直接 get() 会抛出异常
completableFuture.get();
} catch (ExecutionException e) {
// 捕获到异常: Calculation failed!
System.out.println("捕获到异常: " + e.getCause().getMessage());
}
}
三、多个任务组合处理
AND 组合关系
thenCombine
/ thenAcceptBoth
/ runAfterBoth
都表示:
- 将两个 CompletableFuture 组合起来,只有这两个都正常执行完了,才会执行某个任务。
区别在于:
thenCombine
:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值thenAcceptBoth
: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值runAfterBoth
不会把执行结果当做方法入参,且没有返回值。
thenCombine / thenCombineAsync
代码示例:
/**
* thenCombineAsync() 方法测试
* <p>
* 该方法用于组合两个异步任务的结果。
* 在此示例中,我们创建了两个异步任务,并使用 thenCombineAsync() 方法将它们的结果组合在一起。
* 第一个任务是一个已经完成的 CompletableFuture,第二个任务通过 supplyAsync() 方法异步执行。
* 最终,两个任务的结果会被组合成一个新的字符串,并通过 join() 方法获取结果。
* </p>
*/
@Test
void testThenCombineAsync() {
// 创建一个已经完成的 CompletableFuture,结果为 "第一个异步任务"
CompletableFuture<String> firstFuture = CompletableFuture.completedFuture("第一个异步任务");
// 创建一个固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
// 创建第二个异步任务,并使用 thenCombineAsync() 方法组合两个任务的结果
CompletableFuture<String> future = CompletableFuture
// 第二个异步任务
.supplyAsync(() -> "第二个异步任务", executor)
// 第三个任务,组合前两个任务的结果
.thenCombineAsync(firstFuture, (s, other) -> {
System.out.println(s); // 打印 supplyAsync 任务的结果
System.out.println(other); // 打印 firstFuture 任务的结果
return "两个异步任务的组合"; // 返回组合后的结果
}, executor);
// 获取并打印组合后的结果
System.out.println(future.join());
// 关闭线程池
executor.shutdown();
// 输出
/*
第二个异步任务
第一个异步任务
两个异步任务的组合
*/
}
源码分析:
// 不能传入自定义线程池
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
// 不能传入自定义线程池
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(defaultExecutor(), other, fn);
}
// 可以传入自定义线程池
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
allOf 全部执行完
所有任务都执行完成后,才执行 allOf
返回的 CompletableFuture。
- 如果任意一个任务异常,
allOf
的 CompletableFuture,执行 get 方法,会抛出异常。
/**
* allOf() 方法测试
* <p>
* 该方法用于组合多个 CompletableFuture 任务,确保所有任务都完成后才继续执行后续操作。
* 在此示例中,我们创建了两个异步任务,并使用 allOf() 方法将它们组合在一起。
* 当所有任务完成后,会执行 whenComplete() 方法中的回调,打印 "finish"。
* </p>
*/
@Test
void testAllOf() throws ExecutionException, InterruptedException {
// 创建第一个异步任务,任务完成后打印 "我执行完了"
CompletableFuture<Void> a = CompletableFuture.runAsync(() -> System.out.println("我执行完了"));
// 创建第二个异步任务,任务完成后打印 "我也执行完了"
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> System.out.println("我也执行完了"));
// 使用 CompletableFuture.allOf 组合两个异步任务,由于 runAsync 方法中的任务是异步执行的,具体的执行顺序可能会有所不同
// 当所有任务完成后,执行 whenComplete 回调
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b)
// 回调函数中的参数 res 和 ex 分别表示结果和异常
// 在 allOf 的情况下,res 为 null,ex 为可能的异常(如果没有异常则为 null)
.whenComplete((res, ex) -> System.out.println("finish"));
// 为 null
allOfFuture.get();
// 输出
/*
我也执行完了
我执行完了
finish
*/
}
@Test
void testAllOf2() {
List<CompletableFuture<?>> futures = Arrays.asList(
CompletableFuture.runAsync(() -> System.out.println("Task 1")),
CompletableFuture.runAsync(() -> {
// try {
// Thread.sleep(500); // 模拟较长时间的任务
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// }
System.out.println("Task 2");
})
);
// 等待所有任务完成
CompletableFuture
// 这里使用了 List 的 toArray(T[] a) 方法将 List<CompletableFuture<?>> 转换为 CompletableFuture<?>[] 数组。
// 之所以传递一个空的 CompletableFuture[0] 是因为 toArray(T[] a) 方法需要一个与列表元素类型相同的数组作为参数,以确定返回数组的类型和大小。
// 如果不提供任何数组作为参数,toArray() 方法将返回 Object[],这在调用 allOf() 时会导致编译错误,因为它期望的是 CompletableFuture<?>[]
.allOf(futures.toArray(new CompletableFuture[0]))
.join();
// 输出(顺序不固定)
/*
Task 2
Task 1
*/
}
anyOf 任一执行完
任意一个任务执行完,就执行 anyOf
返回的 CompletableFuture。
- 如果执行的任务异常,
anyOf
的 CompletableFuture,执行 get 方法,会抛出异常。
/**
* anyOf() 方法测试
* <p>
* 该方法用于组合多个 CompletableFuture 任务,只要其中任何一个任务完成,就会继续执行后续操作。
* 在此示例中,我们创建了两个异步任务,并使用 anyOf() 方法将它们组合在一起。
* 只要其中一个任务完成,就会执行 whenComplete() 方法中的回调,打印 "finish"。
* </p>
*/
@Test
void testAnyOf() {
// 创建第一个异步任务,任务完成后打印 "我执行完了"
// 该任务会休眠 3 秒钟
CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我执行完了");
});
// 创建第二个异步任务,任务完成后打印 "我也执行完了"
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> System.out.println("我也执行完了"));
// 使用 CompletableFuture.anyOf 组合两个异步任务
// 只要其中一个任务完成,就会执行 whenComplete 回调
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b)
.whenComplete((res, ex) -> System.out.println("finish"));
// 等待任意一个给定的 CompletableFuture 完成
anyOfFuture.join();
// 输出(这里由于第一个任务会休眠 3 秒,所以一直会输出第一种情况)
/*
我也执行完了
finish
或者
我执行完了
finish
*/
}
join() 的含义是:
-
等待一个异步操作(也就是
CompletableFuture
)完成并获取其结果。 -
具体来说,
join
方法会阻塞当前线程,直到相应的CompletableFuture
完成,并返回其计算结果(或异常)。如果在调用join
时异步操作还未完成,那么当前线程将一直阻塞等待,直到操作完成或者抛出异常。
get() 和 join() 方法
CompletableFuture
的 get()
方法和 join()
方法都是用来等待异步任务完成并获取其结果的,但它们在处理异常和返回类型上有不同的行为。
下面详细解释这两个方法的区别:
1、get()
方法
- 签名:
V get() throws InterruptedException, ExecutionException;
- 功能:阻塞当前线程直到
CompletableFuture
完成,并返回结果。 - 异常处理:
- 如果任务被中断,则抛出
InterruptedException
。 - 如果任务执行过程中抛出了异常,则封装为
ExecutionException
或CompletionException
并抛出。
- 如果任务被中断,则抛出
try {
String result = future.get(); // 阻塞直到任务完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
// 处理中断异常
} catch (ExecutionException e) {
// 处理执行期间发生的异常
}
2. join()
方法
- 签名:
V join();
- 功能:类似于
get()
,它也会阻塞当前线程直到CompletableFuture
完成,但它不会抛出受检异常(checked exception),而是将所有异常都封装为未受检的CompletionException
。 - 异常处理:
- 如果任务正常完成,
join()
返回任务的结果。 - 如果任务因为异常而失败,
join()
将抛出一个包含原始异常的CompletionException
。 - 不会抛出
InterruptedException
,即使任务被中断;相反,它会继续等待直到任务完成或遇到其他类型的异常。
- 如果任务正常完成,
String result = future.join(); // 阻塞直到任务完成,不抛出受检异常
使用建议
-
选择
get()
还是join()
:- 如果你希望明确区分不同类型的异常并且需要处理
InterruptedException
,那么应该使用get()
方法。 - 如果你更倾向于简化代码,不需要显式处理
InterruptedException
,并且可以接受所有的异常都被封装为CompletionException
,那么可以使用join()
方法。
- 如果你希望明确区分不同类型的异常并且需要处理
-
性能考虑:
- 在大多数情况下,两者之间的性能差异可以忽略不计,选择哪个方法主要取决于你的异常处理策略和个人偏好。
示例代码
这里有一个简单的例子来展示 get()
和 join()
的用法以及它们如何处理异常:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) {
// 创建一个可能会失败的 CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Task failed!");
}
return "Success!";
});
try {
// 使用 get() 方法
System.out.println("Using get(): " + future.get());
} catch (InterruptedException | ExecutionException e) {
System.err.println("Error using get(): " + e.getCause());
}
// 使用 join() 方法
try {
System.out.println("Using join(): " + future.join());
} catch (CompletionException e) {
System.err.println("Error using join(): " + e.getCause());
}
}
}
在这个例子中,你可以看到当 CompletableFuture
因为异常而失败时,get()
和 join()
方法都会捕获到异常,
- 但是
get()
抛出了ExecutionException
, - 而
join()
则抛出了CompletionException
。
学习参考
- 异步编程利器:CompletableFuture详解 |Java 开发实战 - 掘金 (juejin.cn)
- CompletableFuture 详解 | JavaGuide(Java面试 + 学习指南)