Agent 可观测性架构:从黑箱到白盒的监控革命

核心洞察:当你的 Agent 在凌晨 3 点做出了一个 “莫名其妙” 的决策,你能否在 5 分钟内定位到问题根因?如果不能,说明你的可观测性架构还有巨大改进空间。


一、问题的本质:为什么 Agent 比微服务更难监控?

传统的软件可观测性三板斧——日志(Logs)、指标(Metrics)、追踪(Traces),在 Agent 时代遇到了根本性的挑战。

1.1 决策路径的不确定性

微服务的调用链是确定的:A → B → C,出问题就在这三个节点里找。

Agent 的决策路径是动态生成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 传统微服务 - 确定性路径
def process_order(order_id):
user = get_user(order_id) # 固定调用
inventory = check_stock(order_id) # 固定调用
result = create_payment(user, inventory) # 固定调用
return result

# Agent - 动态决策路径
def agent_handle_request(query):
# LLM 自己决定调用哪些工具、调用顺序、调用次数
response = llm.decide([
"分析用户意图",
"可能调用: search_db / call_api / compute / human_help",
"可能调用 1 次,也可能循环 10 次"
])
return response

1.2 状态爆炸

一个复杂的 Agent 系统可能包含:

  • 多轮对话状态:上下文窗口的压缩/丢弃策略
  • 工具调用历史:哪些工具被调用了、参数是什么、结果如何
  • 推理中间状态:CoT(思维链)的中间步骤
  • 记忆检索结果:RAG 召回的文档片段
  • 人机协作断点:哪些地方需要人类介入

这些状态交织在一起,构成了传统监控工具无法处理的复杂度。

1.3 延迟归因困难

当用户投诉 “你们的 Agent 太慢了”,你需要回答:

  • 是 LLM API 延迟高?
  • 是某个工具执行慢?
  • 是循环次数过多?
  • 是上下文太长导致处理慢?

没有精细的追踪,这些问题几乎无法定位。


二、LangSmith 的可观测性原语

LangSmith 提供了一套专门针对 LLM 应用的可观测性抽象,理解这些原语是构建 Agent 监控体系的基础。

2.1 Run(运行单元)

Run 是最小的执行单元,对应一次独立的计算操作:

  • 一次 LLM 调用
  • 一个工具函数的执行
  • 一个 prompt 格式化操作
  • 一个 Runnable lambda
1
2
3
4
5
6
7
8
9
10
11
from langsmith import traceable

@traceable(run_type="llm")
def call_llm(prompt: str) -> str:
"""这个装饰器会自动创建一个 Run"""
return llm.invoke(prompt)

@traceable(run_type="tool")
def search_database(query: str) -> list:
"""工具调用也会被追踪"""
return db.search(query)

每个 Run 包含:

  • 输入参数
  • 输出结果
  • 开始/结束时间戳
  • 延迟
  • Token 使用量
  • 元数据(模型版本、温度参数等)

2.2 Trace(追踪链)

Trace 是同一请求的所有 Run 的集合,构成完整的调用链:

1
2
3
4
5
6
Trace: 用户查询 "查一下北京明天天气"
├── Run 1: 意图识别 LLM 调用 (200ms)
├── Run 2: 提取城市名称工具 (10ms)
├── Run 3: 天气 API 调用 (500ms)
├── Run 4: 格式化回复 LLM 调用 (300ms)
└── Run 5: 输出后处理 (5ms)

关键特性:所有 Runs 通过唯一的 trace_id 关联

2.3 Thread(会话线程)

Thread 是多轮对话的追踪集合,将连续的交互串联起来:

1
2
3
4
5
6
7
8
9
10
11
12
# 通过 metadata 关联同一个会话
thread_id = "user_123_session_456"

response1 = agent.invoke(
{"messages": [user_msg1]},
config={"metadata": {"session_id": thread_id}}
)

response2 = agent.invoke(
{"messages": [user_msg1, ai_msg1, user_msg2]},
config={"metadata": {"session_id": thread_id}}
)

在 LangSmith UI 中,你可以看到完整的对话历史,这对调试多轮交互问题至关重要。

2.4 Feedback(反馈)

Feedback 允许对单个 Run 进行评分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from langsmith import Client

client = Client()

# 手动反馈
client.create_feedback(
run_id="run_abc123",
key="user_rating",
score=1, # 👍
comment="回答准确,很有帮助"
)

# 自动评估反馈
client.create_feedback(
run_id="run_def456",
key="hallucination_check",
score=0, # 检测到幻觉
comment="引用了不存在的论文"
)

反馈类型:

  • 连续值:0-1 的相关性评分
  • 离散值:👍/👎、通过/失败
  • 标签:分类标签(如 “安全”、”需要人工审核”)

三、LangGraph 的 Agent 模式与监控策略

LangGraph 定义了几种核心的 Agent 模式,每种模式需要不同的监控重点。

3.1 Prompt Chaining(提示链)

模式特点:LLM 调用按顺序执行,前一个输出作为后一个输入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from langgraph.graph import StateGraph

class State(TypedDict):
user_query: str
search_query: str
search_results: list
final_answer: str

# 节点 1:生成搜索查询
def generate_search(state: State):
structured_llm = llm.with_structured_output(SearchQuery)
result = structured_llm.invoke(state["user_query"])
return {"search_query": result.search_query}

# 节点 2:执行搜索
def execute_search(state: State):
results = search_engine.search(state["search_query"])
return {"search_results": results}

# 节点 3:生成最终答案
def generate_answer(state: State):
answer = llm.invoke(f"基于以下结果回答问题:{state['search_results']}")
return {"final_answer": answer}

graph = StateGraph(State)
graph.add_node("generate_query", generate_search)
graph.add_node("search", execute_search)
graph.add_node("answer", generate_answer)
graph.add_edge("generate_query", "search")
graph.add_edge("search", "answer")

监控重点

  • 每个节点的输入/输出转换
  • 中间状态的合理性检查
  • 链式延迟分解

3.2 Parallelization(并行化)

模式特点:多个 LLM 调用同时执行,最后合并结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from langgraph.graph import StateGraph, END
from typing import Annotated
import operator

class State(TypedDict):
document: str
keyword_score: float
format_score: float
citation_score: float
final_score: float

# 并行评估节点
def eval_keywords(state: State):
score = llm.invoke(f"评分关键词密度: {state['document']}")
return {"keyword_score": float(score)}

def eval_format(state: State):
score = llm.invoke(f"评分格式规范: {state['document']}")
return {"format_score": float(score)}

def eval_citations(state: State):
score = llm.invoke(f"评分引用质量: {state['document']}")
return {"citation_score": float(score)}

# 汇总节点
def aggregate_scores(state: State):
final = (state["keyword_score"] +
state["format_score"] +
state["citation_score"]) / 3
return {"final_score": final}

graph = StateGraph(State)
graph.add_node("keywords", eval_keywords)
graph.add_node("format", eval_format)
graph.add_node("citations", eval_citations)
graph.add_node("aggregate", aggregate_scores)

# 并行执行
graph.add_edge("__start__", "keywords")
graph.add_edge("__start__", "format")
graph.add_edge("__start__", "citations")
graph.add_edge("keywords", "aggregate")
graph.add_edge("format", "aggregate")
graph.add_edge("citations", "aggregate")

监控重点

  • 并行节点中是否有异常慢的节点
  • 各节点评分的一致性问题
  • 最终聚合结果的合理性

3.3 Orchestrator-Worker(协调器-工作器)

模式特点:中央协调器动态分解任务,分发给工作器执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from langgraph.types import Send

class State(TypedDict):
topic: str
sections: list[Section]
completed_sections: Annotated[list, operator.add]
final_report: str

class WorkerState(TypedDict):
section: Section
completed_sections: Annotated[list, operator.add]

def orchestrator(state: State):
"""协调器:生成报告章节计划"""
report_sections = planner.invoke([
SystemMessage(content="Generate a plan for the report."),
HumanMessage(content=f"Topic: {state['topic']}")
])
return {"sections": report_sections.sections}

def worker(state: WorkerState):
"""工作器:执行单个章节的写作"""
section_content = writer.invoke(
f"Write section: {state['section'].title}"
)
return {"completed_sections": [section_content]}

def synthesizer(state: State):
"""汇总器:合并所有章节"""
final = synthesize(state["completed_sections"])
return {"final_report": final}

# 使用 Send API 动态创建工作器
graph = StateGraph(State)
graph.add_node("orchestrator", orchestrator)
graph.add_node("worker", worker)
graph.add_node("synthesizer", synthesizer)

# 关键:动态路由到多个工作器
def assign_workers(state: State):
return [Send("worker", {"section": s}) for s in state["sections"]]

graph.add_conditional_edges("orchestrator", assign_workers)
graph.add_edge("worker", "synthesizer")

监控重点

  • 协调器生成的任务分解质量
  • 工作器的并行执行效率
  • 各工作器输出的一致性

四、构建生产级可观测性体系

4.1 三层监控架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌─────────────────────────────────────────────────────────┐
│ 业务层监控 │
│ • 用户满意度评分 • 任务完成率 • 人工介入频率 │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ Agent 层监控 │
│ • 决策路径追踪 • 工具调用成功率 • 循环次数 │
│ • Token 消耗趋势 • 延迟 P99/P95 │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ 基础设施层监控 │
│ • LLM API 可用性 • 向量数据库延迟 • 缓存命中率 │
└─────────────────────────────────────────────────────────┘

4.2 关键监控指标

延迟指标

指标 说明 告警阈值建议
llm_latency_p99 LLM 调用延迟 > 5s
tool_latency_p99 工具执行延迟 > 2s
total_request_time 完整请求耗时 > 10s
time_to_first_token 首 Token 时间 > 500ms

质量指标

指标 说明 采集方式
hallucination_rate 幻觉率 自动评估
tool_error_rate 工具调用失败率 运行时统计
loop_exit_rate 异常循环退出率 追踪分析
human_escalation_rate 人工升级率 业务统计

成本指标

指标 说明 计算方式
tokens_per_request 单次请求 Token 数 usage.total_tokens
cost_per_session 单会话成本 模型单价 × Token 数
cache_hit_rate 缓存命中率 cache_tokens / total_tokens

4.3 实现代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from langsmith import Client, traceable
from langchain.callbacks import LangChainTracer
import time
from functools import wraps
import statistics

class AgentObservability:
"""生产级 Agent 可观测性工具"""

def __init__(self, project_name: str):
self.client = Client()
self.project_name = project_name
self.tracer = LangChainTracer(
project_name=project_name
)
self.metrics = {
'latencies': [],
'token_counts': [],
'error_count': 0
}

@traceable(run_type="chain")
def trace_agent_execution(self, agent_input: dict) -> dict:
"""
包装 Agent 执行,自动追踪完整调用链
"""
start_time = time.time()

try:
# 执行 Agent
result = self.agent.invoke(
agent_input,
config={"callbacks": [self.tracer]}
)

# 计算延迟
latency = time.time() - start_time
self.metrics['latencies'].append(latency)

# 提取 Token 使用
if 'usage' in result:
self.metrics['token_counts'].append(
result['usage']['total_tokens']
)

return result

except Exception as e:
self.metrics['error_count'] += 1
# 记录错误详情到 LangSmith
self.client.create_run(
name="agent_error",
run_type="chain",
error=str(e),
inputs=agent_input
)
raise

def add_feedback(self, run_id: str, score: float, comment: str = None):
"""添加人工反馈"""
self.client.create_feedback(
run_id=run_id,
key="human_rating",
score=score,
comment=comment
)

def get_performance_report(self) -> dict:
"""生成性能报告"""
if not self.metrics['latencies']:
return {"error": "No data collected"}

latencies = sorted(self.metrics['latencies'])
n = len(latencies)

return {
"latency": {
"p50": latencies[n // 2],
"p95": latencies[int(n * 0.95)],
"p99": latencies[int(n * 0.99)],
"avg": statistics.mean(latencies)
},
"tokens": {
"avg": statistics.mean(self.metrics['token_counts']),
"max": max(self.metrics['token_counts'])
},
"error_rate": self.metrics['error_count'] / n,
"total_requests": n
}

# 使用示例
observability = AgentObservability(project_name="production-agent")

# 包装你的 Agent
result = observability.trace_agent_execution({
"messages": [{"role": "user", "content": "帮我分析一下 Q3 财报"}]
})

# 用户反馈
observability.add_feedback(
run_id=result['run_id'],
score=0.8,
comment="回答准确,但希望能更详细"
)

# 查看性能报告
print(observability.get_performance_report())

五、OpenAI Reasoning 模型的可观测性

随着 GPT-5 等 reasoning 模型的出现,可观测性面临新的挑战。

5.1 Reasoning Token 的管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from openai import OpenAI

client = OpenAI()

response = client.responses.create(
model="gpt-5",
reasoning={"effort": "medium"}, # low/medium/high
input=[{"role": "user", "content": "复杂推理任务"}],
max_output_tokens=300 # 需要预留 reasoning token 空间
)

# 检查 usage
print(response.usage)
# {
# "input_tokens": 75,
# "output_tokens": 1186,
# "output_tokens_details": {
# "reasoning_tokens": 1024 # 隐藏的思考过程
# }
# }

关键洞察

  • Reasoning tokens 虽然不可见,但占用上下文空间并计费
  • 建议预留至少 25,000 tokens 给 reasoning 和输出
  • 如果达到限制,会得到 status: "incomplete"

5.2 推理过程的可视化

虽然 reasoning tokens 不可见,但可以通过以下方式推断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def monitor_reasoning_effort(response):
"""监控推理努力度"""
usage = response.usage

if 'reasoning_tokens' in usage.output_tokens_details:
reasoning = usage.output_tokens_details.reasoning_tokens
total = usage.output_tokens
ratio = reasoning / total

# 告警:推理占比过高可能表示任务过于复杂
if ratio > 0.9:
print(f"⚠️ Warning: High reasoning ratio ({ratio:.2%})")
print("Consider: 1) Breaking task into smaller steps")
print(" 2) Using lower reasoning effort")
print(" 3) Providing more context")

return {
"reasoning_tokens": reasoning,
"output_tokens": total - reasoning,
"reasoning_ratio": ratio
}

六、从可观测性到可解释性

可观测性回答 “发生了什么”,可解释性回答 “为什么发生”。

6.1 决策路径回溯

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def explain_decision(trace_id: str, client: Client):
"""
生成决策解释报告
"""
# 获取完整追踪
runs = client.list_runs(trace_id=trace_id)

explanation = []
for run in runs:
if run.run_type == "llm":
explanation.append({
"step": run.name,
"input_preview": run.inputs.get("messages", [{}])[0].get("content", "")[:100],
"output_preview": str(run.outputs)[:100],
"latency_ms": (run.end_time - run.start_time).total_seconds() * 1000,
"tokens": run.outputs.get("usage", {}).get("total_tokens", 0)
})
elif run.run_type == "tool":
explanation.append({
"step": f"工具调用: {run.name}",
"tool_input": run.inputs,
"tool_output": str(run.outputs)[:200],
"success": run.error is None
})

return explanation

6.2 根因分析 Checklist

当 Agent 行为异常时,按以下顺序排查:

Level 1: 输入问题

  • 用户查询是否清晰?
  • 上下文是否完整?
  • 是否有歧义或歧义消解失败?

Level 2: 模型问题

  • Temperature 设置是否合适?
  • 模型版本是否发生变化?
  • Prompt 模板是否被修改?

Level 3: 工具问题

  • 工具返回是否正确?
  • 工具延迟是否过高?
  • 工具参数是否正确传递?

Level 4: 架构问题

  • 循环退出条件是否合理?
  • 状态管理是否有竞态条件?
  • 内存/上下文是否溢出?

七、最佳实践总结

7.1 黄金法则

  1. 所有 LLM 调用必须可被追踪——没有例外
  2. 关键决策点必须记录理由——不仅是结果
  3. 每个 Trace 必须可关联到用户/会话——便于问题定位
  4. 生产环境必须有实时监控——延迟 P95、错误率、成本

7.2 反模式清单

黑盒 Agent:没有任何追踪,出了问题只能猜
过度追踪:追踪了所有细节但没人看
事后诸葛亮:出了问题才加日志
忽视反馈:没有机制收集用户评价

7.3 演进路线图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Phase 1 (当前): 基础追踪
├── 所有 Run 自动上报 LangSmith
└── 关键指标 Dashboard

Phase 2 (1-2 个月): 智能告警
├── 异常延迟自动检测
├── 幻觉率实时告警
└── 成本异常预警

Phase 3 (3-6 个月): 主动干预
├── 自动回滚异常版本
├── 动态降级策略
└── A/B 测试框架

Phase 4 (6-12 个月): 自我优化
├── 基于反馈的 Prompt 自动优化
├── 工具选择策略学习
└── 自适应推理努力度

结语

Agent 可观测性不是 “锦上添花”,而是生产部署的 前置条件。当你的 Agent 开始服务真实用户时,你会发现:80% 的工程时间花在调试和优化上,而良好的可观测性架构能让这个过程效率提升 10 倍。

从黑箱到白盒,从被动救火到主动预防,这是每一个 Agent 工程团队的必经之路。


文章数据基于 LangSmith、LangGraph 官方文档及 OpenAI API 文档整理
最后更新:2026-02-18