单体 Agent 的能力边界正在被触及。当任务复杂度超过单一模型的上下文窗口,当工具选择数量达到数十个,当需要不同领域的专业知识协同工作时,多智能体架构成为必然选择。
多智能体系统的核心洞察是:分解带来能力 。就像人类团队通过分工协作完成复杂项目,AI Agent 也可以通过专业化分工和结构化协作,突破单体架构的能力天花板。
多智能体架构的核心问题 设计多智能体系统时,需要回答两个根本性问题:
问题
设计考量
影响
谁是独立的 Agent?
职责边界、工具集合、提示词设计
系统的模块化程度
Agent 如何连接?
通信机制、路由策略、状态共享
系统的协作效率
这两个问题的答案决定了系统的拓扑结构。LangGraph 的状态图架构为这些问题提供了优雅的解决框架:每个 Agent 是图中的一个节点,节点之间的边定义了控制流和通信路径。
为什么多智能体更有效 “如果一个 Agent 都做不好,为什么多个 Agent 会有效?”这是一个常见的质疑。答案在于专业化带来的精度提升 :
1. 工具聚焦效应 当 Agent 需要从一个庞大的工具库中选择时,错误选择的概率随工具数量增加。研究表明,当可选工具超过 20 个时,Agent 的工具选择准确率显著下降。
多智能体架构通过职责分组,让每个 Agent 只接触相关的工具子集:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class MonolithicAgent : tools = [search_tool, sql_tool, python_tool, email_tool, calendar_tool, crm_tool, analytics_tool, ...] class ResearchAgent : tools = [search_tool, web_scraper, document_parser] class DataAgent : tools = [sql_tool, python_tool, analytics_tool] class CommunicationAgent : tools = [email_tool, calendar_tool, notification_tool]
2. 提示词专业化 每个 Agent 可以有独立的系统提示词和少样本示例,针对特定任务优化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 research_agent_prompt = """你是一位专业的研究分析师。 你的职责是: 1. 搜索并收集相关信息 2. 验证信息来源的可靠性 3. 整理关键发现和数据 示例: 用户问:"2024年电动车市场份额" 思考:需要查找最新的市场研究报告 行动:使用搜索工具查找权威数据 观察:找到 McKinsey 报告... """ analysis_agent_prompt = """你是一位数据分析师。 你的职责是: 1. 理解研究 Agent 提供的原始数据 2. 进行统计分析 3. 生成可视化洞察 注意:只处理已经验证的数据,不进行新的搜索。 """
3. 独立评估与优化 模块化架构允许单独评估和改进每个 Agent,而不会影响整个系统:
1 2 3 4 5 6 7 8 9 10 11 12 research_result = await research_agent.ainvoke( {"query" : "测试查询" }, config={"callbacks" : [evaluation_callback]} ) analysis_agent = create_react_agent( model=llm, tools=analysis_tools, prompt=optimized_analysis_prompt )
LangGraph 多智能体模式 LangGraph 提供了三种核心多智能体协作模式,每种适用于不同的场景。
模式一:协作式多智能体(Collaboration) 适用场景 :需要紧密协作、信息共享的任务,如联合研究、头脑风暴。
核心特征 :
所有 Agent 共享同一个消息 scratchpad
每个 Agent 都能看到其他 Agent 的完整思考过程
通过路由器决定下一个由哪个 Agent 执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 from langgraph.graph import StateGraph, MessagesStatefrom langgraph.prebuilt import create_react_agentfrom typing import Literal research_agent = create_react_agent( model=gpt4, tools=[search_tool, web_fetch_tool], prompt="你是一个研究专家,负责收集信息。" ) analysis_agent = create_react_agent( model=gpt4, tools=[python_tool, calculator_tool], prompt="你是一个数据分析专家,负责处理和解释数据。" ) writing_agent = create_react_agent( model=gpt4, tools=[document_tool], prompt="你是一个写作专家,负责生成清晰、结构化的报告。" ) def router (state: MessagesState ) -> Literal ["researcher" , "analyst" , "writer" , "__end__" ]: """ 根据当前状态决定下一个执行者 """ last_message = state["messages" ][-1 ] if hasattr (last_message, 'name' ) and last_message.name: return last_message.name content = last_message.content.lower() if "需要更多数据" in content or "搜索" in content: return "researcher" elif "计算" in content or "分析" in content: return "analyst" elif "撰写" in content or "生成报告" in content: return "writer" elif "完成" in content or "结束" in content: return "__end__" return "analyst" builder = StateGraph(MessagesState) builder.add_node("researcher" , research_agent) builder.add_node("analyst" , analysis_agent) builder.add_node("writer" , writing_agent) builder.add_conditional_edges("researcher" , router) builder.add_conditional_edges("analyst" , router) builder.add_conditional_edges("writer" , router) builder.set_entry_point("researcher" ) collaboration_graph = builder.compile ()
协作模式的优势 :
透明度高 :所有思考过程可见,便于调试和审计
灵活性强 :Agent 可以根据上下文动态调整策略
知识共享 :一个 Agent 的发现立即对所有 Agent 可用
协作模式的挑战 :
上下文膨胀 :共享 scratchpad 可能变得冗长
角色混淆 :Agent 可能模仿其他 Agent 的风格而非坚持自己的角色
决策延迟 :需要路由器持续判断下一步,增加延迟
模式二:监督者模式(Supervisor) 适用场景 :任务需要明确的分阶段执行,如软件开发生命周期、业务流程自动化。
核心特征 :
每个 Agent 有独立的内部状态
Supervisor Agent 负责协调和路由
工作成果通过结构化消息传递
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 from typing import TypedDict, Annotatedfrom langgraph.graph import StateGraph, ENDfrom operator import addclass TeamState (TypedDict ): """团队共享状态""" task: str plan: list [str ] current_step: int results: Annotated[list [dict ], add] final_output: str class WorkerOutput (TypedDict ): """Worker Agent 的输出格式""" worker_name: str task_description: str result: str status: Literal ["success" , "failed" , "needs_retry" ] def create_worker_node (agent, name: str ): """创建 Worker 节点""" def worker_node (state: TeamState ): current_task = state["plan" ][state["current_step" ]] result = agent.invoke({ "task" : current_task, "context" : state["results" ] }) return { "results" : [{ "worker_name" : name, "task_description" : current_task, "result" : result["output" ], "status" : "success" }], "current_step" : state["current_step" ] + 1 } return worker_node requirements_agent = create_react_agent( model=gpt4, tools=[document_reader, stakeholder_interview_tool], prompt="你是需求分析专家。收集并澄清用户需求,输出结构化需求文档。" ) design_agent = create_react_agent( model=gpt4, tools=[architecture_design_tool, diagram_generator], prompt="你是系统架构师。基于需求设计技术方案,输出架构图和设计文档。" ) code_agent = create_react_agent( model=gpt4, tools=[code_generator, linter, test_generator], prompt="你是资深开发者。根据设计文档生成高质量代码,包括单元测试。" ) def supervisor_node (state: TeamState ): """ Supervisor 负责: 1. 制定执行计划 2. 监控执行进度 3. 决定下一步行动 4. 整合最终结果 """ if not state["plan" ]: plan = create_execution_plan(state["task" ]) return {"plan" : plan, "current_step" : 0 } if state["current_step" ] >= len (state["plan" ]): final_output = synthesize_results(state["results" ]) return {"final_output" : final_output} if state["results" ]: last_result = state["results" ][-1 ] if last_result["status" ] == "failed" : return handle_failure(state) return {} def route_by_plan (state: TeamState ) -> str : """根据当前步骤决定路由到哪个 Worker""" step_to_worker = { 0 : "requirements" , 1 : "design" , 2 : "code" , 3 : "test" , 4 : "deploy" } current_step = state["current_step" ] if current_step >= len (state["plan" ]): return END return step_to_worker.get(current_step, "supervisor" ) builder = StateGraph(TeamState) builder.add_node("supervisor" , supervisor_node) builder.add_node("requirements" , create_worker_node(requirements_agent, "requirements" )) builder.add_node("design" , create_worker_node(design_agent, "design" )) builder.add_node("code" , create_worker_node(code_agent, "code" )) builder.set_entry_point("supervisor" ) builder.add_conditional_edges("supervisor" , route_by_plan) builder.add_edge("requirements" , "supervisor" ) builder.add_edge("design" , "supervisor" ) builder.add_edge("code" , "supervisor" ) supervisor_graph = builder.compile ()
监督者模式的优势 :
清晰的控制流 :执行顺序显式定义,可预测
状态隔离 :每个 Worker 的内部工作不污染共享状态
便于监控 :Supervisor 可以统一收集指标和日志
监督者模式的挑战 :
单点瓶颈 :Supervisor 成为性能瓶颈和故障点
灵活性有限 :预设的执行路径难以应对意外情况
上下文丢失 :Worker 之间通过结构化消息通信,可能丢失细节
模式三:层级智能体团队(Hierarchical Teams) 适用场景 :超大规模任务,需要多层分解和协调,如企业级项目管理、复杂研究项目。
核心特征 :
Agent 本身可以是另一个 LangGraph
支持无限层级嵌套
父级 Agent 协调多个子级团队
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 from langgraph.graph import StateGraphfrom typing import TypedDict, List class EnterpriseState (TypedDict ): project_goal: str department_results: dict consolidated_report: str executive_summary: str def create_department_team (department_name: str ): """创建部门级团队子图""" class DepartmentState (TypedDict ): department: str tasks: List [str ] assigned_workers: List [str ] deliverables: List [str ] status: str def department_supervisor (state: DepartmentState ): """部门主管:分配任务给团队成员""" assignments = assign_to_workers(state["tasks" ], state["assigned_workers" ]) return {"assignments" : assignments} def worker_execution (state: DepartmentState ): """团队成员执行具体任务""" results = [] for task in state["tasks" ]: result = execute_task(task) results.append(result) return {"deliverables" : results, "status" : "completed" } builder = StateGraph(DepartmentState) builder.add_node("supervisor" , department_supervisor) builder.add_node("workers" , worker_execution) builder.add_edge("supervisor" , "workers" ) builder.set_entry_point("supervisor" ) return builder.compile () engineering_team = create_department_team("Engineering" ) marketing_team = create_department_team("Marketing" ) sales_team = create_department_team("Sales" ) def enterprise_planner (state: EnterpriseState ): """ 企业级规划器: 1. 理解整体项目目标 2. 分解为部门级子目标 3. 确定部门间依赖关系 """ department_goals = { "engineering" : "开发新产品功能" , "marketing" : "制定上市策略" , "sales" : "准备销售渠道" } return {"department_goals" : department_goals} def department_router (state: EnterpriseState ): """将工作路由到相应部门""" return ["engineering" , "marketing" , "sales" ] def engineering_subgraph (state: EnterpriseState ): """工程部门执行""" result = engineering_team.invoke({ "department" : "Engineering" , "tasks" : ["架构设计" , "核心开发" , "测试覆盖" ], "assigned_workers" : ["architect" , "senior_dev" , "qa_lead" ] }) return {"department_results" : {"engineering" : result}} def marketing_subgraph (state: EnterpriseState ): """市场部门执行""" result = marketing_team.invoke({ "department" : "Marketing" , "tasks" : ["竞品分析" , "定位策略" , "营销素材" ], "assigned_workers" : ["analyst" , "strategist" , "designer" ] }) return {"department_results" : {"marketing" : result}} def sales_subgraph (state: EnterpriseState ): """销售部门执行""" result = sales_team.invoke({ "department" : "Sales" , "tasks" : ["渠道准备" , "销售培训" , "客户沟通" ], "assigned_workers" : ["channel_mgr" , "trainer" , "account_exec" ] }) return {"department_results" : {"sales" : result}} def consolidation_node (state: EnterpriseState ): """整合各部门结果""" engineering_out = state["department_results" ]["engineering" ] marketing_out = state["department_results" ]["marketing" ] sales_out = state["department_results" ]["sales" ] consolidated = f""" # 项目综合报告 ## 工程部门 {engineering_out["deliverables" ]} ## 市场部门 {marketing_out["deliverables" ]} ## 销售部门 {sales_out["deliverables" ]} """ return {"consolidated_report" : consolidated} def executive_summary_node (state: EnterpriseState ): """生成高管摘要""" summary = generate_executive_summary(state["consolidated_report" ]) return {"executive_summary" : summary} builder = StateGraph(EnterpriseState) builder.add_node("planner" , enterprise_planner) builder.add_node("router" , department_router) builder.add_node("engineering" , engineering_subgraph) builder.add_node("marketing" , marketing_subgraph) builder.add_node("sales" , sales_subgraph) builder.add_node("consolidation" , consolidation_node) builder.add_node("executive_summary" , executive_summary_node) builder.set_entry_point("planner" ) builder.add_edge("planner" , "router" ) builder.add_edge("router" , "engineering" ) builder.add_edge("router" , "marketing" ) builder.add_edge("router" , "sales" ) builder.add_edge(["engineering" , "marketing" , "sales" ], "consolidation" ) builder.add_edge("consolidation" , "executive_summary" ) enterprise_graph = builder.compile ()
层级模式的优势 :
可扩展性 :支持任意复杂度的任务分解
模块化复用 :子图可以在不同项目中重复使用
并行执行 :不同部门/团队可以并行工作
层级模式的挑战 :
协调复杂度高 :层级越深,协调开销越大
上下文传递 :信息在不同层级间传递可能失真
调试困难 :多层嵌套使问题定位复杂
多智能体通信模式 无论采用哪种架构模式,Agent 之间的通信机制都是关键设计决策。
模式 A:消息传递(Message Passing) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 from typing import TypedDictfrom datetime import datetimeclass Message (TypedDict ): sender: str recipient: str content: str message_type: Literal ["task" , "response" , "query" , "notification" ] timestamp: datetime priority: int attachments: List [dict ] class MessageBus : """消息总线:Agent 间通信的中介""" def __init__ (self ): self .messages: List [Message] = [] self .subscribers: Dict [str , Callable ] = {} def subscribe (self, agent_name: str , handler: Callable ): """Agent 订阅消息""" self .subscribers[agent_name] = handler def publish (self, message: Message ): """发布消息""" self .messages.append(message) if message["recipient" ] in self .subscribers: self .subscribers[message["recipient" ]](message) def get_messages_for (self, agent_name: str ) -> List [Message]: """获取 Agent 的待处理消息""" return [m for m in self .messages if m["recipient" ] == agent_name and not m.get("read" )]
模式 B:共享状态(Shared State) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 from typing import TypedDict, Annotatedfrom operator import addclass SharedWorkspace (TypedDict ): """共享工作空间:所有 Agent 共同维护""" current_task: str task_history: Annotated[list , add] shared_knowledge: dict hypotheses: list conclusions: list active_agents: list handoff_queue: list created_at: datetime last_updated: datetime def update_shared_workspace (state: SharedWorkspace, update: dict ): """原子化更新共享状态""" new_state = {**state} for key, value in update.items(): if key in ["task_history" , "hypotheses" , "conclusions" ]: new_state[key] = state.get(key, []) + [value] else : new_state[key] = value new_state["last_updated" ] = datetime.now() return new_state
模式 C:函数调用(Function Calling) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 from langchain_core.tools import tool@tool def delegate_to_agent (agent_name: str , task: str , context: str ) -> str : """ 将任务委托给其他 Agent Args: agent_name: 目标 Agent 的名称 task: 具体任务描述 context: 相关上下文信息 """ agent_registry = { "researcher" : research_agent, "analyst" : analysis_agent, "writer" : writing_agent } if agent_name not in agent_registry: return f"Error: Unknown agent '{agent_name} '" target_agent = agent_registry[agent_name] result = target_agent.invoke({ "task" : task, "context" : context }) return result["output" ] @tool def request_clarification (from_agent: str , question: str ) -> str : """ 向另一个 Agent 请求澄清 Args: from_agent: 要询问的 Agent question: 具体问题 """ return f"已向 {from_agent} 发送澄清请求: {question} " collaboration_tools = [ delegate_to_agent, request_clarification, ]
实战:构建研究助手团队 让我们将理论付诸实践,构建一个完整的多智能体研究助手系统。
系统架构 1 2 3 4 5 6 7 8 9 10 11 12 ┌─────────────────────────────────────────────────────────────┐ │ Supervisor Agent │ │ (路由 + 质量控制) │ └──────────────────┬──────────────────────────────────────────┘ │ ┌──────────┼──────────┐ │ │ │ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Research │ │ Analysis │ │ Writing │ │ Agent │ │ Agent │ │ Agent │ └──────────┘ └──────────┘ └──────────┘
完整实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 from langgraph.graph import StateGraph, ENDfrom langgraph.prebuilt import create_react_agentfrom typing import TypedDict, Annotated, Sequence from langchain_core.messages import BaseMessage, HumanMessage, AIMessagefrom operator import addimport operatorclass ResearchState (TypedDict ): """研究团队共享状态""" messages: Annotated[Sequence [BaseMessage], add] research_findings: Annotated[list , add] analysis_insights: list final_report: str next_step: str iteration_count: int research_agent = create_react_agent( model=gpt4, tools=[search_tool, web_fetch_tool, news_search_tool], prompt="""你是专业的研究分析师。你的职责是: 1. 使用搜索工具收集相关信息 2. 验证来源的可靠性和时效性 3. 提取关键事实和数据 4. 将发现记录到 shared_state 当你找到足够信息后,明确声明 "RESEARCH_COMPLETE" 并总结关键发现。 """ ) analysis_agent = create_react_agent( model=gpt4, tools=[python_tool, trend_analyzer, comparison_tool], prompt="""你是数据分析师。你的职责是: 1. 分析 Research Agent 提供的数据 2. 识别趋势、模式和异常 3. 进行统计计算和比较分析 4. 生成可操作的洞察 当你完成分析后,明确声明 "ANALYSIS_COMPLETE" 并提供关键洞察。 """ ) writing_agent = create_react_agent( model=gpt4, tools=[outline_generator, citation_formatter], prompt="""你是专业报告撰写人。你的职责是: 1. 基于研究发现和分析洞察生成结构化报告 2. 确保逻辑清晰、论证有力 3. 正确引用数据来源 4. 生成执行摘要 报告应包括:执行摘要、主要发现、详细分析、结论与建议。 """ ) def supervisor_node (state: ResearchState ): """ Supervisor 协调整个研究流程: 1. 评估当前状态 2. 决定下一步行动 3. 检查是否需要迭代 """ messages = state["messages" ] last_message = messages[-1 ].content if messages else "" if "REPORT_COMPLETE" in last_message and state["final_report" ]: return {"next_step" : END} if state.get("iteration_count" , 0 ) > 10 : return {"next_step" : "writer" } if "RESEARCH_COMPLETE" in last_message and not state["analysis_insights" ]: return {"next_step" : "analyst" , "iteration_count" : state.get("iteration_count" , 0 ) + 1 } elif "ANALYSIS_COMPLETE" in last_message and not state["final_report" ]: return {"next_step" : "writer" , "iteration_count" : state.get("iteration_count" , 0 ) + 1 } elif state["final_report" ]: if needs_improvement(state["final_report" ]): return {"next_step" : "researcher" , "iteration_count" : state.get("iteration_count" , 0 ) + 1 } return {"next_step" : END} else : return {"next_step" : "researcher" , "iteration_count" : state.get("iteration_count" , 0 ) + 1 } def needs_improvement (report: str ) -> bool : """评估报告质量,决定是否需要改进""" required_sections = ["执行摘要" , "主要发现" , "结论" ] return not all (section in report for section in required_sections) def research_node (state: ResearchState ): """Research Agent 执行节点""" query = state["messages" ][0 ].content if state["messages" ] else "" result = research_agent.invoke({ "messages" : [HumanMessage(content=f"研究主题: {query} " ) ] + list (state["messages" ]) }) findings = extract_findings(result["messages" ][-1 ].content) return { "messages" : result["messages" ], "research_findings" : findings } def analysis_node (state: ResearchState ): """Analysis Agent 执行节点""" findings_text = "\n" .join(state["research_findings" ]) result = analysis_agent.invoke({ "messages" : [HumanMessage(content=f"请分析以下研究发现:\n{findings_text} " )] }) insights = extract_insights(result["messages" ][-1 ].content) return { "messages" : result["messages" ], "analysis_insights" : insights } def writing_node (state: ResearchState ): """Writing Agent 执行节点""" findings = "\n" .join(state["research_findings" ]) insights = "\n" .join(state["analysis_insights" ]) prompt = f"""基于以下研究资料撰写完整报告: 研究发现: {findings} 分析洞察: {insights} 请生成结构化的研究报告,以 "REPORT_COMPLETE" 结尾。 """ result = writing_agent.invoke({ "messages" : [HumanMessage(content=prompt)] }) report = result["messages" ][-1 ].content return { "messages" : result["messages" ], "final_report" : report } builder = StateGraph(ResearchState) builder.add_node("supervisor" , supervisor_node) builder.add_node("researcher" , research_node) builder.add_node("analyst" , analysis_node) builder.add_node("writer" , writing_node) builder.add_conditional_edges( "supervisor" , lambda state: state["next_step" ], { "researcher" : "researcher" , "analyst" : "analyst" , "writer" : "writer" , END: END } ) builder.add_edge("researcher" , "supervisor" ) builder.add_edge("analyst" , "supervisor" ) builder.add_edge("writer" , "supervisor" ) builder.set_entry_point("supervisor" ) research_team_graph = builder.compile () async def run_research (query: str ): """执行研究任务""" initial_state = { "messages" : [HumanMessage(content=query)], "research_findings" : [], "analysis_insights" : [], "final_report" : "" , "iteration_count" : 0 } result = await research_team_graph.ainvoke(initial_state) return { "report" : result["final_report" ], "findings_count" : len (result["research_findings" ]), "iterations" : result["iteration_count" ] } if __name__ == "__main__" : import asyncio result = asyncio.run(run_research( "分析2024-2025年生成式AI在医疗诊断领域的应用趋势和市场规模" )) print (f"研究报告:\n{result['report' ][:500 ]} ..." ) print (f"\n共收集 {result['findings_count' ]} 项研究发现" ) print (f"迭代次数: {result['iterations' ]} " )
多智能体系统的调试与监控 多智能体系统的调试比单体系统复杂得多。以下是关键实践:
执行轨迹追踪 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 from langchain.callbacks.base import BaseCallbackHandlerfrom typing import Any class MultiAgentTracer (BaseCallbackHandler ): """多智能体执行追踪器""" def __init__ (self ): self .traces = [] self .current_agent = None def on_agent_action (self, action, **kwargs ): """记录 Agent 行动""" self .traces.append({ "type" : "action" , "agent" : self .current_agent, "action" : action.tool, "input" : action.tool_input, "timestamp" : datetime.now() }) def on_agent_finish (self, finish, **kwargs ): """记录 Agent 完成""" self .traces.append({ "type" : "finish" , "agent" : self .current_agent, "output" : finish.return_values, "timestamp" : datetime.now() }) def visualize_trace (self ): """可视化执行轨迹""" for trace in self .traces: if trace["type" ] == "action" : print (f"[{trace['timestamp' ]} ] {trace['agent' ]} -> {trace['action' ]} " ) else : print (f"[{trace['timestamp' ]} ] {trace['agent' ]} ✓" ) tracer = MultiAgentTracer() result = await graph.ainvoke( state, config={"callbacks" : [tracer]} ) tracer.visualize_trace()
状态可视化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def visualize_state (state: dict , title: str = "当前状态" ): """以树形结构可视化状态""" from rich.tree import Tree from rich import print as rprint tree = Tree(f"[bold]{title} [/bold]" ) for key, value in state.items(): if isinstance (value, list ): node = tree.add(f"[cyan]{key} [/cyan]: list[{len (value)} ]" ) for i, item in enumerate (value[:3 ]): node.add(f"[{i} ]: {str (item)[:50 ]} ..." ) if len (value) > 3 : node.add(f"... and {len (value) - 3 } more" ) elif isinstance (value, dict ): node = tree.add(f"[cyan]{key} [/cyan]: dict" ) for k, v in value.items(): node.add(f"{k} : {str (v)[:50 ]} " ) else : tree.add(f"[cyan]{key} [/cyan]: {str (value)[:100 ]} " ) rprint(tree)
性能监控 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 from dataclasses import dataclassfrom typing import Dict import time@dataclass class AgentMetrics : calls: int = 0 total_latency: float = 0.0 errors: int = 0 token_usage: int = 0 class MultiAgentMonitor : """多智能体性能监控""" def __init__ (self ): self .metrics: Dict [str , AgentMetrics] = {} def record_call (self, agent_name: str , latency: float , tokens: int , error: bool = False ): """记录一次调用""" if agent_name not in self .metrics: self .metrics[agent_name] = AgentMetrics() metric = self .metrics[agent_name] metric.calls += 1 metric.total_latency += latency metric.token_usage += tokens if error: metric.errors += 1 def get_report (self ) -> str : """生成监控报告""" report = ["# 多智能体系统性能报告\n" ] for agent, metric in self .metrics.items(): avg_latency = metric.total_latency / metric.calls if metric.calls > 0 else 0 error_rate = metric.errors / metric.calls if metric.calls > 0 else 0 report.append(f"## {agent} " ) report.append(f"- 调用次数: {metric.calls} " ) report.append(f"- 平均延迟: {avg_latency:.2 f} s" ) report.append(f"- 错误率: {error_rate:.2 %} " ) report.append(f"- Token 消耗: {metric.token_usage} " ) report.append("" ) return "\n" .join(report)
与相关技术的关系 多智能体系统不是孤立的技术,它与多个领域密切相关:
多智能体 vs 可解释性 当多个 Agent 协作时,每个 Agent 的决策都需要可追踪。上午文章讨论的可解释性设计,在多智能体场景下变得更加重要——不仅要解释单个 Agent 的行为,还要解释 Agent 之间的协调逻辑。
多智能体 vs 状态管理 共享状态是多智能体协作的基础,但过度共享会导致上下文爆炸,隔离过度则会导致信息孤岛。找到合适的平衡点需要精心设计状态结构。
多智能体 vs 人机协作 多智能体系统的价值最终要通过人机协作体现。清晰的 Agent 分工和通信协议,使人类能够更有效地监督和介入。
选择合适架构的决策框架 面对具体问题时,如何选择多智能体架构?以下决策树可以帮助:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 任务复杂度评估 ├── 简单任务(单步完成) │ └── 使用单体 Agent ├── 中等复杂度(多步但线性) │ ├── 需要紧密协作? │ │ ├── 是 → 协作模式 │ │ └── 否 → 监督者模式 └── 高复杂度(多层嵌套) └── 层级团队模式 团队规模评估 ├── 2-3 个 Agent │ └── 任意模式均可 ├── 4-6 个 Agent │ └── 监督者或层级模式 └── 6+ 个 Agent └── 层级模式(必须分组)
生产环境最佳实践 1. 渐进式采用 不要一开始就构建完整的多智能体系统。建议路径:
单体原型 :先用单个 Agent 验证核心逻辑
功能拆分 :识别出可以独立测试的功能模块
逐步并行 :先让多个 Agent 并行工作,再引入协调
添加监督 :最后引入 Supervisor 进行流程控制
2. 容错设计 1 2 3 4 5 6 7 8 9 10 11 12 13 14 from tenacity import retry, stop_after_attempt, wait_exponential@retry( stop=stop_after_attempt(3 ), wait=wait_exponential(multiplier=1 , min =4 , max =10 ) )def invoke_with_fallback (agent, state, fallback_agent=None ): """带重试和回退的 Agent 调用""" try : return agent.invoke(state) except Exception as e: if fallback_agent: return fallback_agent.invoke(state) raise
3. 成本优化 1 2 3 4 5 6 7 8 def smart_agent_selection (task_complexity: str ) -> str : """根据任务复杂度选择合适的模型""" model_map = { "simple" : "gpt-3.5-turbo" , "medium" : "gpt-4-turbo-preview" , "complex" : "gpt-4" } return model_map.get(task_complexity, "gpt-4" )
4. 版本控制 1 2 3 4 5 6 7 8 9 10 11 AGENT_VERSIONS = { "researcher" : "v2.1.0" , "analyst" : "v1.5.2" , "writer" : "v2.0.1" } def get_agent (agent_name: str ): """获取指定版本的 Agent""" version = AGENT_VERSIONS.get(agent_name, "latest" ) return agent_registry.load(agent_name, version)
总结 多智能体架构代表了 AI 系统设计的重要演进。从单体到分布式,不仅是技术复杂度的增加,更是设计思维的根本转变:
单体思维 :如何让一个 Agent 变得更强大?多智能体思维 :如何让多个 Agent 高效协作?
LangGraph 的状态图架构为多智能体系统提供了坚实的基础。三种核心模式——协作式、监督者式、层级式——覆盖了从简单协作到复杂企业级应用的各种场景。
生产级多智能体系统的构建是一个迭代过程:
设计阶段 :明确定义 Agent 边界和通信协议
原型阶段 :快速验证核心协作逻辑
优化阶段 :添加监控、容错、成本优化
运营阶段 :基于反馈持续改进 Agent 能力
最终目标不是构建完美的多智能体系统,而是构建一个可进化 的系统——每个 Agent 可以独立改进,协作模式可以灵活调整,新 Agent 可以无缝加入。
这才是分布式智能的真正力量。