游乐游手机版
首页/AI教程/文章详情

LangGraph状态图与状态转换全面解析

时间:2026-06-01 18:33
LangGraph 状态图详解:从定义到 Reducer 实践 状态定义:节点间的共享数据背包 在 LangGraph 中,状态是节点之间传递数据的“共享背包”。每个节点接收当前状态后,返回一个局部更新(state update),在 super-state 边界处,这些更新会被合并回状态,供下游节

LangGraph 状态图详解:从定义到 Reducer 实践

状态定义:节点间的共享数据背包

在 LangGraph 中,状态是节点之间传递数据的“共享背包”。每个节点接收当前状态后,返回一个局部更新(state update),在 super-state 边界处,这些更新会被合并回状态,供下游节点读取。这种机制让多节点的协作变得高效且有序。

LangGraph: 状态图与状态转换

状态的结构(schema)必须在构建图之前明确。最基础且最常用的方式是使用 TypedDict

from typing_extensions import TypedDict

class State(TypedDict):
    state_value1: int
    state_value2: str
    # ...

此外,LangGraph 也支持用 Pydantic 的 BaseModel 定义状态,但在实际项目中 TypedDict 更流行——主要因为它原生支持 Annotated[type, reducer] 字段绑定 reducer 的写法(具体用法后文详述)。

真实项目中,很少从头定义状态,而是直接继承内置的 MessagesState

from langgraph.graph import MessagesState

class CleanerState(MessagesState):
    # 在此处定义额外字段
    column_decisions: dict[str, Decision]
    schema_info: dict

MessagesState 本质上是一个 TypedDict,但它内置了 messages 字段和对应的 reducer(add_messages)。这个 reducer 会自动累积人类、Agent 和工具调用产生的 Message 对象。简单来说,继承它就自动获得了消息历史保存能力。

图的构造与调用:从 builder 到 invoke

LangGraph 将 Agent 工作流建模为状态图,在 SDK 中使用 StateGraph 作为构造起点:

from langgraph.graph import StateGraph, START, END

# 1. 用 state schema 实例化 builder
builder = StateGraph(State)

# 2. 添加节点(每个节点是 state -> partial state 的函数)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)

# 3. 添加边(决定节点间的执行流向)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", END)

# 4. 编译
graph = builder.compile()

# 可以使用IPython来可视化拓扑结构
# display(Image(graph.get_graph(xray=True).draw_mermaid_png()))

compile() 返回一个可调用对象。执行图时,传入初始状态:

messages = [HumanMessage(content="计算如下问题:将3与24相加。将其结果乘以6,最后除以3")]
graph.invoke({"messages": messages})

注意,graph.invoke() 传入的不是一句问题,而是整个状态的初始值。它启动的是整个图的执行,而不是单次 LLM 调用——这是 LangGraph 与裸 LLM API 在调用范式上的根本区别。即使图里只有一个 LLM 节点,也必须以这种 state-driven 方式触发;图越复杂,初始状态承载的控制信息就越多(例如配置开关、上下文数据、外部输入等)。

BSP 执行模型与 super-step 详解

LangGraph 采用 BSP(整体同步并行,Bulk Synchronous Parallel)模型来组织节点执行。这个模型决定了后续关于 Reducer、交换律和并发安全的讨论。

几个核心要点:

1. 节点返回 state update

每个节点函数接收当前状态,返回一个部分更新:

def node_1(state):
    # state["state_value"]是当前值
    return {"state_value_1": 2}  # 这是局部更新,只返回更新的部分,不是完整state

LangGraph 拿到更新后不会立刻应用,而是先缓存起来,等到 super-step 边界处,再通过 reducer 统一合并。这就是 reducer 机制存在的前提——状态的更新永远走 reducer,没有“直接赋值”这条路。

2. 执行被划分为离散的 super-step

图的执行不是逐节点顺序推进,而是被切分成一系列离散的同步阶段,叫 super-step。在每个 super-step 内:

  • 所有被本步触发的节点逻辑上并行执行
  • 各自返回的 update 暂存,不立即合并
  • super-step 结束后,所有 update 通过 reducer 合并进 state,进入下一个 checkpoint
  • 然后开始下一个 super-step
3. 同 super-step 内的执行顺序未定义

一个 super-step 内同时触发的节点,完成顺序是未定义的——这是后续所有 reducer 设计的讨论起点。

考虑这样的结构:

flowchart LR
    A["node 1"]
    B["node 2"]
    C["node 3"]
    D[END]
    A --> B
    A --> C
    B --> D
    C --> D

node 1 完成后,node 2node 3 就在同一个 super-step 被触发。但 LangGraph 无法保证谁先运行。这次可能是 node 2 先,下次可能是 node 3 先。

如果 node 2node 3 都对同一个 state key 写入了 update,LangGraph 在 super-step 边界处必须把两个 update 合并成一个值。而“如何合并”的逻辑就是 Reducer 的核心。

Reducer:状态更新的核心机制

Reducer 是一个输入多个状态、输出单个状态的映射,通常被用作状态更新逻辑本身。它的逻辑可以理解为:

for k in update.keys():
    new_state[k] = reducer(current_state[k], update[k])

其中 k 是状态值的 key(假设状态是一个 TypedDict)。

如果没有设置 reducer,那么默认就是用新值直接替换旧值:

for k in update.keys():
    new_state[k] = update[k]

累积语义型 Reducer:保留历史信息

当新值出现,但旧值仍有保留价值时,就需要累积语义。这类场景的标志性特征是:LLM 或下游节点需要看到“历史”,而不只是最近一次的状态。

常用方法:

  • 单调累积:Annotated[list, operator.add]
  • 带语义累积:add_messages、自定义滑窗(比如保留最近 10 次记录)、按 key 去重(upsert)

下面是一个原生 Python 实现信息累积的例子:

from typing import Annotated
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

聚合语义型 Reducer:整合多个贡献

当多个节点尝试写入同一个 key 时,需要一种逻辑来整合多个结果。通常情况下,这种 reducer 适用于多个节点各自贡献结果的不同部分。

同时,这类 reducer 的设计必须满足交换律:

reducer(reducer(init, update_a), update_b) == reducer(reducer(init, update_b), update_a)

也就是说,合并结果不应当受执行顺序的影响。

用例:并行 schema profiling

EDA 启动时,三个 sub-agent 并行对同一份数据从不同维度做画像,各自只写入 column_profile 的不同维度,不碰别人的输出。

flowchart TD
    A[START]
    B["Assign profiling tasks"]
    C["dtype_agent
推断列数据类型"] D["missing_agent
计算缺失率"] E["cardinality_agent
计算唯一值数"] F["Merge (through reducer) & END"] A --> B B --> C B --> D B --> E C --> F D --> F E --> F

三个 agent 同属一个 super-step,并行 fan-in 同一个 state key:

class EDAState(MessagesState):
    column_profile: Annotated[dict, ???]  # 待定 reducer

三个 agent 的 update 分别为:

A = {"age": {"dtype": "int32"}, "city": {"dtype": "string"}}
B = {"age": {"missing_rate": 0.03}, "city": {"missing_rate": 0.00}}
C = {"age": {"n_unique": 87}, "city": {"n_unique": 142}}

期望合并结果:

{
    "age": {"dtype": "int32", "missing_rate": 0.03, "n_unique": 87},
    "city": {"dtype": "string", "missing_rate": 0.00, "n_unique": 142},
}
错误尝试:浅合并

直觉上,字典合并就是 |{**a, **b}

def shallow_merge(old: dict, new: dict) -> dict:
    return {**old, **new}

因为 LangGraph 不保证 A/B/C 的合并顺序,手动测试两种到达顺序:

# 顺序 1:A → B → C
shallow_merge(shallow_merge(shallow_merge({}, A), B), C)
# {"age": {"n_unique": 87}, "city": {"n_unique": 142}}

# 顺序 2:C → A → B
shallow_merge(shallow_merge(shallow_merge({}, C), A), B)
# {"age": {"missing_rate": 0.03}, "city": {"missing_rate": 0.00}}

两个问题同时暴露出来:

  • 数据丢失{**a, **b} 在叶子层用后写覆盖,已有子树被整体覆盖掉。
  • 结果依赖到达顺序:每次运行丢失的字段不同——这是典型的 Heisenbug:本地能跑通,生产偶发出错。相比稳定地丢数据,这种顺序依赖的丢失更危险,因为单元测试可能通过,集成测试偶尔失败,定位成本极高。
正确实现:深合并
def deep_merge(old: dict, new: dict) -> dict:
    result = dict(old)
    for k, v in new.items():
        if isinstance(v, dict) and isinstance(result.get(k), dict):
            result[k] = deep_merge(result[k], v)
        else:
            result[k] = v
    return result

class EDAState(MessagesState):
    column_profile: Annotated[dict, deep_merge]

任意到达顺序(A→B→C、C→A→B、B→C→A……)合并结果都是期望的完整 profile,交换律成立。

关键洞察:合并粒度需匹配业务的“职责边界”

浅合并失败、深合并成功,本质差异不在递归深度本身,而在于 reducer 的合并粒度是否对齐了业务真实的职责边界:

  • 三个 agent 在顶层 key("age""city")上有重叠
  • 但在叶子层(dtype / missing_rate / n_unique)上不重叠

浅合并在顶层就停止递归,把“顶层重叠”误判为冲突,用后写覆盖处理,制造了顺序依赖。深合并继续递归到叶子层,正确识别出“职责边界在更深处不重叠”,于是无冲突地合并,自然满足交换律。

所以,聚合型 reducer 的设计核心,就是让合并粒度匹配业务上真实的职责边界。边界划得对,并发就安全;边界划错(在还有非冲突合并空间时过早覆盖),交换律就被破坏。

仲裁语义型 Reducer:处理冲突写入

当多个节点对同一个值给出真正冲突的写入意图时,就需要业务规则来决定保留哪个。典型场景:

  • HITL 中人类决策和 Agent 决策
  • 多模型聚合时依据置信度选择行为
  • 时间序列行为取最新值

⚠️仲裁型 Reducer 逻辑复杂性

仲裁型 Reducer 的函数体通常只有几行 if-else,但其实际复杂度来自业务逻辑本身。以下是其与聚合型 Reducer 的对比:

维度聚合型仲裁型
判定规则来源数据结构业务语义(必须人工编码)
跨项目复用性几乎没有
结构要求裸数据即可必须携带元数据
交换律保证边界清晰依赖人工编码精确性
操作对象无限制,允许重叠必然重叠并产生冲突

用例:数据清洗 sub-agent 的 HITL 行为覆写

以数据清洗 sub-agent 为例:

flowchart TD
    A[START]
    B["Route by column"]
    C["missing_value_agent"]
    D["type_inference_agent"]
    H["human_review (HITL)"]
    E["Merge (through reducer) & Next step"]
    A --> B
    B --> C
    B --> D
    B -.HITL interrupt.-> H
    C --> E
    D --> E
    H --> E

业务规则:

  • 人类一旦做出决定,Agent 不允许覆盖对应行为
  • 多个 Agent 对同一列给出不同建议时,应当存在确定性的胜出规则

状态值设计:

class CleanerState(MessagesState):
    column_decisions: Annotated[dict[str, Decision], ]

column_decisions 是一个字典,内含对每一列的处理决策。仲裁规则的应用层面也是对单列的:

def arbitrate_decisions(old: dict, new: dict) -> dict:
    result = dict(old)
    for col, decision in new.items():
        result[col] = arbitrate_single(result.get(col), decision)
    return result

以下是 arbitrate_single 的两种可能方式。

错误尝试:朴素 Human 优先

最朴素的想法是直接返回新旧中属于 Human 的行为:

def arbitrate_single(old, new):
    if new.get("source") == "human":
        return new
    if old and old.get("source") == "human":
        return old
    return new

考虑测试样例:

A = {"source": "agent", "op": "fill_mean"}
B = {"source": "agent", "op": "fill_median"}
C = {"source": "human", "op": "drop"}

检查交换律:

  • 当 C 参与时,逻辑正确:始终返回 {"source": "human"}
  • 当 C 不参与时,可能出现:arbitrate_single(A, B) -> Barbitrate_single(B, A) -> A,结果不同,交换律被破坏。

所以当只有 Agent 时,行为就不确定。

正确尝试:新增置信度以显式建模业务逻辑

将字段更新为如下结构:

A = {"source": "agent", "op": "fill_mean", "confidence": 0.7}
B = {"source": "agent", "op": "fill_median", "confidence": 0.9}
C = {"source": "human", "op": "drop"}  # human 无需 confidence

更新 Reducer 实现:

def arbitrate_single(old, new):
    # 规则1:强制Human行为最优先
    if new.get("source") == "human":
        return new
    if old and old.get("source") == "human":
        return old

    # 规则2:无人类介入时,高置信度优先
    if old is None:
        return new
    if new["confidence"] != old["confidence"]:
        return new if new["confidence"] > old["confidence"] else old

    # 规则3:置信度相等时,强制规定偏好
    return new if new["op"] < old["op"] else old

这样一来,任何到达顺序下的结果都是一致的——交换律得到保证,系统行为也变得可预测。

来源:https://juejin.cn/post/7646293047777263658
上一篇Valideo AI生活助手好不好用 真实体验评测 下一篇ProductHunt AI导航网站功能与使用体验评测
本站内容用于信息整理与展示,如有侵权或内容问题请及时联系处理。

相关推荐

补充同频道和同主题内容,方便继续浏览更多相关内容。

同类最新

继续查看同栏目最近更新的文章。

更多
AI驱动无代码技术降低巡检超自动化门槛
AI教程 · 2026-06-01

AI驱动无代码技术降低巡检超自动化门槛

想象一下,在IT运维场景中,超自动化巡检的远景蓝图确实令人憧憬——全栈覆盖、AI驱动、无人值守、智能闭环,听起来极具未来感。但真正了解内情的人都知道,一个现实难题长期困扰着企业:自动化的进入门槛,实在太高了。传统自动化方案往往离不开脚本编写、API对接、协议理解,每一项都对编程功底提出了严峻考验。知

提升工作总结公文写作技巧与格式范文指南
AI教程 · 2026-06-01

提升工作总结公文写作技巧与格式范文指南

工作总结是职场人回顾过去、规划未来的关键工具,广泛应用于科技、教育、医疗等行业。高质量总结需明确读者对象,涵盖完成情况、问题、改进措施和计划,采用标题、引言、正文、结尾的规范格式,提升专业度与可读性。

范文正公文集叙翻译写作技巧与专业提升
AI教程 · 2026-06-01

范文正公文集叙翻译写作技巧与专业提升

翻译《范文正公文集叙》需兼顾语言转换与文化传递,精准表达原文情感与底色。公文写作强调语言准确清晰、格式规范,各类通知、报告等均有固定结构。借鉴该书范本,可提升公文专业性与规范性。

公文申请格式与撰写技巧:提升审批效率
AI教程 · 2026-06-01

公文申请格式与撰写技巧:提升审批效率

公文申请格式标准化能显著提升审批效率,市场需求随技术从数字化迈向智能化快速翻倍。撰写申请需清晰说明需求与依据,注重逻辑严谨、排版规范,并站在审批者视角突出必要性与合理性,以增强说服力。

五大策略提升公文写作模板使用效率与规范性
AI教程 · 2026-06-01

五大策略提升公文写作模板使用效率与规范性

公文写作模板已成为职场刚需,广泛应用于政府、企业、教育等领域。通过标准化格式、智能化工具及灵活调整,可提升写作效率与规范性。结合清晰段落、简洁语言及表格等技巧,能进一步优化文书质量。