当前位置: 首页 > article >正文

CompletableFuture详解

1、概述

咱们都知道可以通过继承Thread类或者实现Runnable接口两种方式实现多线程。但是有时候我们希望得到多线程异步任务执行后的结果,也就是异步任务执行后有返回值,Thread和Runnable是不能实现的。当我们需要返回值的时候怎么办呢? Java 1.5 推出的Callable和Future接口就解决了这个问题。但是因为Future有几个局限,由于这几个局限,在Java1.8就推出了加强版的Future类:CompletableFuture。本篇文章我们通过实际需求、实例代码分析Future缺陷讲解CompletableFuture的设计原理。

2、Future使用

假如我们现在有如下需求:

老板正在开会,开会过程中发现少一份材料,通知秘书去整理,在秘书整理过程中老板这边还在继续开会,秘书整理完以后将材料给到老板手中。

需求分析:

老板开会是主线程,不能中断。

秘书就是异步任务

秘书执行完任务需要将结果返回给老板这个主线程手中。

咱们看看通过Future实现此需求有什么局限,然后再通过CompletableFuture实现此需求看看是否更好。

Future接口(实现类:FutureTask)定义了操作异步任务执行的一些方法:如获取异步任务执行结果、取消任务的执行结果、判断任务是否被取消、判断任务执行是否完成等。

 实现老板开会,秘书整理材料需求方式一代码:

package com.lc;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * 老板开会Future实现
 *
 * @author liuchao
 * @date 2023/4/5
 */
public class BossMeeting {

    /**
     * 主线程为老板正在开会
     *
     * @param args
     */
    public static void main(String[] args) {
        System.out.println("老板开会start");
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        FutureTask<String> secretaryFuture = new FutureTask<>(() -> {
            Thread.sleep(1000);
            return "老板需要的材料";
        });

        //老板发现缺少材料,提交异步任务(找秘书)
        executorService.submit(secretaryFuture);

        /**
         * 方法1
         * 局限:导致线程堵塞
         */
        try {
            //获取秘书搜集的材料 (堵塞线程)
            String material = secretaryFuture.get();
            System.out.println("秘书搜集到的材料:" + material);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }

        /**
         * 方法2
         * 通过while轮询方式会消耗cpu
         */
        while (true) {
            if (secretaryFuture.isDone()) {
                try {
                    //获取秘书搜集的材料 (堵塞线程)
                    String material = secretaryFuture.get();
                    System.out.println("秘书搜集到的材料:" + material);
                    break;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
            } else {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

            }
        }

        System.out.println("老板开会end");

    }
}

实例代码提供了两种方式获取秘书搜集到的材料,都是有局限性并且堵塞了主线程。

通过现实需求分析,老板开会能一直等着秘书将材料整理完再继续吗,显然是不行的。

现实情况是秘书(异步任务)执行完任务后,主动告知老板(主线程)。

Future使用局限性汇总:

  • Future的get方法会导致主线程阻塞
  • 轮询获取结果会消耗cpu资源
  • 多个Future任务不能按照顺序执行
  • Future Api无异常处理

3、CompletableFuture实现

CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

首先看看CompletableFuture的类图关系,CompletableFuture实现了Future和CompletionStage接口,因此看来CompletableFuture具有Future和CompletionStage的特性。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

 CompletionStage接口拥有的API

 咱们看看通过CompletableFuture实现的老板开会需求代码实例如下:

package com.lc;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 老板开会Future实现
 *
 * @author liuchao
 * @date 2023/4/5
 */
public class BossMeeting {

    /**
     * 主线程为老板正在开会
     *
     * @param args
     */
    public static void main(String[] args) {
        System.out.println("老板开会start");
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        try {
            CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "秘书搜集完材料";
                //结束返回
            }, executorService).whenComplete((v, e) -> {
                //无异常说明 执行成功
                if (e == null) {
                    System.out.println("秘书搜集到的材料:" + v);
                }
                //异常处理
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println("执行异常:" + e.getCause());
                return null;
            });

            System.out.println("老板继续开会");

            try {
                //模拟老板继续开会3秒钟
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("老板开会end");
        } finally {
            executorService.shutdown();
        }

    }
}

执行结果:发现没有任何堵塞,任务提交主线程继续执行,异步任务执行完成主动告知主线程

老板开会start
老板继续开会
秘书搜集到的材料:秘书搜集完材料
老板开会end 

4、CompletableFuture Api详解

4.1、CompletableFuture创建方式

官方推荐使用CompletableFuture提供的静态方法创建CompletableFuture实例,以下是提供的静态方法:

// 无返回值 使用ForkJoinPool线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 无返回值 可以自定义线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 有返回值 使用ForkJoinPool线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 有返回值 可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

supply开头:这种方法,可以返回异步线程执行之后的结果。
run开头:这种不会返回结果,就只是执行线程任务。
如果你想异步运行一些后台任务并且不想从任务中返回任何东西,那么你可以使用run开头的

实例:

package com.lc;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author liuchao
 * @date 2023/4/5
 */
public class Test {
    public static void main(String[] args) {
        //无返回结果
        CompletableFuture.runAsync(() -> {
            System.out.println("无返回值线程:" + Thread.currentThread().getName());
            System.out.println("执行异步任务,无返回结果");
        });

        //无返回值 自定义线程
        ExecutorService executors = Executors.newFixedThreadPool(2);
        CompletableFuture.runAsync(() -> {
            System.out.println("无返回值,自定义线程:" + Thread.currentThread().getName());
            System.out.println("执行异步任务,无返回自定义线程结果");
        }, executors);


        //有返回结果
        CompletableFuture.supplyAsync(() -> {
            System.out.println("执行异步任务,有返回值");
            System.out.println("有返回值线程:" + Thread.currentThread().getName());
            return "返回值";
        });

        //有返回结果自定义线程
        CompletableFuture.supplyAsync(() -> {
            System.out.println("执行异步任务,有返回值");
            System.out.println("有返回值线程:" + Thread.currentThread().getName());
            return "返回值";
        }, executors);

    }
}

 执行效果:

无返回值线程:ForkJoinPool.commonPool-worker-1
执行异步任务,无返回结果
无返回值,自定义线程:pool-1-thread-1
执行异步任务,无返回自定义线程结果
执行异步任务,有返回值
有返回值线程:ForkJoinPool.commonPool-worker-1
执行异步任务,有返回值
有返回值线程:pool-1-thread-2
 

4.2、CompletableFuture获取返回值

通过get、join、getNow获取返回值,区别如下:

  • join:返回结果或者抛出一个unchecked异常(CompletionException),不需要显示捕获异常。
  • get:返回结果或者一个具体的异常(ExecutionException, InterruptedException),此方法继承至Future是堵塞的。
  • getNow:如果当前任务执行完成,返回执行结果,否则返回valueIfAbsent(默认值)。

实例:


    /**
     * 通过get获取方法
     */
    public void test1() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "get方法需要显示捕获异常");
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * join 不需要显示捕获异常
     */
    public void test2() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "join方法不需要显示捕获异常");
        System.out.println(future.join());
    }

    /**
     * getNow方法可以设置默认值
     * 在有效的时间内,未返回结果,则直接返回默认值
     */
    public void test3() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "getNow获取返回值");
        System.out.println(future.getNow("默认值"));
    }

4.3、其他Api详解

  • thenApply():拿到上一个异步执行的结果继续后续操作

实例:

        // 模拟 1 + 1 + 1
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1).thenApply(v -> v + 1).thenApply(v -> v + 1);
        System.out.println("执行结果:" + future.getNow(-1));

返回结果:3

thenApply()是线程的后续操作,可以拿到上一次线程执行的返回结果作为本次thenApply()的参数一直传递下去。 并且是有返回结果的。

  • thenAccept() 和 thenRun()方法

如果你不想从你的回调函数中返回任何东西,只想在 Future 完成后运行一些代码,那么你可以使用thenAccept()andthenRun()方法。这些方法是消费者Consumer<? super T> action,通常用作回调链中的最后一个回调。

实例:

        // 模拟 1 + 1 + 1
        CompletableFuture.supplyAsync(() -> 1).thenApply(v -> v + 1).thenApply(v -> v + 1)
                .thenAccept(r -> System.out.println("1+1+1=" + r));

结果:1+1+1=3

  • complete():当前阶段异步任务执行完成

complete()其实也是个消费操作,但是与thenRun()不同的是,里面可以可抛出的异常

// 区别就是不是异步处理
public CompletableFuture<T>  whenComplete(BiConsumer<? super T,? super Throwable> action)
// 使用异步处理
public CompletableFuture<T>  whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
// 区别在于可以指定线程池
public CompletableFuture<T>  whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
// 接收一个可抛出的异常,且必须有返回值
public CompletableFuture<T>  exceptionally(Function<Throwable,? extends T> fn)
  • handle():相比thenApply()抛出异常后还可以继续执行

public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

handle方法集和上面的complete方法集没有区别,同样有两个参数一个返回结果和可抛出异常,区别就在于返回值

5、CompletableFuture综合使用

需求:要查找10个订单信息以及关联的商品、图片信息

订单上有商品ID,通过商品ID可以查询到商品详细信息,图片信息存储在商品详细信息中。

那就需要查询完订单再查询商品最后查询图片信息,这3个异步任务需要串行执行。

实例代码:

package com.lc;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

/**
 * @author liuchao
 * @date 2023/4/5
 */
public class Test {

    public static void main(String[] args) {
        //10个订单号
        List<String> orderCodeList = Arrays.asList(new String[]{"order_01", "order_02", "order_03", "order_04",
                "order_05", "order_06", "order_07", "order_08", "order_09", "order_10"});

        //定义线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(15);

        try {
            List<String> collect = orderCodeList.stream().map(o ->
                    CompletableFuture.supplyAsync(() -> String.format("订单:%s,关联商品ID为:%s", o, ThreadLocalRandom.current().nextInt()), threadPool)
                            .thenApplyAsync((v) -> String.format(v + ",关联图片ID为:%s", ThreadLocalRandom.current().nextInt()), threadPool)
                            .thenApplyAsync((v) -> String.format(v + ",关联图信息获取成功"), threadPool)
                            .exceptionally(e -> {
                                e.printStackTrace();
                                return null;
                            })
                            .join()
            ).collect(Collectors.toList());

            //打印结果
            System.out.println(collect);
        } finally {
            //释放资源
            threadPool.shutdown();
        }

    }
}


http://www.kler.cn/a/9534.html

相关文章:

  • LeetCode【0035】搜索插入位置
  • 【go从零单排】JSON序列化和反序列化
  • INQUIRE:一个包含五百万张自然世界图像,涵盖10,000个不同物种的专为专家级文本到图像检索任务设计的新型基准数据集。
  • 【C++】一种针对代码的连续条件检查方案,累计布尔结果
  • 前端--> nginx-->gateway产生的跨域问题分析
  • POI实现根据PPTX模板渲染PPT
  • 2022国赛23:linux服务器如何添加开放端口
  • 从零开始,手把手教你实现基于 Websocket 的微服务
  • 搜索词分析工具-网站关键词挖掘
  • 从JVM 源码的角度深度剖析CAS是如何实现原子性的
  • 校区机房物联网数据采集及远程监控5G应用系统方案
  • Spring(Ioc和Bean的作用域)
  • Docker-Compose镜像仓库
  • 【云原生进阶之容器】第六章容器网络6.4.1--Flannel组网方案综述
  • ServletContext
  • 《剑指offer》——从尾到头打印链表
  • 【Python】1分钟就能制作精美的框架图?太棒啦
  • Minio上传html文件
  • 分享10个前端开发者需要掌握的DOM技巧
  • 超越辅助:分享一个基于GPT引擎的免费AI工具
  • 一文解读基于PaddleSeg的钢筋长度超限监控方案
  • 管廊隧道怎么定位人员?分享管廊隧道人员定位系统解决方案
  • ubuntu16.04搭建gitlab
  • 原油期货是什么?原油期货交易盈利技巧有哪些?
  • MyBatis动态SQL教程:灵活处理复杂SQL场景,提升性能与可维护性
  • 二叉树练习题(递归展开图详解哦)