理解langgraph.graph.StateGraph中 State 的 Annotated 以函数作为元数据(meta)如何影响State传递
LangChain 编程的一大创新是引入管道化表达式(Pipeline Expressions),通过链式组合不同的模块来构建 LLM 任务流。
为了实现这种链式编程,LangChain 通过 Runnable
统一封装模块,使它们提供标准化接口(如 invoke()
、batch()
、stream()
)
通过运算符(|
)实现模块间的数据传递,“|” 像一数据管道,本质上仍然是 invoke()
逐步执行。
langgraph.graph.StateGraph 更是引入图论、基于图论,通过节点(node)定义不同的任务,通过边(Edge)使数据在多个任务之间流转和更新。
V0.3 版的聊天机器人示例代码中,State就是通过Edge流转。里面的StateMessage通过Annotated中引用的add_message实现了自动叠加历史消息的逻辑。
为了理解add_message如何影响State,我们先用一个更直观的例子来理解。
以下代码展示的是一个有三个节点的工作流,数据从一个节点流入下一个节点的过程中,会被Annotated中元数据的函数修改,模拟“带历史消息”的聊天。
因为数据(state)是从一个节点流入另一个节点,连接节点的是边(edge),为了形象,我们用管道来描述,即edge是数据流通的管道。
为了展示数据是在管道中被修改的,我们分别把节点收到的state、节点输出的state、管道中的state分别打印出来。
为了方便演示,我们定义State的结构包含2个成员:foo 表示数据要流向的节点名称,bar 表示本环节大模型api接口返回的结果。 Annotated 的bar_meta_function 对bar添加的逻辑是自动把之前的结果拼接,使得下一个节点收到的bar值是已叠加了历史消息的bar。
from langgraph.graph import StateGraph, START, END
from typing import Annotated, TypedDict, Sequence
# 自定义累加函数
def foo_meta_function(existing_foo, new_foo):
print(f"在连接{existing_foo} - {new_foo}的管道中,foo_meta_function被调用 - [上一节名称是]:{existing_foo} + [下一节名称是]:{new_foo}")
return new_foo
def bar_meta_function(existing_bar, new_bar):
print(f"在管道中,bar_meta_function被调用 - [上一节点接收到的state中的bar]:{existing_bar} + [上一节点return值中的bar]:{new_bar}\n")
update_bar = existing_bar + new_bar
return update_bar
# 定义 State 结构
class State(TypedDict):
foo: Annotated[str, foo_meta_function]
bar: Annotated[Sequence[str], bar_meta_function] # 自动累加
# 节点 A
def node_a(state: State) -> State:
print('node_a收到的state', state)
new_state = {**state, "foo": "step_2", "bar": ["a"]} # 先字典解包,然后指定的成员用新值覆盖。创建新的state而不是直接修改原state,这样可避免在多线并行等特殊场景state被污染。
print('node_a输出的state', new_state, '\n')
return new_state
# 节点 B
def node_b(state: State) -> State:
print('node_b收到的state', state)
new_state = {**state, "foo": "step_3", "bar": ["b"]} # 返回新 state
print('node_b输出的state', new_state, '\n')
return new_state
# 节点 C
def node_c(state: State) -> State:
print('node_c收到的state', state)
new_state = {**state, "foo": "#end#", "bar": ["c"]} # 返回新 state
print('node_c输出的state', new_state, '\n')
return new_state
# 构建 StateGraph
workflow = StateGraph(State)
workflow.add_node("node_a", node_a)
workflow.add_node("node_b", node_b)
workflow.add_node("node_c", node_c)
# 一共四条边(含连接START 和 连接END 的边)
workflow.set_entry_point("node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", "node_c")
workflow.add_edge("node_c", END)
graph = workflow.compile()
# 运行
initial_state = {"foo": "step_1", "bar": ['000']}
result = graph.invoke(initial_state)
print("最终结果:", result)
输出结果如下:
在连接 - step_1的管道中,foo_meta_function被调用 - [上一节名称是]: + [下一节名称是]:step_1
在管道中,bar_meta_function被调用 - [上一节点接收到的state中的bar]:[] + [上一节点return值中的bar]:['000']
node_a收到的state {'foo': 'step_1', 'bar': ['000']}
node_a输出的state {'foo': 'step_2', 'bar': ['a']}
在连接step_1 - step_2的管道中,foo_meta_function被调用 - [上一节名称是]:step_1 + [下一节名称是]:step_2
在管道中,bar_meta_function被调用 - [上一节点接收到的state中的bar]:['000'] + [上一节点return值中的bar]:['a']
node_b收到的state {'foo': 'step_2', 'bar': ['000', 'a']}
node_b输出的state {'foo': 'step_3', 'bar': ['b']}
在连接step_2 - step_3的管道中,foo_meta_function被调用 - [上一节名称是]:step_2 + [下一节名称是]:step_3
在管道中,bar_meta_function被调用 - [上一节点接收到的state中的bar]:['000', 'a'] + [上一节点return值中的bar]:['b']
node_c收到的state {'foo': 'step_3', 'bar': ['000', 'a', 'b']}
node_c输出的state {'foo': '#end#', 'bar': ['c']}
在连接step_3 - #end#的管道中,foo_meta_function被调用 - [上一节名称是]:step_3 + [下一节名称是]:#end#
在管道中,bar_meta_function被调用 - [上一节点接收到的state中的bar]:['000', 'a', 'b'] + [上一节点return值中的bar]:['c']
最终结果: {'foo': '#end#', 'bar': ['000', 'a', 'b', 'c']}