当前位置: 首页 > article >正文

深入研究 RAG 流程中的关键组件

我们已经看到了整个RAG流程,并获得了第一手的实践经验,您可能会对RAG流程中一些组件的使用和目的存在很多疑惑,比如RunnablePassthrough。在本节中,我们将进一步了解这些关键组件。

RAG的核心模型思想是将一个复杂的任务分解为多个小而简单的任务组合,分而治之的策略在处理大型和复杂问题时非常有用。让我们先来看一个非常简单的RAG流水线:


from langchain_core.runnables import Runnable, RunnableSequence

# 定义第一个任务:输出 "hello"
class TaskA(Runnable):
    def invoke(self, input, context=None):
        return input + " a"

# 定义第二个任务:输出 "beautiful"
class TaskBeautiful(Runnable):
    def invoke(self, input, context=None):
        return input + " beautiful"

class TaskDay(Runnable):
  def invoke(self, input, context = None):
    return input + " day!"
# 创建任务实例
task_a = TaskA()
task_beautiful = TaskBeautiful()
task_day = TaskDay()

# 使用"|"操作符重载Runnable
rag_chain = task_a | task_beautiful | task_day
output = rag_chain.invoke("what")
print(output)

上面代码的输出是"what a beautiful day"。可以看到Runnable类重载了"|"操作符,我们可以将Runnable的实例组合在一起,并且最后一个Runnable实例的输出成为下一个Runnable实例的输入。同时,我们需要确保每个Runnable实例提供invoke接口,以便数据在连接的Runnable实例之间传递。

回到RAG流程中使用的组件,RunnablePassThrough实际上类似于以下代码:


class DoNothing(Runnable):
    def invoke(self, input, context=None):
        return input

让我们看一个例子:


passthrough = RunnablePassthrough()
input_data = {"msg": "this is a test"}
output_data = passthrough.invoke(input_data)
print(output_data)

运行上面的代码将输出: {‘msg’: ‘this is a test’}

RunnablePassthrough通常在管道中用作占位符或连接器,以连接两个功能管道,但它实际上可以对其输入数据进行一些简单的更改:


# 原始数据
input_data = {"message": "Hello, World!", "status": "initial"}

passthrough = RunnablePassthrough.assign(result = lambda x: "processed")

# 使用输入数据调用
output_data = passthrough.invoke(input_data)

print(output_data)

这里我们需要确保输入数据是字典形式,且assign的参数为lambda表达式,运行上述代码得到以下结果:


{'message': 'Hello, World!', 'status': 'initial', 'result': 'processed'}

Runnable对象可以像下面这样将几个函数链接在一起:


def task1(input1):
  return f"task1:{input1}"

def task2(input1):
  return f"task2:{input1}"

def context(input):
  return f"context: {input}"

rag_chain = RunnablePassthrough().assign(context=context) | task1 | task2
res = rag_chain.invoke({"invoke": "input for invoke"})
print(res)

invoke调用将数据发送到RunnablePassthrough对象,数据未更改地输出,然后数据将发送到task1,task1的输出将作为task2的输入。运行上述代码的结果为:


task2:task1:{'invoke': 'input for invoke', 'context': "context: {'invoke': 'input for invoke'}"}

另一个需要了解的组件是RunnableMap,它允许我们将多个Runnable对象组合在一起,可以并行或顺序运行。RunnableMap对象就像一个字典,字典中的键就像分支或步骤,可以在相同输入上独立运行。当我们需要多个步骤或函数独立处理相同输入时,这非常有用。让我们来看一个示例:


from langchain_core.runnables import Runnable, RunnableMap

# 定义第一个任务:输出 "task1"
class Task1(Runnable):
    def invoke(self, input, context=None):
        return f"task1 for {input}"
      
class Task2(Runnable):
    def invoke(self, input, context=None):
        return f"task2 for {input}"
pipeline = RunnableMap({
    "task1": Task1(),
    "task2": Task2(),
})

input_data = "input data"
output_data = pipeline.invoke(input_data)
print(output_data)

传递给RunnableMap的映射中每个键的值都应实现invoke接口,pipeline.invoke调用将并行触发映射中每个值的invoke调用。上述代码的输出为:


{'task1': 'task1 for input data', 'task2': 'task2 for input data'}

最后一个我们需要了解的组件是RunnableParallel,它类似于RunnableMap,它们之间可能有一些细微差别,但在此不重要。我们可以将RunnableParallel视为RunnableMap,值得注意的一点是RunnableParallel的assign。让我们来看代码示例:


from langchain_core.runnables import Runnable, RunnableParallel, RunnableMap
import time
# 定义第一个任务:输出 "a"
class TaskA(Runnable):
    def invoke(self, input, context=None):
        time.sleep(1)
        return input + " a"

# 定义第二个任务:输出 "beautiful"
class TaskBeautiful(Runnable):
    def invoke(self, input, context=None):
        time.sleep(2)
        return input + " beautiful"

class TaskDay(Runnable):
  def invoke(self, input, context = None):
    time.sleep(3)
    return input + " day"
# 创建任务实例

class TaskAssign(Runnable):
  def invoke(self, input, context = None):
    time.sleep(3)
    print(f"input: {input}")
    return {**input}

start = time.time()
parallel_tasks = RunnableParallel({
    "taskA": TaskA(),
    "taskBeautiful": TaskBeautiful(),
    "TaskDay": TaskDay(),
}).assign(taskAssign=TaskAssign())
output = parallel_tasks.invoke("what")
print(f"output for rag run parallelly: {output}")
end = time.time()
print(f"time for rag run parallel: {end - start}" )

上面代码的输出为:


input: {'taskA': 'what a', 'taskBeautiful': 'what beautiful', 'TaskDay': 'what day'}
output for rag run parallelly: {'taskA': 'what a', 'taskBeautiful': 'what beautiful', 'TaskDay': 'what day', 'taskAssign': {'taskA': 'what a', 'taskBeautiful': 'what beautiful', 'TaskDay': 'what day'}}
time for rag run parallel: 6.011621713638306

从输出可以看出,在RunnableParallel中使用assign时,它会首先执行传递给RunnableParallel的映射中的任务,然后使用结果字典作为TaskAssign的参数,最终输出为RunnableParallel的字典和一个键为"TaskAssign"的条目,值为TaskAssign的返回结果。

最后,让我们重新构建上一节的RAG流水线,如下所示:


def task_from_docs(input):
  print(f"task from docs: {input}")
  print(f"input['context']: {input['context']}")
  return lambda x: format_docs(input["context"])

rag_chain_from_docs = (
    RunnablePassthrough.assign(context=(
        task_from_docs
    ))
)

rag_chain_with_source = RunnableParallel({
    "context": retriver,
    "question": RunnablePassthrough(),
}).assign(answer=rag_chain_from_docs)
'''
rag_chain_with_source的invoke将并行触发retriver、RunnablePassthrough的invoke,
然后将得到的字典结果发送到rag_chain_from_docs流水线,再将该字典发送到task_from_docs函数,并将
输出作为字典中的"context"键的值
'''
res = rag_chain_with_source.invoke("How does RAG compare with fine-tuning")
print(f"context of res {res['context']}")
print(f"question of res {res['question']}")

#res将用作调用rag_chain_from_docs的参数,并将其传递给task_from_docs调用,
#task_from_docs将提取字典中context的值并传递给format_docs

rag_chain = rag_chain_with_source | prompt | llm | StrOutputParser()
response = rag_chain.invoke("How does RAG compare with fine-tuning")
print(f"final rag response: {response}")

上面代码运行后所得结果如下:

Retrieval-Augmented Generation (RAG) and fine-tuning are two different approaches to enhancing the capabilities of Large Language Models (LLMs). 

RAG combines the strengths of LLMs with internal data, allowing organizations to access and utilize their own data effectively. It enhances the accuracy and relevance of responses by fetching specific information from databases in real time, thus expanding the model's knowledge beyond its initial training data. RAG is particularly useful for organizations that need to leverage their proprietary data or the latest information that was not included in the model's training set.

On the other hand, fine-tuning involves adjusting the weights and biases of a pre-trained model based on new training data. This process permanently alters the model's behavior and is suitable for teaching the model specialized tasks or adapting it to specific domains. However, fine-tuning can be complex and costly, especially as data sources evolve, and it may lead to issues like overfitting.

In summary, RAG is more about integrating real-time data retrieval to enhance model responses, while fine-tuning is about permanently modifying the model based on new data. Each approach has its advantages and limitations, and the choice between them depends on the specific needs and resources of the organization.
``

基于上述知识,我们可以轻松理解代码。rag_chain_with_source的invoke将生成一个带有context和question键的字典,context的值为retriver.invoke("How does RAG compare with fine-tuning")的结果,question的值为RunnablePassthrough().invoke("How does RAG compare with fine-tuning")的结果,即原始字符串本身。

然后带有context和question的字典作为task_from_docs的输入。在task_from_docs中,取出context键的值传递给format_docs,task_from_docs的输出将发送给prompt函数。prompt的输出将生成一个带有问题和上下文的提示,提示发送到ChatGPT以获得最终返回结果。

http://www.kler.cn/a/380461.html

相关文章:

  • 漫途焊机安全生产监管方案,提升安全生产管理水平!
  • MySQL的约束和三大范式
  • openstack之guardian介绍与实例创建过程
  • IPC机制总结笔记
  • 一个由Deno和React驱动的静态网站生成器
  • Java复习35(PTA)
  • Kubernetes架构及核心组件
  • Soul App创始人张璐团队自研多模态大模型,亮相GITEX GLOBAL获好评
  • 【Arch Linux 上安装 Appium 】
  • 进程介绍!
  • AcWing 1073 树的中心 树形dp (详解)
  • 图书管理系统(JDBC)
  • Elasticsearch Date类型,时间存储相关说明
  • 创新材料科技:铜冷却壁助力高炉节能降耗
  • 【React】初学React
  • 新能源汽车火灾应急处置程序
  • w~大模型~合集20
  • 在 Oracle 数据库中,SERVICE_NAME 和 SERVICE_NAMES 有什么区别?
  • 云原生后端:现代应用程序开发的关键技术
  • 【vue项目中添加告警音频提示音】
  • 如何编写STM32的RTC程序
  • 自动化立体仓库:详细设计方案
  • 深度学习之数据增强
  • DDOS的攻击方式有哪些?
  • 音视频入门基础:H.264专题(22)——通过FFprobe显示H.264裸流每个packet的信息
  • 内网远程连接解决方案【Frp】