当前位置: 首页 > article >正文

流处理 CompletableFuture

专栏系列文章地址:https://blog.csdn.net/qq_26437925/article/details/145290162

本文目标:

  1. 掌握:流处理 & CompletableFuture的使用

目录

    • 直接例子看同步和异步处理的区别
    • 普通流处理进行批量查询
      • 批量查询1:并行流操作
      • 批量查询2:使用`CompletableFuture`: 组合式异步编程
      • 更对的数据进行对比两种批处理
      • CompletableFuture 使用线程池定制
      • 使用`流`还是`CompletableFutures`?
    • CompletableFuture
      • 原理
      • 和future对比
      • CompletableFuture 组合使用例子
      • 异常处理

直接例子看同步和异步处理的区别

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;


class Shop {

    /**
     * 模拟1秒中延迟的方法
     */
    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private double calculatePrice(String product) {
        delay();
        Random random = new Random();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    private double calculatePriceException(String product) {
        delay();
        throw new RuntimeException(product + " not available");
    }

    /**
     * 同步API
     *
     * @param product
     * @return
     */
    public double getPrice(String product) {
        return calculatePrice(product);
    }

    /**
     * 异步API
     *
     * @param product
     * @return
     */
    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        }).start();
        return futurePrice;
    }

    public void doSomethingElse() {
        System.out.println("doSomethingElse ......");
    }
}

public class Main {

    public static void main(String[] args) {
        double price = 0;
        Shop shop = new Shop();

        // 同步获取:查询商店,试图取得商品的价格
        long start = System.nanoTime();
        price = shop.getPrice("apple");
        System.out.printf("Price is %.2f%n", price);
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("sync get price returned after " + duration + " msecs");

        System.out.println("======================");
        start = System.nanoTime();
        // 异步任务:查询商店,试图取得商品的价格
        Future<Double> futurePrice = shop.getPriceAsync("apple");
        long invocationTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("future Invocation returned after " + invocationTime + " msecs");

        // 执行更多任务,比如查询其他商店
        shop.doSomethingElse();

        try {
            // 通过该对象可以在将来的某个时刻取得的价格
            // 执行了这个操作后,要么获得Future中封装的值(如果异步任务已经完成),
            // 要么发生阻塞,直到该异步任务完成,期望的值能够访问。
            price = futurePrice.get();
            System.out.printf("get Price is %.2f%n", price);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Price returned after " + retrievalTime + " msecs");
    }
}

异步可以让代码免受阻塞之苦;可以查询之后,后续获取,而期间可以干别的事情
在这里插入图片描述

普通流处理进行批量查询

如下查询一批商品的价格

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;


class Shop {

    private String name;

    public Shop(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    /**
     * 模拟1秒中延迟的方法
     */
    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private double calculatePrice(String product) {
        delay();
        Random random = new Random();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    /**
     * 同步API
     *
     * @param product
     * @return
     */
    public double getPrice(String product) {
        return calculatePrice(product);
    }

    /**
     * 异步API
     *
     * @param product
     * @return
     */
    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        }).start();
        return futurePrice;
    }

    public void doSomethingElse() {
        System.out.println("doSomethingElse ......");
    }
}

public class Main {

    List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"));

    public List<String> findPrices(String product) {
        return shops.stream()
                .map(shop ->
                        String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))
                )
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Main mainTest = new Main();
        long start = System.nanoTime();
        List<String> shopPriceList = mainTest.findPrices("apple");
        shopPriceList.forEach( shopPrice -> System.out.println(shopPrice) );
        long duration = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Done in " + duration + " msecs");
    }
}

输出如下:耗时4秒多,因为一个商品查询就要1秒
在这里插入图片描述

批量查询1:并行流操作

 public List<String> parallelFindPrices(String product) {
       return shops.parallelStream()
               .map(shop -> String.format("%s price is %.2f",
                       shop.getName(), shop.getPrice(product)))
               .collect(Collectors.toList());
   }

   public static void main(String[] args) {
       Main mainTest = new Main();
       long start = System.nanoTime();
       List<String> shopPriceList = mainTest.parallelFindPrices("apple");
       shopPriceList.forEach( shopPrice -> System.out.println(shopPrice) );
       long duration = ((System.nanoTime() - start) / 1_000_000);
       System.out.println("Done in " + duration + " msecs");
   }

输出如下:1秒多处理完成

批量查询2:使用CompletableFuture: 组合式异步编程

  private List<CompletableFuture<String>> parallelFindPricesAnyc(String product) {
        return shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(
                        () -> String.format("%s price is %.2f",
                                shop.getName(), shop.getPrice(product))))
                .collect(Collectors.toList());
    }

    public List<String> bestFindPrices(String product) {
        // 异步方式计算么个商店商品价格
        List<CompletableFuture<String>> priceFutures = parallelFindPricesAnyc(product);
        // 等待所有异步操作结束
        return priceFutures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());

    }

    public static void main(String[] args) {
        Main mainTest = new Main();
        long start = System.nanoTime();
        List<String> shopPriceList = mainTest.bestFindPrices("apple");
        shopPriceList.forEach( shopPrice -> System.out.println(shopPrice) );
        long duration = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Done in " + duration + " msecs");
    }

输出如下:比流处理似乎慢点,但是肯定比同步处理快
在这里插入图片描述

更对的数据进行对比两种批处理

当有9个商品的时候,对比如下,看着2者差不多了
在这里插入图片描述

原因:

二者内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime. getRuntime().availableProcessors()的返回值(即CPU核心数)。

然而,CompletableFuture具有一定的优势,因为它允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。让我们看看怎样利用这种配置上的灵活性带来实际应用程序性能上的提升。

CompletableFuture 使用线程池定制

 private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100)
            , new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    return t;
                }
            });

    public List<CompletableFuture<String>> parallelFindPricesAsync(String product) {
        // supplyAsync工􏱣方法 指定线程池
        return shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(
                        () -> String.format("%s price is %.2f",
                                shop.getName(), shop.getPrice(product)), executor))
                .collect(Collectors.toList());
    }

    public List<String> bestFindPricesAsync(String product) {
        // 异步方式计算么个商店商品价格
        List<CompletableFuture<String>> priceFutures = parallelFindPricesAsync(product);
        // 等待所有异步操作结束
        return priceFutures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());

    }

    public static void main(String[] args) {
        Main mainTest = new Main();
        long start = System.nanoTime();
        List<String> shopPriceList = mainTest.bestFindPricesAsync("apple");
        shopPriceList.forEach( shopPrice -> System.out.println(shopPrice) );
        long duration = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Done in " + duration + " msecs");
    }

同样处理9个商品现在是1秒多了,比之前默认的设置快多了
在这里插入图片描述

反问:创建一个配有线程池的执行器,线程池中线程的数目取决于你预计你的应用需要处理的负荷,但是你该如何选择合适的线程数目呢?

《Java 并发编程实战》: 如果线程池中的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:

线程池数目 = CPU核心数 * 期望的CPU利用率(介于0和1之间) *(1 + 等待时间和计算时间的比率)

不过实际也需要进行压测和系统观察。

使用还是CompletableFutures?

目前为止,对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作展开工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。

建议是:

对于计算密集型操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也是最高的(如果所有的线程都是计算密集型的),那就没有必要创建比处理器和数更多的线程。

如果并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture的灵活性更好。不使用并行流另一个原因是:处理流的流水线中如果发生I/O等待,流的延迟特效会让我们很难判断到底什么时候触发了等待。

CompletableFuture

原理

在这里插入图片描述

forkjoinpool可参考阅读:https://blog.csdn.net/qq_26437925/article/details/145417518

和future对比

在这里插入图片描述

CompletableFuture 组合使用例子

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import java.util.stream.Stream;


class Shop {

    private String name;

    public Shop(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    /**
     * 模拟1秒中延迟的方法
     */
    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private double calculatePrice(String productName) {
        delay();
        Random random = new Random();
        return random.nextDouble() * productName.charAt(0) + productName.charAt(1);
    }


    public String getPrice() {
        Random random = new Random();
        double price = calculatePrice(this.getName());
        Discount.Code code = Discount.Code.values()[
                random.nextInt(Discount.Code.values().length)];
        return String.format("%s:%.2f:%s", name, price, code);
    }
}


/**
 * 折扣服务api
 */
class Discount {
    public enum Code {
        NONE(0), SILVER(0), GOLD(10), PLATINUM(15), DIAMOND(20);
        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }

    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
    }

    private static double apply(double price, Code code) {
        delay();
        return price * (100 - code.percentage) / 100;
    }

    /**
     * 模拟计算,查询数据库等耗时
     */
    public static void delay() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


/**
 * 商店返回消息实体,不可变对象模式 线程安全
 */
final class Quote {
    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code discountCode) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = discountCode;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }

    public String getShopName() {
        return shopName;
    }

    public double getPrice() {
        return price;
    }

    public Discount.Code getDiscountCode() {
        return discountCode;
    }
}

public class Main {

    List<Shop> shops = Arrays.asList(
            new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"),
            new Shop("five"),
            new Shop("six"),
            new Shop("seven"),
            new Shop("eight"),
            new Shop("nine")
    );


    private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100)
            , new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    return t;
                }
            });


    // 同步处理
    public List<String> findprices(String product) {
        // 1. 取出商品的原始价格 -- 耗时1秒多
        // 2. 在Quote对象中对shop返回对字符串进行转换
        // 3. 联系Discount服务,为每个Quote申请折扣 -- 耗时1秒多
        return shops.stream()
                .map(shop -> shop.getPrice())
                .map(Quote::parse)
                .map(Discount::applyDiscount)
                .collect(Collectors.toList());
    }

    // 异步组合处理
    public Stream<CompletableFuture<String>> findPricesStream() {
        return shops.stream()
                // 异步方式取得每个shop中指定产品的原始价格
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(), executor))
                //  在Quote对象中对shop返回对字符串进行转换
                .map(future -> future.thenApply(Quote::parse))
                // 另一个异步任务构建期望的Future,申请折扣 thenCompose 将多个future组合 一个一个执行
                .map(future -> future.thenCompose(quote ->
                        CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));

    }

    public static void main(String[] args) {
        Main mainTest = new Main();
        long start = System.nanoTime();
        CompletableFuture[] futures = mainTest.findPricesStream()
                .map(f -> f.thenAccept(
                        s -> System.out.println(s + " (done in " +
                                ((System.nanoTime() - start) / 1_000_000) + " msecs)")))
                .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(futures).join();
        System.out.println("All shops have now responded in "
                + ((System.nanoTime() - start) / 1_000_000) + " msecs");
    }

}

异常处理

CompletableFuture提供了非阻塞的方式来处理计算结果,无论是计算成功还是遇到异常。CompletableFuture 提供了多种方法来处理异常,包括使用 exceptionally、handle、whenComplete 等方法。

 static void testException1(){
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // 模拟异常抛出
            throw new RuntimeException("Something went wrong");
        });

        completableFuture.exceptionally(ex -> {
            System.err.println("Exception caught: " + ex.getMessage());
            return "Default result"; // 返回一个默认结果
        });
    }


    static void testException2() {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // 模拟异常抛出
            throw new RuntimeException("Something went wrong");
        });

        completableFuture.handle((result, ex) -> {
            if (ex != null) {
                System.err.println("Exception caught: " + ex.getMessage());
                return "Exception handled"; // 返回异常处理结果
            }
            else {
                return result; // 返回正常结果
            }
        });
    }


    static void testException3() {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // 模拟异常抛出
            throw new RuntimeException("Something went wrong");
        });

        completableFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                System.err.println("Exception caught: " + ex.getMessage());
            }
            else {
                System.out.println("Result: " + result); // 仅当没有异常时执行
            }
        });
    }

    public static void main(String[] args) {
        testException1();
        testException2();
        testException3();
    }

http://www.kler.cn/a/530243.html

相关文章:

  • 【自然语言处理(NLP)】深度学习架构:Transformer 原理及代码实现
  • C# Winform enter键怎么去关联button
  • 【Redis】Redis 经典面试题解析:深入理解 Redis 的核心概念与应用
  • 《 C++ 点滴漫谈: 二十五 》空指针,隐秘而危险的杀手:程序崩溃的真凶就在你眼前!
  • 自然语言处理(NLP)入门:基础概念与应用场景
  • Linux环境下的Java项目部署技巧:环境安装
  • 马铃薯叶子病害检测数据集VOC+YOLO格式1332张9类别
  • 基于SpringBoot的青年公寓服务平台的设计与实现(源码+SQL脚本+LW+部署讲解等)
  • Flutter Raw Image Provider
  • Python 中 `finally` 的执行时机与 `return` 的微妙关系
  • SD存储卡功能特性解析
  • 【C++语言】卡码网语言基础课系列----11. 句子缩写
  • DeepSeek让英伟达狂跌三年?
  • openEuler系统磁盘管理方法
  • Ubuntu 下 nginx-1.24.0 源码分析 - ngx_strerror_init()函数
  • OpenAI发布o3-mini:免费推理模型,DeepSeek引发的反思
  • MySQL 基础学习(4):条件查询(WHERE)更新操作(UPDATE)删除操作(DELETE)分页查询(LIMIT)
  • 算法随笔_36: 复写零
  • 面向初学者的卷积神经网络_卷积神经网络好学吗
  • C++泛型编程指南03-CTAD
  • shell编程(1)——shell介绍
  • Hive分区和分桶
  • unity中的动画混合树
  • Games104——网络游戏的进阶架构
  • 分享10个实用的Python工具的源码,支持定制
  • Java项目: 基于SpringBoot+mybatis+maven+mysql实现的图书管理系统(含源码+数据库+答辩PPT+毕业论文)