一、checkpointer=InMemorySa ver():将状态保存到内存
先从最轻量级的方案说起。在LangGraph中,工作流的“记忆”是通过检查点(Checkpoint)机制来体现的。而这个机制背后,是一整套由检查点保存器(BaseCheckpointSa ver)定义的接口,以及序列化/反序列化的规范。简单来说,这就是给工作流“拍快照”的基础设施。
LangGraph 1.0 内置了一个非常实用的作为示例的检查点实现:InMemorySa ver。它的特点非常鲜明:数据全部暂存在内存里,程序一关闭就全部归零,什么都不剩。好处是不需要任何额外配置,拿来就能用。这对本地测试、临时验证工作流逻辑来说,再合适不过了。
先看这段代码,核心是定义了一个状态类型,然后串联起三个执行步骤。
"""MemoryPersistence.pylanggraph-checkpoint:检查点保存器(BaseCheckpointSa ver)的基础接口以及序列化/反序列化接口(SerializerProtocol)。包含用于实验的内存中检查点实现(InMemorySa ver)。LangGraph 已内置 langgraph-checkpoint。LangGraph 1.0 持久化存储演示 - 内存存储 (In-Memory)特点:- 数据暂存于内存,程序关闭后丢失- 无需额外配置- 适用于本地测试和临时验证工作流逻辑"""
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySa ver
import operator
# 定义状态
class PersistenceDemoState(TypedDict):
# operator.add:将元素追加到现有元素中,支持列表、字符串、数值类型的追加
messages: Annotated[list, operator.add]
step_count: Annotated[int, operator.add]
# 节点函数
def step_one(state: PersistenceDemoState) -> dict:
print("执行步骤 1")
return {"messages": ["执行了步骤 1"], "step_count": 1}
def step_two(state: PersistenceDemoState) -> dict:
print("执行步骤 2")
return {"messages": ["执行了步骤 2"], "step_count": 1}
def step_three(state: PersistenceDemoState) -> dict:
print("执行步骤 3")
return {"messages": ["执行了步骤 3"], "step_count": 1}
# 构建图
def create_graph():
builder = StateGraph(PersistenceDemoState)
builder.add_node("step_one", step_one)
builder.add_node("step_two", step_two)
builder.add_node("step_three", step_three)
builder.add_edge(START, "step_one")
builder.add_edge("step_one", "step_two")
builder.add_edge("step_two", "step_three")
builder.add_edge("step_three", END)
return builder
def main():
print("=== LangGraph 1.0 内存持久化存储演示 ===\n")
# 编译图并使用内存存储
graph = create_graph()
app = graph.compile(checkpointer=InMemorySa ver())
# 配置线程ID用于存储状态
config = {"configurable": {"thread_id": "user_13811112222"}}
print("1. 首次执行工作流:")
result = app.invoke({"messages": ["开始执行"], "step_count": 0}, config)
print(f"执行结果result: {result}\n")
print("2. 检查存储的状态:")
sa ved_state = app.get_state(config)
print(f"保存的状态: {sa ved_state.values}")
print(f"下一个节点: {sa ved_state.next}\n")
# 获取指定线程的完整执行历史(正序:从最早到最晚,第一步在栈底)
history = app.get_state_history(config)
# 遍历历史中的每一个检查点快照
for checkpoint in history:
print("=" * 50)
# 该时刻的完整State状态(最核心)
print(f"当前状态: {checkpoint.values}")
print("=" * 80)
print("3. 恢复执行工作流:")
# 由于工作流已经完成,这里会直接返回最终结果
result2 = app.invoke(None, config)
print(f"恢复执行结果: {result2}\n")
print("=== 演示结束 ===")
if __name__ == "__main__":
main()
执行结果:
图片
熟悉LangGraph的朋友应该能直接看懂这个结构。关键在于编译图时,我们传入了一个 InMemorySa ver() 实例,并配置了一个 thread_id。这个 thread_id 就是工作流“记忆”的钥匙。后续通过 get_state 和 get_state_history,可以查询到当前状态,甚至是整个执行过程的完整快照历史。工作流执行完成后,通过相同的 thread_id 和 invoke 命令,你可以任何时候回来查看最终状态——这在交互式应用中非常有价值。
二、checkpointer=from_conn_string(DB_URI):将状态持久化到数据库
内存存储固然方便,但一关程序数据就没了,这在生产环境中显然是不可接受的。真正的持久化存储,需要把状态写到磁盘或数据库中。
在LangGraph的架构里,检查点功能由符合BaseCheckpointSa ver接口的检查点对象提供。为了满足不同场景,官方提供了多种独立的实现库,可以通过pip install安装:
langgraph-checkpoint-sqlite:基于SQLite。非常轻量,配置简单,适合实验和本地工作流。langgraph-checkpoint-postgres:基于Postgres。适用于生产环境,配合LangSmith使用效果更佳。
这里我们演示SQLite的实现,因为它足够简单,能让你快速看到效果。从代码层面来看,持久化到SQLite和Postgres的接入方式几乎是完全一样的,关键就在于替换那个checkpointer。
下面的代码片段,展示了如何从一个数据库连接字符串创建PostgresSa ver(或SqliteSa ver):
'''SqlitePersistence.py在底层,检查点功能由符合BaseCheckpointSa ver接口的检查点对象提供支持。LangGraph提供了多种检查点实现,所有这些实现都通过独立的、可安装的库来完成,数据库类型的有: langgraph-checkpoint-sqlite:使用SQLite数据库(SqliteSa ver / AsyncSqliteSa ver)存储检查点。非常适合实验和本地工作流程。需要单独安装。 langgraph-checkpoint-postgres:使用Postgres数据库(PostgresSa ver / AsyncPostgresSa ver)存储检查点,用于LangSmith。非常适合在生产环境中使用。需要单独安装。......本次案例,安装sqlite所需依赖pip install langgraph-checkpoint-sqlite'''
import sqlite3
import operator
from typing import TypedDict, Annotated
from langgraph.checkpoint.postgres import PostgresSa ver
from langgraph.checkpoint.sqlite import SqliteSa ver
from langgraph.graph import StateGraph,START,END
class MyState(TypedDict):
messages:Annotated[list,operator.add]
def node_1(state:MyState):
return {"messages":["abc","def"]}
def main():
DB_URI = "postgresql://postgres:123456@localhost:5432/postgres"
builder = StateGraph(MyState)
builder.add_node("node_1",node_1)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", END)
with PostgresSa ver.from_conn_string(DB_URI) as checkpointer:
checkpointer.setup()
graph = builder.compile(checkpointer=checkpointer)
# 同一个用户id下,每次执行都会插入一次新数据,上课时记得修改用户编号或者直接删除D:44sqlite_data.db
config = {"configurable": {"thread_id": "user-002"}}
initial_state = graph.get_state(config)
print(f"Initial state: {initial_state}")
# 执行图
result = graph.invoke({"messages":[]}, config)
print(f"Result: {result}")
print()
print("====================查看执行后的状态====================")
# 查看执行后的状态
final_state = graph.get_state(config)
print()
print(f"Final state: {final_state}")
if __name__ == '__main__':
main()
执行了4次之后的结果:
图片
持久化到数据库与内存持久化相比,最大的区别是:数据不会因为程序重启而丢失。同一个thread_id下的多次执行,都会在数据库里追加新的状态记录。这一点在代码执行结果中体现得非常明显——每次invoke成功后,再次查询get_state,你都会看到状态中的messages列表在持续增长。这意味着你的工作流拥有了真正的、持久的“记忆”。
结合这两个演示,可以看到LangGraph的持久化设计非常优雅:切换存储后端,只需要更换一个checkpointer对象,而工作流的核心逻辑不需要做任何改动。从本地测试到生产部署,这条路径平滑且清晰。
