AI Agent 记忆管理:从短期上下文到长程一致性的工程实践

一个记不住用户偏好的 AI Agent,就像一个每天都重新认识的陌生人。记忆不是锦上添花的功能,而是决定 Agent 能否从”工具”进化为”伙伴”的核心能力。

一、为什么记忆管理是 Agent 的刚需

1.1 没有记忆的 Agent 有多痛苦

想象你在和一个客服 Agent 对话:

1
2
3
4
用户: "我想订一张明天去上海的机票"
Agent: "好的,已为您查询到明天飞往上海的航班..."
用户: "要上午的"
Agent: "抱歉,您要订去哪里的机票?"

这个 Agent 犯了最基础的错误:丢失了对话上下文。但它的问题远不止于此:

  • 记不住用户偏好:每次都要重新询问素食、靠窗座位
  • 无法关联历史:不知道用户上周投诉过同样的问题
  • 缺乏长期学习:重复犯同样的错误,永远不长记性

1.2 记忆的三重境界

Agent 的记忆能力可以分成三个层次:

层次 能力 时间跨度 技术实现
短期记忆 当前对话上下文 单次会话 Context Window
工作记忆 跨会话状态保持 小时/天 Checkpointer
长期记忆 用户画像、知识积累 永久 Vector Store + 知识库

大多数 Demo 级 Agent 只有第一层。生产级 Agent 需要三层俱全。

1.3 记忆设计的核心挑战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 这不是简单的存储问题
class MemoryChallenge:
"""真实场景下的记忆难题"""

# 1. 容量限制
context_window = 128000 # tokens,但填满后性能骤降

# 2. 检索精度
relevant_memories = retrieve(query="用户喜欢什么?") # 可能返回无关内容

# 3. 一致性维护
user_preference = "喜欢深色模式" # 但用户昨天说改了

# 4. 隐私与安全
sensitive_data = "用户病历..." # 谁可以访问?保留多久?

1.4 三层记忆架构概览

graph TB
    subgraph 应用层[Agent 应用层]
        A[对话界面]
        B[任务执行]
        C[决策引擎]
    end
    
    subgraph 记忆管理层
        D[短期记忆<br/>Context Window]
        E[工作记忆<br/>Checkpointer]
        F[长期记忆<br/>Vector Store]
    end
    
    subgraph 存储层
        G[内存/Redis]
        H[PostgreSQL]
        I[向量数据库]
    end
    
    A --> D
    B --> D
    C --> D
    D -->|溢出/摘要| E
    E -->|持久化| F
    D --> G
    E --> H
    F --> I
    
    style D fill:#e1f5ff
    style E fill:#e8f5e9
    style F fill:#fff3e0

架构说明:

  • 短期记忆:处理当前对话,存储在内存中
  • 工作记忆:跨会话状态保持,使用 PostgreSQL 持久化
  • 长期记忆:语义检索,使用向量数据库存储
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

## 二、短期记忆:Context Window 的最优利用

### 2.1 不是填满就好

很多开发者有个误区:既然模型支持 128K 上下文,那就把所有历史记录塞进去。这是灾难性的做法。

研究表明:
- **Lost in the Middle**:长上下文中间的信息提取准确率显著下降
- **成本爆炸**:token 用量与上下文长度成正比
- **延迟增加**:处理长上下文需要更多时间

```python
# 错误示范:全部塞进去
messages = load_all_conversation_history(user_id) # 可能 50000 tokens
response = llm.invoke(messages + [new_message]) # 慢且贵

# 正确做法:智能压缩
def compress_conversation(messages: list, max_tokens: int = 4000):
"""
对话压缩策略:
1. 保留最近 N 轮完整对话
2. 对更早的对话做摘要
3. 丢弃与当前话题无关的内容
"""
if estimate_tokens(messages) <= max_tokens:
return messages

# 保留最近 6 轮完整对话
recent = messages[-12:] # user + assistant = 2 per round

# 对历史做摘要
older = messages[:-12]
summary = generate_summary(older)

return [summary] + recent

2.2 滑动窗口与摘要策略

LangGraph 提供了内置的上下文管理工具:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph

# 配置滑动窗口
class ConversationState(TypedDict):
messages: Annotated[list, add_messages]
summary: str # 累积摘要

def manage_context(state: ConversationState):
"""动态管理上下文窗口"""
messages = state["messages"]

# 当消息超过阈值时触发摘要
if len(messages) > 20:
# 生成摘要
summary = generate_conversation_summary(messages[:-10])

# 保留最近 10 条 + 摘要
return {
"summary": state["summary"] + "\n" + summary,
"messages": messages[-10:]
}

return state

# 构建 Graph
builder = StateGraph(ConversationState)
builder.add_node("chat", chat_node)
builder.add_node("summarize", manage_context)

2.3 Token 预算分配

不是所有内容都值得占用宝贵的上下文空间。合理分配 token 预算:

1
2
3
4
5
6
7
8
CONTEXT_BUDGET = {
"system_prompt": 500, # 系统指令固定保留
"user_profile": 300, # 用户画像
"conversation_summary": 1000, # 对话摘要
"recent_messages": 2000, # 最近对话
"retrieved_context": 1500, # RAG 检索结果
"working_memory": 500, # 当前任务状态
}

三、工作记忆:跨会话状态持久化

3.1 Checkpointer 机制

LangGraph 的 Checkpointer 是实现工作记忆的核心。它能在会话中断后恢复状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import StateGraph

# 配置 PostgreSQL 持久化
conn_string = "postgresql://user:pass@localhost:5432/agent_db"
checkpointer = PostgresSaver(conn_string=conn_string)

# 构建带持久化的 Graph
builder = StateGraph(State)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)
builder.set_entry_point("agent")

# 关键:添加 checkpointer
graph = builder.compile(checkpointer=checkpointer)

# 使用线程 ID 恢复对话
thread_id = "user_123_session_456"
config = {"configurable": {"thread_id": thread_id}}

# 第一次调用
result1 = graph.invoke({"messages": [user_message]}, config)

# 一小时后,从断点继续
result2 = graph.invoke({"messages": [new_message]}, config)
# 自动恢复之前的状态:工具结果、中间变量、执行路径

3.2 状态恢复的粒度控制

不是每次都要恢复全部状态。根据场景选择恢复策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class RecoveryStrategy(Enum):
FULL = "full" # 完整恢复所有状态
PARTIAL = "partial" # 只恢复关键状态
FRESH = "fresh" # 全新会话,但带摘要

def resume_with_strategy(thread_id: str, strategy: RecoveryStrategy):
checkpoint = checkpointer.get(thread_id)

if strategy == RecoveryStrategy.FULL:
# 完整恢复,包括中间计算结果
return checkpoint["state"]

elif strategy == RecoveryStrategy.PARTIAL:
# 只保留必要状态
return {
"messages": checkpoint["state"]["messages"][-5:],
"user_intent": checkpoint["state"]["user_intent"],
}

elif strategy == RecoveryStrategy.FRESH:
# 摘要 + 全新开始
summary = generate_summary(checkpoint["state"]["messages"])
return {
"messages": [SystemMessage(content=f"历史摘要: {summary}")]
}

3.3 超时与清理策略

持久化状态不能无限堆积,需要合理的生命周期管理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class StateLifecycleManager:
"""管理工作记忆的生命周期"""

def __init__(self):
self.ttl_config = {
"active_session": timedelta(hours=24), # 活跃会话保留 24 小时
"completed_task": timedelta(hours=2), # 完成的任务 2 小时后清理
"error_state": timedelta(days=7), # 错误状态保留 7 天用于调试
}

def should_archive(self, thread_id: str) -> bool:
"""判断是否应该归档"""
metadata = checkpointer.get_metadata(thread_id)
last_active = metadata["last_active"]
status = metadata["status"]

ttl = self.ttl_config.get(status, timedelta(hours=1))
return datetime.now() - last_active > ttl

def archive_and_cleanup(self):
"""定期归档和清理"""
for thread_id in checkpointer.list_threads():
if self.should_archive(thread_id):
# 归档到长期存储
archive_to_s3(thread_id)
# 清理工作记忆
checkpointer.delete(thread_id)

四、长期记忆:向量存储与知识沉淀

4.1 向量检索架构

长期记忆的核心是语义检索。当 Agent 需要”回忆”时,通过向量相似度找到相关记忆。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma

class LongTermMemory:
"""长期记忆系统"""

def __init__(self, user_id: str):
self.user_id = user_id
self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# 每个用户独立的向量集合
self.vectorstore = Chroma(
collection_name=f"user_{user_id}_memories",
embedding_function=self.embeddings,
persist_directory="./memories"
)

def store_memory(self, content: str, metadata: dict):
"""存储记忆"""
# 添加时间戳和分类
doc = Document(
page_content=content,
metadata={
**metadata,
"timestamp": datetime.now().isoformat(),
"user_id": self.user_id,
}
)
self.vectorstore.add_documents([doc])

def retrieve_relevant(self, query: str, k: int = 5) -> list:
"""检索相关记忆"""
results = self.vectorstore.similarity_search_with_score(
query=query,
k=k,
filter={"user_id": self.user_id} # 只检索当前用户的记忆
)
return results

4.2 记忆分类与优先级

不是所有记忆都一样重要。分类管理可以提升检索效率:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class MemoryTypes:
"""记忆分类体系"""

# 用户画像类 - 高优先级,长期保留
PREFERENCES = "preferences" # 口味、偏好设置
FACTS = "facts" # 用户的基本信息
GOALS = "goals" # 用户的目标和愿望

# 交互历史类 - 中优先级,定期清理
CONVERSATIONS = "conversations" # 重要对话摘要
DECISIONS = "decisions" # 用户的决策记录
FEEDBACK = "feedback" # 用户的反馈和评价

# 临时信息类 - 低优先级,短期保留
TEMP_CONTEXT = "temp" # 临时上下文
TASK_STATE = "task" # 任务中间状态

def store_with_priority(self, content: str, memory_type: str):
"""根据类型设置不同的保留策略"""

retention_config = {
MemoryTypes.PREFERENCES: {"priority": 1, "ttl_days": None}, # 永久
MemoryTypes.FACTS: {"priority": 1, "ttl_days": None},
MemoryTypes.CONVERSATIONS: {"priority": 2, "ttl_days": 90},
MemoryTypes.TEMP_CONTEXT: {"priority": 3, "ttl_days": 7},
}

config = retention_config.get(memory_type, {"priority": 3, "ttl_days": 30})

self.store_memory(content, {
"type": memory_type,
"priority": config["priority"],
"expire_at": (datetime.now() + timedelta(days=config["ttl_days"])).isoformat()
if config["ttl_days"] else None
})

4.3 记忆的置信度与时间衰减

记忆不应该永远保持同样的权重。旧记忆和新记忆应该有区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class MemoryWithConfidence:
"""带置信度的记忆管理"""

def calculate_relevance(self, memory: dict, query: str) -> float:
"""计算记忆对当前查询的相关性"""

# 向量相似度 (0-1)
vector_score = memory["similarity_score"]

# 时间衰减因子 - 越旧权重越低
age_days = (datetime.now() - memory["timestamp"]).days
time_decay = math.exp(-age_days / 30) # 30 天半衰期

# 置信度权重 - 高置信度记忆更可靠
confidence = memory.get("confidence", 0.8)

# 用户确认过的记忆权重更高
confirmed_boost = 1.5 if memory.get("user_confirmed") else 1.0

return vector_score * time_decay * confidence * confirmed_boost

def retrieve_with_scoring(self, query: str, k: int = 10) -> list:
"""检索并排序,考虑置信度和时间"""
candidates = self.vectorstore.similarity_search_with_score(query, k=k*2)

scored_memories = []
for doc, vector_score in candidates:
memory = {
"content": doc.page_content,
"metadata": doc.metadata,
"similarity_score": vector_score,
"timestamp": datetime.fromisoformat(doc.metadata["timestamp"])
}
memory["final_score"] = self.calculate_relevance(memory, query)
scored_memories.append(memory)

# 按综合得分排序,返回 top k
scored_memories.sort(key=lambda x: x["final_score"], reverse=True)
return scored_memories[:k]

五、记忆一致性:解决冲突与矛盾

5.1 当记忆互相矛盾时

用户昨天说”我喜欢 Python”,今天说”我觉得 Python 太难用了,转 Go 了”。Agent 该怎么处理?

flowchart TD
    A[新记忆输入] --> B{检测冲突}
    B -->|发现冲突| C[检索相关记忆]
    C --> D{时间对比}
    D -->|新记忆更新| E[标记旧记忆过时]
    D -->|旧记忆更可信| F[降低新记忆置信度]
    E --> G[存储新记忆<br/>置信度: 0.9]
    F --> H[存储新记忆<br/>置信度: 0.3]
    B -->|无冲突| I[直接存储]
    
    style E fill:#ffebee
    style F fill:#fff3e0
    style G fill:#e8f5e9
    style H fill:#fff3e0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class MemoryConsistencyManager:
"""记忆一致性管理"""

def detect_conflicts(self, new_memory: str) -> list:
"""检测新记忆是否与已有记忆冲突"""

# 检索相关记忆
related = self.retrieve_relevant(new_memory, k=10)

conflicts = []
for existing in related:
# 使用 LLM 判断是否存在矛盾
is_conflict = self._check_contradiction(
existing=existing["content"],
new=new_memory
)
if is_conflict:
conflicts.append(existing)

return conflicts

def _check_contradiction(self, existing: str, new: str) -> bool:
"""使用 LLM 判断两条记忆是否矛盾"""

prompt = f"""判断以下两条陈述是否矛盾:

已有记忆: {existing}
新记忆: {new}

这两条陈述是否互相矛盾?只回答 "是" 或 "否"。"""

response = llm.invoke(prompt)
return "是" in response.content

def resolve_conflict(self, new_memory: str, conflicts: list):
"""解决记忆冲突"""

if not conflicts:
# 无冲突,直接存储
self.store_memory(new_memory)
return

# 策略 1: 时间优先 - 使用最新的记忆
newest_conflict = max(conflicts, key=lambda x: x["timestamp"])

if new_memory["timestamp"] > newest_conflict["timestamp"]:
# 新记忆更新,标记旧记忆为过时
self._mark_outdated(newest_conflict["id"])
self.store_memory(new_memory, confidence=0.9)
else:
# 旧记忆更可信,但降低新记忆置信度
self.store_memory(new_memory, confidence=0.3)

5.2 用户确认机制

对于高置信度的记忆,可以请用户确认:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class UserConfirmationFlow:
"""用户确认关键记忆"""

def maybe_request_confirmation(self, memory: str, confidence: float):
"""根据置信度决定是否请求确认"""

if confidence > 0.9:
# 高置信度,直接存储
self.store_memory(memory, user_confirmed=False)

elif confidence > 0.6:
# 中等置信度,隐式确认
self.store_memory(memory, user_confirmed=False, pending=True)

else:
# 低置信度,显式请求确认
return {
"action": "request_confirmation",
"message": f"我注意到您似乎对{'Python'}感兴趣,这样理解对吗?",
"pending_memory": memory
}

def on_user_confirm(self, memory_id: str, confirmed: bool):
"""用户确认回调"""
if confirmed:
self.update_memory(memory_id, {
"user_confirmed": True,
"confidence": 1.0,
"confirmed_at": datetime.now().isoformat()
})
else:
# 用户否认,降低置信度或删除
self.delete_memory(memory_id)

六、生产级记忆系统架构

6.1 完整架构设计

graph TB
    subgraph Agent层[Agent 应用层]
        A1[对话管理]
        A2[任务执行]
        A3[记忆感知决策]
    end
    
    subgraph 记忆管理层[记忆管理层]
        M1[短期记忆<br/>Redis]
        M2[工作记忆<br/>PostgreSQL]
        M3[长期记忆<br/>Vector DB]
    end
    
    subgraph 存储层[存储层]
        S1[(内存/Redis)]
        S2[(PostgreSQL)]
        S3[(向量数据库)]
    end
    
    A1 --> M1
    A2 --> M1
    A3 --> M1
    
    M1 -.->|溢出/摘要| M2
    M2 -.->|持久化| M3
    
    M1 --> S1
    M2 --> S2
    M3 --> S3
    
    style M1 fill:#e3f2fd
    style M2 fill:#e8f5e9
    style M3 fill:#fff3e0

架构说明:

  • 短期记忆 → Redis:高速读写,当前会话上下文
  • 工作记忆 → PostgreSQL:持久化状态,支持断点续传
  • 长期记忆 → 向量数据库:语义检索,用户画像与知识

6.2 记忆注入的 Prompt 工程

如何让 LLM 有效利用检索到的记忆:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
MEMORY_AUGMENTED_PROMPT = """你是一个有帮助的 AI 助手。以下是关于用户的已知信息:

## 用户画像
{user_profile}

## 相关历史记忆
{relevant_memories}

## 当前对话上下文
{conversation_history}

## 用户当前输入
{user_input}

请根据以上信息回答用户。注意:
1. 如果历史记忆与当前问题无关,请忽略
2. 如果记忆之间矛盾,优先使用较新的记忆
3. 如果不确定,可以询问用户确认
4. 不要提及"根据您的历史记录"这类表述,要自然融入回答
"""

def build_augmented_prompt(self, user_input: str, state: dict) -> str:
"""构建增强记忆的 Prompt"""

# 检索相关记忆
memories = self.long_term_memory.retrieve_relevant(user_input, k=5)

# 过滤高相关度的记忆
filtered = [m for m in memories if m["final_score"] > 0.7]

# 格式化记忆
memory_text = "\n".join([
f"- {m['content']} (相关度: {m['final_score']:.2f})"
for m in filtered
]) if filtered else "暂无相关历史记忆"

return MEMORY_AUGMENTED_PROMPT.format(
user_profile=self.get_user_profile(),
relevant_memories=memory_text,
conversation_history=format_messages(state["messages"][-6:]),
user_input=user_input
)

6.3 性能优化策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class MemoryOptimization:
"""记忆系统性能优化"""

def __init__(self):
# 多级缓存
self.cache = {
"l1": {}, # 进程内缓存
"l2": redis.Redis(), # Redis 缓存
}

def get_user_profile(self, user_id: str):
"""带缓存的用户画像获取"""

# L1 缓存
if user_id in self.cache["l1"]:
return self.cache["l1"][user_id]

# L2 缓存
cached = self.cache["l2"].get(f"profile:{user_id}")
if cached:
profile = json.loads(cached)
self.cache["l1"][user_id] = profile
return profile

# 数据库
profile = self.db.get_user_profile(user_id)

# 回填缓存
self.cache["l2"].setex(
f"profile:{user_id}",
timedelta(hours=1),
json.dumps(profile)
)
self.cache["l1"][user_id] = profile

return profile

def batch_retrieve(self, queries: list) -> list:
"""批量检索优化"""
# 合并多个查询,减少向量检索次数
pass

def async_write(self, memory: dict):
"""异步写入避免阻塞主流程"""
asyncio.create_task(self._persist_async(memory))

七、隐私、安全与合规

7.1 数据分级与访问控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class MemoryPrivacyManager:
"""记忆隐私管理"""

PRIVACY_LEVELS = {
"public": 1, # 可以共享给第三方
"internal": 2, # 仅限系统内部使用
"sensitive": 3, # 加密存储,需要授权
"restricted": 4, # 严格限制,单独存储
}

def store_with_privacy(self, content: str, level: str, user_id: str):
"""根据隐私级别存储"""

if level == "restricted":
# 加密存储
encrypted = self.encrypt(content, user_id)
self.restricted_store.store(encrypted, metadata={
"user_id": user_id,
"access_log": []
})

elif level == "sensitive":
# 加密 + 访问审计
encrypted = self.encrypt(content)
self.sensitive_store.store(encrypted)

else:
# 普通存储
self.regular_store.store(content, privacy_level=level)

def delete_user_data(self, user_id: str):
"""GDPR 合规 - 用户数据删除"""

# 删除向量存储
self.vectorstore.delete(where={"user_id": user_id})

# 删除工作记忆
self.checkpointer.delete_threads(user_id=user_id)

# 删除缓存
self.cache.delete(f"profile:{user_id}")

# 记录删除日志
self.audit_log.record({
"action": "user_data_deletion",
"user_id": user_id,
"timestamp": datetime.now().isoformat()
})

7.2 记忆遗忘机制

不是所有数据都值得永远保留:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class MemoryRetentionPolicy:
"""记忆保留策略"""

def apply_retention_policy(self):
"""定期清理过期记忆"""

# 1. 删除临时数据
self.vectorstore.delete(where={
"type": "temp",
"timestamp": {"$lt": datetime.now() - timedelta(days=7)}
})

# 2. 归档旧对话
old_conversations = self.vectorstore.search(where={
"type": "conversations",
"timestamp": {"$lt": datetime.now() - timedelta(days=90)}
})

for conv in old_conversations:
# 归档到冷存储
self.archive_to_s3(conv)
# 从热存储删除
self.vectorstore.delete(ids=[conv.id])

# 3. 压缩高频访问的向量索引
self.vectorstore.optimize()

八、实战:构建一个带完整记忆的 Agent

8.1 完整代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
import chromadb

class EnhancedAgentState(TypedDict):
"""增强版 Agent 状态"""
messages: Annotated[list, add_messages]
user_intent: str
retrieved_memories: list
user_profile: dict
summary: str

class MemoryEnhancedAgent:
"""带三层记忆的增强 Agent"""

def __init__(self, user_id: str):
self.user_id = user_id

# 初始化长期记忆
self.long_term_memory = Chroma(
collection_name=f"user_{user_id}",
embedding_function=OpenAIEmbeddings(),
persist_directory="./memories"
)

# 初始化工作记忆 (Checkpointer)
self.checkpointer = PostgresSaver(conn_string=DB_URL)

# 构建 Graph
self.graph = self._build_graph()

def _build_graph(self):
"""构建带记忆的 Agent Graph"""

builder = StateGraph(EnhancedAgentState)

# 节点定义
builder.add_node("retrieve_memories", self.retrieve_memories_node)
builder.add_node("understand_intent", self.understand_intent_node)
builder.add_node("generate_response", self.generate_response_node)
builder.add_node("update_memory", self.update_memory_node)

# 边定义
builder.set_entry_point("retrieve_memories")
builder.add_edge("retrieve_memories", "understand_intent")
builder.add_edge("understand_intent", "generate_response")
builder.add_edge("generate_response", "update_memory")
builder.add_edge("update_memory", END)

# 编译,加入持久化
return builder.compile(checkpointer=self.checkpointer)

def retrieve_memories_node(self, state: EnhancedAgentState):
"""检索长期记忆"""

# 获取用户最新输入
last_message = state["messages"][-1].content

# 检索相关记忆
results = self.long_term_memory.similarity_search_with_score(
query=last_message,
k=5
)

# 过滤和格式化
memories = [
{"content": doc.page_content, "score": score}
for doc, score in results
if score > 0.7
]

return {"retrieved_memories": memories}

def understand_intent_node(self, state: EnhancedAgentState):
"""理解用户意图"""

# 使用记忆增强的 Prompt
prompt = self._build_intent_prompt(state)
response = llm.invoke(prompt)

return {"user_intent": response.content}

def generate_response_node(self, state: EnhancedAgentState):
"""生成回复"""

# 构建完整上下文
system_msg = SystemMessage(content=self._build_system_prompt(state))

messages = [system_msg] + state["messages"]

response = llm.invoke(messages)

return {"messages": [response]}

def update_memory_node(self, state: EnhancedAgentState):
"""更新长期记忆"""

# 提取值得记忆的信息
conversation_summary = self._extract_key_info(state["messages"][-2:])

if conversation_summary:
self.long_term_memory.add_texts(
texts=[conversation_summary],
metadatas=[{
"timestamp": datetime.now().isoformat(),
"type": "conversation",
"user_id": self.user_id
}]
)

# 检查是否需要更新用户画像
if self._should_update_profile(state):
self._update_user_profile(state)

return {}

def chat(self, message: str, thread_id: str = None):
"""主入口"""

thread_id = thread_id or f"{self.user_id}_{uuid.uuid4()}"
config = {"configurable": {"thread_id": thread_id}}

result = self.graph.invoke(
{"messages": [HumanMessage(content=message)]},
config=config
)

return result["messages"][-1].content

# 使用示例
agent = MemoryEnhancedAgent(user_id="user_123")

# 第一次对话
response1 = agent.chat("你好,我叫张三,是一名 Python 开发者")

# 后续对话会自动记住之前的上下文
response2 = agent.chat("帮我写一个处理 JSON 的函数")
# Agent 会知道:1) 用户叫张三 2) 用户用 Python

# 重启后从断点恢复
response3 = agent.chat("刚才的函数再加个异常处理", thread_id="same_thread")

8.2 效果验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def test_memory_system():
"""测试记忆系统效果"""

agent = MemoryEnhancedAgent(user_id="test_user")

# 测试 1: 短期记忆
agent.chat("我叫李四")
response = agent.chat("我叫什么名字?")
assert "李四" in response, "短期记忆失败"

# 测试 2: 工作记忆(会话恢复)
thread_id = "test_thread_1"
agent.chat("我正在做数据分析项目", thread_id=thread_id)

# 模拟重启后恢复
agent2 = MemoryEnhancedAgent(user_id="test_user")
response = agent2.chat("刚才的项目进度如何?", thread_id=thread_id)
assert "数据分析" in response, "工作记忆恢复失败"

# 测试 3: 长期记忆
agent.chat("我喜欢用 Pandas 处理数据")

# 隔一会再询问(模拟长期记忆)
response = agent.chat("推荐一个数据处理库")
assert "Pandas" in response, "长期记忆失败"

print("所有记忆测试通过!")

九、总结与最佳实践

9.1 核心要点回顾

  1. 三层记忆架构缺一不可

    • 短期记忆处理当前对话
    • 工作记忆支持会话恢复
    • 长期记忆实现真正的”记住用户”
  2. Context Window 是稀缺资源

    • 不要填满,要智能压缩
    • 优先保留相关、高置信度信息
    • 使用摘要和检索而非全量加载
  3. 记忆质量比数量重要

    • 置信度机制过滤噪声
    • 用户确认提升可靠性
    • 时间衰减避免过时信息
  4. 一致性需要主动维护

    • 冲突检测与解决
    • 版本控制和更新机制
    • 定期清理和归档

9.2 常见陷阱与解决方案

陷阱 表现 解决方案
记忆过载 检索返回太多无关内容 提高相似度阈值 + 重排序
记忆冲突 新旧信息互相矛盾 时间优先 + 置信度权重
隐私泄露 敏感信息被错误使用 分级存储 + 访问控制
性能瓶颈 向量检索慢 预过滤 + 缓存 + 批处理
冷启动 新用户没有记忆 使用通用画像 + 快速学习

9.3 下一步演进方向

  1. 主动记忆:Agent 主动询问以填补知识空白
  2. 跨用户学习:在保护隐私前提下提取群体模式
  3. 情景化记忆:根据场景动态调整记忆权重
  4. 可解释记忆:让用户知道 Agent 记住了什么,为什么

延伸阅读

代码仓库

本文完整代码示例可在 GitHub 获取:agent-memory-management-examples


记忆是智能的基石。当你的 Agent 能记住用户的名字、偏好、过往对话,它就不再是一个冷冰冰的工具,而开始成为一个真正的数字伙伴。这种从”它”到”你”的转变,正是 AI 产品最珍贵的进化。