基于 Java 线程池和 EasyExcel 实现数据异步导入
2.代码实现
2.1 controller层
/**
 * Accepts a multi-sheet Excel upload and hands it off for asynchronous import.
 *
 * @param file the uploaded Excel workbook (the service reads its sheets concurrently)
 * @throws IOException          declared for MultipartFile access failures
 * @throws IllegalArgumentException if no file (or an empty file) was uploaded
 */
@PostMapping("import")
public void importExcel(MultipartFile file) throws IOException {
    // Fail fast on an empty part so the async pipeline never sees a null/empty stream.
    if (file == null || file.isEmpty()) {
        throw new IllegalArgumentException("Uploaded Excel file must not be empty");
    }
    importService.importExcelAsync(file);
}
2.2 service层
@Resource
private SalariesListener salariesListener;

// Number of sheets read concurrently; the workbook is assumed to contain this many sheets.
private static final int SHEET_COUNT = 20;

// Fixed pool sized to the sheet count so each sheet gets its own reader thread.
private ExecutorService executorService = Executors.newFixedThreadPool(SHEET_COUNT);

/**
 * Parses all sheets of the uploaded workbook concurrently, one task per sheet,
 * and blocks until every sheet has been read (the listener may still be
 * flushing batches on its own executor afterwards).
 *
 * @param file uploaded Excel workbook
 */
public void importExcelAsync(MultipartFile file) {
    List<Callable<Object>> tasks = new ArrayList<>(SHEET_COUNT);
    for (int i = 0; i < SHEET_COUNT; i++) {
        final int sheetNo = i;
        tasks.add(() -> {
            // getInputStream() returns a fresh stream on each call; close it
            // when the sheet is done to avoid leaking file handles.
            try (java.io.InputStream in = file.getInputStream()) {
                EasyExcel.read(in, Salaries.class, salariesListener)
                        .sheet(sheetNo).doRead();
            }
            return null;
        });
    }
    try {
        // Wait for every sheet-parsing task to complete.
        executorService.invokeAll(tasks);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so upstream code can observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException("Excel import interrupted", e);
    }
}
2.3 实体
// Row model for the `salaries` table (MyBatis-Plus mapping) and the EasyExcel
// row type used by the import listener; fields map to columns via camel-case
// (empNo -> emp_no, etc.).
// NOTE(review): java.util.Date is the legacy date API — java.time.LocalDate
// would be preferable, but switching the field types would change the
// @Data-generated getters/setters that callers may rely on.
@Data
@TableName("salaries")
public class Salaries {
    // Employee number (column emp_no).
    private Integer empNo;
    // Salary amount (column salary).
    private Integer salary;
    // Start of the period this salary applies to (column from_date).
    private Date fromDate;
    // End of the period this salary applies to (column to_date).
    private Date toDate;
}
2.4 EasyExcel 监听器
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.woniu.domain.Salaries;
import com.woniu.mapper.SalariesMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * EasyExcel row listener that buffers Salaries rows per reader thread and
 * flushes them to the database in asynchronous batches.
 *
 * Thread model: one reader thread per sheet calls {@link #invoke}; each thread
 * accumulates rows in its own ThreadLocal buffer. When a buffer reaches
 * {@link #BATCH_SIZE}, a snapshot is handed to a dedicated save executor so
 * parsing never blocks on the database.
 */
@Component
public class SalariesListener extends ServiceImpl<SalariesMapper, Salaries> implements ReadListener<Salaries>, IService<Salaries> {
    private static final Log logger = LogFactory.getLog(SalariesListener.class);

    // Executor that performs the batch inserts off the parsing threads.
    private ExecutorService executorService = Executors.newFixedThreadPool(20);

    // Per-reader-thread row buffer: each sheet-parsing thread gets its own list.
    private ThreadLocal<ArrayList<Salaries>> salariesList = ThreadLocal.withInitial(ArrayList::new);

    // Global counter of completed batch inserts, shared across threads for logging.
    private static AtomicInteger count = new AtomicInteger(1);

    // Rows buffered per thread before an asynchronous flush is triggered.
    private static final int BATCH_SIZE = 10000;

    // Self-injection so SaveTask invokes saveBatch through the Spring proxy
    // (keeps its transactional behavior); this is why the application needs
    // spring.main.allow-circular-references=true.
    @Resource
    private SalariesListener salariesListener;

    /**
     * Called by EasyExcel for every parsed row: buffer it, and flush
     * asynchronously once this thread's buffer reaches BATCH_SIZE.
     *
     * No @Transactional here: the original annotation was inert — the actual
     * insert runs on the save executor's thread, which a transaction opened
     * in this method could never cover.
     */
    @Override
    public void invoke(Salaries data, AnalysisContext context) {
        ArrayList<Salaries> buffer = salariesList.get();
        buffer.add(data);
        if (buffer.size() >= BATCH_SIZE) {
            asyncSaveData();
        }
    }

    /**
     * Synchronously flushes the current thread's buffer, if non-empty.
     */
    public void saveData() {
        ArrayList<Salaries> buffer = salariesList.get();
        if (!buffer.isEmpty()) {
            saveBatch(buffer, buffer.size());
            logger.info("第" + count.getAndAdd(1) + "次插入" + buffer.size() + "条数据");
            buffer.clear();
        }
    }

    /**
     * Snapshots the current thread's buffer, submits the snapshot to the save
     * executor, and clears the buffer for reuse by the reader thread.
     */
    public void asyncSaveData() {
        ArrayList<Salaries> buffer = salariesList.get();
        if (!buffer.isEmpty()) {
            // Defensive copy: the reader thread keeps reusing its buffer while
            // the save executor works on the snapshot. Copy-constructor replaces
            // the original clone()+unchecked cast.
            ArrayList<Salaries> snapshot = new ArrayList<>(buffer);
            executorService.execute(new SaveTask(snapshot, salariesListener));
            buffer.clear();
        }
    }

    /**
     * Called by EasyExcel when one sheet is fully parsed: flush any remainder
     * smaller than BATCH_SIZE, then drop this thread's ThreadLocal entry.
     */
    @Override
    public void doAfterAllAnalysed(AnalysisContext context) {
        logger.info("一个Sheet全部处理完");
        // A sheet may end with fewer than BATCH_SIZE rows still buffered.
        asyncSaveData();
        // The reader thread belongs to a pool and will be reused; removing the
        // ThreadLocal entry prevents a memory leak across imports.
        salariesList.remove();
    }

    // Runnable that persists one snapshot of rows on the save executor.
    static class SaveTask implements Runnable {
        private List<Salaries> salariesList;
        private SalariesListener salariesListener;

        public SaveTask(List<Salaries> salariesList, SalariesListener salariesListener) {
            this.salariesList = salariesList;
            this.salariesListener = salariesListener;
        }

        @Override
        public void run() {
            try {
                salariesListener.saveBatch(salariesList);
                logger.info("第" + count.getAndAdd(1) + "次插入" + salariesList.size() + "条数据");
            } catch (RuntimeException e) {
                // Without this catch, failures inside the executor would be
                // swallowed silently and the rows lost without a trace.
                logger.error("批量插入失败, 丢失" + salariesList.size() + "条数据", e);
            }
        }
    }
}
2.5 建表语句
-- Salary history table targeted by the import demo.
-- NOTE(review): no primary key or index is defined, which keeps bulk inserts
-- fast for the benchmark but is unsuitable for production — consider a
-- composite key such as (emp_no, from_date) before real use.
CREATE TABLE `salaries` (
`emp_no` int(11) DEFAULT NULL COMMENT '员工号',
`salary` int(11) DEFAULT NULL,
`from_date` datetime DEFAULT NULL,
`to_date` datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
spring:
  servlet:
    multipart:
      # max-request-size must be >= max-file-size: the whole multipart request
      # is checked against it first, so the original 30MB limit rejected any
      # upload over 30MB before the 1024MB file limit could ever apply.
      max-request-size: 1024MB
      max-file-size: 1024MB
  datasource:
    username: root
    password: root
    # rewriteBatchedStatements=true is what makes saveBatch a true multi-row insert.
    url: jdbc:mysql://127.0.0.1:3306/llp?rewriteBatchedStatements=true&characterEncoding=utf-8&allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=Asia/Shanghai
  main:
    # Required because SalariesListener injects itself to call through the proxy.
    allow-circular-references: true
3.测试验证
可以看到导入 95 万多条数据，耗时差不多在一分多钟。
- 导入开始时间
- 导入结束时间
- 入库数据