Agent 系统的记忆不是简单的数据存储,而是状态的有序演进。当多个 Agent 并行工作,当用户对话跨天继续,当系统需要回滚到某个决策点——记忆一致性成为系统可靠性的基石。

本文讨论 LangGraph 中的状态持久化机制,以及如何在多 Agent 场景中维护记忆一致性。

记忆一致性的核心挑战

Agent 系统的记忆管理面临几个独特挑战:

1. 状态演进的不确定性

与数据库事务不同,Agent 的推理过程是非确定性的。同样的输入可能因模型随机性产生不同输出,状态演进路径不唯一。

2. 长时间运行的会话

一个客服对话可能持续数小时,一个研究任务可能跨越多天。期间系统可能重启、升级、扩缩容,记忆必须持久化且可恢复。

3. 多 Agent 并行修改

当多个 Agent 同时访问和修改共享状态时,需要协调机制防止冲突和数据丢失。

4. 人机协作的穿插

人类用户可能在任意时刻介入,查看历史、修正错误、提供反馈。记忆系统需要支持这种非结构化的交互模式。

LangGraph 的持久化机制

LangGraph 通过 Checkpointer 机制实现状态持久化。核心思想是:在每个节点执行前后捕获完整状态,支持断点续跑和状态回滚。

Checkpointer 基础架构

1
2
3
4
5
6
7
8
9
10
11
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph

# 内存级别的 Checkpointer(开发测试用)
memory = MemorySaver()

graph = StateGraph(State)
# ... 配置节点和边 ...

# 编译时附加 Checkpointer
app = graph.compile(checkpointer=memory)

Checkpointer 在以下时机自动保存状态:

  • 每个节点执行完成后
  • 条件边决策后
  • 发生异常中断时

配置持久化

1
2
3
4
5
6
7
8
9
10
11
12
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver

# SQLite 持久化(单机部署)
sqlite_saver = SqliteSaver.from_conn_string("checkpoint.db")

# PostgreSQL 持久化(生产集群)
postgres_saver = PostgresSaver.from_conn_string(
"postgresql://user:pass@host/db"
)

app = graph.compile(checkpointer=postgres_saver)

选择持久化后端时需要考虑:

  • 吞吐量:每秒状态写入次数
  • 一致性要求:是否接受最终一致性
  • 查询需求:是否需要复杂的跨会话分析

状态版本控制

每个 checkpoint 包含完整的执行上下文:

1
2
3
4
5
6
7
8
@dataclass
class Checkpoint:
thread_id: str # 会话唯一标识
checkpoint_id: str # 当前状态版本
parent_checkpoint: str # 父状态(支持分支)
state: dict # 完整状态快照
node_path: list # 执行路径
created_at: datetime

这种设计天然支持状态回滚。当当前路径走入死胡同时,可以回退到之前的 checkpoint 重新选择分支。

并发控制策略

多 Agent 同时访问共享状态时需要并发控制。

乐观锁机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class OptimisticCheckpointer(BaseCheckpointSaver):
def put(self, config: RunnableConfig, checkpoint: Checkpoint):
thread_id = config["configurable"]["thread_id"]
current_version = self.get_latest_version(thread_id)

# 版本冲突检测
if checkpoint.parent_checkpoint != current_version.checkpoint_id:
raise ConcurrentModificationError(
f"状态已被修改,当前版本: {current_version.checkpoint_id}, "
f"预期父版本: {checkpoint.parent_checkpoint}"
)

# 生成新版本
checkpoint.checkpoint_id = generate_uuid()
self.store(thread_id, checkpoint)

乐观锁适合读多写少的场景。当冲突发生时,上层代码可以选择重试或向用户报告。

悲观锁机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class PessimisticCheckpointer(BaseCheckpointSaver):
def acquire_lock(self, thread_id: str, timeout: int = 30) -> LockToken:
"""获取分布式锁"""
return self.lock_manager.acquire(
f"checkpoint:{thread_id}",
timeout=timeout
)

def put(self, config: RunnableConfig, checkpoint: Checkpoint,
lock_token: LockToken):
"""需要持有锁才能写入"""
if not self.lock_manager.validate(lock_token):
raise LockExpiredError("锁已过期")

self.store(checkpoint)

悲观锁适合写操作频繁、冲突概率高的场景。缺点是增加延迟,且需要处理锁超时和死锁问题。

无锁设计:状态不可变性

更激进的方案是将状态设计为不可变对象,每次修改都创建新状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from copy import deepcopy
from typing import FrozenSet

@dataclass(frozen=True)
class ImmutableState:
messages: tuple # 使用 tuple 代替 list
context: FrozenSet[Context] # 使用 frozenset
metadata: frozenset

def with_message(self, message: Message) -> "ImmutableState":
"""返回包含新消息的新状态"""
return ImmutableState(
messages=self.messages + (message,),
context=self.context,
metadata=self.metadata
)

不可变状态消除了并发冲突的可能,但增加了内存开销和 GC 压力。适合状态体较小、更新频率不高的场景。

多 Agent 共享记忆

当多个 Agent 协作完成复杂任务时,需要设计共享记忆机制。

共享工作区模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class SharedWorkspace:
"""多 Agent 共享的工作区"""

def __init__(self, thread_id: str, checkpointer: Checkpointer):
self.thread_id = thread_id
self.checkpointer = checkpointer
self._lock = asyncio.Lock()

async def read(self, agent_id: str) -> WorkspaceState:
"""Agent 读取工作区状态"""
checkpoint = await self.checkpointer.aget(
{"configurable": {"thread_id": self.thread_id}}
)
return WorkspaceState.from_checkpoint(checkpoint)

async def write(self, agent_id: str, delta: StateDelta) -> bool:
"""Agent 提交修改"""
async with self._lock:
current = await self.read(agent_id)
new_state = current.apply(delta, agent_id)

# 写入时附加修改者信息
await self.checkpointer.aput(
{"configurable": {"thread_id": self.thread_id}},
new_state.to_checkpoint(),
metadata={"modified_by": agent_id, "delta": delta}
)
return True

工作区模式的特点是:

  • 所有 Agent 读写同一个状态空间
  • 需要锁机制防止冲突
  • 适合任务高度耦合的场景

消息总线模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class MessageBus:
"""基于消息的多 Agent 通信"""

def __init__(self):
self.subscribers: dict[str, list[Callable]] = {}
self.message_log: list[AgentMessage] = []

def subscribe(self, topic: str, handler: Callable):
"""订阅特定类型的消息"""
if topic not in self.subscribers:
self.subscribers[topic] = []
self.subscribers[topic].append(handler)

async def publish(self, message: AgentMessage):
"""发布消息给所有订阅者"""
self.message_log.append(message)

handlers = self.subscribers.get(message.topic, [])
await asyncio.gather(*[
handler(message) for handler in handlers
])

消息总线模式的特点是:

  • Agent 通过异步消息通信
  • 解耦了发送者和接收者
  • 天然支持事件溯源和回放
  • 适合 Agent 职责边界清晰的场景

混合架构

生产环境通常采用混合架构:共享工作区存储任务核心状态,消息总线处理事件通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class HybridMemorySystem:
def __init__(self):
self.workspace = SharedWorkspace()
self.message_bus = MessageBus()

async def agent_action(self, agent_id: str, action: Action):
# 修改共享状态
success = await self.workspace.write(agent_id, action.delta)

if success:
# 广播事件
await self.message_bus.publish(AgentMessage(
topic="state_changed",
sender=agent_id,
payload={"action": action.type, "delta": action.delta}
))

会话恢复与断点续跑

持久化的核心价值是支持会话恢复。当系统崩溃或需要暂停时,可以从任意 checkpoint 恢复。

基础恢复

1
2
3
4
5
6
7
# 启动新会话
config = {"configurable": {"thread_id": "session-123"}}
result = await app.ainvoke(initial_state, config)

# 系统重启后恢复相同会话
restored_config = {"configurable": {"thread_id": "session-123"}}
result = await app.ainvoke(None, restored_config) # None 表示从 checkpoint 恢复

指定版本恢复

1
2
3
4
5
6
7
8
# 回滚到特定版本
config = {
"configurable": {
"thread_id": "session-123",
"checkpoint_id": "chk-abc-456" # 指定版本
}
}
result = await app.ainvoke(None, config)

分支与合并

复杂场景可能需要从某个 checkpoint 分叉出多个探索分支:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 创建分支
branch_config = await app.afork(
parent_config={"thread_id": "main", "checkpoint_id": "v10"},
branch_id="exploration-a"
)

# 在分支上独立执行
result_a = await app.ainvoke(exploration_input, branch_config)

# 比较多个分支结果后合并回主线
await app.amerge(
target={"thread_id": "main"},
source_branches=["exploration-a", "exploration-b"],
resolution_strategy="manual_review"
)

生产环境最佳实践

1. 状态分层存储

不是所有状态都需要 checkpoint。将状态分层:

1
2
3
4
5
6
7
8
9
10
11
12
class LayeredState(BaseModel):
# L1: 必须持久化(业务关键数据)
user_intent: str
confirmed_booking: Booking

# L2: 建议持久化(恢复体验)
conversation_history: list[Message]
retrieved_context: list[Document]

# L3: 无需持久化(可重建)
temp_calculations: dict
cached_embeddings: list[Vector]

L1 和 L2 进入 checkpoint,L3 在节点内临时计算。

2. 状态压缩与清理

长时间运行的会话会产生大量 checkpoint。需要策略性清理:

1
2
3
4
5
6
7
8
async def cleanup_old_checkpoints(thread_id: str, keep_last: int = 10):
"""保留最近 N 个 checkpoint,其余归档"""
checkpoints = await checkpointer.alist(thread_id)

to_archive = checkpoints[:-keep_last]
for chk in to_archive:
await archive_storage.store(chk)
await checkpointer.adelete(chk.checkpoint_id)

3. 一致性监控

建立监控指标,及时发现一致性问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class ConsistencyMonitor:
def __init__(self):
self.metrics = {
"checkpoint_failures": Counter(),
"concurrent_conflicts": Counter(),
"recovery_attempts": Counter(),
"recovery_success": Counter()
}

async def check_consistency(self, thread_id: str) -> HealthReport:
checkpoints = await self.checkpointer.alist(thread_id)

# 检查状态链完整性
gaps = self._find_gaps(checkpoints)

# 检查数据完整性
corruptions = self._detect_corruption(checkpoints)

return HealthReport(
thread_id=thread_id,
checkpoint_count=len(checkpoints),
gaps=gaps,
corruptions=corruptions,
healthy=len(gaps) == 0 and len(corruptions) == 0
)

4. 测试策略

记忆一致性需要专门的测试策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@pytest.mark.asyncio
async def test_concurrent_modifications():
"""测试并发修改的冲突处理"""
workspace = SharedWorkspace("test-thread", checkpointer)

# 两个 Agent 同时修改同一字段
task1 = workspace.write("agent-a", {"counter": 1})
task2 = workspace.write("agent-b", {"counter": 2})

results = await asyncio.gather(task1, task2, return_exceptions=True)

# 至少一个应该失败或按序执行
assert any(isinstance(r, ConcurrentModificationError) for r in results) or \
results.count(True) == 2

@pytest.mark.asyncio
async def test_recovery_after_crash():
"""测试崩溃后状态恢复"""
thread_id = "recovery-test"

# 执行到一半
app = create_app()
await app.ainvoke({"step": 1}, {"configurable": {"thread_id": thread_id}})

# 模拟崩溃:重建 app 实例
app2 = create_app()
result = await app2.ainvoke(
None,
{"configurable": {"thread_id": thread_id}}
)

# 验证状态正确恢复
assert result["step"] == 1

总结

Agent 记忆一致性是生产级系统的核心能力。LangGraph 的 Checkpointer 机制提供了状态持久化的基础设施,但正确使用需要理解并发控制、状态分层、故障恢复等概念。

关键设计原则:

  1. 显式优于隐式:状态的保存和恢复点应该在代码中清晰可见,不要依赖自动魔法
  2. 分层存储:区分必须持久化、建议持久化和临时状态,降低存储成本
  3. 防御性设计:假设网络分区、节点故障、并发冲突一定会发生,代码中主动处理
  4. 可观测性:记录足够的状态变更日志,支持事后审计和故障排查

记忆一致性的最终目标不是绝对正确,而是让系统在出现问题时可检测、可恢复、可解释。这才是生产环境可以依赖的 Agent 系统。