前言
在之前的文章中,我们已经一起探讨了 LangGraph 的基本概念,并编写了第一个简易程序。今天我们将更进一步,从它的通信机制开始深入——这是理解后续复杂工作流不可或缺的基础。
Reducers
当多个节点以并行或串行方式对同一状态进行修改时,Reducers 负责定义如何将“旧状态”与“新修改”合并为新的状态。可以将其视为一套状态合并的规则引擎。
LangGraph 的设计很精妙:它利用 Annotated 标注为每个字段预先定义合并规则,框架在执行过程中会自动依据这些规则合并来自多个节点的修改。
请看以下代码示例:
messages: Annotated[list[str], lambda old, new: old + new]list[str]:表示字段类型为字符串列表lambda old, new: old + new:这就是 Reducers 的实现——一个纯函数,定义了旧状态与新修改的合并方式。此处采用了“追加列表”的合并策略。
# 1. 定义状态与Reducers(关键!)
class State(TypedDict):
messages: Annotated[list[str], lambda old, new: old + new] # 追加合并对话
count: Annotated[int, lambda old, new: old + new] # 求和合并计数
Send
Send 是 LangGraph 提供的一种动态消息触发 API。节点函数通过返回 Send 对象,可以主动向目标节点发送一份状态副本,从而触发该节点执行相应的操作。其核心价值在于:替代静态边,实现“根据运行时数据动态选择下一个节点”的能力。例如,当用户意图发生变化时,可以转向不同的节点进行处理。
- 单播:返回
{"send": Send(target, state)},向一个节点发送一份状态副本,类似于“一对一派单”。 - 广播:返回
[Send(target, state_i) for i in N],向多个节点分别发送多份状态副本,类似于“批量派单”。
关于 Send 的更多具体应用案例,建议参考官方文档。
State
messages:用于单播jokes:用于广播
class State(TypedDict):
messages: Annotated[list[str], operator.add] # 对话消息
jokes: Annotated[list[str], operator.add] # 生成的笑话
subject: Annotated[str, operator.add]
这里需要特别注意:
query_order:消息发送仅传递当前状态,并未产生新信息。generate_joke:通过Send传递了新信息(如笑话主题),但 Send 本身不会自动将新信息加入 State,因此需要手动构造新的 State 进行传递。
# 3. 核心节点:同时演示单播/广播(条件分支)
def demo_send(state: State) -> Union[Dict[str, Any], List[Send]]:
last_msg = state["messages"][-1] if state["messages"] else ""
# ? 单播写法:触发query_order节点
if "订单" in last_msg:
return {"send": Send("query_order", {"messages": state["messages"],
"jokes": state["jokes"],
"subject": ""})} # 显式传递subject,避免字段缺失
# ? 广播写法:触发generate_joke节点3次(并行)
elif "笑话" in last_msg:
sends = []
for s in ["猫", "狗", "咖啡"]:
# 复制一份 state,把 subject 写进去!
new_state = {"subject": s} # 必须写在这里!
sends.append(Send("generate_joke", new_state))
return {"send": sends}
# 默认分支:触发通用回答
else:
return {"send": Send("general_answer", {"messages": state})}
节点
# 4. 单播目标节点:处理订单查询
def query_order(state: State) -> State:
msg = state["messages"][0]
return {"messages": [f"查订单结果:{msg}已发货"],
"jokes": state.get("jokes", []),
"subject": ""}
# 5. 广播目标节点:生成单个笑话
def generate_joke(state: State) -> State:
subject = state.get("subject", "小毛蛋")
joke = f"{subject}走进酒吧说“来杯牛奶”,老板问“变乖了?”,它说“医生说我得补钙!”"
return {"messages": state.get("messages", []),
"jokes": state.get("jokes", []) + [joke],
"subject": ""}
条件边
如果节点返回的是 {"send": [Send(), Send()]},那么条件边必须直接写成 return send。
# 添加条件边,让demo_send的send指令驱动路由
def get_next_nodes(state):
return state["send"]
# 配置demo_send的条件路由(关键:让Send指令生效)
builder.add_conditional_edges(
"demo_send",
get_next_nodes, # 路由函数:返回要触发的目标节点
{"query_order": "query_order",
"generate_joke": "generate_joke",
"general_answer": "general_answer"}
)
示例
# 示例1:单播(含“订单”)
res_single = graph.invoke({"messages": ["我的订单123没到"], "jokes": [], "subject": ""})
logger.info("=== 单播结果 ===")
logger.info("消息:" + str(res_single["messages"])) # 输出:["我的订单123没到", "查订单结果:我的订单123没到已发货"]
logger.info("笑话:" + str(res_single["jokes"])) # 输出:[]
# 示例2:广播(含“笑话”)
res_broadcast = graph.invoke({"messages": ["我想听笑话"], "jokes": [], "subject": ""})
logger.info("n=== 广播结果 ===")
logger.info("消息:" + str(res_broadcast["messages"])) # 输出:[]
logger.info("笑话:" + str(res_broadcast["jokes"])) # 输出:3个关于猫、狗、咖啡的笑话
Running
Interrupt
Interrupt 是 LangGraph 内置的主动暂停机制。当流程执行到某个节点时,触发 Interrupt 会冻结当前完整状态(包括所有变量、节点执行历史和上下文),然后将控制权交还给调用方。待外部输入(如用户反馈、人工审核)到达后,再从暂停处无缝恢复执行。
下面的 Demo 展示了一个简单的 AI 自动处理与人工中断审核相结合的工作流。更多案例可参考官方文档。
interrupt():使用此函数可以制造中断。
def conditional_human_review(state: State):
"""动态中断:只有 need_human_review = True 才会等待人类输入,否则直接跳过"""
# ✅ 核心:条件中断
if state["need_human_review"]:
# 中断,等待人类输入
human_input = interrupt({
"ai_result": state["some_text"],
"tip": "请输入修改意见(满足条件才会出现):"
})
return {"some_text": human_input}
# 不满足条件 → 不中断,直接返回原内容
logger.info("✅ 不满足中断条件,跳过人工审核")
return state
Command
Command 是 LangGraph 流程控制中的“导航仪”。它封装了“下一步去哪、带什么数据、做什么操作”等指令,有效解决了传统 Agent 中隐式流程控制容易失控的痛点。
- 本质:一个结构体,包含
goto(目标节点名)、update(要修改的状态字段)、resume(恢复中断的节点)等关键信息,需要与interrupt搭配使用。 - 作用:精确控制工作流的跳转与状态变更。
- 应用场景:多分支决策(例如分类后跳转至不同节点)、循环重试(例如失败后返回原节点)、中断恢复(例如等待用户输入后继续执行)。
State
class AgentState(TypedDict):
question: str # 用户问题
category: Literal["tech", "non-tech"] | None # 分类:技术/非技术
response: str | None # 最终回答
need_interrupt: bool | None # 是否需要中断(用于 resume 演示)
goto/update
def classify_node(state: AgentState) -> Command:
"""核心演示:
Command(goto=节点名) → 动态跳转
Command(update=状态) → 合并更新状态
"""
question = state["question"]
# 规则:包含代码、bug → 技术问题
if "代码" in question or "bug" in question:
return Command(
goto="handle_tech", # 跳去技术节点
update={
"category": "tech", # 更新分类
"need_interrupt": True # 需要中断(演示 resume)
}
)
# 其他 → 非技术
else:
return Command(
goto="handle_non_tech",
update={
"category": "non-tech",
"need_interrupt": False
}
)
resume
resume 用于恢复由 interrupt 引起的中断。首先需要定义 interrupt 代码。
def handle_tech(state: AgentState):
"""核心演示:
interrupt() → 暂停流程
Command(resume=xxx) → 恢复流程
"""
# 如果需要中断,就暂停,等待人类输入
if state["need_interrupt"]:
# 中断!返回给前端/用户
human_input = interrupt({
"tip": "请输入人工修改意见",
"ai_answer": f"已识别技术问题:{state['question']}"
})
# 恢复后,用人类输入更新结果
return {"response": f"[技术+人工] {human_input}"}
# 不需要中断 → 直接返回
return {"response": f"[技术] 已处理:{state['question']}"}
当中断被检测到后,等待人工输入即可恢复执行。
# 检测到中断,才会有恢复
if result.get("__interrupt__"):
logger.info("检测到技术问题,转人工")
# 2. 用户输入 → 恢复流程(核心:resume)
human_feedback = input("人工9527号修复意见:")
resume_result = app.invoke(
Command(resume=human_feedback), # ✅ resume 恢复中断
config=config
)
# 3. 恢复结果
logger.info(f"? 恢复后最终结果:{resume_result['response']}")
更多实际案例请参阅官方文档。
源码
GitHub 源码
