适合人群 :完全不懂 LangGraph 的小白,有一定 Python 基础即可阅读时长 :约 15 分钟代码难度 :⭐⭐⭐☆☆
引言:为什么需要流式输出? 想象一下这个场景:你正在使用一个 AI 代码助手,输入”帮我写一个 Python 爬虫”,然后…页面就开始”转圈圈”。等了 10 秒、20 秒、30 秒,你的耐心正在消耗,开始怀疑是不是程序卡死了。终于,一大段代码”啪”地一下全部出现在屏幕上。
这就是非流式输出 的体验——黑盒等待 + 结果突袭 。
流式输出的价值 :就像 ChatGPT 那样,字一个一个”蹦”出来,用户知道系统在干活,可以随时打断,体验完全不在一个维度。
在 LangGraph 中,这个问题更严重。因为 LangGraph 是多步骤的 Agent 编排框架 ,一个请求可能要经过:
意图识别节点
工具调用节点
数据检索节点
结果生成节点
如果是非流式,用户要等所有节点都执行完 才能看到结果。而流式输出可以让用户:
实时看到每个节点的执行状态
渐进式接收 AI 生成的内容
随时中断不合适的请求
本文将带你从零开始,彻底掌握 LangGraph 的 Streaming 机制。
一、LangGraph Streaming 核心概念 1.1 什么是 Streaming Mode? LangGraph 提供了五种 Streaming Mode,对应不同的输出粒度:
Mode
输出粒度
适用场景
values
完整状态快照
需要查看每次状态变化后的全貌
updates
增量更新
只关心”什么变了”,性能最优
messages
消息令牌流
AI 回复逐字显示,用户体验最佳
custom
自定义事件
需要发送业务相关的自定义通知
debug
调试信息
开发调试,查看内部执行细节
重要区别 :前四种(values/updates/messages/custom)是输出流 ,debug 是调试流 ,两者可以独立开启。
1.2 基础用法:一行代码开启流式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from langgraph.graph import StateGraph, ENDfrom typing import TypedDictclass State (TypedDict ): message: str count: int builder = StateGraph(State) def node_a (state: State ): return {"message" : state["message" ] + "[A处理]" , "count" : state["count" ] + 1 } def node_b (state: State ): return {"message" : state["message" ] + "[B处理]" , "count" : state["count" ] + 10 } builder.add_node("node_a" , node_a) builder.add_node("node_b" , node_b) builder.set_entry_point("node_a" ) builder.add_edge("node_a" , "node_b" ) builder.add_edge("node_b" , END) graph = builder.compile () for event in graph.stream( {"message" : "开始" , "count" : 0 }, stream_mode="values" ): print (f"收到事件: {event} " )
输出结果 :
1 2 3 收到事件: {'message': '开始', 'count': 0} 收到事件: {'message': '开始[A处理]', 'count': 1} 收到事件: {'message': '开始[A处理][B处理]', 'count': 11}
看到了吗?每个节点的执行都会产生一个事件,我们可以实时追踪状态变化 !
二、五种 Stream Mode 详解 2.1 values 模式:完整状态快照 values 模式每次返回当前图的完整状态 ,就像给状态拍了一张照片。
1 2 3 4 5 6 7 for event in graph.stream( {"message" : "你好" , "count" : 0 }, stream_mode="values" ): print ("=" * 40 ) print (f"当前完整状态: {event} " )
适用场景 :
需要查看每个节点执行后的完整上下文
状态数据结构复杂,需要整体理解
前端需要渲染完整的状态视图
优点 :信息完整,无歧义缺点 :数据量大时传输成本高
2.2 updates 模式:增量更新 updates 模式只返回本次节点产生的变更 ,类似 Git 的 diff。
1 2 3 4 5 6 for event in graph.stream( {"message" : "你好" , "count" : 0 }, stream_mode="updates" ): print (f"本次更新: {event} " )
输出结果 :
1 2 本次更新: {'node_a': {'message': '你好[A处理]', 'count': 1}} 本次更新: {'node_b': {'message': '你好[A处理][B处理]', 'count': 11}}
对比 values 模式,updates 模式多了一个节点名称的 key ,并且只返回该节点产生的变更。
适用场景 :
网络带宽敏感的场景
前端只需要局部更新的场景
高频、大状态的数据流
优点 :传输高效,网络友好缺点 :需要合并更新才能得到完整状态
2.3 messages 模式:令牌级流式 这是用户体验最好 的模式!它会把 AI 模型的输出拆分成一个个 Token(词片段),实现”打字机效果”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from langchain_openai import ChatOpenAIfrom langgraph.graph import MessagesStatellm = ChatOpenAI(model="gpt-4o-mini" ) def chat_node (state: MessagesState ): return {"messages" : [llm.invoke(state["messages" ])]} builder = StateGraph(MessagesState) builder.add_node("chat" , chat_node) builder.set_entry_point("chat" ) builder.add_edge("chat" , END) graph = builder.compile () for chunk in graph.stream( {"messages" : [{"role" : "user" , "content" : "讲一个短笑话" }]}, stream_mode="messages" ): print (chunk.content, end="" , flush=True )
效果 :你会看到文字一个字一个字地出现,就像 ChatGPT 那样!
messages 模式的魔法 :LangGraph 会自动识别 LLM 的流式输出能力,无需额外配置。只要 LLM 支持流式(如 OpenAI、Claude、本地模型等),就能享受这个体验。
2.4 custom 模式:自定义事件 有时候,我们需要发送业务相关的自定义通知 ,比如”正在搜索数据库”、”已找到 5 条结果”。这时候就需要 custom 模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from langgraph.types import StreamWriterdef search_node (state: dict , writer: StreamWriter ): writer({"type" : "status" , "message" : "正在连接数据库..." }) import time time.sleep(1 ) writer({"type" : "status" , "message" : "查询中,已找到 3 条记录" }) time.sleep(1 ) writer({"type" : "progress" , "percent" : 50 }) time.sleep(0.5 ) return {"results" : ["记录1" , "记录2" , "记录3" ]} for chunk in graph.stream( {"query" : "Python教程" }, stream_mode=["custom" , "values" ] ): if chunk[0 ] == "custom" : custom_data = chunk[1 ] print (f"[状态更新] {custom_data} " ) else : print (f"[状态变更] {chunk[1 ]} " )
自定义事件的优势 :
可以给前端发送进度条、状态提示
业务逻辑和状态变更解耦
用户体验更友好
2.5 debug 模式:开发调试利器 debug 模式会输出图执行的详细调试信息 ,包括:
每个节点的开始/结束时间
输入输出数据
检查点(checkpoint)信息
并行执行的详细日志
1 2 3 4 5 6 7 8 9 10 for chunk in graph.stream( {"message" : "测试" }, stream_mode=["values" , "debug" ] ): if chunk[0 ] == "debug" : debug_info = chunk[1 ] print (f"[DEBUG] 节点 {debug_info['node' ]} 执行耗时: {debug_info['duration' ]} ms" ) else : print (f"[状态] {chunk[1 ]} " )
debug 模式适用场景 :
排查执行异常
性能分析(查看每个节点的耗时)
理解图的执行流程
三、前端集成:WebSocket vs SSE 流式输出最终要在前端展示,常见两种方案:
3.1 Server-Sent Events (SSE) SSE 是单向 的服务器推送技术,适合”服务器 → 客户端”的流式场景。
优点 :
基于 HTTP,穿透性好(防火墙友好)
自动重连机制
实现简单
缺点 :
单向通信(只能服务器推,不能客户端发送)
浏览器有连接数限制(HTTP/1.1 下每域 6 个)
FastAPI + SSE 示例 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 from fastapi import FastAPIfrom fastapi.responses import StreamingResponsefrom langgraph.graph import StateGraph, MessagesStatefrom langchain_openai import ChatOpenAIapp = FastAPI() llm = ChatOpenAI(model="gpt-4o-mini" ) def chat_node (state: MessagesState ): return {"messages" : [llm.invoke(state["messages" ])]} builder = StateGraph(MessagesState) builder.add_node("chat" , chat_node) builder.set_entry_point("chat" ) builder.add_edge("chat" , END) graph = builder.compile () @app.post("/chat" ) async def chat_stream (request: dict ): """SSE 流式接口""" async def generate (): messages = request.get("messages" , []) async for chunk in graph.astream( {"messages" : messages}, stream_mode="messages" ): yield f"data: {json.dumps({'content' : chunk.content} )}\n\n" yield "data: [DONE]\n\n" return StreamingResponse( generate(), media_type="text/event-stream" , headers={ "Cache-Control" : "no-cache" , "Connection" : "keep-alive" , } )
前端 JavaScript 接收 :
1 2 3 4 5 6 7 8 9 10 11 const eventSource = new EventSource ('/chat' );eventSource.onmessage = (event ) => { if (event.data === '[DONE]' ) { eventSource.close (); return ; } const data = JSON .parse (event.data ); document .getElementById ('output' ).innerHTML += data.content ; };
3.2 WebSocket WebSocket 是双向 全双工通信,适合需要频繁交互的场景。
优点 :
缺点 :
实现复杂度更高
某些代理/防火墙可能阻止 WebSocket
FastAPI + WebSocket 示例 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from fastapi import FastAPI, WebSocketapp = FastAPI() @app.websocket("/ws/chat" ) async def websocket_chat (websocket: WebSocket ): await websocket.accept() try : while True : data = await websocket.receive_json() messages = data.get("messages" , []) async for chunk in graph.astream( {"messages" : messages}, stream_mode="messages" ): await websocket.send_json({ "type" : "token" , "content" : chunk.content }) await websocket.send_json({"type" : "done" }) except Exception as e: await websocket.send_json({"type" : "error" , "message" : str (e)}) finally : await websocket.close()
3.3 如何选择?
场景
推荐方案
理由
纯 AI 对话
SSE
简单、稳定、自动重连
需要中断/修改生成
WebSocket
双向通信,可发送停止指令
多模态交互(语音/图片)
WebSocket
双向传输更灵活
高并发服务
SSE
HTTP 连接池管理更成熟
四、Token级流式 vs 事件级流式 这是 Streaming 的两个维度,容易混淆,这里彻底讲清楚:
4.1 Token级流式(最细粒度)
粒度 :AI 模型生成的一个个 Token(约 0.75 个英文单词)
来源 :stream_mode="messages"
效果 :字逐字出现,打字机效果
延迟 :毫秒级
适用 :AI 对话、代码生成、文章创作
4.2 事件级流式(粗粒度)
粒度 :LangGraph 节点的执行事件
来源 :stream_mode="values" / "updates"
效果 :节点执行完触发一次更新
延迟 :秒级(取决于节点执行时间)
适用 :多步骤 Agent、工作流可视化、进度追踪
4.3 两者结合:最佳实践 实际项目中,通常需要同时启用两种流式 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 async for chunk in graph.astream( input_state, stream_mode=["messages" , "updates" ] ): mode, data = chunk[0 ], chunk[1 ] if mode == "messages" : yield f"data: {json.dumps({'type' : 'token' , 'content' : data.content} )}\n\n" elif mode == "updates" : node_name = list (data.keys())[0 ] yield f"data: {json.dumps({'type' : 'status' , 'node' : node_name} )}\n\n"
前端效果 :
顶部显示:”🔄 正在执行:搜索节点”
内容区:AI 回复逐字出现
节点切换时:状态提示更新
五、实战:实时显示的代码生成助手 现在,我们用一个完整的实战项目来串讲所有知识点。
5.1 项目需求 构建一个代码生成助手 ,功能包括:
用户输入需求描述
AI 分析需求 → 生成代码
实时显示 思考过程和生成的代码
使用 Web 界面展示
5.2 完整代码 项目结构 :
1 2 3 4 5 6 code-assistant/ ├── app.py # FastAPI 后端 ├── graph.py # LangGraph 定义 ├── static/ │ └── index.html # 前端页面 └── requirements.txt
graph.py - LangGraph 核心 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 """ 代码生成助手的 LangGraph 定义 """ from typing import TypedDict, List , Annotatedfrom langgraph.graph import StateGraph, ENDfrom langchain_openai import ChatOpenAIfrom langchain_core.messages import SystemMessage, HumanMessage, AIMessageimport osclass CodeState (TypedDict ): messages: List requirements: str analysis: str code: str language: str llm = ChatOpenAI( model="gpt-4o-mini" , temperature=0.2 , streaming=True ) def analyze_requirements (state: CodeState ): """需求分析节点""" prompt = f"""分析以下编程需求,提取关键信息: 需求:{state['requirements' ]} 请分析: 1. 应该使用什么编程语言? 2. 核心功能是什么? 3. 需要注意什么边界情况? 用简洁的语言回答。""" response = llm.invoke([HumanMessage(content=prompt)]) return { "messages" : [AIMessage(content=response.content)], "analysis" : response.content } def generate_code (state: CodeState ): """代码生成节点""" prompt = f"""根据以下分析生成完整代码: 需求:{state['requirements' ]} 分析:{state['analysis' ]} 要求: 1. 代码必须完整可运行 2. 包含必要的注释 3. 处理边界情况 4. 使用最佳实践 直接输出代码,不要解释。""" response = llm.invoke([HumanMessage(content=prompt)]) return { "messages" : [AIMessage(content=response.content)], "code" : response.content } builder = StateGraph(CodeState) builder.add_node("analyze" , analyze_requirements) builder.add_node("generate" , generate_code) builder.set_entry_point("analyze" ) builder.add_edge("analyze" , "generate" ) builder.add_edge("generate" , END) graph = builder.compile () async def stream_code_generation (requirements: str ): """ 流式生成代码,yield 各种事件 """ initial_state = { "messages" : [HumanMessage(content=requirements)], "requirements" : requirements, "analysis" : "" , "code" : "" , "language" : "" } async for chunk in graph.astream( initial_state, stream_mode=["messages" , "updates" ] ): mode, data = chunk if mode == "updates" : node_name = list (data.keys())[0 ] yield { "type" : "node_start" , "node" : node_name, "message" : f"正在执行:{node_name} ..." } elif mode == "messages" : yield { "type" : "token" , "content" : data.content }
app.py - FastAPI 后端 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 """ 代码生成助手 - FastAPI 后端 """ from fastapi import FastAPIfrom fastapi.responses import StreamingResponse, HTMLResponsefrom fastapi.staticfiles import StaticFilesfrom fastapi.middleware.cors import CORSMiddlewareimport jsonimport asynciofrom graph import stream_code_generationapp = FastAPI(title="代码生成助手" ) app.add_middleware( CORSMiddleware, allow_origins=["*" ], allow_methods=["*" ], allow_headers=["*" ], ) app.mount("/static" , StaticFiles(directory="static" ), name="static" ) @app.get("/" , response_class=HTMLResponse ) async def root (): """返回主页""" return """ <!DOCTYPE html> <html> <head> <title>AI 代码生成助手</title> <meta charset="utf-8"> <style> * { margin: 0; padding: 0; box-sizing: border-box; } body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 100vh; padding: 20px; } .container { max-width: 900px; margin: 0 auto; } h1 { color: white; text-align: center; margin-bottom: 30px; } .input-section { background: white; padding: 20px; border-radius: 12px; margin-bottom: 20px; box-shadow: 0 4px 6px rgba(0,0,0,0.1); } textarea { width: 100%; height: 100px; padding: 12px; border: 2px solid #e0e0e0; border-radius: 8px; font-size: 14px; resize: vertical; } button { width: 100%; padding: 14px; margin-top: 12px; background: #667eea; color: white; border: none; border-radius: 8px; font-size: 16px; cursor: pointer; transition: background 0.3s; } button:hover { background: #5568d3; } button:disabled { background: #ccc; cursor: not-allowed; } .output-section { background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px rgba(0,0,0,0.1); min-height: 200px; } .status { padding: 10px 15px; background: #f0f0f0; border-radius: 6px; margin-bottom: 15px; font-size: 14px; color: #666; } .status.active { background: #e3f2fd; color: #1976d2; animation: pulse 1.5s infinite; } @keyframes pulse { 0%, 100% { opacity: 1; } 50% { opacity: 0.7; } } .output { background: #1e1e1e; color: #d4d4d4; padding: 20px; border-radius: 8px; font-family: 'Consolas', 'Monaco', monospace; font-size: 14px; line-height: 1.6; white-space: pre-wrap; word-wrap: break-word; min-height: 150px; } .output:empty::before { content: "生成的代码将显示在这里..."; color: #666; } </style> </head> <body> <div class="container"> <h1>🚀 AI 代码生成助手</h1> <div class="input-section"> <textarea id="requirements" placeholder="描述你的编程需求,例如:写一个 Python 函数,计算斐波那契数列的第 n 项,要求使用递归并添加缓存优化..."></textarea> <button id="generateBtn" onclick="generateCode()">生成代码</button> </div> <div class="output-section"> <div class="status" id="status">等待输入...</div> <div class="output" id="output"></div> </div> </div> <script> async function generateCode() { const requirements = document.getElementById('requirements').value; const statusEl = document.getElementById('status'); const outputEl = document.getElementById('output'); const btn = document.getElementById('generateBtn'); if (!requirements.trim()) { alert('请输入需求描述'); return; } // 重置状态 outputEl.textContent = ''; btn.disabled = true; statusEl.className = 'status active'; try { const response = await fetch('/generate', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ requirements }) }); const reader = response.body.getReader(); const decoder = new TextDecoder(); while (true) { const { done, value } = await reader.read(); if (done) break; const lines = decoder.decode(value).split('\\n'); for (const line of lines) { if (line.startsWith('data: ')) { const data = JSON.parse(line.slice(6)); if (data.type === 'node_start') { statusEl.textContent = data.message; } else if (data.type === 'token') { outputEl.textContent += data.content; statusEl.textContent = '生成中...'; } else if (data.type === 'done') { statusEl.className = 'status'; statusEl.textContent = '生成完成!'; } } } } } catch (error) { statusEl.textContent = '生成失败:' + error.message; statusEl.style.background = '#ffebee'; statusEl.style.color = '#c62828'; } finally { btn.disabled = false; } } </script> </body> </html> """ @app.post("/generate" ) async def generate (request: dict ): """SSE 流式接口""" requirements = request.get("requirements" , "" ) async def event_stream (): async for event in stream_code_generation(requirements): yield f"data: {json.dumps(event)} \n\n" await asyncio.sleep(0.01 ) yield "data: {\"type\": \"done\"}\n\n" return StreamingResponse( event_stream(), media_type="text/event-stream" , headers={ "Cache-Control" : "no-cache" , "Connection" : "keep-alive" , } ) if __name__ == "__main__" : import uvicorn uvicorn.run(app, host="0.0.0.0" , port=8000 )
requirements.txt :
1 2 3 4 5 fastapi uvicorn langgraph langchain-openai python-dotenv
5.3 运行项目 1 2 3 4 5 6 7 8 9 10 pip install -r requirements.txt export OPENAI_API_KEY="your-api-key" python app.py
5.4 效果演示 用户界面会展示:
状态栏 :实时显示”正在执行:analyze…” → “正在执行:generate…”
代码区 :代码逐字出现,打字机效果
完成提示 :生成完毕后显示”生成完成!”
这就是流式输出的魅力 ——用户不再面对”转圈圈”的黑盒,而是清楚地看到 AI 的思考过程和内容生成过程,体验提升不止一个档次!
六、常见问题与最佳实践 6.1 常见问题 Q1: messages 模式没有逐字输出?
确保 LLM 的 streaming=True:
1 llm = ChatOpenAI(model="gpt-4" , streaming=True )
Q2: 流式输出乱序?
异步环境下,确保使用 async for 而不是 for:
1 2 3 4 5 6 7 async for chunk in graph.astream(...): ... for chunk in graph.stream(...): ...
Q3: 前端收不到 SSE 事件?
检查 MIME type 和 headers:
1 2 3 4 5 return StreamingResponse( generate(), media_type="text/event-stream" , headers={"Cache-Control" : "no-cache" } )
6.2 最佳实践
组合使用多种模式
1 stream_mode=["messages" , "updates" , "custom" ]
区分用户可见和系统事件
messages → 直接展示给用户
updates → 用于进度提示
custom → 业务状态通知
添加超时和错误处理
1 2 3 4 5 try : async for chunk in graph.astream(...): yield chunk except Exception as e: yield {"type" : "error" , "message" : str (e)}
前端防抖 高频 Token 流可能导致页面卡顿,使用 requestAnimationFrame 或节流函数优化。
七、总结与下篇预告 7.1 本文总结 通过本文,你学习了:
✅ 五种 Stream Mode 的区别与使用场景
values:完整状态,适合需要全貌的场景
updates:增量更新,网络友好
messages:Token 级流式,用户体验最佳
custom:自定义事件,业务解耦
debug:调试利器
✅ 前端集成方案
SSE:简单、HTTP 友好、自动重连
WebSocket:双向、低延迟、更灵活
✅ 实战项目
完整的代码生成助手
双模式流式(messages + updates)
FastAPI + SSE + 前端界面
7.2 下篇预告 《LangGraph 人机协作:Human-in-the-Loop 详解》
将讲解如何在 LangGraph 中实现:
人工审批节点(审批 AI 的操作)
人工输入节点(AI 主动询问用户)
编辑状态后恢复执行
构建真正可靠的 AI Agent
敬请期待!
参考资源
如果本文对你有帮助,欢迎点赞收藏!有任何问题可以在评论区留言,我会第一时间回复。