羲和数据集收集器1.0
为了提升问答对的提取能力并完善GUI,我们从以下几个方面进行改进:
增强文本清理和解析能力:确保能够更准确地识别问答对。
支持更多文件格式:除了现有的 .txt、.docx 和 .pdf,还可以考虑支持 .xlsx 等其他常见格式(下面的代码尚未实现这一点)。
优化GUI设计:提供更友好的用户界面,包括进度条、日志显示等。
错误处理和日志记录:增强错误处理机制,提供详细的日志记录。
以下是改进后的代码:
import json
import logging
import os
import re
import tkinter as tk
from threading import Thread
from tkinter import filedialog, messagebox, simpledialog, ttk

import docx
import fitz  # PyMuPDF
# Configure the root logger once at import time: INFO and above, with a
# timestamp and level in front of every message.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def clean_text(text, *, strip_punctuation=False):
    """Normalise whitespace in *text* while keeping its line structure.

    Runs of whitespace inside a line are collapsed to a single space,
    leading/trailing whitespace is trimmed, and blank lines are dropped.
    Line breaks are preserved because the question/answer parser in this
    file works line by line.

    Args:
        text: Raw text extracted from a document.
        strip_punctuation: When true, also remove every character that is
            neither a word character nor whitespace. Off by default: the
            parser recognises questions by markers such as ``1.`` and
            ``Q:``, which punctuation stripping would destroy.

    Returns:
        The cleaned text, lines joined with ``\\n``; ``""`` for empty or
        whitespace-only input.
    """
    cleaned_lines = []
    for raw_line in text.splitlines():
        if strip_punctuation:
            raw_line = re.sub(r'[^\w\s]', '', raw_line)
        # split()/join collapses any whitespace run and trims both ends.
        line = ' '.join(raw_line.split())
        if line:
            cleaned_lines.append(line)
    return '\n'.join(cleaned_lines)
def extract_qa_pairs_from_txt(file_path):
    """Extract question/answer pairs from a UTF-8 plain-text file.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The list produced by ``extract_qa_pairs_from_content`` for the
        cleaned file content.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    # 'utf-8-sig' reads plain UTF-8 unchanged but also swallows a leading
    # byte-order mark; with plain 'utf-8' the BOM would stay glued to the
    # first line and defeat the '^'-anchored marker patterns.
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        content = f.read()
    return extract_qa_pairs_from_content(clean_text(content))
def extract_qa_pairs_from_docx(file_path):
    """Extract question/answer pairs from a Word (.docx) document.

    Text is gathered, in this order, from the body paragraphs, from every
    table cell, and from any related part whose reference contains
    ``"textBox"``; it is then cleaned and handed to
    ``extract_qa_pairs_from_content``.

    Args:
        file_path: Path of the .docx file.

    Returns:
        The list of question/answer records found in the document.
    """
    doc = docx.Document(file_path)
    content = [para.text for para in doc.paragraphs]

    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                content.append(cell.text)

    # NOTE(review): text boxes are normally stored inline in the main
    # document XML rather than as separate related parts, so this branch
    # may never fire in practice - confirm against real documents.
    for rel in doc.part.rels.values():
        if "textBox" not in rel.target_ref:
            continue
        # External relationships (e.g. hyperlinks) have no target part.
        if rel.is_external:
            continue
        part_element = getattr(rel.target_part, 'element', None)
        body = getattr(part_element, 'body', None)
        if body is None:
            continue
        for element in body:
            if not element.tag.endswith('p'):
                continue
            # The direct .text of a paragraph element is usually None (the
            # text lives in nested runs); gather the descendant text so a
            # None never reaches the join below.
            text = ''.join(element.itertext())
            if text:
                content.append(text)

    content = clean_text('\n'.join(content))
    return extract_qa_pairs_from_content(content)
def extract_qa_pairs_from_pdf(file_path):
    """Extract question/answer pairs from a PDF file.

    Collects the plain text of every page plus the content of annotations
    whose type code is 2, then cleans the result and hands it to
    ``extract_qa_pairs_from_content``.

    Args:
        file_path: Path of the PDF file.

    Returns:
        The list of question/answer records found in the document.
    """
    parts = []
    # The context manager closes the document even if extraction fails.
    with fitz.open(file_path) as doc:
        for page in doc:
            # Plain-text extraction already contains every span, so the
            # page is read once only; walking the "dict" blocks as well
            # would append the same text a second time.
            parts.append(page.get_text("text"))
            for annot in page.annots():
                # NOTE(review): the original comment called code 2 "text
                # annotations"; PyMuPDF appears to list 2 as FreeText and
                # 0 as Text (sticky note) - confirm which was intended.
                if annot.type[0] == 2:
                    note = annot.info.get("content", "")
                    if note:
                        parts.append(note)
    content = clean_text('\n'.join(parts))
    return extract_qa_pairs_from_content(content)
# Leading "1. " style numbering in front of a question.
_QA_NUMBER_PREFIX = re.compile(r'^\d+\.\s+')
# Explicit question labels, with an ASCII or full-width colon.
_QA_QUESTION_LABEL = re.compile(r'^(?:Q|问题)[::]\s*')
# Any other "label: text" heading also opens a new entry.
_QA_GENERIC_LABEL = re.compile(r'^\w+:\s+')
# Prefix removed from a generic heading (the label may contain spaces).
_QA_GENERIC_LABEL_PREFIX = re.compile(r'^[\w\s]+:\s+')
# Explicit answer labels; checked first so they are not taken for a
# generic "label: text" question heading.
_QA_ANSWER_LABEL = re.compile(r'^(?:A|答案|回答|答)[::]\s*')


def _build_qa_record(question, answer_lines):
    """Return one output record; both answer lists hold the joined answer."""
    answer = ' '.join(answer_lines)
    return {'question': question, 'xihe_answers': [answer], 'ling_answers': [answer]}


def extract_qa_pairs_from_content(content):
    """Split *content* into question/answer records, line by line.

    A line opens a new question when it starts with ``<digits>. ``, with a
    ``Q:`` / ``问题:`` label, or with any other ``label: `` heading; the
    numbering and label are removed from the stored question. Lines that
    start with an answer label (``A:``, ``答:``, ``答案:``, ``回答:``) are
    added to the current answer without the label. Every other non-blank
    line, including sub-sections such as ``1.1 ...``, ``(1) ...`` and
    ``注 1: ...``, is added to the current answer unchanged.

    A question is emitted only if at least one answer line followed it;
    lines before the first question are discarded.

    Args:
        content: Text with one logical line per ``\\n``-separated line.

    Returns:
        A list of dicts with the keys ``question``, ``xihe_answers`` and
        ``ling_answers``; the two answer lists each hold the answer lines
        joined with single spaces.
    """
    qa_pairs = []
    current_question = None
    current_answer = []

    for line in content.splitlines():
        if not line.strip():
            continue

        answer_label = _QA_ANSWER_LABEL.match(line)
        if answer_label:
            answer_text = line[answer_label.end():]
            if answer_text:
                current_answer.append(answer_text)
            continue

        opens_question = (
            _QA_NUMBER_PREFIX.match(line)
            or _QA_QUESTION_LABEL.match(line)
            or _QA_GENERIC_LABEL.match(line)
        )
        if not opens_question:
            current_answer.append(line)
            continue

        # A new question closes the previous one, if it got an answer.
        if current_question and current_answer:
            qa_pairs.append(_build_qa_record(current_question, current_answer))

        question = _QA_NUMBER_PREFIX.sub('', line)
        explicit_label = _QA_QUESTION_LABEL.match(question)
        if explicit_label:
            question = question[explicit_label.end():]
        else:
            question = _QA_GENERIC_LABEL_PREFIX.sub('', question)
        current_question = question
        current_answer = []

    if current_question and current_answer:
        qa_pairs.append(_build_qa_record(current_question, current_answer))
    return qa_pairs
def extract_qa_pairs_from_file(file_path):
    """Dispatch *file_path* to the extractor matching its extension.

    The extension is compared case-insensitively, so ``REPORT.PDF`` is
    handled the same way as ``report.pdf``.

    Args:
        file_path: Path (``str`` or path-like) ending in ``.txt``,
            ``.docx`` or ``.pdf``.

    Returns:
        The list of question/answer records produced by the extractor.

    Raises:
        ValueError: If the extension is not one of the supported types.
    """
    lowered = os.fspath(file_path).lower()
    if lowered.endswith('.txt'):
        return extract_qa_pairs_from_txt(file_path)
    if lowered.endswith('.docx'):
        return extract_qa_pairs_from_docx(file_path)
    if lowered.endswith('.pdf'):
        return extract_qa_pairs_from_pdf(file_path)
    raise ValueError("Unsupported file type")
def write_qa_pairs_to_file(qa_pairs, output_file):
    """Write *qa_pairs* to *output_file* as JSON Lines (one record per line).

    The file is created or overwritten, encoded as UTF-8, and non-ASCII
    characters are written as-is rather than as ``\\u`` escapes.
    """
    with open(output_file, 'w', encoding='utf-8') as out:
        out.writelines(
            json.dumps(record, ensure_ascii=False) + '\n'
            for record in qa_pairs
        )
def generate_output_filename(files):
    """Derive the output file name from the input file names.

    Each input contributes its base name without directory or extension;
    the pieces are joined with ``_`` and suffixed with ``_qa.txt``.
    """
    stems = (os.path.splitext(os.path.basename(path))[0] for path in files)
    # Joining a single stem yields that stem unchanged, so one expression
    # covers both the single-file and the multi-file case.
    return "_".join(stems) + "_qa.txt"
def process_files(files, output_file):
    """Extract question/answer pairs from *files* and save them.

    Each file is processed independently: a failure is logged and shown
    in an error dialog, and the remaining files are still processed. The
    pairs collected from all files are written to *output_file*, then a
    completion dialog reports the number of records written.

    Args:
        files: Iterable of input file paths.
        output_file: Path of the JSON Lines file to write.
    """
    qa_pairs = []
    for file in files:
        try:
            logging.info(f"Processing file: {file}")
            qa_pairs.extend(extract_qa_pairs_from_file(file))
        except Exception as e:
            # Boundary handler: report the failure and move on to the
            # next file instead of aborting the whole batch.
            logging.error(f"Error processing {file}: {e}")
            messagebox.showerror("错误", f"处理文件时出错: {file}\n{e}")
    write_qa_pairs_to_file(qa_pairs, output_file)
    # One line is written per record, so the record count is the line
    # count; no need to reopen the file (the old code did so and never
    # closed the handle).
    num_lines = len(qa_pairs)
    messagebox.showinfo("完成", f"提取了问答对并保存到 {output_file}\n共 {num_lines} 行数据")
def select_files():
    """Ask the user for input files and show the selection in the window.

    The result of the dialog is stored in the module-level ``files``; the
    label is refreshed only when the selection is non-empty.
    """
    global files
    dialog_filters = [
        ("Text files", "*.txt"),
        ("Word files", "*.docx"),
        ("PDF files", "*.pdf"),
    ]
    files = filedialog.askopenfilenames(filetypes=dialog_filters)
    if not files:
        return
    file_list_label.config(text="\n".join(files))
def start_processing():
    """Start extracting from the selected files in a background thread.

    Shows an error dialog and returns if no file has been selected.
    Otherwise the progress bar is started, the work runs in a worker
    thread, and ``on_thread_finish`` is called from the GUI thread once
    the worker has ended.
    """
    # 'files' is only bound after select_files() has run at least once;
    # look it up defensively so an early click shows the dialog instead
    # of raising NameError.
    selected = globals().get('files')
    if not selected:
        messagebox.showerror("错误", "请选择输入文件")
        return
    output_file = generate_output_filename(selected)
    progress_bar.start(10)
    log_text.insert(tk.END, "开始处理文件...\n")
    thread = Thread(target=process_files, args=(selected, output_file))
    thread.start()

    def poll_worker():
        # Runs on the GUI thread: re-arm the timer until the worker is
        # done, then stop the progress bar and log completion.
        if thread.is_alive():
            root.after(100, poll_worker)
        else:
            on_thread_finish()

    root.after(100, poll_worker)
def on_thread_finish():
    """Stop the progress bar and append the completion note to the log box."""
    progress_bar.stop()
    completion_note = "文件处理完成。\n"
    log_text.insert(tk.END, completion_note)
# Files chosen by the user; bound here so start_processing() can read it
# before select_files() has ever run.
files = ()

# Create the main window.
root = tk.Tk()
root.title("问答对提取工具")

# File selection button and the label listing the chosen files.
select_files_button = tk.Button(root, text="选择文件", command=select_files)
select_files_button.pack(pady=10)
file_list_label = tk.Label(root, text="")
file_list_label.pack()

# Log display.
log_text = tk.Text(root, height=10, width=50)
log_text.pack(pady=10)

# Progress bar. ttk is a submodule that must be imported explicitly;
# reaching it as tk.ttk fails unless something else imported it first.
progress_bar = ttk.Progressbar(root, orient="horizontal", length=300, mode="indeterminate")
progress_bar.pack(pady=10)

# Start button.
start_button = tk.Button(root, text="开始处理", command=start_processing)
start_button.pack(pady=20)

# Run the event loop.
root.mainloop()
主要改进点:
多线程处理:使用 Thread 类来异步处理文件,避免GUI在处理大文件时卡顿。
进度条:添加了一个不确定模式(indeterminate)的进度条,用于提示文件正在处理中(不显示具体完成百分比)。
日志显示:在GUI中添加了一个文本框,用于显示处理过程中的日志信息。
错误处理:增强了错误处理机制,确保在处理文件时出现错误时能够及时通知用户。
通过这些改进,工具的用户体验和功能都得到了显著提升。