当前位置: 首页 > article >正文

【并发篇】CompletableFuture学习

CompletableFuture 异步编程

前言

我们异步执行一个任务时,一般是用线程池 Executor 去创建。

  • 如果不需要有返回值,任务实现 Runnable 接口;
  • 如果需要有返回值,任务实现 Callable 接口,调用 Executor 的 submit 方法,再使用 Future 获取即可。

如果多个线程存在依赖组合的话,我们怎么处理呢?

  • 可使用同步组件 CountDownLatch、CyclicBarrier 等,但是比较麻烦。
  • 其实有简单的方法,就是用 CompeletableFuture。

在现代的软件开发中,处理并发和异步任务变得越来越重要。Java 中的 CompletableFuture 类为我们提供了一种强大的方式来处理异步编程,让我们能够更有效地利用多核处理器和并行执行。

源码解析

源码:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}

从源码可以看出 CompletableFuture 同时实现了 FutureCompletionStage 接口。

CompletableFuture 除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程的能力。

Future 接口有 5 个方法:

  1. boolean cancel(boolean mayInterruptIfRunning):尝试取消执行任务。
  2. boolean isCancelled():判断任务是否被取消。
  3. boolean isDone():判断任务是否已经被执行完成。
  4. get():等待任务执行完成并获取运算结果。
  5. get(long timeout, TimeUnit unit):多了一个超时时间。
/**
 * 表示一个异步计算的结果。该接口提供方法来检查计算是否完成、等待计算完成以及获取计算结果。
 * 一旦计算完成(无论是正常完成、异常终止还是被取消),则不能再次改变其状态。
 *
 * @param <V> 计算结果的类型;如果计算不返回结果,则使用 {@code Void} 类型。
 */
public interface Future<V> {

    /**
     * 尝试取消任务的执行。如果任务已经完成、已经被取消,或者由于某些原因无法取消,则此方法将不会产生任何效果。
     * 如果调用时任务尚未开始,则该任务不应启动。如果任务已经开始,则 mayInterruptIfRunning 参数决定是否应该尝试中断正在运行的任务。
     *
     * @param mayInterruptIfRunning 如果为 true 并且任务已经开始执行,则会尝试中断任务。如果为 false,则任务允许继续运行直到完成。
     * @return 如果任务在调用此方法之前未完成,则返回 true;否则返回 false。
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 判断任务是否在它正常完成前被取消。注意,只有当任务确实被取消时才会返回 true。
     * 
     * @return 如果任务被取消则返回 true;否则返回 false。
     */
    boolean isCancelled();

    /**
     * 判断任务是否已经完成。注意,这可能是因为任务正常结束、异常结束或被取消。
     *
     * @return 如果任务已完成则返回 true;否则返回 false。
     */
    boolean isDone();

    /**
     * 如果任务已完成,则返回任务的结果。如果任务尚未完成,则此方法将会阻塞,直到任务完成。
     * 如果任务被取消,则抛出 CancellationException 异常;如果任务异常完成,则抛出 ExecutionException 异常。
     *
     * @return 任务的结果值。
     * @throws CancellationException 如果任务被取消。
     * @throws ExecutionException 如果计算过程中抛出了异常。
     * @throws InterruptedException 如果当前线程在等待时被中断。
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 如果任务在指定的超时时间内完成,则返回任务的结果。如果任务尚未完成,则此方法将会阻塞,直到任务完成或超时发生为止。
     * 如果任务被取消,则抛出 CancellationException 异常;如果任务异常完成,则抛出 ExecutionException 异常。
     * 如果超时时间到达而任务仍未完成,则抛出 TimeoutException 异常。
     *
     * @param timeout 等待任务完成的最大时间。
     * @param unit    timeout 参数的时间单位。
     * @return 任务的结果值。
     * @throws CancellationException 如果任务被取消。
     * @throws ExecutionException 如果计算过程中抛出了异常。
     * @throws InterruptedException 如果当前线程在等待时被中断。
     * @throws TimeoutException 如果在给定的超时时间内未能完成任务。
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

CompletionStage 接口

CompletionStage 接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。(大量使用了函数式编程)

什么是 CompletableFuture?

CompletableFuture 是 Java 8 引入的一个类,用于支持异步编程和操作多个异步任务。它是 Future 的扩展,提供了更多的功能和灵活性。通过 CompletableFuture,我们可以将多个异步任务串行或并行执行,然后等待它们的完成结果。

使用步骤

一、创建 CompletableFuture

常见的有两种方法

  1. 通过 new 关键字
  2. 基于 CompletableFuture 自带的静态工厂方法:runAsync()supplyAsync()
1、new 关键字

通过 new 关键字创建 CompletableFuture 对象这种使用方式可以看作是将 CompletableFuture 当做 Future 来使用。

举例:

创建异步运算的载体

CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();

上面代码创建了一个结果值类型为 RpcResponse<Object>CompletableFuture,你可以把 resultFuture 看作是异步运算结果的载体。

传入运算结果

// complete() 方法只能调用一次,后续调用将被忽略。
resultFuture.complete(rpcResponse);

假设在未来的某个时刻,我们得到了最终的结果。这时,我们可以调用 complete() 方法为其传入结果,这表示 resultFuture 已经被完成了。

判断任务是否已经被完成

public boolean isDone() {
    return result != null;
}

可以通过 isDone() 方法来检查是否已经完成。(Future 接口的方法)

等待任务执行完成并获取运算结果

rpcResponse = completableFuture.get();

可以通过调用 get() 方法来获取异步计算结果。调用 get() 方法的线程会阻塞直到 CompletableFuture 完成运算。(阻塞等待)

如果你已经知道计算的结果的话,可以使用静态方法 completedFuture() 来创建 CompletableFuture

CompletableFuture<String> future = CompletableFuture.completedFuture("hello!");
assertEquals("hello!", future.get());

completedFuture() 方法底层调用的是带参数的 new 方法,只不过,这个方法不对外暴露。

public static <U> CompletableFuture<U> completedFuture(U value) {
    return new CompletableFuture<U>((value == null) ? NIL : value);
}
代码示例
    /**
     * new 关键字创建 CompletableFuture
     * 利用 complete 方法手动完成 CompletableFuture
     */
    @Test
    void testNew() throws ExecutionException, InterruptedException {
        // 1、创建一个新的未完成的 CompletableFuture
        CompletableFuture<String> future = new CompletableFuture<>();

        // 模拟异步操作完成后手动完成 CompletableFuture
        String expectedResult = "Hello, World!";
        future.complete(expectedResult);

        // 测试是否成功完成并返回预期结果
        assertEquals(expectedResult, future.get());

        // 2、测试异常完成的情况
        CompletableFuture<Void> failingFuture = new CompletableFuture<>();
        RuntimeException expectedException = new RuntimeException("Oops!");
        failingFuture.completeExceptionally(expectedException);

        try {
            failingFuture.get();
            fail("Expected exception not thrown");
        } catch (ExecutionException e) {
            assertInstanceOf(RuntimeException.class, e.getCause());
            assertEquals(expectedException.getMessage(), e.getCause().getMessage());
        }
    }

    /**
     * 静态方法 completedFuture 创建一个已完成的 CompletableFuture
     * 底层用的也是 new
     */
    @Test
    void testCompletedFuture() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.completedFuture("hello!");
        assertEquals("hello!", future.get());
    }
2、静态工厂方法
  1. supplyAsync 执行 CompletableFuture 任务,支持返回值
  2. runAsync 执行 CompletableFuture 任务,没有返回值。因为 runAsync() 方法接受的参数是 Runnable ,这是一个函数式接口,不允许返回值。
supplyAsync 方法
// 使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    
// 使用自定义线程池,根据supplier构建执行任务(推荐)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

supplyAsync() 方法接受的参数是 Supplier<U> ,是一个函数式接口,U 是返回结果值的类型。

@FunctionalInterface
public interface Supplier<T> {

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}

使用场景:当你需要异步操作且关心返回结果的时候,可以使用 supplyAsync() 方法。

runAsync 方法
// 使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable) 
    
// 使用自定义线程池,根据runnable构建执行任务(推荐)
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)

使用场景:当你需要异步操作且不关心返回结果的时候可以使用 runAsync() 方法。

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
代码示例
    /**
     * supplyAsync 和 runAsync 的区别
     * supplyAsync 支持返回值
     * runAsync    不支持返回值
     */
    @Test
    void testSupplyAsyncAndRunAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("hello runAsync!"));
        // 控制台输出 "hello!"
        runFuture.get();

        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> "hello supplyAsync!");
        // 控制台 不会 输出 "hello!"
        supplyFuture.get();
        // 进行断言,判断返回值是否为 "hello!",不通过就会抛出异常
        assertEquals("hello supplyAsync!", supplyFuture.get());
    }

    /**
     * 自定义线程池写法
     */
    @Test
    void testSupplyAsyncAndRunAsync2() {
        // 自定义线程池
        ExecutorService executor = Executors.newCachedThreadPool();

        // runAsync的使用
        CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run, cmty256"), executor);

        // supplyAsync的使用
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("supply, cmty256");
            return "cmty256";
        }, executor);

        System.out.println("=============================异步操作,输出顺序不定=============================");

        // runAsync的future没有返回值,输出null
        System.out.println(runFuture.join());

        System.out.println("=============================异步操作,输出顺序不定=============================");

        // supplyAsync的future,有返回值
        System.out.println(supplyFuture.join());


        executor.shutdown(); // 线程池需要关闭
    }

二、简单任务异步回调

image

处理异步结算结果

当我们获取到异步计算的结果之后,还可以对其进行进一步的处理,比较常用的方法有下面几个:

  1. thenRun() / thenRunAsync()
  2. thenAccept() / thenAcceptAsync()
  3. thenApply() / thenApplyAsync()
  4. whenComplete()

thenRun 和 thenRunAsync 有什么区别?

源码

    private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
  
    public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }

如果你执行第一个任务的时候,传入了一个自定义线程池:

  • 调用 thenRun 方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池
  • 调用 thenRunAsync 执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是 ForkJoin 线程池

tips: thenAccept 和 thenAcceptAsync,thenApply 和 thenApplyAsync 等,它们之间的区别也是这个。

thenRun / thenRunAsync

CompletableFuture 的 thenRun 方法,

  • 通俗点讲就是,做完第一个任务后,再做第二个任务。
  • 某个任务执行完成后,执行回调方法;
  • 但是前后两个任务没有参数传递,第二个任务也没有返回值
public CompletableFuture<Void> thenRun(Runnable action);

public CompletableFuture<Void> thenRunAsync(Runnable action);

代码示例:

    /**
     * thenRun()方法
     * <p>
     * 做完第一个任务后,再做第二个任务
     * 但是前后两个任务没有参数传递,第二个任务也没有返回值。
     * </p>
     */
    @Test
    void testThenRun() throws ExecutionException, InterruptedException {
        CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(
                () -> {
                    System.out.println("先执行第一个CompletableFuture方法任务");
                    return "沉梦听雨";
                }
        );

        CompletableFuture<Void> thenRunFuture = firstFuture.thenRun(() -> {
            System.out.println("thenRun-接着执行第二个任务");
        });

        System.out.println("返回值:" + thenRunFuture.get());

        // 输出
        /*
        先执行第一个CompletableFuture方法任务
        thenRun-接着执行第二个任务
        返回值:null
         */
    }
thenAccept / thenAcceptAsync

CompletableFuture 的 thenAccept 方法表示,

  • 第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,
  • 但是回调方法是没有返回值的。

代码示例:

    /**
     * thenAccept()方法
     * <p>
     * 做完第一个任务后,再做第二个任务
     * 可以接收入参,但是没有返回值。
     * </p>
     */
    @Test
    void testThenAccept() throws ExecutionException, InterruptedException {
        CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(
                () -> {
                    System.out.println("第一个CompletableFuture方法任务");
                    return "沉梦听雨";
                }
        );

        CompletableFuture<Void> thenAcceptFuture = firstFuture.thenAccept((a) -> {
            if ("沉梦听雨".equals(a)) {
                System.out.println("入参校验成功");
            }

            System.out.println("thenAccept-接着执行第二个任务");
        });

        System.out.println("返回值:" + thenAcceptFuture.get());

        // 输出
        /*
        第一个CompletableFuture方法任务
        入参校验成功
        thenAccept-接着执行第二个任务
        返回值:null
         */
    }
thenApply / thenApplyAsync

thenApply() 方法接收一个 Function 实例,用它来处理结果。

// 沿用上一个任务的线程池
public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

// 使用默认的 ForkJoinPool 线程池(不推荐)
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(defaultExecutor(), fn);
}
// 使用自定义线程池(推荐)
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn, Executor executor) {
    return uniApplyStage(screenExecutor(executor), fn);
}

代码示例:

CompletableFuture 的 thenApply 方法表示,

  • 第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,
  • 并且回调方法是有返回值的。
    /**
     * thenApply()方法
     * <p>
     * 做完第一个任务后,再做第二个任务
     * 可以接收入参,并且有返回值。
     * </p>
     */
    @Test
    void testThenApply() throws ExecutionException, InterruptedException {
        CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(
                () -> {
                    System.out.println("第一个CompletableFuture方法任务");
                    return "cmty256";
                }
        );

        CompletableFuture<String> thenApplyFuture = firstFuture.thenApply((a) -> {
            if ("沉梦听雨".equals(a)) {
                return "第一个任务的返回值";
            }

            return "thenApply-第二个任务的返回值";
        });

        System.out.println("返回值:" + thenApplyFuture.get());

        // 输出
        /*
        第一个CompletableFuture方法任务
        返回值:thenApply-第二个任务的返回值
         */
    }
whenComplete

CompletableFuture 的 whenComplete 方法表示,

  • 某个任务执行完成后,执行的回调方法,无返回值
  • 并且 whenComplete 方法返回的 CompletableFuture 的 result 是上个任务的结果
    /**
     * whenComplete()方法
     * <p>
     * 两个任务在同一个线程中执行
     * 第二个任务可以接收入参
     * 第二个任务返回的是第一个任务的返回值
     * </p>
     */
    @Test
    void testWhenComplete() throws ExecutionException, InterruptedException {
        CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(
                () -> {
                    System.out.println("当前线程名称:" + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "沉梦听雨";
                }
        );

        CompletableFuture<String> whenCompleteFuture = firstFuture.whenComplete((a, throwable) -> {
            System.out.println("当前线程名称:" + Thread.currentThread().getName());
            System.out.println("上个任务执行完啦,还把【" + a + "】传过来");
            if ("沉梦听雨".equals(a)) {
                System.out.println("入参校验成功");
            }

            System.out.println("whenComplete-接着执行第二个任务");
        });

        System.out.println("返回值:" + whenCompleteFuture.get());

        // 输出
        /*
        当前线程名称:ForkJoinPool.commonPool-worker-19
        当前线程名称:ForkJoinPool.commonPool-worker-19
        上个任务执行完啦,还把【沉梦听雨】传过来
        入参校验成功
        whenComplete-接着执行第二个任务
        返回值:沉梦听雨
         */
    }
异常处理

异步操作可能会失败,CompletableFuture 允许我们使用 exceptionally()handle() 方法来处理异步操作的异常。

handle

主要用于处理异步任务的结果异常

  • 如果任务正常完成,则返回任务的结果;
  • 如果任务抛出异常,则可以指定一个默认值或其他处理逻辑。
public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}

public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(defaultExecutor(), fn);
}

public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
    return uniHandleStage(screenExecutor(executor), fn);
}

代码示例:

    /**
     * handle()方法
     * <p>
     * 该方法用于处理异步任务的结果或异常。
     * - 如果任务正常完成,则返回任务的结果;
     * - 如果任务抛出异常,则可以指定一个默认值或其他处理逻辑。
     * 在此示例中,异步任务会抛出一个 RuntimeException,
     * 而 handle() 方法会捕获该异常并返回一个默认字符串 "world!"。
     * </p>
     */
    @Test
    void testHandle() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Computation error!");
            // return "world!";
        }).handle((res, ex) -> {
            // res 代表返回的结果
            // ex 的类型为 Throwable,代表抛出的异常

            if (ex != null) {
                // 异常被捕获: java.lang.RuntimeException: Computation error!
                System.out.println("异常被捕获: " + ex.getMessage());
                return "world!";
            }
            return (String) res;

            // return (String) (res != null ? res : "world!");
        });

        assertEquals("world!", future.get());
    }
exceptionally
  • 主要用于处理异步任务中发生的异常
    /**
     * exceptionally() 方法
     * <p>
     * 该方法用于处理异步任务中发生的异常。
     * 如果任务抛出异常,则可以指定一个默认值或其他处理逻辑。
     * 在此示例中,异步任务会抛出一个 RuntimeException,
     * 而 exceptionally() 方法会捕获该异常并返回一个默认字符串 "world!"。
     * </p>
     */
    @Test
    void testExceptionally() throws ExecutionException, InterruptedException {
        CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Computation error!");
        }).exceptionally(ex -> {
            // java.util.concurrent.CompletionException: java.lang.RuntimeException: Computation error!
            System.out.println(ex.toString());
            // 返回默认值 "world!"
            return "world!";
        });
        assertEquals("world!", future.get());
    }
completeExceptionally

设置 CompletableFuture 的结果就是异常

  • 可以使用 completeExceptionally() 方法为其赋值。
  • 当你需要手动控制 CompletableFuture 的状态,并且希望在某些条件下将其标记为异常完成时,可以使用 completeExceptionally()
    /**
     * completeExceptionally() 方法
     * <p>
     * 该方法用于手动将 CompletableFuture 标记为异常完成状态。
     * 在此示例中,我们创建了一个 CompletableFuture 对象,并使用 completeExceptionally() 方法手动设置一个异常。
     * 然后,调用 get() 方法会抛出该异常。
     * </p>
     */
    @Test
    void testCompleteExceptionally() throws InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();

        // 手动设置 CompletableFuture 为异常完成状态
        completableFuture.completeExceptionally(new RuntimeException("Calculation failed!"));

        try {
            // 直接 get() 会抛出异常
            completableFuture.get();
        } catch (ExecutionException e) {
            // 捕获到异常: Calculation failed!
            System.out.println("捕获到异常: " + e.getCause().getMessage());
        }
    }

三、多个任务组合处理

AND 组合关系

thenCombine / thenAcceptBoth / runAfterBoth 都表示:

  • 将两个 CompletableFuture 组合起来,只有这两个都正常执行完了,才会执行某个任务

区别在于:

  • thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
  • thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
  • runAfterBoth 不会把执行结果当做方法入参,且没有返回值
thenCombine / thenCombineAsync

代码示例:

    /**
     * thenCombineAsync() 方法测试
     * <p>
     * 该方法用于组合两个异步任务的结果。
     * 在此示例中,我们创建了两个异步任务,并使用 thenCombineAsync() 方法将它们的结果组合在一起。
     * 第一个任务是一个已经完成的 CompletableFuture,第二个任务通过 supplyAsync() 方法异步执行。
     * 最终,两个任务的结果会被组合成一个新的字符串,并通过 join() 方法获取结果。
     * </p>
     */
    @Test
    void testThenCombineAsync() {
        // 创建一个已经完成的 CompletableFuture,结果为 "第一个异步任务"
        CompletableFuture<String> firstFuture = CompletableFuture.completedFuture("第一个异步任务");

        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(10);

        // 创建第二个异步任务,并使用 thenCombineAsync() 方法组合两个任务的结果
        CompletableFuture<String> future = CompletableFuture
                // 第二个异步任务
                .supplyAsync(() -> "第二个异步任务", executor)
                // 第三个任务,组合前两个任务的结果
                .thenCombineAsync(firstFuture, (s, other) -> {
                    System.out.println(s); // 打印 supplyAsync 任务的结果
                    System.out.println(other); // 打印 firstFuture 任务的结果
                    return "两个异步任务的组合"; // 返回组合后的结果
                }, executor);

        // 获取并打印组合后的结果
        System.out.println(future.join());

        // 关闭线程池
        executor.shutdown();

        // 输出
        /*
        第二个异步任务
        第一个异步任务
        两个异步任务的组合
        */
    }

源码分析:

    // 不能传入自定义线程池
    public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }

	// 不能传入自定义线程池
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(defaultExecutor(), other, fn);
    }

	// 可以传入自定义线程池
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
        return biApplyStage(screenExecutor(executor), other, fn);
    }
allOf 全部执行完

所有任务都执行完成后,才执行 allOf 返回的 CompletableFuture。

  • 如果任意一个任务异常,allOf 的 CompletableFuture,执行 get 方法,会抛出异常。
    /**
     * allOf() 方法测试
     * <p>
     * 该方法用于组合多个 CompletableFuture 任务,确保所有任务都完成后才继续执行后续操作。
     * 在此示例中,我们创建了两个异步任务,并使用 allOf() 方法将它们组合在一起。
     * 当所有任务完成后,会执行 whenComplete() 方法中的回调,打印 "finish"。
     * </p>
     */
    @Test
    void testAllOf() throws ExecutionException, InterruptedException {
        // 创建第一个异步任务,任务完成后打印 "我执行完了"
        CompletableFuture<Void> a = CompletableFuture.runAsync(() -> System.out.println("我执行完了"));

        // 创建第二个异步任务,任务完成后打印 "我也执行完了"
        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> System.out.println("我也执行完了"));

        // 使用 CompletableFuture.allOf 组合两个异步任务,由于 runAsync 方法中的任务是异步执行的,具体的执行顺序可能会有所不同
        // 当所有任务完成后,执行 whenComplete 回调
        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b)
                // 回调函数中的参数 res 和 ex 分别表示结果和异常
                // 在 allOf 的情况下,res 为 null,ex 为可能的异常(如果没有异常则为 null)
                .whenComplete((res, ex) -> System.out.println("finish"));

        // 为 null
        allOfFuture.get();

        // 输出
        /*
        我也执行完了
        我执行完了
        finish
        */
    }

    @Test
    void testAllOf2() {
        List<CompletableFuture<?>> futures = Arrays.asList(
                CompletableFuture.runAsync(() -> System.out.println("Task 1")),
                CompletableFuture.runAsync(() -> {
                    // try {
                    //     Thread.sleep(500); // 模拟较长时间的任务
                    // } catch (InterruptedException e) {
                    //     Thread.currentThread().interrupt();
                    // }
                    System.out.println("Task 2");
                })
        );

        // 等待所有任务完成
        CompletableFuture
                // 这里使用了 List 的 toArray(T[] a) 方法将 List<CompletableFuture<?>> 转换为 CompletableFuture<?>[] 数组。
                // 之所以传递一个空的 CompletableFuture[0] 是因为 toArray(T[] a) 方法需要一个与列表元素类型相同的数组作为参数,以确定返回数组的类型和大小。
                // 如果不提供任何数组作为参数,toArray() 方法将返回 Object[],这在调用 allOf() 时会导致编译错误,因为它期望的是 CompletableFuture<?>[]
                .allOf(futures.toArray(new CompletableFuture[0]))
                .join();

        // 输出(顺序不固定)
        /*
        Task 2
        Task 1
        */
    }
anyOf 任一执行完

任意一个任务执行完,就执行 anyOf 返回的 CompletableFuture。

  • 如果执行的任务异常,anyOf 的 CompletableFuture,执行 get 方法,会抛出异常。
    /**
     * anyOf() 方法测试
     * <p>
     * 该方法用于组合多个 CompletableFuture 任务,只要其中任何一个任务完成,就会继续执行后续操作。
     * 在此示例中,我们创建了两个异步任务,并使用 anyOf() 方法将它们组合在一起。
     * 只要其中一个任务完成,就会执行 whenComplete() 方法中的回调,打印 "finish"。
     * </p>
     */
    @Test
    void testAnyOf() {
        // 创建第一个异步任务,任务完成后打印 "我执行完了"
        // 该任务会休眠 3 秒钟
        CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("我执行完了");
        });

        // 创建第二个异步任务,任务完成后打印 "我也执行完了"
        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> System.out.println("我也执行完了"));

        // 使用 CompletableFuture.anyOf 组合两个异步任务
        // 只要其中一个任务完成,就会执行 whenComplete 回调
        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b)
                .whenComplete((res, ex) -> System.out.println("finish"));

        // 等待任意一个给定的 CompletableFuture 完成
        anyOfFuture.join();

        // 输出(这里由于第一个任务会休眠 3 秒,所以一直会输出第一种情况)
        /*
        我也执行完了
        finish
        或者
        我执行完了
        finish
        */
    }

join() 的含义是:

  • 等待一个异步操作(也就是 CompletableFuture)完成并获取其结果。

  • 具体来说,join 方法会阻塞当前线程,直到相应的 CompletableFuture 完成,并返回其计算结果(或异常)。如果在调用 join 时异步操作还未完成,那么当前线程将一直阻塞等待,直到操作完成或者抛出异常。

get() 和 join() 方法

CompletableFutureget() 方法和 join() 方法都是用来等待异步任务完成并获取其结果的,但它们在处理异常和返回类型上有不同的行为。

下面详细解释这两个方法的区别:

1、get() 方法

  • 签名V get() throws InterruptedException, ExecutionException;
  • 功能:阻塞当前线程直到 CompletableFuture 完成,并返回结果。
  • 异常处理
    • 如果任务被中断,则抛出 InterruptedException
    • 如果任务执行过程中抛出了异常,则封装为 ExecutionExceptionCompletionException 并抛出。
try {
    String result = future.get(); // 阻塞直到任务完成
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // 恢复中断状态
    // 处理中断异常
} catch (ExecutionException e) {
    // 处理执行期间发生的异常
}

2. join() 方法

  • 签名V join();
  • 功能:类似于 get(),它也会阻塞当前线程直到 CompletableFuture 完成,但它不会抛出受检异常(checked exception),而是将所有异常都封装为未受检的 CompletionException
  • 异常处理
    • 如果任务正常完成,join() 返回任务的结果。
    • 如果任务因为异常而失败,join() 将抛出一个包含原始异常的 CompletionException
    • 不会抛出 InterruptedException,即使任务被中断;相反,它会继续等待直到任务完成或遇到其他类型的异常。
String result = future.join(); // 阻塞直到任务完成,不抛出受检异常

使用建议

  • 选择 get() 还是 join()

    • 如果你希望明确区分不同类型的异常并且需要处理 InterruptedException,那么应该使用 get() 方法。
    • 如果你更倾向于简化代码,不需要显式处理 InterruptedException,并且可以接受所有的异常都被封装为 CompletionException,那么可以使用 join() 方法。
  • 性能考虑

    • 在大多数情况下,两者之间的性能差异可以忽略不计,选择哪个方法主要取决于你的异常处理策略和个人偏好。

示例代码

这里有一个简单的例子来展示 get()join() 的用法以及它们如何处理异常:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {

    public static void main(String[] args) {
        // 创建一个可能会失败的 CompletableFuture
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("Task failed!");
            }
            return "Success!";
        });

        try {
            // 使用 get() 方法
            System.out.println("Using get(): " + future.get());
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Error using get(): " + e.getCause());
        }

        // 使用 join() 方法
        try {
            System.out.println("Using join(): " + future.join());
        } catch (CompletionException e) {
            System.err.println("Error using join(): " + e.getCause());
        }
    }
}

在这个例子中,你可以看到当 CompletableFuture 因为异常而失败时,get()join() 方法都会捕获到异常,

  • 但是 get() 抛出了 ExecutionException
  • join() 则抛出了 CompletionException

学习参考

  • 异步编程利器:CompletableFuture详解 |Java 开发实战 - 掘金 (juejin.cn)
  • CompletableFuture 详解 | JavaGuide(Java面试 + 学习指南)

http://www.kler.cn/a/473067.html

相关文章:

  • spring mvc源码学习笔记之九
  • mysql -> 达梦数据迁移(mbp大小写问题兼容)
  • PHP进阶-在Ubuntu上搭建LAMP环境教程
  • go语言学习 笔记 1(变量,语法,数据类型)
  • flink cdc oceanbase(binlog模式)
  • Linux内核 -- Mailbox Subsystem 之 devm_mbox_controller_register 的作用与使用示例
  • JavaScript 正则表达式
  • 代码随想录算法训练营第1天(数组1)| 704. 二分查找、27. 移除元素、977.有序数组的平方
  • 【数据库】SQL相关知识点总结1(数据库约束、三大范式、关系模型、聚合函数)
  • 为什么页面无法正确显示?都有哪些HTML和CSS相关问题?
  • PHP语言的函数实现
  • svelte5中使用react组件
  • 跨界融合:人工智能与区块链如何重新定义数据安全?
  • MATLAB语言的软件工程
  • c#13新特性
  • 推动多语言语音科技迈向新高度:INTERSPEECH 2025 ML-SUPERB 2.0 挑战赛
  • JAVA常见问题解答
  • 【LeetCode Hot100 贪心算法】 买卖股票的最佳时机、跳跃游戏、划分字母区间
  • 【网络云SRE运维开发】2025第2周-每日【2025/01/08】小测-【第8章 STP生成树协议】理论和实操
  • 【Linux】shell脚本编程
  • 详解opencv resize之INTER_LINEAR和INTER_AREA
  • 用户注册模块(芒果头条项目进度4)
  • JVM三JVM虚拟机
  • 战地雷达通信系统中无人机与特种车辆智能组网及雷达通信一体化研究报告
  • UE蓝图节点备忘录
  • C++ 泛型编程:动态数据类模版类内定义、类外实现