当前位置: 首页 > article >正文

RAG项目实战:金融问答系统

需求痛点

私有知识很多,如何让大模型只选择跟问题有关的知识进行参考呢?

需求分析

是否可以使用关键词匹配呢?以前的搜索主要使用关键词匹配,这个要求太高了,需要提前抽取准备好关键词,有点像以前SEO的工作。现在是国际化社会,大模型对语言应是无感知理解的,也就是说用户使用英文或中文提问时,它的语义理解应是一样的,要脱离语言本身,看内涵,跨语言、跨表达的习惯,直击这句话的本质,所以我们使用语义化匹配。

实现思路

将私有资料,按逻辑、互相独立的单元来分段;

使用模型将每段知识的核心信息抽取出来,变成向量;

将这些知识片段及其向量存到向量数据库,如chromadb向量数据库;

将用户提的问题转化为向量,去向量数据库中进行相似度匹配,寻找答案。

设置Prompt提示词,让结果更符合预期。

工作流程:

用户提一个问题,把问题通过向量化模型变成一个向量,再使用这个向量到向量库中,查找最相似的上下文,接着把问题和检索到的上下文一起交给模型,让模型回答问题。

这里可能会有一个问题,比如你已经在库里检索到答案了,为何还要让大模型生成答案呢?因为检索出来的结果,有些高度相关,有些低相关,而且比较长,不一定符合预期的格式,需要自己加工。大模型帮我们快速筛选和判断,然后通过prompt设置的格式来综合整理,生成最终的结果。

项目数据

数据集:招股说明书,80个PDF文件

数据集下载地址:https://www.modelscope.cn/datasets/BJQW14B/bs_challenge_financial_14b_dataset/summary

实现效果示例:

用户提问:景顺长城中短债债券C基金在20210331的季报里,前三大持仓占比的债券名称是什么?

回答:景顺长城中短债债券C在20210331的季报中,前三大持仓占比的债券名称分别是21国开01、20农发清发01、20国信03。

提问:上海华铭智能终端设备股份有限公司的首发战略配售结果如何?

回答:上海华铭智能终端设备股份有限公司的首发战略配售具体情况并未在招股意向书中详细说明。

项目实现

1. 规划工程文件

金融问答系统项目涉及前端、后端、数据集和模型等,本文只探讨RAG部分,暂不做前端UI交互。

项目目录规划:

SmartFinance \
    |- app \ # 该目录用于服务端代码
        |- conf \
            |- .qwen # 配置API KEY
        |- dataset \ # 该目录存放数据集
            |-pdf \ # 该目录存放PDF文件
        |- rag \ # 该目录用于保存 rag 相关代码
            |- chroma_conn.py  # 管理ChromaDB数据库,如添加文档和查询等操作
            |- pdf_processor.py  # 实现PDF文件加载、文本切分和向量存储等操作
            |- rag.py # 实现检索功能
        |- test_framework.py # 测试功能
        |- utils \ # 该目录用于存放工具配置类文件
            |- util.py # 加载连接大模型

2.  实现基本连接大模型的 util 库

代码文件及目录:app/utils/util.py

# 导入必要的库
from dotenv import load_dotenv  # 用于加载环境变量
import os  # 用于操作文件路径

# 获取当前文件所在的目录
current_dir = os.path.dirname(__file__)

# 构建到 conf/.qwen 的相对路径
# 这里通过 os.path.join 拼接路径,确保跨平台兼容性
conf_file_path_qwen = os.path.join(current_dir, '..', 'conf', '.qwen')

# 加载 .qwen 文件中的环境变量
# dotenv_path 参数指定了环境变量文件的路径
load_dotenv(dotenv_path=conf_file_path_qwen)

def get_qwen_models():
    """
    加载通义千问系列的大模型,包括 LLM、Chat 和 Embedding 模型。
    """
    # 加载 LLM 大模型(语言生成模型)
    # 使用 Tongyi 类来实例化一个通义千问模型
    from langchain_community.llms.tongyi import Tongyi
    llm = Tongyi(
        model="qwen-max",  # 指定模型类型为 qwen-max,适合复杂任务
        temperature=0.1,   # 控制输出的随机性,值越低越保守
        top_p=0.7,         # 核采样参数,控制生成文本的多样性
        max_tokens=1024    # 最大生成的 token 数量
    )

    # 加载 Chat 大模型(对话模型)
    # 使用 ChatTongyi 类来实例化一个通义千问对话模型
    from langchain_community.chat_models import ChatTongyi
    chat = ChatTongyi(
        model="qwen-max",  # 指定模型类型为 qwen-max,适合高质量对话
        temperature=0.01,  # 温度更低以获得更确定的回复
        top_p=0.2,         # 控制对话生成的多样性
        max_tokens=1024    # 最大生成的 token 数量
    )

    # 加载 Embedding 大模型(嵌入模型)
    # 使用 DashScopeEmbeddings 类来实例化一个通义千问嵌入模型
    from langchain_community.embeddings import DashScopeEmbeddings
    embed = DashScopeEmbeddings(
        model="text-embedding-v3"  # 指定嵌入模型版本
    )

    # 返回加载的三个模型
    return llm, chat, embed

app/conf/.qwen中添加对应的API KEY

DASHSCOPE_API_KEY = sk-xxxxxx

3. 实现向量库的基础功能

文本使用ChromaDB实现,用于基本的向量库连接、数据入库操作

代码文件及目录:app/rag/chroma_conn.py

import chromadb  # 导入 ChromaDB 的核心库
from chromadb import Settings  # 导入 ChromaDB 的配置类
from langchain_community.vectorstores.chroma import Chroma  # 导入 LangChain 的 Chroma 向量存储模块

class ChromaDB:
    """
    自定义的 ChromaDB 类,用于管理 Chroma 向量数据库。
    支持本地模式和 HTTP 模式连接 ChromaDB。
    """

    def __init__(self,
                 chroma_server_type="local",  # 服务器类型:http 表示通过 HTTP 连接,local 表示本地文件方式
                 host="localhost", port=8000,  # 如果是 HTTP 模式,需要指定服务器地址和端口
                 persist_path="chroma_db",  # 数据库路径:如果是本地模式,需要指定数据库的存储路径
                 collection_name="langchain",  # 数据库中的集合名称
                 embed=None  # 数据库使用的向量化函数
                 ):
        """
        初始化 ChromaDB 对象。

        :param chroma_server_type: 连接类型("local" 或 "http")
        :param host: HTTP 服务器地址(仅在 chroma_server_type="http" 时有效)
        :param port: HTTP 服务器端口(仅在 chroma_server_type="http" 时有效)
        :param persist_path: 数据库存储路径(仅在 chroma_server_type="local" 时有效)
        :param collection_name: 数据库集合名称
        :param embed: 向量化函数
        """
        self.host = host  # HTTP 服务器地址
        self.port = port  # HTTP 服务器端口
        self.path = persist_path  # 数据库存储路径
        self.embed = embed  # 向量化函数
        self.store = None  # 初始化时未创建向量存储对象

        # 如果是 HTTP 协议方式连接数据库
        if chroma_server_type == "http":
            # 创建一个 HTTP 客户端
            client = chromadb.HttpClient(host=host, port=port)
            # 初始化 Chroma 向量存储对象
            self.store = Chroma(collection_name=collection_name,
                                embedding_function=embed,
                                client=client)

        # 如果是本地模式连接数据库
        elif chroma_server_type == "local":
            # 初始化 Chroma 向量存储对象
            self.store = Chroma(collection_name=collection_name,
                                embedding_function=embed,
                                persist_directory=persist_path)

        # 如果初始化失败,抛出异常
        if self.store is None:
            raise ValueError("Chroma store initialization failed!")

    def add_with_langchain(self, docs):
        """
        将文档添加到 ChromaDB 数据库中。

        :param docs: 要添加的文档列表
        """
        # 使用 LangChain 提供的接口将文档添加到数据库
        self.store.add_documents(documents=docs)

    def get_store(self):
        """
        返回 Chroma 向量数据库的对象实例。

        :return: Chroma 向量存储对象
        """
        return self.store

4. 实现入库功能

代码文件及目录:app/rag/pdf_processor.py

import os  # 文件操作相关模块
import logging  # 日志记录模块
import time  # 时间操作模块
from tqdm import tqdm  # 进度条显示模块
from langchain_community.document_loaders import PyMuPDFLoader  # PDF文件加载器
from langchain_text_splitters import RecursiveCharacterTextSplitter  # 文本切分工具
from rag.chroma_conn import ChromaDB  # 自定义的ChromaDB类

class PDFProcessor:
    """
    PDFProcessor 类用于处理PDF文件并将其内容存储到ChromaDB中。
    """

    def __init__(self,
                 directory,  # PDF文件所在目录
                 chroma_server_type,  # ChromaDB服务器类型("local" 或 "http")
                 persist_path,  # ChromaDB持久化路径
                 embed):  # 向量化函数

        self.directory = directory  # PDF文件存放目录
        self.file_group_num = 80  # 每组处理的文件数
        self.batch_num = 6  # 每次插入的批次数量

        self.chunksize = 500  # 切分文本的大小
        self.overlap = 100  # 切分文本的重叠大小

        # 初始化ChromaDB对象
        self.chroma_db = ChromaDB(chroma_server_type=chroma_server_type,
                                  persist_path=persist_path,
                                  embed=embed)

        # 配置日志
        logging.basicConfig(
            level=logging.INFO,  # 设置日志级别为INFO
            format='%(asctime)s - %(levelname)s - %(message)s',  # 日志格式
            datefmt='%Y-%m-%d %H:%M:%S'  # 时间格式
        )

    def load_pdf_files(self):
        """
        加载指定目录下的所有PDF文件。
        """
        pdf_files = []
        for file in os.listdir(self.directory):  # 遍历目录中的所有文件
            if file.lower().endswith('.pdf'):  # 筛选PDF文件
                pdf_files.append(os.path.join(self.directory, file))  # 构建完整路径

        logging.info(f"Found {len(pdf_files)} PDF files.")  # 记录日志
        return pdf_files

    def load_pdf_content(self, pdf_path):
        """
        使用PyMuPDFLoader读取PDF文件的内容。
        """
        pdf_loader = PyMuPDFLoader(file_path=pdf_path)  # 创建PDF加载器
        docs = pdf_loader.load()  # 加载PDF内容
        logging.info(f"Loading content from {pdf_path}.")  # 记录日志
        return docs

    def split_text(self, documents):
        """
        使用RecursiveCharacterTextSplitter将文档切分为小段。
        """
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunksize,  # 每段文本的最大长度
            chunk_overlap=self.overlap,  # 段与段之间的重叠长度
            length_function=len,  # 使用字符串长度作为分割依据
            add_start_index=True,  # 添加起始索引信息
        )
        docs = text_splitter.split_documents(documents)  # 切分文档
        logging.info("Split text into smaller chunks with RecursiveCharacterTextSplitter.")  # 记录日志
        return docs

    def insert_docs_chromadb(self, docs, batch_size=6):
        """
        将文档分批插入到ChromaDB中。
        """
        logging.info(f"Inserting {len(docs)} documents into ChromaDB.")  # 记录日志

        start_time = time.time()  # 记录开始时间
        total_docs_inserted = 0  # 已插入的文档总数

        # 计算总批次
        total_batches = (len(docs) + batch_size - 1) // batch_size

        # 使用tqdm显示进度条
        with tqdm(total=total_batches, desc="Inserting batches", unit="batch") as pbar:
            for i in range(0, len(docs), batch_size):
                batch = docs[i:i + batch_size]  # 获取当前批次的文档
                self.chroma_db.add_with_langchain(batch)  # 插入到ChromaDB
                total_docs_inserted += len(batch)  # 更新已插入的文档数量

                # 计算TPM(每分钟插入的文档数)
                elapsed_time = time.time() - start_time
                if elapsed_time > 0:  # 防止除以零
                    tpm = (total_docs_inserted / elapsed_time) * 60
                    pbar.set_postfix({"TPM": f"{tpm:.2f}"})  # 更新进度条的后缀信息

                pbar.update(1)  # 更新进度条

    def process_pdfs_group(self, pdf_files_group):
        """
        处理一组PDF文件,包括加载内容、切分文本和插入数据库。
        """
        pdf_contents = []  # 存储所有PDF文件的内容

        for pdf_path in pdf_files_group:
            documents = self.load_pdf_content(pdf_path)  # 加载PDF内容
            pdf_contents.extend(documents)  # 将内容添加到列表中

        docs = self.split_text(pdf_contents)  # 切分文本
        self.insert_docs_chromadb(docs, self.batch_num)  # 插入到ChromaDB

    def process_pdfs(self):
        """
        批量处理目录下的所有PDF文件。
        """
        pdf_files = self.load_pdf_files()  # 加载所有PDF文件

        group_num = self.file_group_num  # 每组处理的文件数

        # 按组处理PDF文件
        for i in range(0, len(pdf_files), group_num):
            pdf_files_group = pdf_files[i:i + group_num]
            self.process_pdfs_group(pdf_files_group)

        print("PDFs processed successfully!")  # 提示处理完成

5. 测试导入功能

测试从 PDF 文件加载内容并将其导入到 ChromaDB 向量数据库的过程

代码文件及目录:app/test_framework.py

# 测试将 PDF 文件内容导入到向量数据库的主流程

def test_import():
    """
    测试从 PDF 文件加载内容并将其导入到 ChromaDB 向量数据库的过程
    """
    # 导入 PDFProcessor 类,用于处理 PDF 文件并将其内容存储到向量数据库中
    from rag.pdf_processor import PDFProcessor

    # 导入工具函数 get_qwen_models,用于获取通义千问的模型实例
    from utils.util import get_qwen_models

    # 获取通义千问的 LLM、Chat 和 Embedding 模型
    # 这里只使用 Embedding 模型来生成文本的向量表示
    llm, chat, embed = get_qwen_models()

    # 如果需要使用其他嵌入模型(如 Hugging Face 的 Embeddings),可以取消注释以下行
    # embed = get_huggingface_embeddings()

    # 定义 PDF 文件所在的目录路径
    directory = "./app/dataset/pdf"  # 存放 PDF 文件的目录

    # 定义 ChromaDB 数据库的持久化路径
    persist_path = "chroma_db"  # ChromaDB 数据库文件的存储路径

    # 定义 ChromaDB 的服务器类型("local" 表示本地模式)
    server_type = "local"  # 使用本地模式连接 ChromaDB

    # 创建 PDFProcessor 实例,用于处理 PDF 文件并将其内容存储到 ChromaDB 中
    pdf_processor = PDFProcessor(
        directory=directory,  # PDF 文件目录
        chroma_server_type=server_type,  # ChromaDB 服务器类型
        persist_path=persist_path,  # ChromaDB 数据库的持久化路径
        embed=embed  # 嵌入模型,用于生成文本向量
    )

    # 调用 process_pdfs 方法,开始处理 PDF 文件并将内容插入到 ChromaDB
    pdf_processor.process_pdfs()

# 如果此脚本作为主程序运行,则执行测试导入流程
if __name__ == "__main__":
    test_import()

(1)通过命令行启动ChromaDB服务端

chroma run --path chroma_db --port 8000 --host localhost

(2)运行 test_framework.py

 

6. 实现检索功能

代码文件及目录:app/rag/rag.py

通过这段代码,可以实现一个基本的 RAG 系统,从向量数据库中检索知识并生成答案

import logging  # 日志记录模块
from langchain_core.prompts import ChatPromptTemplate  # 用于构建对话提示模板
from langchain_core.runnables import RunnablePassthrough  # 用于传递输入数据的工具
from langchain_core.runnables.base import RunnableLambda  # 用于包装自定义函数为可运行对象
from langchain_core.output_parsers import StrOutputParser  # 用于解析输出为字符串
from .chroma_conn import ChromaDB  # 自定义的 ChromaDB 类

# 配置日志记录
logging.basicConfig(
    level=logging.INFO,  # 设置日志级别为 INFO
    format='%(asctime)s - %(levelname)s - %(message)s'  # 日志格式
)

class RagManager:
    """
    RAG(检索增强生成)管理器类,用于管理和执行基于向量数据库的知识检索与生成任务。
    """

    def __init__(self,
                 chroma_server_type="http",  # ChromaDB 服务器类型("http" 或 "local")
                 host="localhost", port=8000,  # ChromaDB HTTP 服务器地址和端口
                 persist_path="chroma_db",  # ChromaDB 数据库持久化路径
                 llm=None, embed=None):  # LLM 模型和嵌入模型

        self.llm = llm  # 大语言模型(LLM)
        self.embed = embed  # 嵌入模型

        # 初始化 ChromaDB 并获取存储对象
        chrom_db = ChromaDB(chroma_server_type=chroma_server_type,
                            host=host, port=port,
                            persist_path=persist_path,
                            embed=embed)
        self.store = chrom_db.get_store()  # 获取 ChromaDB 的存储实例

    def get_chain(self, retriever):
        """
        构建并返回 RAG 查询链。

        :param retriever: 向量数据库的检索器
        :return: RAG 查询链
        """
        # 定义 RAG 系统的经典 Prompt 模板
        prompt = ChatPromptTemplate.from_messages([
            ("human", """You are an assistant for question-answering tasks. Use the following pieces 
          of retrieved context to answer the question. If you don't know the answer, just say that you don't know. 
          Use three sentences maximum and keep the answer concise.
          Question: {question} 
          Context: {context} 
          Answer:""")
        ])

        # 将 `format_docs` 方法包装为 Runnable 对象
        format_docs_runnable = RunnableLambda(self.format_docs)

        # 构建 RAG 查询链
        rag_chain = (
                {"context": retriever | format_docs_runnable,  # 使用检索器获取上下文
                 "question": RunnablePassthrough()}  # 直接传递问题
                | prompt  # 应用 Prompt 模板
                | self.llm  # 使用 LLM 模型生成答案
                | StrOutputParser()  # 解析输出为字符串
        )

        return rag_chain

    def format_docs(self, docs):
        """
        格式化检索到的文档内容。

        :param docs: 检索到的文档列表
        :return: 格式化后的文档内容
        """
        # 记录检索到的文档数量
        logging.info(f"检索到资料文件个数:{len(docs)}")

        # 提取文档来源信息
        retrieved_files = "\n".join([doc.metadata["source"] for doc in docs])
        logging.info(f"资料文件分别是:\n{retrieved_files}")

        # 提取文档内容
        retrieved_content = "\n\n".join(doc.page_content for doc in docs)
        logging.info(f"检索到的资料为:\n{retrieved_content}")

        return retrieved_content  # 返回格式化后的文档内容

    def get_retriever(self, k=4, mutuality=0.3):
        """
        获取向量数据库的检索器。

        :param k: 检索返回的文档数量
        :param mutuality: 相似度阈值(分数阈值)
        :return: 检索器对象
        """
        retriever = self.store.as_retriever(
            search_type="similarity_score_threshold",  # 使用相似度分数阈值检索
            search_kwargs={"k": k, "score_threshold": mutuality}  # 检索参数
        )
        return retriever

    def get_result(self, question, k=4, mutuality=0.3):
        """
        执行 RAG 查询并返回结果。

        :param question: 用户提出的问题
        :param k: 检索返回的文档数量
        :param mutuality: 相似度阈值(分数阈值)
        :return: 查询结果(生成的答案)
        """
        # 获取检索器
        retriever = self.get_retriever(k, mutuality)

        # 获取 RAG 查询链
        rag_chain = self.get_chain(retriever)

        # 执行查询并返回结果
        return rag_chain.invoke(input=question)

7. 测试检索效果

在 app/framework.py 中添加 RAG 的测试调用函数

def test_rag():
    """
    测试 RAG(检索增强生成)系统的主流程。
    """
    # 导入 RagManager 类,用于管理和执行 RAG 查询任务
    from rag.rag import RagManager

    # 导入工具函数 get_qwen_models,用于获取通义千问的模型实例
    from utils.util import get_qwen_models

    # 获取通义千问的 LLM、Chat 和 Embedding 模型
    llm, chat, embed = get_qwen_models()

    # 创建 RagManager 实例,用于管理 RAG 系统
    # 参数说明:
    # - host: ChromaDB HTTP 服务器地址
    # - port: ChromaDB HTTP 服务器端口
    # - llm: 大语言模型(LLM)
    # - embed: 嵌入模型(Embedding Model)
    rag = RagManager(host="localhost", port=8000, llm=llm, embed=embed)

    # 执行 RAG 查询,传入用户提出的问题
    question = "景顺长城中短债债券C基金在20210331的季报里,前三大持仓占比的债券名称是什么?"
    result = rag.get_result(question)

    # 输出查询结果
    print(result)


if __name__ == "__main__":
    # 执行 RAG 测试函数
    test_rag()

    # 批量导入 PDF 测试函数(可选)
    # 如果需要测试 PDF 文件批量导入功能,请取消以下行的注释
    # test_import()

总结:

Step 1:创建一个 ChromaDB 类,封装基础的Chroma连接、插入和检索;

Step2:实现 PDFProcessor 类,该类调用 ChromaDB 类的插入函数,将批量读取的PDF文件进行切分后保存至向量库;

Step3:实现 RAGManager 类,该类封装了RAG的检索链,并定义检索参数;

Step4:实现一个测试函数,用于测试RAG的检索功能。


http://www.kler.cn/a/568403.html

相关文章:

  • 物联网同RFID功能形态 使用场景的替代品
  • Android 图片压缩详解
  • python中单例模式应用
  • 【计算机网络入门】初学计算机网络(九)
  • 2025年2月最新一区SCI-海市蜃楼搜索优化算法Mirage search optimization-附Matlab免费代码
  • 蓝桥杯 灯笼大乱斗【算法赛】
  • android智能指针android::sp使用介绍
  • Pytorch为什么 nn.CrossEntropyLoss = LogSoftmax + nn.NLLLoss?
  • 使用Docker快速搭建Redis主从复制
  • 【硬件工程师成长】之是否需要组合电容进行滤波的考虑
  • PDF工具 Candy Desktop(安卓)
  • openharmony-音频
  • HideUL:守护电脑隐私,轻松隐藏软件的隐藏工具
  • 京东一面:为什么 IDEA 建议去掉 StringBuilder,而要使用 “+” 拼接字符串?
  • 人工智能核心技术初识:Prompt、RAG、Agent与大模型微调
  • ECharts组件封装教程:Vue3中的实践与探索
  • Vue核心知识:KeepLive全方位分析
  • 从业务到技术:构建高效的 ERP 系统开发组织之道
  • deepseek使用记录18——文化基因之文化融合
  • 【CCF GESP 3 级】小猫分鱼 洛谷 B3925