AI Agent 任务分解与自主规划:从 ReAct 到 LangGraph 实战

当 AI 面对复杂任务时,”一步到位”往往是幻想。真正的智能体现在如何拆解问题、规划路径、并在执行中不断调整。

1. 为什么任务分解是 Agent 的核心能力

1.1 复杂任务的困境

想象你让 AI “帮我策划一场 50 人的技术分享会”。这个看似简单的指令背后包含:

  • 确定主题和讲师
  • 寻找合适场地
  • 安排时间和日程
  • 准备物料和设备
  • 发送邀请函
  • 现场签到和接待
  • 活动后复盘

如果 Agent 试图”一口气”完成,结果往往是灾难性的。要么遗漏关键环节,要么在某个子任务上陷入死循环。

1.2 人类是如何做的

人类面对复杂任务时,本能会进行分解:

1
大目标 → 阶段里程碑 → 具体任务 → 执行步骤

我们会先制定大纲,再填充细节;先确定框架,再完善内容。这种”分而治之”的思维,正是 Agent 需要学习的。

1.3 Agent 任务分解的本质

Agent 的任务分解不是简单的”把大任务切成小任务”,而是涉及:

能力维度 说明
依赖识别 哪些子任务有先后顺序?哪些可以并行?
资源评估 每个子任务需要什么工具/信息?
容错设计 某个子任务失败时如何回退或重试?
动态调整 执行中发现新信息时如何重新规划?

2. 经典方案:ReAct 模式

2.1 ReAct 是什么

ReAct(Reasoning + Acting)是 Agent 领域最具影响力的模式之一,来自论文《ReAct: Synergizing Reasoning and Acting in Language Models》。

核心思想很简单:让 LLM 交替进行推理和行动

1
Thought → Action → Observation → Thought → Action → ...

2.2 ReAct 的执行流程

以 “查询北京的天气,然后计算今天适合穿什么衣服” 为例:

1
2
3
4
5
6
7
8
9
10
Thought: 用户想知道北京今天的天气,我需要先查询天气信息。
Action: search_weather(location="北京")
Observation: 北京今天晴,气温 -2°C 到 8°C,北风 3-4 级

Thought: 天气晴朗但温度较低,有风。建议穿保暖外套。
Action: generate_clothing_advice(temperature=8, weather="晴", wind="3-4级")
Observation: 建议穿着:羽绒服或厚外套,内搭毛衣,注意防风。

Thought: 我已经获得了足够信息,可以给出完整回答了。
Action: finish(answer="北京今天天气晴朗但较冷...")

2.3 ReAct 的代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from typing import TypedDict, Annotated, Sequence
import operator
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END

# 定义状态
class ReActState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
steps: int # 执行步数,用于防止无限循环

# 工具定义
from langchain_core.tools import tool

@tool
def search_weather(location: str) -> str:
"""查询指定城市的天气"""
# 实际实现会调用天气 API
return f"{location}今天晴,气温 5-15°C"

@tool
def calculate(expression: str) -> str:
"""计算数学表达式"""
return str(eval(expression))

tools = [search_weather, calculate]

# 绑定工具的模型
model = ChatOpenAI(model="gpt-4o").bind_tools(tools)

# ReAct Agent 节点
def react_agent(state: ReActState):
"""
ReAct Agent 核心逻辑:
1. 接收当前消息历史
2. 模型决定是调用工具还是直接回答
3. 更新状态
"""
messages = state["messages"]
response = model.invoke(messages)

return {
"messages": [response],
"steps": state.get("steps", 0) + 1
}

# 工具执行节点
from langchain_core.messages import ToolMessage

def execute_tools(state: ReActState):
"""执行模型请求的工具调用"""
messages = state["messages"]
last_message = messages[-1]

# 收集工具调用结果
tool_results = []
for tool_call in last_message.tool_calls:
# 找到对应的工具函数
tool_func = next(t for t in tools if t.name == tool_call["name"])
# 执行工具
result = tool_func.invoke(tool_call["args"])
tool_results.append(
ToolMessage(
content=str(result),
tool_call_id=tool_call["id"]
)
)

return {"messages": tool_results}

# 判断是否需要继续
def should_continue(state: ReActState) -> str:
"""
决定流程走向:
- 如果最后一条消息有 tool_calls,说明需要执行工具
- 如果没有,说明 Agent 已经给出最终答案
- 如果步数超过限制,强制结束
"""
messages = state["messages"]
last_message = messages[-1]

# 防止无限循环
if state.get("steps", 0) > 10:
return "end"

# 检查是否有工具调用
if last_message.tool_calls:
return "continue"

return "end"

# 构建图
workflow = StateGraph(ReActState)

workflow.add_node("agent", react_agent)
workflow.add_node("tools", execute_tools)

workflow.set_entry_point("agent")

workflow.add_conditional_edges(
"agent",
should_continue,
{
"continue": "tools",
"end": END
}
)

workflow.add_edge("tools", "agent")

app = workflow.compile()

# 运行示例
result = app.invoke({
"messages": [HumanMessage(content="北京今天天气怎么样?气温的平方是多少?")],
"steps": 0
})

2.4 ReAct 的局限性

ReAct 虽然强大,但在复杂任务面前有明显短板:

缺乏全局规划:ReAct 是”走一步看一步”,没有事前规划。面对 “策划技术分享会” 这种任务,它可能在场地和讲师之间反复横跳,而不是按逻辑顺序推进。

无法处理并行:ReAct 的执行路径是线性的,无法同时发起多个独立查询。

难以处理长程依赖:如果第 10 步需要第 1 步的信息,ReAct 可能已经在上下文中丢失了这个信息。

3. 进阶方案:Plan-and-Execute

3.1 规划优先的思路

Plan-and-Execute 模式的核心是先规划,后执行

1
2
3
1. Planner: 分析任务,生成执行计划
2. Executor: 按计划逐步执行
3. Re-Planner: 根据执行情况动态调整计划

这就像人类的项目管理:先做 WBS(工作分解结构),再分配任务,最后根据进度调整计划。

3.2 LangGraph 中的 Plan-and-Execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from typing import List, Optional
from pydantic import BaseModel, Field

# 定义计划项
class PlanStep(BaseModel):
step_id: int = Field(description="步骤序号")
description: str = Field(description="步骤描述")
tool: Optional[str] = Field(description="需要使用的工具名,可为空")
dependencies: List[int] = Field(
default_factory=list,
description="依赖的步骤ID"
)

class Plan(BaseModel):
steps: List[PlanStep] = Field(description="执行计划步骤列表")
estimated_steps: int = Field(description="预计总步数")

# 状态定义
class PlanExecuteState(TypedDict):
input: str # 原始输入
plan: Plan # 生成的计划
current_step: int # 当前执行到第几步
step_results: Annotated[List[dict], operator.add] # 各步骤执行结果
final_answer: Optional[str] # 最终答案
replan_count: int # 重规划次数

# Planner 节点
planner_model = ChatOpenAI(model="gpt-4o").with_structured_output(Plan)

def planner_node(state: PlanExecuteState):
"""根据用户输入生成执行计划"""

planner_prompt = f"""
你是一个任务规划专家。请将用户的任务分解为可执行的步骤。

可用工具:
- search_weather: 查询天气
- search_location: 搜索地点信息
- calculate: 数学计算
- send_email: 发送邮件

用户任务:{state['input']}

请生成详细的执行计划,考虑步骤间的依赖关系。
"""

plan = planner_model.invoke(planner_prompt)

return {"plan": plan, "current_step": 0, "replan_count": 0}

# Executor 节点
executor_model = ChatOpenAI(model="gpt-4o").bind_tools(tools)

def executor_node(state: PlanExecuteState):
"""执行当前步骤"""
plan = state["plan"]
current_idx = state["current_step"]

if current_idx >= len(plan.steps):
return {"final_answer": "所有步骤已完成"}

step = plan.steps[current_idx]

# 收集依赖步骤的结果
dep_results = {
f"step_{d}": state["step_results"][d]
for d in step.dependencies
if d < len(state["step_results"])
}

executor_prompt = f"""
执行以下计划步骤:

步骤 {step.step_id}: {step.description}
建议工具: {step.tool or "无"}

依赖步骤结果:{dep_results}

请执行此步骤,返回执行结果。
"""

response = executor_model.invoke(executor_prompt)

# 检查是否需要调用工具
if response.tool_calls:
# 执行工具
tool_results = []
for tc in response.tool_calls:
tool_func = next(t for t in tools if t.name == tc["name"])
result = tool_func.invoke(tc["args"])
tool_results.append({
"tool": tc["name"],
"args": tc["args"],
"result": str(result)
})
step_result = {"step_id": step.step_id, "results": tool_results}
else:
step_result = {"step_id": step.step_id, "answer": response.content}

return {
"step_results": [step_result],
"current_step": current_idx + 1
}

# Re-Planner 节点
def replanner_node(state: PlanExecuteState):
"""
根据执行结果决定:
1. 继续执行下一步
2. 重新规划剩余步骤
3. 结束任务
"""

# 检查是否所有步骤完成
if state["current_step"] >= len(state["plan"].steps):
# 生成最终答案
final_prompt = f"""
任务已完成。所有步骤结果:
{state["step_results"]}

请生成最终回答。
"""
response = executor_model.invoke(final_prompt)
return {"final_answer": response.content}

# 检查是否需要重规划
last_result = state["step_results"][-1] if state["step_results"] else None

if last_result and "error" in str(last_result):
# 某步骤失败,需要重规划
if state["replan_count"] < 3: # 最多重规划 3 次
replan_prompt = f"""
步骤执行出现问题,需要调整计划。

原计划:{state['plan']}
已完成步骤:{state['step_results']}
当前步骤:{state['current_step']}
问题:{last_result}

请重新规划剩余步骤。
"""
new_plan = planner_model.invoke(replan_prompt)
return {
"plan": new_plan,
"replan_count": state["replan_count"] + 1
}

# 继续执行
return {}

# 构建工作流
workflow = StateGraph(PlanExecuteState)

workflow.add_node("planner", planner_node)
workflow.add_node("executor", executor_node)
workflow.add_node("replanner", replanner_node)

workflow.set_entry_point("planner")
workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "replanner")

# 条件边:如果有 final_answer 就结束,否则继续执行
workflow.add_conditional_edges(
"replanner",
lambda s: "end" if s.get("final_answer") else "continue",
{
"end": END,
"continue": "executor"
}
)

app = workflow.compile()

3.3 Plan-and-Execute 的优势

  1. 全局视野:执行前就能看到完整路径
  2. 依赖管理:明确步骤间的先后关系
  3. 可解释性:计划本身就是执行逻辑的文档
  4. 可控性:可以在执行前审查和调整计划

4. 反思与自我修正:Reflexion 模式

4.1 为什么需要反思

即使是最好的计划,执行中也会遇到意外:

  • 工具返回错误
  • 发现新信息需要调整策略
  • 某条路径走不通需要回退

人类会”边做边学”,Agent 也需要这种能力。

4.2 Reflexion 的核心机制

Reflexion 模式引入了三层记忆:

1
2
3
短期记忆(Short-term): 当前任务上下文
长期记忆(Long-term): 跨任务的技能和经验
外部记忆(External): 知识库、文档等

反思发生在以下时机:

  1. 执行失败时:分析原因,尝试其他方法
  2. 达到里程碑时:评估进度,调整后续计划
  3. 任务完成时:总结经验,更新长期记忆

4.3 在 LangGraph 中实现反思

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from typing import Literal

class Reflection(BaseModel):
success: bool = Field(description="上一步是否成功")
analysis: str = Field(description="结果分析")
lesson: str = Field(description="学到的经验")
adjustment: Optional[str] = Field(description="需要的调整")

def reflection_node(state: PlanExecuteState):
"""
反思节点:评估上一步执行,决定是否调整策略
"""

if not state["step_results"]:
return {} # 还没有执行结果,跳过反思

last_result = state["step_results"][-1]
current_step = state["current_step"] - 1 # 上一步

reflection_model = ChatOpenAI(
model="gpt-4o"
).with_structured_output(Reflection)

reflection_prompt = f"""
请反思最近一步的执行:

原计划步骤 {current_step}: {state['plan'].steps[current_step]}
执行结果: {last_result}

请分析:
1. 是否达到预期目标?
2. 遇到了什么问题?
3. 有什么经验教训?
4. 后续需要如何调整?
"""

reflection = reflection_model.invoke(reflection_prompt)

# 如果失败,可能需要重试或改道
if not reflection.success:
# 记录失败经验
print(f"⚠️ 步骤 {current_step} 遇到问题: {reflection.analysis}")
print(f"💡 经验总结: {reflection.lesson}")

# 触发重规划
return {
"replan_needed": True,
"reflection": reflection.dict()
}

return {"reflection": reflection.dict()}

4.4 反思驱动的迭代优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def should_replan(state: PlanExecuteState) -> Literal["replan", "continue", "end"]:
"""根据反思结果决定下一步"""

# 如果有最终答案,结束
if state.get("final_answer"):
return "end"

# 如果反思认为需要重规划
if state.get("replan_needed"):
return "replan"

# 检查是否还有步骤
if state["current_step"] < len(state["plan"].steps):
return "continue"

return "end"

# 更新工作流
workflow.add_node("reflect", reflection_node)

# 在执行后添加反思
workflow.add_edge("executor", "reflect")
workflow.add_conditional_edges(
"reflect",
should_replan,
{
"replan": "replanner",
"continue": "executor",
"end": END
}
)

5. 实战案例:智能客服 Agent

5.1 场景描述

构建一个电商客服 Agent,处理用户咨询:

1
2
用户:我上周买的那件蓝色外套不太合适,想退货。
另外你们有新上架的春季夹克吗?

这个请求包含多个子任务:

  1. 识别用户身份
  2. 查询订单历史
  3. 处理退货申请
  4. 查询新品信息
  5. 生成回复

5.2 任务分解设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class CustomerServiceState(TypedDict):
user_message: str
user_id: Optional[str]
intent: List[str] # 识别出的意图列表
plan: List[dict]
context: dict # 查询到的信息
response: Optional[str]

# 意图识别 + 任务分解
intents = {
"return_request": "退货申请",
"product_inquiry": "商品咨询",
"order_status": "订单查询",
"complaint": "投诉反馈"
}

def analyze_and_plan(state: CustomerServiceState):
"""分析用户意图,生成执行计划"""

analysis_model = ChatOpenAI(model="gpt-4o")

analysis_prompt = f"""
分析用户消息,识别所有意图并生成处理计划。

用户消息:{state['user_message']}
已知用户ID:{state.get('user_id', '未识别')}

可用工具:
- identify_user: 识别用户身份
- get_order_history: 获取订单历史
- process_return: 处理退货
- search_products: 搜索商品
- check_inventory: 查询库存

请输出:
1. 识别到的意图列表
2. 分步骤处理计划(考虑依赖关系)
"""

response = analysis_model.invoke(analysis_prompt)

# 解析响应生成结构化计划
# ...

return {"plan": plan, "intents": intents}

5.3 并行执行独立任务

客服场景中,查询订单和搜索新品是独立的,可以并行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from langgraph.graph import StateGraph, END
import asyncio

async def parallel_executor(state: CustomerServiceState):
"""并行执行无依赖的任务"""

plan = state["plan"]

# 识别可并行执行的步骤
parallel_groups = group_by_dependencies(plan)

results = {}
for group in parallel_groups:
# 同组任务并行执行
tasks = [
execute_step(step, state["context"])
for step in group
]
group_results = await asyncio.gather(*tasks)
results.update(group_results)

return {"context": {**state["context"], **results}}

def group_by_dependencies(plan: List[dict]) -> List[List[dict]]:
"""
根据依赖关系将计划分组
同组任务可以并行,组间需要顺序执行
"""
# 拓扑排序实现
# ...
return groups

5.4 完整工作流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def build_customer_service_agent():
workflow = StateGraph(CustomerServiceState)

# 节点定义
workflow.add_node("analyze", analyze_and_plan)
workflow.add_node("identify", identify_user_node)
workflow.add_node("parallel_execute", parallel_executor)
workflow.add_node("generate", generate_response_node)
workflow.add_node("reflect", quality_check_node) # 质检反思

# 流程编排
workflow.set_entry_point("analyze")

# 分析后,如果需要识别用户
workflow.add_conditional_edges(
"analyze",
lambda s: "identify" if s.get("need_identify") else "parallel_execute",
{"identify": "identify", "parallel_execute": "parallel_execute"}
)

workflow.add_edge("identify", "parallel_execute")
workflow.add_edge("parallel_execute", "generate")
workflow.add_edge("generate", "reflect")

# 质检不通过则重新生成
workflow.add_conditional_edges(
"reflect",
lambda s: "end" if s.get("quality_passed") else "generate",
{"end": END, "generate": "generate"}
)

return workflow.compile()

6. 最佳实践与踩坑经验

6.1 规划粒度如何把握

规划太粗,失去指导意义;规划太细,灵活性丧失。建议:

  • 每个步骤聚焦一个明确目标
  • 步骤描述包含成功标准
  • 预留”自适应步骤”应对未知情况

6.2 防止无限循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class RobustState(TypedDict):
# ... 其他字段
step_count: int # 总执行步数
replan_count: int # 重规划次数
same_error_count: int # 同一错误重复次数

def safety_check(state: RobustState) -> str:
"""安全检查,防止无限循环"""

if state["step_count"] > 50:
return "abort" # 总步数超限

if state["replan_count"] > 5:
return "abort" # 重规划次数超限

# 检查是否在同一问题上反复失败
errors = [r for r in state["step_results"] if "error" in str(r)]
if len(errors) > 3:
last_three = errors[-3:]
if all(e == last_three[0] for e in last_three):
return "abort" # 同一错误重复 3 次

return "continue"

6.3 规划错误的恢复

当计划本身有问题时,Agent 需要能够:

  1. 识别计划不可行:工具返回”无法完成”
  2. 分析失败原因:是信息不足还是逻辑错误
  3. 生成替代方案:尝试其他路径或请求用户澄清

6.4 人机协作节点

复杂任务中,适时引入人类判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from langgraph.types import interrupt

def human_approval_node(state: PlanExecuteState):
"""需要人类确认的关键节点"""

# 高价值操作需要确认
if is_high_risk_action(state["current_step"]):
approval = interrupt({
"message": f"即将执行:{state['current_step']}\n确认执行吗?",
"options": ["approve", "reject", "modify"]
})

if approval == "reject":
return {"final_answer": "用户取消了操作"}
elif approval == "modify":
# 获取用户修改意见
modification = interrupt({"message": "请描述修改意见:"})
return {"user_modification": modification}

return {}

7. 总结:任务分解的艺术

7.1 从 ReAct 到 Plan-and-Execute 的演进

特性 ReAct Plan-and-Execute
规划时机 边执行边规划 先规划后执行
全局视野 局部 全局
并行能力
可解释性 中等
适用场景 简单任务 复杂多步任务

7.2 核心设计原则

  1. 分而治之:复杂任务必须分解
  2. 规划先行:执行前先看全局
  3. 反思迭代:执行中学习和调整
  4. 安全兜底:防止无限循环和错误累积
  5. 人机协作:关键节点引入人类判断

7.3 下一步探索

  • LLM 作为规划器:如何让 LLM 生成更优计划
  • 学习优化:从历史执行中学习,改进未来规划
  • 多 Agent 协作:多个 Agent 各自负责子任务,协同完成大目标

任务分解不仅是一种技术方案,更是一种思维方式。当 Agent 学会”像项目经理一样思考”,它才能真正处理现实世界中的复杂问题。


参考资源