多个线程处理不同的数据,等线程都完成后再进行下一步操作
现在有三个任务,三个任务之间没有关联关系,但是第四个任务要等前三个完成之后才能进行,于是使用多线程完成前三个任务节省时间
示例代码:
public void saveDataByOnlineTimeNew(LocalDateTime startTime, LocalDateTime endTime) {
Objects.requireNonNull(startTime, "开始时间不能为空");
Objects.requireNonNull(endTime, "结束时间不能为空");
List<User> users = baseMapper.selectAllRightUser(startTime, endTime);
if (users.isEmpty()) {
return;
}
List<Integer> userIdList = users.stream().map(User::getId).collect(Collectors.toList());
// 创建三个 CompletableFuture 分别处理三个列表
CompletableFuture<List<ProUserStatisticsNew>> future1 = CompletableFuture.supplyAsync(() -> {
List<ProUserStatisticsNew> proUserStatisticsNews = saveDataByOnlineTime(startTime, endTime, userIdList);
users.parallelStream().forEach(user -> {
log.trace("当前线程名字:"+Thread.currentThread().getName());
List<ProUserStatisticsNew> versionNotInRelationList = this.saveDataByOnlineTimeByUser(startTime, endTime, user);
if (!versionNotInRelationList.isEmpty()) {
proUserStatisticsNews.addAll(versionNotInRelationList);
}
});
return proUserStatisticsNews;
});
CompletableFuture<List<ProUserStatisticsNew>> future2 = CompletableFuture.supplyAsync(() -> saveNoVersionDataByOnlineTime(startTime, endTime, userIdList));
CompletableFuture<List<ProUserStatisticsNew>> future3 = CompletableFuture.supplyAsync(() -> saveLearnDataByOnlineTime(startTime, endTime, userIdList));
// 等待所有 CompletableFuture 完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
// 等待所有 CompletableFuture 完成后,进行合并操作
CompletableFuture<List<ProUserStatisticsNew>> join = allFutures.thenApplyAsync(v ->
mergeLists(future1.join(), future2.join(), future3.join())
);
List<ProUserStatisticsNew> result = join.join();
//List<ProUserStatisticsNew> result = mergeLists(proUserStatisticsNews, noVersionDataList, learnDataList);
SysConfigTime sysConfigTimeByTime = SysConfigTimeUtil.getSysConfigTimeByTimeForQuery(endTime);
result.parallelStream().forEach(one -> {
one.setRecordDate(sysConfigTimeByTime.getYearmonth());
one.setProjectHoursDifference(one.getWorkHours().subtract(one.getActualHours()));
});
// 批量保存或更新
int batchSize = 100; // 根据具体要求和系统能力设置适当的批处理大小
for (int i = 0; i < result.size(); i += batchSize) {
int end = Math.min(result.size(), i + batchSize);
this.saveOrUpdateBatch(result.subList(i, end));
}
}
代码解析:
这段代码实现了一个方法 `saveDataByOnlineTimeNew`,它通过多个异步任务(使用 `CompletableFuture`)并行处理数据,并最终将结果批量保存或更新到数据库中。具体的逻辑流程可以分为几个部分,下面我会逐步解释每个部分。
### 方法说明
- **输入参数**: `startTime` 和 `endTime` 是查询的开始和结束时间(`LocalDateTime` 类型),指定了需要查询的数据范围。
- **核心目标**: 这个方法的目标是根据 `startTime` 和 `endTime` 获取相关数据,并将结果合并后保存或更新到数据库。
### 代码分析
1. **非空检查**:
```java
Objects.requireNonNull(startTime, "开始时间不能为空");
Objects.requireNonNull(endTime, "结束时间不能为空");
```
这两行代码确保 `startTime` 和 `endTime` 不能为空。如果为 `null`,将抛出 `NullPointerException`,并带有指定的错误信息。
2. **获取用户列表**:
```java
List<User> users = baseMapper.selectAllRightUser(startTime, endTime);
if (users.isEmpty()) {
return;
}
```
这里通过调用 `baseMapper.selectAllRightUser(startTime, endTime)` 查询所有符合条件的用户。如果查询结果为空,则直接返回,不进行后续处理。
3. **获取用户 ID 列表**:
```java
List<Integer> userIdList = users.stream().map(User::getId).collect(Collectors.toList());
```
使用 `stream` 获取所有用户的 ID,方便后续操作。
4. **创建多个 `CompletableFuture`**:
```java
CompletableFuture<List<ProUserStatisticsNew>> future1 = CompletableFuture.supplyAsync(() -> {
// 异步任务1
});
CompletableFuture<List<ProUserStatisticsNew>> future2 = CompletableFuture.supplyAsync(() -> saveNoVersionDataByOnlineTime(startTime, endTime, userIdList));
CompletableFuture<List<ProUserStatisticsNew>> future3 = CompletableFuture.supplyAsync(() -> saveLearnDataByOnlineTime(startTime, endTime, userIdList));
```
创建了三个异步任务,每个任务执行不同的操作:
- **`future1`**:调用 `saveDataByOnlineTime` 来保存基于在线时间的数据,然后对每个用户并行处理数据,并将结果合并。
- **`future2`**:调用 `saveNoVersionDataByOnlineTime` 来保存没有版本数据的数据。
- **`future3`**:调用 `saveLearnDataByOnlineTime` 来保存学习数据。
5. **等待所有异步任务完成**:
```java
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
```
使用 `CompletableFuture.allOf()` 来等待所有的 `future1`, `future2`, `future3` 完成。`allOf` 返回一个 `CompletableFuture<Void>`,表明所有任务已完成。
6. **合并结果**:
```java
CompletableFuture<List<ProUserStatisticsNew>> join = allFutures.thenApplyAsync(v ->
mergeLists(future1.join(), future2.join(), future3.join())
);
```
当所有的异步任务完成后,调用 `thenApplyAsync` 继续处理合并操作。`join` 方法会阻塞直到每个 `CompletableFuture` 返回结果,合并三个任务的结果列表。
7. **结果后处理**:
```java
List<ProUserStatisticsNew> result = join.join();
SysConfigTime sysConfigTimeByTime = SysConfigTimeUtil.getSysConfigTimeByTimeForQuery(endTime);
result.parallelStream().forEach(one -> {
one.setRecordDate(sysConfigTimeByTime.getYearmonth());
one.setProjectHoursDifference(one.getWorkHours().subtract(one.getActualHours()));
});
```
- `join.join()` 等待合并操作完成,获得最终的 `result` 列表。
- 使用 `SysConfigTimeUtil.getSysConfigTimeByTimeForQuery(endTime)` 获取系统配置的时间信息。
- 对 `result` 列表中的每一项,设置其记录日期(`setRecordDate`)和工作时间差(`setProjectHoursDifference`)。
8. **批量保存或更新**:
```java
int batchSize = 100;
for (int i = 0; i < result.size(); i += batchSize) {
int end = Math.min(result.size(), i + batchSize);
this.saveOrUpdateBatch(result.subList(i, end));
}
```
为了避免一次性将大量数据写入数据库,采用批处理的方式分批保存或更新数据。每次处理 `batchSize` 条记录(这里设定为100),直到所有记录处理完。
### 关键点总结
- **异步任务并行执行**:使用 `CompletableFuture.supplyAsync()` 创建三个并行任务来处理数据,从而加快整个处理过程。
- **合并操作**:通过 `thenApplyAsync` 合并三个异步任务的结果。
- **批量操作**:为了提高性能,使用批量保存或更新的方法 `saveOrUpdateBatch`,避免一次性提交过多数据。
### 性能优化
- 该方法使用了并行流 (`parallelStream()`) 和异步执行(`CompletableFuture`),这些可以显著提升性能,尤其是对于大规模数据的处理。
- 批量操作减少了数据库访问的次数,提高了数据库的写入效率。
总的来说,这段代码实现了一个高效的数据处理和保存机制,使用了现代 Java 特性(如异步编程和流操作)来优化性能。