当前位置: 首页 > article >正文

多线程-CompletableFuture

简介

CompletableFuture:异步任务编排工具。java 8中引入的一个类,位于juc包下,是Future的增强版。它可以让用户更好地构建和组合异步任务,避免回调地狱。

在CompletableFuture中,如果用户没有指定执行异步任务时的线程池,默认使用ForkJoinPool中的公共线程池。

使用案例

简单使用

几个入门案例,学习如何使用CompletableFuture提交异步任务并行接收返回值

提交异步任务 supplyAsync

提交一个异步任务,然后阻塞地获取它的返回值

@Test
public void test1() throws ExecutionException, InterruptedException {
    // 提交异步任务
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "[" + Thread.currentThread().getName() + "] " + "task1";
    });

    String s = future.get();  // 阻塞地获取异步任务的执行结果
    System.out.println("s = " + s);  // [ForkJoinPool.commonPool-worker-9] task1
}

阻塞地等待异步任务完成 join

join方法和get方法,都是阻塞地等待异步任务执行完成,然后获取返回值,只不过对于异常的处理不同,推荐使用get方法来获取返回值。

@Test
public void test2() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "[" + Thread.currentThread().getName() + "] " + "task2";
    });

    System.out.println(future.get());  // [ForkJoinPool.commonPool-worker-9] task2
}

消费异步任务的结果 thenAccept

thenAccept函数接收一个Consumer类型的实例作为参数,它会消费异步任务的执行结果,然后返回一个CompletableFuture<Void>,实际上没有必要处理它的返回值。

@Test
public void test8() throws InterruptedException {
    // 异步任务
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "[" + Thread.currentThread().getName() + "] " + "task2";
    });

    // 使用thenAccept,提供一个回调函数,消费异步任务的结果
    future.join();
    future.thenAccept(System.out::println);  // [ForkJoinPool.commonPool-worker-9] task2
}

异步任务编排

到这里开始,开始涉及到异步任务编排,在一个异步任务之后启动另一个异步任务,或者在多个异步任务之后另一个异步任务。

具有依赖关系的异步任务,根据异步任务的依赖数量,可以把异步任务分为 零依赖、一元依赖、二元依赖和多元依赖。

一元依赖

thenApply

thenApply方法,接收一个Function作为参数,返回另一个future。一个异步任务依赖另一个异步任务

@Test
public void test3() {
    // 异步任务1
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "[" + Thread.currentThread().getName() + "] " 
          + "[" + (System.currentTimeMillis() / 1000) + "]" + "任务1";
    });
  
    // 异步任务2
    CompletableFuture<String> future1 = future.thenApply(s -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return s + "," + "[" + (System.currentTimeMillis() / 1000) + "]" +  "任务2";
    });

    future1.join();
    // 注意结果中异步任务1的时间和异步任务2的时间,说明异步任务2是在异步任务执行完之后触发的
    // [ForkJoinPool.commonPool-worker-1] [1732975152]任务1,[1732975155]任务2
    future1.thenAccept(System.out::println);
}

thenAccept方法接收一个Consumer,没有返回值,thenApply接收一个Function,有返回值

thenCompose

从语义上看,thenCompose方法接收一个异步任务作为参数,thenApply方法接收一个普通任务作为参数,选择 thenApply还是 thenCompose 取决于用户的需求,如果新任务是另一个异步任务,选择thenCompose,如果新任务只是消费上一个异步任务的结果,然后返回消费结果,选择thenApply。

@Test
public void test9() {
    // 异步任务1
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "[" + Thread.currentThread().getName() + "] " +
                "[" + (System.currentTimeMillis() / 1000) + "]" + "任务1";
    });
    
    // 异步任务2
    CompletableFuture<String> future1 = future.thenCompose(s -> CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return s + "\n" +
                "[" + Thread.currentThread().getName() + "] " +
                "[" + (System.currentTimeMillis() / 1000) + "]" + "任务2";
    }));
    future1.join();
    //[ForkJoinPool.commonPool-worker-9] [1727663353]任务1
    //[ForkJoinPool.commonPool-worker-9] [1727663356]任务2
    future1.thenAccept(System.out::println);
}

二元依赖

thenCombine

融合当前异步任务(调用thenCombine的CF实例)和另一个异步任务(other)的结果,thenCombine() 会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。

@Test
public void test9() {
    System.out.println("[" + Thread.currentThread().getName() + "] " +
            "[" + System.currentTimeMillis() / 1000 + "]" + "任务0");
    // 异步任务1
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "[" + Thread.currentThread().getName() + "] " +
                "[" + System.currentTimeMillis() / 1000 + "]" + "任务1";
    });

    // 异步任务2
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "[" + Thread.currentThread().getName() + "] " +
                "[" + System.currentTimeMillis() / 1000 + "]" + "任务2";
    });

    // 融合异步任务1、2的结果,生成任务3
    CompletableFuture<String> future3 = future.thenCombine(future2,
            (s1, s2) -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return s1 + "\n" + s2 + "\n" +
                        "[" + Thread.currentThread().getName() + "] " +
                        "[" + System.currentTimeMillis() / 1000 + "]" + "任务3";
            });

    future3.join();

    // 任务1暂停5秒、任务2暂停2秒、任务3暂停3秒,任务3依赖任务1和任务2,可以看到,
    // 任务1和任务2的执行是互不影响的,等到任务1、任务2都执行完成,任务3才执行
    // [main] [1734753154]任务0
    // [ForkJoinPool.commonPool-worker-1] [1734753159]任务1
    // [ForkJoinPool.commonPool-worker-2] [1734753156]任务2
    // [ForkJoinPool.commonPool-worker-1] [1734753162]任务3
    future3.thenAccept(System.out::println);
}

多元依赖

allOf

allOf方法:多个异步任务都完成,才执行下一个异步任务

// allOf 方法接受一个 CompletableFuture 的可变参数列表,并返回一个新的 CompletableFuture,
// 当所有给定的 CompletableFuture 都完成时,新的 CompletableFuture 才会完成。这个方法通常
// 用于等待多个异步任务都完成。
@Test
public void test5() {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "[" + Thread.currentThread().getName() + "] " + 
               "[" + System.currentTimeMillis() / 1000 + "]" + "任务1";
    });

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "[" + Thread.currentThread().getName() + "] " + 
               "[" + System.currentTimeMillis() / 1000 + "]" + "任务2";
    });

    // allOf方法,创建一个新的future,它会等到前面两个异步任务完成后再执行
    CompletableFuture<Void> future3 = CompletableFuture.allOf(future, future2);
    CompletableFuture<String> future4 = future3.thenApply(v -> {
        // 在allOf方法返回的异步任务中,它需要调用join方法,来获取之前异步任务的结果
        String s1 = future.join();
        String s2 = future2.join();
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return s1 + "\n" + s2 + "\n" + 
                "[" + Thread.currentThread().getName() + "] " + 
                "[" + System.currentTimeMillis() / 1000 + "]" + "任务3";
    });

    future4.join(); // 阻塞当前线程
    future4.thenAccept(System.out::println);
}
anyOf

多个异步任务中只要有一个完成,就执行下一个异步任务

// anyOf方法
@Test
public void test6() {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "[" + Thread.currentThread().getName() + "] " 
          + "[" + System.currentTimeMillis() / 1000 + "]" + "任务1";
    });

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "[" + Thread.currentThread().getName() + "] " 
          + "[" + System.currentTimeMillis() / 1000 + "]" + "任务2";
    });

    CompletableFuture<Object> future3 = CompletableFuture.anyOf(future, future2);
    future3.join();
    // 任务2比任务1先结束,所以这里只获取到了任务2的结果
    // [ForkJoinPool.commonPool-worker-2] [1727662633]任务2
    future3.thenAccept(System.out::println);
}

异常处理

handle

handle 方法是一个双目方法,它接受一个 BiFunction 参数,该函数有两个参数:一个是异步任务的结果,另一个是异常对象。无论 CompletableFuture 正常完成还是异常完成,都会调用 handle 方法。

正常完成:

@Test
public void test7() {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "[" + Thread.currentThread().getName() + "] " +
                "[" + System.currentTimeMillis() / 1000 + "]" + "任务1";
    });

    CompletableFuture<String> future2 = future.handle((s, e) -> {
        if (e != null) {
            return "Error: " + e.getMessage();
        }
        return s + " handle";
    });
    future2.join();
    // [ForkJoinPool.commonPool-worker-9] [1727662978]任务1 handle
    future2.thenAccept(System.out::println);
}

exceptionally

exceptionally 方法是一个单目方法,它接受一个 Function 参数,该函数只处理异常情况。当 CompletableFuture异常完成时,exceptionally 方法会被调用。

@Test
public void test8() {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
            int i = 10 / 0;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return null;
    });

    CompletableFuture<String> future2 = future.exceptionally(e -> "Error: " + e.getMessage());
    future2.join();
    // Error: java.lang.ArithmeticException: / by zero
    future2.thenAccept(System.out::println);
}

API总结:

  • 零依赖:supplyAsync,直接提交

  • 一元依赖:thenApply、thenAccept、thenCompose

  • 二元依赖:thenCombine

  • 多元依赖:allOf、anyOf

使用时的注意事项

1、指定线程池:使用案例中的代码都没有指定线程池,但是实际使用过程中,最好指定线程池。

2、带有Async后缀的方法:CompletableFuture中提供的API,带有Async后缀的方法和普通方法,例如,thenApply和thenApplyAsync,thenApply使用和上一个任务一样的线程来执行异步任务,thenApplyAsync则使用新线程来执行异步任务,推荐使用带有Async后缀的方法。

源码分析

CompletableFuture使用一个栈来存储当前异步任务之后要触发的任务,栈使用的数据结构是单向链表。调用thenApply等方法编排异步任务时,实际上是向上一个任务的stack属性中注入当前任务,上一个任务结束后,会获取stack属性中的任务,然后继续执行,依次类推,直到stack属性为空

整体结构

CompletableFuture的继承体系

它实现了Future接口和CompletionStage接口:

  • Future代表异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法
  • CompletionStage代表异步计算的阶段,定义了在异步任务完成之后要触发的操作,这个操作可以是一个普通任务,也可以是一个异步任务

Future:

public interface Future<V> {
    // 取消异步任务
    boolean cancel(boolean mayInterruptIfRunning);
    // 判断异步任务是否取消
    boolean isCancelled();
    // 判断异步任务是否结束
    boolean isDone();

    // 获取异步任务的结果
    V get() throws InterruptedException, ExecutionException;
    // 获取异步任务的结果,并且指定超时时间
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

CompletionStage:代表异步计算的一个阶段,定义了在一个异步任务完成之后执行的操作,之前在入门案例中演示的thenApply方法、thenCompose方法,就是定义在这个接口中

public interface CompletionStage<T> {
    
    /* 一元依赖的异步任务 */
    // 创建一个新的异步任务,使用Function接口来消费异步任务的结果
    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
    // 创建一个新的异步任务,使用Consumer接口来消费异步任务的结果
    public CompletionStage<Void> thenAccept(Consumer<? super T> action);
    // 创建一个新的异步任务
    public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
    // ... 
}

CompletionFuture中的属性

CompletionFuture中只有两个属性:result、stack,result存储当前异步任务的结果,stack存储下一个要被触发的异步任务,stack是一个单向链表

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    /* 下面两个成员变量都是通过cas算法来操作的,所有的方法都是在操作这两个变量,通过这两个变量实现了异步任务编排 */
  
    volatile Object result;  // 存储当前异步任务的结果
  
    // 存储当前异步任务结束后需要触发的后续操作,它是一个单向链表,
    // 相当于观察者模式中的观察者,当前的CompletableFuture实例就是被观察者(主题)
    volatile Completion stack; 
  
    // 静态内部类,要执行的操作
    abstract static class Completion extends ForkJoinTask<Void>
        // AsynchronousCompletionTask,没有方法的接口,用于debug时标识一个异步任务
        implements Runnable, AsynchronousCompletionTask {

        volatile Completion next;      // 单向链表,指向下一个异步任务
    
        // 尝试触发异步任务
        abstract CompletableFuture<?> tryFire(int mode);
    }
}

stack属性的数据结构 Completion

Completion是CompletableFuture的静态内部类,可以把它理解为是异步任务要执行的操作,它存储了:

  • 操作,也就是用户要异步执行的任务

  • 操作对应的CompletableFuture实例

  • 操作依赖的CompletableFuture实例

  • 操作完成后需要触发的CompletableFuture实例

Completion比较复杂的地方在于它有多个子类,每个子类都有不同的功能,在前面使用案例中演示的每个API,它的内部都使用了不同的子类,代表不同的操作,所以这里介绍一下Completion和它的继承体系

Completion:顶层类,只定义了指向下一个异步任务的属性 next

// Completion,静态内部类,在继承体系中位于顶层
abstract static class Completion extends ForkJoinTask<Void>
    implements Runnable, AsynchronousCompletionTask {

    volatile Completion next;      // 下一个异步任务
}

// 一个没有方法的接口,用于标识某个被async方法创建的实例是异步任务,用于debug、监控
public static interface AsynchronousCompletionTask { }

有一个依赖的异步任务:UniCompletion、UniApply,UniCompletion是基类,存储了单个依赖的共有属性,UniApply是具体实现,是thenApply方法对应的内部类。

abstract static class UniCompletion<T,V> extends Completion {
    Executor executor;                 // 线程池
    // 当前异步任务对应的CompletableFuture实例
    CompletableFuture<V> dep;
    // 当前任务的依赖。具体而言,就是只有在src对应的异步任务执行完成后,才可以执行dep对应的异步任务,也就是当前任务,
    // 执行异步任务的逻辑中会判断前一个异步任务是否完成,并且获取它的结果。
    CompletableFuture<T> src;

    UniCompletion(Executor executor, CompletableFuture<V> dep,
                  CompletableFuture<T> src) {
        this.executor = executor; this.dep = dep; this.src = src;
    }

    // 确保异步任务只执行一次,它会通过cas算法修改状态变量,
    final boolean claim() {
        Executor e = executor;
        if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { // 这个方法继承自ForkJoinTask
            if (e == null)
                return true;
            executor = null; // disable
            e.execute(this);
        }
        return false;
    }

    // 判断当前异步任务是否还存活:dep不为null,就是还存活
    final boolean isLive() { return dep != null; }
}

// UniApply,UniCompletion的子类,定义了异步任务的执行逻辑,thenApply方法对应的内部类,
static final class UniApply<T,V> extends UniCompletion<T,V> {
    Function<? super T,? extends V> fn;  // 要异步执行的任务
}

有两个依赖的异步任务:BiCompletion、BiApply,类似于上面提到的,BiCompletion是基类,存储了共有属性,BiApply是具体实现

// BiCompletion
abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {
    CompletableFuture<U> snd; // 异步任务的第二个依赖
}

// BiApply
static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {
    BiFunction<? super T,? super U,? extends V> fn;  // 要异步执行的任务
}

AsyncSupply:不属于Completion的继承体系,存储没有依赖的单个异步任务

static final class AsyncSupply<T> extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
    CompletableFuture<T> dep;  // 异步任务对应的CompletableFuture实例
    Supplier<T> fn;            // 异步任务要执行的计算
}

总结

在介绍CompletableFuture的工作机制之前,先总结一下它的数据结构:

CompletableFuture:代表一个异步任务,它存储了这个异步任务的执行结果、执行完之后要触发的下一个异步任务,同时提供了面向用户的API

  • 它的属性 Object result、Completion stack

Completion:存储了一个异步任务执行过程中需要用到的全部信息,包括它的依赖、它要触发的下一个异步任务、异步任务要执行的计算、异步任务对应的CompletableFuture实例

  • 它的属性:

    • Function function:异步任务要进行的计算

    • CompletableFuture dep:异步任务对应的CompletableFuture实例

    • CompletableFuture src:异步任务的第一个依赖

    • CompletableFuture snd:异步任务的第二个依赖,// 一元依赖的异步任务中没有这个属性

    • CompletableFuture next:要被触发的下一个异步任务

可以看到,CompletableFuture和Completion互相持有对方的实例。

在了解了CompletableFuture的基本结构之后,接下来的问题是:

  • 两个有依赖关系的异步任务是如何被编排在一起的?
  • 上一个异步任务结束之后如何触发下一个异步任务?

哪个线程执行异步任务?

先了解一下异步任务在哪个线程执行。

CompletableFuture使用一个线程池来执行异步任务。用户提交异步任务后,如果没有指定线程池,那么使用默认线程池。如果当前环境下可用cpu核心数大于1,使用的线程池是ForkJoinPool,否则每次提交任务它都会创建一个新的线程(ThreadPerTaskExecutor)

确认使用哪个线程池的源码:

  • ForkJoinPool中公共线程池的并行度默认是CPU个数减1,用户也可以通过jvm参数指定
  • 如果公共线程池的并行度小于等于0,使用单线程线程池
// 判断使用哪个线程池
private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

/* 一、ForkJoinPool中判断当前公共线程池的并行度: */
// 1. 首先从环境变量中获取一个指定的属性 parallelism
String pp = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
int parallelism = -1;
if (pp != null) {
  parallelism = Integer.parseInt(pp);
}
// 2. 如果属性为空,获取当前环境下的CPU数个数,如果小于等于1个,那么公共线程池的并行度就是1,否则就是CPU数减1
if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) {
   parallelism = 1;
}

// 二、单线程线程池,它的实现特别简单,为每个任务创建一个新的线程
static final class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) { new Thread(r).start(); }
}

工作机制

单个异步任务的执行

提交异步任务 supplyAsync方法

整体流程:

// 整体流程:这里省略了前面的方法调用
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                 Supplier<U> f) {
    if (f == null) throw new NullPointerException();
  
    // 第一步:创建CompletableFuture实例,向线程池提交任务
    CompletableFuture<U> d = new CompletableFuture<U>();
    // 第二步:线程池执行异步任务,异步任务编排就实现在AsyncSupply中,这里专门针对supplyAsync的内部类
    e.execute(new AsyncSupply<U>(d, f));
    // 第三步:返回CompletableFuture实例
    return d;
}

整体流程中的第二步:执行异步任务

// 要点1:AsyncSupply,用户调用supplyAsync方法时执行的内部类,异步任务编排的核心流程
static final class AsyncSupply<T> extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {

    CompletableFuture<T> dep;   // 当前CompletableFuture的实例
    Supplier<T> fn;             // 需要在线程池中执行的任务

    // 参数1,当前CompletableFuture的实例,用于存储异步任务的返回值并且触发下一个异步任务
    // 参数2,需要在线程池中执行的任务
    AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
        this.dep = dep; this.fn = fn;
    }

    // 线程启动后会执行当前实例的run方法,因为它继承了Runnable接口
    public void run() {
        CompletableFuture<T> d; Supplier<T> f;
        if ((d = dep) != null && (f = fn) != null) {
            dep = null; fn = null;
            if (d.result == null) {
                try {
                    // 这里做了两个事情:
                    // 1. 执行异步任务,也就是Supplier接口中提供的函数
                    // 2. 将异步任务的返回值设置到当前CompletableFuture实例中
                    d.completeValue(f.get());
                } catch (Throwable ex) {
                    // 如果出现异常,设置异常返回值
                    d.completeThrowable(ex);
                }
            }
            // 触发下一个异步任务
            d.postComplete();
        }
    }
}

可以看到,AsyncSupply类继承了Runnable接口,持有当前CompletableFuture实例和要异步执行的任务,在run方法中,执行任务,然后调用CompletableFuture实例中的completeValue方法,设置返回值。

返回值如何被设置的?通过cas操作,将任务的结果设置到当前CompletableFuture实例的result属性中,如果任务正常结束,正常设置结果,如果任务抛出异常,把异常封装到一个Exception中

// 这是上一步中调用的

// 分支一:设置返回值
final boolean completeValue(T t) {
    // 通过cas操作来更新当前CompletableFuture实例中的result
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       (t == null) ? NIL : t);
}

// 分支二:如果出现异常,设置异常返回值
final boolean completeThrowable(Throwable x) {
    // 也是通过cas操作,更新当前CompletableFuture实例中的result,只不过返回值是被包装到了AltResult中
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       encodeThrowable(x));
}
static AltResult encodeThrowable(Throwable x) {
    return new AltResult((x instanceof CompletionException) ? x :
                         new CompletionException(x));
}
获取异步任务的返回值 get

如何获取异步任务的返回值?调用get方法或join方法,它们都会获取返回值。

整体流程:

// get方法
public T get() throws InterruptedException, ExecutionException {
    Object r;
    // 如果当前result实例等于null,调用waitingGet方法,阻塞地获取返回值,否则调用reportGet方法,
    // 上报返回值
    return reportGet((r = result) == null ? waitingGet(true) : r);
}

// waitingGet方法:阻塞地获取返回值,将返回值写到result变量中
private Object waitingGet(boolean interruptible) {
    // 信号器,负责线程的阻塞和唤醒
    Signaller q = null;
    // 当前线程是否入队
    boolean queued = false;
    // 旋转次数
    int spins = -1;
    Object r;
    // 判断:result实例是否等于null,如果等于null,需要阻塞地获取返回值
    while ((r = result) == null) {
        // 先自旋:spins,它是自旋,自旋次数是CPU个数的8次方以上,因为自旋次数减减的
        // 之前,会生成一个随机数,生成的随机数大于等于0,自旋次数才会减1。如果自旋结束之后
        // 还没有得到结果,会进入阻塞队列。
        if (spins < 0)
            spins = SPINS;   // SPINS = (Runtime.getRuntime().availableProcessors() > 1 ? 1 << 8 : 0) 256次
        else if (spins > 0) {
            if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                --spins;
        }
        else if (q == null)
            // 创建一个信号器,用于阻塞和唤醒线程
            q = new Signaller(interruptible, 0L, 0L);
        else if (!queued)
            // 如果信号器还没有入队,将信号器设置到当前CompletableFuture实例的stack属性中
            queued = tryPushStack(q);
        else if (interruptible && q.interruptControl < 0) {
            // 如果当前线程被打断,放弃阻塞,清理stack变量,返回null
            q.thread = null;
            cleanStack();
            return null;
        }
        // 线程进入阻塞,底层调用LockSupport的park方法,等待异步任务执行完成之后唤醒当前线程
        else if (q.thread != null && result == null) {
            try {
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ie) {
                q.interruptControl = -1;
            }
        }
    }
    // 如果信号器不等于null
    if (q != null) {
        q.thread = null;
        if (q.interruptControl < 0) {
            if (interruptible)
                r = null; // report interruption
            else
                Thread.currentThread().interrupt();
        }
    }
    // 触发后续任务
    postComplete();
    // 返回异步任务的结果,result的变量是由其它线程设置的,当前线程只需要阻塞地获取它。
    return r;
}

1、信号器入栈的方法:

// 将信号器设置到CompletableFuture的stack属性中
final boolean tryPushStack(Completion c) { // c是信号器实例
    Completion h = stack;  // stack,当前CompletableFuture实例的stack属性
  
    /* lazySetNext方法加下面的cas操作,相当于将信号器入栈,信号器链接到头结点并且成为新的头结点 */
    lazySetNext(c, h);
    return UNSAFE.compareAndSwapObject(this, STACK, h, c); 
}

static void lazySetNext(Completion c, Completion next) {
    UNSAFE.putOrderedObject(c, NEXT, next);  // Completion的next属性指向下一个Completion,它们之间构成单向链表
}

2、阻塞当前线程的方法:

public static void managedBlock(ManagedBlocker blocker) throws InterruptedException {
    ForkJoinPool p;
    ForkJoinWorkerThread wt;
    Thread t = Thread.currentThread();
    if ((t instanceof ForkJoinWorkerThread) &&
         // 省略代码
    }
    else {
        // 阻塞当前线程。isReleasebale判断阻塞是否被释放,如果没有,继续进入阻塞状态
        do {} while (!blocker.isReleasable() &&
                     !blocker.block());
    }
}

// 阻塞当前线程,线程唤醒后,判断阻塞是否可以释放
public boolean block() {
    if (isReleasable())
        return true;
    else if (deadline == 0L)
        LockSupport.park(this);
    else if (nanos > 0L)
        LockSupport.parkNanos(this, nanos);
    return isReleasable();
}

// 阻塞是否可以释放:信号器中的线程实例为null、或者线程被打断、或者超过指定时间,都算阻塞结束
public boolean isReleasable() {
    // 信号器中的线程实例为null
    if (thread == null)
        return true;
    // 当前线程被打断
    if (Thread.interrupted()) {
        int i = interruptControl;
        interruptControl = -1;
        if (i > 0)
            return true;
    }
    // 过了超时时间
    if (deadline != 0L &&
        (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
        thread = null;
        return true;
    }
    return false;
}

3、上报异步任务的返回值:

// reportGet方法,处理异步任务的返回值
private static <T> T reportGet(Object r)
    throws InterruptedException, ExecutionException {
    // 如果返回值为空
    if (r == null) // by convention below, null means interrupted
        throw new InterruptedException();
    // 如果返回一个异常实例
    if (r instanceof AltResult) {
        Throwable x, cause;
        if ((x = ((AltResult)r).ex) == null)
            return null;
        if (x instanceof CancellationException)
            throw (CancellationException)x;
        if ((x instanceof CompletionException) &&
            (cause = x.getCause()) != null)
            x = cause;
        throw new ExecutionException(x);
    }
    // 如果返回一个正常结果,将结果进行泛型转换,返回给调用者
    @SuppressWarnings("unchecked") T t = (T) r;
    return t;
}

获取返回值的时候,get方法中,会先判断有没有返回值,如果没有,自旋,自旋次数是CPU个数的8次方,如果还没有返回值,进入阻塞队列。

4、线程阻塞后在什么地方被唤醒?在postComplete方法中

// 参考异步任务的执行过程,执行完之后,会调用CompletableFuture的postComplete方法,
// 触发STACK属性中存储的下一个任务
final void postComplete() {
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||  // 判断stack属性中是否有下一个任务
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (f.casStack(h, t = h.next)) {  // 下一个任务出栈,现在STACK属性中存储下下一个任务
            if (t != null) {  // 下下一个任务不为null
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;  // 触发下一个任务的执行
        }
    }
}

// 下一个任务出栈的方法
final boolean casStack(Completion cmp, Completion val) {
    return UNSAFE.compareAndSwapObject(this, STACK, cmp, val);
}

// 下一个任务的执行:在这里下一个任务是信号器 Signaller
final CompletableFuture<?> tryFire(int ignore) {
    Thread w; // no need to atomically claim
    if ((w = thread) != null) {
        thread = null;
        LockSupport.unpark(w);  // 它会唤醒信号器中存储的线程
    }
    return null;
}

一元依赖

这里以thenApply为例。

整体流程:

private <V> CompletableFuture<V> uniApplyStage(
    Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
  
    CompletableFuture<V> d =  new CompletableFuture<V>();
    // 第一步:执行异步任务,如果当前任务可以直接执行的话,就直接执行
    if (e != null || !d.uniApply(this, f, null)) {  // 参数this是上一个异步任务
        // 第二步:当前任务无法直接执行,创建UniApply实例
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);

        // 第三步:将当前实例推入栈中,表示上一个异步任务执行完成后触发
        push(c);  // this是上一个异步任务,所以push方法是由上一个异步任务的实例调用的
        // 尝试触发异步任务
        c.tryFire(SYNC);
    }
    return d;
}

1、异步任务的执行逻辑

// uniApply方法:执行异步任务,参数a是上一个异步任务,需要判断上一个异步任务有没有执行完成并且获取它的执行结果
final <S> boolean uniApply(CompletableFuture<S> a,
                           Function<? super S,? extends T> f,
                           UniApply<S,T> c) {  // 参数c是当前异步任务实例
    Object r; Throwable x;
    // 如果上一个异步任务没有结果,返回false
    if (a == null || (r = a.result) == null || f == null)
        return false;

    // 1、判断当前异步任务是否可以执行:如果当前异步任务没有结果
    tryComplete: if (result == null) {
        // 如果上一个异步任务是异常结束的
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                completeThrowable(x, r);
                break tryComplete;  // 退出循环,不执行当前异步任务
            }
            r = null;
        }
        try {
            // 2、确保异步任务只会被执行一次
            if (c != null && !c.claim())
                return false;
          
            // 3、执行异步任务,并且上报结果
            @SuppressWarnings("unchecked") S s = (S) r;
            completeValue(f.apply(s));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

2、如果异步任务不可以直接执行,需要把异步任务push到上一个异步任务的STACK属性中

// 将新的异步任务推入栈中,参数c是当前异步任务
final void push(UniCompletion<?,?> c) {
    if (c != null) {
        // 如果当前异步任务没有结束,将异步任务推入栈中,这里是推入栈顶
        while (result == null && !tryPushStack(c))
            // 失败时清除,将新异步任务的next值设为null
            lazySetNext(c, null); // clear on failure
    }
}

3、再次尝试触发异步任务

// 尝试触发异步任务
final CompletableFuture<V> tryFire(int mode) {
    CompletableFuture<V> d; CompletableFuture<T> a;
  
    if ((d = dep) == null ||
        // 执行异步任务
        !d.uniApply(a = src, fn, mode > 0 ? null : this))
        return null;

    // 执行成功后,属性置空
    dep = null; src = null; fn = null;
    // 如果当前异步任务执行成功,触发后续任务
    return d.postFire(a, mode);
}

4、触发下一个异步任务

// 触发下一个异步任务,这里会触发前一个异步任务的后序任务和当前异步任务的后序任务
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
    // 会判断前一个异步任务和当前异步任务有没有下一个任务。
  
    // a是前一个任务
    if (a != null && a.stack != null) {
        if (a.result == null)
            a.cleanStack();
        else if (mode >= 0)
            a.postComplete();
    }
    // this是当前任务
    if (result != null && stack != null) {
        if (mode < 0)
            return this;
        else
            postComplete();
    }
    return null;
}

// postComplete
final void postComplete() {
    CompletableFuture<?> f = this; Completion h;
    // 如果当前异步任务有下一个任务
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        // 使用下下一个异步任务,代替下一个任务,赋值给stack变量
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            // 执行下一个异步任务,然后接收它的返回值
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}
如何确保异步任务只执行一次

claim方法

final boolean claim() {
    Executor e = executor;
    // 这个方法来自ForkJoinTask,用cas算法来操作任务实例中的status字段,保证任务只会被执行一次
    if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
        // 如果当前异步任务没有可用的线程池,返回true,由当前线程执行
        if (e == null)
            return true;
        // 把异步任务提交给线程池执行
        executor = null; // disable
        e.execute(this);
    }
    return false;
}

二元依赖

这里以thenCombine方法为例

整体流程:

private <U,V> CompletableFuture<V> biApplyStage(
    Executor e, CompletionStage<U> o,
    BiFunction<? super T,? super U,? extends V> f) {
  
    CompletableFuture<U> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture<V> d = new CompletableFuture<V>();
    // biApply方法中的参数this、b,是当前异步任务依赖的前两个异步任务
    if (e != null || !d.biApply(this, b, f, null)) {
        BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
        bipush(b, c);  // this和b是依赖,c是要被触发的异步任务
        c.tryFire(SYNC);
    }
    return d;
}

执行逻辑:

final <R,S> boolean biApply(CompletableFuture<R> a,
                            CompletableFuture<S> b,
                            BiFunction<? super R,? super S,? extends T> f,
                            BiApply<R,S,T> c) {
    Object r, s; Throwable x;
    // 关键逻辑,判断当前异步任务所依赖的两个异步任务是否完成,如果没有,退出
    if (a == null || (r = a.result) == null ||
        b == null || (s = b.result) == null || f == null)
        return false;
    tryComplete: if (result == null) {
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        if (s instanceof AltResult) {
            if ((x = ((AltResult)s).ex) != null) {
                completeThrowable(x, s);
                break tryComplete;
            }
            s = null;
        }
        try {
            if (c != null && !c.claim())
                return false;
            @SuppressWarnings("unchecked") R rr = (R) r;
            @SuppressWarnings("unchecked") S ss = (S) s;
            completeValue(f.apply(rr, ss));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

总结

CompletableFuture存储了计算逻辑,Completion存储了计算过程中需要用到的数据。

两个有依赖关系的异步任务是如何被编排在一起的?上一个异步任务结束之后如何触发下一个异步任务?通过CompletableFuture的stack属性,当前异步任务执行完之后,会获取stack属性中的值,这个值就是下一个需要计算的异步任务

参考:
https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html
https://www.cnblogs.com/Createsequence/p/16963895.html
https://javaguide.cn/java/concurrent/completablefuture-intro.html


http://www.kler.cn/a/574021.html

相关文章:

  • AI推理革新:Dynasor-CoT如何提升复杂任务效率
  • 【AI学习从零至壹】Pytorch逻辑回归
  • FreeRTOS 任务管理与运行时间统计:API 解析与配置实践
  • 模块15.常用API
  • QT.....................................5
  • Redis 脚本:高效数据管理的利器
  • C++ list(双向链表)
  • 决策树(Decision Tree)基础知识
  • 网络安全可以从事什么工作?
  • 洛谷 P1480 A/B Problem(高精度详解)c++
  • 探索自适应学习在企业培训系统中的优势
  • 2025-03-06 学习记录--C/C++-C语言 函数参数传递的两种主要方法
  • NVIDIA Jetson Nano的国产替代,基于算能BM1684X+FPGA+AI算力盒子,支持deepseek边缘部署
  • 用Python分割并高效处理PDF大文件
  • Ubuntu 24.04 配置ODBC连接ORACLE 11G数据库
  • Java 大视界 -- 基于 Java 的大数据分布式任务调度系统设计与实现(117)
  • 力扣132. 分割回文串 II
  • js实现pdf文件路径预览和下载
  • Spring Boot使用JDBC /JPA访问达梦数据库
  • Dify部署踩坑指南(Windows+Mac)