异步线程池 CompletableFuture 异步编排 【下篇】
1、创建异步对象
CompletableFuture 提供了四个静态方法来创建一个异步操作。
提示
- 1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
- 2、可以传入自定义的线程池,否则就用默认的线程池
1.1 不存在返回结果
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
System.out.println("main.....start......");
CompletableFuture.runAsync(()->{
System.out.println("当前线程是:"+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运算结果是:"+i);
},executor);
System.out.println("main.....end......");
}
}
测试结果
1.2 存在返回结果
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运算结果是:" + i);
return i;
}, executor);
Integer integer = future.get();
System.out.println("main.....end......"+integer);
}
测试结果
2、计算完成时回调方法
whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。
whenComplete 和 whenCompleteAsync 的区别:
- whenComplete:是执行当前任务的线程执行继续执行
whenComplete
的任务。 - whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执
2.1 whenComplete
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运算结果是:" + i);
return i;
}, executor).whenComplete((res,exception)->{
System.out.println("异步任务完成了...结果是..."+res+";异常是:"+exception);
});
System.out.println("main.....end......");
}
}
如果出现了异常情况
2.2 出现异常、处理返回值
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运算结果是:" + i);
return i;
}, executor).whenComplete((res,exception)->{
//可以得到异常信息,但是不能返回修改数据
System.out.println("异步任务完成了...结果是..."+res+";异常是:"+exception);
}).exceptionally(throwable -> {
//可以感知异常,同时返回默认值
return 666;
});
Integer integer = future.get();
System.out.println("main.....end......"+integer);
}
}
结果
3、handle 方法
和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。
3.1 出现异常情况
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运算结果是:" + i);
return i;
}, executor).handle((res,thr)->{
if(res != null){
return res*2;
}
if(thr != null){
return 666;
}
return 0;
});
Integer integer = future.get();
System.out.println("main.....end......"+integer);
}
}
结果
3.2 正常情况
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运算结果是:" + i);
return i;
}, executor).handle((res,thr)->{
if(res != null){
return res*2;
}
if(thr != null){
return 666;
}
return 0;
});
Integer integer = future.get();
System.out.println("main.....end......"+integer);
}
}
结果
4、线程串行化方法
thenApply
方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept
方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun
方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun
的后续操作
带有 Async 默认是异步执行的。同之前。
以上都要前置任务成功完成。
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
4.1 thenRun
不能获取到上一步的执行结果
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运算结果是:" + i);
return i;
}, executor).thenRunAsync(()->{
System.out.println("任务二启动了");
},executor);
System.out.println("main.....end......");
}
}
结果
4.2 thenAccept
能接收上一步的返回值,但是无返回值
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运算结果是:" + i);
return i;
}, executor).thenAcceptAsync(res->{
System.out.println("任务二启动了,获取上一步的结果:"+res);
},executor);
System.out.println("main.....end......");
}
}
结果
4.3 thenApply
获取上一步的返回结果,同时返回结果
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运算结果是:" + i);
return i;
}, executor).thenApplyAsync(res->{
System.out.println("任务二启动了,上一步的结果是:"+res);
return "Hello" + res;
},executor);
System.out.println("main.....end......"+future1.get());
}
}
结果
5、两任务组合 - 都要完成
两个任务必须都完成,触发该任务。
thenCombine
:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
thenAcceptBoth
:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
runAfterBoth
:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务。
5.1 runAfterBoth
组合两个future,不接收值,也不返回值
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程开始..." + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任务1线程结束..." + i);
return i;
}, executor);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程开始..." + Thread.currentThread().getId());
System.out.println("任务2线程结束..." );
return "hello";
}, executor);
future01.runAfterBothAsync(future02,()->{
System.out.println("任务三开始");
},executor);
System.out.println("main.....end......"+future01.get());
}
}
结果
5.2 thenAcceptBoth
接收前两个的值,不返回值
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程开始..." + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任务1线程结束..." + i);
return i;
}, executor);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程开始..." + Thread.currentThread().getId());
System.out.println("任务2线程结束..." );
return "hello";
}, executor);
future01.thenAcceptBothAsync(future02,(f1,f2)->{
System.out.println("任务三开始...之前的结果是:"+f1 +"......"+f2);
},executor);
System.out.println("main.....end......"+future01.get());
}
结果
5.3 thenCombine
接收值同时返回值
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程开始..." + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任务1线程结束..." + i);
return i;
}, executor);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程开始..." + Thread.currentThread().getId());
System.out.println("任务2线程结束..." );
return "hello";
}, executor);
CompletableFuture<Object> future = future01.thenCombineAsync(future02, (f1, f2) -> {
return "任务三开始:"+f1+"..."+f2+"---->";
}, executor);
System.out.println("main.....end......"+future.get());
}
结果
6、两任务组合 - 一个完成
当两个任务中,任意一个 future 任务完成的时候,执行任务。
applyToEither
:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither
:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
runAfterEither
:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值。
6.1 runAfterEither
两个有一个执行结束,就执行。不接收返回值,本身没有返回值。休息几秒钟,看效果
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程开始..." + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任务1线程结束..." + i);
return i;
}, executor);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程开始..." + Thread.currentThread().getId());
try {
Thread.sleep(6000);
System.out.println("任务2线程结束..." );
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}, executor);
future01.runAfterEitherAsync(future02, () -> {
System.out.println("任务三开始");
}, executor);
System.out.println("main.....end......");
}
}
效果
6.2 acceptEither
其中一个执行完,就执行,得到值,本身无返回值。两个future的返回值类型要相同。
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程开始..." + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任务1线程结束..." + i);
return i;
}, executor);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程开始..." + Thread.currentThread().getId());
try {
Thread.sleep(6000);
System.out.println("任务2线程结束..." );
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}, executor);
future01.acceptEitherAsync(future02,(res)->{
System.out.println("任务三开始...之前的结果:"+res);
},executor);
System.out.println("main.....end......");
}
6.3 applyToEither
接收返回值,同时返回值
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程开始..." + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任务1线程结束..." + i);
return i;
}, executor);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程开始..." + Thread.currentThread().getId());
try {
Thread.sleep(6000);
System.out.println("任务2线程结束..." );
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}, executor);
CompletableFuture<String> future = future01.applyToEitherAsync(future02, res -> {
System.out.println("任务三开始....之前的结果:" + res);
return res.toString() + " -->哈喽";
}, executor);
System.out.println("main.....end......"+future.get());
}
结果
7、多任务组合
allOf
:等待所有任务完成
anyOf
:只要有一个任务完成
7.1 allOf
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<String> futureImage = CompletableFuture.supplyAsync(() -> {
System.out.println("商品图片查询");
return "水杯.png";
}, executor);
CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
System.out.println("商品属性查询");
return "黑色+128G";
}, executor);
CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("商品介绍查询");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "华为手机好用呢";
}, executor);
CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImage, futureAttr, futureDesc);
allOf.get();//阻塞等待
System.out.println("main.....end......");
}
}
如果没有allOf.get();
查看结果
7.2 anyOf
任意一个执行成功,就返回
public class ThreadTestDemo {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main.....start......");
CompletableFuture<String> futureImage = CompletableFuture.supplyAsync(() -> {
System.out.println("商品图片查询");
return "水杯.png";
}, executor);
CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
System.out.println("商品属性查询");
return "黑色+128G";
}, executor);
CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("商品介绍查询");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "华为手机好用呢";
}, executor);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImage, futureAttr, futureDesc);
System.out.println("main.....end......查看哪个执行成功"+anyOf.get());
}
结果