当前位置: 首页 > article >正文

java高性能处理10G大文件

背景:读取10G csv文件,然后 根据交易种类分分组,找出每个交易种类的交易最大值,最后给出最大的那个交易信息。

难点:最主要的是怎么快速读文件?

涉及的功能点:MappedByteBuffer 读文件方式、异步返回处理、treeset排序、线程池处理、分段切割排序

处理方式:

1,使用 MappedByteBuffer 读文件 。这里最主要是怎么提取csv中需要的列,怎么划分行,很麻烦(根据\r 字符识别换行,根据,逗号识别列)(漏洞:这里没有处理切割临界数据,太麻烦了

2,多线程分块读取  (可以分块读取的前提是:可以指定文件内容下标读取)

3,把所有文件放到集合中

4,分组处理,异步 在每个分组中找出最大值(如果每个组的数据很多,那么且是单向比较,可以分段找出最大值)

5,最终比较每个分组的最大值。就是最终结果

MappedByteBuffer 可以先获取文件内容的总长度,然后根据机器线程处理核数,把文件分割成对应的核数个数。然后在各自的线程中做单线程处理。

这样就是最快的。重复利用了机器的处理能力。

线程的数量不能多,否则即使多了,只会徒增线程切换带来的消耗,而不能提高性能。

核心代码:

public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        if (args.length == 0) {
            path = "D:\\RaceFile\\0002.csv";
//            path = "D:\\RaceFile\\marketdata\\marketdata.csv";
        } else {
            path = args[0];
        }
        raf = new RandomAccessFile(new File(path), "r");

        FileChannel channel = raf.getChannel();
        //文件内容总长度
        long fileSize = channel.size();
        // 获取头文件,把文件头剔除掉
        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, 3000);
        int headIndex = readHead(buffer, fileSize);
        //分割的每个文件的长度
        long preSize = (long) Math.ceil((fileSize - headIndex) / threadNum);
        ArrayList<FutureTask> threadList = new ArrayList(threadNum);
        // 异步读取每个分区,通过分割个数*文件长度 就可以计算出每个分割块的读取起始下标。
        for (int i = 0; i < threadNum; i++) {
            int finalI = i;
            //异步读文件处理
            Callable callable1 = () -> {
                    return  read(preSize * finalI, preSize);
            };
            FutureTask futureTask1 = new FutureTask(callable1);
            threadList.add(futureTask1);
            new Thread(futureTask1).start();
        }
        //使用callable获取每个文件的读取结果  汇总到一个集合中
        List<HashMap<String, Trade>> trade2ListPre =new ArrayList(threadNum);
        for (int i = 0; i < threadList.size(); i++) {
            trade2ListPre.add((HashMap<String, Trade>) threadList.get(i).get());
        }
        HashMap<String, Trade> allData = new HashMap<>();
        trade2ListPre.stream().forEach(m ->{
            allData.putAll(m);
        });
        long end = System.currentTimeMillis();
//        System.out.println(end - startTime);
        // 遍历所有数据,分组处理
        HashMap<String, TreeSet<Trade>> hashMap = new HashMap();
        for (HashMap.Entry<String, Trade> entry : allData.entrySet()) {
            if (entry == null) {
                continue;
            }
            String key = entry.getKey().substring(0, entry.getKey().indexOf(MARK));
            if (hashMap.containsKey(key) && hashMap.get(key) != null) {
                hashMap.get(key).add(entry.getValue());
            } else {
                TreeSet<Trade> tradeTreeSet = new TreeSet();
                tradeTreeSet.add(entry.getValue());
                hashMap.put(key, tradeTreeSet);
            }
        }
        //再次异步,在每个分组数据中,循环找出最大值
        hashMap.keySet().stream().forEach(m -> {
            threadPool2.execute(() -> {
                TreeSet<Trade> treeSet = hashMap.get(m);
                Trade maxTrade = treeSet.first();
                Trade preTrade = treeSet.first();
                for (Trade curTrade : treeSet) {
                    int volume = curTrade.volume - preTrade.volume;
                    preTrade = curTrade;
                    if (volume > maxTrade.volume) {
                        maxTrade = new Trade(curTrade.time, curTrade.updateTime, curTrade.updateMillisec,
                                curTrade.volume, curTrade.exchangeID, curTrade.instrumentID);
                        maxTrade.volume = volume;
                    }
                }
                if (result.volume < maxTrade.volume) {
                    result = maxTrade;
                }
            });
        });
        //线程池为空的时候 关闭线程池,也意味着数据处理完毕
        threadPool2.shutdown();
        while (!threadPool2.isTerminated()) {
        }
         end = System.currentTimeMillis();
//        System.out.println(end - startTime);
        //输出最终结果
        System.out.println(result);

    }

所有代码:

package test3;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.*;

public class Solution1 {

    static int threadNum =  Runtime.getRuntime().availableProcessors();
    static RandomAccessFile raf;
    static String path;
    static Trade result = new Trade();
    static ThreadPoolExecutor threadPool2 =
            new ThreadPoolExecutor(threadNum, threadNum, 10, TimeUnit.MILLISECONDS, new LinkedBlockingDeque());


    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        if (args.length == 0) {
            path = "D:\\RaceFile\\0002.csv";
//            path = "D:\\RaceFile\\marketdata\\marketdata.csv";
        } else {
            path = args[0];
        }
        raf = new RandomAccessFile(new File(path), "r");

        FileChannel channel = raf.getChannel();
        //文件内容总长度
        long fileSize = channel.size();
        // 获取头文件,把文件头剔除掉
        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, 3000);
        int headIndex = readHead(buffer, fileSize);
        //分割的每个文件的长度
        long preSize = (long) Math.ceil((fileSize - headIndex) / threadNum);
        ArrayList<FutureTask> threadList = new ArrayList(threadNum);
        // 异步读取每个分区,通过分割个数*文件长度 就可以计算出每个分割块的读取起始下标。
        for (int i = 0; i < threadNum; i++) {
            int finalI = i;
            //异步读文件处理
            Callable callable1 = () -> {
                    return  read(preSize * finalI, preSize);
            };
            FutureTask futureTask1 = new FutureTask(callable1);
            threadList.add(futureTask1);
            new Thread(futureTask1).start();
        }
        //使用callable获取每个文件的读取结果  汇总到一个集合中
        List<HashMap<String, Trade>> trade2ListPre =new ArrayList(threadNum);
        for (int i = 0; i < threadList.size(); i++) {
            trade2ListPre.add((HashMap<String, Trade>) threadList.get(i).get());
        }
        HashMap<String, Trade> allData = new HashMap<>();
        trade2ListPre.stream().forEach(m ->{
            allData.putAll(m);
        });
        long end = System.currentTimeMillis();
//        System.out.println(end - startTime);
        // 遍历所有数据,分组处理
        HashMap<String, TreeSet<Trade>> hashMap = new HashMap();
        for (HashMap.Entry<String, Trade> entry : allData.entrySet()) {
            if (entry == null) {
                continue;
            }
            String key = entry.getKey().substring(0, entry.getKey().indexOf(MARK));
            if (hashMap.containsKey(key) && hashMap.get(key) != null) {
                hashMap.get(key).add(entry.getValue());
            } else {
                TreeSet<Trade> tradeTreeSet = new TreeSet();
                tradeTreeSet.add(entry.getValue());
                hashMap.put(key, tradeTreeSet);
            }
        }
        //再次异步,在每个分组数据中,循环找出最大值
        hashMap.keySet().stream().forEach(m -> {
            threadPool2.execute(() -> {
                TreeSet<Trade> treeSet = hashMap.get(m);
                Trade maxTrade = treeSet.first();
                Trade preTrade = treeSet.first();
                for (Trade curTrade : treeSet) {
                    int volume = curTrade.volume - preTrade.volume;
                    preTrade = curTrade;
                    if (volume > maxTrade.volume) {
                        maxTrade = new Trade(curTrade.time, curTrade.updateTime, curTrade.updateMillisec,
                                curTrade.volume, curTrade.exchangeID, curTrade.instrumentID);
                        maxTrade.volume = volume;
                    }
                }
                if (result.volume < maxTrade.volume) {
                    result = maxTrade;
                }
            });
        });
        //线程池为空的时候 关闭线程池,也意味着数据处理完毕
        threadPool2.shutdown();
        while (!threadPool2.isTerminated()) {
        }
         end = System.currentTimeMillis();
//        System.out.println(end - startTime);
        //输出最终结果
        System.out.println(result);

    }

    /**
        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, start, end-3);
           StandardCharsets.UTF_8.decode(buffer).toString()
     */
    public static HashMap<String, Trade> read(long position, long size) throws IOException {
        FileChannel channel = raf.getChannel();
        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, size);
        return readLine(buffer, position, 0, size);
    }

    public static HashMap<String, Trade> readLine(MappedByteBuffer buffer, long position, int start, long end) {
        HashMap<String, Trade> instruTimeGroupMap = new HashMap<>();
        StringBuilder lineBuilder = new StringBuilder();
        int countComma = 0;
        List<String> dataItem = new ArrayList<>();
        dataItem.add(String.valueOf(position));
        for (int i = start + 1; i < end - 1; i++) {
            byte b = buffer.get();
            if (b == '\n') {
                try {
                    if (dataItem.size() == 6) {
                        handleData(dataItem, instruTimeGroupMap,i);
                    }
                } catch (Exception e) {
//                    e.printStackTrace();
                } finally {
                    countComma = 0;
                    dataItem.clear();
                    dataItem.add(String.valueOf(position));
                }
            }
            //获取需要的列
            if(b == ','){
                countComma++;
                if(countComma==11 || countComma==20|| countComma==21|| countComma==22|| countComma==45){
                    dataItem.add(lineBuilder.toString());
                }
                lineBuilder.setLength(0);
            }else{
                lineBuilder.append((char) b);
            }
        }
        return instruTimeGroupMap;
    }
    static final String MARK = "_$_";
    public static void handleData(List<String> columns, HashMap<String, Trade> instruTimeGroupMap,int offset) {//0 19 20 21 22 23 24 25 45
        String minute =columns.get(2).substring(0, 5);
        StringBuilder sb = new StringBuilder();
        sb.append(columns.get(4)).append(MARK).append(minute);
        String key = sb.toString();
        Trade trade =
                new Trade(columns.get(2), columns.get(3), Integer.parseInt(columns.get(1)),
                        columns.get(5), columns.get(4), Long.parseLong(columns.get(0)),offset);
        //同一个合约 保留position最大的
        instruTimeGroupMap.put(key, trade);
    }

    public static int readHead(MappedByteBuffer buffer, long size) throws IOException {
        for (int i = 0; i < size; i++) {
            byte b = buffer.get(i);
            if (b == '\n') {
                return i;
            }
        }
        return 0;
    }
}

class Trade implements Comparable<Trade> {
    long position;
    int offset;
    String time;
    String updateTime;
    String updateMillisec;
    int volume;
    String exchangeID;
    String instrumentID;

    public Trade(){}
    public Trade(String updateTime, String updateMillisec, int volume, String exchangeID, String instrumentID) {
        this.time = updateTime.substring(0, 5);
        this.updateTime = updateTime;
        this.updateMillisec = updateMillisec;
        this.volume = volume;
        this.exchangeID = exchangeID;
        this.instrumentID = instrumentID;
    }
    public Trade(String updateTime, String updateMillisec, int volume, String exchangeID, String instrumentID,long position,int offset) {
        this.time = updateTime.substring(0, 5);
        this.updateTime = updateTime;
        this.updateMillisec = updateMillisec;
        this.volume = volume;
        this.exchangeID = exchangeID;
        this.instrumentID = instrumentID;
        this.position=position;
        this.offset=offset;
    }
    public Trade(String time, String updateTime, String updateMillisec, int volume, String exchangeID,
                 String instrumentID) {
        this.time = time;
        this.updateTime = updateTime;
        this.updateMillisec = updateMillisec;
        this.volume = volume;
        this.exchangeID = exchangeID;
        this.instrumentID = instrumentID;
    }

    @Override
    public int compareTo(Trade o) {
        if(this.position-o.position >0){
            return 1;
        }
        if(this.position-o.position ==0 && this.offset>o.offset){
            return 1;
        }
        return -1;
    }


    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.time).append(",").append(this.exchangeID).append(",").append(this.volume).append(",").append(this.instrumentID);
        return sb.toString();
    }
}

class BadData implements Comparable<BadData>{
    long position;
    int size;

    @Override
    public int compareTo(BadData o) {
        if(this.position>o.position){
            return 1;
        }
        return -1;
    }
}


http://www.kler.cn/news/365187.html

相关文章:

  • webView 支持全屏播放
  • 威胁 Windows 和 Linux 系统的新型跨平台勒索软件:Cicada3301
  • 【2024CANN训练营第二季】使用华为云体验AscendC_Sample仓算子运行
  • filebeat收集日志直接输出到elasticsearch
  • 网站建设中需要注意哪些安全问题?----雷池社区版
  • h5页面与小程序页面互相跳转
  • 使用Vue.js构建响应式Web应用
  • [DB] NSM
  • 【python openai function2json小工具】
  • 在PHP中,读取大文件
  • html+css+js实现Notification 通知
  • 洞察云上风险,主机安全尽在掌握
  • 【LeetCode】1297、子串的最大出现次数
  • AI 开启财富密码:探索多元化赚钱之路
  • 【PyTorch][chapter31][transformer-4]
  • sharpkeys-键盘部分按键不好用,用其它不常用按键代替
  • 【机器学习】13. 决策树
  • 现代Web界面交互新利器!来探一探这个魔法组件库——MagicUI
  • 【SPIE出版,EI检索稳定】2024年人机交互与虚拟现实国际会议(HCIVR 2024,11月15-17日)
  • 系统架构设计师教程 第18章18.8 安全架构设计案例分析 笔记
  • Android修改第三方应用相机方向
  • paddleocr使用FastDeploy 部署工具部署 rknn 模型
  • 250MS/s 4通道16bit PCIE采集卡
  • 【YOLOv11改进[损失函数]】使用结合InnerIoU和Focaler的各种损失函数助力YOLOv11更优秀
  • Xshell远程连接工具详解
  • 什么是标准差?详解