流处理 CompletableFuture
专栏系列文章地址:https://blog.csdn.net/qq_26437925/article/details/145290162
本文目标:
- 掌握:流处理 & CompletableFuture的使用
目录
- 直接例子看同步和异步处理的区别
- 普通流处理进行批量查询
- 批量查询1:并行流操作
- 批量查询2:使用`CompletableFuture`: 组合式异步编程
- 更对的数据进行对比两种批处理
- CompletableFuture 使用线程池定制
- 使用`流`还是`CompletableFutures`?
- CompletableFuture
- 原理
- 和future对比
- CompletableFuture 组合使用例子
- 异常处理
直接例子看同步和异步处理的区别
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
class Shop {
/**
* 模拟1秒中延迟的方法
*/
public static void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private double calculatePrice(String product) {
delay();
Random random = new Random();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
private double calculatePriceException(String product) {
delay();
throw new RuntimeException(product + " not available");
}
/**
* 同步API
*
* @param product
* @return
*/
public double getPrice(String product) {
return calculatePrice(product);
}
/**
* 异步API
*
* @param product
* @return
*/
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
futurePrice.complete(price);
}).start();
return futurePrice;
}
public void doSomethingElse() {
System.out.println("doSomethingElse ......");
}
}
public class Main {
public static void main(String[] args) {
double price = 0;
Shop shop = new Shop();
// 同步获取:查询商店,试图取得商品的价格
long start = System.nanoTime();
price = shop.getPrice("apple");
System.out.printf("Price is %.2f%n", price);
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("sync get price returned after " + duration + " msecs");
System.out.println("======================");
start = System.nanoTime();
// 异步任务:查询商店,试图取得商品的价格
Future<Double> futurePrice = shop.getPriceAsync("apple");
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("future Invocation returned after " + invocationTime + " msecs");
// 执行更多任务,比如查询其他商店
shop.doSomethingElse();
try {
// 通过该对象可以在将来的某个时刻取得的价格
// 执行了这个操作后,要么获得Future中封装的值(如果异步任务已经完成),
// 要么发生阻塞,直到该异步任务完成,期望的值能够访问。
price = futurePrice.get();
System.out.printf("get Price is %.2f%n", price);
} catch (Exception e) {
throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");
}
}
异步可以让代码免受阻塞之苦;可以查询之后,后续获取,而期间可以干别的事情
普通流处理进行批量查询
如下查询一批商品的价格
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
class Shop {
private String name;
public Shop(String name) {
this.name = name;
}
public String getName() {
return name;
}
/**
* 模拟1秒中延迟的方法
*/
public static void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private double calculatePrice(String product) {
delay();
Random random = new Random();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
/**
* 同步API
*
* @param product
* @return
*/
public double getPrice(String product) {
return calculatePrice(product);
}
/**
* 异步API
*
* @param product
* @return
*/
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
futurePrice.complete(price);
}).start();
return futurePrice;
}
public void doSomethingElse() {
System.out.println("doSomethingElse ......");
}
}
public class Main {
List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"));
public List<String> findPrices(String product) {
return shops.stream()
.map(shop ->
String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))
)
.collect(Collectors.toList());
}
public static void main(String[] args) {
Main mainTest = new Main();
long start = System.nanoTime();
List<String> shopPriceList = mainTest.findPrices("apple");
shopPriceList.forEach( shopPrice -> System.out.println(shopPrice) );
long duration = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Done in " + duration + " msecs");
}
}
输出如下:耗时4秒多,因为一个商品查询就要1秒
批量查询1:并行流操作
public List<String> parallelFindPrices(String product) {
return shops.parallelStream()
.map(shop -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product)))
.collect(Collectors.toList());
}
public static void main(String[] args) {
Main mainTest = new Main();
long start = System.nanoTime();
List<String> shopPriceList = mainTest.parallelFindPrices("apple");
shopPriceList.forEach( shopPrice -> System.out.println(shopPrice) );
long duration = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Done in " + duration + " msecs");
}
输出如下:1秒多处理完成
批量查询2:使用CompletableFuture
: 组合式异步编程
private List<CompletableFuture<String>> parallelFindPricesAnyc(String product) {
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product))))
.collect(Collectors.toList());
}
public List<String> bestFindPrices(String product) {
// 异步方式计算么个商店商品价格
List<CompletableFuture<String>> priceFutures = parallelFindPricesAnyc(product);
// 等待所有异步操作结束
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
public static void main(String[] args) {
Main mainTest = new Main();
long start = System.nanoTime();
List<String> shopPriceList = mainTest.bestFindPrices("apple");
shopPriceList.forEach( shopPrice -> System.out.println(shopPrice) );
long duration = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Done in " + duration + " msecs");
}
输出如下:比流处理似乎慢点,但是肯定比同步处理快
更对的数据进行对比两种批处理
当有9个商品的时候,对比如下,看着2者差不多了
原因:
二者内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime. getRuntime().availableProcessors()
的返回值(即CPU核心数)。
然而,CompletableFuture
具有一定的优势,因为它允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。让我们看看怎样利用这种配置上的灵活性带来实际应用程序性能上的提升。
CompletableFuture 使用线程池定制
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100)
, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
public List<CompletableFuture<String>> parallelFindPricesAsync(String product) {
// supplyAsync工方法 指定线程池
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product)), executor))
.collect(Collectors.toList());
}
public List<String> bestFindPricesAsync(String product) {
// 异步方式计算么个商店商品价格
List<CompletableFuture<String>> priceFutures = parallelFindPricesAsync(product);
// 等待所有异步操作结束
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
public static void main(String[] args) {
Main mainTest = new Main();
long start = System.nanoTime();
List<String> shopPriceList = mainTest.bestFindPricesAsync("apple");
shopPriceList.forEach( shopPrice -> System.out.println(shopPrice) );
long duration = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Done in " + duration + " msecs");
}
同样处理9个商品现在是1秒多了,比之前默认的设置快多了
反问:创建一个配有线程池的执行器,线程池中线程的数目取决于你预计你的应用需要处理的负荷,但是你该如何选择合适的线程数目呢?
《Java 并发编程实战》: 如果线程池中的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
线程池数目 = CPU核心数 * 期望的CPU利用率(介于0和1之间) *(1 + 等待时间和计算时间的比率)
不过实际也需要进行压测和系统观察。
使用流
还是CompletableFutures
?
目前为止,对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作展开工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。
建议是:
对于计算密集型操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也是最高的(如果所有的线程都是计算密集型的),那就没有必要创建比处理器和数更多的线程。
如果并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture
的灵活性更好。不使用并行流另一个原因是:处理流的流水线中如果发生I/O等待,流的延迟特效会让我们很难判断到底什么时候触发了等待。
CompletableFuture
原理
forkjoinpool可参考阅读:https://blog.csdn.net/qq_26437925/article/details/145417518
和future对比
CompletableFuture 组合使用例子
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import java.util.stream.Stream;
class Shop {
private String name;
public Shop(String name) {
this.name = name;
}
public String getName() {
return name;
}
/**
* 模拟1秒中延迟的方法
*/
public static void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private double calculatePrice(String productName) {
delay();
Random random = new Random();
return random.nextDouble() * productName.charAt(0) + productName.charAt(1);
}
public String getPrice() {
Random random = new Random();
double price = calculatePrice(this.getName());
Discount.Code code = Discount.Code.values()[
random.nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s", name, price, code);
}
}
/**
* 折扣服务api
*/
class Discount {
public enum Code {
NONE(0), SILVER(0), GOLD(10), PLATINUM(15), DIAMOND(20);
private final int percentage;
Code(int percentage) {
this.percentage = percentage;
}
}
public static String applyDiscount(Quote quote) {
return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
}
private static double apply(double price, Code code) {
delay();
return price * (100 - code.percentage) / 100;
}
/**
* 模拟计算,查询数据库等耗时
*/
public static void delay() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 商店返回消息实体,不可变对象模式 线程安全
*/
final class Quote {
private final String shopName;
private final double price;
private final Discount.Code discountCode;
public Quote(String shopName, double price, Discount.Code discountCode) {
this.shopName = shopName;
this.price = price;
this.discountCode = discountCode;
}
public static Quote parse(String s) {
String[] split = s.split(":");
String shopName = split[0];
double price = Double.parseDouble(split[1]);
Discount.Code discountCode = Discount.Code.valueOf(split[2]);
return new Quote(shopName, price, discountCode);
}
public String getShopName() {
return shopName;
}
public double getPrice() {
return price;
}
public Discount.Code getDiscountCode() {
return discountCode;
}
}
public class Main {
List<Shop> shops = Arrays.asList(
new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"),
new Shop("five"),
new Shop("six"),
new Shop("seven"),
new Shop("eight"),
new Shop("nine")
);
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100)
, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
// 同步处理
public List<String> findprices(String product) {
// 1. 取出商品的原始价格 -- 耗时1秒多
// 2. 在Quote对象中对shop返回对字符串进行转换
// 3. 联系Discount服务,为每个Quote申请折扣 -- 耗时1秒多
return shops.stream()
.map(shop -> shop.getPrice())
.map(Quote::parse)
.map(Discount::applyDiscount)
.collect(Collectors.toList());
}
// 异步组合处理
public Stream<CompletableFuture<String>> findPricesStream() {
return shops.stream()
// 异步方式取得每个shop中指定产品的原始价格
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(), executor))
// 在Quote对象中对shop返回对字符串进行转换
.map(future -> future.thenApply(Quote::parse))
// 另一个异步任务构建期望的Future,申请折扣 thenCompose 将多个future组合 一个一个执行
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}
public static void main(String[] args) {
Main mainTest = new Main();
long start = System.nanoTime();
CompletableFuture[] futures = mainTest.findPricesStream()
.map(f -> f.thenAccept(
s -> System.out.println(s + " (done in " +
((System.nanoTime() - start) / 1_000_000) + " msecs)")))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
System.out.println("All shops have now responded in "
+ ((System.nanoTime() - start) / 1_000_000) + " msecs");
}
}
异常处理
CompletableFuture提供了非阻塞的方式来处理计算结果,无论是计算成功还是遇到异常。CompletableFuture 提供了多种方法来处理异常,包括使用 exceptionally、handle、whenComplete 等方法。
static void testException1(){
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// 模拟异常抛出
throw new RuntimeException("Something went wrong");
});
completableFuture.exceptionally(ex -> {
System.err.println("Exception caught: " + ex.getMessage());
return "Default result"; // 返回一个默认结果
});
}
static void testException2() {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// 模拟异常抛出
throw new RuntimeException("Something went wrong");
});
completableFuture.handle((result, ex) -> {
if (ex != null) {
System.err.println("Exception caught: " + ex.getMessage());
return "Exception handled"; // 返回异常处理结果
}
else {
return result; // 返回正常结果
}
});
}
static void testException3() {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// 模拟异常抛出
throw new RuntimeException("Something went wrong");
});
completableFuture.whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("Exception caught: " + ex.getMessage());
}
else {
System.out.println("Result: " + result); // 仅当没有异常时执行
}
});
}
public static void main(String[] args) {
testException1();
testException2();
testException3();
}