【Agent】OpenManus-Flow-PlanningFlow设计分析
1. 概述
PlanningFlow 是一个继承自 BaseFlow 的具体实现,专门用于管理基于计划的任务执行流程。它协调多个Agent按照预定计划执行任务,跟踪执行进度,并提供完整的计划生命周期管理。
2. 核心属性
属性名 | 类型 | 默认值 | 描述 |
---|---|---|---|
llm | LLM | LLM() | 用于计划创建和总结的语言模型实例 |
planning_tool | PlanningTool | PlanningTool() | 用于计划管理的工具实例 |
executor_keys | List[str] | [] | 可执行步骤的Agent标识符列表 |
active_plan_id | str | 基于时间戳生成 | 当前活动计划的唯一标识符 |
current_step_index | Optional[int] | None | 当前执行步骤的索引 |
3. 初始化流程
def __init__(
self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **data
):
功能:初始化 PlanningFlow 实例,设置Agent、执行器和计划工具。
实现细节:
- 处理
executors
参数,转换为executor_keys
- 处理
plan_id
参数,设置为active_plan_id
- 初始化
planning_tool
(如果未提供) - 调用父类初始化方法
- 如果未指定执行器,将所有Agent设为执行器,所有 Agent的key的list
4. 核心方法
4.1 execute
async def execute(self, input_text: str) -> str:
功能:执行完整的计划流程,从创建计划到执行所有步骤。
参数:
input_text
: str - 用户输入 prompt,用于创建初始计划
返回值:
- str - 执行结果的文本摘要
code trace
async def execute(self, input_text: str) -> str:
try:
# 验证主要Agent是否可用
if not self.primary_agent:
raise ValueError("No primary agent available")
# 如果提供了 prompt,创建初始计划
if input_text:
await self._create_initial_plan(input_text)
if self.active_plan_id not in self.planning_tool.plans:
logger.error(
f"Plan creation failed. Plan ID {self.active_plan_id} not found in planning tool."
)
return f"Failed to create plan for: {input_text}"
result = ""
# 循环执行以下步骤,直到计划完成或Agent终止
while True:
# 获取当前要执行的步骤信息
self.current_step_index, step_info = await self._get_current_step_info()
# 如果没有更多步骤,完成计划,输出总结后退出
if self.current_step_index is None:
result += await self._finalize_plan()
break
# 选择适当的执行Agent
step_type = step_info.get("type") if step_info else None
executor = self.get_executor(step_type)
# 执行当前步骤,就是 Agent 里面 run 的逻辑
step_result = await self._execute_step(executor, step_info)
result += step_result + "\n"
# 检查Agent状态是否为 FINISHED,如果是就退出
if hasattr(executor, "state") and executor.state == AgentState.FINISHED:
break
return result
except Exception as e:
logger.error(f"Error in PlanningFlow: {str(e)}")
return f"Execution failed: {str(e)}"
如果是 FINISHED,就直接退出,并没有像 current_step_index 为 None 那样,总结后再退出
4.2 _execute_step
async def _execute_step(self, executor: BaseAgent, step_info: dict) -> str:
功能:使用指定Agent执行当前步骤。
参数:
executor
: BaseAgent - 执行步骤的Agentstep_info
: dict - 步骤信息
返回值:
- str - 步骤执行结果
code trace
async def _execute_step(self, executor: BaseAgent, step_info: dict) -> str:
# 根据当前任务状态获取上下文给等下 Agent run 的时候使用
plan_status = await self._get_plan_text()
step_text = step_info.get("text", f"Step {self.current_step_index}")
# 创建提示词
step_prompt = f"""
CURRENT PLAN STATUS:
{plan_status}
YOUR CURRENT TASK:
You are now working on step {self.current_step_index}: "{step_text}"
Please execute this step using the appropriate tools. When you're done, provide a summary of what you accomplished.
"""
# 执行具体 Agent 的 run 方法
try:
step_result = await executor.run(step_prompt)
# 执行成功后将步骤标记为已完成
await self._mark_step_completed()
return step_result
except Exception as e:
logger.error(f"Error executing step {self.current_step_index}: {e}")
return f"Error executing step {self.current_step_index}: {str(e)}"
4.3 get_executor
def get_executor(self, step_type: Optional[str] = None) -> BaseAgent:
功能:获取适合当前步骤的执行Agent。
参数:
step_type
: Optional[str] - 步骤类型,用于选择特定Agent
返回值:
- BaseAgent - 选定的执行Agent实例
实现细节:
- 如果提供了步骤类型且匹配Agent键,使用该Agent
- 否则使用第一个可用的执行器
- 如果没有可用执行器,回退到主要Agent
4.4 _create_initial_plan
async def _create_initial_plan(self, request: str) -> None:
功能:基于用户请求创建初始计划。
参数:
request
: str - 用户请求prompt
实现细节:
- 创建系统消息和用户消息
- 使用 LLM 和 PlanningTool 生成计划,这一步会生成每个 step
- 处理工具调用结果
- 如果 LLM 未能创建计划,创建默认计划
4.5 _get_current_step_info
async def _get_current_step_info(self) -> tuple[Optional[int], Optional[dict]]:
功能:解析当前计划,识别下一个要执行的步骤。
返回值:
- tuple[Optional[int], Optional[dict]] - 步骤索引和步骤信息的元组
实现细节:
- 验证计划是否存在
- 从计划工具存储中直接访问计划数据
- 用正则查找第一个未完成的步骤
- 提取步骤类型(如果有)
- 将步骤标记为进行中
- 返回步骤索引和信息
4.6 _mark_step_completed
async def _mark_step_completed(self) -> None:
功能:将当前步骤标记为已完成。
实现细节:
- 使用计划工具将步骤标记为已完成
- 如果工具调用失败,直接更新计划工具存储中的状态
4.7 _get_plan_text
async def _get_plan_text(self) -> str:
功能:获取当前计划的格式化文本表示。
返回值:
- str - 格式化的计划文本
实现细节:
- 使用计划工具获取计划文本
- 如果工具调用失败,使用备用方法从存储生成文本
4.8 _generate_plan_text_from_storage
def _generate_plan_text_from_storage(self) -> str:
功能:直接从存储生成计划文本(备用机制)。
返回值:
- str - 格式化的计划文本
实现细节:
- 从计划工具存储中获取计划数据
- 确保状态列表长度匹配步骤数量
- 计算各状态的统计信息和完成进度
- 生成格式化的计划文本,包括标题、进度和步骤列表
4.9 _finalize_plan
async def _finalize_plan(self) -> str:
功能:完成计划并提供执行摘要。
返回值:
- str - 计划完成摘要
实现细节:
- 获取最终计划状态文本
- 使用 LLM 直接生成摘要
- 如果 LLM 调用失败,使用Agent生成摘要
- 返回格式化的完成摘要
5. 执行流程分析
5.1 计划创建流程
- 用户提供任务描述
- 系统创建系统消息和用户消息
- LLM 生成包含步骤的计划
- 计划被存储在 PlanningTool 中
- 如果 LLM 未能创建计划,系统创建默认计划
5.2 步骤执行流程
- 系统获取下一个未完成步骤
- 系统选择适当的执行Agent
- 系统准备包含计划状态和当前任务的提示
- Agent执行步骤并返回结果
- 系统将步骤标记为已完成
- 系统检查是否有更多步骤需要执行
5.3 计划完成流程
- 系统检测到没有更多未完成步骤
- 系统获取最终计划状态
- LLM 生成计划执行摘要
- 系统返回完整的执行结果
6. 设计理念分析
6.1 多Agent协作
PlanningFlow 实现了一个灵活的多Agent协作框架:
- 基于类型的Agent选择:根据步骤类型选择专门的Agent
- 执行器列表:允许指定哪些Agent可以执行步骤
- 回退机制:确保总有Agent可用于执行步骤
这种设计允许不同专长的Agent协同工作,共同完成复杂任务。
6.2 计划管理
PlanningFlow 提供了完整的计划生命周期管理:
- 计划创建:使用 LLM 创建结构化计划
- 步骤跟踪:维护每个步骤的状态
- 进度可视化:生成格式化的计划状态文本
- 计划总结:在完成时提供执行摘要
这种设计使得复杂任务可以被分解为可管理的步骤,并提供清晰的执行路径。
6.3 LLM 与Agent集成
PlanningFlow 集成了 LLM 和Agent:
- LLM 用于创造性任务:计划创建和总结
- Agent用于执行任务:步骤执行和交互
- 上下文共享:为Agent提供完整的计划状态
这种设计利用了 LLM 的创造力和Agent的执行能力,实现了高效的任务处理。
7. 使用示例
# 创建Agent
planner = PlanningAgent()
executor = SWEAgent()
# 创建流程
flow = PlanningFlow(
agents={"planner": planner, "executor": executor},
executors=["executor"], # 只使用 executor 执行步骤
primary_agent_key="planner" # 使用 planner 作为主要Agent
)
# 执行流程
result = await flow.execute("创建一个简单的网页爬虫,抓取新闻标题")
print(result)
8. 扩展与定制
PlanningFlow 设计为可扩展和定制:
8.1 Agent选择扩展
可以扩展 get_executor
方法实现更复杂的Agent选择逻辑:
def get_executor(self, step_type: Optional[str] = None) -> BaseAgent:
# 基于步骤类型选择专门的Agent
if step_type == "code":
return self.agents.get("code_agent", self.primary_agent)
elif step_type == "search":
return self.agents.get("search_agent", self.primary_agent)
# 默认行为
return super().get_executor(step_type)
8.2 步骤类型识别扩展
可以增强步骤类型识别逻辑:
# 在 _get_current_step_info 方法中
# 更复杂的步骤类型识别
if "搜索" in step.lower() or "查找" in step.lower():
step_info["type"] = "search"
elif "编写" in step.lower() or "代码" in step.lower():
step_info["type"] = "code"
8.3 计划创建定制
可以定制计划创建提示以生成特定格式的计划:
system_message = Message.system_message(
"创建一个包含以下类型步骤的计划:\n"
"1. [SEARCH] 用于搜索信息的步骤\n"
"2. [CODE] 用于编写代码的步骤\n"
"3. [TEST] 用于测试的步骤\n"
"每个步骤应该清晰、具体,并标明类型。"
)
9. 总结
PlanningFlow 是一个强大而灵活的流程管理框架,专门用于基于计划的任务执行。它的核心优势包括:
- 结构化任务分解:将复杂任务分解为可管理的步骤
- 多Agent协作:协调不同专长的Agent共同完成任务
- 完整的计划管理:从创建到执行到总结的全生命周期管理
- 健壮的错误处理:多层错误处理和恢复机制
- 灵活的扩展性:支持各种定制和扩展
这种设计使 PlanningFlow 特别适合处理复杂的多步骤任务,如软件开发、研究分析和内容创建等。通过将任务分解为明确的步骤,并为每个步骤选择最合适的Agent,PlanningFlow 能够高效地处理各种复杂任务。