当前位置: 首页 > article >正文

Java中CompletableFuture异步工具类

参考:CompletableFuture 详解 | JavaGuide

实际项目中,一个接口可能需要同时获取多种不同的数据,然后再汇总返回,举个例子:用户请求获取订单信息,可能需要同时获取用户信息、商品详情、物流信息、等数据。

如果是串行(按顺序依次执行每个任务)执行的话,接口的响应速度会非常慢。

考虑到这些任务之间有大部分都是 无前后顺序关联 的,可以 并行执行 ,就比如说调用获取商品详情的时候,可以同时调用获取物流信息。通过并行执行多个任务的方式,接口的响应速度会得到大幅优化。

Future介绍

Future是Java5引入的接口,提供了基本的异步处理功能,但它的局限性在于只能通过 get()方法阻塞获取结果,无法链式调用多个任务,也缺 少异常处理机制。

CompletableFuture 是 Future 的增强版,提供了非阻塞的结果处理、任务组合和异常处理,使得异步编程更加灵活和强大。

在 Java 中,Future 只是一个泛型接口,位于 java.util.concurrent 包下,其中定义了 5 个方法,主要包括下面这 4 个功能:

  • 取消任务;

  • 判断任务是否被取消;

  • 判断任务是否已经执行完成;

  • 获取任务执行结果。

// V 代表了Future执行的任务返回值的类型
public interface Future<V> {
    // 取消任务执行
    // 成功取消返回 true,否则返回 false
    boolean cancel(boolean mayInterruptIfRunning);
    // 判断任务是否被取消
    boolean isCancelled();
    // 判断任务是否已经执行完成
    boolean isDone();
    // 获取任务执行结果
    V get() throws InterruptedException, ExecutionException;
    // 指定时间内没有返回计算结果就抛出 TimeOutException 异常
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutExceptio
}

CompletableFuture是 java 8引入的一个强大的异步工具类。允许非阻塞地处理异步任务,并且可以通过链式调用组合多个异步操作。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}

核心特性如下:

  1. 异步执行

    通过runAsyncsupplyAsync方法,可以异步地执行任务。

  2. 任务完成回调

    使用thenApply、thenAccept、thenRun等方法,可以在任务完成后执行回调。

  3. 任务组合

    可以将多个 CompletableFuture 组合在一起,通过 thenCombine、thenCompose 等方法,处理多个异步任务之间的依赖关系。

  4. 异常处理

    提供了exceptionally、handle等方法,可以在异步任务发生异常时进行处理。

  5. 并行处理

    可以通过allOfanyOf方法,并行地执行多个异步任务,并在所有任务完成或任意一个任务完成时执行回调。

扩展

创建异步任务
  • 通过 new 关键字

CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
//获取异步计算的结果,调用 get() 方法的线程会 阻塞 直到 CompletableFuture 完成运算
rpcResponse = completableFuture.get();  
// complete() 方法只能调用一次,后续调用将被忽略。
resultFuture.complete(rpcResponse);
​
//如果你已经知道计算的结果的话,可以使用静态方法 completedFuture() 来创建 CompletableFuture 
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!");
assertEquals("hello!", future.get());
  • runAsync: 创建异步任务,不返回结果

  • supplyAsync: 创建异步任务并返回结果

CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {  //前面的这个future1不是用来接收数据的,仅仅表示完成状态
    // 异步任务
});
​
//future2接收异步执行的结果,即里面返回出来的字符串,后续可以利用thenAccept等方法处理这个结果
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { 
    // 异步任务并返回结果
    return "Hello, nihao.com!";
});
任务完成回调
  • thenApply: 在任务完成后可以对任务结果进行转换返回

  • thenAccept: 在任务完成后对结果进行消费,但不返回新结果

  • thenRun: 在任务完成后执行一个操作,但不需要使用任务结果

// 转换任务结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(result -> result + " nihao.com");
    
// 消费任务结果,不返回新结果
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenAccept(result -> System.out.println(result));
    
// 不消费任务结果,也不返回新结果
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("Task finished"));
任务组合
  • thenCombine: 合并两个CompletableFuture 的结果

  • thenCompose: 将一个CompletableFuture 的结果作为另一个 CompletableFuture 的输入。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");   //1
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "nihao");   //2
​
//future1.thenComebine(fu2,...) 
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);
​
//将前者的result 传给 后者
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " nihao"));
异常处理
  • exceptionally: 在任务发生异常时提供默认值。

  • handle: 在任务完成或发生异常时进行处理。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) {
        throw new RuntimeException("Exception");
    }
    return "Hello";
}).exceptionally(ex -> "nihao");     //提供发生异常时的默认值
​
​
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) {
        throw new RuntimeException("Exception");
    }
    return "Hello";
}).handle((result, ex) -> {       //提供发生异常后的处理
    if (ex != null) {
        return "Default Value";
    }
    return result;
});
并行处理
  • allOf: 等待多个CompletableFuture 全部完成

  • anyOf: 任意一个 CompletableFuture 完成时执行操作

CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
allFutures.thenRun(() -> System.out.println("nihao tasks finished"));    //全部完成在执行
​
​
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
anyFuture.thenAccept(result -> System.out.println("nihao task finished with result: " + result));  //任意一个完成即可执行
使用建议
  • CompletableFuture 默认使用全局共享的 ForkJoinPool.commonPool() 作为执行器,所以应用程序、多个库或框架(如 Spring、第三方库)若都依赖 CompletableFuture,默认情况下它们都会共享同一个线程池。

  • 为避免这些问题,建议为 CompletableFuture 提供自定义线程池,根据任务特性调整线程池大小和队列类型,也更好地处理线程中的异常情况

  • CompletableFutureget()方法是阻塞的,尽量避免使用。如果必须要使用的话,需要添加超时时间,否则可能会导致主线程一直等待,无法执行其他任务。


http://www.kler.cn/a/550372.html

相关文章:

  • 微信云开发小程序音频播放踩坑记录 - 从熄屏播放到iOS静音
  • 碰一碰发视频@技术原理与实现开发步骤
  • 在docker中部署fastdfs一些思考
  • 2步破解官方sublime4最新版本 4192
  • Dest1ny漏洞库: 美团代付微信小程序系统任意文件读取漏洞
  • 基于 Python typing 模块的类型标注
  • 力扣hot100_矩阵_python版本
  • ORB-SLAM3的源码学习:TwoViewReconstruction通过两幅图像来实现重建
  • 2024Selenium自动化常见问题及解决方式!
  • 【云原生】最新版Kubernetes集群基于Containerd部署
  • STM32 PWM脉冲宽度调制介绍
  • 又是阿里云npm install报错:ENOENT: no such file or directory, open ‘/root/package.json‘
  • Kubernetes控制平面组件:etcd常用配置参数
  • 抢占川南数字枢纽高地:树莓集团将翠屏区位优势转为产业胜势
  • JavaScript数组-数组的概念
  • Blackbox.AI:高效智能的生产力工具新选择
  • Web项目测试专题(七)安全性测试
  • (四)Axure学习图文教程
  • 如何提高网站在百度中的权重?
  • RV1126-正点原子