LangGraph in Production: From Demo to Launch

Mmm~ we've finally reached the tenth post in the LangGraph series! If you've followed along from the start, you should already have the core concepts down: StateGraph, Node, Edge, Memory, Human-in-the-Loop, Streaming... But wait, did you think that was the end?

Your demo runs happily on your laptop, then falls flat the moment it hits production? This post is here to fix exactly that. Today's topic: how to turn a LangGraph app from a toy into a production-grade system that can handle real traffic.

Alright, buckle up. We're taking off.
1. Introduction: Production vs. Development Environments

1.1 The Development-Environment "Utopia"

During local development, a LangGraph app typically looks like this:
```python
from typing import TypedDict
from langgraph.graph import StateGraph

class AgentState(TypedDict):
    messages: list

def agent_node(state):
    return {"messages": ["Hello"]}

builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.set_entry_point("agent")
graph = builder.compile()

result = graph.invoke({"messages": []})
```
Clean code, runs smoothly, perfect! But this is a utopia: single-threaded, no persistence, no failure handling, no performance concerns.
1.2 The Harsh Reality of Production

Once you deploy to production, you face these challenges:
| Challenge | Development | Production |
|---|---|---|
| Concurrency | Single user | Thousands of concurrent requests |
| Persistence | In-memory State | Database-backed persistence |
| Fault tolerance | Fail with an error | Graceful degradation, retries |
| Observability | print debugging | Logs, metrics, traces |
| Deployment | python app.py | Docker/K8s/Serverless |
| Security | Local tokens | Secret management, access control |
Core principles for production:

- State must be persisted: a process restart must not lose conversation history
- Failure is the norm: LLM APIs time out, databases disconnect, networks flap
- Observability is non-negotiable: you can't fix what you can't see
- Performance needs tuning: serial processing is a disaster under concurrency
- Deployment must be automated: manual deploys will eventually go wrong
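To make the first principle concrete, here is a toy sketch (not the real LangGraph API) of the idea behind checkpointing: state is keyed by a thread_id and lives independently of any single request, so a "new request" for the same conversation can pick up where the last one left off. `ToyCheckpointer` is a stand-in name for illustration only.

```python
import json

class ToyCheckpointer:
    """Minimal thread_id -> checkpoint store, standing in for a real DB backend."""

    def __init__(self):
        self._store = {}

    def put(self, thread_id: str, state: dict) -> None:
        # Serialize before storing, as a real backend (Postgres/Redis) would
        self._store[thread_id] = json.dumps(state)

    def get(self, thread_id: str) -> dict:
        raw = self._store.get(thread_id)
        return json.loads(raw) if raw else {"messages": []}

saver = ToyCheckpointer()
saver.put("conv-1", {"messages": ["hi"]})

# A later request for the same conversation resumes from the stored state
state = saver.get("conv-1")
state["messages"].append("hello again")
saver.put("conv-1", state)
print(saver.get("conv-1"))  # → {'messages': ['hi', 'hello again']}
```

A real checkpointer also versions each checkpoint and stores metadata, but the thread_id-keyed lookup is the core contract.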
1.3 What This Post Covers

This guide walks the full path of taking LangGraph to production:

- Choosing the right persistence backend
- Building an observability stack
- Implementing robust error handling
- Optimizing concurrency and performance
- Deploying with Docker/K8s

Who this is for:

- Developers with LangGraph basics who are about to ship to production
- Engineers interested in AI-agent production best practices
- Anyone curious about system architecture
2. Persistence: Postgres vs. Redis vs. MongoDB

LangGraph's Checkpointer mechanism makes State persistence easy, but choosing the storage backend is a key decision.

2.1 The Three Options at a Glance

LangGraph supports multiple Checkpointer implementations:
| Option | Best for | Pros | Cons |
|---|---|---|---|
| MemorySaver | Dev/testing | Zero config, very fast | Data lost on restart |
| Postgres | Production default | Transactions, flexible queries | Needs ops |
| Redis | High-performance caching | Very fast reads/writes, TTL support | Memory-bound, no complex queries |
| MongoDB | Flexible schemas | Document model, easy to scale | Weaker consistency |
2.2 Postgres: The Production Default

For most production scenarios, PostgreSQL with LangGraph's PostgresSaver is the best choice.

Why Postgres?

- ACID transactions: State updates must be atomic; a conversation can't be "half saved"
- JSONB support: LangGraph State is JSON-shaped, and Postgres JSONB handles it natively
- Mature and stable: decades of production hardening, first-class cloud support
- Query power: convenient when you need to query conversation history or run analytics
Configuration example:

```python
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg import Connection

DB_URI = "postgresql://user:pass@localhost:5432/langgraph_db"

conn = Connection.connect(DB_URI)
checkpointer = PostgresSaver(conn)
checkpointer.setup()  # creates the checkpoint tables on first run

builder = StateGraph(AgentState)
# ... add nodes and edges ...
graph = builder.compile(checkpointer=checkpointer)
```
Table structure: PostgresSaver automatically creates the following tables:
```sql
CREATE TABLE checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    parent_checkpoint_id TEXT,
    type TEXT,
    checkpoint BYTEA,
    metadata BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);

CREATE TABLE checkpoint_writes (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    idx INTEGER NOT NULL,
    channel TEXT NOT NULL,
    type TEXT,
    value BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);
```
Checkpoint payloads are stored as BYTEA binary columns: LangGraph's serializer handles the encoding, and Postgres transactions guarantee the write lands whole or not at all.
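To make the BYTEA point concrete, here is an illustrative round trip. LangGraph's real serializer is more sophisticated (it handles message objects and binary formats); this sketch only shows the dict-to-bytes-and-back idea, and the two helper names are made up for the example:

```python
import json

def to_bytea(checkpoint: dict) -> bytes:
    """Illustrative only: serialize a checkpoint dict to bytes for a BYTEA column."""
    return json.dumps(checkpoint, ensure_ascii=False).encode("utf-8")

def from_bytea(blob: bytes) -> dict:
    """Inverse of to_bytea: decode the stored bytes back into a dict."""
    return json.loads(blob.decode("utf-8"))

cp = {"thread_id": "conv-1", "messages": ["hi"]}
blob = to_bytea(cp)          # what would be written to the BYTEA column
assert from_bytea(blob) == cp  # lossless round trip
```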
2.3 Redis: For Extreme Performance

If your application is extremely latency-sensitive, or you need automatic expiration (TTL), Redis is the better fit.

When to use it:

- High-frequency conversation workloads (customer-service bots, realtime assistants)
- Sessions that should expire automatically (ephemeral chats, anonymous users)
- Redis already in your stack as a cache layer, and you want to consolidate technologies
Configuration example:

```python
from langgraph.checkpoint.redis import RedisSaver
import redis

redis_client = redis.Redis(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=False,  # checkpoints are binary; keep raw bytes
)

checkpointer = RedisSaver(redis_client)
graph = builder.compile(checkpointer=checkpointer)
```
Redis's limitation: analytical queries like the one below are trivial in Postgres but effectively impossible in Redis:

```sql
-- Count conversation turns per thread over the last 24 hours
SELECT thread_id, COUNT(*) AS turn_count
FROM checkpoints
WHERE metadata->>'created_at' > NOW() - INTERVAL '24 hours'
GROUP BY thread_id;
```
2.4 MongoDB: For Flexible Schemas

If your State structure changes often, or your team is already deep into MongoDB, consider MongoDBSaver.

When to use it:

- Experimental projects whose schema changes frequently
- MongoDB already serving as your primary database
- Need for flexible queries and aggregation pipelines
Configuration example:

```python
from langgraph.checkpoint.mongodb import MongoDBSaver
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db = client["langgraph_db"]

checkpointer = MongoDBSaver(db)
graph = builder.compile(checkpointer=checkpointer)
```
2.5 Decision Tree

```
Need complex queries and analytics?
├── Yes → Postgres
└── No  → Need automatic expiration (TTL)?
          ├── Yes → Redis
          └── No  → What's the team's stack?
                    ├── Already on MongoDB → MongoDB
                    └── Anything else      → Postgres (the safe choice)
```
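The same decision tree, expressed as a tiny function. The function name and boolean parameters are made up for the sketch; the returned strings are just labels:

```python
def pick_checkpointer_backend(
    needs_analytics: bool,
    needs_ttl: bool,
    team_uses_mongodb: bool,
) -> str:
    """Encode the decision tree above: first question that answers 'yes' wins."""
    if needs_analytics:
        return "postgres"
    if needs_ttl:
        return "redis"
    if team_uses_mongodb:
        return "mongodb"
    return "postgres"  # the safe default

print(pick_checkpointer_backend(needs_analytics=False,
                                needs_ttl=True,
                                team_uses_mongodb=False))  # → redis
```

Note the ordering matters: analytics needs trump TTL needs, because you can approximate TTL in Postgres with a cleanup job, but you cannot bolt SQL analytics onto Redis.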
2.6 Production Postgres Configuration

```yaml
version: '3.8'
services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: langgraph_db
      POSTGRES_USER: langgraph
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
    command:
      - "postgres"
      - "-c"
      - "shared_buffers=256MB"
      - "-c"
      - "effective_cache_size=768MB"
      - "-c"
      - "maintenance_work_mem=64MB"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U langgraph -d langgraph_db"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  postgres_data:
```
3. Observability: LangSmith and Langfuse

In production, if you can't see what the system is doing, you can't manage it. The three pillars of observability: logs, metrics, and traces.
3.1 Why Observability Matters Even More for AI Agents

Compared to traditional applications, AI agents are harder to debug:

- Non-determinism: the same input can produce different outputs
- Multi-step reasoning: one request may pass through a dozen Nodes
- External dependencies: LLM calls, search, APIs; any one of them can fail
- Context sensitivity: output quality depends heavily on conversation history
3.2 LangSmith: The Official LangChain Option

LangSmith is the full-stack observability platform from the LangChain team, and it has the simplest LangGraph integration.

Quick integration:

```python
import os

os.environ["LANGSMITH_TRACING"] = "true"  # enable trace export
os.environ["LANGSMITH_API_KEY"] = "your-api-key"
os.environ["LANGSMITH_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGSMITH_PROJECT"] = "my-production-app"

# With these variables set, traces are collected automatically
from langgraph.graph import StateGraph

builder = StateGraph(AgentState)
# ...
graph = builder.compile()
```
LangSmith core features:

- Trace visualization: see the agent's execution flow end to end

```
User Input → Agent Node → Tool Call → LLM Call → Final Output
                 ↓            ↓           ↓
             [latency]   [arguments]  [token counts]
```

- Debug inspection: every node's inputs/outputs, token usage, latency
- Datasets and evaluation: batch testing, regression testing
- Online monitoring: live request volume, error rates, latency distribution
Cost considerations: LangSmith has a free tier, but heavy production use comes with a price tag:

| Plan | Price | Traces |
|---|---|---|
| Developer | Free | 5,000/month |
| Plus | $39/month | 10,000/month + $5 per 1,000 extra |
| Enterprise | Custom | Unlimited |
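A quick back-of-the-envelope helper for the Plus tier, using only the numbers from the table above (the function name is invented, and real billing details such as rounding may differ; check the current pricing page):

```python
def plus_tier_monthly_cost(traces: int,
                           base_fee: float = 39.0,
                           included: int = 10_000,
                           per_extra_1k: float = 5.0) -> float:
    """Estimate a monthly Plus-tier bill: base fee plus $5 per 1,000 extra traces."""
    extra = max(0, traces - included)
    blocks = -(-extra // 1_000)  # ceiling division: partial blocks still bill
    return base_fee + blocks * per_extra_1k

print(plus_tier_monthly_cost(25_000))  # 39 + 15 * 5 = 114.0
```

A useful sanity check before committing: a busy agent that emits one trace per request at 1,000 requests/day is already ~30,000 traces/month.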
3.3 Langfuse: The Open-Source Alternative

If you need data sovereignty or are cost-sensitive, Langfuse is an excellent open-source alternative.
Self-hosted deployment:

```yaml
version: '3.8'
services:
  langfuse:
    image: langfuse/langfuse:latest
    depends_on:
      - postgres
      - redis
    ports:
      - "3000:3000"
    environment:
      - DATABASE_URL=postgresql://langfuse:pass@postgres:5432/langfuse
      - NEXTAUTH_SECRET=your-secret
      - SALT=your-salt
      - REDIS_URL=redis://redis:6379

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: langfuse
      POSTGRES_USER: langfuse
      POSTGRES_PASSWORD: pass
    volumes:
      - langfuse_postgres:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

volumes:
  langfuse_postgres:
```
LangGraph integration:

```python
from langfuse.callback import CallbackHandler

langfuse_handler = CallbackHandler(
    public_key="pk-lf-your-key",
    secret_key="sk-lf-your-key",
    host="http://localhost:3000",
)

result = graph.invoke(
    {"messages": [user_input]},
    config={
        "callbacks": [langfuse_handler],
        "configurable": {"thread_id": conversation_id},
    },
)
```
3.4 DIY: Structured Logging

For simple scenarios, you can also roll your own observability:

```python
import json
import time
import logging
from contextlib import contextmanager

logger = logging.getLogger("langgraph")

class GraphTracer:
    """A simple execution tracer for one graph run."""

    def __init__(self, trace_id: str):
        self.trace_id = trace_id
        self.spans = []
        self.start_time = time.time()

    @contextmanager
    def span(self, name: str, **attributes):
        """Context manager that records a single traced span."""
        span_start = time.time()
        span_data = {
            "name": name,
            "start_time": span_start,
            "attributes": attributes,
        }
        try:
            yield span_data
            span_data["status"] = "ok"
        except Exception as e:
            span_data["status"] = "error"
            span_data["error"] = str(e)
            raise
        finally:
            span_data["duration_ms"] = (time.time() - span_start) * 1000
            self.spans.append(span_data)

    def finish(self):
        """Finish the trace and emit it as one structured log line."""
        total_duration = (time.time() - self.start_time) * 1000
        trace_log = {
            "trace_id": self.trace_id,
            "total_duration_ms": total_duration,
            "span_count": len(self.spans),
            "spans": self.spans,
        }
        logger.info(json.dumps(trace_log, ensure_ascii=False))

# Usage
tracer = GraphTracer(trace_id="conv-123")

with tracer.span("agent_node", model="gpt-4"):
    result = agent_logic()

with tracer.span("tool_call", tool="search"):
    tool_result = call_tool()

tracer.finish()
```
3.5 Production Observability Checklist

- [ ] Every request carries a trace/conversation ID through all nodes
- [ ] Structured (JSON) logs shipped to a central store
- [ ] Token usage and cost tracked per request
- [ ] Latency and error-rate dashboards with alerting
- [ ] Traces sampled or retained long enough to debug incidents
4. Error Handling and Retry Strategies

In production, failure is the norm, not the exception. LLM APIs get rate-limited and time out, networks flap; your agent must handle all of it gracefully.
4.1 Error Types in LangGraph

| Error type | Examples | Strategy |
|---|---|---|
| LLM errors | RateLimit, Timeout, API error | Exponential-backoff retry |
| Tool errors | API 500, connection timeout | Degrade gracefully, retry |
| Business errors | Parameter validation failure | Return a friendly message |
| System errors | Out of memory, DB disconnect | Fail fast, alert |
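The table above can be sketched as a small classifier over exception types. The custom exception classes here are stand-ins; in a real app they would come from your LLM and tool SDKs (e.g. an SDK's rate-limit error), and the returned strings are just strategy labels:

```python
# Hypothetical stand-ins for SDK-specific exceptions
class RateLimitError(Exception): ...
class ToolCallError(Exception): ...
class ValidationError(Exception): ...

def error_strategy(exc: Exception) -> str:
    """Map an exception to a handling strategy, following the table above."""
    if isinstance(exc, (RateLimitError, TimeoutError)):
        return "retry_with_backoff"      # LLM errors
    if isinstance(exc, (ToolCallError, ConnectionError)):
        return "degrade_or_retry"        # tool errors
    if isinstance(exc, ValidationError):
        return "friendly_message"        # business errors
    return "fail_fast_and_alert"         # unknown/system errors

print(error_strategy(RateLimitError()))  # → retry_with_backoff
```

Classifying by exception type is more robust than string matching on the message (compare `should_retry` in the next section), but it requires knowing which exception classes your dependencies actually raise.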
4.2 Node-Level Error Handling

Wrap every Node that can fail with error handling:
```python
from functools import wraps
import time
import random

def with_retry(max_retries=3, backoff_factor=2):
    """Retry decorator with exponential backoff and jitter."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if not should_retry(e):
                        raise
                    wait_time = (backoff_factor ** attempt) + random.uniform(0, 1)
                    print(f"Attempt {attempt + 1} failed: {e}. "
                          f"Retrying in {wait_time:.1f}s...")
                    time.sleep(wait_time)
            raise last_exception
        return wrapper
    return decorator

def should_retry(error):
    """Decide whether an error is retryable."""
    error_str = str(error).lower()
    retryable = ["rate limit", "timeout", "connection", "503", "502", "429"]
    return any(err in error_str for err in retryable)

@with_retry(max_retries=3)
def call_llm_node(state: AgentState):
    """LLM-calling node with retries."""
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

@with_retry(max_retries=2)
def search_tool_node(state: AgentState):
    """Search-tool node with retries."""
    query = state["messages"][-1].content
    results = search_api(query)
    return {"search_results": results}
```
4.3 Graph-Level Error Boundaries

Use conditional edges to implement an "error boundary":

```python
from typing import Literal, Optional, TypedDict
from langchain_core.messages import AIMessage
from langgraph.graph import StateGraph, END

class AgentState(TypedDict):
    messages: list
    error: Optional[str]
    retry_count: int

def safe_llm_node(state: AgentState):
    """LLM node with error handling."""
    try:
        response = llm.invoke(state["messages"])
        return {"messages": [response], "error": None, "retry_count": 0}
    except Exception as e:
        return {
            "error": f"LLM call failed: {str(e)}",
            "retry_count": state.get("retry_count", 0) + 1,
        }

def error_handler_node(state: AgentState):
    """Turn an error into a friendly reply."""
    error_msg = state["error"]
    return {
        "messages": [
            AIMessage(content=f"Sorry, I ran into a problem: {error_msg}. "
                              f"Please try again later.")
        ],
        "error": None,
    }

def route_after_llm(state: AgentState) -> Literal["continue", "error", "retry"]:
    """Route based on the error state."""
    if state.get("error"):
        if state.get("retry_count", 0) < 3:
            return "retry"
        return "error"
    return "continue"

builder = StateGraph(AgentState)
builder.add_node("llm", safe_llm_node)
builder.add_node("error_handler", error_handler_node)
builder.set_entry_point("llm")
builder.add_conditional_edges(
    "llm",
    route_after_llm,
    {"continue": END, "error": "error_handler", "retry": "llm"},
)
builder.add_edge("error_handler", END)
graph = builder.compile()
```
4.4 Circuit Breaker

When the LLM service is persistently failing, failing fast beats retrying forever:

```python
from enum import Enum
import time

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """A simple circuit-breaker implementation."""

    def __init__(self, failure_threshold=5, recovery_timeout=60,
                 half_open_max_calls=3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.half_open_calls = 0

    def call(self, func, *args, **kwargs):
        """Wrap a call with circuit-breaker logic."""
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                print("Circuit breaker entering half-open state")
            else:
                raise Exception("Circuit breaker is OPEN - service unavailable")

        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.half_open_max_calls:
                raise Exception("Circuit breaker is half-open - too many test calls")
            self.half_open_calls += 1

        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise

    def on_success(self):
        """On success: close the breaker, or decay the failure count."""
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            self.failure_count = 0
            print("Circuit breaker CLOSED - service recovered")
        else:
            self.failure_count = max(0, self.failure_count - 1)

    def on_failure(self):
        """On failure: count it and open the breaker past the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            print("Circuit breaker OPEN - recovery failed")
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker OPEN - {self.failure_count} failures")

# Usage
llm_breaker = CircuitBreaker(failure_threshold=5)

def llm_call_with_breaker(messages):
    return llm_breaker.call(llm.invoke, messages)
```
4.5 Timeout Control

Prevent a single stuck Node from hanging the whole request:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def node_with_timeout(func, timeout_sec=30):
    """Wrap a node function with a timeout."""
    def wrapper(state):
        executor = ThreadPoolExecutor(max_workers=1)
        future = executor.submit(func, state)
        try:
            return future.result(timeout=timeout_sec)
        except FutureTimeout:
            return {
                "error": f"Node execution timeout after {timeout_sec}s",
                "messages": [AIMessage(content="The request timed out; please try again later.")],
            }
        finally:
            # wait=False: don't block on the (possibly still-running) worker thread
            executor.shutdown(wait=False)
    return wrapper

# Usage
def slow_llm_node(state):
    time.sleep(60)  # simulate a stuck call
    return {"messages": ["Done"]}

timed_node = node_with_timeout(slow_llm_node, timeout_sec=10)
```

Note that the worker thread itself keeps running in the background; this pattern bounds the caller's wait, not the work.
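If your nodes are async (see section 5.1), the same idea is simpler and cleaner with asyncio.wait_for. A self-contained sketch (the wrapper name is invented; the error-dict shape mirrors the thread-based version above):

```python
import asyncio

def async_node_with_timeout(func, timeout_sec=30):
    """Wrap an async node with a timeout, returning an error state instead of hanging."""
    async def wrapper(state):
        try:
            return await asyncio.wait_for(func(state), timeout=timeout_sec)
        except asyncio.TimeoutError:
            return {"error": f"Node execution timeout after {timeout_sec}s"}
    return wrapper

# Demo with a deliberately slow fake node
async def slow_node(state):
    await asyncio.sleep(5)
    return {"messages": ["Done"]}

result = asyncio.run(async_node_with_timeout(slow_node, timeout_sec=0.1)({}))
print(result)  # → {'error': 'Node execution timeout after 0.1s'}
```

Unlike the thread-based wrapper, asyncio.wait_for actually cancels the awaited coroutine on timeout, so no orphaned work keeps running.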
5. Concurrency and Performance

A production service must handle concurrent requests. A synchronous graph.invoke blocks its worker for the whole run, so scaling out requires the async API plus some configuration.

5.1 Async Execution

LangGraph ships an async API that significantly improves concurrency:
```python
import asyncio
from fastapi import FastAPI
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph

async def async_llm_node(state: AgentState):
    """Async LLM call."""
    response = await async_llm.ainvoke(state["messages"])
    return {"messages": [response]}

async def async_tool_node(state: AgentState):
    """Run several tool calls concurrently."""
    tasks = [call_tool_a(state), call_tool_b(state), call_tool_c(state)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return {"tool_results": results}

builder = StateGraph(AgentState)
builder.add_node("agent", async_llm_node)
builder.add_node("tools", async_tool_node)
# ... edges ...
graph = builder.compile()

async def handle_request(user_input: str):
    return await graph.ainvoke(
        {"messages": [HumanMessage(content=user_input)]}
    )

# FastAPI integration
app = FastAPI()

@app.post("/chat")
async def chat_endpoint(request: ChatRequest):
    result = await graph.ainvoke(
        {"messages": [HumanMessage(content=request.message)]},
        config={"configurable": {"thread_id": request.conversation_id}},
    )
    return {"response": result["messages"][-1].content}
```
5.2 Parallel Node Execution

When Nodes have no dependencies on each other, run them in parallel. Plain add_edge calls produce a strictly sequential chain; the Send API lets a routing function fan one state out to many parallel runs of the same node:

```python
from langgraph.types import Send
from langgraph.graph import StateGraph, END

# Sequential baseline: node_a, then node_b
# builder.add_edge("start", "node_a")
# builder.add_edge("node_a", "node_b")
# builder.add_edge("node_b", "end")

def fan_out_node(state: AgentState):
    # Pass-through node; the actual fan-out happens in the routing function
    return {}

def dispatch(state: AgentState):
    """Return one Send per pending task; each runs 'processor' in parallel."""
    return [
        Send("processor", {"task": task_data})
        for task_data in state["pending_tasks"]
    ]

def processor_node(state):
    """Process a single task."""
    result = process_task(state["task"])
    return {"results": [result]}

def aggregate_node(state: AgentState):
    """Aggregate the parallel results."""
    all_results = state.get("results", [])
    return {"final_result": combine_results(all_results)}

builder = StateGraph(AgentState)
builder.add_node("fan_out", fan_out_node)
builder.add_node("processor", processor_node)
builder.add_node("aggregate", aggregate_node)
builder.set_entry_point("fan_out")
builder.add_conditional_edges("fan_out", dispatch)
builder.add_edge("processor", "aggregate")
builder.add_edge("aggregate", END)
```

Note that for the parallel `{"results": [result]}` updates to accumulate rather than overwrite each other, the `results` key in AgentState must use an aggregating reducer (e.g. `Annotated[list, operator.add]`).
5.3 Streaming Responses

For realtime interaction, streaming beats waiting for the full result:

```python
import json
from fastapi.responses import StreamingResponse

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """Server-sent-events streaming endpoint."""
    async def generate():
        async for chunk in graph.astream(
            {"messages": [HumanMessage(content=request.message)]},
            config={"configurable": {"thread_id": request.conversation_id}},
        ):
            # Each chunk maps node name -> state update
            for node_name, update in chunk.items():
                if "messages" in update:
                    for msg in update["messages"]:
                        if hasattr(msg, "content"):
                            yield f"data: {json.dumps({'content': msg.content})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```
5.4 Connection Pooling

Database and HTTP connection pools are critical for performance:

```python
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver

# Postgres connection pool for the checkpointer
pg_pool = ConnectionPool(
    conninfo="postgresql://user:pass@localhost/db",
    min_size=5,
    max_size=20,
    max_idle=300,       # close idle connections after 5 minutes
    max_lifetime=3600,  # recycle connections hourly
)
checkpointer = PostgresSaver(pg_pool)

# Shared aiohttp session with pooled connections for outbound HTTP
import aiohttp

session = aiohttp.ClientSession(
    connector=aiohttp.TCPConnector(
        limit=100,           # total connection cap
        limit_per_host=30,
        ttl_dns_cache=300,
        use_dns_cache=True,
    ),
    timeout=aiohttp.ClientTimeout(total=30),
)
```
5.5 Performance Metrics

```python
import time
from dataclasses import dataclass

@dataclass
class PerformanceMetrics:
    total_requests: int = 0
    total_latency_ms: float = 0
    error_count: int = 0
    token_count: int = 0

    @property
    def avg_latency_ms(self):
        if self.total_requests == 0:
            return 0
        return self.total_latency_ms / self.total_requests

    @property
    def error_rate(self):
        if self.total_requests == 0:
            return 0
        return self.error_count / self.total_requests

metrics = PerformanceMetrics()

async def tracked_invoke(graph, state, config):
    """Graph invocation with latency/error/token tracking."""
    start = time.time()
    metrics.total_requests += 1
    try:
        result = await graph.ainvoke(state, config)
        if "usage" in result:
            metrics.token_count += result["usage"].get("total_tokens", 0)
        return result
    except Exception:
        metrics.error_count += 1
        raise
    finally:
        metrics.total_latency_ms += (time.time() - start) * 1000

# Exposing Prometheus metrics
from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Response

REQUEST_COUNT = Counter("langgraph_requests_total", "Total requests")
LATENCY_HISTOGRAM = Histogram("langgraph_latency_seconds", "Request latency")

@app.get("/metrics")
def metrics_endpoint():
    return Response(generate_latest(), media_type="text/plain")
```
6. Deployment: Docker and K8s Examples

6.1 Minimal Docker Deployment

```dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Run as a non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

EXPOSE 8000

HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8000/health')"

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
```
```text
# requirements.txt
langgraph>=0.2.0
langchain-openai>=0.1.0
psycopg>=3.0.0
redis>=5.0.0
fastapi>=0.110.0
uvicorn[standard]>=0.27.0
pydantic>=2.0.0
prometheus-client>=0.19.0
```
6.2 Full Docker Compose Stack

```yaml
version: '3.8'
services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://langgraph:password@postgres:5432/langgraph
      - REDIS_URL=redis://redis:6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LANGSMITH_API_KEY=${LANGSMITH_API_KEY}
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    deploy:
      replicas: 2
      resources:
        limits:
          cpus: '2'
          memory: 4G
        reservations:
          cpus: '0.5'
          memory: 1G
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: langgraph
      POSTGRES_USER: langgraph
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U langgraph -d langgraph"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - app

volumes:
  postgres_data:
  redis_data:
```
```nginx
upstream app {
    server app:8000;
}

server {
    listen 80;

    location / {
        proxy_pass http://app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        # WebSocket/SSE support
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
```
6.3 Kubernetes Deployment

```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: langgraph
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: langgraph-config
  namespace: langgraph
data:
  DATABASE_URL: "postgresql://langgraph:password@postgres:5432/langgraph"
  REDIS_URL: "redis://redis:6379"
---
apiVersion: v1
kind: Secret
metadata:
  name: langgraph-secrets
  namespace: langgraph
type: Opaque
stringData:
  OPENAI_API_KEY: "sk-xxx"
  LANGSMITH_API_KEY: "ls-xxx"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-app
  namespace: langgraph
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langgraph
  template:
    metadata:
      labels:
        app: langgraph
    spec:
      containers:
        - name: app
          image: your-registry/langgraph-app:latest
          ports:
            - containerPort: 8000
          envFrom:
            - configMapRef:
                name: langgraph-config
            - secretRef:
                name: langgraph-secrets
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "4Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: langgraph-service
  namespace: langgraph
spec:
  selector:
    app: langgraph
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: langgraph-hpa
  namespace: langgraph
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: langgraph-app
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
```
6.4 Deployment Checklist

- [ ] Images built reproducibly and vulnerability-scanned
- [ ] Secrets injected via env/secret manager, never baked into the image
- [ ] Health and readiness endpoints wired to orchestrator probes
- [ ] Resource requests/limits and autoscaling configured
- [ ] Rollback path tested before the first production deploy
7. Series Wrap-Up and Where to Go Next

7.1 Series Recap

Phew~ this brings the LangGraph-from-zero series to a close. Let's look back at the ten posts:
| # | Topic | Core content |
|---|---|---|
| 01 | LangGraph beginner's guide | Graph basics, Node/Edge concepts |
| 02 | State and Schema design | TypedDict, state-management best practices |
| 03 | Deep dive into Nodes | Node types, tool integration |
| 04 | Edges and routing | Conditional edges, dynamic routing |
| 05 | Memory systems | Short/long-term memory, Checkpoints |
| 06 | Human-in-the-loop (HITL) | interrupt, human review, editing |
| 07 | Multi-agent collaboration | Multi-Agent architectures, collaboration patterns |
| 08 | Streaming architecture | Streaming, realtime feedback |
| 09 | Conditional-edge guide | Complex routing logic |
| 10 | Production deployment | Persistence, observability, deployment architecture |
7.2 From Beginner to Expert

If you've mastered all of the above, here are the next directions:

Architecture:

- Compare multi-agent frameworks (AutoGen, CrewAI)
- Graph databases (Neo4j) for complex knowledge graphs
- Event-driven architecture (Event Sourcing)

Engineering:

- MLOps (model versioning, A/B testing)
- Deeper observability (OpenTelemetry, custom metrics)
- Security and compliance (prompt-injection defenses, data privacy)

Algorithms:

- Advanced reasoning patterns such as ReAct and Plan-and-Solve
- RAG optimization (re-ranking, query rewriting, multi-path retrieval)
- Pushing the limits of Function Calling
7.3 写在最后 呐,AI Agent的世界变化太快了,LangGraph还在快速迭代。但这个系列的核心目标不是教你每一个API的用法,而是帮你建立系统性的思维框架 :
State怎么设计?
Node如何拆分?
什么时候用条件边?
怎么保证可靠性?
这些问题的答案比具体代码更有生命力。
记住:
“Agent不是目的,解决问题才是。”
唔,祝你在AI Agent的征途上越走越远!有问题随时来聊。
This is post 10 (the finale) of the LangGraph-from-zero series, first published on C哥's tech blog. Please contact the author for reprint permission.