单体 Agent 的能力边界正在被触及。当任务复杂度超过单一模型的上下文窗口,当工具选择数量达到数十个,当需要不同领域的专业知识协同工作时,多智能体架构成为必然选择。

多智能体系统的核心洞察是:分解带来能力。就像人类团队通过分工协作完成复杂项目,AI Agent 也可以通过专业化分工和结构化协作,突破单体架构的能力天花板。

多智能体架构的核心问题

设计多智能体系统时,需要回答两个根本性问题:

问题 设计考量 影响
谁是独立的 Agent? 职责边界、工具集合、提示词设计 系统的模块化程度
Agent 如何连接? 通信机制、路由策略、状态共享 系统的协作效率

这两个问题的答案决定了系统的拓扑结构。LangGraph 的状态图架构为这些问题提供了优雅的解决框架:每个 Agent 是图中的一个节点,节点之间的边定义了控制流和通信路径。

为什么多智能体更有效

“如果一个 Agent 都做不好,为什么多个 Agent 会有效?”这是一个常见的质疑。答案在于专业化带来的精度提升

1. 工具聚焦效应

当 Agent 需要从一个庞大的工具库中选择时,错误选择的概率随工具数量增加。研究表明,当可选工具超过 20 个时,Agent 的工具选择准确率显著下降。

多智能体架构通过职责分组,让每个 Agent 只接触相关的工具子集:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 单体 Agent:需要处理所有工具
class MonolithicAgent:
tools = [search_tool, sql_tool, python_tool, email_tool,
calendar_tool, crm_tool, analytics_tool, ...] # 30+ 工具

# 多智能体:每个 Agent 专注特定领域
class ResearchAgent:
tools = [search_tool, web_scraper, document_parser]

class DataAgent:
tools = [sql_tool, python_tool, analytics_tool]

class CommunicationAgent:
tools = [email_tool, calendar_tool, notification_tool]

2. 提示词专业化

每个 Agent 可以有独立的系统提示词和少样本示例,针对特定任务优化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
research_agent_prompt = """你是一位专业的研究分析师。
你的职责是:
1. 搜索并收集相关信息
2. 验证信息来源的可靠性
3. 整理关键发现和数据

示例:
用户问:"2024年电动车市场份额"
思考:需要查找最新的市场研究报告
行动:使用搜索工具查找权威数据
观察:找到 McKinsey 报告...
"""

analysis_agent_prompt = """你是一位数据分析师。
你的职责是:
1. 理解研究 Agent 提供的原始数据
2. 进行统计分析
3. 生成可视化洞察

注意:只处理已经验证的数据,不进行新的搜索。
"""

3. 独立评估与优化

模块化架构允许单独评估和改进每个 Agent,而不会影响整个系统:

1
2
3
4
5
6
7
8
9
10
11
12
# 独立测试 Research Agent
research_result = await research_agent.ainvoke(
{"query": "测试查询"},
config={"callbacks": [evaluation_callback]}
)

# 单独优化 Data Agent 的提示词
analysis_agent = create_react_agent(
model=llm,
tools=analysis_tools,
prompt=optimized_analysis_prompt # 独立优化
)

LangGraph 多智能体模式

LangGraph 提供了三种核心多智能体协作模式,每种适用于不同的场景。

模式一:协作式多智能体(Collaboration)

适用场景:需要紧密协作、信息共享的任务,如联合研究、头脑风暴。

核心特征

  • 所有 Agent 共享同一个消息 scratchpad
  • 每个 Agent 都能看到其他 Agent 的完整思考过程
  • 通过路由器决定下一个由哪个 Agent 执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from langgraph.graph import StateGraph, MessagesState
from langgraph.prebuilt import create_react_agent
from typing import Literal

# 创建专业 Agent
research_agent = create_react_agent(
model=gpt4,
tools=[search_tool, web_fetch_tool],
prompt="你是一个研究专家,负责收集信息。"
)

analysis_agent = create_react_agent(
model=gpt4,
tools=[python_tool, calculator_tool],
prompt="你是一个数据分析专家,负责处理和解释数据。"
)

writing_agent = create_react_agent(
model=gpt4,
tools=[document_tool],
prompt="你是一个写作专家,负责生成清晰、结构化的报告。"
)

def router(state: MessagesState) -> Literal["researcher", "analyst", "writer", "__end__"]:
"""
根据当前状态决定下一个执行者
"""
last_message = state["messages"][-1]

# 如果显式标记了下一个执行者
if hasattr(last_message, 'name') and last_message.name:
return last_message.name

# 基于内容分析决定
content = last_message.content.lower()

if "需要更多数据" in content or "搜索" in content:
return "researcher"
elif "计算" in content or "分析" in content:
return "analyst"
elif "撰写" in content or "生成报告" in content:
return "writer"
elif "完成" in content or "结束" in content:
return "__end__"

# 默认继续当前流程
return "analyst"

# 构建协作图
builder = StateGraph(MessagesState)
builder.add_node("researcher", research_agent)
builder.add_node("analyst", analysis_agent)
builder.add_node("writer", writing_agent)

# 添加条件边
builder.add_conditional_edges("researcher", router)
builder.add_conditional_edges("analyst", router)
builder.add_conditional_edges("writer", router)

builder.set_entry_point("researcher")
collaboration_graph = builder.compile()

协作模式的优势

  • 透明度高:所有思考过程可见,便于调试和审计
  • 灵活性强:Agent 可以根据上下文动态调整策略
  • 知识共享:一个 Agent 的发现立即对所有 Agent 可用

协作模式的挑战

  • 上下文膨胀:共享 scratchpad 可能变得冗长
  • 角色混淆:Agent 可能模仿其他 Agent 的风格而非坚持自己的角色
  • 决策延迟:需要路由器持续判断下一步,增加延迟

模式二:监督者模式(Supervisor)

适用场景:任务需要明确的分阶段执行,如软件开发生命周期、业务流程自动化。

核心特征

  • 每个 Agent 有独立的内部状态
  • Supervisor Agent 负责协调和路由
  • 工作成果通过结构化消息传递
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from operator import add

class TeamState(TypedDict):
"""团队共享状态"""
task: str
plan: list[str]
current_step: int
results: Annotated[list[dict], add] # 各 Agent 的结果
final_output: str

class WorkerOutput(TypedDict):
"""Worker Agent 的输出格式"""
worker_name: str
task_description: str
result: str
status: Literal["success", "failed", "needs_retry"]

def create_worker_node(agent, name: str):
"""创建 Worker 节点"""
def worker_node(state: TeamState):
# 获取当前任务步骤
current_task = state["plan"][state["current_step"]]

# 调用 Agent
result = agent.invoke({
"task": current_task,
"context": state["results"]
})

return {
"results": [{
"worker_name": name,
"task_description": current_task,
"result": result["output"],
"status": "success"
}],
"current_step": state["current_step"] + 1
}

return worker_node

# 创建专业 Worker
requirements_agent = create_react_agent(
model=gpt4,
tools=[document_reader, stakeholder_interview_tool],
prompt="你是需求分析专家。收集并澄清用户需求,输出结构化需求文档。"
)

design_agent = create_react_agent(
model=gpt4,
tools=[architecture_design_tool, diagram_generator],
prompt="你是系统架构师。基于需求设计技术方案,输出架构图和设计文档。"
)

code_agent = create_react_agent(
model=gpt4,
tools=[code_generator, linter, test_generator],
prompt="你是资深开发者。根据设计文档生成高质量代码,包括单元测试。"
)

def supervisor_node(state: TeamState):
"""
Supervisor 负责:
1. 制定执行计划
2. 监控执行进度
3. 决定下一步行动
4. 整合最终结果
"""
# 如果还没有计划,制定计划
if not state["plan"]:
plan = create_execution_plan(state["task"])
return {"plan": plan, "current_step": 0}

# 检查是否完成所有步骤
if state["current_step"] >= len(state["plan"]):
# 整合结果
final_output = synthesize_results(state["results"])
return {"final_output": final_output}

# 检查上一步结果
if state["results"]:
last_result = state["results"][-1]
if last_result["status"] == "failed":
# 处理失败,可能需要重试或调整
return handle_failure(state)

# 继续执行下一步
return {}

def route_by_plan(state: TeamState) -> str:
"""根据当前步骤决定路由到哪个 Worker"""
step_to_worker = {
0: "requirements",
1: "design",
2: "code",
3: "test",
4: "deploy"
}

current_step = state["current_step"]
if current_step >= len(state["plan"]):
return END

return step_to_worker.get(current_step, "supervisor")

# 构建监督者图
builder = StateGraph(TeamState)
builder.add_node("supervisor", supervisor_node)
builder.add_node("requirements", create_worker_node(requirements_agent, "requirements"))
builder.add_node("design", create_worker_node(design_agent, "design"))
builder.add_node("code", create_worker_node(code_agent, "code"))

builder.set_entry_point("supervisor")
builder.add_conditional_edges("supervisor", route_by_plan)
builder.add_edge("requirements", "supervisor")
builder.add_edge("design", "supervisor")
builder.add_edge("code", "supervisor")

supervisor_graph = builder.compile()

监督者模式的优势

  • 清晰的控制流:执行顺序显式定义,可预测
  • 状态隔离:每个 Worker 的内部工作不污染共享状态
  • 便于监控:Supervisor 可以统一收集指标和日志

监督者模式的挑战

  • 单点瓶颈:Supervisor 成为性能瓶颈和故障点
  • 灵活性有限:预设的执行路径难以应对意外情况
  • 上下文丢失:Worker 之间通过结构化消息通信,可能丢失细节

模式三:层级智能体团队(Hierarchical Teams)

适用场景:超大规模任务,需要多层分解和协调,如企业级项目管理、复杂研究项目。

核心特征

  • Agent 本身可以是另一个 LangGraph
  • 支持无限层级嵌套
  • 父级 Agent 协调多个子级团队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from langgraph.graph import StateGraph
from typing import TypedDict, List

class EnterpriseState(TypedDict):
project_goal: str
department_results: dict
consolidated_report: str
executive_summary: str

# ============ 第一层:部门级子图 ============

def create_department_team(department_name: str):
"""创建部门级团队子图"""

class DepartmentState(TypedDict):
department: str
tasks: List[str]
assigned_workers: List[str]
deliverables: List[str]
status: str

def department_supervisor(state: DepartmentState):
"""部门主管:分配任务给团队成员"""
assignments = assign_to_workers(state["tasks"], state["assigned_workers"])
return {"assignments": assignments}

def worker_execution(state: DepartmentState):
"""团队成员执行具体任务"""
results = []
for task in state["tasks"]:
result = execute_task(task)
results.append(result)
return {"deliverables": results, "status": "completed"}

builder = StateGraph(DepartmentState)
builder.add_node("supervisor", department_supervisor)
builder.add_node("workers", worker_execution)
builder.add_edge("supervisor", "workers")
builder.set_entry_point("supervisor")

return builder.compile()

# 创建各部门团队
engineering_team = create_department_team("Engineering")
marketing_team = create_department_team("Marketing")
sales_team = create_department_team("Sales")

# ============ 第二层:企业级协调图 ============

def enterprise_planner(state: EnterpriseState):
"""
企业级规划器:
1. 理解整体项目目标
2. 分解为部门级子目标
3. 确定部门间依赖关系
"""
department_goals = {
"engineering": "开发新产品功能",
"marketing": "制定上市策略",
"sales": "准备销售渠道"
}
return {"department_goals": department_goals}

def department_router(state: EnterpriseState):
"""将工作路由到相应部门"""
# 并行调用所有部门
return ["engineering", "marketing", "sales"]

def engineering_subgraph(state: EnterpriseState):
"""工程部门执行"""
result = engineering_team.invoke({
"department": "Engineering",
"tasks": ["架构设计", "核心开发", "测试覆盖"],
"assigned_workers": ["architect", "senior_dev", "qa_lead"]
})
return {"department_results": {"engineering": result}}

def marketing_subgraph(state: EnterpriseState):
"""市场部门执行"""
result = marketing_team.invoke({
"department": "Marketing",
"tasks": ["竞品分析", "定位策略", "营销素材"],
"assigned_workers": ["analyst", "strategist", "designer"]
})
return {"department_results": {"marketing": result}}

def sales_subgraph(state: EnterpriseState):
"""销售部门执行"""
result = sales_team.invoke({
"department": "Sales",
"tasks": ["渠道准备", "销售培训", "客户沟通"],
"assigned_workers": ["channel_mgr", "trainer", "account_exec"]
})
return {"department_results": {"sales": result}}

def consolidation_node(state: EnterpriseState):
"""整合各部门结果"""
engineering_out = state["department_results"]["engineering"]
marketing_out = state["department_results"]["marketing"]
sales_out = state["department_results"]["sales"]

consolidated = f"""
# 项目综合报告

## 工程部门
{engineering_out["deliverables"]}

## 市场部门
{marketing_out["deliverables"]}

## 销售部门
{sales_out["deliverables"]}
"""

return {"consolidated_report": consolidated}

def executive_summary_node(state: EnterpriseState):
"""生成高管摘要"""
summary = generate_executive_summary(state["consolidated_report"])
return {"executive_summary": summary}

# 构建企业级层级图
builder = StateGraph(EnterpriseState)
builder.add_node("planner", enterprise_planner)
builder.add_node("router", department_router)
builder.add_node("engineering", engineering_subgraph)
builder.add_node("marketing", marketing_subgraph)
builder.add_node("sales", sales_subgraph)
builder.add_node("consolidation", consolidation_node)
builder.add_node("executive_summary", executive_summary_node)

builder.set_entry_point("planner")
builder.add_edge("planner", "router")
# 并行执行所有部门
builder.add_edge("router", "engineering")
builder.add_edge("router", "marketing")
builder.add_edge("router", "sales")
# 等待所有部门完成后整合
builder.add_edge(["engineering", "marketing", "sales"], "consolidation")
builder.add_edge("consolidation", "executive_summary")

enterprise_graph = builder.compile()

层级模式的优势

  • 可扩展性:支持任意复杂度的任务分解
  • 模块化复用:子图可以在不同项目中重复使用
  • 并行执行:不同部门/团队可以并行工作

层级模式的挑战

  • 协调复杂度高:层级越深,协调开销越大
  • 上下文传递:信息在不同层级间传递可能失真
  • 调试困难:多层嵌套使问题定位复杂

多智能体通信模式

无论采用哪种架构模式,Agent 之间的通信机制都是关键设计决策。

模式 A:消息传递(Message Passing)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from typing import TypedDict
from datetime import datetime

class Message(TypedDict):
sender: str
recipient: str
content: str
message_type: Literal["task", "response", "query", "notification"]
timestamp: datetime
priority: int # 1-5, 5 为最高
attachments: List[dict] # 附加数据

class MessageBus:
"""消息总线:Agent 间通信的中介"""

def __init__(self):
self.messages: List[Message] = []
self.subscribers: Dict[str, Callable] = {}

def subscribe(self, agent_name: str, handler: Callable):
"""Agent 订阅消息"""
self.subscribers[agent_name] = handler

def publish(self, message: Message):
"""发布消息"""
self.messages.append(message)

# 通知接收者
if message["recipient"] in self.subscribers:
self.subscribers[message["recipient"]](message)

def get_messages_for(self, agent_name: str) -> List[Message]:
"""获取 Agent 的待处理消息"""
return [m for m in self.messages
if m["recipient"] == agent_name and not m.get("read")]

模式 B:共享状态(Shared State)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from typing import TypedDict, Annotated
from operator import add

class SharedWorkspace(TypedDict):
"""共享工作空间:所有 Agent 共同维护"""
# 任务相关
current_task: str
task_history: Annotated[list, add]

# 知识相关
shared_knowledge: dict # 发现的事实、数据
hypotheses: list # 待验证的假设
conclusions: list # 已验证的结论

# 协作相关
active_agents: list # 当前活跃的 Agent
handoff_queue: list # 等待接手的任务

# 元数据
created_at: datetime
last_updated: datetime

def update_shared_workspace(state: SharedWorkspace, update: dict):
"""原子化更新共享状态"""
new_state = {**state}

# 应用更新
for key, value in update.items():
if key in ["task_history", "hypotheses", "conclusions"]:
new_state[key] = state.get(key, []) + [value]
else:
new_state[key] = value

new_state["last_updated"] = datetime.now()
return new_state

模式 C:函数调用(Function Calling)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from langchain_core.tools import tool

@tool
def delegate_to_agent(agent_name: str, task: str, context: str) -> str:
"""
将任务委托给其他 Agent

Args:
agent_name: 目标 Agent 的名称
task: 具体任务描述
context: 相关上下文信息
"""
agent_registry = {
"researcher": research_agent,
"analyst": analysis_agent,
"writer": writing_agent
}

if agent_name not in agent_registry:
return f"Error: Unknown agent '{agent_name}'"

target_agent = agent_registry[agent_name]
result = target_agent.invoke({
"task": task,
"context": context
})

return result["output"]

@tool
def request_clarification(from_agent: str, question: str) -> str:
"""
向另一个 Agent 请求澄清

Args:
from_agent: 要询问的 Agent
question: 具体问题
"""
# 将问题添加到消息队列
return f"已向 {from_agent} 发送澄清请求: {question}"

# 在 Agent 中使用
collaboration_tools = [
delegate_to_agent,
request_clarification,
# ... 其他工具
]

实战:构建研究助手团队

让我们将理论付诸实践,构建一个完整的多智能体研究助手系统。

系统架构

1
2
3
4
5
6
7
8
9
10
11
12
┌─────────────────────────────────────────────────────────────┐
│ Supervisor Agent │
│ (路由 + 质量控制) │
└──────────────────┬──────────────────────────────────────────┘

┌──────────┼──────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Research │ │ Analysis │ │ Writing │
│ Agent │ │ Agent │ │ Agent │
└──────────┘ └──────────┘ └──────────┘

完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import create_react_agent
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from operator import add
import operator

class ResearchState(TypedDict):
"""研究团队共享状态"""
messages: Annotated[Sequence[BaseMessage], add]
research_findings: Annotated[list, add]
analysis_insights: list
final_report: str
next_step: str
iteration_count: int

# ============ Agent 定义 ============

research_agent = create_react_agent(
model=gpt4,
tools=[search_tool, web_fetch_tool, news_search_tool],
prompt="""你是专业的研究分析师。你的职责是:
1. 使用搜索工具收集相关信息
2. 验证来源的可靠性和时效性
3. 提取关键事实和数据
4. 将发现记录到 shared_state

当你找到足够信息后,明确声明 "RESEARCH_COMPLETE" 并总结关键发现。
"""
)

analysis_agent = create_react_agent(
model=gpt4,
tools=[python_tool, trend_analyzer, comparison_tool],
prompt="""你是数据分析师。你的职责是:
1. 分析 Research Agent 提供的数据
2. 识别趋势、模式和异常
3. 进行统计计算和比较分析
4. 生成可操作的洞察

当你完成分析后,明确声明 "ANALYSIS_COMPLETE" 并提供关键洞察。
"""
)

writing_agent = create_react_agent(
model=gpt4,
tools=[outline_generator, citation_formatter],
prompt="""你是专业报告撰写人。你的职责是:
1. 基于研究发现和分析洞察生成结构化报告
2. 确保逻辑清晰、论证有力
3. 正确引用数据来源
4. 生成执行摘要

报告应包括:执行摘要、主要发现、详细分析、结论与建议。
"""
)

# ============ Supervisor 逻辑 ============

def supervisor_node(state: ResearchState):
"""
Supervisor 协调整个研究流程:
1. 评估当前状态
2. 决定下一步行动
3. 检查是否需要迭代
"""
messages = state["messages"]
last_message = messages[-1].content if messages else ""

# 检查是否完成
if "REPORT_COMPLETE" in last_message and state["final_report"]:
return {"next_step": END}

# 检查迭代次数,防止无限循环
if state.get("iteration_count", 0) > 10:
return {"next_step": "writer"} # 强制结束,进入写作阶段

# 根据当前状态决定下一步
if "RESEARCH_COMPLETE" in last_message and not state["analysis_insights"]:
return {"next_step": "analyst", "iteration_count": state.get("iteration_count", 0) + 1}
elif "ANALYSIS_COMPLETE" in last_message and not state["final_report"]:
return {"next_step": "writer", "iteration_count": state.get("iteration_count", 0) + 1}
elif state["final_report"]:
# 质量检查,可能需要迭代改进
if needs_improvement(state["final_report"]):
return {"next_step": "researcher", "iteration_count": state.get("iteration_count", 0) + 1}
return {"next_step": END}
else:
# 默认开始研究
return {"next_step": "researcher", "iteration_count": state.get("iteration_count", 0) + 1}

def needs_improvement(report: str) -> bool:
"""评估报告质量,决定是否需要改进"""
# 简化的质量检查逻辑
required_sections = ["执行摘要", "主要发现", "结论"]
return not all(section in report for section in required_sections)

# ============ Worker 节点 ============

def research_node(state: ResearchState):
"""Research Agent 执行节点"""
# 提取研究任务
query = state["messages"][0].content if state["messages"] else ""

result = research_agent.invoke({
"messages": [HumanMessage(content=f"研究主题: {query}")
] + list(state["messages"])
})

# 提取研究发现
findings = extract_findings(result["messages"][-1].content)

return {
"messages": result["messages"],
"research_findings": findings
}

def analysis_node(state: ResearchState):
"""Analysis Agent 执行节点"""
findings_text = "\n".join(state["research_findings"])

result = analysis_agent.invoke({
"messages": [HumanMessage(content=f"请分析以下研究发现:\n{findings_text}")]
})

insights = extract_insights(result["messages"][-1].content)

return {
"messages": result["messages"],
"analysis_insights": insights
}

def writing_node(state: ResearchState):
"""Writing Agent 执行节点"""
findings = "\n".join(state["research_findings"])
insights = "\n".join(state["analysis_insights"])

prompt = f"""基于以下研究资料撰写完整报告:

研究发现:
{findings}

分析洞察:
{insights}

请生成结构化的研究报告,以 "REPORT_COMPLETE" 结尾。
"""

result = writing_agent.invoke({
"messages": [HumanMessage(content=prompt)]
})

report = result["messages"][-1].content

return {
"messages": result["messages"],
"final_report": report
}

# ============ 构建图 ============

builder = StateGraph(ResearchState)
builder.add_node("supervisor", supervisor_node)
builder.add_node("researcher", research_node)
builder.add_node("analyst", analysis_node)
builder.add_node("writer", writing_node)

# Supervisor 路由
builder.add_conditional_edges(
"supervisor",
lambda state: state["next_step"],
{
"researcher": "researcher",
"analyst": "analyst",
"writer": "writer",
END: END
}
)

# Worker 返回 Supervisor
builder.add_edge("researcher", "supervisor")
builder.add_edge("analyst", "supervisor")
builder.add_edge("writer", "supervisor")

builder.set_entry_point("supervisor")

research_team_graph = builder.compile()

# ============ 使用示例 ============

async def run_research(query: str):
"""执行研究任务"""
initial_state = {
"messages": [HumanMessage(content=query)],
"research_findings": [],
"analysis_insights": [],
"final_report": "",
"iteration_count": 0
}

result = await research_team_graph.ainvoke(initial_state)

return {
"report": result["final_report"],
"findings_count": len(result["research_findings"]),
"iterations": result["iteration_count"]
}

# 运行示例
if __name__ == "__main__":
import asyncio

result = asyncio.run(run_research(
"分析2024-2025年生成式AI在医疗诊断领域的应用趋势和市场规模"
))

print(f"研究报告:\n{result['report'][:500]}...")
print(f"\n共收集 {result['findings_count']} 项研究发现")
print(f"迭代次数: {result['iterations']}")

多智能体系统的调试与监控

多智能体系统的调试比单体系统复杂得多。以下是关键实践:

执行轨迹追踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from langchain.callbacks.base import BaseCallbackHandler
from typing import Any

class MultiAgentTracer(BaseCallbackHandler):
"""多智能体执行追踪器"""

def __init__(self):
self.traces = []
self.current_agent = None

def on_agent_action(self, action, **kwargs):
"""记录 Agent 行动"""
self.traces.append({
"type": "action",
"agent": self.current_agent,
"action": action.tool,
"input": action.tool_input,
"timestamp": datetime.now()
})

def on_agent_finish(self, finish, **kwargs):
"""记录 Agent 完成"""
self.traces.append({
"type": "finish",
"agent": self.current_agent,
"output": finish.return_values,
"timestamp": datetime.now()
})

def visualize_trace(self):
"""可视化执行轨迹"""
for trace in self.traces:
if trace["type"] == "action":
print(f"[{trace['timestamp']}] {trace['agent']} -> {trace['action']}")
else:
print(f"[{trace['timestamp']}] {trace['agent']} ✓")

# 使用追踪器
tracer = MultiAgentTracer()
result = await graph.ainvoke(
state,
config={"callbacks": [tracer]}
)
tracer.visualize_trace()

状态可视化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def visualize_state(state: dict, title: str = "当前状态"):
"""以树形结构可视化状态"""
from rich.tree import Tree
from rich import print as rprint

tree = Tree(f"[bold]{title}[/bold]")

for key, value in state.items():
if isinstance(value, list):
node = tree.add(f"[cyan]{key}[/cyan]: list[{len(value)}]")
for i, item in enumerate(value[:3]): # 只显示前3个
node.add(f"[{i}]: {str(item)[:50]}...")
if len(value) > 3:
node.add(f"... and {len(value) - 3} more")
elif isinstance(value, dict):
node = tree.add(f"[cyan]{key}[/cyan]: dict")
for k, v in value.items():
node.add(f"{k}: {str(v)[:50]}")
else:
tree.add(f"[cyan]{key}[/cyan]: {str(value)[:100]}")

rprint(tree)

性能监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from dataclasses import dataclass
from typing import Dict
import time

@dataclass
class AgentMetrics:
calls: int = 0
total_latency: float = 0.0
errors: int = 0
token_usage: int = 0

class MultiAgentMonitor:
"""多智能体性能监控"""

def __init__(self):
self.metrics: Dict[str, AgentMetrics] = {}

def record_call(self, agent_name: str, latency: float, tokens: int, error: bool = False):
"""记录一次调用"""
if agent_name not in self.metrics:
self.metrics[agent_name] = AgentMetrics()

metric = self.metrics[agent_name]
metric.calls += 1
metric.total_latency += latency
metric.token_usage += tokens
if error:
metric.errors += 1

def get_report(self) -> str:
"""生成监控报告"""
report = ["# 多智能体系统性能报告\n"]

for agent, metric in self.metrics.items():
avg_latency = metric.total_latency / metric.calls if metric.calls > 0 else 0
error_rate = metric.errors / metric.calls if metric.calls > 0 else 0

report.append(f"## {agent}")
report.append(f"- 调用次数: {metric.calls}")
report.append(f"- 平均延迟: {avg_latency:.2f}s")
report.append(f"- 错误率: {error_rate:.2%}")
report.append(f"- Token 消耗: {metric.token_usage}")
report.append("")

return "\n".join(report)

与相关技术的关系

多智能体系统不是孤立的技术,它与多个领域密切相关:

多智能体 vs 可解释性
当多个 Agent 协作时,每个 Agent 的决策都需要可追踪。上午文章讨论的可解释性设计,在多智能体场景下变得更加重要——不仅要解释单个 Agent 的行为,还要解释 Agent 之间的协调逻辑。

多智能体 vs 状态管理
共享状态是多智能体协作的基础,但过度共享会导致上下文爆炸,隔离过度则会导致信息孤岛。找到合适的平衡点需要精心设计状态结构。

多智能体 vs 人机协作
多智能体系统的价值最终要通过人机协作体现。清晰的 Agent 分工和通信协议,使人类能够更有效地监督和介入。

选择合适架构的决策框架

面对具体问题时,如何选择多智能体架构?以下决策树可以帮助:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
任务复杂度评估
├── 简单任务(单步完成)
│ └── 使用单体 Agent
├── 中等复杂度(多步但线性)
│ ├── 需要紧密协作?
│ │ ├── 是 → 协作模式
│ │ └── 否 → 监督者模式
└── 高复杂度(多层嵌套)
└── 层级团队模式

团队规模评估
├── 2-3 个 Agent
│ └── 任意模式均可
├── 4-6 个 Agent
│ └── 监督者或层级模式
└── 6+ 个 Agent
└── 层级模式(必须分组)

生产环境最佳实践

1. 渐进式采用

不要一开始就构建完整的多智能体系统。建议路径:

  1. 单体原型:先用单个 Agent 验证核心逻辑
  2. 功能拆分:识别出可以独立测试的功能模块
  3. 逐步并行:先让多个 Agent 并行工作,再引入协调
  4. 添加监督:最后引入 Supervisor 进行流程控制

2. 容错设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
def invoke_with_fallback(agent, state, fallback_agent=None):
"""带重试和回退的 Agent 调用"""
try:
return agent.invoke(state)
except Exception as e:
if fallback_agent:
return fallback_agent.invoke(state)
raise

3. 成本优化

1
2
3
4
5
6
7
8
def smart_agent_selection(task_complexity: str) -> str:
"""根据任务复杂度选择合适的模型"""
model_map = {
"simple": "gpt-3.5-turbo", # 简单任务使用轻量模型
"medium": "gpt-4-turbo-preview", # 中等复杂度
"complex": "gpt-4" # 复杂任务使用最强模型
}
return model_map.get(task_complexity, "gpt-4")

4. 版本控制

1
2
3
4
5
6
7
8
9
10
11
AGENT_VERSIONS = {
"researcher": "v2.1.0",
"analyst": "v1.5.2",
"writer": "v2.0.1"
}

def get_agent(agent_name: str):
"""获取指定版本的 Agent"""
version = AGENT_VERSIONS.get(agent_name, "latest")
# 从注册表加载对应版本
return agent_registry.load(agent_name, version)

总结

多智能体架构代表了 AI 系统设计的重要演进。从单体到分布式,不仅是技术复杂度的增加,更是设计思维的根本转变:

单体思维:如何让一个 Agent 变得更强大?
多智能体思维:如何让多个 Agent 高效协作?

LangGraph 的状态图架构为多智能体系统提供了坚实的基础。三种核心模式——协作式、监督者式、层级式——覆盖了从简单协作到复杂企业级应用的各种场景。

生产级多智能体系统的构建是一个迭代过程:

  1. 设计阶段:明确定义 Agent 边界和通信协议
  2. 原型阶段:快速验证核心协作逻辑
  3. 优化阶段:添加监控、容错、成本优化
  4. 运营阶段:基于反馈持续改进 Agent 能力

最终目标不是构建完美的多智能体系统,而是构建一个可进化的系统——每个 Agent 可以独立改进,协作模式可以灵活调整,新 Agent 可以无缝加入。

这才是分布式智能的真正力量。