开源模型应用落地-LangGraph101-探索 LangGraph人机交互-添加断点(一)
一、前言
在如今这个 AI 技术飞速发展的时代,智能系统的决策透明性和可控性变得越来越受到开发者的关注。LangGraph 作为一款智能体构建框架,它通过一种创新的 “断点机制” 重新定义了人机协作的方式。每当智能体在使用重要工具或执行敏感任务前,系统都会自动暂停并等待人类的许可。这种“可控自主性”的设计不仅解决了传统黑箱模型带来的信任问题,还为安全审计和合规操作等场景打开了新的应用可能性。
本文将深入探讨 LangGraph 的断点功能的技术实现。
二、术语介绍
2.1.LangGraph
是一个基于 LangChain 构建的开源库,旨在为构建基于大语言模型(LLM)的有状态、多智能体应用程序提供一个全新的范式和强大的框架支持。
核心特点
- 支持循环图构建:与传统的基于有向无环图(DAG)的框架不同,LangGraph 允许开发者在图形结构中自由定义循环边和循环节点,这对于设计动态和迭代的智能体工作流程至关重要,能使智能体行为更贴近实际编程场景。
- 自动状态管理:具有自动状态管理能力,可在多个交互中跟踪和持久化信息。随着智能体执行任务,状态会动态更新,确保系统能维护上下文并对新输入做出适当响应。
- 多智能体协调:支持在单个图结构中协调多个智能体,每个智能体都可以有自己的提示、LLM、工具和自定义代码,方便构建多智能体系统,使多个智能体能够有效地协同工作,共同完成复杂任务。
- 灵活性与可定制性:开发者可以灵活定义自己的智能体逻辑和通信协议,根据具体用例构建高度定制化的应用,满足不同场景需求。
- 可扩展性与容错性:设计用于支持大规模多智能体应用的执行,具有强大的架构,能够处理大量交互和复杂工作流。同时,包含了优雅处理错误的机制,确保即使个别智能体遇到问题,应用也能继续运行。
工作原理
- 图结构表示:将应用视为一个有向图,其中每个节点代表一个 LLM 智能体或其他可执行的函数、工具等,边则定义了执行和数据流的方向,连接各个节点。
- 状态更新机制:通过状态图来管理在执行周期中持久化的数据,当数据在节点之间流动时,状态对象会被管理和更新,从而实现跨执行周期的状态跟踪和更新。
核心组件
- 节点(Nodes):是图结构中的执行单元,在其内部依据预设的代码逻辑进行复杂的运算与处理,最终输出一个经过更新后的全新状态。
- 边(Edges):边作为图结构中的连接要素,承担着构建节点之间逻辑关系与信息流向路径的重要职责。从类型上划分,主要涵盖了简单边和条件边这两种形式。简单边建立起节点之间最为基本的连接关系,使得前一个节点完成处理后所输出的状态信息能够顺畅地传输至下一个节点,成为其输入信息的来源,从而保障了信息在各个节点间的线性传递。而条件边则具备更为灵活和智能的特性,当一个节点的处理过程结束后,条件边会基于其内置的判断逻辑,精准地确定该节点后续应当通过哪些输出边来传递信息,极大地提高了系统的运行效率和任务处理的灵活性。
- 状态(State):作为一种共享的数据结构,其重要性不言而喻。在整个系统的运行过程中,状态如同一个公共的信息存储库,保存着应用程序在各个阶段的当前信息
应用场景
- 智能聊天机器人:能够处理各种用户请求,通过多 LLM 智能体协作,处理自然语言查询,提供准确响应,并在不同对话主题间无缝切换,提供连贯一致的用户体验。
- 自主智能体:可创建根据用户输入和预定义逻辑执行任务的自主智能体,执行复杂工作流,与其他系统交互,并动态适应新信息,适用于自动化客户支持、数据处理、系统监控等任务。
- 多智能体系统:在供应链管理、团队协作等场景中,多个智能体可分别管理库存、处理订单、协调交付等,通过 LangGraph 的协调能力,实现高效沟通、信息共享和决策同步,提升整体系统性能。
- 工作流自动化工具:用于设计智能体来处理文档处理、审批工作流、数据分析等任务,通过定义清晰的工作流和利用状态管理,无需人工干预即可执行复杂操作序列,提高工作效率并减少错误。
- 个性化推荐系统:利用多个智能体分析用户行为、偏好等数据,提供更个性化的推荐服务。
类似竞品
- AutoGen:微软推出的行业先驱,核心设计包括用户智能体和助手智能体,特别适用于软件开发领域,在处理代码编排任务时表现出色。它允许用户在智能体间进行互动和手动调整,但对于没有编程背景的用户而言,学习成本较高。
- CrewAI:以简洁直观的操作界面受到推崇,非常适合快速搭建多智能体任务,用户无需深厚的技术背景,只需编写一些简短的提示词即可创建多个智能体并完成课程演示。不过,其在灵活性和定制化方面存在局限,技术社区的支持也相对较弱。
- OpenAI Swarm:专为新手设计,以简化智能体的创建与上下文切换为主要特征,用户只需简单操作即可快速制作演示应用。但它仅支持 OpenAI API,缺乏灵活性,不适合复杂的生产环境,技术社区支持也相对匮乏。
- Magentic-One:微软最新推出的产品,目标是为 AutoGen 提供一个简化的替代方案,尤其适合缺乏编程经验的用户,内置多个智能体用于文件管理、网页浏览和代码编写等功能,支持快速配置和使用。然而,其在对开源 LLMs 的支持上相对复杂,目前的文档和社区支持也未完善。
2.2.添加断点的作用
在创建 LangGraph 智能体时,加入人工干预组件很有帮助,尤其是在智能体使用工具时,在执行某些操作前手动审批可确保操作的合理性。而添加断点是实现人工干预的主要方式,它能在节点执行前中断执行,之后可从断点处继续。
三、前置条件
3.1.基础环境及前置条件
1. 操作系统:无限
3.2.安装依赖
conda create --name langgraph python=3.10
conda activate langgraph
pip install langgraph -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install langchain_openai langchain_community -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install pillow -i https://pypi.tuna.tsinghua.edu.cn/simple
四、技术实现
4.1.输出流程图
# -*- coding:utf-8 -*-
import io
import json
import os
import threading
import traceback
import requests
from typing import Annotated, TypedDict
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from pydantic import BaseModel, Field
from PIL import Image as PILImage
os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'#你的Open AI Key
gaode_api_key = 'xxxxxxxxxxxxxxxxxxxxxxxxxx'
gaode_domain = "https://restapi.amap.com/v3"
class ToolInput(BaseModel):
region: str = Field(description="地区")
@tool("get_current_weather", return_direct=True, args_schema=ToolInput)
def get_current_weather(region:str):
"""根据传入的城市名查询天气"""
ad_code = None
try:
session = requests.session()
region_response = session.request(
method="GET",
url=f"{gaode_domain}/config/district?key={gaode_api_key}&keywords={region}&subdistrict=0",
headers={"Content-Type": "application/json; charset=utf-8"},
)
region_response.raise_for_status()
region_data = region_response.json()
if region_data.get("info") == "OK":
ad_code = region_data["districts"][0]["adcode"]
except:
traceback.print_exc()
return f"获取{region}代码失败"
try:
if ad_code:
weather_response = session.request(
method="GET",
url=f"{gaode_domain}/weather/weatherInfo?key={gaode_api_key}&city={ad_code}&extensions=all",
headers={"Content-Type": "application/json; charset=utf-8"},
)
weather_response.raise_for_status()
weather_data = weather_response.json()
if weather_data.get("info") == "OK":
result = json.dumps(weather_data, ensure_ascii=False)
return json.dumps(weather_data,ensure_ascii=False)
except:
traceback.print_exc()
return f"获取{region}天气预报信息失败"
tools = [get_current_weather]
class State(TypedDict):
messages: Annotated[list, add_messages]
def chatbot(state: State):
llm = ChatOpenAI(model="gpt-4o-mini",temperature=0,max_tokens=512)
llm_with_tools = llm.bind_tools(tools)
result = llm_with_tools.invoke(state["messages"])
return {"messages": [result]}
def display(image_data):
image = PILImage.open(io.BytesIO(image_data))
image.show()
if __name__ == "__main__":
thread_id = threading.get_ident()
config = {"configurable": {"thread_id": thread_id}}
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_node("tools", ToolNode(tools=tools))
graph_builder.add_conditional_edges("chatbot", tools_condition)
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory,interrupt_before=["tools"],)
display(graph.get_graph().draw_mermaid_png())
调用结果:
4.2.添加断点
# -*- coding:utf-8 -*-
import io
import json
import os
import threading
import traceback
import requests
from typing import Annotated, TypedDict
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from pydantic import BaseModel, Field
os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'#你的Open AI Key
gaode_api_key = 'xxxxxxxxxxxxxxxxxxxxxxxxxx'
gaode_domain = "https://restapi.amap.com/v3"
class ToolInput(BaseModel):
region: str = Field(description="地区")
@tool("get_current_weather", return_direct=True, args_schema=ToolInput)
def get_current_weather(region:str):
"""根据传入的城市名查询天气"""
ad_code = None
try:
session = requests.session()
region_response = session.request(
method="GET",
url=f"{gaode_domain}/config/district?key={gaode_api_key}&keywords={region}&subdistrict=0",
headers={"Content-Type": "application/json; charset=utf-8"},
)
region_response.raise_for_status()
region_data = region_response.json()
if region_data.get("info") == "OK":
ad_code = region_data["districts"][0]["adcode"]
except:
traceback.print_exc()
return f"获取{region}代码失败"
try:
if ad_code:
weather_response = session.request(
method="GET",
url=f"{gaode_domain}/weather/weatherInfo?key={gaode_api_key}&city={ad_code}&extensions=all",
headers={"Content-Type": "application/json; charset=utf-8"},
)
weather_response.raise_for_status()
weather_data = weather_response.json()
if weather_data.get("info") == "OK":
result = json.dumps(weather_data, ensure_ascii=False)
return json.dumps(weather_data,ensure_ascii=False)
except:
traceback.print_exc()
return f"获取{region}天气预报信息失败"
tools = [get_current_weather]
class State(TypedDict):
messages: Annotated[list, add_messages]
def chatbot(state: State):
llm = ChatOpenAI(model="gpt-4o-mini",temperature=0,max_tokens=512)
llm_with_tools = llm.bind_tools(tools)
result = llm_with_tools.invoke(state["messages"])
return {"messages": [result]}
def chat(messages,config):
for output in graph.stream(messages, config=config):
for value in output.values():
if "messages" in value:
print("Assistant:", value["messages"][-1].content)
def display(image_data):
image = PILImage.open(io.BytesIO(image_data))
image.show()
if __name__ == "__main__":
thread_id = threading.get_ident()
config = {"configurable": {"thread_id": thread_id}}
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_node("tools", ToolNode(tools=tools))
graph_builder.add_conditional_edges("chatbot", tools_condition)
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory,interrupt_before=["tools"],)
messages = {"messages": [("user", '今天广州的天气如何?')]}
chat(messages, config)
try:
user_approval = input("是否调用外部工具去检索实时天气?(yes/no):")
except:
user_approval = "yes"
if user_approval.lower() == "yes":
chat(None, config)
else:
print("用户不允许调用外部工具,请求中断!")
调用结果:
1) 用户允许:
2) 用户不允许:
五、附带说明:
5.1.interrupt_before的作用
是 LangGraph 中用于设置断点的关键参数,作用是在特定节点执行前中断图的运行流程,实现人工介入或其他自定义操作,保障程序运行的安全性、可控性与准确性。
- 中断运行流程:在 LangGraph 运行过程中,当设置
interrupt_before=["action"]
,在调用action
节点之前,整个图的执行会暂停。在实际场景中,若action
节点负责调用外部敏感数据接口,通过设置interrupt_before
,可以避免在未经人工确认的情况下随意调用接口,防止数据泄露或不必要的资源消耗。 - 实现人工介入:在开发智能体调用工具的场景中,可在中断处进行人工审批。在智能体调用搜索引擎工具获取信息前,通过
interrupt_before
中断运行,人工确认搜索关键词是否合理,避免出现不合理的搜索行为,从而让智能体的行为更符合预期。 - 便于调试和监控:在开发和优化 LangGraph 应用时,开发者可以利用
interrupt_before
在关键节点设置断点。在复杂的图结构中,通过在特定工具调用节点前中断,查看当前运行状态、参数设置是否正确,有助于快速定位和解决问题。