CompletableFuture使用
在实际项目中,一个接口可能需要同时获取多种不同的数据,然后再汇总返回,这种场景还是挺常见的。举个例子:用户请求获取订单信息,可能需要同时获取用户信息、商品详情、物流信息、商品推荐等数据。如果是串行(按顺序依次执行每个任务)执行的话,接口的响应速度会非常慢。考虑到这些任务之间有大部分都是 无前后顺序关联 的,可以 并行执行 ,就比如说调用获取商品详情的时候,可以同时调用获取物流信息。通过并行执行多个任务的方式,接口的响应速度会得到大幅优化。这个时候 CompletableFuture 就闪亮登场了
说明:使用CompletableFuture
异步编排大多方法都会有一个重载方法,会多出一个executor参数,用来传来自定义的线程池,如果不传就会使用默认的线程池
一些方法以及一个简单的例子
创建异步编排对象:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
线程串行方法:
// 使线程串行执行,无入参,无返回值
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);
// 使线程串行执行,有入参,无返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);
// 使线程串行执行,有入参,有返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
多任务组合:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
代码示例:
import lombok.SneakyThrows;
import java.util.concurrent.*;
public class CompletableFutureTest5 {
@SneakyThrows
public static void main(String[] args) {
//动态获取服务器核数
int processors = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
processors+1, // 核心线程个数 io:2n ,cpu: n+1 n:内核数据
processors+1,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
CompletableFuture<String> future01 = CompletableFuture.supplyAsync(() -> "任务1", executor);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> "任务2", executor);
CompletableFuture<String> future03 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务3";
}, executor);
// 串联起若干个线程任务, 没有返回值
CompletableFuture<Void> all = CompletableFuture.allOf(future01, future02, future03);
// 等待所有线程执行完成
// .join()和.get()都会阻塞并获取线程的执行情况
// .join()会抛出未经检查的异常,不会强制开发者处理异常 .get()会抛出检查异常,需要开发者处理
all.join();
all.get();
}
}
下面以我个人的项目中的实际应用举例:
全局自定义线程池配置:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 全局自定义线程池配置
*/
@Configuration
public class ThreadPoolConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(){
//动态获取服务器核数
int processors = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
processors+1, // 核心线程个数 io:2n ,cpu: n+1 n:内核数据
processors+1,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
// 返回线程池对象
return threadPoolExecutor;
}
}
这是一个代驾项目司机模块结束代驾的接口,* 司机结束代驾服务页面非常复杂,数据的获取都需要远程调用,必然需要花费更多的时间。
假如司机结束代驾服务的每个查询,需要如下标注的时间才能完成
- 获取订单信息 1s
- 计算防止刷单 0.5s
- 计算订单实际里程 0.5s
- 计算订单实际代驾费用 1s
- …
使用 CompletableFuture 之前的代码:
@Override
public Boolean endDrive(OrderFeeForm orderFeeForm) {
//1.获取订单信息
OrderInfo orderInfo = orderInfoFeignClient.getOrderInfo(orderFeeForm.getOrderId()).getData();
if(orderInfo.getDriverId().longValue() != orderFeeForm.getDriverId().longValue()) {
throw new GuiguException(ResultCodeEnum.ARGUMENT_VALID_ERROR);
}
//2.计算订单实际里程
BigDecimal realDistance = locationFeignClient.calculateOrderRealDistance(orderFeeForm.getOrderId()).getData();
log.info("结束代驾,订单实际里程:{}", realDistance);
//3.计算代驾实际费用
FeeRuleRequestForm feeRuleRequestForm = new FeeRuleRequestForm();
feeRuleRequestForm.setDistance(realDistance);
feeRuleRequestForm.setStartTime(orderInfo.getStartServiceTime());
//等候时间
Integer waitMinute = Math.abs((int) ((orderInfo.getArriveTime().getTime() - orderInfo.getAcceptTime().getTime()) / (1000 * 60)));
feeRuleRequestForm.setWaitMinute(waitMinute);
log.info("结束代驾,费用参数:{}", JSON.toJSONString(feeRuleRequestForm));
FeeRuleResponseVo feeRuleResponseVo = feeRuleFeignClient.calculateOrderFee(feeRuleRequestForm).getData();
log.info("费用明细:{}", JSON.toJSONString(feeRuleResponseVo));
//订单总金额 需加上 路桥费、停车费、其他费用、乘客好处费
BigDecimal totalAmount = feeRuleResponseVo.getTotalAmount().add(orderFeeForm.getTollFee()).add(orderFeeForm.getParkingFee()).add(orderFeeForm.getOtherFee()).add(orderInfo.getFavourFee());
feeRuleResponseVo.setTotalAmount(totalAmount);
//4.计算系统奖励
//4.1.获取订单数
String startTime = new DateTime(orderInfo.getStartServiceTime()).toString("yyyy-MM-dd") + " 00:00:00";
String endTime = new DateTime(orderInfo.getStartServiceTime()).toString("yyyy-MM-dd") + " 24:00:00";
Long orderNum = orderInfoFeignClient.getOrderNumByTime(startTime, endTime).getData();
//4.2.封装参数
RewardRuleRequestForm rewardRuleRequestForm = new RewardRuleRequestForm();
rewardRuleRequestForm.setStartTime(orderInfo.getStartServiceTime());
rewardRuleRequestForm.setOrderNum(orderNum);
//4.3.执行
RewardRuleResponseVo rewardRuleResponseVo = rewardRuleFeignClient.calculateOrderRewardFee(rewardRuleRequestForm).getData();
log.info("结束代驾,系统奖励:{}", JSON.toJSONString(rewardRuleResponseVo));
//5.计算分账信息
ProfitsharingRuleRequestForm profitsharingRuleRequestForm = new ProfitsharingRuleRequestForm();
profitsharingRuleRequestForm.setOrderAmount(feeRuleResponseVo.getTotalAmount());
profitsharingRuleRequestForm.setOrderNum(orderNum);
ProfitsharingRuleResponseVo profitsharingRuleResponseVo = profitsharingRuleFeignClient.calculateOrderProfitsharingFee(profitsharingRuleRequestForm).getData();
log.info("结束代驾,分账信息:{}", JSON.toJSONString(profitsharingRuleResponseVo));
//6.封装更新订单账单相关实体对象
UpdateOrderBillForm updateOrderBillForm = new UpdateOrderBillForm();
updateOrderBillForm.setOrderId(orderFeeForm.getOrderId());
updateOrderBillForm.setDriverId(orderFeeForm.getDriverId());
//路桥费、停车费、其他费用
updateOrderBillForm.setTollFee(orderFeeForm.getTollFee());
updateOrderBillForm.setParkingFee(orderFeeForm.getParkingFee());
updateOrderBillForm.setOtherFee(orderFeeForm.getOtherFee());
//乘客好处费
updateOrderBillForm.setFavourFee(orderInfo.getFavourFee());
//实际里程
updateOrderBillForm.setRealDistance(realDistance);
//订单奖励信息
BeanUtils.copyProperties(rewardRuleResponseVo, updateOrderBillForm);
//代驾费用信息
BeanUtils.copyProperties(feeRuleResponseVo, updateOrderBillForm);
//分账相关信息
BeanUtils.copyProperties(profitsharingRuleResponseVo, updateOrderBillForm);
updateOrderBillForm.setProfitsharingRuleId(profitsharingRuleResponseVo.getProfitsharingRuleId());
log.info("结束代驾,更新账单信息:{}", JSON.toJSONString(updateOrderBillForm));
//7.结束代驾更新账单
orderInfoFeignClient.endDrive(updateOrderBillForm);
return true;
}
使用之后的代码:
@Autowired
private ThreadPoolExecutor threadPoolExecutor;
@SneakyThrows
@Override
public Boolean endDrive(OrderFeeForm orderFeeForm) {
//1.获取订单信息
CompletableFuture<OrderInfo> orderInfoCompletableFuture = CompletableFuture.supplyAsync(() -> {
OrderInfo orderInfo = orderInfoFeignClient.getOrderInfo(orderFeeForm.getOrderId()).getData();
return orderInfo;
}, threadPoolExecutor);
//2.防止刷单,计算司机的经纬度与代驾的终点经纬度是否在2公里范围内
CompletableFuture<OrderServiceLastLocationVo> orderServiceLastLocationVoCompletableFuture = CompletableFuture.supplyAsync((() -> {
OrderServiceLastLocationVo orderServiceLastLocationVo = locationFeignClient.getOrderServiceLastLocation(orderFeeForm.getOrderId()).getData();
return orderServiceLastLocationVo;
}), threadPoolExecutor);
//合并
CompletableFuture.allOf(orderInfoCompletableFuture,
orderServiceLastLocationVoCompletableFuture
).join();
//获取数据
OrderInfo orderInfo = orderInfoCompletableFuture.get();
//2.1.判断刷单
OrderServiceLastLocationVo orderServiceLastLocationVo = orderServiceLastLocationVoCompletableFuture.get();
//司机的位置与代驾终点位置的距离
double distance = LocationUtil.getDistance(orderInfo.getEndPointLatitude().doubleValue(), orderInfo.getEndPointLongitude().doubleValue(), orderServiceLastLocationVo.getLatitude().doubleValue(), orderServiceLastLocationVo.getLongitude().doubleValue());
if(distance > SystemConstant.DRIVER_START_LOCATION_DISTION) {
throw new GuiguException(ResultCodeEnum.DRIVER_END_LOCATION_DISTION_ERROR);
}
//3.计算订单实际里程
CompletableFuture<BigDecimal> realDistanceCompletableFuture = CompletableFuture.supplyAsync(() -> {
BigDecimal realDistance = locationFeignClient.calculateOrderRealDistance(orderFeeForm.getOrderId()).getData();
log.info("结束代驾,订单实际里程:{}", realDistance);
return realDistance;
}, threadPoolExecutor);
//4.计算代驾实际费用
CompletableFuture<FeeRuleResponseVo> feeRuleResponseVoCompletableFuture = realDistanceCompletableFuture.thenApplyAsync((realDistance)->{
FeeRuleRequestForm feeRuleRequestForm = new FeeRuleRequestForm();
feeRuleRequestForm.setDistance(realDistance);
feeRuleRequestForm.setStartTime(orderInfo.getStartServiceTime());
//等候时间
Integer waitMinute = Math.abs((int) ((orderInfo.getArriveTime().getTime() - orderInfo.getAcceptTime().getTime()) / (1000 * 60)));
feeRuleRequestForm.setWaitMinute(waitMinute);
log.info("结束代驾,费用参数:{}", JSON.toJSONString(feeRuleRequestForm));
FeeRuleResponseVo feeRuleResponseVo = feeRuleFeignClient.calculateOrderFee(feeRuleRequestForm).getData();
log.info("费用明细:{}", JSON.toJSONString(feeRuleResponseVo));
//订单总金额 需加上 路桥费、停车费、其他费用、乘客好处费
BigDecimal totalAmount = feeRuleResponseVo.getTotalAmount().add(orderFeeForm.getTollFee()).add(orderFeeForm.getParkingFee()).add(orderFeeForm.getOtherFee()).add(orderInfo.getFavourFee());
feeRuleResponseVo.setTotalAmount(totalAmount);
return feeRuleResponseVo;
});
//5.计算系统奖励
//5.1.获取订单数
CompletableFuture<Long> orderNumCompletableFuture = CompletableFuture.supplyAsync(() -> {
String startTime = new DateTime(orderInfo.getStartServiceTime()).toString("yyyy-MM-dd") + " 00:00:00";
String endTime = new DateTime(orderInfo.getStartServiceTime()).toString("yyyy-MM-dd") + " 24:00:00";
Long orderNum = orderInfoFeignClient.getOrderNumByTime(startTime, endTime).getData();
return orderNum;
}, threadPoolExecutor);
//5.2.封装参数
CompletableFuture<RewardRuleResponseVo> rewardRuleResponseVoCompletableFuture = orderNumCompletableFuture.thenApplyAsync((orderNum)->{
RewardRuleRequestForm rewardRuleRequestForm = new RewardRuleRequestForm();
rewardRuleRequestForm.setStartTime(orderInfo.getStartServiceTime());
rewardRuleRequestForm.setOrderNum(orderNum);
//5.3.执行
RewardRuleResponseVo rewardRuleResponseVo = rewardRuleFeignClient.calculateOrderRewardFee(rewardRuleRequestForm).getData();
log.info("结束代驾,系统奖励:{}", JSON.toJSONString(rewardRuleResponseVo));
return rewardRuleResponseVo;
});
//6.计算分账信息
CompletableFuture<ProfitsharingRuleResponseVo> profitsharingRuleResponseVoCompletableFuture = feeRuleResponseVoCompletableFuture.thenCombineAsync(orderNumCompletableFuture, (feeRuleResponseVo, orderNum)->{
ProfitsharingRuleRequestForm profitsharingRuleRequestForm = new ProfitsharingRuleRequestForm();
profitsharingRuleRequestForm.setOrderAmount(feeRuleResponseVo.getTotalAmount());
profitsharingRuleRequestForm.setOrderNum(orderNum);
ProfitsharingRuleResponseVo profitsharingRuleResponseVo = profitsharingRuleFeignClient.calculateOrderProfitsharingFee(profitsharingRuleRequestForm).getData();
log.info("结束代驾,分账信息:{}", JSON.toJSONString(profitsharingRuleResponseVo));
return profitsharingRuleResponseVo;
});
CompletableFuture.allOf(orderInfoCompletableFuture,
realDistanceCompletableFuture,
feeRuleResponseVoCompletableFuture,
orderNumCompletableFuture,
rewardRuleResponseVoCompletableFuture,
profitsharingRuleResponseVoCompletableFuture
).join();
//获取执行结果
BigDecimal realDistance = realDistanceCompletableFuture.get();
FeeRuleResponseVo feeRuleResponseVo = feeRuleResponseVoCompletableFuture.get();
RewardRuleResponseVo rewardRuleResponseVo = rewardRuleResponseVoCompletableFuture.get();
ProfitsharingRuleResponseVo profitsharingRuleResponseVo = profitsharingRuleResponseVoCompletableFuture.get();
//7.封装更新订单账单相关实体对象
UpdateOrderBillForm updateOrderBillForm = new UpdateOrderBillForm();
updateOrderBillForm.setOrderId(orderFeeForm.getOrderId());
updateOrderBillForm.setDriverId(orderFeeForm.getDriverId());
//路桥费、停车费、其他费用
updateOrderBillForm.setTollFee(orderFeeForm.getTollFee());
updateOrderBillForm.setParkingFee(orderFeeForm.getParkingFee());
updateOrderBillForm.setOtherFee(orderFeeForm.getOtherFee());
//乘客好处费
updateOrderBillForm.setFavourFee(orderInfo.getFavourFee());
//实际里程
updateOrderBillForm.setRealDistance(realDistance);
//订单奖励信息
BeanUtils.copyProperties(rewardRuleResponseVo, updateOrderBillForm);
//代驾费用信息
BeanUtils.copyProperties(feeRuleResponseVo, updateOrderBillForm);
//分账相关信息
BeanUtils.copyProperties(profitsharingRuleResponseVo, updateOrderBillForm);
updateOrderBillForm.setProfitsharingRuleId(profitsharingRuleResponseVo.getProfitsharingRuleId());
log.info("结束代驾,更新账单信息:{}", JSON.toJSONString(updateOrderBillForm));
//8.结束代驾更新账单
orderInfoFeignClient.endDrive(updateOrderBillForm);
return true;
}
使用 CompletableFuture 来进行一步任务编排,将串行的多个接口调用优化为并行的接口调用,最后只需要整合返回结果,以此来加快接口的响应速度