六、线程间的协作原理场景剖析
1. 如何使用同步工具CountDownLatch协调多线程?
解释:
CountDownLatch计数器,为0之前会一直阻塞,等待其他线程完成。
几个线程等待另外几个线程减少计数器的值 - 代码示例:
package imooc7.cdl;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @Description:测试几个线程等待另外几个线程减少计数器的值
*/
public class TestCountDownLatch {
public static void main(String[] args) {
//传入一个int值给countDownLatch
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(()->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("客户支付尾款");
countDownLatch.countDown();
System.out.println("客户支付后续操作");
}).start();
new Thread(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("组装家具结束");
countDownLatch.countDown();
System.out.println("组装家具后续操作");
}).start();
//发货线程
new Thread(()->{
try {
//建议: 使用待超时时间的await方法
countDownLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("支付尾款和组装家具都已经完成,可以发货");
System.out.println("发货执行");
}).start();
new Thread(()->{
try {
countDownLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("支付尾款和组装家具都已经完成,可以发邮件通知客户");
System.out.println("发邮件执行 ");
}).start();
}
}
发令枪模式 - 代码示例:
package imooc7.cdl;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @Description:测试发令枪模式
*/
public class TestCountDownLatch1 {
public static void main(String[] args) {
//发令枪模式传入的int值为1
CountDownLatch countDownLatch = new CountDownLatch(1);
//模拟田径运动
for (int i = 0; i < 10; i++) {
//10个运动员
new Thread(()->{
try {
countDownLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("运动员起跑 "+Thread.currentThread().getName());
}).start();
}
//裁判线程,开发令枪
new Thread(()->{
System.out.println("裁判准备开枪");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
countDownLatch.countDown();
}).start();
}
}
运行结果:
解释:
1.使用join(),需要有线程对象调用join(),而是有那个线程池,无法获取当前线程对象;
2.CountDownLatch更加通用,而Thread.join()是等待join()线程的完成;
3.使用Thread.join()方式, 只能等待join()的线程结束,当前线程才能继续,而使用CountDownLatch,子线程可以继续执行操作流程;
2.如何使用同步工具CyclicBarrier协调多线程?
代码示例:
package imooc7.cb;
import java.util.Random;
import java.util.concurrent.*;
/**
* @Description:模拟多人游戏流程
*/
public class TestCyclicBarrierOld {
static CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
public static void main(String[] args) {
System.out.println(new Random().nextInt(3));
for (int i = 0; i < 4; i++) {
//模拟玩家线程
int finalI = i;
new Thread(()->{
try {
//匹配玩家
matchPlayer(finalI);
//选择角色
selectCharacter(finalI);
loadGame(finalI);
//加载游戏资源
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}).start();
}
}
private static void loadGame(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
System.out.println("player "+playNumber+" 加载游戏资源");
Thread.sleep(1000+new Random().nextInt(3)*1000);
System.out.println("player "+playNumber+" 完成加载游戏资源");
cyclicBarrier.await(10, TimeUnit.SECONDS);
}
private static void selectCharacter(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
System.out.println("player "+playNumber+" 选择角色");
Thread.sleep(1000+new Random().nextInt(3)*1000);
System.out.println("player "+playNumber+" 完成选择角色");
cyclicBarrier.await(10, TimeUnit.SECONDS);
}
private static void matchPlayer(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
System.out.println("player "+playNumber+" 开始匹配玩家");
Thread.sleep(1000+new Random().nextInt(3)*1000);
System.out.println("player "+playNumber+" 完成匹配玩家");
cyclicBarrier.await(10, TimeUnit.SECONDS);
}
public enum Step{
NONE,
MATCH,
SELECT,
LOAD;
}
}
运行结果:
package imooc7.cb;
import java.util.Random;
import java.util.concurrent.*;
/**
* @Description:模拟多人游戏流程
*/
public class TestCyclicBarrierOld {
//CyclicBarrier此处回调方法为同步方法
static CyclicBarrier cyclicBarrier = new CyclicBarrier(4, ()-> {
try {
//同步方法,会等待此处代码执行完成,所有线程才会继续执行下一步流程
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("游戏阶段到达");
});
public static void main(String[] args) {
System.out.println(new Random().nextInt(3));
for (int i = 0; i < 4; i++) {
//模拟玩家线程
int finalI = i;
new Thread(()->{
try {
//匹配玩家
matchPlayer(finalI);
//选择角色
selectCharacter(finalI);
loadGame(finalI);
//加载游戏资源
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}).start();
}
}
private static void loadGame(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
System.out.println("player "+playNumber+" 加载游戏资源");
Thread.sleep(1000+new Random().nextInt(3)*1000);
System.out.println("player "+playNumber+" 完成加载游戏资源");
cyclicBarrier.await(10, TimeUnit.SECONDS);
}
private static void selectCharacter(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
System.out.println("player "+playNumber+" 选择角色");
Thread.sleep(1000+new Random().nextInt(3)*1000);
System.out.println("player "+playNumber+" 完成选择角色");
cyclicBarrier.await(10, TimeUnit.SECONDS);
}
private static void matchPlayer(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
System.out.println("player "+playNumber+" 开始匹配玩家");
Thread.sleep(1000+new Random().nextInt(3)*1000);
System.out.println("player "+playNumber+" 完成匹配玩家");
cyclicBarrier.await(10, TimeUnit.SECONDS);
}
public enum Step{
NONE,
MATCH,
SELECT,
LOAD;
}
}
运行结果:
解释:
每一阶段最后一个线程执行完成后,会同步执行CyclicBarrier的回调方法,如果回调方法比较耗时,可以改成异步方法。
优化为异步执行:
package imooc7.cb;
import java.util.Random;
import java.util.concurrent.*;
/**
* @Description:模拟多人游戏流程
*/
public class TestCyclicBarrierOld {
static ExecutorService executorService = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
static CyclicBarrier cyclicBarrier = new CyclicBarrier(4, ()-> {
//通过使用线程池将同步执行回调方法改为异步执行回调方法
executorService.execute(()-> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("游戏阶段到达");
});
});
public static void main(String[] args) {
System.out.println(new Random().nextInt(3));
for (int i = 0; i < 4; i++) {
//模拟玩家线程
int finalI = i;
new Thread(()->{
try {
//匹配玩家
matchPlayer(finalI);
//选择角色
selectCharacter(finalI);
loadGame(finalI);
//加载游戏资源
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}).start();
}
}
private static void loadGame(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
System.out.println("player "+playNumber+" 加载游戏资源");
Thread.sleep(1000+new Random().nextInt(3)*1000);
System.out.println("player "+playNumber+" 完成加载游戏资源");
cyclicBarrier.await(10, TimeUnit.SECONDS);
}
private static void selectCharacter(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
System.out.println("player "+playNumber+" 选择角色");
Thread.sleep(1000+new Random().nextInt(3)*1000);
System.out.println("player "+playNumber+" 完成选择角色");
cyclicBarrier.await(10, TimeUnit.SECONDS);
}
private static void matchPlayer(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
System.out.println("player "+playNumber+" 开始匹配玩家");
Thread.sleep(1000+new Random().nextInt(3)*1000);
System.out.println("player "+playNumber+" 完成匹配玩家");
cyclicBarrier.await(10, TimeUnit.SECONDS);
}
public enum Step{
NONE,
MATCH,
SELECT,
LOAD;
}
}
运行结果:
优化代码 - 增加每个阶段结束语
package imooc7.cb;
import java.util.Random;
import java.util.concurrent.*;
/**
* @Description:模拟多人游戏流程
*/
public class TestCyclicBarrierOld {
static Step step;
static ExecutorService executorService = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
static CyclicBarrier cyclicBarrier = new CyclicBarrier(4, ()-> {
// executorService.execute(()-> {
switch (step) {
case MATCH -> System.out.println("游戏匹配结束");
case SELECT -> System.out.println("游戏选择角色结束");
case LOAD -> System.out.println("游戏加载结束");
}
// });
});
public static void main(String[] args) {
System.out.println(new Random().nextInt(3));
for (int i = 0; i < 4; i++) {
//模拟玩家线程
int finalI = i;
new Thread(()->{
try {
//匹配玩家
step = Step.MATCH;
matchPlayer(finalI);
//选择角色
step = Step.SELECT;
selectCharacter(finalI);
//加载游戏资源
step = Step.LOAD;
loadGame(finalI);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}).start();
}
}
private static void loadGame(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
System.out.println("player "+playNumber+" 加载游戏资源");
Thread.sleep(1000+new Random().nextInt(3)*1000);
System.out.println("player "+playNumber+" 完成加载游戏资源");
cyclicBarrier.await(10, TimeUnit.SECONDS);
}
private static void selectCharacter(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
System.out.println("player "+playNumber+" 选择角色");
Thread.sleep(1000+new Random().nextInt(3)*1000);
System.out.println("player "+playNumber+" 完成选择角色");
cyclicBarrier.await(10, TimeUnit.SECONDS);
}
private static void matchPlayer(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
System.out.println("player "+playNumber+" 开始匹配玩家");
Thread.sleep(1000+new Random().nextInt(3)*1000);
System.out.println("player "+playNumber+" 完成匹配玩家");
cyclicBarrier.await(10, TimeUnit.SECONDS);
}
public enum Step{
NONE,
MATCH,
SELECT,
LOAD;
}
}
运行结果:
3.异步编程(一)CompletableFuture介绍
解释:
1.Future只提供了异步线程的结果返回,并没有提供回调机制;
2.无法编排任务,处理有依赖关系的任务;
3.无法处理有竞争的任务,比如实现最快的任务结束直接返回。
Future的不足:
package imooc7.cf;
import java.util.concurrent.*;
/**
* @Description:Future的不足
*/
public class FutureLimit {
//生成订单的三个步骤
//1 查询商品信息
//2 查询用户信息
//3 生成订单
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
//查询商品信息
Future<String> productFuture = executorService.submit(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "iphone";
});
//查询用户信息
Future<String> userFuture = executorService.submit(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "小明";
});
//生成订单
Future<String> orderFuture = executorService.submit(() -> {
try {
String productName = productFuture.get(3, TimeUnit.SECONDS);
String userName = userFuture.get(3, TimeUnit.SECONDS);
return productName + " : " + userName;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
});
try {
System.out.println(orderFuture.get(3, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
运行结果:
结论:
虽然实现了上面看了的功能,但是还是存在不足的,
1)查询商品信息、查询用户信息结束后,通过回调去生成商品订单,是无法支持的;
2)无法支持编排任务 - 无法在语义上实现前两个任务执行结束,第三个任务开始执行;
3)刚才3个线程,代码耦合度非常高。
4.异步编程(二)开启异步任务
建议:在实际业务中,runAsync使用传入线程池的方法。
建议:在实际业务中,supplyAsync使用传入线程池的方法。
runAsync示例:
package imooc7.cf;
import java.util.concurrent.*;
/**
* @Description:开启异步任务
*/
public class CompletableFutureTest1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//runAsync方法
ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("异步任务开始 " + Thread.currentThread().getName());
},executorService);
future.get();
//异步任务开始 ForkJoinPool.commonPool-worker-1
}
}
supplyAsync示例:
package imooc7.cf;
import java.util.concurrent.*;
/**
* @Description:开启异步任务 supplyAsync
*/
public class CompletableFutureTest2 {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return "iphone";
});
//获取返回值
// String name = future.get();
//使用join()不用手动处理编译异常,只会有RunntimeException
//使用get()需要手动处理编译异常
future.join();
}
}
注意:
1)future.get()和 future.join()都可以获得异步任务的返回值;
2)使用join()不用手动处理编译异常,只会有RunntimeException,使用get()需要手动处理编译异常,推荐使用join()更加优雅。
5.异步编程(三)异步任务回调
thenRun代码示例:、
package imooc7.cf;
import java.util.concurrent.*;
/**
* @Description:测试回调
*/
public class TestCallBack {
public static void main(String[] args) {
//1 测试thenRun
ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
}).thenRun(() -> {
System.out.println(Thread.currentThread().getName());
System.out.println("thenRun回调函数执行");
});
future.join();
}
}
解释:
thenRun回调函数所在线程跟runAsync所在线程为同一线程。
运行结果:
thenRunAsync示例:
package imooc7.cf;
import java.util.concurrent.*;
/**
* @Description:测试回调
*/
public class TestCallBack {
public static void main(String[] args) {
//1 测试thenRun
ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
}).thenRunAsync(() -> {
System.out.println(Thread.currentThread().getName());
System.out.println("thenRunAsync回调函数执行");
}, executorService);
future.join();
}
}
解释:
thenRunAsync回调函数所在线程跟runAsync所在线程为不是同一个线程,异步执行。
运行结果:
thenAccept同步回调代码示例:
package imooc7.cf;
import java.util.concurrent.*;
/**
* @Description:测试回调
*/
public class TestCallBack {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
//2 测试thenAccept()
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
return "iphone";
},executorService).thenAccept(name -> {
//这里的参数name,就是上面异步变成返回的"iphone"
System.out.println(name+" "+Thread.currentThread().getName());
});
future.join();
}
}
运行结果:
thenAcceptAsync异步 回调代码示例:
package imooc7.cf;
import java.util.concurrent.*;
/**
* @Description:测试回调
*/
public class TestCallBack {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
//2 测试thenAccept()
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
return "iphone";
},executorService).thenAcceptAsync(name -> {
//这里的参数name,就是上面异步变成返回的"iphone"
System.out.println(name+" "+Thread.currentThread().getName());
},executorService);
future.join();
}
}
运行结果:
thenApply回调示例:
package imooc7.cf;
import java.util.concurrent.*;
/**
* @Description:测试回调
*/
public class TestCallBack {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
//3 thenApply
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
return "iphone";
}).thenApply(name -> {
System.out.println(name + " " + Thread.currentThread().getName());
return 1;
});
Integer join = future.join();
System.out.println(join);
}
}
运行结果:
thenApplyAsync回调示例:
package imooc7.cf;
import java.util.concurrent.*;
/**
* @Description:测试回调
*/
public class TestCallBack {
public static void main(String[] args) {
//1 测试thenRun
ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
//3 thenApply
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
return "iphone";
}).thenApplyAsync(name -> {
System.out.println(name + " " + Thread.currentThread().getName());
return 1;
}, executorService);
Integer join = future.join();
System.out.println(join);
}
}
运行结果:
总结:
我们可以根据是否需要参数,是否需要返回值,作为异步编程回调函数选择依据。
6.异步编程(四)异步任务编排
thenCompose示例:
package imooc7.cf;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
/**
* @Description: 测试thenCompose
* 获取用户订单
* 1 获取用户信息
* 2 获取用户相关订单
*/
public class TestThenCompose {
public static void main(String[] args) {
//获取用户信息
CompletableFuture<User> userCompletableFuture = CompletableFuture.supplyAsync(() -> {
return getUserInfo();
});
//获取用户订单
CompletableFuture<Order> orderCompletableFuture = userCompletableFuture.thenCompose(user -> {
return getUserOrder(user);
});
CompletableFuture<CompletableFuture<Order>> completableFutureCompletableFuture = userCompletableFuture.thenApply(user -> getUserOrder(user));
System.out.println(orderCompletableFuture.join());
//扁平化 flatMap,可以把嵌套的数据转换为单层的数据
// List<klass> klasses = new ArrayList<>();
// //List<User> list ??
// List<List<User>> collect = klasses.stream().map(klass -> klass.userList).collect(Collectors.toList());
// List<User> collect1 = klasses.stream().flatMap(klass -> klass.userList.stream()).collect(Collectors.toList());
}
private static CompletableFuture<Order> getUserOrder(User user) {
return CompletableFuture.supplyAsync(()->{
return new Order(10,user.getId(),"手机");
});
}
private static User getUserInfo() {
return new User(1,"小明");
}
static class Order{
private Integer id;
private Integer userId;
private String product;
public Order(Integer id, Integer userId, String product) {
this.id = id;
this.userId = userId;
this.product = product;
}
@Override
public String toString() {
return "Order{" +
"id=" + id +
", userId=" + userId +
", product='" + product + '\'' +
'}';
}
}
static class User{
private Integer id;
private String name;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public User(Integer id, String name) {
this.id = id;
this.name = name;
}
}
//班级类
static class klass{
List<User> userList;
}
}
运行结果:
thenCombine示例:
package imooc7.cf;
import java.util.concurrent.CompletableFuture;
/**
* @Description: 测试thenCombine
* 生成订单
* 1 查询用户信息
* 2 查询商品信息
* 组装订单
*/
public class TestThenCombine {
public static void main(String[] args) {
//1 查询用户信息
CompletableFuture<User> userCompletableFuture = CompletableFuture.supplyAsync(() -> {
return getUserInfo();
});
//2 查询商品信息
CompletableFuture<Product> productCompletableFuture = CompletableFuture.supplyAsync(() -> {
return getProductInfo();
});
//联合 1 和2 的结果
//参数user和参数product就是前面两个CompletableFuture返回的结果
CompletableFuture<Order> orderCompletableFuture = userCompletableFuture.thenCombine(productCompletableFuture, (user, product) -> {
return new Order(user, product);
});
System.out.println(orderCompletableFuture.join());
}
private static Product getProductInfo() {
return new Product(10,"手机");
}
private static User getUserInfo() {
return new User(1,"小明");
}
static class User{
private Integer id;
private String name;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public User(Integer id, String name) {
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
static class Product{
private Integer id;
private String productName;
public Product(Integer id, String productName) {
this.id = id;
this.productName = productName;
}
@Override
public String toString() {
return "Product{" +
"id=" + id +
", productName='" + productName + '\'' +
'}';
}
}
static class Order{
private User user;
private Product product;
public Order(User user, Product product) {
this.user = user;
this.product = product;
}
@Override
public String toString() {
return "Order{" +
"user=" + user +
", product=" + product +
'}';
}
}
}
运行结果:
7.异步编程(五)竞争性任务处理
anyOf()示例:
package imooc7.cf;
import java.util.concurrent.CompletableFuture;
/**
* @Description:测试anyof 任意一个结束就返回
*/
public class TestAnyOf {
public static void main(String[] args) {
//查询天气预报
CompletableFuture<String> weatherForService = getWeatherForService("A",1000L);
CompletableFuture<String> weatherForService1 = getWeatherForService("B",2000L);
CompletableFuture<String> weatherForService2 = getWeatherForService("C",3000L);
CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(weatherForService, weatherForService1, weatherForService2);
System.out.println(objectCompletableFuture.join());
}
public static CompletableFuture<String> getWeatherForService(String serviceName, Long time){
return CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(time);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return serviceName+" 20";
});
}
}
allOf示例:
package imooc7.cf;
import java.util.concurrent.CompletableFuture;
/**
* @Description: 测试allOf方法
* 比价
* 都返回才可以比较
*/
public class TestAllOf {
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture1 = getPriceFromPlatform("A", 1000L);
CompletableFuture<Integer> completableFuture2 = getPriceFromPlatform("B", 2000L);
CompletableFuture<Integer> completableFuture3 = getPriceFromPlatform("C", 3000L);
CompletableFuture<Void> future = CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3);
future.join();
// System.out.println(completableFuture1.join());
// System.out.println(completableFuture2.join());
// System.out.println(completableFuture3.join());
Integer priceA = completableFuture1.join();
Integer priceB = completableFuture2.join();
Integer priceC = completableFuture3.join();
int min = Math.min(priceA, Math.min(priceB, priceC));
System.out.println(min);
}
public static CompletableFuture<Integer> getPriceFromPlatform(String platform,Long time){
return CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(time);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if(platform.equals("A")){
return 100;
}else if(platform.equals("B")){
return 200;
}else{
return 300;
}
});
}
}
8.异步编程(六)异步任务异常处理
代码示例:
package imooc7.cf;
import java.util.concurrent.CompletableFuture;
/**
* @Description:
*/
public class TestException {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "async1";
}).thenApply(content -> {
int i = 1/0;
return content + " async2";
}).thenApply(content -> {
return content + " async3";
}).exceptionally(ex->{
//异常处理
System.out.println(ex.getMessage());
return "exception";
});
System.out.println(future.join());
}
}
总结:
不管是在异步代码还是回调代码出现异常,都可以通过exceptionally捕捉到。
代码示例:
package imooc7.cf;
import java.util.concurrent.CompletableFuture;
/**
* @Description:
*/
public class TestHandle {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "async1";
}).handle((res,ex)->{
System.out.println("handle执行了");
if(ex!=null){
System.out.println(ex.getMessage());
return "exception1";
}
return res;
}).thenApply(content -> {
int i = 1/0;
return content + " async2";
}).handle((res,ex)->{
if(ex!=null){
System.out.println(ex.getMessage());
return "exception2";
}
return res;
}) .thenApply(content -> {
return content + " async3";
});
System.out.println(future.join());
}
}
总结:每一个异步任务,都可以有一个handle(),用于上一个异步任务处理数据和异常。
package imooc7.cf;
import java.util.concurrent.CompletableFuture;
/**
* @Description:
*/
public class TestWhenComplete {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "async1";
}).thenApply(content -> {
int i = 1/0;
return content + " async2";
}).thenApply(content -> {
return content + " async3";
}).whenComplete((res,ex)->{
if(ex!=null){
System.out.println(ex.getMessage());
}else{
System.out.println(res);
}
});
// System.out.println(future.join());
}
}
总结:
三种处理异常的方式,如何选择呢?
1)如果想要通用的异常处理,可以使用exceptionally()或whenComplete();
2) 如果每一步异步任务需要定制化处理,可以使用handle()。
9.CompletionService:如何批量执行异步任务?
需求:将商品链接转成二维码,返回打码链接并发送给客户,而且要支持批量打码!
package imooc7.cs;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
/**
* @Description:图片打二维码
*/
public class TestAddCode {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
//存储图片地址
List<String> images = new ArrayList<>();
images.add("image1");
images.add("image2");
images.add("image3");
images.add("image4");
images.add("image5");
images.add("image6");
List<Future<String>> futures = new ArrayList<>();
for (String image : images) {
Future<String> future = executorService.submit(() -> {
try {
Thread.sleep(new Random().nextLong(5) * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return image+ " Test";
});
futures.add(future);
}
for (Future<String> future : futures) {
String res = future.get();
sendToCustomer(res);
}
}
private static void sendToCustomer(String res) {
System.out.println("send image to customer "+res);
}
}
运行结果:
package imooc7.cs;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
/**
* @Description:
*/
public class TestAddCode1 {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(100);
//存储图片地址
List<String> images = new ArrayList<>();
images.add("image1");
images.add("image2");
images.add("image3");
images.add("image4");
images.add("image5");
images.add("image6");
List<Future<String>> futures = new ArrayList<>();
for (String image : images) {
Future<String> future = executorService.submit(() -> {
try {
Thread.sleep(new Random().nextLong(5) * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return image+ " Test";
});
futures.add(future);
}
for (Future<String> future : futures) {
executorService.submit(()->{
try {
blockingQueue.put(future.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
});
}
for (int i = 0; i < images.size(); i++) {
String take = blockingQueue.take();
sendToCustomer(take);
}
}
private static void sendToCustomer(String res) {
System.out.println("send image to customer "+res);
}
}
运行结果:
CompletionService示例:
package imooc7.cs;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
/**
* @Description:使用completionService实现打二维码业务
*/
public class TestAddCode2 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
//存储图片地址
List<String> images = new ArrayList<>();
images.add("image1");
images.add("image2");
images.add("image3");
images.add("image4");
images.add("image5");
images.add("image6");
for (String image : images) {
completionService.submit(()->{
Thread.sleep(new Random().nextLong(5)*1000);
return image+" Test";
});
}
for (int i = 0; i < images.size(); i++) {
String res = completionService.take().get();
sendToCustomer(res);
}
}
private static void sendToCustomer(String res) {
System.out.println("send image to customer "+res);
}
}
运行结果:
这样写代码简洁了很多。