本文与《Agent 记忆管理》形成知识体系互补:上午聊记忆,下午聊结构。记忆需要容器,子图就是 Agent 架构的模块化容器。

1. 为什么需要子图?

当你开始构建复杂的 AI Agent 时,很快会遇到一个架构困境:

  • 单个 graph 里节点越来越多,状态管理混乱
  • 不同功能模块耦合在一起,难以独立测试
  • 团队协作时容易冲突,一人改节点影响全局
  • 想要复用某个功能模块到别的项目,得复制粘贴

这就是子图(Subgraph)要解决的问题。子图本质是图的组合——把复杂的 Agent 拆分成多个独立的、可复用的子系统,每个子图负责一块清晰的功能领域。

类比一下:

  • 不用子图 = 把所有代码写在一个 main 函数里
  • 使用子图 = 按职责拆分成多个模块,通过明确的接口协作

2. 子图的核心概念

2.1 父图与子图的关系

graph LR
    subgraph 父图["父图 (Parent Graph)"]
        direction TB
        A[节点 A]
        
        subgraph 子图["子图 (Subgraph)"]
            direction LR
            S1[节点1] --> S2[节点2]
        end
        
        B[节点 B]
    end
    
    A --> 子图
    子图 --> B
    
    style 子图 fill:#e3f2fd
    style A fill:#e8f5e9
    style B fill:#e8f5e9

关系说明:

  • 子图在父图中表现为一个复合节点
  • 从父图视角看,子图只是一个普通节点
  • 子图内部是完整的 StateGraph,可以有自己的状态流转

子图在父图中表现为一个复合节点。从父图视角看,它只是一个普通的节点;从内部视角看,它是一个完整的 StateGraph。

2.2 状态隔离与共享

子图设计的关键决策是:状态如何传递

LangGraph 提供两种模式:

模式 特点 适用场景
共享状态 子图直接使用父图的 State 类型 紧耦合的功能模块
私有状态 子图定义自己的 State,通过输入/输出转换器映射 独立可复用的模块
graph TB
    subgraph 模式一["模式一:共享状态"]
        P1[父图 State] --> S1[子图直接使用]
        S1 --> P1
    end
    
    subgraph 模式二["模式二:私有状态 + 转换器"]
        P2[父图 State] -->|输入转换器| S2[子图 State]
        S2 -->|输出转换器| P2
    end
    
    style 模式一 fill:#ffebee
    style 模式二 fill:#e8f5e9

2.3 子图的生命周期

子图的执行完全由父图控制:

  1. 父图进入子图节点时,子图开始执行
  2. 子图内部可以有多轮迭代(如果有循环边)
  3. 子图完成后,控制权返回父图
  4. 子图的最终状态通过输出转换器映射回父图状态

3. 实战:三种子图设计模式

模式一:共享状态的紧耦合子图

这是最简单的子图模式,适合功能拆分但数据紧密相关的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
import operator

# 父图和子图共享同一个 State 定义
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
current_step: str
tool_results: list
final_answer: str

# ========== 子图定义 ==========

def search_tool(state: AgentState):
"""模拟搜索工具"""
return {"tool_results": [f"搜索结果: {state['current_step']}"]}

def analyze_results(state: AgentState):
"""分析搜索结果"""
return {"messages": ["分析完成"]}

# 构建子图
subgraph_builder = StateGraph(AgentState)
subgraph_builder.add_node("search", search_tool)
subgraph_builder.add_node("analyze", analyze_results)
subgraph_builder.add_edge(START, "search")
subgraph_builder.add_edge("search", "analyze")
subgraph_builder.add_edge("analyze", END)

research_subgraph = subgraph_builder.compile()

# ========== 父图定义 ==========

def plan_step(state: AgentState):
"""规划执行步骤"""
return {"current_step": "research_topic", "messages": ["开始规划"]}

def generate_answer(state: AgentState):
"""生成最终答案"""
return {"final_answer": "基于研究结果生成的答案"}

# 构建父图,将子图作为节点
parent_builder = StateGraph(AgentState)
parent_builder.add_node("plan", plan_step)
# 关键:直接添加编译后的子图作为节点
parent_builder.add_node("research", research_subgraph)
parent_builder.add_node("generate", generate_answer)

parent_builder.add_edge(START, "plan")
parent_builder.add_edge("plan", "research")
parent_builder.add_edge("research", "generate")
parent_builder.add_edge("generate", END)

parent_graph = parent_builder.compile()

关键点解读:

  • 子图和父图使用完全相同的 AgentState 类型
  • 子图内部可以直接读写父图的状态字段
  • add_node("research", research_subgraph) 直接把编译后的子图当作节点

这种模式的优点是简单直观,缺点是耦合度高——子图不能独立运行,也无法复用到其他有不同 State 定义的项目。

模式二:私有状态的独立子图

更实用的模式是让子图拥有自己的 State 定义,通过转换器实现数据映射。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END

# ========== 父图 State ==========
class ParentState(TypedDict):
query: str
research_summary: str # 只接收子图的输出
final_answer: str

# ========== 子图 State(完全独立) ==========
class ResearchState(TypedDict):
search_query: str
raw_results: list
filtered_results: list
summary: str

# 子图节点实现
def search_web(state: ResearchState):
"""执行网络搜索"""
# 模拟搜索
results = [f"Result for: {state['search_query']}"]
return {"raw_results": results}

def filter_results(state: ResearchState):
"""过滤搜索结果"""
filtered = [r for r in state["raw_results"] if len(r) > 10]
return {"filtered_results": filtered}

def summarize(state: ResearchState):
"""生成摘要"""
summary = f"找到 {len(state['filtered_results'])} 条相关信息"
return {"summary": summary}

# 构建子图
research_builder = StateGraph(ResearchState)
research_builder.add_node("search", search_web)
research_builder.add_node("filter", filter_results)
research_builder.add_node("summarize", summarize)
research_builder.add_edge(START, "search")
research_builder.add_edge("search", "filter")
research_builder.add_edge("filter", "summarize")
research_builder.add_edge("summarize", END)

# ========== 转换器函数 ==========

def research_input_transform(parent_state: ParentState) -> ResearchState:
"""将父图状态转换为子图输入"""
return {
"search_query": parent_state["query"],
"raw_results": [],
"filtered_results": [],
"summary": ""
}

def research_output_transform(subgraph_state: ResearchState) -> dict:
"""将子图输出转换回父图状态更新"""
return {
"research_summary": subgraph_state["summary"]
}

# ========== 父图定义 ==========

def generate_response(state: ParentState):
"""基于研究结果生成回复"""
return {
"final_answer": f"根据研究: {state['research_summary']}"
}

# 构建父图
parent_builder = StateGraph(ParentState)
parent_builder.add_node("generate", generate_response)

# 添加子图节点,使用转换器
parent_builder.add_node(
"research",
research_builder.compile(),
input=research_input_transform,
output=research_output_transform
)

parent_builder.add_edge(START, "research")
parent_builder.add_edge("research", "generate")
parent_builder.add_edge("generate", END)

parent_graph = parent_builder.compile()

设计优势:

  1. 完全独立ResearchState 可以独立测试,不依赖父图
  2. 可复用:这个研究子图可以用到任何需要搜索+摘要功能的 Agent
  3. 类型安全:编译期就能发现状态映射错误
  4. 清晰的契约:输入/输出转换器明确了子图的数据边界

模式三:多级嵌套子图

复杂场景下,子图内部还可以嵌套更多子图,形成层级结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END

# ========== 最底层:工具执行子图 ==========
class ToolState(TypedDict):
tool_name: str
tool_input: str
tool_output: str

def execute_tool(state: ToolState):
return {"tool_output": f"Executed {state['tool_name']}"}

tool_subgraph = (
StateGraph(ToolState)
.add_node("execute", execute_tool)
.add_edge(START, "execute")
.add_edge("execute", END)
.compile()
)

# ========== 中间层:工具选择与执行子图 ==========
class ToolManagerState(TypedDict):
available_tools: list
user_request: str
selected_tool: str
tool_input: str
tool_result: str

def select_tool(state: ToolManagerState):
# 简单的工具选择逻辑
return {"selected_tool": state["available_tools"][0]}

def prepare_input(state: ToolManagerState):
return {"tool_input": state["user_request"]}

# 工具执行节点(嵌套最底层子图)
def run_tool(state: ToolManagerState):
# 这里可以嵌套 tool_subgraph
# 为简化示例,直接返回结果
return {"tool_result": f"Result from {state['selected_tool']}"}

tool_manager_subgraph = (
StateGraph(ToolManagerState)
.add_node("select", select_tool)
.add_node("prepare", prepare_input)
.add_node("run", run_tool)
.add_edge(START, "select")
.add_edge("select", "prepare")
.add_edge("prepare", "run")
.add_edge("run", END)
.compile()
)

# ========== 最顶层:主 Agent 图 ==========
class MainState(TypedDict):
user_input: str
tools_needed: bool
final_response: str

def analyze_intent(state: MainState):
return {"tools_needed": "search" in state["user_input"].lower()}

def direct_response(state: MainState):
return {"final_response": "直接回复用户"}

def compose_response(state: MainState):
return {"final_response": "基于工具结果回复"}

# 构建主图
main_builder = StateGraph(MainState)
main_builder.add_node("analyze", analyze_intent)
main_builder.add_node("direct", direct_response)
main_builder.add_node("compose", compose_response)
# 嵌套中间层子图
main_builder.add_node("tool_manager", tool_manager_subgraph)

main_builder.add_edge(START, "analyze")
main_builder.add_conditional_edges(
"analyze",
lambda s: "tool_manager" if s["tools_needed"] else "direct",
{"tool_manager": "tool_manager", "direct": "direct"}
)
main_builder.add_edge("tool_manager", "compose")
main_builder.add_edge("direct", END)
main_builder.add_edge("compose", END)

main_graph = main_builder.compile()

层级架构的价值:

1
2
3
4
5
6
7
8
9
10
11
主图 (Main Graph)
├── 意图分析
├── 条件分支
│ ├── 直接回复路径
│ └── 工具调用路径
│ └── 工具管理子图 (Tool Manager)
│ ├── 工具选择
│ ├── 输入准备
│ └── 工具执行
│ └── 工具执行子图 (Tool Executor)
└── 响应生成

每一层只关心自己的职责,不关心下层实现细节。

4. 状态传递的进阶技巧

4.1 状态分片(State Slicing)

当子图只需要父图的部分状态时,可以用切片减少数据传输。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class ParentState(TypedDict):
messages: list
user_profile: dict
system_config: dict
metadata: dict

class SimpleSubState(TypedDict):
messages: list # 只需要 messages

def slice_input(parent: ParentState) -> SimpleSubState:
return {"messages": parent["messages"]}

def merge_output(sub: SimpleSubState, parent: ParentState) -> dict:
# 合并时保留父图其他字段
return {"messages": sub["messages"]}

4.2 异步子图执行

LangGraph 支持并行执行多个子图(如果它们之间没有依赖)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from langgraph.graph import StateGraph, START, END
import asyncio

# 定义可以并行执行的子图
research_subgraph = build_research_subgraph()
analysis_subgraph = build_analysis_subgraph()

parent_builder = StateGraph(ParentState)
parent_builder.add_node("research", research_subgraph)
parent_builder.add_node("analysis", analysis_subgraph)

# 从同一个节点分叉到两个子图
parent_builder.add_edge(START, "research")
parent_builder.add_edge(START, "analysis") # 并行启动

# 使用汇合节点等待两个子图完成
parent_builder.add_node("merge", merge_results)
parent_builder.add_edge("research", "merge")
parent_builder.add_edge("analysis", "merge")
parent_builder.add_edge("merge", END)

注意:LangGraph 的并行执行依赖底层的异步支持,不是所有执行器都支持真正的并行。

4.3 状态持久化与检查点

子图也可以有自己的检查点(checkpoint),实现断点续传。

1
2
3
4
5
6
7
8
9
10
11
12
from langgraph.checkpoint.memory import MemorySaver

# 子图带检查点
research_subgraph = research_builder.compile(
checkpointer=MemorySaver(),
interrupt_before=["summarize"] # 在摘要前暂停
)

# 父图带检查点
parent_graph = parent_builder.compile(
checkpointer=MemorySaver()
)

这样即使父图被打断,子图的执行状态也能被恢复。

5. 子图设计的最佳实践

5.1 何时使用子图?

情况 建议
单个 graph 节点超过 10 个 考虑拆分
某个功能需要独立迭代 使用子图
多个 Agent 需要相同功能 提取为可复用子图
团队协作开发 按子图分配模块
需要独立测试的功能 封装成子图

5.2 设计原则

1. 单一职责原则

每个子图只负责一个明确的功能领域。例如:

  • research_subgraph:搜索+摘要
  • code_gen_subgraph:代码生成+验证
  • memory_subgraph:记忆检索+存储

2. 显式接口优于隐式共享

1
2
3
4
5
6
7
8
9
10
# 不好的设计:依赖父图状态的具体结构
def bad_node(state):
return {"result": state["some_field"]["nested"]["value"]}

# 好的设计:通过转换器明确输入
def good_input_transform(parent):
return {"value": parent["some_field"]["nested"]["value"]}

def good_node(state):
return {"result": state["value"]}

3. 避免深层嵌套

层级过多会增加理解成本:

  • 1-2 层:推荐
  • 3 层:谨慎使用,需要充分文档
  • 4 层以上:考虑重构

4. 子图应该是可测试的

1
2
3
4
5
6
7
8
9
def test_research_subgraph():
subgraph = build_research_subgraph()
result = subgraph.invoke({
"search_query": "test",
"raw_results": [],
"filtered_results": [],
"summary": ""
})
assert "summary" in result

5.3 常见陷阱

陷阱 1:循环依赖

1
2
# 错误:子图 A 调用子图 B,子图 B 又调用子图 A
# 这会导致编译错误或运行时栈溢出

陷阱 2:状态类型不匹配

1
2
3
4
5
class ParentState(TypedDict):
count: int

class SubState(TypedDict):
count: str # 类型不匹配!

陷阱 3:忘记处理子图的错误

1
2
3
4
5
6
# 子图可能抛出异常,需要在父图处理
def safe_subgraph_wrapper(state):
try:
return subgraph.invoke(state)
except Exception as e:
return {"error": str(e), "fallback_result": "default"}

6. 实战案例:构建模块化 RAG Agent

让我们用一个完整的案例来演示子图设计的威力:构建一个模块化的 RAG(检索增强生成)Agent。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from typing import Annotated, TypedDict, List
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
import operator

# ========== 子图 1:查询理解 ==========
class QueryUnderstandingState(TypedDict):
raw_query: str
intent: str
entities: List[str]
rewritten_query: str

def analyze_intent(state: QueryUnderstandingState):
# 使用 LLM 分析意图
return {"intent": "information_seeking"}

def extract_entities(state: QueryUnderstandingState):
return {"entities": ["entity1", "entity2"]}

def rewrite_query(state: QueryUnderstandingState):
return {"rewritten_query": f"优化后的: {state['raw_query']}"}

query_understanding_graph = (
StateGraph(QueryUnderstandingState)
.add_node("intent", analyze_intent)
.add_node("entities", extract_entities)
.add_node("rewrite", rewrite_query)
.add_edge(START, "intent")
.add_edge("intent", "entities")
.add_edge("entities", "rewrite")
.add_edge("rewrite", END)
.compile()
)

# ========== 子图 2:文档检索 ==========
class RetrievalState(TypedDict):
query: str
retrieved_docs: List[dict]
ranked_docs: List[dict]

def retrieve_docs(state: RetrievalState):
# 调用向量数据库
return {"retrieved_docs": [{"id": 1, "content": "..."}]}

def rerank_docs(state: RetrievalState):
# 使用重排序模型
return {"ranked_docs": sorted(state["retrieved_docs"], key=lambda x: x["score"])}

retrieval_graph = (
StateGraph(RetrievalState)
.add_node("retrieve", retrieve_docs)
.add_node("rerank", rerank_docs)
.add_edge(START, "retrieve")
.add_edge("retrieve", "rerank")
.add_edge("rerank", END)
.compile()
)

# ========== 子图 3:响应生成 ==========
class GenerationState(TypedDict):
query: str
context: List[dict]
draft_response: str
validated_response: str

def generate_draft(state: GenerationState):
# 使用 LLM 生成初稿
context_text = "\n".join([d["content"] for d in state["context"]])
return {"draft_response": f"基于以下上下文: {context_text[:100]}..."}

def validate_response(state: GenerationState):
# 验证响应是否包含引用
if "来源" in state["draft_response"]:
return {"validated_response": state["draft_response"]}
return {"validated_response": state["draft_response"] + "\n\n(无明确来源)"}

generation_graph = (
StateGraph(GenerationState)
.add_node("draft", generate_draft)
.add_node("validate", validate_response)
.add_edge(START, "draft")
.add_edge("draft", "validate")
.add_edge("validate", END)
.compile()
)

# ========== 父图:RAG Agent ==========
class RAGState(TypedDict):
messages: Annotated[list, operator.add]
current_query: str
understood_query: str
retrieved_context: List[dict]
final_response: str

# 转换器
def query_input_transform(state: RAGState) -> QueryUnderstandingState:
return {
"raw_query": state["current_query"],
"intent": "",
"entities": [],
"rewritten_query": ""
}

def query_output_transform(sub_state: QueryUnderstandingState) -> dict:
return {"understood_query": sub_state["rewritten_query"]}

def retrieval_input_transform(state: RAGState) -> RetrievalState:
return {
"query": state["understood_query"],
"retrieved_docs": [],
"ranked_docs": []
}

def retrieval_output_transform(sub_state: RetrievalState) -> dict:
return {"retrieved_context": sub_state["ranked_docs"]}

def generation_input_transform(state: RAGState) -> GenerationState:
return {
"query": state["understood_query"],
"context": state["retrieved_context"],
"draft_response": "",
"validated_response": ""
}

def generation_output_transform(sub_state: GenerationState) -> dict:
return {
"final_response": sub_state["validated_response"],
"messages": [sub_state["validated_response"]]
}

# 构建父图
rag_builder = StateGraph(RAGState)

# 添加子图节点
rag_builder.add_node(
"understand",
query_understanding_graph,
input=query_input_transform,
output=query_output_transform
)
rag_builder.add_node(
"retrieve",
retrieval_graph,
input=retrieval_input_transform,
output=retrieval_output_transform
)
rag_builder.add_node(
"generate",
generation_graph,
input=generation_input_transform,
output=generation_output_transform
)

rag_builder.add_edge(START, "understand")
rag_builder.add_edge("understand", "retrieve")
rag_builder.add_edge("retrieve", "generate")
rag_builder.add_edge("generate", END)

rag_agent = rag_builder.compile(checkpointer=MemorySaver())

# 使用示例
result = rag_agent.invoke(
{
"messages": [],
"current_query": "LangGraph 子图如何设计?",
"understood_query": "",
"retrieved_context": [],
"final_response": ""
},
config={"configurable": {"thread_id": "user_123"}}
)
print(result["final_response"])

这个架构的优势:

  1. 模块独立:每个子图可以独立开发和测试
  2. 职责清晰:查询理解、检索、生成各自独立
  3. 可替换性:想换检索算法?只改 retrieval_graph
  4. 可观察性:每个阶段都是明确的节点,易于监控
graph LR
    subgraph RAGAgent[模块化 RAG Agent]
        direction TB
        
        subgraph 查询理解[查询理解子图]
            QU1[分析意图]
            QU2[提取实体]
            QU3[重写查询]
        end
        
        subgraph 文档检索[文档检索子图]
            DR1[向量检索]
            DR2[重排序]
        end
        
        subgraph 响应生成[响应生成子图]
            RG1[生成初稿]
            RG2[验证响应]
        end
        
        QU1 --> QU2 --> QU3
        QU3 --> DR1 --> DR2
        DR2 --> RG1 --> RG2
    end
    
    输入[用户查询] --> 查询理解
    响应生成 --> 输出[最终响应]
    
    style 查询理解 fill:#e3f2fd
    style 文档检索 fill:#fff3e0
    style 响应生成 fill:#e8f5e9

7. 子图与记忆系统的集成

结合上午的《Agent 记忆管理》,子图架构与记忆系统可以形成完美配合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class RAGState(TypedDict):
messages: Annotated[list, operator.add]
# 短期记忆(当前对话上下文)
short_term_memory: str
# 长期记忆检索结果
long_term_memories: List[dict]

# 记忆检索子图
class MemoryRetrievalState(TypedDict):
query: str
user_id: str
retrieved_memories: List[dict]

def retrieve_long_term_memory(state: MemoryRetrievalState):
# 从向量存储检索用户相关信息
memories = vector_store.similarity_search(
state["query"],
filter={"user_id": state["user_id"]}
)
return {"retrieved_memories": memories}

memory_subgraph = (
StateGraph(MemoryRetrievalState)
.add_node("retrieve", retrieve_long_term_memory)
.add_edge(START, "retrieve")
.add_edge("retrieve", END)
.compile()
)

# 在主图中集成记忆子图
main_builder.add_node(
"memory_retrieval",
memory_subgraph,
input=lambda s: {"query": s["current_query"], "user_id": s["user_id"], "retrieved_memories": []},
output=lambda s: {"long_term_memories": s["retrieved_memories"]}
)

# 生成节点可以同时使用检索到的文档和用户记忆
def generate_with_memory(state: MainState):
context = {
"docs": state["retrieved_context"],
"memories": state["long_term_memories"]
}
# 生成时同时考虑文档和记忆
response = llm.generate(state["current_query"], context)
return {"final_response": response}

架构总结:

graph TB
    subgraph RAGAgent[RAG Agent 完整架构]
        direction TB
        
        QU[查询理解子图]
        DR[文档检索子图]
        MR[记忆检索子图<br/>长期记忆上下文]
        RG[响应生成子图<br/>融合多来源]
        
        QU --> DR
        QU --> MR
        DR --> RG
        MR --> RG
    end
    
    输入[用户查询] --> QU
    RG --> 输出[最终响应]
    
    style QU fill:#e3f2fd
    style DR fill:#fff3e0
    style MR fill:#fce4ec
    style RG fill:#e8f5e9

架构说明:

  • 查询理解:解析用户意图,重写查询
  • 文档检索:从知识库检索相关文档
  • 记忆检索:从长期记忆中检索用户相关信息
  • 响应生成:融合文档和记忆,生成个性化响应

8. 性能考虑

8.1 子图编译开销

每个子图的 compile() 都有一定开销。如果子图会被频繁调用,考虑:

1
2
3
4
5
# 在模块级别编译,避免重复编译
RESEARCH_SUBGRAPH = build_research_subgraph()

# 父图直接使用预编译的子图
parent_builder.add_node("research", RESEARCH_SUBGRAPH)

8.2 状态复制的成本

使用独立 State 时,转换器会创建新的状态对象。如果状态很大(如大量消息历史),注意性能:

1
2
3
4
5
6
7
8
9
10
# 高效:只复制需要的数据
def efficient_transform(parent):
return {
"messages": parent["messages"][-5:], # 只取最近5条
"query": parent["current_query"]
}

# 低效:复制整个状态
def inefficient_transform(parent):
return {**parent} # 复制所有字段

8.3 调试技巧

使用 LangGraph Studio 或打印状态变化来调试子图:

1
2
3
4
5
6
7
8
9
def debug_node(name):
def wrapper(state):
print(f"[{name}] Input: {state}")
return state
return wrapper

subgraph_builder.add_node("debug_in", debug_node("subgraph_input"))
subgraph_builder.add_edge(START, "debug_in")
subgraph_builder.add_edge("debug_in", "actual_node")

9. 总结

子图是 LangGraph 提供的强大抽象,让你能像搭积木一样构建复杂 Agent:

  1. 共享状态模式适合快速原型,耦合但简单
  2. 独立状态模式适合生产环境,解耦且可复用
  3. 多级嵌套可以构建复杂但清晰的分层架构
  4. 与记忆系统结合可以创建真正有”上下文意识”的 Agent

关键要点:

  • 子图不仅是代码组织工具,更是架构设计工具
  • 显式的输入/输出转换器是子图的核心价值所在
  • 设计子图时要考虑可测试性和可复用性
  • 避免过深层级和循环依赖

与上午的《Agent 记忆管理》一起,你现在掌握了构建复杂 Agent 的两个核心维度:

  • 记忆 = Agent 的”知识”
  • 子图 = Agent 的”结构”

两者结合,才能构建真正可用、可维护、可扩展的 AI Agent 系统。


文章配图关键词: technology,network,structure,artificial-intelligence,architecture

发布时间: 2026-02-24 15:00:00 UTC