Agent 可观测性架构:从黑箱到白盒的监控革命
核心洞察 :当你的 Agent 在凌晨 3 点做出了一个 “莫名其妙” 的决策,你能否在 5 分钟内定位到问题根因?如果不能,说明你的可观测性架构还有巨大改进空间。
一、问题的本质:为什么 Agent 比微服务更难监控? 传统的软件可观测性三板斧——日志(Logs)、指标(Metrics)、追踪(Traces) ,在 Agent 时代遇到了根本性的挑战。
1.1 决策路径的不确定性 微服务的调用链是确定的:A → B → C,出问题就在这三个节点里找。
Agent 的决策路径是动态生成的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def process_order (order_id ): user = get_user(order_id) inventory = check_stock(order_id) result = create_payment(user, inventory) return result def agent_handle_request (query ): response = llm.decide([ "分析用户意图" , "可能调用: search_db / call_api / compute / human_help" , "可能调用 1 次,也可能循环 10 次" ]) return response
1.2 状态爆炸 一个复杂的 Agent 系统可能包含:
多轮对话状态 :上下文窗口的压缩/丢弃策略
工具调用历史 :哪些工具被调用了、参数是什么、结果如何
推理中间状态 :CoT(思维链)的中间步骤
记忆检索结果 :RAG 召回的文档片段
人机协作断点 :哪些地方需要人类介入
这些状态交织在一起,构成了传统监控工具无法处理的复杂度。
1.3 延迟归因困难 当用户投诉 “你们的 Agent 太慢了”,你需要回答:
是 LLM API 延迟高?
是某个工具执行慢?
是循环次数过多?
是上下文太长导致处理慢?
没有精细的追踪,这些问题几乎无法定位。
二、LangSmith 的可观测性原语 LangSmith 提供了一套专门针对 LLM 应用的可观测性抽象,理解这些原语是构建 Agent 监控体系的基础。
2.1 Run(运行单元) Run 是最小的执行单元,对应一次独立的计算操作:
一次 LLM 调用
一个工具函数的执行
一个 prompt 格式化操作
一个 Runnable lambda
1 2 3 4 5 6 7 8 9 10 11 from langsmith import traceable@traceable(run_type="llm" ) def call_llm (prompt: str ) -> str : """这个装饰器会自动创建一个 Run""" return llm.invoke(prompt) @traceable(run_type="tool" ) def search_database (query: str ) -> list : """工具调用也会被追踪""" return db.search(query)
每个 Run 包含:
输入参数
输出结果
开始/结束时间戳
延迟
Token 使用量
元数据(模型版本、温度参数等)
2.2 Trace(追踪链) Trace 是同一请求的所有 Run 的集合,构成完整的调用链:
1 2 3 4 5 6 Trace: 用户查询 "查一下北京明天天气" ├── Run 1: 意图识别 LLM 调用 (200ms) ├── Run 2: 提取城市名称工具 (10ms) ├── Run 3: 天气 API 调用 (500ms) ├── Run 4: 格式化回复 LLM 调用 (300ms) └── Run 5: 输出后处理 (5ms)
关键特性:所有 Runs 通过唯一的 trace_id 关联 。
2.3 Thread(会话线程) Thread 是多轮对话的追踪集合,将连续的交互串联起来:
1 2 3 4 5 6 7 8 9 10 11 12 thread_id = "user_123_session_456" response1 = agent.invoke( {"messages" : [user_msg1]}, config={"metadata" : {"session_id" : thread_id}} ) response2 = agent.invoke( {"messages" : [user_msg1, ai_msg1, user_msg2]}, config={"metadata" : {"session_id" : thread_id}} )
在 LangSmith UI 中,你可以看到完整的对话历史,这对调试多轮交互问题至关重要。
2.4 Feedback(反馈) Feedback 允许对单个 Run 进行评分:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from langsmith import Clientclient = Client() client.create_feedback( run_id="run_abc123" , key="user_rating" , score=1 , comment="回答准确,很有帮助" ) client.create_feedback( run_id="run_def456" , key="hallucination_check" , score=0 , comment="引用了不存在的论文" )
反馈类型:
连续值 :0-1 的相关性评分
离散值 :👍/👎、通过/失败
标签 :分类标签(如 “安全”、”需要人工审核”)
三、LangGraph 的 Agent 模式与监控策略 LangGraph 定义了几种核心的 Agent 模式,每种模式需要不同的监控重点。
3.1 Prompt Chaining(提示链) 模式特点 :LLM 调用按顺序执行,前一个输出作为后一个输入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from langgraph.graph import StateGraphclass State (TypedDict ): user_query: str search_query: str search_results: list final_answer: str def generate_search (state: State ): structured_llm = llm.with_structured_output(SearchQuery) result = structured_llm.invoke(state["user_query" ]) return {"search_query" : result.search_query} def execute_search (state: State ): results = search_engine.search(state["search_query" ]) return {"search_results" : results} def generate_answer (state: State ): answer = llm.invoke(f"基于以下结果回答问题:{state['search_results' ]} " ) return {"final_answer" : answer} graph = StateGraph(State) graph.add_node("generate_query" , generate_search) graph.add_node("search" , execute_search) graph.add_node("answer" , generate_answer) graph.add_edge("generate_query" , "search" ) graph.add_edge("search" , "answer" )
监控重点 :
每个节点的输入/输出转换
中间状态的合理性检查
链式延迟分解
3.2 Parallelization(并行化) 模式特点 :多个 LLM 调用同时执行,最后合并结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 from langgraph.graph import StateGraph, ENDfrom typing import Annotatedimport operatorclass State (TypedDict ): document: str keyword_score: float format_score: float citation_score: float final_score: float def eval_keywords (state: State ): score = llm.invoke(f"评分关键词密度: {state['document' ]} " ) return {"keyword_score" : float (score)} def eval_format (state: State ): score = llm.invoke(f"评分格式规范: {state['document' ]} " ) return {"format_score" : float (score)} def eval_citations (state: State ): score = llm.invoke(f"评分引用质量: {state['document' ]} " ) return {"citation_score" : float (score)} def aggregate_scores (state: State ): final = (state["keyword_score" ] + state["format_score" ] + state["citation_score" ]) / 3 return {"final_score" : final} graph = StateGraph(State) graph.add_node("keywords" , eval_keywords) graph.add_node("format" , eval_format) graph.add_node("citations" , eval_citations) graph.add_node("aggregate" , aggregate_scores) graph.add_edge("__start__" , "keywords" ) graph.add_edge("__start__" , "format" ) graph.add_edge("__start__" , "citations" ) graph.add_edge("keywords" , "aggregate" ) graph.add_edge("format" , "aggregate" ) graph.add_edge("citations" , "aggregate" )
监控重点 :
并行节点中是否有异常慢的节点
各节点评分的一致性问题
最终聚合结果的合理性
3.3 Orchestrator-Worker(协调器-工作器) 模式特点 :中央协调器动态分解任务,分发给工作器执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 from langgraph.types import Sendclass State (TypedDict ): topic: str sections: list [Section] completed_sections: Annotated[list , operator.add] final_report: str class WorkerState (TypedDict ): section: Section completed_sections: Annotated[list , operator.add] def orchestrator (state: State ): """协调器:生成报告章节计划""" report_sections = planner.invoke([ SystemMessage(content="Generate a plan for the report." ), HumanMessage(content=f"Topic: {state['topic' ]} " ) ]) return {"sections" : report_sections.sections} def worker (state: WorkerState ): """工作器:执行单个章节的写作""" section_content = writer.invoke( f"Write section: {state['section' ].title} " ) return {"completed_sections" : [section_content]} def synthesizer (state: State ): """汇总器:合并所有章节""" final = synthesize(state["completed_sections" ]) return {"final_report" : final} graph = StateGraph(State) graph.add_node("orchestrator" , orchestrator) graph.add_node("worker" , worker) graph.add_node("synthesizer" , synthesizer) def assign_workers (state: State ): return [Send("worker" , {"section" : s}) for s in state["sections" ]] graph.add_conditional_edges("orchestrator" , assign_workers) graph.add_edge("worker" , "synthesizer" )
监控重点 :
协调器生成的任务分解质量
工作器的并行执行效率
各工作器输出的一致性
四、构建生产级可观测性体系 4.1 三层监控架构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ┌─────────────────────────────────────────────────────────┐ │ 业务层监控 │ │ • 用户满意度评分 • 任务完成率 • 人工介入频率 │ └─────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────┐ │ Agent 层监控 │ │ • 决策路径追踪 • 工具调用成功率 • 循环次数 │ │ • Token 消耗趋势 • 延迟 P99/P95 │ └─────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────┐ │ 基础设施层监控 │ │ • LLM API 可用性 • 向量数据库延迟 • 缓存命中率 │ └─────────────────────────────────────────────────────────┘
4.2 关键监控指标 延迟指标
指标
说明
告警阈值建议
llm_latency_p99
LLM 调用延迟
> 5s
tool_latency_p99
工具执行延迟
> 2s
total_request_time
完整请求耗时
> 10s
time_to_first_token
首 Token 时间
> 500ms
质量指标
指标
说明
采集方式
hallucination_rate
幻觉率
自动评估
tool_error_rate
工具调用失败率
运行时统计
loop_exit_rate
异常循环退出率
追踪分析
human_escalation_rate
人工升级率
业务统计
成本指标
指标
说明
计算方式
tokens_per_request
单次请求 Token 数
usage.total_tokens
cost_per_session
单会话成本
模型单价 × Token 数
cache_hit_rate
缓存命中率
cache_tokens / total_tokens
4.3 实现代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 from langsmith import Client, traceablefrom langchain.callbacks import LangChainTracerimport timefrom functools import wrapsimport statisticsclass AgentObservability : """生产级 Agent 可观测性工具""" def __init__ (self, project_name: str ): self .client = Client() self .project_name = project_name self .tracer = LangChainTracer( project_name=project_name ) self .metrics = { 'latencies' : [], 'token_counts' : [], 'error_count' : 0 } @traceable(run_type="chain" ) def trace_agent_execution (self, agent_input: dict ) -> dict : """ 包装 Agent 执行,自动追踪完整调用链 """ start_time = time.time() try : result = self .agent.invoke( agent_input, config={"callbacks" : [self .tracer]} ) latency = time.time() - start_time self .metrics['latencies' ].append(latency) if 'usage' in result: self .metrics['token_counts' ].append( result['usage' ]['total_tokens' ] ) return result except Exception as e: self .metrics['error_count' ] += 1 self .client.create_run( name="agent_error" , run_type="chain" , error=str (e), inputs=agent_input ) raise def add_feedback (self, run_id: str , score: float , comment: str = None ): """添加人工反馈""" self .client.create_feedback( run_id=run_id, key="human_rating" , score=score, comment=comment ) def get_performance_report (self ) -> dict : """生成性能报告""" if not self .metrics['latencies' ]: return {"error" : "No data collected" } latencies = sorted (self .metrics['latencies' ]) n = len (latencies) return { "latency" : { "p50" : latencies[n // 2 ], "p95" : latencies[int (n * 0.95 )], "p99" : latencies[int (n * 0.99 )], "avg" : statistics.mean(latencies) }, "tokens" : { "avg" : statistics.mean(self .metrics['token_counts' ]), "max" : max (self .metrics['token_counts' ]) }, "error_rate" : self .metrics['error_count' ] / n, "total_requests" : n } observability = AgentObservability(project_name="production-agent" ) result = observability.trace_agent_execution({ "messages" : [{"role" : "user" , "content" : "帮我分析一下 Q3 财报" }] }) observability.add_feedback( run_id=result['run_id' ], score=0.8 , comment="回答准确,但希望能更详细" ) print (observability.get_performance_report())
五、OpenAI Reasoning 模型的可观测性 随着 GPT-5 等 reasoning 模型的出现,可观测性面临新的挑战。
5.1 Reasoning Token 的管理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from openai import OpenAIclient = OpenAI() response = client.responses.create( model="gpt-5" , reasoning={"effort" : "medium" }, input =[{"role" : "user" , "content" : "复杂推理任务" }], max_output_tokens=300 ) print (response.usage)
关键洞察 :
Reasoning tokens 虽然不可见,但占用上下文空间并计费
建议预留至少 25,000 tokens 给 reasoning 和输出
如果达到限制,会得到 status: "incomplete"
5.2 推理过程的可视化 虽然 reasoning tokens 不可见,但可以通过以下方式推断:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def monitor_reasoning_effort (response ): """监控推理努力度""" usage = response.usage if 'reasoning_tokens' in usage.output_tokens_details: reasoning = usage.output_tokens_details.reasoning_tokens total = usage.output_tokens ratio = reasoning / total if ratio > 0.9 : print (f"⚠️ Warning: High reasoning ratio ({ratio:.2 %} )" ) print ("Consider: 1) Breaking task into smaller steps" ) print (" 2) Using lower reasoning effort" ) print (" 3) Providing more context" ) return { "reasoning_tokens" : reasoning, "output_tokens" : total - reasoning, "reasoning_ratio" : ratio }
六、从可观测性到可解释性 可观测性回答 “发生了什么”,可解释性回答 “为什么发生”。
6.1 决策路径回溯 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def explain_decision (trace_id: str , client: Client ): """ 生成决策解释报告 """ runs = client.list_runs(trace_id=trace_id) explanation = [] for run in runs: if run.run_type == "llm" : explanation.append({ "step" : run.name, "input_preview" : run.inputs.get("messages" , [{}])[0 ].get("content" , "" )[:100 ], "output_preview" : str (run.outputs)[:100 ], "latency_ms" : (run.end_time - run.start_time).total_seconds() * 1000 , "tokens" : run.outputs.get("usage" , {}).get("total_tokens" , 0 ) }) elif run.run_type == "tool" : explanation.append({ "step" : f"工具调用: {run.name} " , "tool_input" : run.inputs, "tool_output" : str (run.outputs)[:200 ], "success" : run.error is None }) return explanation
6.2 根因分析 Checklist 当 Agent 行为异常时,按以下顺序排查:
Level 1: 输入问题
Level 2: 模型问题
Level 3: 工具问题
Level 4: 架构问题
七、最佳实践总结 7.1 黄金法则
所有 LLM 调用必须可被追踪 ——没有例外
关键决策点必须记录理由 ——不仅是结果
每个 Trace 必须可关联到用户/会话 ——便于问题定位
生产环境必须有实时监控 ——延迟 P95、错误率、成本
7.2 反模式清单 ❌ 黑盒 Agent :没有任何追踪,出了问题只能猜 ❌ 过度追踪 :追踪了所有细节但没人看 ❌ 事后诸葛亮 :出了问题才加日志 ❌ 忽视反馈 :没有机制收集用户评价
7.3 演进路线图 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Phase 1 (当前): 基础追踪 ├── 所有 Run 自动上报 LangSmith └── 关键指标 Dashboard Phase 2 (1-2 个月): 智能告警 ├── 异常延迟自动检测 ├── 幻觉率实时告警 └── 成本异常预警 Phase 3 (3-6 个月): 主动干预 ├── 自动回滚异常版本 ├── 动态降级策略 └── A/B 测试框架 Phase 4 (6-12 个月): 自我优化 ├── 基于反馈的 Prompt 自动优化 ├── 工具选择策略学习 └── 自适应推理努力度
结语 Agent 可观测性不是 “锦上添花”,而是生产部署的 前置条件 。当你的 Agent 开始服务真实用户时,你会发现:80% 的工程时间花在调试和优化上,而良好的可观测性架构能让这个过程效率提升 10 倍。
从黑箱到白盒,从被动救火到主动预防,这是每一个 Agent 工程团队的必经之路。
文章数据基于 LangSmith、LangGraph 官方文档及 OpenAI API 文档整理 最后更新:2026-02-18