LangGraph 状态图详解:从定义到 Reducer 实践
状态定义:节点间的共享数据背包
在 LangGraph 中,状态是节点之间传递数据的“共享背包”。每个节点接收当前状态后,返回一个局部更新(state update),在 super-state 边界处,这些更新会被合并回状态,供下游节点读取。这种机制让多节点的协作变得高效且有序。

状态的结构(schema)必须在构建图之前明确。最基础且最常用的方式是使用 TypedDict:
from typing_extensions import TypedDict
class State(TypedDict):
state_value1: int
state_value2: str
# ...
此外,LangGraph 也支持用 Pydantic 的 BaseModel 定义状态,但在实际项目中 TypedDict 更流行——主要因为它原生支持 Annotated[type, reducer] 字段绑定 reducer 的写法(具体用法后文详述)。
真实项目中,很少从头定义状态,而是直接继承内置的 MessagesState:
from langgraph.graph import MessagesState
class CleanerState(MessagesState):
# 在此处定义额外字段
column_decisions: dict[str, Decision]
schema_info: dict
MessagesState 本质上是一个 TypedDict,但它内置了 messages 字段和对应的 reducer(add_messages)。这个 reducer 会自动累积人类、Agent 和工具调用产生的 Message 对象。简单来说,继承它就自动获得了消息历史保存能力。
图的构造与调用:从 builder 到 invoke
LangGraph 将 Agent 工作流建模为状态图,在 SDK 中使用 StateGraph 作为构造起点:
from langgraph.graph import StateGraph, START, END
# 1. 用 state schema 实例化 builder
builder = StateGraph(State)
# 2. 添加节点(每个节点是 state -> partial state 的函数)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
# 3. 添加边(决定节点间的执行流向)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", END)
# 4. 编译
graph = builder.compile()
# 可以使用IPython来可视化拓扑结构
# display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
compile() 返回一个可调用对象。执行图时,传入初始状态:
messages = [HumanMessage(content="计算如下问题:将3与24相加。将其结果乘以6,最后除以3")]
graph.invoke({"messages": messages})
注意,graph.invoke() 传入的不是一句问题,而是整个状态的初始值。它启动的是整个图的执行,而不是单次 LLM 调用——这是 LangGraph 与裸 LLM API 在调用范式上的根本区别。即使图里只有一个 LLM 节点,也必须以这种 state-driven 方式触发;图越复杂,初始状态承载的控制信息就越多(例如配置开关、上下文数据、外部输入等)。
BSP 执行模型与 super-step 详解
LangGraph 采用 BSP(整体同步并行,Bulk Synchronous Parallel)模型来组织节点执行。这个模型决定了后续关于 Reducer、交换律和并发安全的讨论。
几个核心要点:
1. 节点返回 state update
每个节点函数接收当前状态,返回一个部分更新:
def node_1(state):
# state["state_value"]是当前值
return {"state_value_1": 2} # 这是局部更新,只返回更新的部分,不是完整state
LangGraph 拿到更新后不会立刻应用,而是先缓存起来,等到 super-step 边界处,再通过 reducer 统一合并。这就是 reducer 机制存在的前提——状态的更新永远走 reducer,没有“直接赋值”这条路。
2. 执行被划分为离散的 super-step
图的执行不是逐节点顺序推进,而是被切分成一系列离散的同步阶段,叫 super-step。在每个 super-step 内:
- 所有被本步触发的节点逻辑上并行执行
- 各自返回的 update 暂存,不立即合并
- super-step 结束后,所有 update 通过 reducer 合并进 state,进入下一个 checkpoint
- 然后开始下一个 super-step
3. 同 super-step 内的执行顺序未定义
一个 super-step 内同时触发的节点,完成顺序是未定义的——这是后续所有 reducer 设计的讨论起点。
考虑这样的结构:
flowchart LR
A["node 1"]
B["node 2"]
C["node 3"]
D[END]
A --> B
A --> C
B --> D
C --> D
当 node 1 完成后,node 2 和 node 3 就在同一个 super-step 被触发。但 LangGraph 无法保证谁先运行。这次可能是 node 2 先,下次可能是 node 3 先。
如果 node 2 和 node 3 都对同一个 state key 写入了 update,LangGraph 在 super-step 边界处必须把两个 update 合并成一个值。而“如何合并”的逻辑就是 Reducer 的核心。
Reducer:状态更新的核心机制
Reducer 是一个输入多个状态、输出单个状态的映射,通常被用作状态更新逻辑本身。它的逻辑可以理解为:
for k in update.keys():
new_state[k] = reducer(current_state[k], update[k])
其中 k 是状态值的 key(假设状态是一个 TypedDict)。
如果没有设置 reducer,那么默认就是用新值直接替换旧值:
for k in update.keys():
new_state[k] = update[k]
累积语义型 Reducer:保留历史信息
当新值出现,但旧值仍有保留价值时,就需要累积语义。这类场景的标志性特征是:LLM 或下游节点需要看到“历史”,而不只是最近一次的状态。
常用方法:
- 单调累积:
Annotated[list, operator.add] - 带语义累积:
add_messages、自定义滑窗(比如保留最近 10 次记录)、按 key 去重(upsert)
下面是一个原生 Python 实现信息累积的例子:
from typing import Annotated
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages
class MessagesState(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
聚合语义型 Reducer:整合多个贡献
当多个节点尝试写入同一个 key 时,需要一种逻辑来整合多个结果。通常情况下,这种 reducer 适用于多个节点各自贡献结果的不同部分。
同时,这类 reducer 的设计必须满足交换律:
reducer(reducer(init, update_a), update_b) == reducer(reducer(init, update_b), update_a)
也就是说,合并结果不应当受执行顺序的影响。
用例:并行 schema profiling
EDA 启动时,三个 sub-agent 并行对同一份数据从不同维度做画像,各自只写入 column_profile 的不同维度,不碰别人的输出。
flowchart TD
A[START]
B["Assign profiling tasks"]
C["dtype_agent
推断列数据类型"]
D["missing_agent
计算缺失率"]
E["cardinality_agent
计算唯一值数"]
F["Merge (through reducer) & END"]
A --> B
B --> C
B --> D
B --> E
C --> F
D --> F
E --> F
三个 agent 同属一个 super-step,并行 fan-in 同一个 state key:
class EDAState(MessagesState):
column_profile: Annotated[dict, ???] # 待定 reducer
三个 agent 的 update 分别为:
A = {"age": {"dtype": "int32"}, "city": {"dtype": "string"}}
B = {"age": {"missing_rate": 0.03}, "city": {"missing_rate": 0.00}}
C = {"age": {"n_unique": 87}, "city": {"n_unique": 142}}
期望合并结果:
{
"age": {"dtype": "int32", "missing_rate": 0.03, "n_unique": 87},
"city": {"dtype": "string", "missing_rate": 0.00, "n_unique": 142},
}
错误尝试:浅合并
直觉上,字典合并就是 | 或 {**a, **b}:
def shallow_merge(old: dict, new: dict) -> dict:
return {**old, **new}
因为 LangGraph 不保证 A/B/C 的合并顺序,手动测试两种到达顺序:
# 顺序 1:A → B → C
shallow_merge(shallow_merge(shallow_merge({}, A), B), C)
# {"age": {"n_unique": 87}, "city": {"n_unique": 142}}
# 顺序 2:C → A → B
shallow_merge(shallow_merge(shallow_merge({}, C), A), B)
# {"age": {"missing_rate": 0.03}, "city": {"missing_rate": 0.00}}
两个问题同时暴露出来:
- 数据丢失:
{**a, **b}在叶子层用后写覆盖,已有子树被整体覆盖掉。 - 结果依赖到达顺序:每次运行丢失的字段不同——这是典型的 Heisenbug:本地能跑通,生产偶发出错。相比稳定地丢数据,这种顺序依赖的丢失更危险,因为单元测试可能通过,集成测试偶尔失败,定位成本极高。
正确实现:深合并
def deep_merge(old: dict, new: dict) -> dict:
result = dict(old)
for k, v in new.items():
if isinstance(v, dict) and isinstance(result.get(k), dict):
result[k] = deep_merge(result[k], v)
else:
result[k] = v
return result
class EDAState(MessagesState):
column_profile: Annotated[dict, deep_merge]
任意到达顺序(A→B→C、C→A→B、B→C→A……)合并结果都是期望的完整 profile,交换律成立。
关键洞察:合并粒度需匹配业务的“职责边界”
浅合并失败、深合并成功,本质差异不在递归深度本身,而在于 reducer 的合并粒度是否对齐了业务真实的职责边界:
- 三个 agent 在顶层 key(
"age"、"city")上有重叠 - 但在叶子层(
dtype/missing_rate/n_unique)上不重叠
浅合并在顶层就停止递归,把“顶层重叠”误判为冲突,用后写覆盖处理,制造了顺序依赖。深合并继续递归到叶子层,正确识别出“职责边界在更深处不重叠”,于是无冲突地合并,自然满足交换律。
所以,聚合型 reducer 的设计核心,就是让合并粒度匹配业务上真实的职责边界。边界划得对,并发就安全;边界划错(在还有非冲突合并空间时过早覆盖),交换律就被破坏。
仲裁语义型 Reducer:处理冲突写入
当多个节点对同一个值给出真正冲突的写入意图时,就需要业务规则来决定保留哪个。典型场景:
- HITL 中人类决策和 Agent 决策
- 多模型聚合时依据置信度选择行为
- 时间序列行为取最新值
⚠️仲裁型 Reducer 逻辑复杂性
仲裁型 Reducer 的函数体通常只有几行 if-else,但其实际复杂度来自业务逻辑本身。以下是其与聚合型 Reducer 的对比:
| 维度 | 聚合型 | 仲裁型 |
|---|---|---|
| 判定规则来源 | 数据结构 | 业务语义(必须人工编码) |
| 跨项目复用性 | 高 | 几乎没有 |
| 结构要求 | 裸数据即可 | 必须携带元数据 |
| 交换律保证 | 边界清晰 | 依赖人工编码精确性 |
| 操作对象 | 无限制,允许重叠 | 必然重叠并产生冲突 |
用例:数据清洗 sub-agent 的 HITL 行为覆写
以数据清洗 sub-agent 为例:
flowchart TD
A[START]
B["Route by column"]
C["missing_value_agent"]
D["type_inference_agent"]
H["human_review (HITL)"]
E["Merge (through reducer) & Next step"]
A --> B
B --> C
B --> D
B -.HITL interrupt.-> H
C --> E
D --> E
H --> E
业务规则:
- 人类一旦做出决定,Agent 不允许覆盖对应行为
- 多个 Agent 对同一列给出不同建议时,应当存在确定性的胜出规则
状态值设计:
class CleanerState(MessagesState):
column_decisions: Annotated[dict[str, Decision], ]
column_decisions 是一个字典,内含对每一列的处理决策。仲裁规则的应用层面也是对单列的:
def arbitrate_decisions(old: dict, new: dict) -> dict:
result = dict(old)
for col, decision in new.items():
result[col] = arbitrate_single(result.get(col), decision)
return result
以下是 arbitrate_single 的两种可能方式。
错误尝试:朴素 Human 优先
最朴素的想法是直接返回新旧中属于 Human 的行为:
def arbitrate_single(old, new):
if new.get("source") == "human":
return new
if old and old.get("source") == "human":
return old
return new
考虑测试样例:
A = {"source": "agent", "op": "fill_mean"}
B = {"source": "agent", "op": "fill_median"}
C = {"source": "human", "op": "drop"}
检查交换律:
- 当 C 参与时,逻辑正确:始终返回
{"source": "human"} - 当 C 不参与时,可能出现:
arbitrate_single(A, B) -> B而arbitrate_single(B, A) -> A,结果不同,交换律被破坏。
所以当只有 Agent 时,行为就不确定。
正确尝试:新增置信度以显式建模业务逻辑
将字段更新为如下结构:
A = {"source": "agent", "op": "fill_mean", "confidence": 0.7}
B = {"source": "agent", "op": "fill_median", "confidence": 0.9}
C = {"source": "human", "op": "drop"} # human 无需 confidence
更新 Reducer 实现:
def arbitrate_single(old, new):
# 规则1:强制Human行为最优先
if new.get("source") == "human":
return new
if old and old.get("source") == "human":
return old
# 规则2:无人类介入时,高置信度优先
if old is None:
return new
if new["confidence"] != old["confidence"]:
return new if new["confidence"] > old["confidence"] else old
# 规则3:置信度相等时,强制规定偏好
return new if new["op"] < old["op"] else old
这样一来,任何到达顺序下的结果都是一致的——交换律得到保证,系统行为也变得可预测。
