High-performance processing of a 10 GB file in Java
Background: read a 10 GB CSV file, group the records by trade type, find the maximum trade within each group, and finally output the overall maximum trade.
Key difficulty: above all, how do we read the file quickly?
Techniques involved: MappedByteBuffer file reading, asynchronous result collection, TreeSet ordering, thread pools, and segmented scanning.
Approach:
1. Read the file with MappedByteBuffer. The fiddly part is extracting the needed columns from the CSV and splitting it into rows: rows are split on the newline character '\n' and columns on commas. (Known flaw: rows that straddle a chunk boundary are not handled, because it was too much hassle; a possible fix is sketched right after this list.)
2. Read the file in chunks across multiple threads (possible because a mapped file can be read starting from any byte offset).
3. Merge every chunk's parsed records into a single collection.
4. Group the records and, asynchronously, find the maximum within each group (if a group holds a lot of data, the scan is one-directional, so each group can itself be split into segments).
5. Compare the per-group maxima; the largest one is the final result.
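The flaw admitted in step 1 (rows cut in half at chunk boundaries) can be avoided by nudging every chunk start forward to the next row start before any parsing begins. Below is a minimal sketch of that idea; it is not part of the original solution, and the names (ChunkAlignSketch, alignToNextLine) and the 4 KB probe window (assuming no row is longer than that) are illustrative assumptions.

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class ChunkAlignSketch {
    // Return the offset of the first byte of the next full row at or after pos.
    static long alignToNextLine(FileChannel ch, long pos, long fileSize) throws Exception {
        if (pos <= 0 || pos >= fileSize) {
            return Math.max(0, Math.min(pos, fileSize));
        }
        long probeLen = Math.min(4096, fileSize - pos); // assumes rows are shorter than 4 KB
        MappedByteBuffer probe = ch.map(FileChannel.MapMode.READ_ONLY, pos, probeLen);
        for (int i = 0; i < probeLen; i++) {
            if (probe.get(i) == '\n') {
                return pos + i + 1; // the byte right after the newline starts a fresh row
            }
        }
        return fileSize; // no newline found: hand the tail to the previous chunk
    }

    public static void main(String[] args) throws Exception {
        int threads = Runtime.getRuntime().availableProcessors();
        try (RandomAccessFile raf = new RandomAccessFile(args[0], "r");
             FileChannel ch = raf.getChannel()) {
            long fileSize = ch.size();
            long rough = fileSize / threads;
            long[] starts = new long[threads + 1];
            starts[threads] = fileSize;
            for (int i = 1; i < threads; i++) {
                starts[i] = alignToNextLine(ch, rough * i, fileSize); // aligned boundary
            }
            for (int i = 0; i < threads; i++) {
                System.out.println("chunk " + i + ": [" + starts[i] + ", " + starts[i + 1] + ")");
            }
        }
    }
}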
MappedByteBuffer lets us first obtain the total length of the file content and then split the file into as many chunks as the machine has processing cores, each chunk handled single-threaded inside its own thread.
That is about as fast as it gets, because it fully exploits the machine's processing power.
The thread count must not be any higher, though: extra threads beyond the core count only add context-switching overhead instead of improving performance.
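As a minimal illustration of that sizing rule (the class name PoolSizeSketch is made up for this note), a fixed pool bound to the core count looks like this:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: for CPU-bound parsing, bind the pool to the core count;
// more workers than cores would only add context switches, as argued above.
public class PoolSizeSketch {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        System.out.println("workers = " + cores);
        pool.shutdown(); // nothing submitted here; the sketch only shows the sizing
    }
}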
Core code:
public static void main(String[] args) throws Exception {
    long startTime = System.currentTimeMillis();
    if (args.length == 0) {
        path = "D:\\RaceFile\\0002.csv";
        // path = "D:\\RaceFile\\marketdata\\marketdata.csv";
    } else {
        path = args[0];
    }
    raf = new RandomAccessFile(new File(path), "r");
    FileChannel channel = raf.getChannel();
    // total length of the file content
    long fileSize = channel.size();
    // map the first few KB to locate the header line, so it can be skipped
    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, Math.min(3000, fileSize));
    int headIndex = readHead(buffer, fileSize);
    // length of each chunk (divide as doubles so the ceiling actually rounds up)
    long preSize = (long) Math.ceil((fileSize - headIndex) / (double) threadNum);
    ArrayList<FutureTask<HashMap<String, Trade>>> threadList = new ArrayList<>(threadNum);
    // read every chunk asynchronously; header offset + chunk index * chunk length is each chunk's start
    for (int i = 0; i < threadNum; i++) {
        int finalI = i;
        // asynchronous file-reading task
        Callable<HashMap<String, Trade>> callable1 = () -> {
            long start = headIndex + preSize * finalI;
            // clamp the last chunk so the mapping never runs past the end of the file
            return read(start, Math.max(0, Math.min(preSize, fileSize - start)));
        };
        FutureTask<HashMap<String, Trade>> futureTask1 = new FutureTask<>(callable1);
        threadList.add(futureTask1);
        new Thread(futureTask1).start();
    }
    // gather every chunk's result through its FutureTask into one list
    List<HashMap<String, Trade>> trade2ListPre = new ArrayList<>(threadNum);
    for (int i = 0; i < threadList.size(); i++) {
        trade2ListPre.add(threadList.get(i).get());
    }
    HashMap<String, Trade> allData = new HashMap<>();
    trade2ListPre.forEach(allData::putAll);
    long end = System.currentTimeMillis();
    // System.out.println(end - startTime);
    // walk all records and group them by instrument
    HashMap<String, TreeSet<Trade>> hashMap = new HashMap<>();
    for (Map.Entry<String, Trade> entry : allData.entrySet()) {
        if (entry == null) {
            continue;
        }
        String key = entry.getKey().substring(0, entry.getKey().indexOf(MARK));
        if (hashMap.containsKey(key) && hashMap.get(key) != null) {
            hashMap.get(key).add(entry.getValue());
        } else {
            TreeSet<Trade> tradeTreeSet = new TreeSet<>();
            tradeTreeSet.add(entry.getValue());
            hashMap.put(key, tradeTreeSet);
        }
    }
    // async again: scan each group for its maximum
    hashMap.keySet().forEach(m -> threadPool2.execute(() -> {
        TreeSet<Trade> treeSet = hashMap.get(m);
        Trade maxTrade = treeSet.first();
        Trade preTrade = treeSet.first();
        for (Trade curTrade : treeSet) {
            int volume = curTrade.volume - preTrade.volume;
            preTrade = curTrade;
            if (volume > maxTrade.volume) {
                maxTrade = new Trade(curTrade.time, curTrade.updateTime, curTrade.updateMillisec,
                        curTrade.volume, curTrade.exchangeID, curTrade.instrumentID);
                maxTrade.volume = volume;
            }
        }
        // result is shared by every pool thread, so guard the check-then-update
        synchronized (Solution1.class) {
            if (result.volume < maxTrade.volume) {
                result = maxTrade;
            }
        }
    }));
    // when the pool drains, processing is done; shut it down and wait
    threadPool2.shutdown();
    threadPool2.awaitTermination(1, TimeUnit.HOURS);
    end = System.currentTimeMillis();
    // System.out.println(end - startTime);
    // print the final result
    System.out.println(result);
}
Full code:
package test3;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.*;

public class Solution1 {
    static int threadNum = Runtime.getRuntime().availableProcessors();
    static RandomAccessFile raf;
    static String path;
    static Trade result = new Trade();
    static ThreadPoolExecutor threadPool2 =
            new ThreadPoolExecutor(threadNum, threadNum, 10, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        if (args.length == 0) {
            path = "D:\\RaceFile\\0002.csv";
            // path = "D:\\RaceFile\\marketdata\\marketdata.csv";
        } else {
            path = args[0];
        }
        raf = new RandomAccessFile(new File(path), "r");
        FileChannel channel = raf.getChannel();
        // total length of the file content
        long fileSize = channel.size();
        // map the first few KB to locate the header line, so it can be skipped
        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, Math.min(3000, fileSize));
        int headIndex = readHead(buffer, fileSize);
        // length of each chunk (divide as doubles so the ceiling actually rounds up)
        long preSize = (long) Math.ceil((fileSize - headIndex) / (double) threadNum);
        ArrayList<FutureTask<HashMap<String, Trade>>> threadList = new ArrayList<>(threadNum);
        // read every chunk asynchronously; header offset + chunk index * chunk length is each chunk's start
        for (int i = 0; i < threadNum; i++) {
            int finalI = i;
            // asynchronous file-reading task
            Callable<HashMap<String, Trade>> callable1 = () -> {
                long start = headIndex + preSize * finalI;
                // clamp the last chunk so the mapping never runs past the end of the file
                return read(start, Math.max(0, Math.min(preSize, fileSize - start)));
            };
            FutureTask<HashMap<String, Trade>> futureTask1 = new FutureTask<>(callable1);
            threadList.add(futureTask1);
            new Thread(futureTask1).start();
        }
        // gather every chunk's result through its FutureTask into one list
        List<HashMap<String, Trade>> trade2ListPre = new ArrayList<>(threadNum);
        for (int i = 0; i < threadList.size(); i++) {
            trade2ListPre.add(threadList.get(i).get());
        }
        HashMap<String, Trade> allData = new HashMap<>();
        trade2ListPre.forEach(allData::putAll);
        long end = System.currentTimeMillis();
        // System.out.println(end - startTime);
        // walk all records and group them by instrument
        HashMap<String, TreeSet<Trade>> hashMap = new HashMap<>();
        for (Map.Entry<String, Trade> entry : allData.entrySet()) {
            if (entry == null) {
                continue;
            }
            String key = entry.getKey().substring(0, entry.getKey().indexOf(MARK));
            if (hashMap.containsKey(key) && hashMap.get(key) != null) {
                hashMap.get(key).add(entry.getValue());
            } else {
                TreeSet<Trade> tradeTreeSet = new TreeSet<>();
                tradeTreeSet.add(entry.getValue());
                hashMap.put(key, tradeTreeSet);
            }
        }
        // async again: scan each group for its maximum
        hashMap.keySet().forEach(m -> threadPool2.execute(() -> {
            TreeSet<Trade> treeSet = hashMap.get(m);
            Trade maxTrade = treeSet.first();
            Trade preTrade = treeSet.first();
            for (Trade curTrade : treeSet) {
                int volume = curTrade.volume - preTrade.volume;
                preTrade = curTrade;
                if (volume > maxTrade.volume) {
                    maxTrade = new Trade(curTrade.time, curTrade.updateTime, curTrade.updateMillisec,
                            curTrade.volume, curTrade.exchangeID, curTrade.instrumentID);
                    maxTrade.volume = volume;
                }
            }
            // result is shared by every pool thread, so guard the check-then-update
            synchronized (Solution1.class) {
                if (result.volume < maxTrade.volume) {
                    result = maxTrade;
                }
            }
        }));
        // when the pool drains, processing is done; shut it down and wait
        threadPool2.shutdown();
        threadPool2.awaitTermination(1, TimeUnit.HOURS);
        end = System.currentTimeMillis();
        // System.out.println(end - startTime);
        // print the final result
        System.out.println(result);
    }
    /*
     * Alternative: decode a whole mapped region in one go, e.g.
     *   MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, start, end - 3);
     *   StandardCharsets.UTF_8.decode(buffer).toString()
     */
    public static HashMap<String, Trade> read(long position, long size) throws IOException {
        FileChannel channel = raf.getChannel();
        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, size);
        return readLine(buffer, position, 0, size);
    }
    public static HashMap<String, Trade> readLine(MappedByteBuffer buffer, long position, int start, long end) {
        HashMap<String, Trade> instruTimeGroupMap = new HashMap<>();
        StringBuilder lineBuilder = new StringBuilder();
        int countComma = 0;
        List<String> dataItem = new ArrayList<>();
        dataItem.add(String.valueOf(position));
        // the first and last bytes of the chunk are skipped, and a row cut in half at a
        // chunk boundary is simply dropped (the known flaw mentioned at the top)
        for (int i = start + 1; i < end - 1; i++) {
            byte b = buffer.get();
            if (b == '\n') {
                try {
                    if (dataItem.size() == 6) {
                        handleData(dataItem, instruTimeGroupMap, i);
                    }
                } catch (Exception e) {
                    // e.printStackTrace();
                } finally {
                    countComma = 0;
                    dataItem.clear();
                    dataItem.add(String.valueOf(position));
                }
            }
            // keep only the columns we need
            if (b == ',') {
                countComma++;
                if (countComma == 11 || countComma == 20 || countComma == 21 || countComma == 22 || countComma == 45) {
                    dataItem.add(lineBuilder.toString());
                }
                lineBuilder.setLength(0);
            } else {
                lineBuilder.append((char) b);
            }
        }
        return instruTimeGroupMap;
    }
    static final String MARK = "_$_";

    // columns layout: [chunk position, col 11, col 20, col 21, col 22, col 45]
    public static void handleData(List<String> columns, HashMap<String, Trade> instruTimeGroupMap, int offset) {
        String minute = columns.get(2).substring(0, 5);
        StringBuilder sb = new StringBuilder();
        sb.append(columns.get(4)).append(MARK).append(minute);
        String key = sb.toString();
        Trade trade =
                new Trade(columns.get(2), columns.get(3), Integer.parseInt(columns.get(1)),
                        columns.get(5), columns.get(4), Long.parseLong(columns.get(0)), offset);
        // for the same instrument and minute, the later put wins, keeping the largest position
        instruTimeGroupMap.put(key, trade);
    }
    public static int readHead(MappedByteBuffer buffer, long size) throws IOException {
        // scan only what was actually mapped (at most the first 3000 bytes)
        for (int i = 0; i < buffer.limit() && i < size; i++) {
            byte b = buffer.get(i);
            if (b == '\n') {
                return i;
            }
        }
        return 0;
    }
}
class Trade implements Comparable<Trade> {
    long position;
    int offset;
    String time;
    String updateTime;
    String updateMillisec;
    int volume;
    String exchangeID;
    String instrumentID;

    public Trade() {
    }

    public Trade(String updateTime, String updateMillisec, int volume, String exchangeID, String instrumentID) {
        this.time = updateTime.substring(0, 5);
        this.updateTime = updateTime;
        this.updateMillisec = updateMillisec;
        this.volume = volume;
        this.exchangeID = exchangeID;
        this.instrumentID = instrumentID;
    }

    public Trade(String updateTime, String updateMillisec, int volume, String exchangeID, String instrumentID,
                 long position, int offset) {
        this(updateTime, updateMillisec, volume, exchangeID, instrumentID);
        this.position = position;
        this.offset = offset;
    }

    public Trade(String time, String updateTime, String updateMillisec, int volume, String exchangeID,
                 String instrumentID) {
        this.time = time;
        this.updateTime = updateTime;
        this.updateMillisec = updateMillisec;
        this.volume = volume;
        this.exchangeID = exchangeID;
        this.instrumentID = instrumentID;
    }

    @Override
    public int compareTo(Trade o) {
        // order records by position in the file, then by offset inside the chunk
        int byPosition = Long.compare(this.position, o.position);
        if (byPosition != 0) {
            return byPosition;
        }
        return Integer.compare(this.offset, o.offset);
    }

    @Override
    public String toString() {
        return this.time + "," + this.exchangeID + "," + this.volume + "," + this.instrumentID;
    }
}
// apparently reserved for the rows dropped at chunk boundaries; currently unused
class BadData implements Comparable<BadData> {
    long position;
    int size;

    @Override
    public int compareTo(BadData o) {
        return Long.compare(this.position, o.position);
    }
}
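To try it, compile the test3 package and pass the CSV path as the only argument, for example java test3.Solution1 /data/marketdata.csv (the path here is illustrative); with no argument it falls back to the hard-coded sample path in main.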