游乐游手机版
首页/AI教程/文章详情

LangGraph第一篇 Graph通讯机制从入门到精通实战全攻略

时间:2026-05-29 15:26
前言 在之前的文章中,我们已经一起探讨了 LangGraph 的基本概念,并编写了第一个简易程序。今天我们将更进一步,从它的通信机制开始深入——这是理解后续复杂工作流不可或缺的基础。 Reducers 当多个节点以并行或串行方式对同一状态进行修改时,Reducers 负责定义如何将“旧状态”与“新修

前言

在之前的文章中,我们已经一起探讨了 LangGraph 的基本概念,并编写了第一个简易程序。今天我们将更进一步,从它的通信机制开始深入——这是理解后续复杂工作流不可或缺的基础。

Reducers

当多个节点以并行或串行方式对同一状态进行修改时,Reducers 负责定义如何将“旧状态”与“新修改”合并为新的状态。可以将其视为一套状态合并的规则引擎。

LangGraph 的设计很精妙:它利用 Annotated 标注为每个字段预先定义合并规则,框架在执行过程中会自动依据这些规则合并来自多个节点的修改。

请看以下代码示例:

  • messages: Annotated[list[str], lambda old, new: old + new]
    • list[str]:表示字段类型为字符串列表
    • lambda old, new: old + new:这就是 Reducers 的实现——一个纯函数,定义了旧状态与新修改的合并方式。此处采用了“追加列表”的合并策略。
# 1. 定义状态与Reducers(关键!)
class State(TypedDict):
    messages: Annotated[list[str], lambda old, new: old + new]  # 追加合并对话
    count: Annotated[int, lambda old, new: old + new]           # 求和合并计数

Send

Send 是 LangGraph 提供的一种动态消息触发 API。节点函数通过返回 Send 对象,可以主动向目标节点发送一份状态副本,从而触发该节点执行相应的操作。其核心价值在于:替代静态边,实现“根据运行时数据动态选择下一个节点”的能力。例如,当用户意图发生变化时,可以转向不同的节点进行处理。

  • 单播:返回 {"send": Send(target, state)},向一个节点发送一份状态副本,类似于“一对一派单”。
  • 广播:返回 [Send(target, state_i) for i in N],向多个节点分别发送多份状态副本,类似于“批量派单”。

关于 Send 的更多具体应用案例,建议参考官方文档。

State

  • messages:用于单播
  • jokes:用于广播
class State(TypedDict):
    messages: Annotated[list[str], operator.add]  # 对话消息
    jokes: Annotated[list[str], operator.add]     # 生成的笑话
    subject: Annotated[str, operator.add]

这里需要特别注意:

  • query_order:消息发送仅传递当前状态,并未产生新信息。
  • generate_joke:通过 Send 传递了新信息(如笑话主题),但 Send 本身不会自动将新信息加入 State,因此需要手动构造新的 State 进行传递。
# 3. 核心节点:同时演示单播/广播(条件分支)
def demo_send(state: State) -> Union[Dict[str, Any], List[Send]]:
    last_msg = state["messages"][-1] if state["messages"] else ""

    # ? 单播写法:触发query_order节点
    if "订单" in last_msg:
        return {"send": Send("query_order", {"messages": state["messages"],
                                              "jokes": state["jokes"],
                                              "subject": ""})}  # 显式传递subject,避免字段缺失

    # ? 广播写法:触发generate_joke节点3次(并行)
    elif "笑话" in last_msg:
        sends = []
        for s in ["猫", "狗", "咖啡"]:
            # 复制一份 state,把 subject 写进去!
            new_state = {"subject": s}  # 必须写在这里!
            sends.append(Send("generate_joke", new_state))
        return {"send": sends}

    # 默认分支:触发通用回答
    else:
        return {"send": Send("general_answer", {"messages": state})}

节点

# 4. 单播目标节点:处理订单查询
def query_order(state: State) -> State:
    msg = state["messages"][0]
    return {"messages": [f"查订单结果:{msg}已发货"],
            "jokes": state.get("jokes", []),
            "subject": ""}

# 5. 广播目标节点:生成单个笑话
def generate_joke(state: State) -> State:
    subject = state.get("subject", "小毛蛋")
    joke = f"{subject}走进酒吧说“来杯牛奶”,老板问“变乖了?”,它说“医生说我得补钙!”"
    return {"messages": state.get("messages", []),
            "jokes": state.get("jokes", []) + [joke],
            "subject": ""}

条件边

如果节点返回的是 {"send": [Send(), Send()]},那么条件边必须直接写成 return send

# 添加条件边,让demo_send的send指令驱动路由
def get_next_nodes(state):
    return state["send"]

# 配置demo_send的条件路由(关键:让Send指令生效)
builder.add_conditional_edges(
    "demo_send",
    get_next_nodes,  # 路由函数:返回要触发的目标节点
    {"query_order": "query_order",
     "generate_joke": "generate_joke",
     "general_answer": "general_answer"}
)

示例

# 示例1:单播(含“订单”)
res_single = graph.invoke({"messages": ["我的订单123没到"], "jokes": [], "subject": ""})
logger.info("=== 单播结果 ===")
logger.info("消息:" + str(res_single["messages"]))  # 输出:["我的订单123没到", "查订单结果:我的订单123没到已发货"]
logger.info("笑话:" + str(res_single["jokes"]))    # 输出:[]

# 示例2:广播(含“笑话”)
res_broadcast = graph.invoke({"messages": ["我想听笑话"], "jokes": [], "subject": ""})
logger.info("n=== 广播结果 ===")
logger.info("消息:" + str(res_broadcast["messages"]))  # 输出:[]
logger.info("笑话:" + str(res_broadcast["jokes"]))    # 输出:3个关于猫、狗、咖啡的笑话

Running

Interrupt

Interrupt 是 LangGraph 内置的主动暂停机制。当流程执行到某个节点时,触发 Interrupt 会冻结当前完整状态(包括所有变量、节点执行历史和上下文),然后将控制权交还给调用方。待外部输入(如用户反馈、人工审核)到达后,再从暂停处无缝恢复执行。

下面的 Demo 展示了一个简单的 AI 自动处理与人工中断审核相结合的工作流。更多案例可参考官方文档。

  • interrupt():使用此函数可以制造中断。
def conditional_human_review(state: State):
    """动态中断:只有 need_human_review = True 才会等待人类输入,否则直接跳过"""
    # ✅ 核心:条件中断
    if state["need_human_review"]:
        # 中断,等待人类输入
        human_input = interrupt({
            "ai_result": state["some_text"],
            "tip": "请输入修改意见(满足条件才会出现):"
        })
        return {"some_text": human_input}
    # 不满足条件 → 不中断,直接返回原内容
    logger.info("✅ 不满足中断条件,跳过人工审核")
    return state

Command

Command 是 LangGraph 流程控制中的“导航仪”。它封装了“下一步去哪、带什么数据、做什么操作”等指令,有效解决了传统 Agent 中隐式流程控制容易失控的痛点。

  • 本质:一个结构体,包含 goto(目标节点名)、update(要修改的状态字段)、resume(恢复中断的节点)等关键信息,需要与 interrupt 搭配使用。
  • 作用:精确控制工作流的跳转与状态变更。
  • 应用场景:多分支决策(例如分类后跳转至不同节点)、循环重试(例如失败后返回原节点)、中断恢复(例如等待用户输入后继续执行)。

State

class AgentState(TypedDict):
    question: str                       # 用户问题
    category: Literal["tech", "non-tech"] | None  # 分类:技术/非技术
    response: str | None                # 最终回答
    need_interrupt: bool | None         # 是否需要中断(用于 resume 演示)

goto/update

def classify_node(state: AgentState) -> Command:
    """核心演示:
         Command(goto=节点名) → 动态跳转
         Command(update=状态) → 合并更新状态
    """
    question = state["question"]
    # 规则:包含代码、bug → 技术问题
    if "代码" in question or "bug" in question:
        return Command(
            goto="handle_tech",                          # 跳去技术节点
            update={
                "category": "tech",                       # 更新分类
                "need_interrupt": True                    # 需要中断(演示 resume)
            }
        )
    # 其他 → 非技术
    else:
        return Command(
            goto="handle_non_tech",
            update={
                "category": "non-tech",
                "need_interrupt": False
            }
        )

resume

resume 用于恢复由 interrupt 引起的中断。首先需要定义 interrupt 代码。

def handle_tech(state: AgentState):
    """核心演示:
         interrupt() → 暂停流程
         Command(resume=xxx) → 恢复流程
    """
    # 如果需要中断,就暂停,等待人类输入
    if state["need_interrupt"]:
        # 中断!返回给前端/用户
        human_input = interrupt({
            "tip": "请输入人工修改意见",
            "ai_answer": f"已识别技术问题:{state['question']}"
        })
        # 恢复后,用人类输入更新结果
        return {"response": f"[技术+人工] {human_input}"}
    # 不需要中断 → 直接返回
    return {"response": f"[技术] 已处理:{state['question']}"}

当中断被检测到后,等待人工输入即可恢复执行。

# 检测到中断,才会有恢复
if result.get("__interrupt__"):
    logger.info("检测到技术问题,转人工")
    # 2. 用户输入 → 恢复流程(核心:resume)
    human_feedback = input("人工9527号修复意见:")
    resume_result = app.invoke(
        Command(resume=human_feedback),  # ✅ resume 恢复中断
        config=config
    )
    # 3. 恢复结果
    logger.info(f"? 恢复后最终结果:{resume_result['response']}")

更多实际案例请参阅官方文档。

源码

GitHub 源码

来源:https://juejin.cn/post/7627887164718170148
上一篇半监督学习是什么?一文读懂核心概念与应用 下一篇气体涡轮流量计说明书撰写完整指南及范文示例
本站内容用于信息整理与展示,如有侵权或内容问题请及时联系处理。

相关推荐

补充同频道和同主题内容,方便继续浏览更多相关内容。

同类最新

继续查看同栏目最近更新的文章。

更多
GPT Workspace通过GPT-5强化Google Workspace,文档表格邮件创作效率与智能化提升
AI教程 · 2026-05-29

GPT Workspace通过GPT-5强化Google Workspace,文档表格邮件创作效率与智能化提升

GPT Workspace 产品介绍:GPT-5 如何增强 Google Workspace 工作效率 如果你每天都在使用 Google Workspace 进行文档撰写、表格处理、邮件沟通和演示制作,一定深有体会:大量重复性的办公任务耗费了宝贵的时间。现在,GPT Workspace 将 GPT-

AI助手提升年终总结与周报效率的精准营销策略
AI教程 · 2026-05-29

AI助手提升年终总结与周报效率的精准营销策略

适合需求:在信息爆炸的时代,企业所承受的竞争压力几乎覆盖了所有维度,其中营销领域尤为令人困扰。无论是撰写年终总结还是生成周报,精准的营销策略已成为不可或缺的需求——没有谁愿意在庞杂的数据中迷失方向。当我们复盘营销活动时,总会思考:过去哪些数字营销策略真正发挥了效果?哪些内容营销策略有待改进?然而实际

Afri Studio 非洲创意工作室
AI教程 · 2026-05-29

Afri Studio 非洲创意工作室

Afri Studio是什么先来聊聊Afri Studio——它是Afri AI团队推出的一款AI媒体创作工作室,目标很明确:把原本高高在上的智能技术拉下神坛,让普通用户也能轻松生成高质量的文本、图像、音频等内容。换句话说,这是一个面向内容创作者、博主、营销人员、艺术家的“AI工具箱”,帮你高效搞定

Geniea专注Midjourney提示词优化提升创意生成效率
AI教程 · 2026-05-29

Geniea专注Midjourney提示词优化提升创意生成效率

Geniea产品详解:Midjourney提示优化工具Geniea是一款专注于Midjourney提示词优化的智能平台,致力于帮助创作者快速生成高质量且富有创意的提示方案。无论您需要电影镜头、食品摄影还是汽车广告等场景的提示词,只需输入简单指令,系统便会自动输出优化后的提示文本,大幅提升创作效率。提

幼儿园大班毕业典礼方案PPT AI轻松制作精彩回顾
AI教程 · 2026-05-29

幼儿园大班毕业典礼方案PPT AI轻松制作精彩回顾

使用情景 每年毕业季来临之际,幼儿园大班毕业典礼的筹备工作,总是牵动着众多老师、家长和孩子们的心弦。这不仅仅是一场简单的活动,更是孩子们人生中首个重要的成长仪式,标志着他们告别幼儿时光、迈向新阶段的里程碑。对于家长而言,这也是一次充满感怀的“毕业”,意味着一段陪伴旅程的暂时落幕。 如何让这场典礼既温