Java中CompletableFuture异步工具类
参考:CompletableFuture 详解 | JavaGuide
实际项目中,一个接口可能需要同时获取多种不同的数据,然后再汇总返回,举个例子:用户请求获取订单信息,可能需要同时获取用户信息、商品详情、物流信息、等数据。
如果是串行(按顺序依次执行每个任务)执行的话,接口的响应速度会非常慢。
考虑到这些任务之间有大部分都是 无前后顺序关联 的,可以 并行执行 ,就比如说调用获取商品详情的时候,可以同时调用获取物流信息。通过并行执行多个任务的方式,接口的响应速度会得到大幅优化。
Future介绍
Future是Java5引入的接口,提供了基本的异步处理功能,但它的局限性在于只能通过 get()方法阻塞获取结果,无法链式调用多个任务,也缺 少异常处理机制。
CompletableFuture 是 Future 的增强版,提供了非阻塞的结果处理、任务组合和异常处理,使得异步编程更加灵活和强大。
在 Java 中,Future
只是一个泛型接口,位于 java.util.concurrent
包下,其中定义了 5 个方法,主要包括下面这 4 个功能:
-
取消任务;
-
判断任务是否被取消;
-
判断任务是否已经执行完成;
-
获取任务执行结果。
// V 代表了Future执行的任务返回值的类型 public interface Future<V> { // 取消任务执行 // 成功取消返回 true,否则返回 false boolean cancel(boolean mayInterruptIfRunning); // 判断任务是否被取消 boolean isCancelled(); // 判断任务是否已经执行完成 boolean isDone(); // 获取任务执行结果 V get() throws InterruptedException, ExecutionException; // 指定时间内没有返回计算结果就抛出 TimeOutException 异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutExceptio }
CompletableFuture
是 java 8引入的一个强大的异步工具类。允许非阻塞地处理异步任务,并且可以通过链式调用组合多个异步操作。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { }
核心特性如下:
-
异步执行
通过
runAsync
和supplyAsync
方法,可以异步地执行任务。 -
任务完成回调
使用
thenApply、thenAccept、thenRun
等方法,可以在任务完成后执行回调。 -
任务组合
可以将多个 CompletableFuture 组合在一起,通过 thenCombine、thenCompose 等方法,处理多个异步任务之间的依赖关系。
-
异常处理
提供了
exceptionally、handle
等方法,可以在异步任务发生异常时进行处理。 -
并行处理
可以通过
allOf
和anyOf
方法,并行地执行多个异步任务,并在所有任务完成或任意一个任务完成时执行回调。
扩展
创建异步任务
-
通过 new 关键字
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>(); //获取异步计算的结果,调用 get() 方法的线程会 阻塞 直到 CompletableFuture 完成运算 rpcResponse = completableFuture.get(); // complete() 方法只能调用一次,后续调用将被忽略。 resultFuture.complete(rpcResponse); //如果你已经知道计算的结果的话,可以使用静态方法 completedFuture() 来创建 CompletableFuture CompletableFuture<String> future = CompletableFuture.completedFuture("hello!"); assertEquals("hello!", future.get());
-
runAsync: 创建异步任务,不返回结果
-
supplyAsync: 创建异步任务并返回结果
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { //前面的这个future1不是用来接收数据的,仅仅表示完成状态 // 异步任务 }); //future2接收异步执行的结果,即里面返回出来的字符串,后续可以利用thenAccept等方法处理这个结果 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { // 异步任务并返回结果 return "Hello, nihao.com!"; });
任务完成回调
-
thenApply: 在任务完成后可以对任务结果进行转换返回
-
thenAccept: 在任务完成后对结果进行消费,但不返回新结果
-
thenRun: 在任务完成后执行一个操作,但不需要使用任务结果
// 转换任务结果 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello") .thenApply(result -> result + " nihao.com"); // 消费任务结果,不返回新结果 CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello") .thenAccept(result -> System.out.println(result)); // 不消费任务结果,也不返回新结果 CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello") .thenRun(() -> System.out.println("Task finished"));
任务组合
-
thenCombine: 合并两个CompletableFuture 的结果
-
thenCompose: 将一个CompletableFuture 的结果作为另一个 CompletableFuture 的输入。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); //1 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "nihao"); //2 //future1.thenComebine(fu2,...) CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2); //将前者的result 传给 后者 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " nihao"));
异常处理
-
exceptionally: 在任务发生异常时提供默认值。
-
handle: 在任务完成或发生异常时进行处理。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (true) { throw new RuntimeException("Exception"); } return "Hello"; }).exceptionally(ex -> "nihao"); //提供发生异常时的默认值 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (true) { throw new RuntimeException("Exception"); } return "Hello"; }).handle((result, ex) -> { //提供发生异常后的处理 if (ex != null) { return "Default Value"; } return result; });
并行处理
-
allOf: 等待多个CompletableFuture 全部完成
-
anyOf: 任意一个 CompletableFuture 完成时执行操作
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2); allFutures.thenRun(() -> System.out.println("nihao tasks finished")); //全部完成在执行 CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2); anyFuture.thenAccept(result -> System.out.println("nihao task finished with result: " + result)); //任意一个完成即可执行
使用建议
-
CompletableFuture
默认使用全局共享的ForkJoinPool.commonPool()
作为执行器,所以应用程序、多个库或框架(如 Spring、第三方库)若都依赖CompletableFuture
,默认情况下它们都会共享同一个线程池。 -
为避免这些问题,建议为
CompletableFuture
提供自定义线程池,根据任务特性调整线程池大小和队列类型,也更好地处理线程中的异常情况 -
CompletableFuture
的get()
方法是阻塞的,尽量避免使用。如果必须要使用的话,需要添加超时时间,否则可能会导致主线程一直等待,无法执行其他任务。