游乐游手机版
首页/AI教程/文章详情

LangGraph第一篇 Graph通讯机制从入门到精通实战全攻略

时间:2026-05-29 15:26
前言 在之前的文章中,我们已经一起探讨了 LangGraph 的基本概念,并编写了第一个简易程序。今天我们将更进一步,从它的通信机制开始深入——这是理解后续复杂工作流不可或缺的基础。 Reducers 当多个节点以并行或串行方式对同一状态进行修改时,Reducers 负责定义如何将“旧状态”与“新修

前言

在之前的文章中,我们已经一起探讨了 LangGraph 的基本概念,并编写了第一个简易程序。今天我们将更进一步,从它的通信机制开始深入——这是理解后续复杂工作流不可或缺的基础。

Reducers

当多个节点以并行或串行方式对同一状态进行修改时,Reducers 负责定义如何将“旧状态”与“新修改”合并为新的状态。可以将其视为一套状态合并的规则引擎。

LangGraph 的设计很精妙:它利用 Annotated 标注为每个字段预先定义合并规则,框架在执行过程中会自动依据这些规则合并来自多个节点的修改。

请看以下代码示例:

  • messages: Annotated[list[str], lambda old, new: old + new]
    • list[str]:表示字段类型为字符串列表
    • lambda old, new: old + new:这就是 Reducers 的实现——一个纯函数,定义了旧状态与新修改的合并方式。此处采用了“追加列表”的合并策略。
# 1. 定义状态与Reducers(关键!)
class State(TypedDict):
    messages: Annotated[list[str], lambda old, new: old + new]  # 追加合并对话
    count: Annotated[int, lambda old, new: old + new]           # 求和合并计数

Send

Send 是 LangGraph 提供的一种动态消息触发 API。节点函数通过返回 Send 对象,可以主动向目标节点发送一份状态副本,从而触发该节点执行相应的操作。其核心价值在于:替代静态边,实现“根据运行时数据动态选择下一个节点”的能力。例如,当用户意图发生变化时,可以转向不同的节点进行处理。

  • 单播:返回 {"send": Send(target, state)},向一个节点发送一份状态副本,类似于“一对一派单”。
  • 广播:返回 [Send(target, state_i) for i in N],向多个节点分别发送多份状态副本,类似于“批量派单”。

关于 Send 的更多具体应用案例,建议参考官方文档。

State

  • messages:用于单播
  • jokes:用于广播
class State(TypedDict):
    messages: Annotated[list[str], operator.add]  # 对话消息
    jokes: Annotated[list[str], operator.add]     # 生成的笑话
    subject: Annotated[str, operator.add]

这里需要特别注意:

  • query_order:消息发送仅传递当前状态,并未产生新信息。
  • generate_joke:通过 Send 传递了新信息(如笑话主题),但 Send 本身不会自动将新信息加入 State,因此需要手动构造新的 State 进行传递。
# 3. 核心节点:同时演示单播/广播(条件分支)
def demo_send(state: State) -> Union[Dict[str, Any], List[Send]]:
    last_msg = state["messages"][-1] if state["messages"] else ""

    # ? 单播写法:触发query_order节点
    if "订单" in last_msg:
        return {"send": Send("query_order", {"messages": state["messages"],
                                              "jokes": state["jokes"],
                                              "subject": ""})}  # 显式传递subject,避免字段缺失

    # ? 广播写法:触发generate_joke节点3次(并行)
    elif "笑话" in last_msg:
        sends = []
        for s in ["猫", "狗", "咖啡"]:
            # 复制一份 state,把 subject 写进去!
            new_state = {"subject": s}  # 必须写在这里!
            sends.append(Send("generate_joke", new_state))
        return {"send": sends}

    # 默认分支:触发通用回答
    else:
        return {"send": Send("general_answer", {"messages": state})}

节点

# 4. 单播目标节点:处理订单查询
def query_order(state: State) -> State:
    msg = state["messages"][0]
    return {"messages": [f"查订单结果:{msg}已发货"],
            "jokes": state.get("jokes", []),
            "subject": ""}

# 5. 广播目标节点:生成单个笑话
def generate_joke(state: State) -> State:
    subject = state.get("subject", "小毛蛋")
    joke = f"{subject}走进酒吧说“来杯牛奶”,老板问“变乖了?”,它说“医生说我得补钙!”"
    return {"messages": state.get("messages", []),
            "jokes": state.get("jokes", []) + [joke],
            "subject": ""}

条件边

如果节点返回的是 {"send": [Send(), Send()]},那么条件边必须直接写成 return send

# 添加条件边,让demo_send的send指令驱动路由
def get_next_nodes(state):
    return state["send"]

# 配置demo_send的条件路由(关键:让Send指令生效)
builder.add_conditional_edges(
    "demo_send",
    get_next_nodes,  # 路由函数:返回要触发的目标节点
    {"query_order": "query_order",
     "generate_joke": "generate_joke",
     "general_answer": "general_answer"}
)

示例

# 示例1:单播(含“订单”)
res_single = graph.invoke({"messages": ["我的订单123没到"], "jokes": [], "subject": ""})
logger.info("=== 单播结果 ===")
logger.info("消息:" + str(res_single["messages"]))  # 输出:["我的订单123没到", "查订单结果:我的订单123没到已发货"]
logger.info("笑话:" + str(res_single["jokes"]))    # 输出:[]

# 示例2:广播(含“笑话”)
res_broadcast = graph.invoke({"messages": ["我想听笑话"], "jokes": [], "subject": ""})
logger.info("n=== 广播结果 ===")
logger.info("消息:" + str(res_broadcast["messages"]))  # 输出:[]
logger.info("笑话:" + str(res_broadcast["jokes"]))    # 输出:3个关于猫、狗、咖啡的笑话

Running

Interrupt

Interrupt 是 LangGraph 内置的主动暂停机制。当流程执行到某个节点时,触发 Interrupt 会冻结当前完整状态(包括所有变量、节点执行历史和上下文),然后将控制权交还给调用方。待外部输入(如用户反馈、人工审核)到达后,再从暂停处无缝恢复执行。

下面的 Demo 展示了一个简单的 AI 自动处理与人工中断审核相结合的工作流。更多案例可参考官方文档。

  • interrupt():使用此函数可以制造中断。
def conditional_human_review(state: State):
    """动态中断:只有 need_human_review = True 才会等待人类输入,否则直接跳过"""
    # ✅ 核心:条件中断
    if state["need_human_review"]:
        # 中断,等待人类输入
        human_input = interrupt({
            "ai_result": state["some_text"],
            "tip": "请输入修改意见(满足条件才会出现):"
        })
        return {"some_text": human_input}
    # 不满足条件 → 不中断,直接返回原内容
    logger.info("✅ 不满足中断条件,跳过人工审核")
    return state

Command

Command 是 LangGraph 流程控制中的“导航仪”。它封装了“下一步去哪、带什么数据、做什么操作”等指令,有效解决了传统 Agent 中隐式流程控制容易失控的痛点。

  • 本质:一个结构体,包含 goto(目标节点名)、update(要修改的状态字段)、resume(恢复中断的节点)等关键信息,需要与 interrupt 搭配使用。
  • 作用:精确控制工作流的跳转与状态变更。
  • 应用场景:多分支决策(例如分类后跳转至不同节点)、循环重试(例如失败后返回原节点)、中断恢复(例如等待用户输入后继续执行)。

State

class AgentState(TypedDict):
    question: str                       # 用户问题
    category: Literal["tech", "non-tech"] | None  # 分类:技术/非技术
    response: str | None                # 最终回答
    need_interrupt: bool | None         # 是否需要中断(用于 resume 演示)

goto/update

def classify_node(state: AgentState) -> Command:
    """核心演示:
         Command(goto=节点名) → 动态跳转
         Command(update=状态) → 合并更新状态
    """
    question = state["question"]
    # 规则:包含代码、bug → 技术问题
    if "代码" in question or "bug" in question:
        return Command(
            goto="handle_tech",                          # 跳去技术节点
            update={
                "category": "tech",                       # 更新分类
                "need_interrupt": True                    # 需要中断(演示 resume)
            }
        )
    # 其他 → 非技术
    else:
        return Command(
            goto="handle_non_tech",
            update={
                "category": "non-tech",
                "need_interrupt": False
            }
        )

resume

resume 用于恢复由 interrupt 引起的中断。首先需要定义 interrupt 代码。

def handle_tech(state: AgentState):
    """核心演示:
         interrupt() → 暂停流程
         Command(resume=xxx) → 恢复流程
    """
    # 如果需要中断,就暂停,等待人类输入
    if state["need_interrupt"]:
        # 中断!返回给前端/用户
        human_input = interrupt({
            "tip": "请输入人工修改意见",
            "ai_answer": f"已识别技术问题:{state['question']}"
        })
        # 恢复后,用人类输入更新结果
        return {"response": f"[技术+人工] {human_input}"}
    # 不需要中断 → 直接返回
    return {"response": f"[技术] 已处理:{state['question']}"}

当中断被检测到后,等待人工输入即可恢复执行。

# 检测到中断,才会有恢复
if result.get("__interrupt__"):
    logger.info("检测到技术问题,转人工")
    # 2. 用户输入 → 恢复流程(核心:resume)
    human_feedback = input("人工9527号修复意见:")
    resume_result = app.invoke(
        Command(resume=human_feedback),  # ✅ resume 恢复中断
        config=config
    )
    # 3. 恢复结果
    logger.info(f"? 恢复后最终结果:{resume_result['response']}")

更多实际案例请参阅官方文档。

源码

GitHub 源码

来源:https://juejin.cn/post/7627887164718170148
上一篇半监督学习是什么?一文读懂核心概念与应用 下一篇气体涡轮流量计说明书撰写完整指南及范文示例
本站内容用于信息整理与展示,如有侵权或内容问题请及时联系处理。

相关推荐

补充同频道和同主题内容,方便继续浏览更多相关内容。

同类最新

继续查看同栏目最近更新的文章。

更多
Windows Docker Desktop RabbitMQ生产级部署完整指南
AI教程 · 2026-06-29

Windows Docker Desktop RabbitMQ生产级部署完整指南

前言 在 Windows 本地开发环境中,直接安装 RabbitMQ 确实颇为周折:需要单独配置 Erlang 运行环境、手动管理环境变量、服务启停全凭手工操作。更令人困扰的是,版本兼容冲突、端口占用、环境不一致等问题层出不穷。笔者见过不少开发者为搭建环境就得耗费整整半天时间。 相比之下,借助 Do

AI搜索重构制造业采购逻辑的阿里云企业级GEOCMS优化实践
AI教程 · 2026-06-29

AI搜索重构制造业采购逻辑的阿里云企业级GEOCMS优化实践

先分享一个切实感受。过去两年,我们与福建制造企业合作较为频繁,发现一个非常突出的现象:超过80%的企业官网,产品参数仍然存放在PDF或图片中。AI爬虫?根本无法抓取。这些企业技术实力不弱、资质证照齐全、应用案例也丰富,但在AI搜索这一全新战场上,它们几乎处于隐身状态。 一、一个正在发生的行业变化 A

阿里云Token Plan团队版功能价格与省钱购买指南
AI教程 · 2026-06-29

阿里云Token Plan团队版功能价格与省钱购买指南

阿里云百炼近期推出了名为“Token Plan 团队版”的全新服务,这一服务专为企业与开发者量身打造,定位为AI大模型订阅平台。通过引入Credits作为统一计量单位,将文本生成、图像生成等多模态AI能力纳入单一计费体系,同时无缝兼容主流AI编程工具及智能体(Agent)生态系统。其核心亮点包括:全

阿里云物联网.NET Core客户端位置信息上报
AI教程 · 2026-06-29

阿里云物联网.NET Core客户端位置信息上报

阿里云物联网平台的位置服务并非一个完全独立的功能模块。位置信息可包含二维坐标与三维坐标,而位置数据的来源本质上是借助设备属性进行上传。换言之,若要让设备上报位置,您需先将其视为一个普通属性进行处理。 1)添加二维位置数据 操作过程十分简洁。进入数据分析 → 空间数据可视化 → 二维数据,点击添加,将

年阿里云服务器选型配置与网站部署全攻略
AI教程 · 2026-06-29

年阿里云服务器选型配置与网站部署全攻略

2026年,阿里云服务器生态已高度成熟,形成了清晰的轻量应用服务器与ECS云服务器两大产品阵营。无论你是计划搭建个人博客、企业官网,还是运营电商平台、进行应用开发,基本都能找到理想的解决方案。本指南将从服务器选型、配置选择、部署流程到安全运维,系统梳理2026年最实用的操作要点,帮助你少走弯路,让网