当前位置: 首页 > article >正文

基于java线程池和EasyExcel实现数据异步导入

基于java线程池和EasyExcel实现数据异步导入

2.代码实现

2.1 controller层

    @PostMapping("import")
    public void importExcel(MultipartFile file) throws IOException {
        importService.importExcelAsync(file);
    }

2.2 service层

@Resource
private SalariesListener salariesListener;

private ExecutorService executorService = Executors.newFixedThreadPool(20);

public void importExcelAsync(MultipartFile file) {
    // 开20个线程分别处理20个sheet
    List<Callable<Object>> tasks = new ArrayList<>();
    for (int i = 0; i < 20; i++) {
        int num = i;
        tasks.add(() -> {
            EasyExcel.read(file.getInputStream(), Salaries.class, salariesListener)
                    .sheet(num).doRead();
            return null;
        });
    }

    try {
        //等待所有任务完成
        executorService.invokeAll(tasks);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }

}

2.3实体

@Data
@TableName("salaries")
public class Salaries {
    private Integer empNo;
    private Integer salary;
    private Date fromDate;
    private Date toDate;
}

2.4easyExcel 监听

import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.woniu.domain.Salaries;
import com.woniu.mapper.SalariesMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

@Component
public class SalariesListener extends ServiceImpl<SalariesMapper, Salaries> implements ReadListener<Salaries>, IService<Salaries> {

    private static final Log logger = LogFactory.getLog(SalariesListener.class);

    //创建一个线程池,用于异步保存数据
    private ExecutorService executorService = Executors.newFixedThreadPool(20);

    //创建一个线程安全的list,用于存储读取到的数据,使用ThreadLocal保证线程安全
    private ThreadLocal<ArrayList<Salaries>> salariesList = ThreadLocal.withInitial(ArrayList::new);

    //用于统计是第几次插入
    private static AtomicInteger count = new AtomicInteger(1);
    
    //设定需要异步批量插入的条数
    private static final int batchSize = 10000;

    @Resource
    private SalariesListener salariesListener;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void invoke(Salaries data, AnalysisContext context) {
        //读取excel每一行的数据,添加到list中
        salariesList.get().add(data);
        //如果list的数据大于设定需要异步批量插入的条数,则执行异步插入
        if (salariesList.get().size() >= batchSize) {
            asyncSaveData();
        }
    }

    public void saveData() {
        if (!salariesList.get().isEmpty()) {
            saveBatch(salariesList.get(), salariesList.get().size());
            logger.info("第" + count.getAndAdd(1) + "次插入" + salariesList.get().size() + "条数据");
            salariesList.get().clear();
        }
    }

    public void asyncSaveData() {
        if (!salariesList.get().isEmpty()) {
            ArrayList<Salaries> salaries = (ArrayList<Salaries>) salariesList.get().clone();
            executorService.execute(new SaveTask(salaries, salariesListener));
            salariesList.get().clear();
        }
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void doAfterAllAnalysed(AnalysisContext context) {
        logger.info("一个Sheet全部处理完");
        //考虑每个sheet批量插入数据的条数少于异步插入的条数
        asyncSaveData();
    }

    //创建一个线程类,用于异步保存数据
    static class SaveTask implements Runnable {

        private List<Salaries> salariesList;
        private SalariesListener salariesListener;

        public SaveTask(List<Salaries> salariesList, SalariesListener salariesListener) {
            this.salariesList = salariesList;
            this.salariesListener = salariesListener;
        }

        @Override
        public void run() {
            salariesListener.saveBatch(salariesList);
            //打印第几次插入,每次插入的数据
            logger.info("第" + count.getAndAdd(1) + "次插入" + salariesList.size() + "条数据");
        }
    }
}

2.5 建表语句

CREATE TABLE `salaries` (
  `emp_no` int(11) DEFAULT NULL COMMENT '员工号',
  `salary` int(11) DEFAULT NULL,
  `from_date` datetime DEFAULT NULL,
  `to_date` datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
spring:
  servlet:
    multipart:
      max-request-size: 30MB
      max-file-size: 1024MB
  datasource:
    username: root
    password: root
    url: jdbc:mysql://127.0.0.1:3306/llp?rewriteBatchedStatements=true&characterEncoding=utf-8&allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=Asia/Shanghai
  main:
    allow-circular-references: true

3.测试验证

可以看到导入95万多条数据,耗时差不多在一份多钟

  • 导入开始时间

image-20250123102709029

  • 导入结束时间

image-20250123102727318

  • 入库数据

image-20250123102825151


http://www.kler.cn/a/516420.html

相关文章:

  • 什么是稀疏 MoE?Doubao-1.5-pro 如何以少胜多?
  • 抖音小程序一键获取手机号
  • C语言操作符(上)
  • 零售业革命:改变行业的顶级物联网用例
  • Arweave的出块原理
  • Golang Gin系列-4:Gin Framework入门教程
  • 【Kong Gateway】全面解析Kong Gateway:服务、路由、upstream、插件的核心概念介绍
  • 【自然语言处理(NLP)】介绍、发展史
  • springboot 配置redis
  • 3b1b线性代数基础
  • 蓝桥杯lesson3---string的使用
  • RabbitMQ 匿名队列详解
  • Elasticsearch的经典面试题及详细解答
  • CrypTen项目实践
  • 人工智能学习(一)之python入门
  • STM32的ADC工作模式
  • Linux 主流桌面环境及其默认应用大横评
  • 面向对象和面向过程的区别
  • 从ChatGPT热潮看智算崛起
  • Unity3D 动态骨骼性能优化详解
  • 对grid布局有哪些了解【css】
  • el-dialog内容大于高度时可滑动
  • python自动生成pg数据库表对应的es索引
  • Day21-【软考】短文,计算机网络开篇,OSI七层模型有哪些协议?
  • C++ 通过域名获取服务器ip(跨平台)
  • 【2024 CSDN博客之星】个人收获分享