适合人群:完全不懂 LangGraph 的小白,有一定 Python 基础即可
阅读时长:约 15 分钟
代码难度:⭐⭐⭐☆☆

引言:为什么需要流式输出?

想象一下这个场景:你正在使用一个 AI 代码助手,输入”帮我写一个 Python 爬虫”,然后…页面就开始”转圈圈”。等了 10 秒、20 秒、30 秒,你的耐心正在消耗,开始怀疑是不是程序卡死了。终于,一大段代码”啪”地一下全部出现在屏幕上。

这就是非流式输出的体验——黑盒等待 + 结果突袭

流式输出的价值:就像 ChatGPT 那样,字一个一个”蹦”出来,用户知道系统在干活,可以随时打断,体验完全不在一个维度。

在 LangGraph 中,这个问题更严重。因为 LangGraph 是多步骤的 Agent 编排框架,一个请求可能要经过:

  • 意图识别节点
  • 工具调用节点
  • 数据检索节点
  • 结果生成节点

如果是非流式,用户要等所有节点都执行完才能看到结果。而流式输出可以让用户:

  1. 实时看到每个节点的执行状态
  2. 渐进式接收 AI 生成的内容
  3. 随时中断不合适的请求

本文将带你从零开始,彻底掌握 LangGraph 的 Streaming 机制。


一、LangGraph Streaming 核心概念

1.1 什么是 Streaming Mode?

LangGraph 提供了五种 Streaming Mode,对应不同的输出粒度:

Mode 输出粒度 适用场景
values 完整状态快照 需要查看每次状态变化后的全貌
updates 增量更新 只关心”什么变了”,性能最优
messages 消息令牌流 AI 回复逐字显示,用户体验最佳
custom 自定义事件 需要发送业务相关的自定义通知
debug 调试信息 开发调试,查看内部执行细节

重要区别:前四种(values/updates/messages/custom)是输出流,debug 是调试流,两者可以独立开启。

1.2 基础用法:一行代码开启流式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from langgraph.graph import StateGraph, END
from typing import TypedDict

# 定义状态
class State(TypedDict):
message: str
count: int

# 创建图
builder = StateGraph(State)

# 添加节点(简单的示例节点)
def node_a(state: State):
return {"message": state["message"] + "[A处理]", "count": state["count"] + 1}

def node_b(state: State):
return {"message": state["message"] + "[B处理]", "count": state["count"] + 10}

builder.add_node("node_a", node_a)
builder.add_node("node_b", node_b)
builder.set_entry_point("node_a")
builder.add_edge("node_a", "node_b")
builder.add_edge("node_b", END)

# 编译图
graph = builder.compile()

# ========== 流式执行 ==========
# stream() 方法返回一个生成器,每次 yield 一个事件
for event in graph.stream(
{"message": "开始", "count": 0},
stream_mode="values" # 选择 streaming mode
):
print(f"收到事件: {event}")

输出结果

1
2
3
收到事件: {'message': '开始', 'count': 0}
收到事件: {'message': '开始[A处理]', 'count': 1}
收到事件: {'message': '开始[A处理][B处理]', 'count': 11}

看到了吗?每个节点的执行都会产生一个事件,我们可以实时追踪状态变化


二、五种 Stream Mode 详解

2.1 values 模式:完整状态快照

values 模式每次返回当前图的完整状态,就像给状态拍了一张照片。

1
2
3
4
5
6
7
# 使用 values 模式
for event in graph.stream(
{"message": "你好", "count": 0},
stream_mode="values"
):
print("=" * 40)
print(f"当前完整状态: {event}")

适用场景

  • 需要查看每个节点执行后的完整上下文
  • 状态数据结构复杂,需要整体理解
  • 前端需要渲染完整的状态视图

优点:信息完整,无歧义
缺点:数据量大时传输成本高


2.2 updates 模式:增量更新

updates 模式只返回本次节点产生的变更,类似 Git 的 diff。

1
2
3
4
5
6
# 使用 updates 模式
for event in graph.stream(
{"message": "你好", "count": 0},
stream_mode="updates"
):
print(f"本次更新: {event}")

输出结果

1
2
本次更新: {'node_a': {'message': '你好[A处理]', 'count': 1}}
本次更新: {'node_b': {'message': '你好[A处理][B处理]', 'count': 11}}

对比 values 模式,updates 模式多了一个节点名称的 key,并且只返回该节点产生的变更。

适用场景

  • 网络带宽敏感的场景
  • 前端只需要局部更新的场景
  • 高频、大状态的数据流

优点:传输高效,网络友好
缺点:需要合并更新才能得到完整状态


2.3 messages 模式:令牌级流式

这是用户体验最好的模式!它会把 AI 模型的输出拆分成一个个 Token(词片段),实现”打字机效果”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from langchain_openai import ChatOpenAI
from langgraph.graph import MessagesState

# 使用 MessagesState(内置 message 管理)
llm = ChatOpenAI(model="gpt-4o-mini")

def chat_node(state: MessagesState):
# 调用 LLM,LangGraph 会自动处理流式
return {"messages": [llm.invoke(state["messages"])]}

builder = StateGraph(MessagesState)
builder.add_node("chat", chat_node)
builder.set_entry_point("chat")
builder.add_edge("chat", END)

graph = builder.compile()

# 使用 messages 模式
for chunk in graph.stream(
{"messages": [{"role": "user", "content": "讲一个短笑话"}]},
stream_mode="messages" # 关键!启用消息流式
):
print(chunk.content, end="", flush=True) # 逐字输出

效果:你会看到文字一个字一个字地出现,就像 ChatGPT 那样!

messages 模式的魔法:LangGraph 会自动识别 LLM 的流式输出能力,无需额外配置。只要 LLM 支持流式(如 OpenAI、Claude、本地模型等),就能享受这个体验。


2.4 custom 模式:自定义事件

有时候,我们需要发送业务相关的自定义通知,比如”正在搜索数据库”、”已找到 5 条结果”。这时候就需要 custom 模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from langgraph.types import StreamWriter

def search_node(state: dict, writer: StreamWriter):
# 发送自定义事件
writer({"type": "status", "message": "正在连接数据库..."})

# 模拟数据库查询
import time
time.sleep(1)

writer({"type": "status", "message": "查询中,已找到 3 条记录"})
time.sleep(1)

writer({"type": "progress", "percent": 50})
time.sleep(0.5)

# 返回实际结果
return {"results": ["记录1", "记录2", "记录3"]}

# 在 stream 中接收自定义事件
for chunk in graph.stream(
{"query": "Python教程"},
stream_mode=["custom", "values"] # 同时接收 custom 和 values 事件
):
if chunk[0] == "custom":
# 处理自定义事件
custom_data = chunk[1]
print(f"[状态更新] {custom_data}")
else:
# 处理 values 事件
print(f"[状态变更] {chunk[1]}")

自定义事件的优势

  • 可以给前端发送进度条、状态提示
  • 业务逻辑和状态变更解耦
  • 用户体验更友好

2.5 debug 模式:开发调试利器

debug 模式会输出图执行的详细调试信息,包括:

  • 每个节点的开始/结束时间
  • 输入输出数据
  • 检查点(checkpoint)信息
  • 并行执行的详细日志
1
2
3
4
5
6
7
8
9
10
# 同时启用 values 和 debug 模式
for chunk in graph.stream(
{"message": "测试"},
stream_mode=["values", "debug"]
):
if chunk[0] == "debug":
debug_info = chunk[1]
print(f"[DEBUG] 节点 {debug_info['node']} 执行耗时: {debug_info['duration']}ms")
else:
print(f"[状态] {chunk[1]}")

debug 模式适用场景

  • 排查执行异常
  • 性能分析(查看每个节点的耗时)
  • 理解图的执行流程

三、前端集成:WebSocket vs SSE

流式输出最终要在前端展示,常见两种方案:

3.1 Server-Sent Events (SSE)

SSE 是单向的服务器推送技术,适合”服务器 → 客户端”的流式场景。

优点

  • 基于 HTTP,穿透性好(防火墙友好)
  • 自动重连机制
  • 实现简单

缺点

  • 单向通信(只能服务器推,不能客户端发送)
  • 浏览器有连接数限制(HTTP/1.1 下每域 6 个)

FastAPI + SSE 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langgraph.graph import StateGraph, MessagesState
from langchain_openai import ChatOpenAI

app = FastAPI()

# 构建 LangGraph(简化版)
llm = ChatOpenAI(model="gpt-4o-mini")

def chat_node(state: MessagesState):
return {"messages": [llm.invoke(state["messages"])]}

builder = StateGraph(MessagesState)
builder.add_node("chat", chat_node)
builder.set_entry_point("chat")
builder.add_edge("chat", END)
graph = builder.compile()

@app.post("/chat")
async def chat_stream(request: dict):
"""SSE 流式接口"""
async def generate():
messages = request.get("messages", [])

# 使用 astream 进行异步流式输出
async for chunk in graph.astream(
{"messages": messages},
stream_mode="messages"
):
# SSE 格式:data: {...}\n\n
yield f"data: {json.dumps({'content': chunk.content})}\n\n"

# 结束标记
yield "data: [DONE]\n\n"

return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)

前端 JavaScript 接收

1
2
3
4
5
6
7
8
9
10
11
const eventSource = new EventSource('/chat');

eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
const data = JSON.parse(event.data);
// 逐字追加到页面
document.getElementById('output').innerHTML += data.content;
};

3.2 WebSocket

WebSocket 是双向全双工通信,适合需要频繁交互的场景。

优点

  • 双向通信
  • 低延迟
  • 无连接数限制

缺点

  • 实现复杂度更高
  • 某些代理/防火墙可能阻止 WebSocket

FastAPI + WebSocket 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()

try:
while True:
# 接收用户消息
data = await websocket.receive_json()
messages = data.get("messages", [])

# 流式发送 AI 回复
async for chunk in graph.astream(
{"messages": messages},
stream_mode="messages"
):
await websocket.send_json({
"type": "token",
"content": chunk.content
})

# 发送完成标记
await websocket.send_json({"type": "done"})

except Exception as e:
await websocket.send_json({"type": "error", "message": str(e)})
finally:
await websocket.close()

3.3 如何选择?

场景 推荐方案 理由
纯 AI 对话 SSE 简单、稳定、自动重连
需要中断/修改生成 WebSocket 双向通信,可发送停止指令
多模态交互(语音/图片) WebSocket 双向传输更灵活
高并发服务 SSE HTTP 连接池管理更成熟

四、Token级流式 vs 事件级流式

这是 Streaming 的两个维度,容易混淆,这里彻底讲清楚:

4.1 Token级流式(最细粒度)

  • 粒度:AI 模型生成的一个个 Token(约 0.75 个英文单词)
  • 来源stream_mode="messages"
  • 效果:字逐字出现,打字机效果
  • 延迟:毫秒级

适用:AI 对话、代码生成、文章创作

4.2 事件级流式(粗粒度)

  • 粒度:LangGraph 节点的执行事件
  • 来源stream_mode="values" / "updates"
  • 效果:节点执行完触发一次更新
  • 延迟:秒级(取决于节点执行时间)

适用:多步骤 Agent、工作流可视化、进度追踪

4.3 两者结合:最佳实践

实际项目中,通常需要同时启用两种流式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 同时启用 messages 和 updates
async for chunk in graph.astream(
input_state,
stream_mode=["messages", "updates"] # 双模式!
):
mode, data = chunk[0], chunk[1]

if mode == "messages":
# Token级流式:实时显示 AI 输出
yield f"data: {json.dumps({'type': 'token', 'content': data.content})}\n\n"

elif mode == "updates":
# 事件级流式:显示节点执行状态
node_name = list(data.keys())[0]
yield f"data: {json.dumps({'type': 'status', 'node': node_name})}\n\n"

前端效果

  • 顶部显示:”🔄 正在执行:搜索节点”
  • 内容区:AI 回复逐字出现
  • 节点切换时:状态提示更新

五、实战:实时显示的代码生成助手

现在,我们用一个完整的实战项目来串讲所有知识点。

5.1 项目需求

构建一个代码生成助手,功能包括:

  1. 用户输入需求描述
  2. AI 分析需求 → 生成代码
  3. 实时显示思考过程和生成的代码
  4. 使用 Web 界面展示

5.2 完整代码

项目结构

1
2
3
4
5
6
code-assistant/
├── app.py # FastAPI 后端
├── graph.py # LangGraph 定义
├── static/
│ └── index.html # 前端页面
└── requirements.txt

graph.py - LangGraph 核心

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""
代码生成助手的 LangGraph 定义
"""
from typing import TypedDict, List, Annotated
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
import os

# 定义状态
class CodeState(TypedDict):
messages: List # 对话历史
requirements: str # 用户需求
analysis: str # 需求分析结果
code: str # 生成的代码
language: str # 编程语言

# 初始化 LLM
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.2,
streaming=True # 关键:启用流式
)

# ========== 节点定义 ==========

def analyze_requirements(state: CodeState):
"""需求分析节点"""
prompt = f"""分析以下编程需求,提取关键信息:

需求:{state['requirements']}

请分析:
1. 应该使用什么编程语言?
2. 核心功能是什么?
3. 需要注意什么边界情况?

用简洁的语言回答。"""

response = llm.invoke([HumanMessage(content=prompt)])

return {
"messages": [AIMessage(content=response.content)],
"analysis": response.content
}

def generate_code(state: CodeState):
"""代码生成节点"""
prompt = f"""根据以下分析生成完整代码:

需求:{state['requirements']}
分析:{state['analysis']}

要求:
1. 代码必须完整可运行
2. 包含必要的注释
3. 处理边界情况
4. 使用最佳实践

直接输出代码,不要解释。"""

response = llm.invoke([HumanMessage(content=prompt)])

return {
"messages": [AIMessage(content=response.content)],
"code": response.content
}

# ========== 构建图 ==========

builder = StateGraph(CodeState)

# 添加节点
builder.add_node("analyze", analyze_requirements)
builder.add_node("generate", generate_code)

# 定义流程
builder.set_entry_point("analyze")
builder.add_edge("analyze", "generate")
builder.add_edge("generate", END)

# 编译
graph = builder.compile()

# ========== 流式执行函数 ==========

async def stream_code_generation(requirements: str):
"""
流式生成代码,yield 各种事件
"""
initial_state = {
"messages": [HumanMessage(content=requirements)],
"requirements": requirements,
"analysis": "",
"code": "",
"language": ""
}

# 使用双模式流式
async for chunk in graph.astream(
initial_state,
stream_mode=["messages", "updates"]
):
mode, data = chunk

if mode == "updates":
# 节点执行事件
node_name = list(data.keys())[0]
yield {
"type": "node_start",
"node": node_name,
"message": f"正在执行:{node_name}..."
}

elif mode == "messages":
# Token 流式
yield {
"type": "token",
"content": data.content
}

app.py - FastAPI 后端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
"""
代码生成助手 - FastAPI 后端
"""
from fastapi import FastAPI
from fastapi.responses import StreamingResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
import json
import asyncio

from graph import stream_code_generation

app = FastAPI(title="代码生成助手")

# 跨域配置
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)

# 挂载静态文件
app.mount("/static", StaticFiles(directory="static"), name="static")

@app.get("/", response_class=HTMLResponse)
async def root():
"""返回主页"""
return """
<!DOCTYPE html>
<html>
<head>
<title>AI 代码生成助手</title>
<meta charset="utf-8">
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
padding: 20px;
}
.container {
max-width: 900px;
margin: 0 auto;
}
h1 {
color: white;
text-align: center;
margin-bottom: 30px;
}
.input-section {
background: white;
padding: 20px;
border-radius: 12px;
margin-bottom: 20px;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
}
textarea {
width: 100%;
height: 100px;
padding: 12px;
border: 2px solid #e0e0e0;
border-radius: 8px;
font-size: 14px;
resize: vertical;
}
button {
width: 100%;
padding: 14px;
margin-top: 12px;
background: #667eea;
color: white;
border: none;
border-radius: 8px;
font-size: 16px;
cursor: pointer;
transition: background 0.3s;
}
button:hover { background: #5568d3; }
button:disabled { background: #ccc; cursor: not-allowed; }
.output-section {
background: white;
padding: 20px;
border-radius: 12px;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
min-height: 200px;
}
.status {
padding: 10px 15px;
background: #f0f0f0;
border-radius: 6px;
margin-bottom: 15px;
font-size: 14px;
color: #666;
}
.status.active {
background: #e3f2fd;
color: #1976d2;
animation: pulse 1.5s infinite;
}
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.7; }
}
.output {
background: #1e1e1e;
color: #d4d4d4;
padding: 20px;
border-radius: 8px;
font-family: 'Consolas', 'Monaco', monospace;
font-size: 14px;
line-height: 1.6;
white-space: pre-wrap;
word-wrap: break-word;
min-height: 150px;
}
.output:empty::before {
content: "生成的代码将显示在这里...";
color: #666;
}
</style>
</head>
<body>
<div class="container">
<h1>🚀 AI 代码生成助手</h1>

<div class="input-section">
<textarea id="requirements" placeholder="描述你的编程需求,例如:写一个 Python 函数,计算斐波那契数列的第 n 项,要求使用递归并添加缓存优化..."></textarea>
<button id="generateBtn" onclick="generateCode()">生成代码</button>
</div>

<div class="output-section">
<div class="status" id="status">等待输入...</div>
<div class="output" id="output"></div>
</div>
</div>

<script>
async function generateCode() {
const requirements = document.getElementById('requirements').value;
const statusEl = document.getElementById('status');
const outputEl = document.getElementById('output');
const btn = document.getElementById('generateBtn');

if (!requirements.trim()) {
alert('请输入需求描述');
return;
}

// 重置状态
outputEl.textContent = '';
btn.disabled = true;
statusEl.className = 'status active';

try {
const response = await fetch('/generate', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ requirements })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
const { done, value } = await reader.read();
if (done) break;

const lines = decoder.decode(value).split('\\n');

for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));

if (data.type === 'node_start') {
statusEl.textContent = data.message;
} else if (data.type === 'token') {
outputEl.textContent += data.content;
statusEl.textContent = '生成中...';
} else if (data.type === 'done') {
statusEl.className = 'status';
statusEl.textContent = '生成完成!';
}
}
}
}
} catch (error) {
statusEl.textContent = '生成失败:' + error.message;
statusEl.style.background = '#ffebee';
statusEl.style.color = '#c62828';
} finally {
btn.disabled = false;
}
}
</script>
</body>
</html>
"""

@app.post("/generate")
async def generate(request: dict):
"""SSE 流式接口"""
requirements = request.get("requirements", "")

async def event_stream():
async for event in stream_code_generation(requirements):
yield f"data: {json.dumps(event)}\n\n"
await asyncio.sleep(0.01) # 小延迟,避免阻塞

yield "data: {\"type\": \"done\"}\n\n"

return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)

if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

requirements.txt

1
2
3
4
5
fastapi
uvicorn
langgraph
langchain-openai
python-dotenv

5.3 运行项目

1
2
3
4
5
6
7
8
9
10
# 1. 安装依赖
pip install -r requirements.txt

# 2. 设置 OpenAI API Key
export OPENAI_API_KEY="your-api-key"

# 3. 运行
python app.py

# 4. 浏览器访问 http://localhost:8000

5.4 效果演示

用户界面会展示:

  1. 状态栏:实时显示”正在执行:analyze…” → “正在执行:generate…”
  2. 代码区:代码逐字出现,打字机效果
  3. 完成提示:生成完毕后显示”生成完成!”

这就是流式输出的魅力——用户不再面对”转圈圈”的黑盒,而是清楚地看到 AI 的思考过程和内容生成过程,体验提升不止一个档次!


六、常见问题与最佳实践

6.1 常见问题

Q1: messages 模式没有逐字输出?

确保 LLM 的 streaming=True

1
llm = ChatOpenAI(model="gpt-4", streaming=True)  # 关键!

Q2: 流式输出乱序?

异步环境下,确保使用 async for 而不是 for

1
2
3
4
5
6
7
# ✅ 正确
async for chunk in graph.astream(...):
...

# ❌ 错误(同步方法在异步环境)
for chunk in graph.stream(...):
...

Q3: 前端收不到 SSE 事件?

检查 MIME type 和 headers:

1
2
3
4
5
return StreamingResponse(
generate(),
media_type="text/event-stream", # 必须
headers={"Cache-Control": "no-cache"} # 推荐
)

6.2 最佳实践

  1. 组合使用多种模式

    1
    stream_mode=["messages", "updates", "custom"]
  2. 区分用户可见和系统事件

    • messages → 直接展示给用户
    • updates → 用于进度提示
    • custom → 业务状态通知
  3. 添加超时和错误处理

    1
    2
    3
    4
    5
    try:
    async for chunk in graph.astream(...):
    yield chunk
    except Exception as e:
    yield {"type": "error", "message": str(e)}
  4. 前端防抖
    高频 Token 流可能导致页面卡顿,使用 requestAnimationFrame 或节流函数优化。


七、总结与下篇预告

7.1 本文总结

通过本文,你学习了:

五种 Stream Mode 的区别与使用场景

  • values:完整状态,适合需要全貌的场景
  • updates:增量更新,网络友好
  • messages:Token 级流式,用户体验最佳
  • custom:自定义事件,业务解耦
  • debug:调试利器

前端集成方案

  • SSE:简单、HTTP 友好、自动重连
  • WebSocket:双向、低延迟、更灵活

实战项目

  • 完整的代码生成助手
  • 双模式流式(messages + updates)
  • FastAPI + SSE + 前端界面

7.2 下篇预告

《LangGraph 人机协作:Human-in-the-Loop 详解》

将讲解如何在 LangGraph 中实现:

  • 人工审批节点(审批 AI 的操作)
  • 人工输入节点(AI 主动询问用户)
  • 编辑状态后恢复执行
  • 构建真正可靠的 AI Agent

敬请期待!


参考资源


如果本文对你有帮助,欢迎点赞收藏!有任何问题可以在评论区留言,我会第一时间回复。