日志分析(二)
要求分析日志统计出:
c.y.c.p.s.impl.UserService.apply:130-2172517-48,696 KB
可以直观看出这行日志打印了多少数据
LogParAnalyzer2
/**
 * Splits a large log file into fixed-size chunks, aggregates per-location line
 * counts and message sizes in parallel, writes a top-100 summary file per chunk,
 * then merges the summaries into one ranked result.
 */
public class LogParAnalyzer2 {
    // raw log file to analyze
    private File log;
    // compiled patterns; each must capture group(1)=code location, group(2)=message
    private List<Pattern> list;
    private ExecutorService executorService;
    // directory holding the generated per-chunk summary files
    private String subPath = "D:\\split\\";
    // chunk summary files, collected by mergeAndSort(), deleted by finished()
    private List<File> files;

    public LogParAnalyzer2(File log, List<String> patterns) {
        this.log = log;
        executorService = Executors.newFixedThreadPool(30);
        list = new ArrayList<>();
        try {
            for (String pattern : patterns) {
                list.add(Pattern.compile(pattern));
            }
        } catch (Exception e) {
            // PatternSyntaxException is unchecked; wrap so a bad regex fails construction fast
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads the log line by line, dispatching every chunkSize lines to the
     * thread pool as one batch, then waits for all batches to complete.
     *
     * @throws Exception on I/O failure or if any chunk task fails
     */
    public void analyze() throws Exception {
        int chunkSize = 100000;
        // Files.newBufferedReader defaults to UTF-8 (the old FileReader used the platform charset)
        try (BufferedReader reader = Files.newBufferedReader(log.toPath())) {
            File dir = new File(subPath);
            if (!dir.exists()) {
                dir.mkdirs();
            }
            String line;
            List<CompletableFuture<Void>> tasks = new ArrayList<>();
            List<String> buffer = new ArrayList<>(chunkSize);
            int batch = 0;
            while ((line = reader.readLine()) != null) {
                buffer.add(line);
                if (buffer.size() == chunkSize) {
                    // hand the full buffer to the task and start a fresh one —
                    // cheaper than the old per-string deep copy via String::new
                    List<String> chunk = buffer;
                    buffer = new ArrayList<>(chunkSize);
                    // BUG FIX: capture the batch number before submitting. The old code read
                    // batch.get() inside the lambda and incremented after submission, so an
                    // async task could observe the incremented value and two tasks could
                    // write the same output file.
                    int batchNo = batch++;
                    tasks.add(CompletableFuture.runAsync(() -> processChunk(chunk, batchNo), executorService));
                }
            }
            if (!buffer.isEmpty()) {
                List<String> chunk = buffer;
                int batchNo = batch;
                tasks.add(CompletableFuture.runAsync(() -> processChunk(chunk, batchNo), executorService));
            }
            // wait for every chunk task to finish (propagates task failures)
            CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).get();
            System.out.println("task execute finished");
        }
    }

    /**
     * Aggregates one chunk of lines into a per-location count/size map, then
     * writes the top-100 entries to "&lt;subPath&gt;&lt;batch&gt;.txt".
     */
    private void processChunk(List<String> lines, int batch) {
        try {
            System.out.println(Thread.currentThread().getName() + " execute " + batch + ".txt start");
            Map<String, LogLine> map = new HashMap<>();
            for (String l : lines) {
                for (Pattern pattern : list) {
                    Matcher matcher = pattern.matcher(l);
                    if (matcher.find()) {
                        String method = matcher.group(1);
                        String message = matcher.group(2);
                        LogLine ll = map.computeIfAbsent(method, k -> {
                            LogLine logLine = new LogLine();
                            logLine.setCnt(new AtomicInteger(0));
                            logLine.setSize(0);
                            return logLine;
                        });
                        ll.getCnt().incrementAndGet();
                        // message.length() counts chars, reported later as KB
                        ll.setSize(ll.getSize() + message.length());
                    }
                }
            }
            // BUG FIX: the old code called writeBatchToFile inside the per-line loop, and
            // writeBatchToFile clears the map — so counts could never accumulate across
            // lines. Write (and clear) exactly once, after the whole chunk is aggregated.
            if (!map.isEmpty()) {
                // each chunk file keeps only its top 100 entries
                try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(subPath + batch + ".txt"))) {
                    writeBatchToFile(writer, map);
                }
            }
            System.out.println(Thread.currentThread().getName() + " execute " + batch + ".txt end");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes the top-100 entries of map as "method=count=size" lines, then
     * clears both maps so the memory can be reclaimed early.
     */
    private void writeBatchToFile(BufferedWriter writer, Map<String, LogLine> map) {
        Map<String, LogLine> limited = limit(map, 100);
        try {
            for (Map.Entry<String, LogLine> entry : limited.entrySet()) {
                LogLine value = entry.getValue();
                writer.write(entry.getKey() + "=" + value.getCnt() + "=" + value.getSize());
                writer.newLine();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // release references early
        map.clear();
        limited.clear();
    }

    /**
     * Merges all chunk files with a fork/join merge tree, prints the final
     * ranking as "method-count-size KB", and removes the merge temp files.
     *
     * @throws Exception on I/O failure or merge failure
     */
    public void mergeAndSort() throws Exception {
        // BUG FIX: Files.list must be closed, otherwise the directory handle leaks
        try (java.util.stream.Stream<Path> paths = Files.list(Paths.get(subPath))) {
            files = paths.map(Path::toFile)
                    .filter(f -> f.length() > 0)
                    .collect(Collectors.toList());
        }
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        MergeFileTask2 mergeFileTask = new MergeFileTask2(files.toArray(new File[0]), forkJoinPool);
        Path finalPath = mergeFileTask.invoke();
        System.out.println("final path: " + finalPath.toAbsolutePath());
        try (BufferedReader reader = Files.newBufferedReader(finalPath)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] split = line.split("=");
                long kb = Long.parseLong(split[2]) / 1024;
                System.out.println(MessageFormat.format("{0}-{1}-{2} KB", split[0], split[1], kb));
            }
        }
        mergeFileTask.finished();
        forkJoinPool.shutdown();
    }

    /** Deletes the chunk files and their directory; also stops the worker pool. */
    public void finished() throws IOException {
        // BUG FIX: the pool was never shut down, keeping 30 non-daemon threads alive
        executorService.shutdown();
        if (!CollectionUtils.isEmpty(files)) {
            files.stream().parallel().forEach(File::delete);
        }
        Files.deleteIfExists(Paths.get(subPath));
    }

    /**
     * Returns the {@code limit} entries with the highest counts, ordered
     * descending, as an insertion-ordered map.
     */
    public Map<String, LogLine> limit(Map<String, LogLine> map, int limit) {
        return map.entrySet().stream()
                // Integer.compare avoids the overflow risk of subtraction-based comparators
                .sorted(Map.Entry.comparingByValue((o1, o2) -> Integer.compare(o2.getCnt().get(), o1.getCnt().get())))
                .limit(limit)
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (oldValue, newValue) -> oldValue, // keep the first entry on key collision
                        LinkedHashMap::new
                ));
    }
}
MergeFileTask2
/**
 * Fork/join task that pairwise-merges "method=count=size" summary files into a
 * single file, keeping only the top-100 entries at each merge step.
 */
public class MergeFileTask2 extends RecursiveTask<Path> {
    // summary files this task is responsible for merging
    private File[] files;
    private ForkJoinPool forkJoinPool;
    // directory for intermediate merged files; cleaned up by finished()
    private String tmp = "d:\\cc\\";

    public MergeFileTask2(File[] files, ForkJoinPool forkJoinPool) {
        this.files = files;
        this.forkJoinPool = forkJoinPool;
        File dir = new File(tmp);
        if (!dir.exists()) {
            dir.mkdir();
        }
    }

    /**
     * Returns the path of the fully merged file: a single input is returned
     * as-is, otherwise the array is split in half and the halves are merged
     * recursively.
     */
    @Override
    protected Path compute() {
        // BUG FIX: an empty array used to fall into the split branch with mid == 0
        // and recurse forever on two empty halves; fail fast instead
        if (files.length == 0) {
            throw new IllegalStateException("no files to merge");
        }
        if (files.length == 1) {
            return files[0].toPath();
        }
        int mid = files.length / 2;
        MergeFileTask2 left = new MergeFileTask2(Arrays.copyOfRange(files, 0, mid), forkJoinPool);
        MergeFileTask2 right = new MergeFileTask2(Arrays.copyOfRange(files, mid, files.length), forkJoinPool);
        invokeAll(left, right);
        return mergeTwoFiles(left.join(), right.join());
    }

    /** Merges two summary files into a fresh temp file under {@code tmp}. */
    private Path mergeTwoFiles(Path leftResult, Path rightResult) {
        try {
            Path tempFile = Files.createTempFile(Paths.get(tmp), "merged-", ".txt");
            mergeToOne(leftResult, rightResult, tempFile);
            return tempFile;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads both inputs, sums counts/sizes for duplicate keys, keeps the top
     * 100 entries by count, and writes them to {@code tempFile}.
     */
    private void mergeToOne(Path leftResult, Path rightResult, Path tempFile) throws Exception {
        // single-threaded here, so a plain HashMap suffices (was ConcurrentHashMap)
        Map<String, LogLine> map = new HashMap<>();
        try (BufferedReader leftReader = Files.newBufferedReader(leftResult)) {
            mergeReader(map, leftReader);
        }
        try (BufferedReader rightReader = Files.newBufferedReader(rightResult)) {
            mergeReader(map, rightReader);
        }
        Map<String, LogLine> limit = limit(map, 100);
        try (BufferedWriter writer = Files.newBufferedWriter(tempFile)) {
            writeBatchToFile(writer, limit);
        }
    }

    /**
     * Writes every entry as a "method=count=size" line.
     *
     * @param writer destination writer (caller closes it)
     * @param map    already-limited entries to persist
     * @throws Exception on write failure
     */
    private void writeBatchToFile(BufferedWriter writer, Map<String, LogLine> map) throws Exception {
        for (Map.Entry<String, LogLine> entry : map.entrySet()) {
            writer.write(entry.getKey() + "=" + entry.getValue().getCnt().get() + "=" + entry.getValue().getSize());
            writer.newLine();
        }
    }

    /**
     * Returns the {@code limit} entries with the highest counts, ordered
     * descending. Note: the old short-circuit for small maps returned the
     * entries unsorted; sorting unconditionally keeps the output order stable.
     *
     * @param map   aggregated entries
     * @param limit maximum number of entries to keep
     * @return insertion-ordered map, highest count first
     */
    public Map<String, LogLine> limit(Map<String, LogLine> map, int limit) {
        return map.entrySet().stream()
                // Integer.compare avoids the overflow risk of subtraction-based comparators
                .sorted(Map.Entry.comparingByValue((o1, o2) -> Integer.compare(o2.getCnt().get(), o1.getCnt().get())))
                .limit(limit)
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (oldValue, newValue) -> oldValue, // keep the first entry on key collision
                        LinkedHashMap::new
                ));
    }

    /** Parses "method=count=size" lines, summing counts and sizes per key. */
    private static void mergeReader(Map<String, LogLine> map, BufferedReader reader) throws IOException {
        String line;
        while ((line = reader.readLine()) != null) {
            String[] split = line.split("=");
            int val = Integer.parseInt(split[1]);
            // BUG FIX: LogLine.size is a long; parse as long so accumulated sizes
            // beyond Integer.MAX_VALUE do not overflow
            long size = Long.parseLong(split[2]);
            map.merge(split[0], new LogLine(new AtomicInteger(val), size), (a, b) -> {
                a.getCnt().addAndGet(b.getCnt().get());
                a.setSize(a.getSize() + b.getSize());
                return a;
            });
        }
    }

    /** Deletes all intermediate merge files and the temp directory itself. */
    public void finished() {
        File[] children = new File(tmp).listFiles();
        // BUG FIX: listFiles() returns null if the directory vanished — guard against NPE
        if (children != null) {
            for (File file : children) {
                file.delete();
            }
        }
        new File(tmp).delete();
    }
}
LogLine
// Aggregation record for one log location ("method:line" key).
// Lombok generates getters/setters (@Data) plus no-arg and all-args constructors.
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LogLine {
// number of matched log lines; AtomicInteger because counts are incremented and merged in-place
private AtomicInteger cnt;
// accumulated message length in chars (reported downstream divided by 1024 as "KB")
private long size;
}
测试
/**
 * End-to-end run: split + analyze the log, merge the per-chunk summaries,
 * print the ranked result, and report elapsed time.
 */
@Test
public void ccd() throws Exception {
    StopWatch watch = new StopWatch();
    watch.start();
    //(com.example.[\w*.*]*:\d*)
    File logFile = new File("E:\\log.log");
    // sample line: 2023-09-26 11:10:00.123 INFO - none --- [main] com.example.service.UserService.create:42 - User service started successfully.
    // group(1) -> com.example.service.UserService.create:42, group(2) -> the trailing message
    List<String> patterns = Arrays.asList(
            "(com\\.abc\\.[\\w\\.\\*]*:\\d*) (.*)",
            "(c\\.y\\.c\\.[\\w\\.\\*]*:\\d*) (.*)");
    LogParAnalyzer2 analyzer = new LogParAnalyzer2(logFile, patterns);
    analyzer.analyze();
    analyzer.mergeAndSort();
    analyzer.finished();
    watch.stop();
    System.out.println(watch.prettyPrint());
    // expected output shape: c.y.c.s.service.impl.UserService.apply:98 count: 6: 3 KB
}