当前位置: 首页 > article >正文

《异步编程之美》— 全栈修仙《Java 8 CompletableFuture 对比 ES6 Promise 以及Spring @Async》

   哈喽,大家好!在平常开发过程中会遇到许多意想不到的坑,本篇文章就记录在开发过程中遇到一些常见的问题,看了许多博主的异步编程,我只能说一言难尽。本文详细的讲解了异步编程之美,是不可多得的好文,建议细细品尝。文章末尾附有思维导图以及参考文档。

        首先,我们得弄明白什么异步?JavaScript 语言的执行环境是单线程的,异步编程对于 JavaScript 来说必不可少。JavaScript 传统异步解决方案主要是通过回调函数,而回调函数最大的问题就是 Callback Hell。所以 ES6 标准提供的 Promise 对象,专门用于解决异步编程的问题。而 Java 语言是一个支持多线程的语言,语法以同步为主,在实际开发中很少需要用到大量的异步编程。但是要想追求更高的性能,异步通常是更好的选择。例如 Servlet 3 的异步支持、Spring 5 提供的 Spring WebFlux 等,都是为了追求更高的性能。和 JavaScript 一样,传统的 Callback 方式处理 Java 异步也会有 Callback Hell 问题,所以在 Java 8 中新增了和 ES6 的 Promise 类似的对象: java.util.concurrent.CompletableFuture 。

        其次,类似与前端 Promise 代表 异步对象,类似Java中的 CompletableFuture。Promise 是现代 JavaScript 中异步编程的基础,是一个由异步函数返回的可以向我们指示当前操作所处的状态的对象。在 Promise 返回给调用者的时候,操作往往还没有完成,但 Promise 对象可以让我们操作最终完成时对其进行处理(无论成功还是失败)。

        最后,@Async是Spring提供的一个异步注解,默认采用SimpleAsyncTaskExecutor线程池,该线程池不是真正意义上的线程池。使用此线程池无法实现线程重用,每次调用都会新建一条线程。若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误。所以我们得自定义线程池

public void execute(Runnable task, long startTimeout) {
  Assert.notNull(task, "Runnable must not be null");
  Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task;
  //判断是否开启限流,默认为否
  if (this.isThrottleActive() && startTimeout > 0L) {
    //执行前置操作,进行限流
    this.concurrencyThrottle.beforeAccess();
    this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));
  } else {
    //未限流的情况,执行线程任务
    this.doExecute(taskToUse);
  }

}

protected void doExecute(Runnable task) {
  //不断创建线程
  Thread thread = this.threadFactory != null ? this.threadFactory.newThread(task) : this.createThread(task);
  thread.start();
}

//创建线程
public Thread createThread(Runnable runnable) {
  //指定线程名,task-1,task-2...
  Thread thread = new Thread(this.getThreadGroup(), runnable, this.nextThreadName());
  thread.setPriority(this.getThreadPriority());
  thread.setDaemon(this.isDaemon());
  return thread;
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * 描述:异步配置2
 * 
 */
@Configuration
@EnableAsync    // 可放在启动类上或单独的配置类
public class AsyncConfiguration2 {
    @Bean(name = "asyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //核心线程数
        taskExecutor.setCorePoolSize(10);
        //线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        taskExecutor.setMaxPoolSize(100);
        //缓存队列
        taskExecutor.setQueueCapacity(50);
        //许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
        taskExecutor.setKeepAliveSeconds(200);
        //异步方法内部线程名称
        taskExecutor.setThreadNamePrefix("async-");
        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
}
//使用异步编程操作大量数据 

List<Long> collect = examSysQuestionReals.stream()
        .map(i -> CompletableFuture.supplyAsync(i::getId))
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
        
 /** 与普通stream.map()对比
 优化后的代码中,stream.map() 分为两步:
第一次 map():将每个 examSysQuestionReal 对象转换为一个 CompletableFuture,该异步任务在后台线程(通常是线程池中的线程)中执行 i.getId() 方法。这意味着多个 getId() 操作可以同时进行,实现了并行处理。
第二次 map():对上一步产生的 CompletableFuture 列表调用 join() 方法,等待所有异步任务完成并获取它们的结果。这些结果随后被收集到一个新的 List<Long> 中。
区别总结:
执行模式:直接使用 stream.map() 时,映射操作是同步且顺序执行的;优化后的代码则利用 CompletableFuture 实现了异步并行执行。
性能:对于耗时的 getId() 操作(例如涉及数据库查询或其他远程服务调用),优化后的代码能利用多核处理器的能力,通过并行处理显著减少整体处理时间。而直接使用 stream.map() 只能在单一线程中顺序执行,无法利用多核优势,处理大量数据或耗时操作时可能效率较低。
资源消耗:优化后的代码引入了异步任务,可能增加线程池的使用和上下文切换的开销。然而,只要 getId() 操作的并行效益超过这些开销,总体上仍能提高性能。直接使用 stream.map() 无需额外的线程池资源,但在处理大规模或耗时任务时可能会因无法并行而显得低效。
综上所述,直接使用 stream.map() 适用于同步、轻量级且无需并行处理的场景。而优化后的代码结合 CompletableFuture 更适合处理可能耗时、受益于并行计算的任务,以提高程序的整体执行效率。在实际应用中,应根据具体需求和性能指标选择合适的方法
 /
 
//进阶,@Async与CompletableFuture
/**
在这个示例中,performComplexAsyncTask() 方法被标记为 @Async,由 Spring 异步执行。方法内部使用 CompletableFuture 实现了多个步骤的异步任务创建、组合和结果处理。这样既利用了 Spring 的异步方法抽象,又充分利用了 CompletableFuture 的灵活性和控制力。
总结来说,CompletableFuture 和 @Async 可以分别进行练习,然后在实践中结合使用,以适应不同复杂度和需求的异步编程场景。
/
@Service
public class AsyncService {

    @Async
    public CompletableFuture<String> performComplexAsyncTask(String input) {
        // Step 1: 异步获取数据
        CompletableFuture<String> dataFuture = CompletableFuture.supplyAsync(() -> {
            // 这里可能是耗时的数据库查询或网络请求
            return fetchData(input);
        });

        // Step 2: 异步处理数据,依赖于第一步的结果
        CompletableFuture<String> processedDataFuture = dataFuture.thenApply(this::processData);

        // Step 3: 异步发送通知,不依赖于前两步的结果,可以并发执行
        CompletableFuture<Void> notificationFuture = CompletableFuture.runAsync(() -> {
            sendNotification();
        });

        // Step 4: 等待所有异步操作完成
        return CompletableFuture.allOf(processedDataFuture, notificationFuture)
                .thenApply(unused -> {
                    // 返回最终处理结果或相关信息
                    return processedDataFuture.join();
                });
    }

    // 其他方法:fetchData(), processData(), sendNotification()
}

 /**
 在实际应用中,CompletableFuture 和 @Async 可以结合使用,发挥各自的优势。
使用 @Async 注解标记那些业务逻辑相对独立、适合作为单独任务执行的方法。这有助于简化代码结构,将异步处理逻辑从业务逻辑中分离出来。
在 @Async 方法内部,可以使用 CompletableFuture 构建更复杂的异步流程,如组合多个异步操作、处理中间结果等。这种方法结合了 Spring 的高级抽象与 CompletableFuture 的强大功能。
/  

现在我们从线程池配置类实战一次



import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 配置线程池
 * 
 */
@Configuration
@EnableAsync
public class ExecutorConfig {


	/* 获取快递单号线程池,量大 配置 start */
    // 维持最小的线程数
    private int corePoolSizeAutoTrace = 10;
    // 最大的活动线程数
    private int maxPoolSizeAutoTrace = 50;
    // 排队的线程数
    private int queueCapacityAutoTrace = maxPoolSizeAutoTrace * 2;
    // 线程长时间闲置关闭的时间,单位秒
    private int keepAliveSecondsAutoTrace = 1 * 60 * 60;

    private String asyncTask = "asyncTask-";
    private String eventExecute = "eventExecute-";

    private String pushStatus = "pushStatus-";

    // 异常记录
    private int recordCoreSize = 5;
    // 最大的活动线程数
    private int recordMaxPoolSize = 10;


    // 异常记录
    private int orderCoreSize = 10;
    // 最大的活动线程数
    private int orderMaxPoolSize = 100;

    /**
     * 异步任务线程池
     * @return 执行器
     */
    @Bean
    public Executor asyncTask() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSizeAutoTrace);
        executor.setMaxPoolSize(maxPoolSizeAutoTrace);
        executor.setQueueCapacity(queueCapacityAutoTrace);
        executor.setThreadNamePrefix(asyncTask);
        executor.setKeepAliveSeconds(keepAliveSecondsAutoTrace);
        executor.setThreadGroupName(asyncTask);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    @Bean
    public Executor eventExecute() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSizeAutoTrace);
        executor.setMaxPoolSize(maxPoolSizeAutoTrace);
        executor.setQueueCapacity(queueCapacityAutoTrace);
        executor.setThreadNamePrefix(eventExecute);
        executor.setKeepAliveSeconds(keepAliveSecondsAutoTrace);
        executor.setThreadGroupName(eventExecute);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    @Bean
    public Executor pushStatus() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSizeAutoTrace);
        executor.setMaxPoolSize(maxPoolSizeAutoTrace);
        executor.setQueueCapacity(queueCapacityAutoTrace);
        executor.setThreadNamePrefix(pushStatus);
        executor.setKeepAliveSeconds(keepAliveSecondsAutoTrace);
        executor.setThreadGroupName(pushStatus);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    @Bean
    public Executor addOperationLogList() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSizeAutoTrace);
        executor.setMaxPoolSize(maxPoolSizeAutoTrace);
        executor.setQueueCapacity(queueCapacityAutoTrace);
        executor.setThreadNamePrefix("addOperationLog-");
        executor.setKeepAliveSeconds(keepAliveSecondsAutoTrace);
        executor.setThreadGroupName("addOperationLog-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    /**
     * 新增托运单时,添加单据收发方信息
     * @return
     */
    @Bean
    public Executor addBillLogisticsInfo() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSizeAutoTrace);
        executor.setMaxPoolSize(maxPoolSizeAutoTrace);
        executor.setQueueCapacity(queueCapacityAutoTrace);
        executor.setThreadNamePrefix("addBillLogisticsInfo-");
        executor.setKeepAliveSeconds(keepAliveSecondsAutoTrace);
        executor.setThreadGroupName("addBillLogisticsInfo-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    /**
     * 派车单异常记录
     */
    @Bean(name = "recordBoxExecutor")
    public Executor recordBoxExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(recordCoreSize);
        executor.setMaxPoolSize(recordMaxPoolSize);
        executor.setQueueCapacity(queueCapacityAutoTrace);
        executor.setThreadNamePrefix("addTmsBillRecordBoxDtl-");
        executor.setKeepAliveSeconds(keepAliveSecondsAutoTrace);
        executor.setThreadGroupName("addTmsBillRecordBoxDtl-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    /**
     * 运单回写揽收线程池
     */
    @Bean(name = "orderHandExecutor")
    public Executor orderHandExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSizeAutoTrace);
        executor.setMaxPoolSize(maxPoolSizeAutoTrace);
        executor.setQueueCapacity(queueCapacityAutoTrace);
        executor.setThreadNamePrefix("orderHandExecutor-");
        executor.setKeepAliveSeconds(keepAliveSecondsAutoTrace);
        executor.setThreadGroupName("orderHandExecutor-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    /**
     * 运单回写签收线程池
     */
    @Bean(name = "orderSignExecutor")
    public Executor orderSignExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSizeAutoTrace);
        executor.setMaxPoolSize(maxPoolSizeAutoTrace);
        executor.setQueueCapacity(queueCapacityAutoTrace);
        executor.setThreadNamePrefix("orderSignExecutor-");
        executor.setKeepAliveSeconds(keepAliveSecondsAutoTrace);
        executor.setThreadGroupName("orderSignExecutor-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

在代码中使用

PS:线程安全问题及解决方法

参考文档:

网址:CompletableFuture 指南 |贝尔东 (baeldung.com)

思维图解:CompletableFuture的使用| ProcessOn免费在线作图,在线流程图,在线思维导图

优雅处理并发:Java CompletableFuture最佳实践 - 个人文章 - SegmentFault 思否

Java 8 CompletableFuture 对比 ES6 Promise | 叉叉哥的BLOG (xxgblog.com)

SpringBoot 实现异步调用@Async | 以及使用@Async注解可能会导致的问题_springboot @async异步类被aop拦截会报什么错误-CSDN博客

 线上调优:接口响应慢?那是你没用 CompletableFuture 来优化!

一次真实生产事故,让我总结了线程池的正确使用方式 (qq.com)


http://www.kler.cn/a/502953.html

相关文章:

  • 【HTML+CSS+JS+VUE】web前端教程-31-css3新特性
  • 论文笔记(六十一)Implicit Behavioral Cloning
  • Vue2+OpenLayers调用WMTS服务初始化天地图示例(提供Gitee源码)
  • 【机器学习案列】学生抑郁可视化及预测分析
  • 二分查找算法——山脉数组的峰顶索引
  • 基于springboot+vue+微信小程序的宠物领养系统
  • pytorch小记(七):pytorch中的保存/加载模型操作
  • 深入理解 Java 设计模式之策略模式
  • 简单组合逻辑
  • 【机器学习】神经网络(BP算法)含具体计算过程
  • <C++学习>C++ std 多线程教程
  • git仓库中提交上去了.idea文件夹内容怎么办?
  • E12.【C语言】练习:求两个数的最大公约数
  • 新时期下k8s 网络插件calico 安装
  • 服务器登陆后有java变量
  • unity打包sdk热更新笔记
  • 氧化铌在光学领域的独特贡献与应用拓展-京煌科技
  • Android Room 报错:too many SQL variables (code 1 SQLITE_ERROR) 原因及解决方法
  • 【Qt】QWidget核心属性2(windowOpacity、cursor、font、toolTip、focusPolicy、styleSheet)
  • Vue2:el-table 最后一列的操作按钮不换行,按钮过多时展示【更多】
  • 掌握 React 关键:理解 super () 和 super (props) 的不同应用
  • (PVG)Periodic Vibration Gaussian:自动驾驶过程中的三维重建 论文解读
  • 3.Qt Quick-QML地图引擎之v4.3版本(新增动态轨迹线/海图/天地图街道/天地图卫星)
  • java fastjson2 解析JSON用法解析
  • [3D] 3D雷达天眼监控系统:打造智能城市的安全防线
  • 多模态论文笔记——BLIP2