当前位置: 首页 > article >正文

LangGraph - Hierarchical Agent Teams

本文翻译整理自 Hierarchical Agent Teams
https://langchain-ai.github.io/langgraph/tutorials/multi_agent/hierarchical_agent_teams/

文章目录

    • 一、前言
    • 二、设置
    • 三、创建工具
    • 四、Helper Utilities
    • 五、定义代理 Team
      • 研究 Team
      • 文档写作Team
    • 六、添加图层


一、前言

在前面的示例(Agent Supervisor)中,我们引入了单个主管节点的概念,用于在不同的工作节点之间路由工作。

但是如果一个工人的工作变得太复杂怎么办?如果工人数量太多怎么办?

对于某些应用程序,如果工作按层次分布,系统可能会更有效。

您可以通过组合不同的子图并创建顶级主管以及中级主管来做到这一点。

为此,让我们构建一个简单的研究助手!该图如下所示:

在这里插入图片描述


AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation

本笔记本的灵感来自Wu等人的论文AutoGen:通过多代理对话启用下一代LLM应用程序。在本笔记本的其余部分,您将:

  1. Define the agents’ tools to access the web and write files
  2. 定义代理访问Web和写入文件的工具
  3. Define some utilities to help create the graph and agents
  4. 定义一些实用程序来帮助创建图和代理
  5. Create and define each team (web research + doc writing)
  6. 创建和定义每个Team (网络研究+文档编写)
  7. Compose everything together.
  8. 把一切组合在一起。

二、设置

首先,让我们安装所需的包并设置API密钥

%%capture --no-stderr
%pip install -U langgraph langchain langchain_openai langchain_experimental

import getpass
import os

def _set_if_undefined(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"Please provide your {var}")

_set_if_undefined("OPENAI_API_KEY")
_set_if_undefined("TAVILY_API_KEY")

设置LangSmith用于LangGraph开发

注册LangSmith以快速发现问题并提高LangGraph项目的性能。LangSmith允许您使用跟踪数据来调试、测试和监控使用LangGraph构建的LLM应用程序 - 在此处阅读有关如何开始的更多信息。


三、创建工具

每个Team 将由一个或多个代理组成,每个代理都有一个或多个工具。下面,定义您的不同Team 要使用的所有工具。

我们将从研究Team 开始。

ResearchTeam 工具

研究Team 工具

研究Team 可以使用搜索引擎和url刮刀在网络上查找信息。请随时在下面添加其他功能以提高Team 绩效!

from typing import Annotated, List

from langchain_community.document_loaders import WebBaseLoader
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.tools import tool

tavily_tool = TavilySearchResults(max_results=5)

@tool
def scrape_webpages(urls: List[str]) -> str:
    """Use requests and bs4 to scrape the provided web pages for detailed information."""
    loader = WebBaseLoader(urls)
    docs = loader.load()
    return "\n\n".join(
        [
            f'<Document name="{doc.metadata.get("title", "")}">\n{doc.page_content}\n</Document>'
            for doc in docs
        ]
    )

API Reference: WebBaseLoaderTavilySearchResultstool


文档编写Team 工具

接下来,我们将提供一些工具供文档编写Team 使用。我们在下面定义了一些基本的文件访问工具。

请注意,这使代理可以访问您的文件系统,这可能是不安全的。我们也没有优化工具描述以提高性能。

from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict, Optional

from langchain_experimental.utilities import PythonREPL
from typing_extensions import TypedDict

_TEMP_DIRECTORY = TemporaryDirectory()
WORKING_DIRECTORY = Path(_TEMP_DIRECTORY.name)

@tool
def create_outline(
    points: Annotated[List[str], "List of main points or sections."],
    file_name: Annotated[str, "File path to save the outline."],
) -> Annotated[str, "Path of the saved outline file."]:
    """Create and save an outline."""
    with (WORKING_DIRECTORY / file_name).open("w") as file:
        for i, point in enumerate(points):
            file.write(f"{i + 1}. {point}\n")
    return f"Outline saved to {file_name}"

@tool
def read_document(
    file_name: Annotated[str, "File path to save the document."],
    start: Annotated[Optional[int], "The start line. Default is 0"] = None,
    end: Annotated[Optional[int], "The end line. Default is None"] = None,
) -> str:
    """Read the specified document."""
    with (WORKING_DIRECTORY / file_name).open("r") as file:
        lines = file.readlines()
    if start is not None:
        start = 0
    return "\n".join(lines[start:end])

@tool
def write_document(
    content: Annotated[str, "Text content to be written into the document."],
    file_name: Annotated[str, "File path to save the document."],
) -> Annotated[str, "Path of the saved document file."]:
    """Create and save a text document."""
    with (WORKING_DIRECTORY / file_name).open("w") as file:
        file.write(content)
    return f"Document saved to {file_name}"

@tool
def edit_document(
    file_name: Annotated[str, "Path of the document to be edited."],
    inserts: Annotated[
        Dict[int, str],
        "Dictionary where key is the line number (1-indexed) and value is the text to be inserted at that line.",
    ],
) -> Annotated[str, "Path of the edited document file."]:
    """Edit a document by inserting text at specific line numbers."""

    with (WORKING_DIRECTORY / file_name).open("r") as file:
        lines = file.readlines()

    sorted_inserts = sorted(inserts.items())

    for line_number, text in sorted_inserts:
        if 1 <= line_number <= len(lines) + 1:
            lines.insert(line_number - 1, text + "\n")
        else:
            return f"Error: Line number {line_number} is out of range."

    with (WORKING_DIRECTORY / file_name).open("w") as file:
        file.writelines(lines)

    return f"Document edited and saved to {file_name}"

# Warning: This executes code locally, which can be unsafe when not sandboxed

repl = PythonREPL()

@tool
def python_repl(
    code: Annotated[str, "The python code to execute to generate your chart."],
):
    """Use this to execute python code. If you want to see the output of a value,
    you should print it out with `print(...)`. This is visible to the user."""
    try:
        result = repl.run(code)
    except BaseException as e:
        return f"Failed to execute. Error: {repr(e)}"
    return f"Successfully executed:\n\`\`\`python\n{code}\n\`\`\`\nStdout: {result}"

API Reference: PythonREPL


四、Helper Utilities

当我们想要时,我们将创建一些实用函数以使其更加简洁:

  1. 创建一个工作代理。
  2. 为子图创建主管。

这些将为我们简化最后的图形组合代码,以便更容易看到发生了什么。

from typing import List, Optional
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI

from langgraph.graph import END, StateGraph, START
from langchain_core.messages import HumanMessage, trim_messages

llm = ChatOpenAI(model="gpt-4o-mini")

trimmer = trim_messages(
    max_tokens=100000,
    strategy="last",
    token_counter=llm,
    include_system=True,
)

def agent_node(state, agent, name):
    result = agent.invoke(state)
    return {
        "messages": [HumanMessage(content=result["messages"][-1].content, name=name)]
    }

def create_team_supervisor(llm: ChatOpenAI, system_prompt, members) -> str:
    """An LLM-based router."""
    options = ["FINISH"] + members
    function_def = {
        "name": "route",
        "description": "Select the next role.",
        "parameters": {
            "title": "routeSchema",
            "type": "object",
            "properties": {
                "next": {
                    "title": "Next",
                    "anyOf": [
                        {"enum": options},
                    ],
                },
            },
            "required": ["next"],
        },
    }
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            (
                "system",
                "Given the conversation above, who should act next?"
                " Or should we FINISH? Select one of: {options}",
            ),
        ]
    ).partial(options=str(options), team_members=", ".join(members))
    return (
        prompt
 | trimmer
 | llm.bind_functions(functions=[function_def], function_call="route")
 | JsonOutputFunctionsParser()
    )

API Reference: JsonOutputFunctionsParserChatPromptTemplateMessagesPlaceholderChatOpenAIHumanMessagetrim_messagesENDStateGraphSTART


五、定义代理 Team

现在我们可以定义我们的等级Team 了。“选择你的球员!”


研究 Team

研究Team 将有一个搜索代理和一个网页抓取“research_agent”作为两个工作节点。让我们创建这些,以及Team 主管。

import functools
import operator

from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai.chat_models import ChatOpenAI
from langgraph.prebuilt import create_react_agent

# ResearchTeam graph state
class ResearchTeamState(TypedDict):
    # A message is added after each team member finishes
    messages: Annotated[List[BaseMessage], operator.add]
    # The team members are tracked so they are aware of
    # the others' skill-sets
    team_members: List[str]
    # Used to route work. The supervisor calls a function
    # that will update this every time it makes a decision
    next: str

llm = ChatOpenAI(model="gpt-4o")

search_agent = create_react_agent(llm, tools=[tavily_tool])
search_node = functools.partial(agent_node, agent=search_agent, name="Search")

research_agent = create_react_agent(llm, tools=[scrape_webpages])
research_node = functools.partial(agent_node, agent=research_agent, name="WebScraper")

supervisor_agent = create_team_supervisor(
    llm,
    "You are a supervisor tasked with managing a conversation between the"
    " following workers:  Search, WebScraper. Given the following user request,"
    " respond with the worker to act next. Each worker will perform a"
    " task and respond with their results and status. When finished,"
    " respond with FINISH.",
    ["Search", "WebScraper"],
)

API Reference: BaseMessageHumanMessageChatOpenAIcreate_react_agent


现在我们已经创建了必要的组件,定义它们的交互很容易。将节点添加到团队图中,并定义确定转换标准的边。

research_graph = StateGraph(ResearchTeamState)
research_graph.add_node("Search", search_node)
research_graph.add_node("WebScraper", research_node)
research_graph.add_node("supervisor", supervisor_agent)

# Define the control flow
research_graph.add_edge("Search", "supervisor")
research_graph.add_edge("WebScraper", "supervisor")
research_graph.add_conditional_edges(
    "supervisor",
    lambda x: x["next"],
    {"Search": "Search", "WebScraper": "WebScraper", "FINISH": END},
)

research_graph.add_edge(START, "supervisor")
chain = research_graph.compile()

# The following functions interoperate between the top level graph state
# and the state of the research sub-graph
# this makes it so that the states of each graph don't get intermixed
def enter_chain(message: str):
    results = {
        "messages": [HumanMessage(content=message)],
    }
    return results

research_chain = enter_chain | chain


from IPython.display import Image, display

display(Image(chain.get_graph(xray=True).draw_mermaid_png()))

在这里插入图片描述


我们可以直接给这个团队工作。在下面试试。

for s in research_chain.stream(
    "when is Taylor Swift's next tour?", {"recursion_limit": 100}
):
    if "__end__" not in s:
        print(s)
        print("---")

文档写作Team

使用类似的方法在下面创建文档编写团队。这一次,我们将授予每个代理访问不同文件编写工具的权限。

请注意,我们在这里向我们的代理提供文件系统访问权限,这在所有情况下都不安全。

import operator
from pathlib import Path


# Document writing team graph state
class DocWritingState(TypedDict):
    # This tracks the team's conversation internally
    messages: Annotated[List[BaseMessage], operator.add]
    # This provides each worker with context on the others' skill sets
    team_members: str
    # This is how the supervisor tells langgraph who to work next
    next: str
    # This tracks the shared directory state
    current_files: str


# This will be run before each worker agent begins work
# It makes it so they are more aware of the current state
# of the working directory.
def prelude(state):
    written_files = []
    if not WORKING_DIRECTORY.exists():
        WORKING_DIRECTORY.mkdir()
    try:
        written_files = [
            f.relative_to(WORKING_DIRECTORY) for f in WORKING_DIRECTORY.rglob("*")
        ]
    except Exception:
        pass
    if not written_files:
        return {**state, "current_files": "No files written."}
    return {
        **state,
        "current_files": "\nBelow are files your team has written to the directory:\n"
        + "\n".join([f" - {f}" for f in written_files]),
    }


llm = ChatOpenAI(model="gpt-4o")

doc_writer_agent = create_react_agent(
    llm, tools=[write_document, edit_document, read_document]
)
# Injects current directory working state before each call
context_aware_doc_writer_agent = prelude | doc_writer_agent
doc_writing_node = functools.partial(
    agent_node, agent=context_aware_doc_writer_agent, name="DocWriter"
)

note_taking_agent = create_react_agent(llm, tools=[create_outline, read_document])
context_aware_note_taking_agent = prelude | note_taking_agent
note_taking_node = functools.partial(
    agent_node, agent=context_aware_note_taking_agent, name="NoteTaker"
)

chart_generating_agent = create_react_agent(llm, tools=[read_document, python_repl])
context_aware_chart_generating_agent = prelude | chart_generating_agent
chart_generating_node = functools.partial(
    agent_node, agent=context_aware_note_taking_agent, name="ChartGenerator"
)

doc_writing_supervisor = create_team_supervisor(
    llm,
    "You are a supervisor tasked with managing a conversation between the"
    " following workers:  {team_members}. Given the following user request,"
    " respond with the worker to act next. Each worker will perform a"
    " task and respond with their results and status. When finished,"
    " respond with FINISH.",
    ["DocWriter", "NoteTaker", "ChartGenerator"],
)

通过创建对象本身,我们可以形成图形。

# Create the graph here:
# Note that we have unrolled the loop for the sake of this doc
authoring_graph = StateGraph(DocWritingState)
authoring_graph.add_node("DocWriter", doc_writing_node)
authoring_graph.add_node("NoteTaker", note_taking_node)
authoring_graph.add_node("ChartGenerator", chart_generating_node)
authoring_graph.add_node("supervisor", doc_writing_supervisor)

# Add the edges that always occur
authoring_graph.add_edge("DocWriter", "supervisor")
authoring_graph.add_edge("NoteTaker", "supervisor")
authoring_graph.add_edge("ChartGenerator", "supervisor")

# Add the edges where routing applies
authoring_graph.add_conditional_edges(
    "supervisor",
    lambda x: x["next"],
    {
        "DocWriter": "DocWriter",
        "NoteTaker": "NoteTaker",
        "ChartGenerator": "ChartGenerator",
        "FINISH": END,
    },
)

authoring_graph.add_edge(START, "supervisor")
chain = authoring_graph.compile()


# The following functions interoperate between the top level graph state
# and the state of the research sub-graph
# this makes it so that the states of each graph don't get intermixed
def enter_chain(message: str, members: List[str]):
    results = {
        "messages": [HumanMessage(content=message)],
        "team_members": ", ".join(members),
    }
    return results


# We reuse the enter/exit functions to wrap the graph
authoring_chain = (
    functools.partial(enter_chain, members=authoring_graph.nodes)
    | authoring_graph.compile()
)

from IPython.display import Image, display

display(Image(chain.get_graph().draw_mermaid_png()))

在这里插入图片描述


for s in authoring_chain.stream(
    "Write an outline for poem and then write the poem to disk.",
    {"recursion_limit": 100},
):
    if "__end__" not in s:
        print(s)
        print("---")
{'supervisor': {'next': 'NoteTaker'}}
---
{'NoteTaker': {'messages': [HumanMessage(content='The poem has been written and saved to "poem.txt".', name='NoteTaker')]}}
---
{'supervisor': {'next': 'FINISH'}}
---

六、添加图层

在这个设计中,我们执行了一个自上而下的规划策略。我们已经创建了两个图表,但是我们必须决定如何在两者之间分配工作。

我们将创建第三个图来协调前两个图,并添加一些连接器来定义如何在不同图之间共享这个顶级状态。

from langchain_core.messages import BaseMessage
from langchain_openai.chat_models import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

supervisor_node = create_team_supervisor(
    llm,
    "You are a supervisor tasked with managing a conversation between the"
    " following teams: {team_members}. Given the following user request,"
    " respond with the worker to act next. Each worker will perform a"
    " task and respond with their results and status. When finished,"
    " respond with FINISH.",
    ["ResearchTeam", "PaperWritingTeam"],
)

API Reference: BaseMessageChatOpenAI


# Top-level graph state
class State(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    next: str

def get_last_message(state: State) -> str:
    return state["messages"][-1].content

def join_graph(response: dict):
    return {"messages": [response["messages"][-1]]}

# Define the graph.
super_graph = StateGraph(State)
# First add the nodes, which will do the work
super_graph.add_node("ResearchTeam", get_last_message | research_chain | join_graph)
super_graph.add_node(
    "PaperWritingTeam", get_last_message | authoring_chain | join_graph
)
super_graph.add_node("supervisor", supervisor_node)

# Define the graph connections, which controls how the logic
# propagates through the program
super_graph.add_edge("ResearchTeam", "supervisor")
super_graph.add_edge("PaperWritingTeam", "supervisor")
super_graph.add_conditional_edges(
    "supervisor",
    lambda x: x["next"],
    {
        "PaperWritingTeam": "PaperWritingTeam",
        "ResearchTeam": "ResearchTeam",
        "FINISH": END,
    },
)

super_graph.add_edge(START, "supervisor")
super_graph = super_graph.compile()


from IPython.display import Image, display

display(Image(super_graph.get_graph().draw_mermaid_png()))

在这里插入图片描述


for s in super_graph.stream(
    {
        "messages": [
            HumanMessage(
                content="Write a brief research report on the North American sturgeon. Include a chart."
            )
        ],
    },
    {"recursion_limit": 150},
):
    if "__end__" not in s:
        print(s)
        print("---")

{'supervisor': {'next': 'ResearchTeam'}}
---
{'ResearchTeam': {'messages': [HumanMessage(content="Unfortunately, ...  documents.", name='WebScraper')]}}
---
{'supervisor': {'next': 'PaperWritingTeam'}}

2024-10-18(五)


http://www.kler.cn/news/354982.html

相关文章:

  • 京东背调有病吧......
  • 开源节流-2024年10月17日
  • 【秋招笔试】10.13拼多多(已改编)秋招-三语言题解
  • LLM大模型的评测维度有哪些?
  • 试用cursor的简单的记录
  • mysql迁移到达梦的修改点
  • 企业电子印章主要通过以下几种方式进行防伪
  • DIFY上使用多种大语言模型(MindCraft API)
  • AI绘图大模型 Stable Diffusion 使用详解
  • 利用高德API获取整个城市的公交路线并可视化(六)
  • k8s worker 节点关机 sts 管理的 pod 无法迁移
  • 如何设计开发RTSP直播播放器?
  • 基于直播美颜SDK的实时美颜平台开发指南
  • 如何启动hive
  • 软件分享|Notepad-- 文本编辑器
  • 基于逻辑回归实现乳腺癌预测
  • Rocky linux 修改ip地址, rocky服务器修改静态地址, rocky虚拟机修改ip
  • ai字幕用什么软件制作?6款视频加字幕工具分享!
  • 29.第二阶段x86游戏实战2-遍历周围-花指令与二叉树数据结构(有如何阅读vm代码混淆代码)
  • 【微信小程序_7_WXML 模板语法 - 按钮事件和输入框事件绑定】