当前位置: 首页 > article >正文

LlamaIndex 工作流

LlamaIndex 内部提供了一个简单的工作流引擎,为什么要有工作流引擎?做过 OA 的同学都了解工作流引擎,工作流的优势在于模块化开发,把业务节点进行抽象,流程于业务逻辑分离,方便进行业务节点组装,也是很多低代码平台的底层工作原理。大语言模型的应用特别适合工作流, 模型可以理解一个万能的 API,传统的 API 都有固定的入参、出参、功能,而模型会根据提示词做推理,具体做什么,返回什么,需要用户来自定义。例如,可以想象一个典型的场景,检测系统日志,如果发现异常发送邮件到指定的邮件组。本文将介绍如何在 LlamaIndex 创建工作流:

创建一个简单的工作流

首先安装工作流依赖

pip install llama-index-utils-workflow

LlamaIndex 是一个基于事件的工作流引擎,工作流通过事件来驱动,工作流节点在 LlamaIndex 中是 Step,节点对应类中的一个方法,方法上加上@step 注解,node 的输入和输出都是 event。工作流有两个特别 Event,StartEvent 和 StopEvent,StartEvent 是开始节点,workflow.run 启动 workflow 之后进入的第一个节点就是 StartEvent,workflow.run 可以传入初始化参数。

from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event
)
import asyncio
from llama_index.utils.workflow import draw_all_possible_flows

class F1Event(Event):
    first_output: str

class MyWorkflow(Workflow):
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        # do something here
        return StopEvent(result=ev.topic)
    
    
draw_all_possible_flows(MyWorkflow, filename="basic_workflow.html")

async def main():
    w = MyWorkflow(timeout=10, verbose=False)
    result = await w.run(topic="Hello")
    print(result)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

draw_all_possible_flows 可以将 Workflow 可视化
在这里插入图片描述

保存状态

通过上下文 (Conext) 在节点之间保存数据,例如初始化传入数据, 在其他节点获取数据。

from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context
)
import asyncio
from llama_index.utils.workflow import draw_all_possible_flows

class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str

class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        print(ev.first_input)
        
        await ctx.set("data", ev.data)
        
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx:Context, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ctx:Context, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        print(await ctx.get("data"))
        return StopEvent(result="Workflow complete.")



async def main():
    w = MyWorkflow(timeout=10, verbose=False)
    result = await w.run(first_input="Hello", data={"name": "tom"})
    print(result)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

嵌套工作流

工作流节点中,可以嵌套其他工作流,首先在 workflow 中添加工作流,在需要启动工作流的节点上,将子工作流作为参数传入。

from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context
)
import asyncio
from llama_index.utils.workflow import draw_all_possible_flows

class ReflectionFlow(Workflow):
    @step
    async def sub_start(self, ctx: Context, ev: StartEvent) -> StopEvent:
        print("Doing custom reflection")
        return StopEvent(result="Improved query")

class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str

class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        print(ev.first_input)
        
        await ctx.set("data", ev.data)
        
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx:Context, ev: FirstEvent, reflection_workflow: Workflow) -> SecondEvent:
        print(ev.first_output)
        res = await reflection_workflow.run(query="nested")
        print(f"nested workflow {res}")
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ctx:Context, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        print(await ctx.get("data"))
        return StopEvent(result="Workflow complete.")



async def main():
    w = MyWorkflow(timeout=10, verbose=False)
    w.add_workflows(reflection_workflow=ReflectionFlow())

    result = await w.run(first_input="Hello", data={"name": "tom"})
    print(result)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

总结

在 LlamaIndex 中,为了能够更好的将组建进行组装,提供工作流机制,事件工作流可以很好的解耦工作流的逻辑。事件工作流要尽量简单,如果节点过多就是导致事件过于复杂,事件管理也是比较松散的,过多的依赖会导致后期维护困难。


http://www.kler.cn/a/284358.html

相关文章:

  • 高级java每日一道面试题-2024年11月06日-JVM篇-什么是 Class 文件? Class 文件主要的信息结构有哪些?
  • 卷积、频域乘积和矩阵向量乘积三种形式之间的等价关系与转换
  • 基于Spring Boot与Redis的令牌主动失效机制实现
  • 云运维基础
  • Unity 性能优化方案
  • MySQL的SQL书写顺序和执行顺序
  • 皕盛电商平台:为合作伙伴提供广阔的发展空间
  • 华为管理工程与管理工程部
  • 3 Python开发工具:VSCode+插件
  • OSI七层模型中的数据链路层
  • 设计模式 15 解释器模式
  • uni-app开发日志:将schema2code生成的新增页和修改页整合成一页
  • 8种数据结构
  • 【RabbitMQ】应用
  • 纯vue实现笔记系统
  • 【python】Gpt-embedding文本建模
  • 【面试题系列Vue06】Vue 单页应用与多页应用的区别
  • 【单片机原理及应用】实验:数码管的中断控制
  • customRef 与 ref
  • docker仓库的工作原理
  • Apache CloudStack Official Document 翻译节选(十)
  • 零基础转行学网络安全怎么样?
  • sheng的学习笔记-AI-基于分歧的方法
  • 高性价比百元学生党蓝牙耳机怎么选?2024四款年度耳机推荐揭秘!
  • redis作为缓存,mysql的数据如何与redis同步
  • 力扣52-最大子序和(java详细题解)