MyBatis-Plus批量保存与多线程保存比较
在日常开发中经常会涉及大量数据保存的情况,之前就是使用saveBatch的方式,直接放一个list进去,看到一篇关于MyBatisPlus批量保存saveBatch的文章,里面对saveBatch进行了数据量的保存测试,还有解析rewriteBatchedStatements=true 的作用,但测试的批次和对比比较少,所以又对各种方式的保存性能进行分析,通过逐个插入,多线程插入,批量插入,多线程批量插入的方式,比较具体的差异情况。
1. 测试前的数据准备
为了保证足够的数据量,每次筛选出5000条数据进行插入,每条数据具有11个字段,大约100个字节,总体数据大小约500KB。
逐个保存方案:遍历5000条数据,逐个使用save方式进行保存。
多线程逐个保存方案:新建线程池,其中线程数量为5个,遍历5000条数据时每次新建一个任务扔到线程池中进行处理,线程使用save的方式进行保存。
saveBatch方案:不设置saveBatch的batchSize参数,直接将5000条数据的list放入方法中进行批量保存。
多线程saveBatch方案:新建线程池,其中线程数量为5个,遍历5000条数据时将数据均分成5个list,分别放到线程池中进行执行,线程使用saveBatch的方式直接将1000条数据进行批量保存。
以上多线程的执行方式采用submit有future的返回方式,任务放入线程池后保存future对象,后续手动进行get请求,保证计时内的任务都执行完毕。因为如果使用这种异步方式直接保存,计时器只会统计扔到线程池的时间,大概5ms就能结束,不具备参考的意义。
测试前的代码内容
方案1:逐个保存方案
public String dbDataTest() {
// 获取能源数据列表
List<Energy> dataList = getEnergy();
// 记录开始时间
long start = System.currentTimeMillis();
// 遍历数据列表,将每个能源数据转换为能源测试数据并保存到数据库
for (Energy energy : dataList) {
// 创建能源测试数据对象
EnergyTest test = EnergyTest.builder().id(energy.getId()).buildId(energy.getBuildId()).buildName(energy.getBuildName())
.collectionTime(energy.getCollectionTime()).dataType(energy.getDataType()).usageData(energy.getUsageData())
.meterId(energy.getMeterId()).meterName(energy.getMeterName()).meterType(energy.getMeterType())
.reading(energy.getReading()).totalSurface(energy.getTotalSurface()).build();
// 将能源测试数据保存到数据库
energyTestService.save(test);
}
// 记录结束时间
long end = System.currentTimeMillis();
// 返回执行时间
return end - start + "ms";
}
方案2:多线程逐个保存方案
public String dbDataTest2() throws ExecutionException, InterruptedException {
// 获取能源数据列表
List<Energy> dataList = getEnergy();
// 记录开始时间
long start = System.currentTimeMillis();
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 创建一个用于存储异步任务执行结果的列表
List<Future<?>> futures = new ArrayList<>();
for (Energy energy : dataList) {
// 创建能源测试数据对象
EnergyTest test = EnergyTest.builder().id(energy.getId()).buildId(energy.getBuildId()).buildName(energy.getBuildName())
.collectionTime(energy.getCollectionTime()).dataType(energy.getDataType()).usageData(energy.getUsageData())
.meterId(energy.getMeterId()).meterName(energy.getMeterName()).meterType(energy.getMeterType())
.reading(energy.getReading()).totalSurface(energy.getTotalSurface()).build();
// 将能源测试数据保存到数据库
futures.add(executorService.submit(() -> {
energyTestService.save(test);
return "null";
}));
}
//获取异步任务执行结果
for (Future<?> future : futures) {
future.get();
}
// 记录结束时间
long end = System.currentTimeMillis();
// 返回执行时间
return end - start + "ms";
}
方案3:saveBatch方案
public String dbDataTest3() {
// 获取能源数据列表
List<Energy> dataList = getEnergy();
// 记录开始时间
long start = System.currentTimeMillis();
// 创建一个用于存储 EnergyTest 对象的列表
List<EnergyTest> testList = new ArrayList<>();
// 遍历数据列表,将每个能源数据转换为能源测试数据并添加到 testList 中
for (Energy energy : dataList) {
// 创建能源测试数据对象
EnergyTest test = EnergyTest.builder().id(energy.getId()).buildId(energy.getBuildId()).buildName(energy.getBuildName())
.collectionTime(energy.getCollectionTime()).dataType(energy.getDataType()).usageData(energy.getUsageData())
.meterId(energy.getMeterId()).meterName(energy.getMeterName()).meterType(energy.getMeterType())
.reading(energy.getReading()).totalSurface(energy.getTotalSurface()).build();
// 将能源测试数据添加到 testList 中
testList.add(test);
}
// 将 testList 中的所有能源测试数据批量保存到数据库
energyTestService.saveBatch(testList);
// 记录结束时间
long end = System.currentTimeMillis();
// 返回执行时间
return end - start + "ms";
}
方案4:多线程saveBatch方案
public String dbDataTest4() throws ExecutionException, InterruptedException {
// 获取能源数据列表
List<Energy> dataList = getEnergy();
// 记录开始时间
long start = System.currentTimeMillis();
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 用于存储分批后的能源测试数据
Map<String, List<EnergyTest>> testListMap = new HashMap<>(8);
// 标记当前批次
int saveFlag = 0;
// 遍历数据列表,将每个能源数据转换为能源测试数据并添加到对应的批次中
for (Energy energy : dataList) {
EnergyTest test = EnergyTest.builder().id(energy.getId()).buildId(energy.getBuildId()).buildName(energy.getBuildName())
.collectionTime(energy.getCollectionTime()).dataType(energy.getDataType()).usageData(energy.getUsageData())
.meterId(energy.getMeterId()).meterName(energy.getMeterName()).meterType(energy.getMeterType())
.reading(energy.getReading()).totalSurface(energy.getTotalSurface()).build();
// 如果当前批次的列表不存在或大小超过1000,则创建新的批次
if (!testListMap.containsKey(String.valueOf(saveFlag)) || testListMap.get(String.valueOf(saveFlag)).size() >= 1000) {
saveFlag++;
testListMap.put(String.valueOf(saveFlag), new ArrayList<>());
}
// 将能源测试数据添加到当前批次的列表中
testListMap.get(String.valueOf(saveFlag)).add(test);
}
// 创建一个用于存储异步任务执行结果的列表
List<Future<?>> futures = new ArrayList<>();
// 遍历批次列表,将每个批次的能源测试数据批量保存到数据库
for (Map.Entry<String, List<EnergyTest>> entry : testListMap.entrySet()) {
List<EnergyTest> testList = entry.getValue();
// 提交异步任务,将当前批次的数据批量保存到数据库
futures.add(executorService.submit(() -> {
energyTestService.saveBatch(testList);
return "null";
}));
}
// 获取异步任务执行结果
for (Future<?> future : futures) {
future.get();
}
// 记录结束时间
long end = System.currentTimeMillis();
// 返回执行时间
return end - start + "ms";
}
2. 第一次测试(不设置rewriteBatchedStatements=true)
测试批次/耗时 | 逐个保存方案 | 多线程逐个保存方案 | saveBatch方案 | 多线程saveBatch方案 |
1 | 1461ms | 514ms | 432ms | 167ms |
2 | 1432ms | 544ms | 416ms | 170ms |
3 | 1347ms | 539ms | 428ms | 163ms |
4 | 1288ms | 486ms | 413ms | 184ms |
5 | 1434ms | 560ms | 440ms | 168ms |
6 | 1460ms | 513ms | 462ms | 188ms |
7 | 1453ms | 480ms | 466ms | 194ms |
8 | 1435ms | 477ms | 459ms | 170ms |
9 | 1508ms | 491ms | 408ms | 160ms |
10 | 1437ms | 484ms | 417ms | 178ms |
最大值 | 1508ms | 560ms | 466ms | 194ms |
最小值 | 1288ms | 477ms | 408ms | 160ms |
平均值 | 1425.5ms | 508.8ms | 434.1ms | 174.2ms |
通过十次测试数据,虽然还有偏差,但也具体有些参考的价值,首先是逐个保存的方案效率最低,多线程的方式会提高很多,而saveBatch明显要比多线程的方式更好,saveBatch并没有对多条SQL进行合并,可能saveBatch的线程数量多一些,这里我将多线程逐个保存方案自定义的线程池内线程数量调整为10,耗时基本和saveBatch的相同,甚至还比saveBatch要快一些,而调大线程池的逐个保存方案在300ms左右达到瓶颈,很难再根据线程数量将耗时降低。这里多线程saveBatch的方案明显是最快的,应该是saveBatch还有一些其他方式的优化。
3. 第二次测试(设置rewriteBatchedStatements=true)
测试批次/耗时 | 逐个保存方案 | 多线程逐个保存方案 | saveBatch方案 | 多线程saveBatch方案 |
1 | 1536ms | 505ms | 244ms | 106ms |
2 | 1591ms | 495ms | 277ms | 89ms |
3 | 1628 | 510ms | 261ms | 96ms |
4 | 1618ms | 487ms | 281ms | 100ms |
5 | 1581ms | 519ms | 258ms | 111ms |
6 | 1655ms | 515ms | 264ms | 112ms |
7 | 1618ms | 508ms | 271ms | 103ms |
8 | 1507ms | 519ms | 282ms | 98ms |
9 | 1531ms | 509ms | 280ms | 85ms |
10 | 1651ms | 507ms | 287ms | 96ms |
最大值 | 1655ms | 519ms | 287ms | 112ms |
最小值 | 1507ms | 487ms | 244ms | 85ms |
平均值 | 1591.6ms | 507.4ms | 270.5ms | 99.6ms |
通过对比第一次测试的结果可以看出来,逐个保存和多线程逐个保存的原理都是每次执行一条SQL语句,所以在性能上没有任何优化提升,而saveBatch则提升了40~50%。
4. 总结rewriteBatchedStatements=true的作用
4.1 JDBC批处理机制
JDBC批处理机制是一种优化数据库操作性能的技术,允许将多条SQL语句作为一个批次发送到数据库服务器执行,从而减少客户端与数据库之间的交互次数,显著提高性能。通常用于批量插入、批量更新和批量删除等场景。具体的流程如下:
//创建 PreparedStatement 对象,用于定义批处理的 SQL 模板。
PreparedStatement pstmt = conn.prepareStatement(sql);
for (Data data : dataList) {
// 多次调用 addBatch() 方法,每次调用都会将一条 SQL 加入批处理队列。
pstmt.addBatch();
}
//执行批处理,调用 executeBatch() 方法,批量发送 SQL 并执行。
pstmt.executeBatch();
4.2 MySQL JDBC 驱动的默认行为对批处理的影响
未开启重写:在默认状态下,MySQL JDBC驱动会逐一条目地发送批处理中的SQL语句,未开启重写功能。
性能瓶颈:频繁的网络交互以及数据库解析操作,使得批量操作的性能提升效果有限,形成了性能瓶颈。
4.3 rewriteBatchedStatements=true
启用批处理重写:启用批处理重写功能后,驱动能够将多条同类型的SQL语句进行合并,进而发送给数据库执行。
减少网络交互:一次发送多条SQL,可有效降低网络延迟,减少网络交互次数。
提高执行效率:当所有数据都通过一条SQL插入时,MySQL只需要解析一次SQL,降低了解析和执行的开销。
减少内存消耗:虽然批量操作时将数据合并到一条SQL中,理论上会增加内存使用(因为需要构建更大的SQL字符串),但相比多次单条插入的网络延迟和处理开销,整体的资源消耗和执行效率是更优的。
未开启参数时的批处理SQL:
INSERTINTO question (exam_id, content) VALUES (?, ?);
INSERT INTO question (exam_id, content) VALUES (?, ?);
INSERT INTO question (exam_id, content) VALUES (?, ?);
开启参数后的批处理 SQL:
INSERT INTO question (exam_id, content) VALUES (?, ?), (?, ?), (?, ?);