当前位置: 首页 > article >正文

多个线程处理不同的数据,等线程都完成后再进行下一步操作

现在有三个任务,三个任务之间没有关联关系,但是第四个任务要等前三个完成之后才能进行,于是使用多线程完成前三个任务节省时间

示例代码:

public void saveDataByOnlineTimeNew(LocalDateTime startTime, LocalDateTime endTime) {
        Objects.requireNonNull(startTime, "开始时间不能为空");
        Objects.requireNonNull(endTime, "结束时间不能为空");

        List<User> users = baseMapper.selectAllRightUser(startTime, endTime);

        if (users.isEmpty()) {
            return;
        }

        List<Integer> userIdList = users.stream().map(User::getId).collect(Collectors.toList());

        // 创建三个 CompletableFuture 分别处理三个列表
        CompletableFuture<List<ProUserStatisticsNew>> future1 = CompletableFuture.supplyAsync(() -> {
            List<ProUserStatisticsNew> proUserStatisticsNews = saveDataByOnlineTime(startTime, endTime, userIdList);

            users.parallelStream().forEach(user -> {

                log.trace("当前线程名字:"+Thread.currentThread().getName());
                List<ProUserStatisticsNew> versionNotInRelationList = this.saveDataByOnlineTimeByUser(startTime, endTime, user);

                if (!versionNotInRelationList.isEmpty()) {
                    proUserStatisticsNews.addAll(versionNotInRelationList);
                }
            });
            return proUserStatisticsNews;
        });
        CompletableFuture<List<ProUserStatisticsNew>> future2 = CompletableFuture.supplyAsync(() -> saveNoVersionDataByOnlineTime(startTime, endTime, userIdList));
        CompletableFuture<List<ProUserStatisticsNew>> future3 = CompletableFuture.supplyAsync(() -> saveLearnDataByOnlineTime(startTime, endTime, userIdList));

        // 等待所有 CompletableFuture 完成
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);

        // 等待所有 CompletableFuture 完成后,进行合并操作
        CompletableFuture<List<ProUserStatisticsNew>> join = allFutures.thenApplyAsync(v ->
                mergeLists(future1.join(), future2.join(), future3.join())
        );

        List<ProUserStatisticsNew> result = join.join();


        //List<ProUserStatisticsNew> result = mergeLists(proUserStatisticsNews, noVersionDataList, learnDataList);

        SysConfigTime sysConfigTimeByTime = SysConfigTimeUtil.getSysConfigTimeByTimeForQuery(endTime);

        result.parallelStream().forEach(one -> {
            one.setRecordDate(sysConfigTimeByTime.getYearmonth());
            one.setProjectHoursDifference(one.getWorkHours().subtract(one.getActualHours()));
        });

        // 批量保存或更新
        int batchSize = 100; // 根据具体要求和系统能力设置适当的批处理大小
        for (int i = 0; i < result.size(); i += batchSize) {
            int end = Math.min(result.size(), i + batchSize);
            this.saveOrUpdateBatch(result.subList(i, end));
        }
    }

代码解析:

这段代码实现了一个方法 `saveDataByOnlineTimeNew`,它通过多个异步任务(使用 `CompletableFuture`)并行处理数据,并最终将结果批量保存或更新到数据库中。具体的逻辑流程可以分为几个部分,下面我会逐步解释每个部分。

### 方法说明

- **输入参数**: `startTime` 和 `endTime` 是查询的开始和结束时间(`LocalDateTime` 类型),指定了需要查询的数据范围。
  
- **核心目标**: 这个方法的目标是根据 `startTime` 和 `endTime` 获取相关数据,并将结果合并后保存或更新到数据库。

### 代码分析

1. **非空检查**:
   ```java
   Objects.requireNonNull(startTime, "开始时间不能为空");
   Objects.requireNonNull(endTime, "结束时间不能为空");
   ```
   这两行代码确保 `startTime` 和 `endTime` 不能为空。如果为 `null`,将抛出 `NullPointerException`,并带有指定的错误信息。

2. **获取用户列表**:
   ```java
   List<User> users = baseMapper.selectAllRightUser(startTime, endTime);
   if (users.isEmpty()) {
       return;
   }
   ```
   这里通过调用 `baseMapper.selectAllRightUser(startTime, endTime)` 查询所有符合条件的用户。如果查询结果为空,则直接返回,不进行后续处理。

3. **获取用户 ID 列表**:
   ```java
   List<Integer> userIdList = users.stream().map(User::getId).collect(Collectors.toList());
   ```
   使用 `stream` 获取所有用户的 ID,方便后续操作。

4. **创建多个 `CompletableFuture`**:
   ```java
   CompletableFuture<List<ProUserStatisticsNew>> future1 = CompletableFuture.supplyAsync(() -> {
       // 异步任务1
   });
   CompletableFuture<List<ProUserStatisticsNew>> future2 = CompletableFuture.supplyAsync(() -> saveNoVersionDataByOnlineTime(startTime, endTime, userIdList));
   CompletableFuture<List<ProUserStatisticsNew>> future3 = CompletableFuture.supplyAsync(() -> saveLearnDataByOnlineTime(startTime, endTime, userIdList));
   ```
   创建了三个异步任务,每个任务执行不同的操作:

   - **`future1`**:调用 `saveDataByOnlineTime` 来保存基于在线时间的数据,然后对每个用户并行处理数据,并将结果合并。
   - **`future2`**:调用 `saveNoVersionDataByOnlineTime` 来保存没有版本数据的数据。
   - **`future3`**:调用 `saveLearnDataByOnlineTime` 来保存学习数据。

5. **等待所有异步任务完成**:
   ```java
   CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
   ```
   使用 `CompletableFuture.allOf()` 来等待所有的 `future1`, `future2`, `future3` 完成。`allOf` 返回一个 `CompletableFuture<Void>`,表明所有任务已完成。

6. **合并结果**:
   ```java
   CompletableFuture<List<ProUserStatisticsNew>> join = allFutures.thenApplyAsync(v ->
       mergeLists(future1.join(), future2.join(), future3.join())
   );
   ```
   当所有的异步任务完成后,调用 `thenApplyAsync` 继续处理合并操作。`join` 方法会阻塞直到每个 `CompletableFuture` 返回结果,合并三个任务的结果列表。

7. **结果后处理**:
   ```java
   List<ProUserStatisticsNew> result = join.join();
   SysConfigTime sysConfigTimeByTime = SysConfigTimeUtil.getSysConfigTimeByTimeForQuery(endTime);
   result.parallelStream().forEach(one -> {
       one.setRecordDate(sysConfigTimeByTime.getYearmonth());
       one.setProjectHoursDifference(one.getWorkHours().subtract(one.getActualHours()));
   });
   ```
   - `join.join()` 等待合并操作完成,获得最终的 `result` 列表。
   - 使用 `SysConfigTimeUtil.getSysConfigTimeByTimeForQuery(endTime)` 获取系统配置的时间信息。
   - 对 `result` 列表中的每一项,设置其记录日期(`setRecordDate`)和工作时间差(`setProjectHoursDifference`)。

8. **批量保存或更新**:
   ```java
   int batchSize = 100;
   for (int i = 0; i < result.size(); i += batchSize) {
       int end = Math.min(result.size(), i + batchSize);
       this.saveOrUpdateBatch(result.subList(i, end));
   }
   ```
   为了避免一次性将大量数据写入数据库,采用批处理的方式分批保存或更新数据。每次处理 `batchSize` 条记录(这里设定为100),直到所有记录处理完。

### 关键点总结

- **异步任务并行执行**:使用 `CompletableFuture.supplyAsync()` 创建三个并行任务来处理数据,从而加快整个处理过程。
- **合并操作**:通过 `thenApplyAsync` 合并三个异步任务的结果。
- **批量操作**:为了提高性能,使用批量保存或更新的方法 `saveOrUpdateBatch`,避免一次性提交过多数据。

### 性能优化
- 该方法使用了并行流 (`parallelStream()`) 和异步执行(`CompletableFuture`),这些可以显著提升性能,尤其是对于大规模数据的处理。
- 批量操作减少了数据库访问的次数,提高了数据库的写入效率。

总的来说,这段代码实现了一个高效的数据处理和保存机制,使用了现代 Java 特性(如异步编程和流操作)来优化性能。


http://www.kler.cn/a/460457.html

相关文章:

  • OkHttp接口自动化测试
  • AI 助力游戏开发中的常用算法实现
  • 关于使用vue-cropperjs上传一张图后,再次上传时,裁剪的图片不更新的问题
  • Android 系统 ActivityManager 系统层深度定制
  • 【ArcGISPro/GeoScenePro】检查并处理高程数据
  • Lumos学习王佩丰Excel第二十二讲:制作甘特图与动态甘特图
  • 百度热力图数据获取,原理,处理及论文应用
  • 【记录】vue 添加全局 dialog 弹框
  • .net core 的正则表达式
  • 数据挖掘笔记 | 插值 | 拉格朗日插值 | 龙格现象 | 埃尔米特插值 | 分段三次埃尔米特插值
  • Appium2.0:发生了哪些重大变化?
  • Linux umami网站统计工具自定义API开发
  • 科技云报到:洞见2025年科技潮流,技术大融合开启“智算时代”
  • 计算机网络——网络安全_计算机网络安全
  • 【Java 新特性】常用函数式接口
  • npm istall 卡住的结解决方法
  • React之从0开始(2)
  • Linux 安全加固的10个常用脚本
  • 数据结构(链式栈)
  • 【玩转23种Java设计模式】行为型模式篇:命令模式
  • 二十三种设计模式-单例模式
  • FQ-GAN代码解析
  • HarmonyOS-面试整理
  • Day2 微服务 网关路由转发、网关登录校验、配置管理
  • 小程序基础 —— 07 创建小程序项目
  • 基于Flask后端框架的均值填充