JSON合并工具
JSON合并工具
1. 项目概述
本项目旨在开发一个强大而灵活的JSON合并工具,能够合并多个JSON文件,处理复杂的嵌套结构,提供详细的合并报告,并实现全面的验证和错误处理机制。
2. 功能需求
2.1 基本合并功能
- 支持合并两个或多个JSON文件
- 处理嵌套的JSON结构
- 提供不同的合并策略选项(如覆盖、保留原值)
2.2 验证和错误处理
- JSON结构验证
- 数据类型一致性检查
- 文件大小限制检查
- 键名验证
- 详细的错误报告
- 警告系统
- 错误恢复机制
- 操作日志记录
2.3 合并报告生成
- 生成详细的合并过程报告
- 包含基本信息、合并统计、详细操作日志、警告和错误摘要、性能指标
2.4 多文件合并
- 支持任意数量的输入JSON文件
- 按指定顺序依次合并文件
- 为每个输入文件生成单独的统计信息
3. 技术设计
3.1 合并算法
使用递归方法处理嵌套的JSON结构:
- 遍历第二个JSON对象的所有键值对
- 如果键在第一个对象中不存在,直接添加
- 如果键存在且值都是字典,递归合并
- 如果键存在且值都是列表,合并列表
- 如果键存在但值类型不同,根据策略处理(覆盖或保留)
- 如果键存在且值类型相同,根据策略更新
3.2 验证机制
- JSON结构验证:使用json.loads()验证JSON格式
- 深度检查:递归检查JSON嵌套深度,设置最大深度限制
- 大小检查:在读取文件前检查文件大小
- 类型一致性:在合并过程中检查相同键的值类型
3.3 错误处理
- 使用try-except块捕获并处理异常
- 实现自定义异常类处理特定错误
- 使用logging模块记录警告和错误
- 对于非致命错误,提供继续处理的选项
3.4 报告生成
使用MergeReport类管理报告生成:
- 在合并过程中记录每个操作
- 统计新增、更新和冲突的键数量
- 记录警告和错误
- 生成性能指标(处理时间、内存使用)
- 格式化输出详细的报告
3.5 多文件处理
- 使用列表存储多个输入文件路径
- 逐个处理文件,将结果合并到一个主JSON对象中
- 在报告中分别记录每个文件的处理情况
3.6 命令行接口
使用argparse模块处理命令行参数:
- 输入文件路径(支持多个)
- 输出文件路径
- 合并策略选项
- 报告输出路径选项
4. 实现细节
4.1 主要类和函数
MergeReport
类:管理报告生成merge_json()
函数:实现JSON合并逻辑merge_json_files()
函数:处理文件I/O和调用合并函数main()
函数:处理命令行参数和orchestrate整个过程
4.2 数据结构
- 使用Python的字典表示JSON对象
- 使用列表存储多个输入文件路径
4.3 外部依赖
- json:用于JSON解析和序列化
- argparse:用于命令行参数处理
- logging:用于日志记录
- psutil:用于获取内存使用情况(可选)
5. 使用示例
python merge_json.py file1.json file2.json file3.json output.json --strategy overwrite --report merge_report.txt
6. 未来扩展
- 性能优化:实现流式处理或分块处理大文件
- 并行处理:使用多线程或多进程加速处理
- 配置文件:支持通过配置文件指定复杂的合并规则
- 可视化:生成合并过程的可视化表示
- GUI界面:开发图形用户界面,提高易用性
7. 结论
这个JSON合并工具提供了强大的功能,包括多文件合并、详细的报告生成、全面的验证和错误处理。考虑了灵活性和可扩展性,能够满足各种复杂的JSON合并需求。持续的优化和功能扩展,这个工具可以成为处理JSON数据的有力助手。
8.代码
import json
import sys
import os
import logging
import time
import argparse
from typing import Dict, Any, List
class MergeReport:
def __init__(self):
self.start_time = time.time()
self.total_keys = 0
self.new_keys = 0
self.updated_keys = 0
self.conflict_keys = 0
self.warnings = []
self.errors = []
self.detailed_log = []
self.file_stats = {}
def add_operation(self, file: str, key: str, operation: str, details: str = ""):
self.detailed_log.append(f"{file} - {key}: {operation} - {details}")
self.total_keys += 1
if operation == "新增":
self.new_keys += 1
elif operation == "更新":
self.updated_keys += 1
elif operation == "冲突":
self.conflict_keys += 1
if file not in self.file_stats:
self.file_stats[file] = {"新增": 0, "更新": 0, "冲突": 0}
self.file_stats[file][operation] += 1
def add_warning(self, message: str):
self.warnings.append(message)
def add_error(self, message: str):
self.errors.append(message)
def generate_report(self, input_files: List[str], output_file: str, strategy: str) -> str:
end_time = time.time()
process_time = end_time - self.start_time
report = f"""
合并报告
========
基本信息:
- 合并时间: {time.strftime('%Y-%m-%d %H:%M:%S')}
- 输入文件:
{chr(10).join([' - ' + file for file in input_files])}
- 输出文件: {output_file}
- 合并策略: {strategy}
合并统计:
- 总处理键数: {self.total_keys}
- 新增键数: {self.new_keys}
- 更新键数: {self.updated_keys}
- 冲突键数: {self.conflict_keys}
文件统计:
"""
for file, stats in self.file_stats.items():
report += f"- {file}:\n"
report += f" 新增: {stats['新增']}, 更新: {stats['更新']}, 冲突: {stats['冲突']}\n"
report += f"""
详细操作日志:
{chr(10).join(self.detailed_log)}
警告:
{chr(10).join(self.warnings) if self.warnings else "无"}
错误:
{chr(10).join(self.errors) if self.errors else "无"}
性能指标:
- 处理时间: {process_time:.2f} 秒
- 峰值内存使用: {self.get_peak_memory_usage()} MB
"""
return report
def get_peak_memory_usage(self):
import psutil
process = psutil.Process(os.getpid())
return process.memory_info().peak_wset / 1024 / 1024 # 转换为MB
def merge_json(data1: Dict[str, Any], data2: Dict[str, Any], strategy: str, report: MergeReport, file_name: str) -> Dict[str, Any]:
result = data1.copy()
for key, value in data2.items():
if key in result:
if isinstance(result[key], dict) and isinstance(value, dict):
result[key] = merge_json(result[key], value, strategy, report, file_name)
report.add_operation(file_name, key, "更新", "合并嵌套字典")
elif isinstance(result[key], list) and isinstance(value, list):
result[key] = result[key] + value
report.add_operation(file_name, key, "更新", "合并列表")
elif type(result[key]) != type(value):
report.add_warning(f"类型不匹配: 文件 {file_name} 中的键 '{key}' 与现有数据类型不同")
if strategy == 'overwrite':
result[key] = value
report.add_operation(file_name, key, "冲突", f"类型不匹配,使用新文件的值")
else:
report.add_operation(file_name, key, "冲突", f"类型不匹配,保留原值")
elif strategy == 'overwrite':
result[key] = value
report.add_operation(file_name, key, "更新", "覆盖现有值")
else:
report.add_operation(file_name, key, "保留", "保留原有值")
else:
result[key] = value
report.add_operation(file_name, key, "新增", "添加新键")
return result
def merge_json_files(input_files: List[str], output_file: str, strategy: str = 'overwrite') -> str:
report = MergeReport()
merged_data = {}
try:
for file in input_files:
with open(file, 'r', encoding='utf-8') as f:
data = json.load(f)
merged_data = merge_json(merged_data, data, strategy, report, file)
with open(output_file, 'w', encoding='utf-8') as out_file:
json.dump(merged_data, out_file, ensure_ascii=False, indent=4)
return report.generate_report(input_files, output_file, strategy)
except Exception as e:
report.add_error(f"合并过程中发生错误: {str(e)}")
return report.generate_report(input_files, output_file, strategy)
def main():
parser = argparse.ArgumentParser(description="合并多个JSON文件并生成报告")
parser.add_argument('input_files', nargs='+', type=str, help="输入JSON文件的路径列表")
parser.add_argument('output', type=str, help="输出JSON文件的路径")
parser.add_argument('--strategy', type=str, choices=['overwrite', 'keep'], default='overwrite',
help="合并策略: 'overwrite' 覆盖重复键, 'keep' 保留原始值 (默认: overwrite)")
parser.add_argument('--report', type=str, help="合并报告输出路径")
args = parser.parse_args()
try:
report = merge_json_files(args.input_files, args.output, args.strategy)
if args.report:
with open(args.report, 'w', encoding='utf-8') as report_file:
report_file.write(report)
print(f"合并报告已保存到: {args.report}")
else:
print(report)
except Exception as e:
print(f"错误: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()