当前位置: 首页 > article >正文

CompletableFuture使用

在实际项目中,一个接口可能需要同时获取多种不同的数据,然后再汇总返回,这种场景还是挺常见的。举个例子:用户请求获取订单信息,可能需要同时获取用户信息、商品详情、物流信息、商品推荐等数据。如果是串行(按顺序依次执行每个任务)执行的话,接口的响应速度会非常慢。考虑到这些任务之间有大部分都是 无前后顺序关联 的,可以 并行执行 ,就比如说调用获取商品详情的时候,可以同时调用获取物流信息。通过并行执行多个任务的方式,接口的响应速度会得到大幅优化。这个时候 CompletableFuture 就闪亮登场了
 
 
 

说明:使用CompletableFuture异步编排大多方法都会有一个重载方法,会多出一个executor参数,用来传来自定义的线程池,如果不传就会使用默认的线程池

一些方法以及一个简单的例子

创建异步编排对象:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

线程串行方法:

// 使线程串行执行,无入参,无返回值
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);

// 使线程串行执行,有入参,无返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

// 使线程串行执行,有入参,有返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

多任务组合:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

代码示例:

import lombok.SneakyThrows;

import java.util.concurrent.*;

public class CompletableFutureTest5 {

    @SneakyThrows
    public static void main(String[] args) {
        //动态获取服务器核数
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                processors+1, // 核心线程个数 io:2n ,cpu: n+1  n:内核数据
                processors+1,
                0,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        CompletableFuture<String> future01 = CompletableFuture.supplyAsync(() -> "任务1", executor);
        CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> "任务2", executor);
        CompletableFuture<String> future03 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务3";
        }, executor);

        // 串联起若干个线程任务, 没有返回值
        CompletableFuture<Void> all = CompletableFuture.allOf(future01, future02, future03);
        // 等待所有线程执行完成
        // .join()和.get()都会阻塞并获取线程的执行情况
        // .join()会抛出未经检查的异常,不会强制开发者处理异常 .get()会抛出检查异常,需要开发者处理
        all.join();
        all.get();
    }
}

 
  
 
 

下面以我个人的项目中的实际应用举例:

全局自定义线程池配置:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 全局自定义线程池配置
 */
@Configuration
public class ThreadPoolConfig {

    @Bean
    public ThreadPoolExecutor threadPoolExecutor(){
        //动态获取服务器核数
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                processors+1, // 核心线程个数 io:2n ,cpu: n+1  n:内核数据
                processors+1,
                0,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        //  返回线程池对象
        return threadPoolExecutor;
    }
}

 
 
 
这是一个代驾项目司机模块结束代驾的接口,* 司机结束代驾服务页面非常复杂,数据的获取都需要远程调用,必然需要花费更多的时间。

假如司机结束代驾服务的每个查询,需要如下标注的时间才能完成

  1. 获取订单信息 1s
  2. 计算防止刷单 0.5s
  3. 计算订单实际里程 0.5s
  4. 计算订单实际代驾费用 1s

使用 CompletableFuture 之前的代码:

@Override
    public Boolean endDrive(OrderFeeForm orderFeeForm) {
        //1.获取订单信息
        OrderInfo orderInfo = orderInfoFeignClient.getOrderInfo(orderFeeForm.getOrderId()).getData();
        if(orderInfo.getDriverId().longValue() != orderFeeForm.getDriverId().longValue()) {
            throw new GuiguException(ResultCodeEnum.ARGUMENT_VALID_ERROR);
        }

        //2.计算订单实际里程
        BigDecimal realDistance = locationFeignClient.calculateOrderRealDistance(orderFeeForm.getOrderId()).getData();
        log.info("结束代驾,订单实际里程:{}", realDistance);

        //3.计算代驾实际费用
        FeeRuleRequestForm feeRuleRequestForm = new FeeRuleRequestForm();
        feeRuleRequestForm.setDistance(realDistance);
        feeRuleRequestForm.setStartTime(orderInfo.getStartServiceTime());
        //等候时间
        Integer waitMinute = Math.abs((int) ((orderInfo.getArriveTime().getTime() - orderInfo.getAcceptTime().getTime()) / (1000 * 60)));
        feeRuleRequestForm.setWaitMinute(waitMinute);
        log.info("结束代驾,费用参数:{}", JSON.toJSONString(feeRuleRequestForm));
        FeeRuleResponseVo feeRuleResponseVo = feeRuleFeignClient.calculateOrderFee(feeRuleRequestForm).getData();
        log.info("费用明细:{}", JSON.toJSONString(feeRuleResponseVo));
        //订单总金额 需加上 路桥费、停车费、其他费用、乘客好处费
        BigDecimal totalAmount = feeRuleResponseVo.getTotalAmount().add(orderFeeForm.getTollFee()).add(orderFeeForm.getParkingFee()).add(orderFeeForm.getOtherFee()).add(orderInfo.getFavourFee());
        feeRuleResponseVo.setTotalAmount(totalAmount);

        //4.计算系统奖励
        //4.1.获取订单数
        String startTime = new DateTime(orderInfo.getStartServiceTime()).toString("yyyy-MM-dd") + " 00:00:00";
        String endTime = new DateTime(orderInfo.getStartServiceTime()).toString("yyyy-MM-dd") + " 24:00:00";
        Long orderNum = orderInfoFeignClient.getOrderNumByTime(startTime, endTime).getData();
        //4.2.封装参数
        RewardRuleRequestForm rewardRuleRequestForm = new RewardRuleRequestForm();
        rewardRuleRequestForm.setStartTime(orderInfo.getStartServiceTime());
        rewardRuleRequestForm.setOrderNum(orderNum);
        //4.3.执行
        RewardRuleResponseVo rewardRuleResponseVo = rewardRuleFeignClient.calculateOrderRewardFee(rewardRuleRequestForm).getData();
        log.info("结束代驾,系统奖励:{}", JSON.toJSONString(rewardRuleResponseVo));

        //5.计算分账信息
        ProfitsharingRuleRequestForm profitsharingRuleRequestForm = new ProfitsharingRuleRequestForm();
        profitsharingRuleRequestForm.setOrderAmount(feeRuleResponseVo.getTotalAmount());
        profitsharingRuleRequestForm.setOrderNum(orderNum);
        ProfitsharingRuleResponseVo profitsharingRuleResponseVo = profitsharingRuleFeignClient.calculateOrderProfitsharingFee(profitsharingRuleRequestForm).getData();
        log.info("结束代驾,分账信息:{}", JSON.toJSONString(profitsharingRuleResponseVo));

        //6.封装更新订单账单相关实体对象
        UpdateOrderBillForm updateOrderBillForm = new UpdateOrderBillForm();
        updateOrderBillForm.setOrderId(orderFeeForm.getOrderId());
        updateOrderBillForm.setDriverId(orderFeeForm.getDriverId());
        //路桥费、停车费、其他费用
        updateOrderBillForm.setTollFee(orderFeeForm.getTollFee());
        updateOrderBillForm.setParkingFee(orderFeeForm.getParkingFee());
        updateOrderBillForm.setOtherFee(orderFeeForm.getOtherFee());
        //乘客好处费
        updateOrderBillForm.setFavourFee(orderInfo.getFavourFee());

        //实际里程
        updateOrderBillForm.setRealDistance(realDistance);
        //订单奖励信息
        BeanUtils.copyProperties(rewardRuleResponseVo, updateOrderBillForm);
        //代驾费用信息
        BeanUtils.copyProperties(feeRuleResponseVo, updateOrderBillForm);

        //分账相关信息
        BeanUtils.copyProperties(profitsharingRuleResponseVo, updateOrderBillForm);
        updateOrderBillForm.setProfitsharingRuleId(profitsharingRuleResponseVo.getProfitsharingRuleId());
        log.info("结束代驾,更新账单信息:{}", JSON.toJSONString(updateOrderBillForm));

        //7.结束代驾更新账单
        orderInfoFeignClient.endDrive(updateOrderBillForm);
        return true;
    }
使用之后的代码:
@Autowired
private ThreadPoolExecutor threadPoolExecutor;

@SneakyThrows
@Override
public Boolean endDrive(OrderFeeForm orderFeeForm) {
   //1.获取订单信息
   CompletableFuture<OrderInfo> orderInfoCompletableFuture = CompletableFuture.supplyAsync(() -> {
      OrderInfo orderInfo = orderInfoFeignClient.getOrderInfo(orderFeeForm.getOrderId()).getData();
      return orderInfo;
   }, threadPoolExecutor);

   //2.防止刷单,计算司机的经纬度与代驾的终点经纬度是否在2公里范围内
   CompletableFuture<OrderServiceLastLocationVo> orderServiceLastLocationVoCompletableFuture = CompletableFuture.supplyAsync((() -> {
      OrderServiceLastLocationVo orderServiceLastLocationVo = locationFeignClient.getOrderServiceLastLocation(orderFeeForm.getOrderId()).getData();
      return orderServiceLastLocationVo;
   }), threadPoolExecutor);

   //合并
   CompletableFuture.allOf(orderInfoCompletableFuture,
         orderServiceLastLocationVoCompletableFuture
   ).join();

   //获取数据
   OrderInfo orderInfo = orderInfoCompletableFuture.get();
   //2.1.判断刷单
   OrderServiceLastLocationVo orderServiceLastLocationVo = orderServiceLastLocationVoCompletableFuture.get();
   //司机的位置与代驾终点位置的距离
   double distance = LocationUtil.getDistance(orderInfo.getEndPointLatitude().doubleValue(), orderInfo.getEndPointLongitude().doubleValue(), orderServiceLastLocationVo.getLatitude().doubleValue(), orderServiceLastLocationVo.getLongitude().doubleValue());
   if(distance > SystemConstant.DRIVER_START_LOCATION_DISTION) {
      throw new GuiguException(ResultCodeEnum.DRIVER_END_LOCATION_DISTION_ERROR);
   }

   //3.计算订单实际里程
   CompletableFuture<BigDecimal> realDistanceCompletableFuture = CompletableFuture.supplyAsync(() -> {
      BigDecimal realDistance = locationFeignClient.calculateOrderRealDistance(orderFeeForm.getOrderId()).getData();
      log.info("结束代驾,订单实际里程:{}", realDistance);
      return realDistance;
   }, threadPoolExecutor);


   //4.计算代驾实际费用
   CompletableFuture<FeeRuleResponseVo> feeRuleResponseVoCompletableFuture = realDistanceCompletableFuture.thenApplyAsync((realDistance)->{
      FeeRuleRequestForm feeRuleRequestForm = new FeeRuleRequestForm();
      feeRuleRequestForm.setDistance(realDistance);
      feeRuleRequestForm.setStartTime(orderInfo.getStartServiceTime());
      //等候时间
      Integer waitMinute = Math.abs((int) ((orderInfo.getArriveTime().getTime() - orderInfo.getAcceptTime().getTime()) / (1000 * 60)));
      feeRuleRequestForm.setWaitMinute(waitMinute);
      log.info("结束代驾,费用参数:{}", JSON.toJSONString(feeRuleRequestForm));
      FeeRuleResponseVo feeRuleResponseVo = feeRuleFeignClient.calculateOrderFee(feeRuleRequestForm).getData();
      log.info("费用明细:{}", JSON.toJSONString(feeRuleResponseVo));
      //订单总金额 需加上 路桥费、停车费、其他费用、乘客好处费
      BigDecimal totalAmount = feeRuleResponseVo.getTotalAmount().add(orderFeeForm.getTollFee()).add(orderFeeForm.getParkingFee()).add(orderFeeForm.getOtherFee()).add(orderInfo.getFavourFee());
      feeRuleResponseVo.setTotalAmount(totalAmount);
      return feeRuleResponseVo;
   });

   //5.计算系统奖励
   //5.1.获取订单数
   CompletableFuture<Long> orderNumCompletableFuture = CompletableFuture.supplyAsync(() -> {
      String startTime = new DateTime(orderInfo.getStartServiceTime()).toString("yyyy-MM-dd") + " 00:00:00";
      String endTime = new DateTime(orderInfo.getStartServiceTime()).toString("yyyy-MM-dd") + " 24:00:00";
      Long orderNum = orderInfoFeignClient.getOrderNumByTime(startTime, endTime).getData();
      return orderNum;
   }, threadPoolExecutor);
   //5.2.封装参数
   CompletableFuture<RewardRuleResponseVo> rewardRuleResponseVoCompletableFuture = orderNumCompletableFuture.thenApplyAsync((orderNum)->{
      RewardRuleRequestForm rewardRuleRequestForm = new RewardRuleRequestForm();
      rewardRuleRequestForm.setStartTime(orderInfo.getStartServiceTime());
      rewardRuleRequestForm.setOrderNum(orderNum);
      //5.3.执行
      RewardRuleResponseVo rewardRuleResponseVo = rewardRuleFeignClient.calculateOrderRewardFee(rewardRuleRequestForm).getData();
      log.info("结束代驾,系统奖励:{}", JSON.toJSONString(rewardRuleResponseVo));
      return rewardRuleResponseVo;
   });

   //6.计算分账信息
   CompletableFuture<ProfitsharingRuleResponseVo> profitsharingRuleResponseVoCompletableFuture = feeRuleResponseVoCompletableFuture.thenCombineAsync(orderNumCompletableFuture, (feeRuleResponseVo, orderNum)->{
      ProfitsharingRuleRequestForm profitsharingRuleRequestForm = new ProfitsharingRuleRequestForm();
      profitsharingRuleRequestForm.setOrderAmount(feeRuleResponseVo.getTotalAmount());
      profitsharingRuleRequestForm.setOrderNum(orderNum);
      ProfitsharingRuleResponseVo profitsharingRuleResponseVo = profitsharingRuleFeignClient.calculateOrderProfitsharingFee(profitsharingRuleRequestForm).getData();
      log.info("结束代驾,分账信息:{}", JSON.toJSONString(profitsharingRuleResponseVo));
      return profitsharingRuleResponseVo;
   });
   CompletableFuture.allOf(orderInfoCompletableFuture,
         realDistanceCompletableFuture,
         feeRuleResponseVoCompletableFuture,
         orderNumCompletableFuture,
         rewardRuleResponseVoCompletableFuture,
         profitsharingRuleResponseVoCompletableFuture
   ).join();

   //获取执行结果
   BigDecimal realDistance = realDistanceCompletableFuture.get();
   FeeRuleResponseVo feeRuleResponseVo = feeRuleResponseVoCompletableFuture.get();
   RewardRuleResponseVo rewardRuleResponseVo = rewardRuleResponseVoCompletableFuture.get();
   ProfitsharingRuleResponseVo profitsharingRuleResponseVo = profitsharingRuleResponseVoCompletableFuture.get();

   //7.封装更新订单账单相关实体对象
   UpdateOrderBillForm updateOrderBillForm = new UpdateOrderBillForm();
   updateOrderBillForm.setOrderId(orderFeeForm.getOrderId());
   updateOrderBillForm.setDriverId(orderFeeForm.getDriverId());
   //路桥费、停车费、其他费用
   updateOrderBillForm.setTollFee(orderFeeForm.getTollFee());
   updateOrderBillForm.setParkingFee(orderFeeForm.getParkingFee());
   updateOrderBillForm.setOtherFee(orderFeeForm.getOtherFee());
   //乘客好处费
   updateOrderBillForm.setFavourFee(orderInfo.getFavourFee());

   //实际里程
   updateOrderBillForm.setRealDistance(realDistance);
   //订单奖励信息
   BeanUtils.copyProperties(rewardRuleResponseVo, updateOrderBillForm);
   //代驾费用信息
   BeanUtils.copyProperties(feeRuleResponseVo, updateOrderBillForm);

   //分账相关信息
   BeanUtils.copyProperties(profitsharingRuleResponseVo, updateOrderBillForm);
   updateOrderBillForm.setProfitsharingRuleId(profitsharingRuleResponseVo.getProfitsharingRuleId());
   log.info("结束代驾,更新账单信息:{}", JSON.toJSONString(updateOrderBillForm));

   //8.结束代驾更新账单
   orderInfoFeignClient.endDrive(updateOrderBillForm);
   return true;
}

使用 CompletableFuture 来进行一步任务编排,将串行的多个接口调用优化为并行的接口调用,最后只需要整合返回结果,以此来加快接口的响应速度


http://www.kler.cn/a/530941.html

相关文章:

  • Jenkins 触发构建的几种常见方式
  • K8S集群部署--亲测好用
  • deepseek本地部署会遇到哪些坑
  • 计算机网络 应用层 笔记1(C/S模型,P2P模型,FTP协议)
  • 可被electron等调用的Qt截图-录屏工具【源码开放】
  • 深入剖析 HTML5 新特性:语义化标签和表单控件完全指南
  • 简易CPU设计入门:指令单元(二)
  • Google C++ Style / 谷歌C++开源风格
  • 租房管理系统助力数字化转型提升租赁服务质量与用户体验
  • csapp笔记3.6节——控制(1)
  • 整形的存储形式和浮点型在计算机中的存储形式
  • 【Redis】安装配置Redis超详细教程 / Linux版
  • 集合通讯概览
  • 基于JMX实现消息队列监控
  • 动手学深度学习-3.2 线性回归的从0开始
  • 【数据结构-Trie树】力扣648. 单词替换
  • Kafka流式计算架构
  • Linux——进程间通信之SystemV共享内存
  • (回溯递归dfs 电话号码的字母组合 remake)leetcode 17
  • OpenCV4.8 开发实战系列专栏之 30 - OpenCV中的自定义滤波器
  • html中的列表元素
  • 全域旅游景区导览系统小程序独立部署
  • 使用冒泡排序模拟实现qsort函数
  • NeetCode刷题第20天(2025.2.1)
  • STM32 DMA+AD多通道
  • 音标-- 02-- 重音 音节 变音