用最小的代价解决mybatis-plus关于批量保存的性能问题
1.问题说明
问题背景说明,在使用达梦数据库时,mybatis-plus的serviceImpl.saveBatch()方法或者updateBatchById()方法的时候,随着数据量、属性字段的增加,效率越发明显的慢。
serviceImpl.saveBatch();
serviceImpl.updateBatchById();
2.mysql的解决思路
如果你使用的是mysql的话,可以参考如下这个老哥的文章https://www.cnblogs.com/ajianbeyourself/p/18344695。改起来也简单,也就是配置参数加个属性。
spring.datasource.url=jdbc:mysql://localhost:3306/your_database?rewriteBatchedStatements=true
总结下就是:在 MyBatis-Plus 中启用 rewriteBatchedStatements 主要是为了提高批量插入/更新操作的性能。rewriteBatchedStatements 是 MySQL JDBC 驱动程序中的一个参数,用于将批量操作转换为单个 SQL 语句,以提高执行效率。
mysql的rewriteBatchedStatements属性,可以将多个SQL语句转化为一个sql语句。
这里我就不在啰嗦mysql的优化了,主要是针对其他数据库驱动没有rewriteBatchedStatements支持的情况下,我们该怎么优化,并且代价最小。
3.性能演示
以下代码是一个示例,先调用remove清空所有数据,然后记录开始时间,等待saveBatch以后,然后记录消耗时间
String oldData = JsonUtils.readFile("D:\\xxxx\\restdata\\" +"NR_RES_CHANNELROUTENODE_M" + ".json");
List<NrResChannelroutenodeM> list = JsonUtils.strToListBean(oldData, NrResChannelroutenodeM.class);
channelroutenodeMService.remove(null);
long start = System.currentTimeMillis();
channelroutenodeMService.saveBatch(list);
long end = System.currentTimeMillis() - start;
log.info("保存:{},数据总量:{},消耗时间:{}秒","List<NrResChannelroutenodeM>", list.size() , end / 1000f);
不然发现,使用mybatis-plus的原始saveBatch,基于NrResChannelroutenodeM这个实体来说,数据量约46万,消耗时间,大概110秒。这是在我本地的测试情况,实际上在用户现场的开发测试机上,这里的保存的时间已经超过了15分钟(原因有很多, 客户现场电脑配置较低,客户现场的部署环境有大概120个这样的批量保存,我这里只单独测试这一个所以只用110秒)。
接下来我们注释掉saveBatch,改成我自己编写的批量保存。这接近46万的数据量,我们切割成459份,一份保存1000条,1个线程保存需要17秒。
提升效果:110秒 -> 17秒
接下来,我改成500条数据一份,切割成918分,可以发现,性能还能更快,大概提升了2.5秒钟。也就是说,针对这个实体的数据来说,每次更新500条,比更新1000条快那么一小丢。
提升效果:17.1秒 -> 14.6秒
接下来,我将线程数提升到5:即5个线程运行,可以发现。时间来到了4.6秒,从最开始的110秒,到现在的4.6秒,这个提升很夸张了
提升效果:14.6秒 -> 4.6秒
4.优化思路
这里提一下,之前说的mysql是如何优化的。
mysql的rewriteBatchedStatements属性,可以将多个SQL语句转化为一个sql语句。
所以我思路也一样,如果是其他的非mysql,那就是把多个sql拼接成一个sql。
简单说,mybatis-plus执行批量保存到了数据库的时候,是下面这样的,
INSERT INTO users (name, email, age) VALUES ('John Doe', 'johndoe@example.com', 18);
INSERT INTO users (name, email, age) VALUES ('John Doe1', 'johndoe@example.com', 18);
INSERT INTO users (name, email, age) VALUES ('John Doe2', 'johndoe@example.com', 18);
INSERT INTO users (name, email, age) VALUES ('John Doe3', 'johndoe@example.com', 18);
而我们要的批量保存到了数据库执行的时候应该是下面这样的,
INSERT INTO users (name, email, age) VALUES ('John Doe', 'johndoe@example.com', 18),
VALUES ('John Doe1', 'johndoe@example.com', 18),
VALUES ('John Doe2', 'johndoe@example.com', 18),
VALUES ('John Doe3', 'johndoe@example.com', 18);
伪代码示意
StringBuilder insert = new StringBuilder();
insert.append(INSERT INTO).append(表名);
insert.append(表字段);
for insert.append(字段对应的值);
4.1.准备一个数据库实体类
任意实体类即可,例如如下这种实体类。说明下实体类的要求,我们需要从这个实体类中提取出哪些信息来:
表名:获取@TableName(“SG_PULL_CONFIG”)或者获取类名
字段名:获取@TableId(“TABLE_NAME”)或者获取属性名
@Data
@TableName("SG_PULL_CONFIG")
public class SgPullConfig implements Serializable {
private static final long serialVersionUID = 1L;
@TableId("TABLE_NAME")
private String tableName;
@TableField("DATA_URL")
private String dataUrl;
@TableField(value = "CREATE_TIME", fill = FieldFill.INSERT)
private Date createTime;
}
4.2.获取实体类对应的表名
/**
* @description:获取数据库实体类的的表名:TableName或者类名转驼峰
* @author:hutao
* @mail:hutao1@epri.sgcc.com.cn
* @date:2024年12月5日 上午11:10:01
*/
public static String getTableName(Object object) {
String name ="";
TableName annotation = object.getClass().getAnnotation(TableName.class);
if(annotation != null) {
name = annotation.value();
}else {
name = object.getClass().getSimpleName();
name = QueryService.humpToLine(name);
name = name.toUpperCase();
}
return name;
}
4.3.获取数据表的属性字段
/**
* @description:获取数据库实体类的字段名
* @author:hutao
* @mail:hutao1@epri.sgcc.com.cn
* @date:2024年12月5日 上午11:09:43
*/
public static List<String> getFields(Object object){
Field[] fields = object.getClass().getDeclaredFields();
List<String> list = new ArrayList<>(fields.length);
for (Field field : fields) {
field.setAccessible(true);
try {
//TableId修饰的主键排除自增
TableId tableId = field.getAnnotation(TableId.class);
if (tableId != null && !IdType.AUTO.equals(tableId.type())) {
list.add(tableId.value());
continue;
}
//TableField修饰的属性字段排除不存在的字段
TableField tableField = field.getAnnotation(TableField.class);
if (tableField != null && tableField.exist()) {
list.add(tableField.value());
continue;
}
//使用属性名和数据库字段名进行匹配的
if(tableId == null && tableField == null && !"serialVersionUID".equals(field.getName())) {
list.add(QueryService.humpToLine(field.getName()));
}
} catch (Exception e) {
log.info("获取实体类的TableId和TableField异常");
}
}
return list;
}
4.4.获取数据表的属性值
这里需要注意一下:获取的属性值,需要对字符串和时间做特殊处理。如下图所示,看VALUES里面的部分,如果是字符串,我们需要添加单引号。
/**
* @description:获取数据库实体类的属性值
* @author:hutao
* @mail:hutao1@epri.sgcc.com.cn
* @date:2024年12月5日 上午11:09:17
*/
public static List<Object> getdValues(Object object){
Field[] fields = object.getClass().getDeclaredFields();
List<Object> list = new ArrayList<>(fields.length);
for (Field field : fields) {
field.setAccessible(true);
try {
//TableId修饰的主键排除自增
TableId tableId = field.getAnnotation(TableId.class);
if ((tableId != null && !IdType.AUTO.equals(tableId.type()))) {
list.add(getSqlValueByType(field.get(object), field));
continue;
}
//TableField修饰的属性字段排除不存在的字段
TableField tableField = field.getAnnotation(TableField.class);
if (tableField != null && tableField.exist()) {
list.add(getSqlValueByType(field.get(object), field));
continue;
}
//使用属性名和数据库字段名进行匹配的
if(tableId == null && tableField == null && !"serialVersionUID".equals(field.getName())) {
list.add(getSqlValueByType(field.get(object), field));
}
} catch (Exception e) {
log.info("获取实体类的TableId和TableField异常");
}
}
return list;
}
private static Object getSqlValueByType(Object value, Field field) {
if(value == null) {
return null;
}
if(field.getType() == String.class) {
return "'" + value + "'";
}
if(field.getType() == Date.class) {
return "'" + DateUtils.getStrDate((Date)value, null) + "'";
}
return value;
}
4.5.用:表名_字段名_字段值---->拼接SQL
public static String getBatchInsertSql(List<?> list) {
StringBuilder insert = new StringBuilder();
String tableName = getTableName(list.get(0));
List<String> fields = getFields(list.get(0));
insert.append("INSERT INTO ").append(tableName).append("(");
fields.forEach(temp -> insert.append(temp).append(","));
insert.deleteCharAt(insert.length() - 1);
insert.append(") VALUES ");
StringBuilder valueTemp = null;
for (Object temp : list) {
valueTemp = new StringBuilder();
insert.append("(");
List<Object> values = getdValues(temp);
for (Object value : values) {
valueTemp.append(value).append(",");
}
valueTemp.deleteCharAt(valueTemp.length() - 1);
insert.append(valueTemp.toString());
insert.append("),");
}
insert.deleteCharAt(insert.length() - 1);
return insert.toString();
}
4.6.执行拼接的sql语句
//注入SqlRunner
@SpringBootConfiguration
@MapperScan(basePackages = "com.map.**.mapper")
@EnableTransactionManagement
public class MybatisConfig {
@Bean
public SqlRunner sqlRunner() {
return new SqlRunner();
}
}
//SqlRunner 执行sql语句
String sql = getBatchInsertSql(list);
sqlRunner.insert(sql);
5.1多线程优化
目前为止我的代码如下,其中DB database无视就好了,这是我多数据源的时候切换数据源用的
- 先把数据切割成N份
- 创建线程池(最长线程数是)
- 现场池提交任务
- 主线程等待线程池的任务执行完毕
/**
* @description:批量保存
* @author:hutao
* @mail:hutao1@epri.sgcc.com.cn
* @date:2024年12月9日 下午2:36:10
*/
public <T> void saveBatch(IService<T> service, List<T> list, DB database) {
if(ObjectUtils.isEmpty(list)) {
log.error("list数据为空!");
}
if(list.size() <= DEFAULT_BATCH_SIZE) {
service.saveBatch(list);
}else {
//限制批量保存最大的线程为5
int batchThread = Math.min(list.size() / DEFAULT_BATCH_SIZE, 5);
bigDataSave(list, database , DEFAULT_BATCH_SIZE, batchThread);
}
}
/**
* @description:大数据量的数据保存
* @author:hutao
* @mail:hutao1@epri.sgcc.com.cn
* @date:2024年12月4日 下午4:43:37
*/
public void bigDataSave(List<?> list, DB database, int size ,int thread) {
if(ObjectUtils.isEmpty(list)) {
return;
}
List<List<?>> splitList = ArraysUtils.splitList(list, size);
log.info("当前数据量:{},切割:{}份", list.size(), splitList.size());
ExecutorService executor = threadPoolService.newFixedThreadPool(thread);
for (int i = 0; i < splitList.size(); i++) {
executor.submit(new BigDataTask(splitList.get(i), database, sqlRunner));
}
threadPoolService.shutdownAndWait(executor);
}
线程池配置
@Component
public class ThreadPoolService {
//最大线程数
private int maximumPoolSize = 20;
public ThreadPoolService threadPoolService() {
return new ThreadPoolService();
}
public ExecutorService newFixedThreadPool(int nThreads) {
//防止线程数太大,印制最大为20
return new ThreadPoolExecutor(nThreads, Math.min(nThreads, maximumPoolSize),
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, maximumPoolSize,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public void shutdownAndWait(ExecutorService executor) {
executor.shutdown();
while (!executor.isTerminated()){};
}
}
多线程的子任务
@Log4j2
@AllArgsConstructor
public class BigDataTask implements Runnable {
//以下属性使用构造方法注入进来的,因为自己new BigDataTask,BigDataTask不是spring托管,因此无法使用Spring注入进来
private List<?> list;
private DB database;
private SqlRunner sqlRunner;
@Override
public void run() {
//这个代码是用来切换数据源的
DataSourceContextHolder.setDataSource(database.getEnumId());
try {
String sql = SaveBatchSerive.getBatchInsertSql(list);
sqlRunner.insert(sql);
} catch (Exception e) {
log.info("任务:BigDataTask,处理失败,失败原因:{}", e);
}
DataSourceContextHolder.removeDataSource();
}
}
在批量保存的地方调用即可,如下所示
优点:
- 入门难度低,不需要对mybatis-plus做任何修改,减少对mybatis-plus技术的研究工作量
- 操作可控,仅针对性能有问题的地方将xxxxService.saveBatch(list)改为我们自己编写的saveBatchSerive.saveBatch()即可,是局部性,而不是全局的,不至于出现为了修改某个地方的saveBatch()导致所有的saveBatch()都出现问题
- 可以合理自己配置适合自己的线程数以提升效率(并不是线程数越多越好)详情可以看我以前的介绍:系统适合开启多少线程数量?
saveBatchSerive.saveBatch(channelroutenodeMService, list, DB.从库);
//saveBatchSerive.bigDataSave(list, DB.从库 , 500, 1);
//channelroutenodeMService.saveBatch(list);
缺点:
1.没有在底层修改,如果开发团队其他开发成员调用原生的mybatis-plus,saveBatch时,还会出现性能问题
2.无法对已经编写的代码进行优化,需要将历史代码中的saveBatch替换成自己的。
5其他优化方式-替换saveBatch
具体实现方式参考:我这里就不废话了,但是我并不推荐这种方式,
https://openatomworkshop.csdn.net/6645aa50b12a9d168eb6bd90.html
大概思路如下:
- 编写一个RootMapper/RootService来替换原来的BaseMapper/IService
- 自己编写批量保存代码
- 业务Mapper/业务Service继承(实现)时,用RootMapper、RootService
- 批量保存的时候,用的是RootMapper的批量保存,不是BaseMapper的批量保存