当前位置: 首页 > article >正文

【JUC】十二、CompletableFuture(上)

文章目录

  • 1、CompletionStage
  • 2、创建CompletableFuture对象
  • 3、CompletbaleFuture
  • 4、函数式接口
  • 5、chain链式调用
  • 6、实例:电商网站比价

针对前面提到的Future接口的实现类FutureTask的缺点,考虑传入一个回调函数,当任务完成时,自动去调用,since Java8,有了Future接口的新实现CompleteableFuture,CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

在这里插入图片描述

1、CompletionStage

CompletionStage是CompletableFuture实现的接口,代表异步计算过程中的某一个阶段,一个阶段完成后,可能触发另一个阶段。类比Linux的管道符。

stage.thenApply(x -> square(x))
   	 .thenAccept(x -> System.out.println(x))
   	 .thenRun(() -> System.out.println())

一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。

2、创建CompletableFuture对象

CompletableFuture是Future接口新的实现类,是对实现异步任务的的再一次扩展。创建CompletableFuture对象:

CompletbaleFuture future = new CompletableFuture();

在这里插入图片描述

可以看到这样创建的是一个未完成的CompletableFuture对象,即这个构造方法只是语法层面提供一下,真正创建CompletableFuture对象并不用它。正确用法为调用:

runAsync
supplyAsync

在这里插入图片描述

无返回值的,传参为:

  • Runnable对象
  • Runnable对象+线程池

有返回值的:

  • Supplier(供给型函数式接口)
  • Supplier+线程池

在这里插入图片描述

调用runAsync和supplyAsync这两个静态方法时,关于Executor参数:

  • 没有指定Executor,使用默认的ForkJoinPool.commonPool()做为线程池来执行异步任务
  • 指定了Executor,则使用指定的线程池

前面提到的Future(FutureTask)+ 线程池来提高执行效率,这里CompletableFuture也可以结合线程池。

public class CompletDemo1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
            
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        
        System.out.println(completableFuture.get());
    }
}

在这里插入图片描述

指定线程池后,线程名称发生变化:

public class CompletDemo1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    
        Executor pool = Executors.newFixedThreadPool(3);
        
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },pool);
        
        System.out.println(completableFuture.get());
    }
}

在这里插入图片描述

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
	    System.out.println(Thread.currentThread().getName());
	    try {
	        TimeUnit.SECONDS.sleep(1);
	    } catch (Exception e) {
	        e.printStackTrace();
	    }
	    return "9527";
	});
System.out.println(completableFuture.get());

在这里插入图片描述

public class CompletDemo2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
    
        Executor pool = Executors.newFixedThreadPool(3);
        
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "9527";
        },pool);
        
        System.out.println(completableFuture.get());
    }
}

3、CompletbaleFuture

从Java8开始引入CompletbaleFuture,它是Future功能的增强版,会较少阻塞和轮询,可以传入回调函数,当异步任务完成或者发生异常时,自动调用传入的回调方法。

在这里插入图片描述

上面创建CompletableFuture对象时,演示的CompletableFuture和FutureTask差不多,这里演示CompletableFuture的回调:

  • whenComplete:传入一个BiConsumer,即异步任务完成后自己调用的逻辑
  • exceptionally:传入一个Function,即异步任务发生异常时的逻辑
public class CompletDemo2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);

        try {
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "come in...");
                //十以内的随机数
                int result = ThreadLocalRandom.current().nextInt(10);
                if(result > 5){
                    throw new RuntimeException("test");
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("----1s后该任务出结果:" + result);
                return result;
            },pool).whenComplete((v,e) -> {   //v是上一步的计算结果,e为上一步发生了的异常
                if (null == e){
                    System.out.println("计算完成,update:" + v);
                }
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
                return null;
            });
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
        System.out.println(Thread.currentThread().getName() + "线程接着去忙其他任务了");
        //使用默认线程池时,主线程结束会导致线程池立刻关闭,导致分支线程task没来得及输出
        //可使用自定义线程池或者sleep,不要让主线程立刻结束


    }
}

注意:

  • supplyAsync或者runAsync方法不传pool时,则默认使用fork-join-pool线程池,此时主线程结束会导致线程池立刻关闭,可能会导致分支线程task没来得及输出
  • 可使用自定义线程池或者让主线程sleep一会儿,总之不要让主线程立刻结束

别程序运行,分支任务结果没输出,一脸懵不知道啥原因。

在这里插入图片描述

生产随机数:

//十以内的随机数
int result = ThreadLocalRandom.current().nextInt(10);

get和join的区别:

二者作用基本一样,join在编译阶段不会报异常,不用throw或者try处理

4、函数式接口

以下是后面要用到的几个函数式接口(since Java8):
在这里插入图片描述
在这里插入图片描述
消费型函数式接口:

在这里插入图片描述

两个入参的消费型函数式接口:

在这里插入图片描述
生产型函数式接口:

在这里插入图片描述

总结:

在这里插入图片描述

5、chain链式调用

链式编程一直在用,这里记下@Accessors(之前有个@Builder)

@AllArgsConstructor
@NoArgsConstructor
@Data
@Accessors(chain = true)
public class Student {

    private String uid;

    private String name;

    private String address;
}

加@Accessors注解后,再给对象赋值,就可以直接链式调用。

new Student().setUid("001").setName("code9527").setAddress("Tianjin");

6、实例:电商网站比价

需求分析:

对比同一款商品在各大电商平台的售价,返回一个List,元素格式:

-MySQL》 in JD price is 88.05
-Mysql》 in TianMao price is 90.43
- ...

定义个实体类:

@AllArgsConstructor
public class NetMall {

    @Getter
    private String netMallName;   //电商平台名称

    /**
     * 模拟计算某平台某商品的价格
     * @param productName 商品名称
     * @return 商品价格
     */
    public double calcPrice(String productName){
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //自定义一个价格计算方式,模拟查到返回一个价格
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}

实现这个功能,可step by step,即一个一个电商平台去查:

public class CompletablePlatPrice {

    static List<String> platList = Arrays.asList(
            "JD",
            "TaoBao",
            "TianMao",
            "PDD"
    );

    public static List<String> getPrice(String productName){
        return platList.stream()
                .map(t -> String.format(productName + " in %s price is %.2f", t, new NetMall(t).calcPrice(productName)))
                .collect(Collectors.toList());
    }
    public static void main(String[] args) {

        long startTime = System.currentTimeMillis();
        List<String> priceList = getPrice("mysql基础");
        for (String price : priceList) {
            System.out.println(price);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("--耗时--" + (endTime-startTime) + "ms");
    }
}

String.format方法修改字符串格式,p1为pattern,p2为占位符对应的变量 %.2f即小数点后保留两位。这个功能,使用异步task来实现,即多个CompletableFuture查多个平台:

public static List<String> getPriceByCompletableFuture(String productName){
    return platList.stream()
            .map(t -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", t, new NetMall(t).calcPrice(productName))))
            .collect(Collectors.toList())
            .stream()
            .map(t -> t.join())
            .collect(Collectors.toList());
}

此后,电商平台数量再增加,新实现的耗时也基本不变,这就是从功能到性能:

public static void main(String[] args) {

    long startTime = System.currentTimeMillis();
    List<String> priceList = getPrice("mysql基础");
    for (String price : priceList) {
        System.out.println(price);
    }
    long endTime = System.currentTimeMillis();
    System.out.println("--耗时--" + (endTime-startTime) + "ms");

    long startTime2 = System.currentTimeMillis();
    List<String> priceList2 = getPriceByCompletableFuture("mysql基础");
    for (String price : priceList2) {
        System.out.println(price);
    }
    long endTime2 = System.currentTimeMillis();
    System.out.println("--耗时--" + (endTime2-startTime2) + "ms");
}

在这里插入图片描述

使用IDEA看下上面的过程,实际是把一个List<String>里的String转成了一个个CompletableFuture对象,得到List<CompletableFuture<Result>>,再二次stream处理,join或者get拿到异步计算的值。

在这里插入图片描述


http://www.kler.cn/a/148271.html

相关文章:

  • 机器学习总结
  • Qwen2-VL:发票数据提取、视频聊天和使用 PDF 的多模态 RAG 的实践指南
  • Java面向对象高级2
  • Zotero 6.0 安装包及安装教程
  • 建筑施工特种作业人员安全生产知识试题
  • 微擎框架php7.4使用phpexcel导出数据报错修复
  • Java 之 lambda 表达式(二)---- Stream 操作 API
  • C语言—什么是数组名
  • 5种主流API网关技术选型,yyds!
  • Python基础语法之学习print()函数
  • JAVA 使用stream流将List中的对象某一属性创建新的List
  • ubuntu中root和普通用户切换方法
  • LeetCode Hot100 20.有效的括号
  • Android Frameworks 开发总结之七
  • UniApp 中的 u-input 属性讲解
  • 大数据-之LibrA数据库系统告警处理(ALM-37006 Coordinator进程异常)
  • python tkinter 使用(八)
  • C++之STL库:string类(用法列举和总结)
  • springboot项目修改项目名称
  • JVM的知识点
  • python:由深浅拷贝谈到变量值的核心区别
  • 【SpringCloud】设计原则之单一职责与服务拆分
  • 如何在Ubuntu系统上安装Node.js
  • HTTP 响应头信息
  • 简易键值对文本解析
  • 面试:Kafka相关问题