一.Command使用
"""CommandDemo.py — LangGraph 1.0.6 正式版中 Command 的基础用法,演示状态更新、流程控制和动态路由"""from typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Command# 统一递归限制,方便后续维护RECURSION_LIMIT = 50# 先定义状态结构class AgentState(TypedDict):messages: Annotated[list, lambda x, y: x + y]# 自动合并消息current_agent: strtask_completed: bool#{'messages': [('user', '我需要计算数学题')], 'current_agent': 'user', 'task_completed': False}# 核心路由节点,负责分发任务def decision_agent(state: AgentState) -> Command[AgentState]:"""根据用户输入决定下一步走到哪个 agent,如果任务已完成就直接结束"""print("执行节点: decision_agent")# 先检查任务是否已完成,防止无限循环if state["task_completed"]:print("✅ 检测到任务已完成,直接终止流程")return Command(update={"messages": [("system", "所有任务处理完成,流程正常结束")]},goto=END)# 取出最后一条消息,注意处理空列表的情况last_message = state["messages"][-1] if state["messages"] else ("", "")last_msg_content = last_message[1] #last_message = ("user", "我需要计算数学题")print(f"最新消息文本: {last_msg_content}")# 根据关键词做路由if "数学" in last_msg_content:print("✅ 检测到数学任务,路由到数学 agent")return Command(update={"messages": [("system", "路由到数学 agent")], "current_agent": "math_agent"},goto="math_agent")elif "翻译" in last_msg_content:print("✅ 检测到翻译任务,路由到翻译 agent")return Command(update={"messages": [("system", "路由到翻译 agent")], "current_agent": "translation_agent"},goto="translation_agent")else:print("❌ 未识别任务类型,标记任务完成并终止")return Command(update={"messages": [("system", "任务完成")], "task_completed": True},goto=END)# 数学业务节点def math_agent(state: AgentState) -> Command[AgentState]:"""处理数学计算任务,完成后返回决策节点"""print("执行节点: math_agent")result = "2 + 2 = 4"print(f"计算结果: {result}")return Command(update={"messages": [("assistant", f"数学计算结果: {result}")],"current_agent": "decision_agent","task_completed": True},goto="decision_agent")# 翻译业务节点def translation_agent(state: AgentState) -> Command[AgentState]:"""处理中英翻译任务,完成后返回决策节点"""print("执行节点: translation_agent")translation = "Hello -> 你好"print(f"翻译结果: {translation}")return Command(update={"messages": [("assistant", f"翻译结果: {translation}")],"current_agent": "decision_agent","task_completed": True},goto="decision_agent")def main():"""用Command实现状态更新、动态路由和流程终止"""print("=== Command 基础演示(LangGraph 1.0.6)===n")# 1. 构建状态图builder = StateGraph(AgentState)builder.add_node("decision_agent", decision_agent)builder.add_node("math_agent", math_agent)builder.add_node("translation_agent", translation_agent)# 2. 定义边(完整节点关系)builder.add_edge(START, "decision_agent")builder.add_edge("math_agent", "decision_agent")builder.add_edge("translation_agent", "decision_agent")builder.add_edge("decision_agent", END)# 3. 编译图graph = builder.compile()# 测试1:数学任务print("【测试1: 数学任务】")initial_state = {"messages": [("user", "我需要计算数学题")], "current_agent": "user", "task_completed": False}print("初始状态:", initial_state)result = graph.invoke(initial_state, recursion_limit=RECURSION_LIMIT)print("最终状态(简化):", {k: v for k, v in result.items() if k != "messages"})print("n" + "-" * 50 + "n")# 测试2:翻译任务print("【测试2: 翻译任务】")initial_state = {"messages": [("user", "我需要翻译文本")], "current_agent": "user", "task_completed": False}print("初始状态:", initial_state)result = graph.invoke(initial_state, recursion_limit=RECURSION_LIMIT)print("最终状态(简化):", {k: v for k, v in result.items() if k != "messages"})print("n" + "-" * 50 + "n")# 测试3:未识别任务print("【测试3: 未识别任务类型】")initial_state = {"messages": [("user", "你好")], "current_agent": "user", "task_completed": False}print("初始状态:", initial_state)result = graph.invoke(initial_state, recursion_limit=RECURSION_LIMIT)print("最终状态(简化):", {k: v for k, v in result.items() if k != "messages"})# 新增:可视化图结构(教学演示必备)print("n=== 图结构可视化 ===")print(graph.get_graph().draw_mermaid())if __name__ == "__main__":main()
执行结果:
二.context_schema参数和Runtime用法
"""RuntimeContextDemo.py — 演示 LangGraph 的 Context Schema 功能,看看怎么把模型名称、数据库连接这些非状态信息传递给节点,实际项目中很实用。"""from typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.runtime import Runtimefrom langchain_core.messages import AIMessage, HumanMessagefrom dataclasses import dataclass# 定义状态结构class AgentState(TypedDict):messages: Annotated[list, lambda x, y: x + y]response: str# 定义上下文结构class ContextSchema:model_name: strdb_connection: strapi_key: str# 处理用户消息的节点def process_message(state: AgentState, runtime: Runtime[ContextSchema]) -> dict:"""从 context 中读取模型名称、数据库连接等,然后模拟处理请求"""print("执行节点: process_message")# 拿到用户刚发的消息last_message = state["messages"][-1].content if state["messages"] else ""print(f"用户消息: {last_message}")print("=========以下是从RuntimeContext中获得信息=========")# 从 runtime.context 里取出各种配置信息model_name = runtime.context.model_namedb_connection = runtime.context.db_connectionapi_key = runtime.context.api_keyprint(f"使用的模型: {model_name}")print(f"数据库连接: {db_connection}")# 安全考虑,只显示前5位,其余用***隐藏print(f"API密钥前缀: {api_key[:5]}***")# 模拟用这些信息处理请求response = f"使用 {model_name} 处理了您的请求,已连接到 {db_connection}"return {"messages": [AIMessage(content=response)],"response": response}# 生成最终响应的节点def generate_response(state: AgentState, runtime: Runtime[ContextSchema]) -> dict:"""利用 context 中的模型信息,给用户一个更完整的回复"""print("执行节点: generate_response")model_name = runtime.context.model_nameprint(f"使用模型 {model_name} 生成最终响应")previous_response = state["response"]final_response = f"{previous_response}nn这是使用 {model_name} 生成的完整响应。"return {"messages": [AIMessage(content=final_response)],"response": final_response}def main():"""演示 context_schema 的完整用法"""print("=== Context Schema 演示 ===n")# 构造上下文对象,里面包含模型、数据库、API 密钥context = ContextSchema(model_name="gpt-4-turbo",db_connection="postgresql://user:pass@localhost:5432/orders_db",api_key="sk-abcdefghijklmnopqrstuvwxyz123456")# 创建图,指定 state_schema 和 context_schemabuilder = StateGraph(AgentState, context_schema=ContextSchema)# 添加节点builder.add_node(node="process_message", action=process_message)builder.add_node("generate_response", generate_response)# 添加边builder.add_edge(START, "process_message")builder.add_edge("process_message", "generate_response")builder.add_edge("generate_response", END)# 编译图graph = builder.compile()# 定义初始状态initial_state = {"messages": [HumanMessage(content="请帮我查询最新的订单信息")],"response": ""}print("初始状态:", initial_state)print()print("上下文信息:n", {"model_name": context.model_name,"db_connection": context.db_connection,"api_key": f"{context.api_key[:5]}***"})print("n" + "-" * 50 + "n")# 执行图,通过 context 参数传入上下文result = graph.invoke(initial_state, context=context)print("n" + "=" * 50)print("最终状态:", result)print("n最终响应:")print(result["response"])if __name__ == "__main__":main()
执行结果:
三.Send用法
"""SendDemo.py — 展示 LangGraph 的 Map-Reduce 模式,用 Send 对象动态生成子任务,根据运行时状态决定执行路径。具体流程:先生成主题列表,然后为每个主题创建一个 Send 任务,并行执行 make_joke,最后合并结果。非常适合处理数量不确定的任务。"""from typing import Annotated, List, Sequencefrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Send# 定义状态class AtguiguState(TypedDict):subjects: List[str]jokes: Annotated[List[str], lambda x, y: x + y]# 使用列表合并的方式# 第一个节点:生成需要处理的主题列表def generate_subjects(state: AtguiguState) -> dict:"""生成三个主题:猫、狗、程序员"""print("执行节点(第一个节点:生成需要处理的主题列表): generate_subjects")subjects = ["猫", "狗", "程序员"]print(f"生成主题列表: {subjects}")return {"subjects": subjects}# Map节点:为每个主题生成笑话def make_joke(state: AtguiguState) -> dict:"""根据主题生成对应的笑话"""subject = state.get("subject", "未知")print(f"执行节点: make_joke,处理主题: {subject}")# 不同主题对应不同笑话,硬编码了三个经典段子jokes_map = {"猫": "为什么猫不喜欢在线购物?因为它们更喜欢实体店!","狗": "为什么狗不喜欢计算机?因为它们害怕被鼠标咬!","程序员": "为什么程序员喜欢洗衣服?因为他们在寻找bugs!","未知": "这是一个关于未知主题的神秘笑话。"}joke = jokes_map.get(subject, f"这是一个关于{subject}的即兴笑话。")print(f"生成笑话: {joke}")return {"jokes": [joke]}# 条件边函数:根据主题列表生成Send对象列表def map_subjects_to_jokes(state: AtguiguState) -> List[Send]:"""为每个主题创建一个 Send 对象,指向 make_joke 节点"""print("执行条件边函数: map_subjects_to_jokes")subjects = state["subjects"]print(f"映射主题到joke任务: {subjects}")# 列表推导式,逐个生成 Sendsend_list = [Send("make_joke", {"subject": subject}) for subject in subjects]print(f"生成Send对象列表: {send_list}")return send_listdef main():"""运行 Map-Reduce 演示"""print("=== Map-Reduce 模式演示 ===n")# 创建图builder = StateGraph(AtguiguState)# 添加节点builder.add_node("generate_subjects", generate_subjects)builder.add_node("make_joke", make_joke)# 添加边builder.add_edge(START, "generate_subjects")# 条件边,返回 Send 列表实现并行分支builder.add_conditional_edges(source="generate_subjects",path=map_subjects_to_jokes)# 每个 make_joke 执行完后都直接到 ENDbuilder.add_edge("make_joke", END)# 编译图graph = builder.compile()print(graph.get_graph().print_ascii())# 执行图initial_state = {"subjects": [], "jokes": []}print("初始状态:", initial_state)print("n开始执行图...")result = graph.invoke(initial_state)print(f"n最终结果: {result}")print("n=== 演示完成 ===")if __name__ == "__main__":main()
执行结果:
