llamaindex实现企业级RAG应用(一)
在上一篇文章中使用Qwen2进行RAG代码实践,手动实现了一版简易的RAG应用,在实际工作中通常都用会使用langchain或llamaindex架构来搭建rag应用,并且会非常复杂。
RAG是个很神奇的应用,可以很简单,也可以很复杂。在llamaindex官网给的案例,5行代码就可以构建RAG应用,但要真正实现企业级RAG应用,则需要花费大量时间去调优。本文通过一个复杂的项目案例,记录下工作中常用到的优化方法。
项目代码:
https://github.com/yblir/LegalKnowledgeRAG
一 项目说明
RAG 是一种基于大模型的知识密集型应用,以数据查询与对话任务为主要形式。对于复杂需求的场景,比如几十个不同类型法律知识文档, 如果使用经典的 RAG 应用,通过知识块+向量+top_K 检索来获得上下文,让大模型给出答案,那么显然是不现实的,因为不同文档间内容属性不同,粗暴地与在不同的文档间匹配向量相似度,很容易检索出不相关的答案。
经典的 RAG 应用在回答文档相关的事实性问题上,在大部分时间可以工作得不错,但是知识应用并不总是这种类型的,比如无法基于向量检索简单地生成文档的摘要与总结,也无法胜任一些跨文档回答问题或者需要结合其他工具复合应用的工作。
所以,本项目使用Data Agent作为主架构来实现。 Data Agent在 RAG 的基础上引入自我规划与使用工具的能力,从而具备了完成大模型驱动的、更丰富的数据读写任务的能力,提升RAG问答能力。因此,本文结合Agent, 实现一版RAG法律知识问答的项目。当然也可以使用路由查询引擎来代替Agent 实现接近的功能。不过路由查询引擎与Agent 是有区别的,路由查询引擎在大部分时候仅起到选择工具与转发问题的作用,并不会多次迭代,而Agent 则会观察工具返回的结果,有可能使用多个工具通过多次迭代来完成任务。
项目整体流程如下:
需要的模块主要包括以下 3 个。
(1)后端 Agent 模块:这是系统的核心模块,用于给已有的多文档知识构
造索引与查询引擎,并以查询引擎作为工具创建上层的 Agent。
(2)后端 API 模块:这是提供给前端 UI 应用直接访问的 API。
(3)前端 UI 应用:这是一个简单的支持连续对话的前端 ChatBot,能够与后端 API 模块实现交互。
本文主要关注后端(Agent 模块与 API 模块),在整体架构中,我们自底向上推进。
二 自定义查询引擎
查询引擎可以通过vector_index.as_query_engine()的方式一步构建完成,但若要实现更复杂的RAG流程,则需要我们精准控制query_engine的内部细节,这里我们手动构建一个。
2.1 自定义响应器
通俗地说,查询引擎=检索器+响应器,检索器可灵活操作的代码不多,这里仅构建响应器
# 自定义响应器
class CustomSynthesizer(BaseSynthesizer):
my_prompt = (
"根据以下上下文信息:\n"
"---------------------\n"
"{context_str}\n"
"---------------------\n"
"使用中文回答以下问题\n "
"问题: {query_str}\n"
"答案: "
)
def __init__(
self,
llm: Optional[LLMPredictorType] = None,
) -> None:
super().__init__(llm=llm)
self._input_prompt = PromptTemplate(CustomSynthesizer.my_prompt)
# 必须实现的接口
def _get_prompts(self) -> PromptDictType:
return self._input_prompt.text
# 必须实现的接口, 更新提示词
def _update_prompts(self, prompts: PromptDictType) -> None:
self._input_prompt = PromptTemplate(prompts.text)
# 生成响应的接口
def get_response(
self,
query_str: str,
text_chunks: Sequence[str],
**response_kwargs: Any,
) -> RESPONSE_TEXT_TYPE:
context_str = "\n\n".join(n for n in text_chunks)
# 此处可以自定义任何响应逻辑
response = self._llm.predict(
self._input_prompt,
query_str=query_str,
context_str=context_str,
**response_kwargs,
)
return response
# 响应接口的异步版本
async def aget_response(
self,
query_str: str,
text_chunks: Sequence[str],
**response_kwargs: Any,
) -> RESPONSE_TEXT_TYPE:
context_str = "\n\n".join(n for n in text_chunks)
response = await self._llm.apredict(
self._input_prompt,
query_str=query_str,
context_str=context_str,
**response_kwargs,
)
return response
2.2 自定义大模型
RAG的核心是对大模型能力的调用,在llamaindex中可以通过内置的vllm和ollama等部署工具直接使用,当然也可以自定义一份,这样的好处是可以使用本地微调且还没被llamaindex支持的大模型,为项目进行定制化开发。这里使用vllm部署模型。
另外还有嵌入模型可以自定义,这里不演示。
class CustomVllmLLM(CustomLLM):
"""自定义大模型, 模型是huggingface格式"""
# self.vllm_model字段必须先在此声明才能用
vllm_model: vllm.Vllm = Field(default=None, description="VLLM 模型实例")
def __init__(self, model_path: str):
super().__init__()
if not os.path.exists(model_path):
raise ValueError("模型路径不存在")
self.vllm_model = vllm.Vllm(model_path)
# 实现metadata 接口
@property
def metadata(self) -> LLMMetadata:
return LLMMetadata(
model_name='vllm_model'
)
# 实现complete 接口
@llm_completion_callback()
def complete(
self, prompt: str, **kwargs: Any
) -> CompletionResponse:
response = self.vllm_model.complete(prompt, **kwargs)
return CompletionResponse(
text=response.text
)
# 实现stream_complete 接口
@llm_completion_callback()
def stream_complete(
self, prompt: str, **kwargs: Any
) -> CompletionResponseGen:
response = ""
model_response = self.vllm_model.complete(prompt, **kwargs)
for token in model_response.text:
response += token
yield CompletionResponse(text=response, delta=token)
2.3 自定义查询引擎
totooooooo,此处插图。
自定义好响应器和大模型后,就可以合成查询引擎了。查询引擎有两种,分别是单次查询引擎和多轮对话引擎,实际项目中常用的是对话引擎。
from custom_components import CustomVllmLLM, CustomSynthesizer
# 单次查询引擎
# 对chat_engine =vector_index.as_query_engine()的自定义操作
class OnceQueryEngine(CustomQueryEngine):
# 此处直接使用大模型组件,而不是响应生成器
# llm: Ollama = Field(default=None, description="llm")
llm: vllm.Vllm = Field(default=None, description="llm")
retriever: BaseRetriever = Field(default=None, description="retriever")
qa_prompt: PromptTemplate = Field(default=None, description="提示词")
synthesizer: CustomSynthesizer = Field(default=None, description="自定义响应器")
qa_prompt = PromptTemplate(
"根据以下上下文回答输入问题:\n"
"---------------------\n"
"{context_str}\n"
"---------------------\n"
"回答以下问题,不要编造\n"
"我的问题: {query_str}\n"
"答案: "
)
def __init__(self, retriever: BaseRetriever, llm: CustomVllmLLM):
super().__init__()
self.retriever = retriever
# self.llm = llm
self.synthesizer = CustomSynthesizer(llm=llm)
def custom_query(self, query_str: str):
nodes = self.retriever.retrieve(query_str)
# 用检索出的Node 构造上下文
context_str = "\n\n".join([n.node.get_content() for n in nodes])
# 用上下文与查询问题组装Prompt,然后调用大模型组件响应生成
# response = self.llm.complete(
# OnceQueryEngine.qa_prompt.format(
# context_str=context_str, query_str=query_str
# )
# )
# 使用自定义响应器完成响应生成
response = self.synthesizer.get_response(
query_str=query_str,
text_chunks=context_str
)
return str(response)
# 对话查询引擎
# 对chat_engine = vector_index.as_chat_engine(chat_mode="condense_question")自定义操作
class ChatQueryEngine:
custom_prompt = PromptTemplate(
"""
请根据以下的历史对话记录和新的输入问题,重写一个新的问题,使其能够捕捉对话中的
所有相关上下文。
<Chat History>
{chat_history}
<Follow Up Message>
{question}
<Standalone question>
"""
)
# 历史对话记录
custom_chat_history = [
ChatMessage(
role=MessageRole.USER,
content="我们来讨论一些有关法律知识的问题",
),
ChatMessage(role=MessageRole.ASSISTANT, content="好的"),
]
def __init__(self, retriever: BaseRetriever, llm: vllm.Vllm):
super().__init__()
self.once_query_engine = OnceQueryEngine(retriever, llm)
# 这种对话模式在理解历史对话记录的基础上将当前输入的问题重写成一
# 个独立的、具备完整语义的问题,然后通过查询引擎获得答案
self.custom_chat_engine = CondenseQuestionChatEngine.from_defaults(
query_engine=self.once_query_engine,
# 对话引擎基于查询引擎构造
condense_question_prompt=ChatQueryEngine.custom_prompt, # 设置重写问题的 Prompt 模板
chat_history=ChatQueryEngine.custom_chat_history,
# 携带历史对话记录
verbose=True,
)
self.custom_chat_engine.chat_repl()
if __name__ == '__main__':
vllm_model = vllm.Vllm('/media/xk/D6B8A862B8A8433B/data/qwen2_05b')
# vllm_model = CustomVllmLLM('/media/xk/D6B8A862B8A8433B/data/qwen2_05b')
s = CustomSynthesizer(vllm_model)
res = s.get_prompts()
print(res)
s.update_prompts(PromptTemplate('fdsffds').text)
print(s.get_prompts())
chat_engine = ChatQueryEngine('a', 'b').custom_chat_engine
chat_engine.chat_repl()
三 数据加载与构建索引
我们的目标是从多个法律文档中检索有用的知识,与问题合并后一起送入大模型获得答案。
为实现这一目的,我们有以下问题需要解决:
- 如何读取多个不同类型的文档
- 不同文档间知识差异很大,如交通法与刑法,如何根据问题正确检索出与问题最相关的上下文?
下图是从原始文档到向量索引的构建过程,从2.3可以知道向量索引与检索器和查询引擎的关系,下图也有简单体现。向量索引有多种类型,最常见的是向量存储索引,下图展示就是向量存储索引。除此还有很多其他索引,比如文档摘要索引,对象索引,知识图谱索引 . . .,存在即合理,每种索引都有独特用途,也有很多复杂参数项,这也是llamaindex架构的优势,对数据操作极为精细。
这里从路径加载数据,存在在chromadb向量库中,并构建了3种类型的向量索引。
# Settings.llm=
# 在vector_store_index = VectorStoreIndex(node, storage_context=storage_context)时会隐式调用
Settings.embed_model = OllamaEmbedding(model_name="milkey/dmeta-embedding-zh:f16")
# 创建持久化的Chroma客户端
chroma = chromadb.PersistentClient(path="./chroma_db")
chroma.heartbeat()
collection = chroma.get_or_create_collection(name="legal_knowledge_rag")
# vector_store = ChromaVectorStore(chroma_collection=collection)
# 创建向量存储
vector_store = ChromaVectorStore(chroma_collection=collection)
# 创建存储上下文, 准备向量存储索引
# storage_context = StorageContext.from_defaults(vector_store=vector_store)
# 所有文件都从这里读取
class VectorIndex:
def __init__(self):
# if not os.path.exists(file_paths):
# raise ValueError('文件路径不存在')
# self.nodes = self.read_data(file_paths)
pass
@staticmethod
def read_data(file_path: str):
# nodes = {}
if not os.path.isfile(file_path):
raise ValueError(f'{file_path} is not file')
# 获得不带后缀的文件名
# file_name = file_path.split(os.sep)[-1].split('.')[0]
document = SimpleDirectoryReader(input_files=[file_path]).load_data()
# 创建句子分割器, 对文档进行分割
spliter = SentenceSplitter(chunk_size=200, chunk_overlap=10)
# 从句子分割器获得节点数据
node = spliter.get_nodes_from_documents(document)
# node_embedding = embed_model(node)
# vector_store.add(node_embedding)
# return {file_name: node}
return node
def create_vector_index(self, file_path: str):
# 获得不带后缀的文件名
file_name = file_path.split(os.sep)[-1].split('.')[0]
node = self.read_data(file_path)
# 将切分好的数据保存在向量库中,使用时直接从库中取
if not os.path.exists(f"../chroma_db/vector_store_index/{file_name}"):
logger.info(f'create vector index: {file_name}')
storage_context = StorageContext.from_defaults(vector_store=vector_store)
# 向量存储索引, 只支持一种检索模式,就是根据向量的语义相似度来进行检索,
# 对应的检索器类型为VectorIndexRetriever
vector_store_index = VectorStoreIndex(node, storage_context=storage_context)
vector_store_index.storage_context.persist(persist_dir=f"../chroma_db/vector_store_index/{file_name}")
else:
logger.info(f'load vector index: {file_name}')
storage_context = StorageContext.from_defaults(
persist_dir=f"../chroma_db/vector_store_index/{file_name}",
vector_store=vector_store
)
vector_store_index = load_index_from_storage(storage_context=storage_context)
return vector_store_index
def create_keyword_index(self, file_path: str):
# 获得不带后缀的文件名
file_name = file_path.split(os.sep)[-1].split('.')[0]
node = self.read_data(file_path)
if not os.path.exists(f"../chroma_db/keyword_index/{file_name}"):
logger.info(f'create keyword index: {file_name}')
# 构造关键词表索引
kw_index = KeywordTableIndex(node)
kw_index.storage_context.persist(persist_dir=f"../chroma_db/keyword_index/{file_name}")
else:
logger.info(f'load keyword index: {file_name}')
storage_context = StorageContext.from_defaults(persist_dir=f"../chroma_db/keyword_index/{file_name}")
# 返回关键词检索器
kw_index = load_index_from_storage(storage_context=storage_context)
return kw_index
def create_summary_index(self, file_path: str, llm=None):
# 获得不带后缀的文件名
# file_name = file_path.split(os.sep)[-1].split('.')[0]
node = self.read_data(file_path)
# 文档摘要索引与向量存储索引的最大区别是,其不提供直接对基础Node
# 进行语义检索的能力,而是提供在文档摘要层进行检索的能力,然后映射到基础Node。
if llm is None:
summary=DocumentSummaryIndex(node)
else:
summary=DocumentSummaryIndex(node,llm=llm)
return summary