当前位置: 首页 > article >正文

JSON合并工具

JSON合并工具

1. 项目概述

本项目旨在开发一个强大而灵活的JSON合并工具,能够合并多个JSON文件,处理复杂的嵌套结构,提供详细的合并报告,并实现全面的验证和错误处理机制。

2. 功能需求

2.1 基本合并功能

  • 支持合并两个或多个JSON文件
  • 处理嵌套的JSON结构
  • 提供不同的合并策略选项(如覆盖、保留原值)

2.2 验证和错误处理

  • JSON结构验证
  • 数据类型一致性检查
  • 文件大小限制检查
  • 键名验证
  • 详细的错误报告
  • 警告系统
  • 错误恢复机制
  • 操作日志记录

2.3 合并报告生成

  • 生成详细的合并过程报告
  • 包含基本信息、合并统计、详细操作日志、警告和错误摘要、性能指标

2.4 多文件合并

  • 支持任意数量的输入JSON文件
  • 按指定顺序依次合并文件
  • 为每个输入文件生成单独的统计信息

3. 技术设计

3.1 合并算法

使用递归方法处理嵌套的JSON结构:

  1. 遍历第二个JSON对象的所有键值对
  2. 如果键在第一个对象中不存在,直接添加
  3. 如果键存在且值都是字典,递归合并
  4. 如果键存在且值都是列表,合并列表
  5. 如果键存在但值类型不同,根据策略处理(覆盖或保留)
  6. 如果键存在且值类型相同,根据策略更新

3.2 验证机制

  1. JSON结构验证:使用json.loads()验证JSON格式
  2. 深度检查:递归检查JSON嵌套深度,设置最大深度限制
  3. 大小检查:在读取文件前检查文件大小
  4. 类型一致性:在合并过程中检查相同键的值类型

3.3 错误处理

  1. 使用try-except块捕获并处理异常
  2. 实现自定义异常类处理特定错误
  3. 使用logging模块记录警告和错误
  4. 对于非致命错误,提供继续处理的选项

3.4 报告生成

使用MergeReport类管理报告生成:

  1. 在合并过程中记录每个操作
  2. 统计新增、更新和冲突的键数量
  3. 记录警告和错误
  4. 生成性能指标(处理时间、内存使用)
  5. 格式化输出详细的报告

3.5 多文件处理

  1. 使用列表存储多个输入文件路径
  2. 逐个处理文件,将结果合并到一个主JSON对象中
  3. 在报告中分别记录每个文件的处理情况

3.6 命令行接口

使用argparse模块处理命令行参数:

  1. 输入文件路径(支持多个)
  2. 输出文件路径
  3. 合并策略选项
  4. 报告输出路径选项

4. 实现细节

4.1 主要类和函数

  1. MergeReport 类:管理报告生成
  2. merge_json() 函数:实现JSON合并逻辑
  3. merge_json_files() 函数:处理文件I/O和调用合并函数
  4. main() 函数:处理命令行参数和orchestrate整个过程

4.2 数据结构

  • 使用Python的字典表示JSON对象
  • 使用列表存储多个输入文件路径

4.3 外部依赖

  • json:用于JSON解析和序列化
  • argparse:用于命令行参数处理
  • logging:用于日志记录
  • psutil:用于获取内存使用情况(可选)

5. 使用示例

python merge_json.py file1.json file2.json file3.json output.json --strategy overwrite --report merge_report.txt

6. 未来扩展

  1. 性能优化:实现流式处理或分块处理大文件
  2. 并行处理:使用多线程或多进程加速处理
  3. 配置文件:支持通过配置文件指定复杂的合并规则
  4. 可视化:生成合并过程的可视化表示
  5. GUI界面:开发图形用户界面,提高易用性

7. 结论

这个JSON合并工具提供了强大的功能,包括多文件合并、详细的报告生成、全面的验证和错误处理。考虑了灵活性和可扩展性,能够满足各种复杂的JSON合并需求。持续的优化和功能扩展,这个工具可以成为处理JSON数据的有力助手。

8.代码

import json
import sys
import os
import logging
import time
import argparse
from typing import Dict, Any, List

class MergeReport:
    def __init__(self):
        self.start_time = time.time()
        self.total_keys = 0
        self.new_keys = 0
        self.updated_keys = 0
        self.conflict_keys = 0
        self.warnings = []
        self.errors = []
        self.detailed_log = []
        self.file_stats = {}

    def add_operation(self, file: str, key: str, operation: str, details: str = ""):
        self.detailed_log.append(f"{file} - {key}: {operation} - {details}")
        self.total_keys += 1
        if operation == "新增":
            self.new_keys += 1
        elif operation == "更新":
            self.updated_keys += 1
        elif operation == "冲突":
            self.conflict_keys += 1
        
        if file not in self.file_stats:
            self.file_stats[file] = {"新增": 0, "更新": 0, "冲突": 0}
        self.file_stats[file][operation] += 1

    def add_warning(self, message: str):
        self.warnings.append(message)

    def add_error(self, message: str):
        self.errors.append(message)

    def generate_report(self, input_files: List[str], output_file: str, strategy: str) -> str:
        end_time = time.time()
        process_time = end_time - self.start_time

        report = f"""
合并报告
========

基本信息:
- 合并时间: {time.strftime('%Y-%m-%d %H:%M:%S')}
- 输入文件:
{chr(10).join(['  - ' + file for file in input_files])}
- 输出文件: {output_file}
- 合并策略: {strategy}

合并统计:
- 总处理键数: {self.total_keys}
- 新增键数: {self.new_keys}
- 更新键数: {self.updated_keys}
- 冲突键数: {self.conflict_keys}

文件统计:
"""
        for file, stats in self.file_stats.items():
            report += f"- {file}:\n"
            report += f"  新增: {stats['新增']}, 更新: {stats['更新']}, 冲突: {stats['冲突']}\n"

        report += f"""
详细操作日志:
{chr(10).join(self.detailed_log)}

警告:
{chr(10).join(self.warnings) if self.warnings else "无"}

错误:
{chr(10).join(self.errors) if self.errors else "无"}

性能指标:
- 处理时间: {process_time:.2f} 秒
- 峰值内存使用: {self.get_peak_memory_usage()} MB
        """
        return report

    def get_peak_memory_usage(self):
        import psutil
        process = psutil.Process(os.getpid())
        return process.memory_info().peak_wset / 1024 / 1024  # 转换为MB

def merge_json(data1: Dict[str, Any], data2: Dict[str, Any], strategy: str, report: MergeReport, file_name: str) -> Dict[str, Any]:
    result = data1.copy()
    for key, value in data2.items():
        if key in result:
            if isinstance(result[key], dict) and isinstance(value, dict):
                result[key] = merge_json(result[key], value, strategy, report, file_name)
                report.add_operation(file_name, key, "更新", "合并嵌套字典")
            elif isinstance(result[key], list) and isinstance(value, list):
                result[key] = result[key] + value
                report.add_operation(file_name, key, "更新", "合并列表")
            elif type(result[key]) != type(value):
                report.add_warning(f"类型不匹配: 文件 {file_name} 中的键 '{key}' 与现有数据类型不同")
                if strategy == 'overwrite':
                    result[key] = value
                    report.add_operation(file_name, key, "冲突", f"类型不匹配,使用新文件的值")
                else:
                    report.add_operation(file_name, key, "冲突", f"类型不匹配,保留原值")
            elif strategy == 'overwrite':
                result[key] = value
                report.add_operation(file_name, key, "更新", "覆盖现有值")
            else:
                report.add_operation(file_name, key, "保留", "保留原有值")
        else:
            result[key] = value
            report.add_operation(file_name, key, "新增", "添加新键")
    return result

def merge_json_files(input_files: List[str], output_file: str, strategy: str = 'overwrite') -> str:
    report = MergeReport()
    merged_data = {}
    try:
        for file in input_files:
            with open(file, 'r', encoding='utf-8') as f:
                data = json.load(f)
                merged_data = merge_json(merged_data, data, strategy, report, file)

        with open(output_file, 'w', encoding='utf-8') as out_file:
            json.dump(merged_data, out_file, ensure_ascii=False, indent=4)

        return report.generate_report(input_files, output_file, strategy)
    except Exception as e:
        report.add_error(f"合并过程中发生错误: {str(e)}")
        return report.generate_report(input_files, output_file, strategy)

def main():
    parser = argparse.ArgumentParser(description="合并多个JSON文件并生成报告")
    parser.add_argument('input_files', nargs='+', type=str, help="输入JSON文件的路径列表")
    parser.add_argument('output', type=str, help="输出JSON文件的路径")
    parser.add_argument('--strategy', type=str, choices=['overwrite', 'keep'], default='overwrite',
                        help="合并策略: 'overwrite' 覆盖重复键, 'keep' 保留原始值 (默认: overwrite)")
    parser.add_argument('--report', type=str, help="合并报告输出路径")

    args = parser.parse_args()

    try:
        report = merge_json_files(args.input_files, args.output, args.strategy)
        if args.report:
            with open(args.report, 'w', encoding='utf-8') as report_file:
                report_file.write(report)
            print(f"合并报告已保存到: {args.report}")
        else:
            print(report)
    except Exception as e:
        print(f"错误: {str(e)}")
        sys.exit(1)

if __name__ == "__main__":
    main()

http://www.kler.cn/a/316799.html

相关文章:

  • 前端请求后端php接口跨域 cors问题
  • 微擎框架php7.4使用phpexcel导出数据报错修复
  • Jmeter基础篇(22)服务器性能监测工具Nmon的使用
  • 车载空气净化器语音芯片方案
  • 基于非时空的离身与反身智能
  • request爬虫库的小坑
  • JVM-类加载器的双亲委派模型详解
  • 前后端数据交互 笔记03(get和post方法)
  • 使用 Azure Functions 开发 Serverless 应用:详解与实战
  • LeetCode 1014. 最佳观光组合 一次遍历数组,时间复杂度O(n)
  • 【matlab】将程序打包为exe文件(matlab r2023a为例)
  • Linux文件IO(三)-Linux系统如何管理文件
  • 【基础知识】网络套接字编程
  • QT-MOC元对象系统详解
  • 【小程序】微信小程序课程 -1 安装与配置
  • 【2025】基于微信小程序的人工智能课程学习平台的设计与实现(源码+文档+解答)
  • 职业技能大赛-自动化测试笔记分享
  • while语句
  • CANdela/Diva系列8--如何生成0x27服务解锁的DLL
  • MySQL 数据库课程设计详解与操作示例
  • Java : 图书管理系统
  • ArcGIS Pro SDK (十四)地图探索 6 图形与工具
  • AIGC7: 高通骁龙AIPC开发者沙龙过程记录A
  • 力扣刷题之2398.预算内的最多机器人数目
  • Shelly实测天工的音乐创作功能,写了一首歌,来听听效果
  • 学习笔记JVM篇(四)