LangGraph Conditional Edges条件边:让流程”会思考”

如果你刚刚接触 LangGraph,你可能会被各种概念绕晕:State、Node、Edge、Checkpoint……这些东西到底在说什么?别急,今天我们不贪多,只聚焦一个核心概念——Conditional Edges(条件边)。它是让 LangGraph 从”死板流程”进化成”智能决策系统”的关键。

想象一下,你的 AI Agent 就像一个会思考的员工。面对不同的任务,它能自主决定下一步该做什么,而不是像传统程序那样只会按固定路线执行。这就是条件边的魅力所在。

本文会用最通俗的语言,带你从零理解条件边,并通过一个完整的智能客服路由系统实战案例,让你真正掌握这项技术。


一、什么是条件边?给流程装上”大脑”

1.1 传统流程 VS 智能流程

在我们深入之前,先来看一个生活中的例子。

假设你开了一家餐厅,顾客来点餐。传统程序(无条件的普通 Edge)就像这样:

1
顾客进店 → 推荐套餐A → 结账离开

不管顾客是素食主义者、过敏人群还是大胃王,都是同样的流程。这显然很蠢。

而条件边(Conditional Edge)就像一位经验丰富的服务员:

1
2
3
顾客进店 → [观察顾客特征] → 如果是素食者?推荐蔬菜套餐
→ 如果是过敏人群?询问过敏原
→ 如果是大胃王?推荐家庭套餐

关键区别:中间的”[观察顾客特征]”节点,会根据输入动态决定走哪条边。

1.2 代码世界的映射

在 LangGraph 中,这被形式化为:

1
2
3
4
5
6
7
8
9
# 普通边:死板,固定流向
graph.add_edge("node_a", "node_b") # 永远从A到B

# 条件边:智能,动态决策
graph.add_conditional_edges(
"classifier", # 来源节点
route_decision, # 决策函数:返回下一个节点的名称
{"sales": "sales_team", "tech": "tech_support", "billing": "billing_dept"}
)

route_decision 是一个路由函数,它接收当前 State,输出一个字符串(key),LangGraph 根据这个 key 从映射表中找到下一个节点。

1.3 条件边的核心能力

能力 说明 应用场景
动态路由 根据输入内容选择不同分支 客服分流、内容分类
循环控制 决定是继续迭代还是结束 质量检查、自我修正
渐进式决策 边执行边决策 流式输出、实时交互
条件终止 满足条件时提前结束 提前退出、错误处理

二、三种常见模式:分类器、质量门、循环优化

条件边虽然概念简单,但组合起来能实现非常强大的功能。下面介绍三种最常见的使用模式。

2.1 模式一:分类器路由(Classifier Routing)

这是最经典的用法。一个”分类”节点,根据内容将请求分发到不同的处理节点。

场景示例:智能客服系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def classify_intent(state: State) -> str:
"""根据用户问题,决定路由到哪个部门"""
query = state["query"].lower()

if any(word in query for word in ["价格", "费用", "账单", "付款"]):
return "billing" # 去账单部门
elif any(word in query for word in ["bug", "错误", "崩溃", "无法"]):
return "technical" # 去技术支持
elif any(word in query for word in ["购买", "升级", "套餐", "优惠"]):
return "sales" # 去销售部门
else:
return "general" # 去通用客服

# 构建条件边
graph.add_conditional_edges(
"classifier",
classify_intent,
{
"billing": "billing_agent",
"technical": "tech_agent",
"sales": "sales_agent",
"general": "general_agent"
}
)

核心思想:把决策逻辑封装在函数里,函数返回的字符串就是”路标”。

2.2 模式二:质量门(Quality Gate)

当 AI 的输出质量不确定时,可以用条件边实现”检查-修正”循环。

场景示例:代码生成器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def check_code_quality(state: State) -> str:
"""检查代码是否通过质量门槛"""
code = state["generated_code"]
tests_passed = state["test_results"]["passed"]
syntax_valid = state["syntax_check"]

# 如果测试全部通过且语法正确,直接结束
if tests_passed and syntax_valid:
return "finish"

# 如果还有改进空间,回到代码生成节点继续优化
# 但最多迭代3次,避免无限循环
if state["iteration_count"] < 3:
return "regenerate"
else:
return "finish" # 达到最大迭代次数,强制结束

graph.add_conditional_edges(
"code_reviewer",
check_code_quality,
{
"regenerate": "code_generator", # 循环回去继续改进
"finish": END # 流程结束
}
)

核心思想:条件边不只是”分发”,还可以实现”循环”。只要路由函数返回的节点在图中存在,流程就会按这个方向走。

2.3 模式三:渐进式决策(Progressive Decision)

结合 LangGraph 的 Streaming 能力,条件边可以实现”边执行边决策”。

场景示例:多轮对话 Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def decide_next_step(state: State) -> str:
"""根据对话状态决定下一步"""
messages = state["messages"]
last_message = messages[-1]

# 如果用户说"再见",结束对话
if any(word in last_message.content.lower() for word in ["再见", "拜拜", "结束"]):
return "end_conversation"

# 如果 AI 说需要调用工具
if last_message.tool_calls:
return "execute_tools"

# 如果对话正常进行,继续生成回复
return "continue_chat"

graph.add_conditional_edges(
"llm",
decide_next_step,
{
"end_conversation": END,
"execute_tools": "tool_executor",
"continue_chat": "llm" # 甚至可以循环回自己
}
)

核心思想:每次执行完一个节点,LangGraph 都会调用条件边函数”请示下一步”。这让整个流程具有了反应性


三、完整实战:智能客服路由系统

光说不练假把式。下面是一个完整可运行的智能客服系统代码。

3.1 系统架构

1
2
3
4
5
6
7
8
9
10
11
12
用户提问

意图分类器 (classifier) ←─┐
↓ │
┌───┴───┐ │
↓ ↓ │
销售 技术 │
部门 支持 │
↓ ↓ │
回复生成 (responder) ──────┘

输出给用户

3.2 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
import os

# ============ 1. 定义状态 ============
class CustomerServiceState(TypedDict):
"""客服系统的状态定义"""
messages: Annotated[list, add_messages] # 对话历史
category: str # 问题分类结果
response: str # 最终回复

# ============ 2. 初始化 LLM ============
# 请确保设置了 OPENAI_API_KEY 环境变量
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.3)

# ============ 3. 定义节点函数 ============

def classify_query(state: CustomerServiceState) -> CustomerServiceState:
"""第一步:分类用户问题"""
messages = state["messages"]
last_message = messages[-1].content

# 构建分类提示
prompt = f"""请分析以下用户问题,将其分类为以下类别之一:
- sales: 购买、升级、套餐、价格相关问题
- technical: 技术故障、bug、使用问题
- billing: 账单、支付、退款问题
- general: 其他一般性问题

用户问题:{last_message}

只返回类别名称(sales/technical/billing/general),不要其他内容。"""

response = llm.invoke([HumanMessage(content=prompt)])
category = response.content.strip().lower()

# 确保类别有效
valid_categories = ["sales", "technical", "billing", "general"]
if category not in valid_categories:
category = "general"

print(f"🔍 问题分类结果: {category}")
return {"category": category}

def sales_agent(state: CustomerServiceState) -> CustomerServiceState:
"""销售部门:处理购买相关问题"""
messages = state["messages"]

system_prompt = """你是专业的销售顾问。你的任务是:
1. 热情介绍产品优势和价格方案
2. 根据用户需求推荐合适的套餐
3. 解答购买流程相关疑问
语气要友好、专业,有说服力。"""

full_messages = [HumanMessage(content=system_prompt)] + messages
response = llm.invoke(full_messages)

print(f"💰 销售部门处理中...")
return {"response": response.content}

def tech_agent(state: CustomerServiceState) -> CustomerServiceState:
"""技术支持:处理故障相关问题"""
messages = state["messages"]

system_prompt = """你是技术支持工程师。你的任务是:
1. 耐心倾听用户描述的问题
2. 提供清晰的故障排查步骤
3. 用通俗易懂的语言解释技术问题
语气要专业、耐心、乐于助人。"""

full_messages = [HumanMessage(content=system_prompt)] + messages
response = llm.invoke(full_messages)

print(f"🔧 技术支持处理中...")
return {"response": response.content}

def billing_agent(state: CustomerServiceState) -> CustomerServiceState:
"""账单部门:处理支付相关问题"""
messages = state["messages"]

system_prompt = """你是账单专员。你的任务是:
1. 清晰解释账单明细
2. 指导支付操作流程
3. 处理退款和发票相关问题
语气要准确、清晰、值得信赖。"""

full_messages = [HumanMessage(content=system_prompt)] + messages
response = llm.invoke(full_messages)

print(f"💳 账单部门处理中...")
return {"response": response.content}

def general_agent(state: CustomerServiceState) -> CustomerServiceState:
"""通用客服:处理其他问题"""
messages = state["messages"]

system_prompt = """你是通用客服代表。你的任务是:
1. 回答用户的一般性咨询
2. 如果问题超出范围,礼貌引导用户联系相关部门
3. 保持友好和乐于助人的态度"""

full_messages = [HumanMessage(content=system_prompt)] + messages
response = llm.invoke(full_messages)

print(f"📞 通用客服处理中...")
return {"response": response.content}

def format_response(state: CustomerServiceState) -> CustomerServiceState:
"""格式化最终回复"""
category_names = {
"sales": "销售顾问",
"technical": "技术支持",
"billing": "账单专员",
"general": "客服代表"
}

category = state.get("category", "general")
agent_name = category_names.get(category, "客服")
response = state["response"]

formatted = f"【{agent_name}】\n\n{response}\n\n---\n如有其他问题,随时告诉我!"

return {
"messages": [AIMessage(content=formatted)],
"response": formatted
}

# ============ 4. 定义路由函数 ============

def route_by_category(state: CustomerServiceState) -> Literal["sales", "technical", "billing", "general"]:
"""根据分类结果路由到不同部门"""
category = state.get("category", "general")

# 这里返回的字符串必须与 add_conditional_edges 中的映射 key 一致
if category in ["sales", "technical", "billing", "general"]:
return category
return "general"

# ============ 5. 构建图 ============

# 创建状态图
workflow = StateGraph(CustomerServiceState)

# 添加节点
workflow.add_node("classifier", classify_query)
workflow.add_node("sales", sales_agent)
workflow.add_node("technical", tech_agent)
workflow.add_node("billing", billing_agent)
workflow.add_node("general", general_agent)
workflow.add_node("responder", format_response)

# 设置入口点
workflow.set_entry_point("classifier")

# 添加条件边:分类器 → 各部门
workflow.add_conditional_edges(
"classifier",
route_by_category,
{
"sales": "sales",
"technical": "technical",
"billing": "billing",
"general": "general"
}
)

# 添加普通边:各部门 → 格式化器
workflow.add_edge("sales", "responder")
workflow.add_edge("technical", "responder")
workflow.add_edge("billing", "responder")
workflow.add_edge("general", "responder")

# 添加结束边
workflow.add_edge("responder", END)

# 编译图
app = workflow.compile()

# ============ 6. 运行示例 ============

def run_customer_service(query: str):
"""运行客服系统"""
print(f"\n{'='*50}")
print(f"用户问题: {query}")
print(f"{'='*50}\n")

# 初始化状态
initial_state = {
"messages": [HumanMessage(content=query)],
"category": "",
"response": ""
}

# 执行工作流
result = app.invoke(initial_state)

# 输出结果
print(f"\n{'='*50}")
print("最终回复:")
print(f"{'='*50}")
print(result["response"])
print()

return result

# ============ 7. 测试不同的场景 ============

if __name__ == "__main__":
# 测试场景1:销售问题
run_customer_service("我想了解一下你们的高级套餐多少钱?")

# 测试场景2:技术问题
run_customer_service("我的应用总是闪退,怎么办?")

# 测试场景3:账单问题
run_customer_service("这个月的账单扣错了,我要申请退款。")

# 测试场景4:一般问题
run_customer_service("你们公司什么时候成立的?")

3.3 代码解读

这段代码展示了条件边的完整使用流程:

  1. State 定义CustomerServiceState 定义了需要维护的状态
  2. 节点函数:每个部门是一个节点,负责特定的业务逻辑
  3. 路由函数route_by_category 根据分类结果返回不同的路由 key
  4. 条件边配置add_conditional_edges 将路由 key 映射到具体的节点
  5. 流程执行app.invoke() 启动整个流程

关键点:路由函数的返回值(如 "sales")必须与 add_conditional_edges 中映射字典的 key 完全匹配,否则 LangGraph 会报错。


四、条件边与 Streaming 的结合

4.1 为什么需要 Streaming?

传统的 invoke() 是”一次性返回结果”,用户要等整个流程跑完才能看到输出。这在实际产品体验中很不友好。

LangGraph 支持 Streaming,可以实时看到:

  • 问题被分类到了哪个部门
  • 各个节点的处理进度
  • 中间状态的更新

4.2 流式执行代码

1
2
3
4
5
6
7
8
9
# 使用 stream 替代 invoke
for event in app.stream(initial_state):
for key, value in event.items():
print(f"节点 [{key}] 输出:")
if "category" in value:
print(f" → 分类: {value['category']}")
if "response" in value:
print(f" → 回复: {value['response'][:100]}...")
print()

4.3 渐进式决策的优势

结合条件边和 Streaming,可以实现:

  1. 实时调试:看到数据在每个节点的流转情况
  2. 用户反馈:在流程执行中给用户显示”正在转接销售部门…”
  3. 动态干预:根据中间结果决定是否提前终止或改变流程

五、常见错误与调试技巧

5.1 错误一:路由 key 不匹配

1
2
3
4
5
6
7
8
9
10
# ❌ 错误示例
def router(state):
return "billing_dept" # 返回了 billing_dept

graph.add_conditional_edges(
"classifier",
router,
{"billing": "billing_node"} # 但映射里只有 billing
)
# 报错:KeyError 'billing_dept' not found in mapping

解决:确保路由函数返回的字符串与映射字典的 key 完全一致。

5.2 错误二:忘记添加节点

1
2
3
4
5
6
7
8
# ❌ 错误示例
graph.add_conditional_edges(
"classifier",
router,
{"sales": "sales_agent"} # 指向 sales_agent
)
# 但忘记添加 sales_agent 节点!
# 报错:Node 'sales_agent' not found

解决:所有在条件边中引用的节点,必须先用 add_node 添加。

5.3 调试技巧

1
2
3
4
5
# 打印图结构
print(app.get_graph().draw_ascii())

# 查看执行轨迹
result = app.invoke(state, debug=True)

六、总结与下篇预告

6.1 核心要点回顾

  • 条件边让 LangGraph 具备了”动态决策”能力
  • 路由函数返回 key,映射字典定义 key → node 的关系
  • 三种常见模式:分类器路由、质量门循环、渐进式决策
  • 条件边 + Streaming 可以实现实时、可观测的智能流程

6.2 什么时候用条件边?

场景 是否推荐
根据输入选择不同处理逻辑 ✅ 强烈推荐
需要循环/迭代的流程 ✅ 推荐
多轮对话的状态切换 ✅ 推荐
简单的线性流程 ❌ 不需要
固定顺序的数据处理 ❌ 不需要

6.3 下篇预告

在下一篇文章中,我们将深入探讨 LangGraph 的另一个核心概念——State(状态管理)。你会学到:

  • 如何设计复杂的状态结构
  • State 的更新机制与最佳实践
  • 多 Agent 共享 State 的协作模式
  • Reducer 函数的进阶用法

敬请期待《LangGraph 状态管理:让你的 Agent 拥有记忆》!


附录:完整项目结构

1
2
3
4
5
6
7
8
langgraph-customer-service/
├── main.py # 主程序(上面的完整代码)
├── requirements.txt # 依赖
│ langgraph
│ langchain-openai
│ python-dotenv
└── .env # 环境变量(不要提交到Git)
OPENAI_API_KEY=sk-...

运行步骤:

1
2
3
4
5
6
7
8
# 1. 安装依赖
pip install -r requirements.txt

# 2. 设置环境变量
export OPENAI_API_KEY="your-key-here"

# 3. 运行
python main.py

本文示例代码已开源,如需完整项目文件,欢迎在评论区留言。

如果这篇文章对你有帮助,别忘了点赞收藏,转发给正在学习 LangGraph 的朋友!