LangGraph Production Deployment: From Demo to Launch

Whew~ we've finally reached the tenth installment of the LangGraph series! If you've followed along the whole way, you should have the core concepts down by now: StateGraph, Node, Edge, Memory, Human-in-the-Loop, Streaming… But wait, you thought that was it?

Your demo runs happily on your laptop, then faceplants the moment it hits production? This post is here to fix exactly that. The topic: how to take a LangGraph app from toy to production-grade system that can handle real traffic.

Alright, buckle up. Here we go.

1. Introduction: Production vs. Development

1.1 The Development-Environment "Utopia"

During local development, a LangGraph app typically looks like this:

from typing import TypedDict
from langgraph.graph import StateGraph, END

# Define the State
class AgentState(TypedDict):
    messages: list

# A trivial Node
def agent_node(state):
    return {"messages": ["Hello"]}

# Build the graph
builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.set_entry_point("agent")
graph = builder.compile()

# Synchronous invocation
result = graph.invoke({"messages": []})

Clean code, runs smoothly, perfect! But it's a utopia: single-threaded, no persistence, no failure handling, no thought for performance.

1.2 The "Harsh Reality" of Production

Deploying to production brings a different set of challenges:

Challenge        Development          Production
Concurrency      Single user          Thousands of concurrent requests
Persistence      In-memory State      Database-backed persistence
Fault tolerance  Errors just raise    Graceful degradation, retries
Observability    print debugging      Logs, metrics, tracing
Deployment       python app.py        Docker/K8s/Serverless
Security         Local tokens         Secret management, access control

Core principles for production:

  1. Persist the State: a process restart must not lose conversation history
  2. Failure is the norm: LLM APIs time out, databases disconnect, networks flap
  3. Observability is non-negotiable: you can't fix what you can't see
  4. Performance needs work: serial processing is a disaster under concurrency
  5. Deployment must be automated: manual deploys will eventually go wrong

1.3 Goals of This Post

This guide walks the full path of taking LangGraph to production:

  • Choosing a persistence backend
  • Building an observability stack
  • Implementing robust error handling
  • Optimizing concurrency and performance
  • Deploying with Docker/K8s

Who this is for:

  • Developers with LangGraph basics who are about to ship to production
  • Engineers who want AI-agent production best practices
  • Anyone interested in system architecture

2. Choosing Persistence: Postgres vs. Redis vs. MongoDB

LangGraph's Checkpointer mechanism makes State persistence easy; the key decision is which storage backend to use.

2.1 The Three Options at a Glance

LangGraph ships several Checkpointer implementations:

Option       Best for              Pros                             Cons
MemorySaver  Dev/testing           Zero config, very fast           Data lost on restart
Postgres     Production default    Transactions, rich queries       Needs ops
Redis        High performance      Very fast reads/writes, TTL      Memory-bound, no complex queries
MongoDB      Flexible schema       Document model, easy scaling     Weaker consistency

2.2 Postgres: The Production Default

For most production scenarios, PostgreSQL with LangGraph's PostgresSaver is the best choice.

Why Postgres?

  1. ACID transactions: State updates must be atomic; a conversation cannot be saved "halfway"
  2. JSONB support: LangGraph State is JSON-shaped, and Postgres JSONB handles it natively
  3. Mature and stable: decades of production use, excellent cloud-provider support
  4. Query power: convenient when you need to inspect conversation history or compute statistics

Configuration example

from langgraph.checkpoint.postgres import PostgresSaver
from psycopg import Connection

# Database connection string
DB_URI = "postgresql://user:pass@localhost:5432/langgraph_db"

# Open a connection
conn = Connection.connect(DB_URI)

# Initialize the checkpointer;
# setup() creates the required tables
checkpointer = PostgresSaver(conn)
checkpointer.setup()  # run once on first deployment

# Build the graph with persistence
builder = StateGraph(AgentState)
# ... add nodes and edges ...

# Pass the checkpointer at compile time
graph = builder.compile(checkpointer=checkpointer)

Table layout

PostgresSaver creates the following tables automatically:

-- checkpoints stores State snapshots
CREATE TABLE checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    parent_checkpoint_id TEXT,
    type TEXT,
    checkpoint BYTEA,
    metadata BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);

-- checkpoint_writes stores incremental updates
CREATE TABLE checkpoint_writes (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    idx INTEGER NOT NULL,
    channel TEXT NOT NULL,
    type TEXT,
    value BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);

State payloads are serialized to binary by LangGraph and stored in BYTEA columns, which keeps the round trip lossless.

2.3 Redis: When Latency Is Everything

If your application is extremely latency-sensitive, or sessions should expire automatically (TTL), Redis is the better fit.

Good fits

  • High-frequency conversations (support bots, real-time assistants)
  • Sessions that should auto-expire (ephemeral chats, anonymous users)
  • Teams already running Redis as a cache layer who want a single stack

Configuration example

from langgraph.checkpoint.redis import RedisSaver
import redis

# Redis connection
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=False  # must stay in binary mode
)

# Create the checkpointer
checkpointer = RedisSaver(redis_client)

# Compile the graph
graph = builder.compile(checkpointer=checkpointer)

Redis limitations

# ⚠️ Redis is a poor fit for complex queries.
# Say you want "average turns per conversation over the last 24 hours":
# Redis cannot answer that efficiently.

# ✅ The same query is trivial in Postgres:
"""
SELECT thread_id, COUNT(*) as turn_count
FROM checkpoints
WHERE metadata->>'created_at' > NOW() - INTERVAL '24 hours'
GROUP BY thread_id;
"""

2.4 MongoDB: Flexible-Schema Scenarios

If your State shape changes often, or your team is already deep into MongoDB, consider MongoDBSaver.

Good fits

  • Experimental projects whose schema changes frequently
  • MongoDB already serving as the primary database
  • Need for flexible queries and aggregation

Configuration example

from langgraph.checkpoint.mongodb import MongoDBSaver
from pymongo import MongoClient

# MongoDB connection
client = MongoClient("mongodb://localhost:27017/")

# Create the checkpointer (takes the client plus a database name)
checkpointer = MongoDBSaver(client, db_name="langgraph_db")

# Compile the graph
graph = builder.compile(checkpointer=checkpointer)

2.5 Selection Decision Tree

Need complex queries and analytics?
├── Yes → Postgres
└── No → Need automatic expiry (TTL)?
    ├── Yes → Redis
    └── No → What does the team already run?
        ├── MongoDB in place → MongoDB
        └── Anything else → Postgres (the safe default)
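The decision tree above can be made explicit in code, which is handy for keeping the choice visible in configuration logic (a sketch; the function name and flags are my own, not a LangGraph API):

```python
def choose_checkpointer_backend(
    needs_analytics: bool,
    needs_ttl: bool,
    team_uses_mongodb: bool,
) -> str:
    """Walk the decision tree above and return a backend name."""
    if needs_analytics:
        return "postgres"   # complex queries and aggregation
    if needs_ttl:
        return "redis"      # automatic session expiry
    if team_uses_mongodb:
        return "mongodb"    # reuse the existing stack
    return "postgres"       # the safe default

# e.g. a latency-sensitive bot with ephemeral sessions:
# choose_checkpointer_backend(False, True, False) returns "redis"
```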

2.6 Production Postgres Configuration Tips

# docker-compose.yml example
version: '3.8'
services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: langgraph_db
      POSTGRES_USER: langgraph
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
    command:
      # production tuning parameters
      - "postgres"
      - "-c"
      - "shared_buffers=256MB"
      - "-c"
      - "effective_cache_size=768MB"
      - "-c"
      - "maintenance_work_mem=64MB"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U langgraph -d langgraph_db"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  postgres_data:

3. Observability: LangSmith and Langfuse

In production, a system you cannot see is a system you cannot manage. The three pillars of observability: logs, metrics, and traces.

3.1 Why Observability Matters Extra for AI Agents

Compared to conventional applications, AI agents are harder to debug:

  • Non-determinism: the same input can produce different outputs
  • Multi-step reasoning: one request may pass through a dozen Nodes
  • External dependencies: LLMs, search, APIs; any of them can fail
  • Context dependence: output quality hinges on conversation history

3.2 LangSmith: The Official LangChain Option

LangSmith is the full-stack observability platform from the LangChain team, and it has the simplest LangGraph integration.

Quick integration

import os

# Configure via environment variables
os.environ["LANGSMITH_API_KEY"] = "your-api-key"
os.environ["LANGSMITH_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGSMITH_PROJECT"] = "my-production-app"

# LangGraph picks this up automatically:
# once the variables are set, no code changes are needed
from langgraph.graph import StateGraph

builder = StateGraph(AgentState)
# ... build the graph ...
graph = builder.compile()

LangSmith core features

  1. Trace view: visualize agent execution flow

     User Input → Agent Node → Tool Call → LLM Call → Final Output
                      ↓           ↓           ↓
                  [latency]    [params]    [tokens]

  2. Debug views: each node's inputs/outputs, token usage, latency

  3. Datasets and evaluation: batch testing, regression testing

  4. Online monitoring: live request volume, error rate, latency distribution

Cost considerations

LangSmith has a free tier, but heavy production use needs a budget:

Plan        Price      Traces
Developer   Free       5,000/month
Plus        $39/month  10,000/month + $5 per 1,000 extra
Enterprise  Custom     Unlimited

3.3 Langfuse: The Open-Source Alternative

If you need full control over your data, or you are cost-sensitive, Langfuse is an excellent open-source alternative.

Self-hosted deployment

# docker-compose.yml
version: '3.8'
services:
  langfuse:
    image: langfuse/langfuse:latest
    depends_on:
      - postgres
      - redis
    ports:
      - "3000:3000"
    environment:
      - DATABASE_URL=postgresql://langfuse:pass@postgres:5432/langfuse
      - NEXTAUTH_SECRET=your-secret
      - SALT=your-salt
      - REDIS_URL=redis://redis:6379

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: langfuse
      POSTGRES_USER: langfuse
      POSTGRES_PASSWORD: pass
    volumes:
      - langfuse_postgres:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

volumes:
  langfuse_postgres:

LangGraph integration

from langfuse.callback import CallbackHandler

# Create the Langfuse callback handler
langfuse_handler = CallbackHandler(
    public_key="pk-lf-your-key",
    secret_key="sk-lf-your-key",
    host="http://localhost:3000"
)

# Pass it per invocation (note: thread_id lives under "configurable")
result = graph.invoke(
    {"messages": [user_input]},
    config={
        "callbacks": [langfuse_handler],
        "configurable": {"thread_id": conversation_id}
    }
)

3.4 Rolling Your Own: Structured Logging

For simple cases you can build basic observability yourself:

import json
import time
import logging
from contextlib import contextmanager

# Structured-logging setup
logger = logging.getLogger("langgraph")

class GraphTracer:
    """A simple tracer for graph execution"""

    def __init__(self, trace_id: str):
        self.trace_id = trace_id
        self.spans = []
        self.start_time = time.time()

    @contextmanager
    def span(self, name: str, **attributes):
        """Context manager that records one tracing span"""
        span_start = time.time()
        span_data = {
            "name": name,
            "start_time": span_start,
            "attributes": attributes
        }

        try:
            yield span_data
            span_data["status"] = "ok"
        except Exception as e:
            span_data["status"] = "error"
            span_data["error"] = str(e)
            raise
        finally:
            span_data["duration_ms"] = (time.time() - span_start) * 1000
            self.spans.append(span_data)

    def finish(self):
        """Finish the trace and emit one structured log line"""
        total_duration = (time.time() - self.start_time) * 1000

        trace_log = {
            "trace_id": self.trace_id,
            "total_duration_ms": total_duration,
            "span_count": len(self.spans),
            "spans": self.spans
        }

        logger.info(json.dumps(trace_log, ensure_ascii=False))

# Usage
tracer = GraphTracer(trace_id="conv-123")

with tracer.span("agent_node", model="gpt-4"):
    # run the agent logic
    result = agent_logic()

with tracer.span("tool_call", tool="search"):
    # run the tool call
    tool_result = call_tool()

tracer.finish()

3.5 Production Observability Checklist

  • Every graph invocation carries a trace ID
  • Each Node's inputs and outputs are recorded (mind redaction)
  • LLM token usage, latency, and error rates are monitored
  • Key metrics (volume, latency, error rate) have a dashboard
  • Anomalies trigger alerts (DingTalk/Feishu/Slack)
  • Logs have a sane retention policy (e.g. 30 days)
  • Sensitive data (user content, API keys) never reaches logs unredacted
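For the redaction item above, a minimal sketch of a log scrubber (the two patterns are illustrative assumptions; extend the list for your own secret formats):

```python
import re

# Illustrative patterns: OpenAI-style API keys and email addresses
REDACT_PATTERNS = [
    (re.compile(r"sk-[A-Za-z0-9_-]{8,}"), "[REDACTED_API_KEY]"),
    (re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"), "[REDACTED_EMAIL]"),
]

def redact(text: str) -> str:
    """Scrub known secret patterns before the string reaches a log line."""
    for pattern, replacement in REDACT_PATTERNS:
        text = pattern.sub(replacement, text)
    return text
```

Run every log payload through a function like this at the logging boundary, not at each call site, so nothing slips through.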

4. Error Handling and Retry Strategies

In production, failure is the norm, not the exception. LLM APIs get rate-limited and time out, networks flap; your agent must cope gracefully.

4.1 Error Types in LangGraph

Error type       Examples                        Strategy
LLM errors       RateLimit, Timeout, API error   Exponential-backoff retry
Tool errors      API 500, connection timeout     Degrade, then retry
Business errors  Parameter validation failure    Return a friendly message
System errors    Out of memory, DB disconnect    Fail fast, alert
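The table above maps naturally to a small classifier that downstream handlers can switch on (a sketch; the matched substrings and category names are illustrative assumptions, not a LangGraph API):

```python
def classify_error(message: str) -> str:
    """Map an error message to a handling strategy from the table above."""
    msg = message.lower()
    if any(k in msg for k in ("rate limit", "timeout", "429")):
        return "retry"        # LLM errors: exponential backoff
    if any(k in msg for k in ("500", "connection")):
        return "degrade"      # tool errors: fall back, then retry
    if "validation" in msg:
        return "user_error"   # business errors: friendly message
    return "fail_fast"        # system errors: alert and abort
```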

4.2 Node-Level Error Handling

Wrap every Node that can fail:

from functools import wraps
import time
import random

def with_retry(max_retries=3, backoff_factor=2):
    """Retry decorator with exponential backoff and jitter"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e

                    # Only retry retryable errors
                    if not should_retry(e):
                        raise

                    # Exponential backoff plus random jitter
                    wait_time = (backoff_factor ** attempt) + random.uniform(0, 1)

                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time:.1f}s...")
                    time.sleep(wait_time)

            # All retries exhausted
            raise last_exception

        return wrapper
    return decorator

def should_retry(error):
    """Decide whether an error is worth retrying"""
    error_str = str(error).lower()

    # Retryable error signatures
    retryable = [
        "rate limit",
        "timeout",
        "connection",
        "503",
        "502",
        "429"
    ]

    return any(err in error_str for err in retryable)

# Apply to Nodes
@with_retry(max_retries=3)
def call_llm_node(state: AgentState):
    """LLM-calling Node with retries"""
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

@with_retry(max_retries=2)
def search_tool_node(state: AgentState):
    """Search-tool Node with retries"""
    query = state["messages"][-1].content
    results = search_api(query)
    return {"search_results": results}
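To get a feel for the wait times the decorator produces, here is the schedule with the jitter stripped out (a sketch mirroring the formula above; the function name is my own):

```python
def backoff_schedule(max_retries: int, backoff_factor: float = 2) -> list:
    """Deterministic part of the wait before each retry attempt.

    The decorator above adds random.uniform(0, 1) jitter on top of each value.
    """
    return [backoff_factor ** attempt for attempt in range(max_retries)]
```

With the defaults (three retries, factor 2), a persistently failing call waits roughly 1s, 2s, then 4s before giving up.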

4.3 Graph-Level Error Boundaries

Use conditional edges to build a "catch" mechanism:

from typing import Literal, Optional, TypedDict
from langchain_core.messages import AIMessage

class AgentState(TypedDict):
    messages: list
    error: Optional[str]  # error message, if any
    retry_count: int      # retry counter

def safe_llm_node(state: AgentState):
    """LLM Node with error handling"""
    try:
        response = llm.invoke(state["messages"])
        return {
            "messages": [response],
            "error": None,
            "retry_count": 0
        }
    except Exception as e:
        return {
            "error": f"LLM call failed: {str(e)}",
            "retry_count": state.get("retry_count", 0) + 1
        }

def error_handler_node(state: AgentState):
    """Error-handling Node"""
    error_msg = state["error"]

    # Return a friendly message to the user
    return {
        "messages": [
            AIMessage(content=f"Sorry, I ran into a problem: {error_msg}. Please try again later.")
        ],
        "error": None
    }

def route_after_llm(state: AgentState) -> Literal["continue", "error", "retry"]:
    """Route based on the State"""
    if state.get("error"):
        if state.get("retry_count", 0) < 3:
            return "retry"  # still worth retrying
        else:
            return "error"  # retries exhausted: error path
    return "continue"

# Build the graph
builder = StateGraph(AgentState)
builder.add_node("llm", safe_llm_node)
builder.add_node("error_handler", error_handler_node)

builder.set_entry_point("llm")
builder.add_conditional_edges(
    "llm",
    route_after_llm,
    {
        "continue": END,
        "error": "error_handler",
        "retry": "llm"  # loop back to the LLM node
    }
)
builder.add_edge("error_handler", END)

graph = builder.compile()

4.4 The Circuit Breaker Pattern

When the LLM service is persistently down, failing fast beats retrying forever:

from enum import Enum
import time

class CircuitState(Enum):
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # tripped
    HALF_OPEN = "half_open"  # probing recovery

class CircuitBreaker:
    """A simple circuit-breaker implementation"""

    def __init__(
        self,
        failure_threshold=5,   # failures before tripping
        recovery_timeout=60,   # seconds before probing recovery
        half_open_max_calls=3  # max probe calls while half-open
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.half_open_calls = 0

    def call(self, func, *args, **kwargs):
        """Wrap a call with breaker logic"""

        if self.state == CircuitState.OPEN:
            # Check whether we may move to half-open
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                print("Circuit breaker entering half-open state")
            else:
                raise Exception("Circuit breaker is OPEN - service unavailable")

        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.half_open_max_calls:
                raise Exception("Circuit breaker is half-open - too many test calls")
            self.half_open_calls += 1

        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception as e:
            self.on_failure()
            raise e

    def on_success(self):
        """Handle a successful call"""
        if self.state == CircuitState.HALF_OPEN:
            # Probe succeeded: close the breaker
            self.state = CircuitState.CLOSED
            self.failure_count = 0
            print("Circuit breaker CLOSED - service recovered")
        else:
            self.failure_count = max(0, self.failure_count - 1)

    def on_failure(self):
        """Handle a failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.state == CircuitState.HALF_OPEN:
            # Probe failed: trip again
            self.state = CircuitState.OPEN
            print("Circuit breaker OPEN - recovery failed")
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker OPEN - {self.failure_count} failures")

# Usage
llm_breaker = CircuitBreaker(failure_threshold=5)

def llm_call_with_breaker(messages):
    return llm_breaker.call(llm.invoke, messages)
4.5 Timeout Control

Keep one stuck Node from hanging the whole request:

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from langchain_core.messages import AIMessage
import time

def node_with_timeout(func, timeout_sec=30):
    """Wrap a Node function with a timeout"""

    def wrapper(state):
        # Note: no `with` block here. Exiting a ThreadPoolExecutor context
        # manager waits for the task, which would defeat the timeout.
        executor = ThreadPoolExecutor(max_workers=1)
        future = executor.submit(func, state)
        try:
            return future.result(timeout=timeout_sec)
        except FutureTimeout:
            return {
                "error": f"Node execution timeout after {timeout_sec}s",
                "messages": [AIMessage(content="Processing timed out, please try again later.")]
            }
        finally:
            # Don't block on a still-running task; let it finish in the background
            executor.shutdown(wait=False)

    return wrapper

# A Node that might hang
def slow_llm_node(state):
    time.sleep(60)  # simulate a stall
    return {"messages": ["Done"]}

# Wrapped, it returns within 10 seconds at most
timed_node = node_with_timeout(slow_llm_node, timeout_sec=10)
5. Concurrency and Performance

Production means concurrent requests. A plain graph.invoke call is synchronous and blocking, so real concurrency takes some extra setup.

5.1 Async Execution

LangGraph exposes an async API that significantly improves concurrency:

import asyncio
from fastapi import FastAPI
from pydantic import BaseModel
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph

# Async Node
async def async_llm_node(state: AgentState):
    """Async LLM call"""
    # use an async LLM client
    response = await async_llm.ainvoke(state["messages"])
    return {"messages": [response]}

async def async_tool_node(state: AgentState):
    """Async tool calls"""
    # run several tools in parallel
    tasks = [
        call_tool_a(state),
        call_tool_b(state),
        call_tool_c(state)
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return {"tool_results": results}

# Build the graph with async nodes
builder = StateGraph(AgentState)
builder.add_node("agent", async_llm_node)
builder.add_node("tools", async_tool_node)
# ... add edges ...

graph = builder.compile()

# Async invocation
async def handle_request(user_input: str):
    result = await graph.ainvoke({
        "messages": [HumanMessage(content=user_input)]
    })
    return result

# FastAPI integration
class ChatRequest(BaseModel):
    message: str
    conversation_id: str

app = FastAPI()

@app.post("/chat")
async def chat_endpoint(request: ChatRequest):
    result = await graph.ainvoke(
        {"messages": [HumanMessage(content=request.message)]},
        config={"configurable": {"thread_id": request.conversation_id}}
    )
    return {"response": result["messages"][-1].content}

5.2 Parallel Node Execution

When Nodes have no dependencies on each other, run them in parallel:

# Traditional: serial execution
builder.add_edge("start", "node_a")
builder.add_edge("node_a", "node_b")  # must wait for A
builder.add_edge("node_b", "end")

# Parallel execution via the Send (fan-out/fan-in) mechanism
import operator
from typing import Annotated, TypedDict
from langgraph.types import Send
from langgraph.graph import StateGraph, END

class AgentState(TypedDict):
    pending_tasks: list
    # the reducer appends results from all parallel branches
    results: Annotated[list, operator.add]
    final_result: dict

def fan_out_node(state: AgentState):
    """Entry node; the actual dispatch happens in the router below"""
    return {}

def dispatch_tasks(state: AgentState):
    """Routing function: emit one Send per pending task"""
    return [
        Send("processor", {"task": task_data})
        for task_data in state["pending_tasks"]
    ]

def processor_node(state):
    """Process a single task; branches run independently"""
    result = process_task(state["task"])
    return {"results": [result]}

def aggregate_node(state: AgentState):
    """Fan-in: aggregate the parallel results"""
    all_results = state.get("results", [])
    return {"final_result": combine_results(all_results)}

# Build the graph
builder = StateGraph(AgentState)
builder.add_node("fan_out", fan_out_node)
builder.add_node("processor", processor_node)
builder.add_node("aggregate", aggregate_node)

builder.set_entry_point("fan_out")
builder.add_conditional_edges("fan_out", dispatch_tasks)  # dynamic parallel edges
builder.add_edge("processor", "aggregate")
builder.add_edge("aggregate", END)
5.3 Streaming Responses

For real-time interaction, streaming beats waiting for the full result:

import json
from fastapi.responses import StreamingResponse

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """Streaming (SSE) endpoint"""

    async def generate():
        async for chunk in graph.astream(
            {"messages": [HumanMessage(content=request.message)]},
            config={"configurable": {"thread_id": request.conversation_id}}
        ):
            # chunk shape: {"node_name": {state_update}}
            for node_name, update in chunk.items():
                if "messages" in update:
                    # pull out the message content
                    for msg in update["messages"]:
                        if hasattr(msg, "content"):
                            yield f"data: {json.dumps({'content': msg.content})}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
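On the consuming side, each SSE frame is a `data: <json>` line; a minimal parser looks like this (a sketch assuming the exact frame format produced by an endpoint like the one above; the function name is my own):

```python
import json

def parse_sse_frames(raw: str) -> list:
    """Extract message contents from a text/event-stream payload."""
    contents = []
    for line in raw.splitlines():
        if not line.startswith("data: "):
            continue                      # skip blanks and keep-alive comments
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break                         # end-of-stream sentinel
        contents.append(json.loads(payload)["content"])
    return contents
```

In a browser you would use EventSource or fetch with a stream reader instead, but the frame format being parsed is the same.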

5.4 Connection Pooling

Database and HTTP connection pools are critical for throughput:

from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver

# Postgres connection pool
pg_pool = ConnectionPool(
    conninfo="postgresql://user:pass@localhost/db",
    min_size=5,        # minimum connections
    max_size=20,       # maximum connections
    max_idle=300,      # max idle time (seconds)
    max_lifetime=3600  # max connection lifetime
)

# Build the checkpointer on top of the pool
checkpointer = PostgresSaver(pg_pool)

# HTTP connection pool (aiohttp example)
import aiohttp

session = aiohttp.ClientSession(
    connector=aiohttp.TCPConnector(
        limit=100,          # total connection cap
        limit_per_host=30,  # per-host connection cap
        ttl_dns_cache=300,  # DNS cache TTL (seconds)
        use_dns_cache=True
    ),
    timeout=aiohttp.ClientTimeout(total=30)
)

5.5 Performance Metrics

import time
from dataclasses import dataclass

@dataclass
class PerformanceMetrics:
    total_requests: int = 0
    total_latency_ms: float = 0
    error_count: int = 0
    token_count: int = 0

    @property
    def avg_latency_ms(self):
        if self.total_requests == 0:
            return 0
        return self.total_latency_ms / self.total_requests

    @property
    def error_rate(self):
        if self.total_requests == 0:
            return 0
        return self.error_count / self.total_requests

metrics = PerformanceMetrics()

async def tracked_invoke(graph, state, config):
    """Graph invocation with performance tracking"""
    start = time.time()
    metrics.total_requests += 1

    try:
        result = await graph.ainvoke(state, config)

        # Count tokens, if reported
        if "usage" in result:
            metrics.token_count += result["usage"].get("total_tokens", 0)

        return result
    except Exception:
        metrics.error_count += 1
        raise
    finally:
        latency = (time.time() - start) * 1000
        metrics.total_latency_ms += latency

# Expose metrics to Prometheus
from fastapi import Response
from prometheus_client import Counter, Histogram, generate_latest

REQUEST_COUNT = Counter('langgraph_requests_total', 'Total requests')
LATENCY_HISTOGRAM = Histogram('langgraph_latency_seconds', 'Request latency')

@app.get("/metrics")
def metrics_endpoint():
    return Response(generate_latest(), media_type="text/plain")

6. Deployment Architecture: Docker and K8s Examples

6.1 Minimal Docker Deployment

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the code
COPY . .

# Run as a non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Expose the port
EXPOSE 8000

# Health check (stdlib only, so no extra dependency in the image)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"

# Start command
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# requirements.txt
langgraph>=0.2.0
langchain-openai>=0.1.0
psycopg>=3.0.0
redis>=5.0.0
fastapi>=0.110.0
uvicorn[standard]>=0.27.0
pydantic>=2.0.0
prometheus-client>=0.19.0

6.2 Full Docker Compose Stack

# docker-compose.yml
version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://langgraph:password@postgres:5432/langgraph
      - REDIS_URL=redis://redis:6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LANGSMITH_API_KEY=${LANGSMITH_API_KEY}
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    deploy:
      replicas: 2
      resources:
        limits:
          cpus: '2'
          memory: 4G
        reservations:
          cpus: '0.5'
          memory: 1G
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: langgraph
      POSTGRES_USER: langgraph
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U langgraph -d langgraph"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - app

volumes:
  postgres_data:
  redis_data:
# nginx.conf
upstream app {
    server app:8000;
}

server {
    listen 80;

    location / {
        proxy_pass http://app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;

        # WebSocket support (streaming responses)
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}

6.3 Kubernetes Deployment

# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: langgraph
---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: langgraph-config
  namespace: langgraph
data:
  DATABASE_URL: "postgresql://langgraph:password@postgres:5432/langgraph"
  REDIS_URL: "redis://redis:6379"
---
# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: langgraph-secrets
  namespace: langgraph
type: Opaque
stringData:
  OPENAI_API_KEY: "sk-xxx"
  LANGSMITH_API_KEY: "ls-xxx"
---
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-app
  namespace: langgraph
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langgraph
  template:
    metadata:
      labels:
        app: langgraph
    spec:
      containers:
        - name: app
          image: your-registry/langgraph-app:latest
          ports:
            - containerPort: 8000
          envFrom:
            - configMapRef:
                name: langgraph-config
            - secretRef:
                name: langgraph-secrets
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "4Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: langgraph-service
  namespace: langgraph
spec:
  selector:
    app: langgraph
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: langgraph-hpa
  namespace: langgraph
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: langgraph-app
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

6.4 Deployment Checklist

  • Dockerfile uses multi-stage builds to shrink the image
  • Containers run as a non-root user
  • Health checks configured (liveness/readiness probes)
  • Resource limits (CPU/memory) set
  • Config and secrets separated (ConfigMap/Secret)
  • Logs go to stdout/stderr (easy to collect)
  • HPA auto-scaling configured
  • Database access goes through a connection pool
  • Graceful shutdown handles SIGTERM
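For the last item, the core of graceful shutdown is a SIGTERM handler that stops accepting new work while in-flight graph runs drain (a stdlib-only sketch; with uvicorn you would typically wire the same flag into a FastAPI lifespan handler and the readiness probe):

```python
import signal
import threading

# Flipped by Kubernetes' SIGTERM during pod termination
shutdown_event = threading.Event()

def _handle_sigterm(signum, frame):
    # Stop taking new requests; let running graph invocations finish
    shutdown_event.set()

signal.signal(signal.SIGTERM, _handle_sigterm)

def accepting_traffic() -> bool:
    """The readiness probe should start failing once this goes False."""
    return not shutdown_event.is_set()
```

Once the readiness probe fails, Kubernetes stops routing traffic to the pod, and the process can exit after the last in-flight request completes.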

7. Series Wrap-Up and Roadmap

7.1 Series Recap

Whew~ with this post, the LangGraph from-zero series comes to a close. A quick look back at all ten installments:

Part  Topic                       Core content
01    LangGraph for Beginners     Graph basics, Node/Edge concepts
02    State and Schema Design     TypedDict, state-management best practices
03    Nodes in Depth              Node types, tool integration
04    Edges and Routing           Conditional edges, dynamic routing
05    Memory Systems              Short/long-term memory, Checkpoints
06    Human-in-the-Loop (HITL)    interrupt, human review and editing
07    Multi-Agent Collaboration   Multi-agent architectures, collaboration patterns
08    Streaming Architecture      Streaming, real-time feedback
09    Conditional Edges Guide     Complex routing logic
10    Production Deployment       Persistence, observability, deployment architecture

7.2 From Beginner to Expert

If you've absorbed all of the above, here are the next directions:

Architecture:

  • Study multi-agent frameworks (compare AutoGen and CrewAI)
  • Go deeper on graph databases (Neo4j) for complex knowledge graphs
  • Event-driven architecture (Event Sourcing)

Engineering:

  • Learn MLOps (model versioning, A/B testing)
  • Go deeper on observability (OpenTelemetry, custom metrics)
  • Security and compliance (prompt-injection defense, data privacy)

Algorithms:

  • Study advanced reasoning patterns like ReAct and Plan-and-Solve
  • Research RAG optimization (re-ranking, query rewriting, multi-path retrieval)
  • Explore the limits of function calling

7.3 Final Words

The AI-agent world moves fast, and LangGraph is still iterating quickly. But the goal of this series was never to teach every API; it was to help you build a systematic mental framework:

  • How do you design the State?
  • How do you split up Nodes?
  • When do you reach for conditional edges?
  • How do you guarantee reliability?

The answers to those questions outlive any specific code.

Remember:

"The agent is not the goal. Solving the problem is."

Good luck on your journey with AI agents, and come chat anytime!




This is part 10 (the finale) of the LangGraph from-zero series, first published on C哥's tech blog. Please contact the author for permission before republishing.