pdf文件的读取,基于深度学习的方法
需要安装一些依赖解析 PDF 文件的详细指南_unstructured.partition.pdf-CSDN博客文章浏览阅读1.3k次,点赞13次,收藏9次。通过 unstructured.partition.pdf 函数,可以方便地解析 PDF 文件并提取其中的文本和表格内容。尽管在使用过程中可能会遇到一些错误,但通过正确的安装和配置依赖项,以及尝试其他 PDF 解析库,可以有效地解决这些问题。本文将介绍如何使用 unstructured.partition.pdf 函数来解析 PDF 文件,并提取其中的文本和表格内容。这个错误通常表示你的 PDF 文件在解压缩过程中出现了问题,可能是由于文件损坏、格式不兼容或不支持的压缩方法等原因。_unstructured.partition.pdfhttps://blog.csdn.net/qq_28704101/article/details/140464343
解析 PDF 文档的挑战在于准确提取整个页面的布局并将内容(包括表格、标题、段落和图像)转换为文档的文本表示形式。该过程涉及处理文本提取、图像识别中的不准确以及表格中行列关系的混乱。
挑战1:如何从表格和图像中提取数据
检测到的表数据可以直接导出为HTML,导出来的表格数据可以直接被识别
挑战2:如何重新排列检测到的块?特别是对于双列PDF
在确定布局后,unstructured[3]框架会将每个页面划分为几个矩形块,如图8所示:
主流解析 PDF 的方法有以下三种:
-
基于规则的方法:每个部分的风格和内容根据文档的组织特征确定。然而,这种方法的通用性不是很强,因为 PDF 的类型和布局多种多样,不可能用预定义的规则涵盖所有类型和布局。
-
基于深度学习模型的方法:例如当前流行的结合目标检测(yolox)和OCR模型的解决方案。
-
基于多模态大型模型传递复杂结构或提取 PDF 中的关键信息。
最具代表性的工具之一是pypdf,它是一种广泛使用的基于规则的解析器。它是LangChain和LlamaIndex中用于解析PDF文件的标准方法。使用起来非常简单
# 确保已安装PyPDF2模块
try:
import PyPDF2
except ImportError:
import sys
sys.exit("Please install the PyPDF2 module first, using: pip install PyPDF2")
def extract_text_from_pdf(filename, page_num):
try:
with open(filename, 'rb') as pdf_file:
reader = PyPDF2.PdfReader(pdf_file)
if page_num < len(reader.pages):
page = reader.pages[page_num]
text = page.extract_text()
if text:
return text
else:
return "No text found on this page."
else:
return f"Page number {page_num} is out of range. This document has {len(reader.pages)} pages."
except Exception as e:
return f"An error occurred: {str(e)}"
if __name__ == '__main__':
# 示例用法
filename = "/home/00_rag/fufan-chat-api/data/parse/data/1706.03762v7.pdf"
page_num = 5
text = extract_text_from_pdf(filename, page_num)
print('--------------------------------------------------')
print(f"Text from file '{filename}' on page {page_num}:")
print(text if text else "No text available on the selected page.")
print('--------------------------------------------------')
基于深度学习模型的方法
这种方法的优点是能够准确识别整个文档的布局,包括表格和段落。它甚至可以理解表内的结构。这意味着它可以将文档划分为定义明确、完整的信息单元,同时保留预期的含义和结构。
当前使用的是
目标检测模型
yolox 来进行读取,并且使用的是hi_res策略
if __name__ == "__main__":
processor = UnstructuredProcessor()
# 提取PDF中的表格数据
content, table_content = processor.extract_data(
'/home/00_rag/fufan-chat-api/data/parse/data/invoice_2.pdf',
'hi_res', #
'yolox', # https://github.com/Megvii-BaseDetection/YOLOX
['tables', 'unstructured'],
True,
True)
这个rich.progress 就是来美化输出打印进度条的
from rich.progress import Progress, SpinnerColumn, TextColumn
def invoke_pipeline_step(self, task_call, task_description, local):
"""
执行管道步骤,可以在本地或非本地环境中运行任务。
:param task_call: callable,一个无参数的函数或lambda表达式,它执行实际的任务。
:param task_description: str,任务的描述,用于进度条或打印输出。
:param local: bool,指示是否在本地环境中执行任务。如果为True,则使用进度条;如果为False,则仅打印任务描述。
方法的执行流程:
- 如果`local`为True,使用`Progress`上下文管理器来显示一个动态的进度条。
- `SpinnerColumn()`:在进度条中添加一个旋转的指示器。用来表示程序还在运行中
- `TextColumn("[progress.description]{task.description}")`:添加一个文本列来显示任务描述。
- `transient=False`:进度条显示完成后不会消失。
- 在进度条中添加一个任务,然后调用`task_call()`执行实际的任务,任务的返回结果保存在`ret`中。
- 如果`local`为False,则直接打印任务描述,不使用进度条,之后调用`task_call()`执行任务,任务的返回结果同样保存在`ret`中。
:return: 返回从`task_call()`获取的结果。
"""
if local:
# 本地执行时,显示带有进度指示的进度条
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
transient=False,
) as progress:
# 添加进度任务,总步长为None表示不确定的任务进度
progress.add_task(description=task_description, total=None)
# 调用task_call执行任务,并获取结果
ret = task_call()
else:
print(task_description)
ret = task_call()
return ret
def extract_data(self, file_path, strategy, model_name, options, local=True, debug=False):
"""
从指定的文件中提取数据。
:param file_path: str,文件的路径,指定要处理的文件。
:param strategy: 使用的策略来提取数据。
:param model_name: 使用的模型名称,这里使用 目标检测模型 yolox
:param options: dict,额外的选项或参数,用来干预数据提取的过程或结果。
:param local: bool,一文件处理是否应在本地执行,默认为True。
:param debug: bool,如果设置为True,则会显示更多的调试信息,帮助理解处理过程中发生了什么,默认为False。
函数的执行流程:
- 调用`invoke_pipeline_step`方法,这是一个高阶函数,它接受一个lambda函数和其他几个参数。
- lambda函数调用`process_file`方法,处理文件并根据指定的策略和模型名提取数据。
- `invoke_pipeline_step`方法除了执行传入的lambda函数,还可能处理本地执行逻辑,打印进程信息,并依据`local`参数决定执行环境。
- 最终,数据提取的结果将从`process_file`方法返回,并由`invoke_pipeline_step`方法输出。
"""
# # 调用数据提取流程,处理PDF文件并提取元素
elements = self.invoke_pipeline_step(
lambda: self.process_file(file_path, strategy, model_name),
"Extracting elements from the document...",
local
)
if debug:
new_extension = 'json' # You can change this to any extension you want
new_file_path = self.change_file_extension(file_path, new_extension)
content, table_content = self.invoke_pipeline_step(
lambda: self.load_text_data(elements, new_file_path, options),
"Loading text data...",
local
)
else:
with tempfile.TemporaryDirectory() as temp_dir:
temp_file_path = os.path.join(temp_dir, "file_data.json")
content, table_content = self.invoke_pipeline_step(
lambda: self.load_text_data(elements, temp_file_path, options),
"Loading text data...",
local
)
if debug:
print("Data extracted from the document:")
print(content)
print("\n")
print("Table content extracted from the document:")
if table_content:
print(len(table_content))
print(table_content)
print(f"这是content:{content}")
print(f"这是table_content:{table_content}")
return content, table_content
方法参数中的lambda表达式
处理文件的核心逻辑
# partition_pdf 官方文档:https://docs.unstructured.io/open-source/core-functionality/partitioning#partition-pdf
def process_file(self, file_path, strategy, model_name):
"""
处理文件并提取数据,支持PDF文件和图像文件。
:param file_path: str,文件的路径,指定要处理的文件。
:param strategy: 使用的策略来提取数据,影响数据处理的方法和结果。
:param model_name: 使用的模型名称,这里使用yolox
方法的执行流程:
- 初始化`elements`变量为None,用来存储提取的元素。
- 检查文件路径的后缀,根据文件类型调用相应的处理函数:
- 如果文件是PDF(.pdf),使用`partition_pdf`函数处理:
- `filename`:提供文件路径。
- `strategy`:指定数据提取策略。
- `infer_table_structure`:是否推断表格结构,这里设为True。
- `hi_res_model_name`:提供高分辨率模型名称。
- `languages`:设置处理的语言为英语。
- 如果文件是图像(.jpg, .jpeg, .png),使用`partition_image`函数处理,参数类似于处理PDF的参数。
- 返回提取的元素`elements`。
:return: 返回从文件中提取的元素。
"""
# 初始化元素变量
elements = None
# 根据文件后缀决定处理方式
# partition_pdf 官方文档:https://docs.unstructured.io/open-source/core-functionality/partitioning#partition-pdf
# hi_res 策略配合 infer_table_structure=True 的表格识别效果较好
if file_path.lower().endswith('.pdf'):
elements = partition_pdf(
filename=file_path,
# strategy kwarg 控制用于处理 PDF 的方法。 PDF 的可用策略有 "auto" 、 "hi_res" 、 "ocr_only" 和 "fast"
strategy=strategy,
# 是否提取表格的格式化信息
infer_table_structure=True,
# 使用的模型名称
hi_res_model_name=model_name,
# chi_sim表示使用哪种语言来进行解析
languages=['chi_sim']
)
elif file_path.lower().endswith(('.jpg', '.jpeg', '.png')):
# 处理图像文件
elements = partition_image(
filename=file_path,
strategy=strategy,
infer_table_structure=True,
hi_res_model_name=model_name,
languages=['chi_sim']
)
return elements
这个是更改文件拓展名,生成一个新的文件路径
def change_file_extension(self, file_path, new_extension, suffix=None):
# Check if the new extension starts with a dot and add one if not
if not new_extension.startswith('.'):
new_extension = '.' + new_extension
# Split the file path into two parts: the base (everything before the last dot) and the extension
# If there's no dot in the filename, it'll just return the original filename without an extension
base = file_path.rsplit('.', 1)[0]
# Concatenate the base with the new extension
if suffix is None:
new_file_path = base + new_extension
else:
new_file_path = base + "_" + suffix + new_extension
return new_file_path
加载里面的数据
def load_text_data(self, elements, file_path, options):
# 手动将元素保存到 JSON 文件中,确保使用 ensure_ascii=False
with open(file_path, 'w', encoding='utf-8') as file:
json.dump([e.to_dict() for e in elements], file, ensure_ascii=False)
content, table_content = None, None
if options is None:
content = self.process_json_file(file_path)
# 判断是加载全部信息,还是仅仅表格的信息
if options and "tables" in options and "unstructured" in options:
content = self.process_json_file(file_path, "form")
table_content = self.process_json_file(file_path, "table")
return content, table_content
def process_json_file(self, file_path, option=None):
# Read the JSON file
with open(file_path, 'r', encoding='utf-8') as file:
data = json.load(file)
# Iterate over the JSON data and extract required elements
extracted_elements = []
for entry in data:
if entry["type"] == "Table" and (option is None or option == "table" or option == "form"):
table_data = entry["metadata"]["text_as_html"]
if option == "table" and self.table_has_header(table_data):
extracted_elements.append(table_data)
if option is None or option == "form":
extracted_elements.append(table_data)
elif entry["type"] == "Title" and (option is None or option == "form"):
extracted_elements.append(entry["text"])
# 叙述文本
elif entry["type"] == "NarrativeText" and (option is None or option == "form"):
extracted_elements.append(entry["text"])
# 未分类
elif entry["type"] == "UncategorizedText" and (option is None or option == "form"):
extracted_elements.append(entry["text"])
elif entry["type"] == "ListItem" and (option is None or option == "form"):
extracted_elements.append(entry["text"])
elif entry["type"] == "Image" and (option is None or option == "form"):
extracted_elements.append(entry["text"])
if option is None or option == "form":
# Convert list to single string with two new lines between each element
extracted_data = "\n\n".join(extracted_elements)
return extracted_data
return extracted_elements