Multi-Agent 协作模式与分布式智能体系统设计:从单体到团队的演进

单一 Agent 的能力总有边界,但当多个专业 Agent 协同工作时,它们能完成远超个体能力的复杂任务。这是从”单体智能”到”群体智能”的跃迁。

1. 为什么需要 Multi-Agent?

1.1 单体 Agent 的局限性

上午我们探讨了单 Agent 的任务分解与规划能力。但当面对真正复杂的现实场景时,单体 Agent 会遇到瓶颈:

工具过载问题:当一个 Agent 需要处理 20+ 个工具时,工具选择准确率会急剧下降。研究表明,工具数量超过 15 个时,LLM 的工具调用准确率从 85% 降至 60% 以下。

上下文稀释:复杂任务需要维护大量中间状态,导致关键信息被稀释在冗长的上下文中。

专业度不足:通用 Agent 难以在多个垂直领域同时达到专家水平。

单点故障:一旦某个步骤出错,整个流程都可能失败,缺乏容错和冗余。

1.2 人类团队的启示

观察人类如何组织复杂工作:

  • 专业分工:程序员写代码,设计师做 UI,产品经理写 PRD
  • 并行协作:多个任务可以同时进行,互不阻塞
  • 层级管理:Team Lead 协调团队,CEO 制定战略
  • 质量把关:Code Review、设计评审确保输出质量

Multi-Agent 系统正是借鉴了这些组织原则。

1.3 Multi-Agent 的核心价值

维度 单体 Agent Multi-Agent
专业化 通用但浅 专业且深
并行度 串行执行 可并行执行
可扩展性 工具增多性能下降 增加 Agent 不影响单个 Agent 性能
可维护性 庞大复杂 模块化、可独立迭代
容错性 单点故障 可重试、可降级
可解释性 黑盒 Agent 分工明确,易于追踪

2. Multi-Agent 架构的核心概念

2.1 什么是 Multi-Agent 系统

Multi-Agent 系统(MAS)由多个独立运行的 Agent 组成,它们:

  • 各自拥有独立的 LLM、Prompt 和工具集
  • 通过特定的通信机制交换信息
  • 遵循预定的协作协议完成任务

在 LangGraph 中,Multi-Agent 系统可以自然地用图表示:

1
2
3
Agent A → Supervisor → Agent B
↓ ↓
Shared State ← ← ← ← ← ← ←

每个 Agent 是图中的一个节点,边代表控制流,共享状态实现通信。

2.2 三种核心协作模式

根据 LangGraph 官方实践,Multi-Agent 有三种主要架构模式:

模式一:Collaboration(协作模式)

多个 Agent 共享同一个消息 scratchpad,所有工作对彼此可见。

特点

  • 透明度高,Agent 可以看到彼此的完整思考过程
  • 适合需要深度协作的任务
  • 缺点是消息冗长,可能包含不必要的信息

模式二:Supervisor(监督者模式)

引入一个 Supervisor Agent 作为协调者,其他 Agent 作为 Worker。

特点

  • Worker Agent 有自己的独立 scratchpad
  • 只将最终结果提交给 Supervisor
  • 通信更精简,但牺牲了一定的透明度
  • Supervisor 可以看作是”工具为其他 Agent”的特殊 Agent

模式三:Hierarchical Teams(分层团队模式)

Worker 本身也是一个完整的 LangGraph 应用,形成层级结构。

特点

  • 最强的灵活性和封装性
  • 每个 Team 内部可以有复杂的子流程
  • 适合大规模、复杂的组织场景

2.3 状态管理:共享 vs 隔离

Multi-Agent 系统的状态设计是关键决策点:

完全共享状态

1
2
3
class SharedState(TypedDict):
messages: Annotated[list, operator.add] # 所有 Agent 可见
current_agent: str

部分隔离状态

1
2
3
4
5
6
7
8
9
class HierarchicalState(TypedDict):
# 全局可见
task: str
final_answer: Optional[str]

# Agent 私有(通过命名空间隔离)
researcher_scratchpad: list
writer_scratchpad: list
reviewer_scratchpad: list

完全隔离 + 消息传递

1
2
3
4
5
class MessagePassingState(TypedDict):
task: str
# 通过消息队列异步通信
inbox: Annotated[list, operator.add]
final_answer: Optional[str]

3. 实战:三种模式的 LangGraph 实现

3.1 Collaboration 模式:研究者与写手协作

场景:研究团队需要分析数据并撰写报告,Researcher 负责研究,Writer 负责写作,两者紧密协作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from typing import TypedDict, Annotated, Sequence, Literal
import operator
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END

# ============ 状态定义 ============
class CollaborationState(TypedDict):
"""协作模式:共享消息历史"""
messages: Annotated[Sequence[BaseMessage], operator.add]
next: str # 下一个执行的 Agent
research_complete: bool
writing_complete: bool

# ============ Agent 定义 ============
researcher_prompt = """你是 Research Agent,专门负责信息收集和分析。

你的任务:
1. 分析用户请求,确定需要收集哪些信息
2. 使用搜索工具查找相关资料
3. 整理关键发现,用 bullet points 总结
4. 当研究完成时,回复 "RESEARCH_COMPLETE"

可用工具:
- web_search: 网络搜索
- data_analyzer: 数据分析

当前对话历史:"""

writer_prompt = """你是 Writer Agent,专门负责内容创作。

你的任务:
1. 基于 Research Agent 的发现撰写报告
2. 确保内容准确、结构清晰
3. 可以要求 Research Agent 补充信息
4. 当写作完成时,回复 "FINAL_ANSWER"

当前研究发现:"""

# 创建 Agent
researcher_model = ChatOpenAI(
model="gpt-4o",
temperature=0.2
).bind_tools([web_search, data_analyzer])

writer_model = ChatOpenAI(
model="gpt-4o",
temperature=0.7
)

# ============ 节点实现 ============
def researcher_node(state: CollaborationState):
"""Research Agent 节点"""
messages = state["messages"]

# 添加系统提示
system_msg = SystemMessage(content=researcher_prompt)
response = researcher_model.invoke([system_msg] + list(messages))

# 检查是否完成研究
research_complete = "RESEARCH_COMPLETE" in response.content

return {
"messages": [response],
"research_complete": research_complete,
"next": "writer" if research_complete else "researcher"
}

def writer_node(state: CollaborationState):
"""Writer Agent 节点"""
messages = state["messages"]

system_msg = SystemMessage(content=writer_prompt)
response = writer_model.invoke([system_msg] + list(messages))

# 检查是否需要补充研究或已完成
if "FINAL_ANSWER" in response.content:
return {
"messages": [response],
"writing_complete": True,
"next": "end"
}
elif "NEED_MORE_RESEARCH" in response.content:
return {
"messages": [response],
"next": "researcher"
}
else:
return {
"messages": [response],
"next": "writer" # 继续完善
}

def router(state: CollaborationState) -> Literal["researcher", "writer", "end"]:
"""路由决策"""
if state.get("writing_complete"):
return "end"
return state["next"]

# ============ 构建工作流 ============
workflow = StateGraph(CollaborationState)

workflow.add_node("researcher", researcher_node)
workflow.add_node("writer", writer_node)

workflow.set_entry_point("researcher")

workflow.add_conditional_edges(
"researcher",
router,
{"researcher": "researcher", "writer": "writer", "end": END}
)

workflow.add_conditional_edges(
"writer",
router,
{"researcher": "researcher", "writer": "writer", "end": END}
)

app = workflow.compile()

# ============ 运行 ============
result = app.invoke({
"messages": [HumanMessage(content="分析2025年AI Agent市场趋势,撰写一份投资报告")],
"research_complete": False,
"writing_complete": False
})

Collaboration 模式的关键点

  1. 共享 messages 让两个 Agent 看到完整对话历史
  2. Agent 通过特定标记(”RESEARCH_COMPLETE”)传递状态
  3. 路由逻辑决定流程走向

3.2 Supervisor 模式:客服工单处理系统

场景:客服中心需要处理多种类型的工单,Supervisor 负责分类和路由,专业 Agent 处理具体问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
from typing import TypedDict, Annotated
from pydantic import BaseModel, Field
import operator

# ============ 状态定义 ============
class SupervisorState(TypedDict):
"""Supervisor 模式:Worker 有自己的私有状态"""
# 全局状态(所有 Agent 可见)
customer_message: str
customer_id: Optional[str]
final_response: Optional[str]

# Supervisor 决策
next_agent: str
task_status: Literal["pending", "in_progress", "completed", "escalated"]

# Worker 私有状态(仅在各自节点中使用)
technical_context: Optional[dict]
billing_context: Optional[dict]
sales_context: Optional[dict]

# ============ 路由决策模型 ============
class RouteDecision(BaseModel):
"""Supervisor 的路由决策"""
agent: Literal["technical", "billing", "sales", "general", "human"]
reason: str = Field(description="选择该 Agent 的原因")
priority: Literal["low", "medium", "high", "urgent"]
estimated_turns: int = Field(description="预计需要多少轮对话")

# ============ Supervisor 节点 ============
supervisor_model = ChatOpenAI(
model="gpt-4o"
).with_structured_output(RouteDecision)

def supervisor_node(state: SupervisorState):
"""Supervisor Agent:负责路由和协调"""

supervisor_prompt = f"""你是客服 Supervisor,负责分析客户请求并分配给合适的专业 Agent。

当前请求:{state['customer_message']}
客户ID:{state.get('customer_id', '未识别')}

可用 Agent:
- technical: 处理技术问题、故障排查、功能咨询
- billing: 处理账单、退款、支付问题
- sales: 处理产品咨询、升级、新订单
- general: 处理一般性咨询
- human: 复杂或敏感问题,需要人工介入

请分析并输出路由决策。"""

decision = supervisor_model.invoke(supervisor_prompt)

return {
"next_agent": decision.agent,
"task_status": "in_progress"
}

# ============ Worker Agent 节点 ============
def technical_agent(state: SupervisorState):
"""技术支持 Agent"""

tech_prompt = f"""你是技术支持专家。处理客户的技术问题。

客户问题:{state['customer_message']}

请:
1. 分析问题根因
2. 提供分步解决方案
3. 如果无法解决,说明需要升级的原因

返回格式:
{{"analysis": "问题分析", "solution": "解决方案", "solved": true/false}}"""

response = ChatOpenAI(model="gpt-4o").invoke(tech_prompt)

# 解析响应
try:
result = json.loads(response.content)
if result["solved"]:
return {
"final_response": f"【技术支持】\n问题分析:{result['analysis']}\n\n解决方案:{result['solution']}",
"task_status": "completed"
}
else:
return {
"technical_context": result,
"next_agent": "human", # 升级到人工
"task_status": "escalated"
}
except:
return {"final_response": response.content, "task_status": "completed"}

def billing_agent(state: SupervisorState):
"""账单支持 Agent"""
# 类似实现,处理账单相关问题
billing_prompt = f"""你是账单支持专家..."""
# ...
return {"final_response": "账单处理结果", "task_status": "completed"}

def sales_agent(state: SupervisorState):
"""销售支持 Agent"""
# 处理销售相关问题
return {"final_response": "销售咨询回复", "task_status": "completed"}

def general_agent(state: SupervisorState):
"""通用咨询 Agent"""
# 处理一般性问题
return {"final_response": "通用回复", "task_status": "completed"}

# ============ 路由逻辑 ============
def route_to_agent(state: SupervisorState) -> str:
"""根据 Supervisor 决策路由到对应 Agent"""
next_agent = state["next_agent"]

# 映射到节点名称
agent_map = {
"technical": "technical",
"billing": "billing",
"sales": "sales",
"general": "general",
"human": END # 升级到人工,结束自动流程
}

return agent_map.get(next_agent, END)

def check_completion(state: SupervisorState) -> str:
"""检查是否完成或需要重新路由"""
if state["task_status"] == "completed":
return END
elif state["task_status"] == "escalated":
return "supervisor" # 重新路由
else:
return END

# ============ 构建工作流 ============
workflow = StateGraph(SupervisorState)

# 添加节点
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("technical", technical_agent)
workflow.add_node("billing", billing_agent)
workflow.add_node("sales", sales_agent)
workflow.add_node("general", general_agent)

# 设置入口
workflow.set_entry_point("supervisor")

# Supervisor 路由到各 Worker
workflow.add_conditional_edges(
"supervisor",
route_to_agent,
{
"technical": "technical",
"billing": "billing",
"sales": "sales",
"general": "general",
END: END
}
)

# Worker 完成后检查状态
for agent in ["technical", "billing", "sales", "general"]:
workflow.add_conditional_edges(
agent,
check_completion,
{END: END, "supervisor": "supervisor"}
)

app = workflow.compile()

# ============ 运行示例 ============
result = app.invoke({
"customer_message": "我的服务器无法连接,已经尝试了重启但没有用。请帮我排查一下。",
"customer_id": "CUST-12345"
})

Supervisor 模式的关键点

  1. Worker 的私有状态不互相暴露
  2. Supervisor 负责智能路由
  3. 支持升级和重新路由机制

3.3 Hierarchical Teams 模式:企业级研究助手

场景:构建一个企业级研究助手,包含多个专业团队,每个团队内部有复杂的子流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END

# ============ 子图:研究团队 ============
class ResearchTeamState(TypedDict):
"""研究团队私有状态"""
topic: str
search_queries: list
search_results: Annotated[list, operator.add]
synthesis: Optional[str]
confidence: float

def create_research_team() -> StateGraph:
"""创建一个研究团队子图"""

workflow = StateGraph(ResearchTeamState)

# 子图节点
def planner(state: ResearchTeamState):
"""规划搜索策略"""
# 生成多个搜索查询
return {"search_queries": [f"{state['topic']} 最新进展", f"{state['topic']} 市场分析"]}

def searcher(state: ResearchTeamState):
"""执行搜索"""
results = []
for query in state["search_queries"]:
# 实际调用搜索 API
results.append({"query": query, "results": f"搜索结果 for {query}"})
return {"search_results": results}

def synthesizer(state: ResearchTeamState):
"""综合研究结果"""
# 综合所有搜索结果
synthesis = f"综合了 {len(state['search_results'])} 个来源的研究"
return {
"synthesis": synthesis,
"confidence": 0.85
}

workflow.add_node("planner", planner)
workflow.add_node("searcher", searcher)
workflow.add_node("synthesizer", synthesizer)

workflow.set_entry_point("planner")
workflow.add_edge("planner", "searcher")
workflow.add_edge("searcher", "synthesizer")
workflow.add_edge("synthesizer", END)

return workflow.compile()

# ============ 子图:写作团队 ============
class WritingTeamState(TypedDict):
"""写作团队私有状态"""
research_input: str
outline: Optional[str]
draft: Optional[str]
polished: Optional[str]
revision_count: int

def create_writing_team() -> StateGraph:
"""创建一个写作团队子图"""

workflow = StateGraph(WritingTeamState)

def outliner(state: WritingTeamState):
"""生成大纲"""
outline = f"基于研究输入:{state['research_input'][:100]}...\n\n1. 引言\n2. 主体\n3. 结论"
return {"outline": outline}

def drafter(state: WritingTeamState):
"""撰写初稿"""
draft = f"根据大纲 {state['outline']} 撰写的文章初稿"
return {"draft": draft}

def editor(state: WritingTeamState):
"""编辑润色"""
polished = f"编辑后的文章:{state['draft']}"
return {
"polished": polished,
"revision_count": state.get("revision_count", 0) + 1
}

workflow.add_node("outliner", outliner)
workflow.add_node("drafter", drafter)
workflow.add_node("editor", editor)

workflow.set_entry_point("outliner")
workflow.add_edge("outliner", "drafter")
workflow.add_edge("drafter", "editor")
workflow.add_edge("editor", END)

return workflow.compile()

# ============ 主图:协调层 ============
class HierarchicalSystemState(TypedDict):
"""顶层系统状态"""
user_request: str
research_output: Optional[str]
final_report: Optional[str]
status: str

def create_hierarchical_system():
"""创建分层团队系统"""

# 创建子团队
research_team = create_research_team()
writing_team = create_writing_team()

workflow = StateGraph(HierarchicalSystemState)

# 封装子图为节点
def research_team_node(state: HierarchicalSystemState):
"""研究团队节点(子图)"""
# 调用研究团队子图
result = research_team.invoke({
"topic": state["user_request"],
"search_queries": [],
"search_results": [],
"synthesis": None,
"confidence": 0.0
})
return {
"research_output": result["synthesis"],
"status": "research_completed"
}

def writing_team_node(state: HierarchicalSystemState):
"""写作团队节点(子图)"""
result = writing_team.invoke({
"research_input": state["research_output"],
"outline": None,
"draft": None,
"polished": None,
"revision_count": 0
})
return {
"final_report": result["polished"],
"status": "completed"
}

workflow.add_node("research_team", research_team_node)
workflow.add_node("writing_team", writing_team_node)

workflow.set_entry_point("research_team")
workflow.add_edge("research_team", "writing_team")
workflow.add_edge("writing_team", END)

return workflow.compile()

# ============ 运行系统 ============
system = create_hierarchical_system()
result = system.invoke({
"user_request": "分析2026年量子计算在药物研发中的应用前景",
"research_output": None,
"final_report": None,
"status": "started"
})

Hierarchical 模式的关键点

  1. 每个 Team 是一个独立的子图,有自己的复杂逻辑
  2. 子图之间通过顶层状态传递关键信息
  3. 最强的封装性和可复用性

4. 通信机制设计

4.1 同步通信 vs 异步通信

同步通信

1
2
3
4
def agent_a_node(state):
result = agent_a.invoke(state)
# 直接返回,下一个节点立即执行
return {"messages": [result]}

异步通信(消息队列)

1
2
3
4
5
6
7
8
9
10
11
12
13
class AsyncState(TypedDict):
inbox: Annotated[list, operator.add] # 消息队列

def agent_a_node(state):
# 发送消息到队列
message = {"to": "agent_b", "content": "数据已准备好"}
return {"inbox": [message]}

def dispatcher(state):
# 分发消息到对应 Agent
for msg in state["inbox"]:
if msg["to"] == "agent_b":
return "agent_b"

4.2 消息格式标准化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from datetime import datetime
from pydantic import BaseModel
from typing import Literal

class AgentMessage(BaseModel):
"""标准化的 Agent 间消息格式"""
message_id: str
timestamp: datetime
from_agent: str
to_agent: str
message_type: Literal["request", "response", "notification", "error"]
content: dict
priority: Literal["low", "normal", "high", "urgent"] = "normal"
requires_response: bool = False
context_refs: list[str] = [] # 引用的上下文消息 ID

# 示例消息
research_request = AgentMessage(
message_id="msg-001",
timestamp=datetime.now(),
from_agent="writer",
to_agent="researcher",
message_type="request",
content={
"topic": "量子计算最新进展",
"depth": "comprehensive",
"deadline": "2026-02-23T12:00:00"
},
priority="high",
requires_response=True
)

5. 错误处理与容错设计

5.1 Agent 失败重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
def resilient_agent_node(state):
"""带重试机制的 Agent 节点"""
try:
result = agent.invoke(state)
return result
except Exception as e:
# 记录失败
state["retry_count"] = state.get("retry_count", 0) + 1
raise

def fallback_handler(state):
"""降级处理:当 Agent 多次失败后"""
if state.get("retry_count", 0) >= 3:
# 使用简化版 Agent 或返回错误信息
return {
"final_response": "该请求暂时无法处理,已转接人工服务。",
"escalated": True
}

5.2 超时控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
from concurrent.futures import TimeoutError

async def agent_with_timeout(state, timeout_seconds=30):
"""带超时的 Agent 执行"""
try:
result = await asyncio.wait_for(
agent.ainvoke(state),
timeout=timeout_seconds
)
return result
except TimeoutError:
return {
"error": "Agent 执行超时",
"partial_result": state.get("partial_result")
}

6. 性能优化策略

6.1 并行执行无依赖任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
from typing import List

async def parallel_agents(state, agent_list: List[callable]):
"""并行执行多个 Agent"""
tasks = [agent(state) for agent in agent_list]
results = await asyncio.gather(*tasks, return_exceptions=True)

# 合并结果
merged = {}
for i, result in enumerate(results):
if isinstance(result, Exception):
merged[f"agent_{i}_error"] = str(result)
else:
merged.update(result)

return merged

# 使用示例
async def research_parallel(state):
results = await parallel_agents(
state,
[web_research_agent, data_analysis_agent, expert_interview_agent]
)
return {"research_results": results}

6.2 结果缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from functools import lru_cache
import hashlib

class AgentCache:
"""Agent 结果缓存"""

def __init__(self):
self._cache = {}

def _get_key(self, state) -> str:
"""生成状态指纹"""
state_str = json.dumps(state, sort_keys=True, default=str)
return hashlib.md5(state_str.encode()).hexdigest()

def get_or_compute(self, state, agent_func):
key = self._get_key(state)
if key in self._cache:
return self._cache[key]

result = agent_func(state)
self._cache[key] = result
return result

cache = AgentCache()

def cached_agent_node(state):
return cache.get_or_compute(state, expensive_agent_function)

7. 评估与监控

7.1 Agent 性能指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from dataclasses import dataclass
from typing import List
import time

@dataclass
class AgentMetrics:
"""Agent 性能指标"""
agent_name: str
invocation_count: int
avg_latency_ms: float
success_rate: float
token_usage: int
error_types: List[str]

def create_metrics_collector():
"""创建指标收集器"""
metrics = {}

def collect(agent_name: str, start_time: float, result: dict, error: Exception = None):
if agent_name not in metrics:
metrics[agent_name] = {
"invocations": 0,
"total_latency": 0,
"successes": 0,
"failures": 0,
"errors": []
}

m = metrics[agent_name]
m["invocations"] += 1
m["total_latency"] += (time.time() - start_time) * 1000

if error:
m["failures"] += 1
m["errors"].append(type(error).__name__)
else:
m["successes"] += 1

return collect, metrics

7.2 协作质量评估

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class CollaborationEvaluator:
"""评估 Multi-Agent 协作质量"""

def evaluate_handoff(self, from_agent: str, to_agent: str, message: dict) -> float:
"""评估 Agent 间交接质量"""
scores = []

# 信息完整性
scores.append(1.0 if "context" in message else 0.0)

# 清晰度
scores.append(min(len(message.get("content", "")) / 100, 1.0))

# 可追溯性
scores.append(1.0 if "reference_id" in message else 0.0)

return sum(scores) / len(scores)

def evaluate_end_to_end(self, initial_request: str, final_output: str) -> dict:
"""端到端质量评估"""
return {
"completeness": self._check_completeness(initial_request, final_output),
"accuracy": self._check_accuracy(final_output),
"coherence": self._check_coherence(final_output)
}

8. 最佳实践总结

8.1 设计原则

  1. 单一职责:每个 Agent 专注于一个明确的职责领域
  2. 显式接口:Agent 间的通信格式标准化、文档化
  3. 失败隔离:一个 Agent 的失败不应导致整个系统崩溃
  4. 可观测性:每个 Agent 的决策和执行过程可追踪
  5. 渐进复杂:从简单模式开始,按需增加复杂度

8.2 模式选择指南

场景 推荐模式 原因
两个 Agent 紧密协作 Collaboration 透明度最高,便于迭代
3-5 个专业 Agent Supervisor 通信精简,易于管理
大规模团队(5+) Hierarchical 模块化强,可分层管理
需要人工介入 Supervisor 易于集成 human-in-the-loop
实时协作场景 Collaboration 响应最快

8.3 常见陷阱

  1. 过度工程化:不要为了 Multi-Agent 而 Multi-Agent,简单场景单 Agent 可能更好
  2. 通信风暴:共享状态下消息过多,导致上下文膨胀
  3. 循环依赖:Agent A 依赖 B,B 又依赖 A,导致死锁
  4. 状态同步问题:多个 Agent 同时修改同一状态,产生冲突
  5. 监控盲区:子 Agent 的执行情况难以追踪

9. 与单 Agent 的对比与结合

9.1 何时使用 Multi-Agent

适合 Multi-Agent

  • 任务涉及多个专业领域
  • 需要并行处理独立子任务
  • 系统需要长期演进,模块化带来维护优势
  • 需要引入人工审核或外部系统

适合单 Agent

  • 任务相对简单,工具 < 10 个
  • 追求最低延迟
  • 开发和调试成本优先于可维护性

9.2 混合架构

1
2
3
4
5
6
7
8
9
10
11
def hybrid_agent_system(user_input):
"""混合架构:先用单 Agent 尝试,复杂时再切换"""

# 第一层:简单 Agent 尝试解决
simple_result = simple_agent.invoke(user_input)

if simple_result["confidence"] > 0.8:
return simple_result

# 复杂场景:切换到 Multi-Agent
return multi_agent_system.invoke(user_input)

10. 总结

Multi-Agent 系统代表了从”单体智能”向”群体智能”的演进。通过合理的架构设计和协作机制,我们可以构建出远超单 Agent 能力的复杂系统。

三种核心模式各有适用场景:

  • Collaboration:透明协作,适合深度配合
  • Supervisor:清晰分工,适合专业团队
  • Hierarchical:模块化封装,适合大规模组织

关键在于理解:Multi-Agent 的价值不在于 Agent 的数量,而在于它们如何协同产生 1+1>2 的效果。


系列文章

参考资源