当前位置: 首页 > article >正文

SpringBoot中使用多线程ThreadPoolTaskExecutor+CompletableFuture

SpringBoot中使用多线程ThreadPoolTaskExecutor+CompletableFuture

定义一个线程池,并将其注入为bean

我使用的是spring提供的线程池,所以不需要写关闭的逻辑

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync
public class ExecutorConfig {
    @Bean(name = {"saveExecutor"})
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor =  new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);//核心线程数
        executor.setMaxPoolSize(1000);// 最大线程数
        executor.setQueueCapacity(500);// 任务队列容量
        executor.setThreadNamePrefix("taskExecutor-");
        executor.initialize(); //初始化线程池
        return executor;
    }
}

使用

import cn.hutool.core.collection.ListUtil;
import com.train.demo.entities.User;
import com.train.demo.repository.UserRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

@Service
@Slf4j
public class UserService {
    private final UserRepository userRepository;
    private final Executor saveExecutor;
    @Autowired
    public UserService(UserRepository userRepository,
                       // 依赖注入,一定要使用@Qualifier指定bean名称,不然有问题
                       @Qualifier("saveExecutor") Executor saveExecutor) {
        this.userRepository = userRepository;
        this.saveExecutor = saveExecutor;
    }
  
    public boolean addAllUsers() {
      // 定义一个数组,并生成1000000条数据
        ArrayList<User> users = new ArrayList<User>(){{
            for (int i = 0; i < 1000000; i++) {
                add(new User(null,"user"+i,2,"男"));
            }
        }};

        int count = 0;
      	// 定义一个接收“CompletableFuture返回”的list
        List<CompletableFuture<Integer>> futureList = new ArrayList<>();
        while( count < users.size() ){
          // 这里只是计算剩下的数据量够不够1000,结果作为截取list的参数
            int size = users.subList( count, users.size()-1).size() > 9999 ? 9999
                    : users.subList( count, users.size()-1).size();
          // 调用异步方法
            CompletableFuture<Integer> future = addAllUsersImpl(users.subList(count, count += size));
            futureList.add(future);
            count++;
        }
        futureList.forEach(CompletableFuture::join);//阻塞所有的异步方法全执行完毕再向下执行
        return true;
    }
    @Async
    protected CompletableFuture<Integer> addAllUsersImpl(List<User> users) {
      	// 此处使用spring线程池作为CompletableFuture线程池参数,并且异步执行
        return CompletableFuture.supplyAsync(() -> {
            log.info("save users thread[{}]", Thread.currentThread().getName());
            List<User> list = ListUtil.list(false, userRepository.saveAll(users));// 存储到数据库
            return list.size();// 返回存储的数据量
        }, saveExecutor);
    }
}

本地mysql实测,99.9万条数据,耗时27s


http://www.kler.cn/a/385327.html

相关文章:

  • Docker部署Oracle 11g
  • Hadoop简介及单点伪分布式安装
  • 使用最新版的wvp和ZLMediaKit搭建Gb28181测试服务器
  • 安卓图片的着色教程(tint的使用)
  • 【sqlmap使用手册-持续更新中】
  • Spark 的介绍与搭建:从理论到实践
  • 代码随想录算法训练营Day55 | 图论理论基础、深度优先搜索理论基础、卡玛网 98.所有可达路径、797. 所有可能的路径、广度优先搜索理论基础
  • 一级注册消防工程师重点笔记
  • ICT网络赛道安全考点知识总结4
  • SearchGPT 网络搜索功能利用了微软必应的搜索技术
  • 2024网鼎杯web1+re2 wp
  • O-RAN前传Spilt Option 7-2x
  • RPA是什么,RPA有什么作用?
  • 如何在 Docker 容器中启动 X11 图形界面程序
  • 前端UniApp面试题及参考答案(100道题)
  • C++ enable_shared_from_this使用详解
  • 函数基础练习(Python)
  • 少儿编程启蒙学习
  • python爬虫指南——初学者避坑篇
  • leetcode | 88. 合并两个有序数组
  • WebSocket 及时通信 - 2024最新版前端秋招面试短期突击面试题【100道】
  • 远程控制项目第二天
  • 【GPTs】Email Responder Pro:高效生成专业回复邮件
  • mysql分布式锁
  • CSS实现图片3D立体效果
  • 苍穹外卖学习记录