Trying Out LangGraph
Intro
Traditional development tends to treat the LLM as a stateless processor, but building truly production-grade applications usually calls for systems that can run for long periods, hold memory, and interact with their environment. This is the background against which LangGraph was born. (https://docs.langchain.com/oss/python/langgraph/overview)
LangGraph is a low-level orchestration framework designed for building, managing, and deploying long-running, stateful agents.
Unlike traditional frameworks that provide high-level abstractions, LangGraph gives developers a great deal of control. It is concerned not merely with connecting models and tools, but with the core capability of agent orchestration.
The official docs list the following advantages:
- Durable execution: Build agents that persist through failures and can run for extended periods, resuming from where they left off.
- Human-in-the-loop: Incorporate human oversight by inspecting and modifying agent state at any point.
- Comprehensive memory: Create stateful agents with both short-term working memory for ongoing reasoning and long-term memory across sessions.
- Debugging with LangSmith: Gain deep visibility into complex agent behavior with visualization tools that trace execution paths, capture state transitions, and provide detailed runtime metrics.
- Production-ready deployment: Deploy sophisticated agent systems confidently with scalable infrastructure designed to handle the unique challenges of stateful, long-running workflows.
Get Started
Install LangGraph

```bash
pip install -U langchain
# Requires Python 3.10+
```
Define tools and model
In the previous post we looked at how an Agent calls tools. The official LangGraph example defines a few simple arithmetic tools:

```python
from langchain.tools import tool
from langchain.chat_models import init_chat_model

model = init_chat_model(
    "claude-sonnet-4-5-20250929",
    temperature=0
)

# Define tools
@tool
def multiply(a: int, b: int) -> int:
    """Multiply `a` and `b`.

    Args:
        a: First int
        b: Second int
    """
    return a * b

@tool
def add(a: int, b: int) -> int:
    """Adds `a` and `b`.

    Args:
        a: First int
        b: Second int
    """
    return a + b

@tool
def divide(a: int, b: int) -> float:
    """Divide `a` and `b`.

    Args:
        a: First int
        b: Second int
    """
    return a / b

# Augment the LLM with tools
tools = [add, multiply, divide]
tools_by_name = {tool.name: tool for tool in tools}
model_with_tools = model.bind_tools(tools)
```

Note the `@tool` decorator: it is what gives each function the `.name` attribute and `.invoke()` method used below.
Define state
All nodes in the graph communicate through state. In this example we define a MessagesState of our own, mirroring the built-in one: essentially a dict holding a `messages` list (plus an `llm_calls` counter). As control flows between nodes, new messages are appended to this list, forming the conversation history.

```python
from langchain.messages import AnyMessage
from typing_extensions import TypedDict, Annotated
import operator

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]
    llm_calls: int
```
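To make the reducer concrete, here is a tiny hypothetical illustration (not part of the official example): when a node returns `{"messages": [new_msg]}`, LangGraph merges that update into the existing list with `operator.add`, i.e. plain list concatenation, so history is appended to rather than overwritten.

```python
import operator
from langchain.messages import AIMessage, HumanMessage

existing = [HumanMessage(content="What is 3 + 4?")]
node_update = [AIMessage(content="7")]

# What LangGraph effectively does when a node returns {"messages": node_update}:
merged = operator.add(existing, node_update)  # same as existing + node_update
assert [m.content for m in merged] == ["What is 3 + 4?", "7"]
```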
Define model node
The model node doesn't just generate replies; it also decides whether a tool call is needed. Invoking the `model_with_tools` defined above is all it takes:

```python
from langchain.messages import SystemMessage

def llm_call(state: dict):
    """LLM decides whether to call a tool or not"""
    return {
        "messages": [
            model_with_tools.invoke(
                [
                    SystemMessage(
                        content="You are a helpful assistant tasked with performing arithmetic on a set of inputs."
                    )
                ]
                + state["messages"]
            )
        ],
        "llm_calls": state.get('llm_calls', 0) + 1
    }
```
Define tool node
When the LLM decides to use a tool, the tool node executes the actual computation or query and writes the result back into state as a ToolMessage.

```python
from langchain.messages import ToolMessage

def tool_node(state: dict):
    """Performs the tool call"""
    result = []
    for tool_call in state["messages"][-1].tool_calls:
        tool = tools_by_name[tool_call["name"]]
        observation = tool.invoke(tool_call["args"])
        result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
    return {"messages": result}
```
Once tools are bound to the model (as with `model.bind_tools(tools)` above), the model doesn't answer in plain text when it wants a tool; instead it produces a message containing `tool_calls`. A typical `tool_calls` structure looks like this:

```json
{
  "tool_calls": [
    {
      "name": "get_weather",           // name of the tool to invoke
      "args": {"location": "Beijing"}, // arguments the model extracted
      "id": "call_123abcd",            // ID of this particular call
      "type": "tool_call"
    }
  ]
}
```
So in the logic above, tool_node walks this list, looks up the matching Python function by `name`, and passes in `args`.
Define end logic
A conditional edge function routes to the tool node or to the end, depending on whether the LLM made a tool call.

```python
from typing import Literal
from langgraph.graph import StateGraph, START, END

def should_continue(state: MessagesState) -> Literal["tool_node", END]:
    """Decide if we should continue the loop or stop based upon whether the LLM made a tool call"""
    messages = state["messages"]
    last_message = messages[-1]
    # If the LLM makes a tool call, then perform an action
    if last_message.tool_calls:
        return "tool_node"
    # Otherwise, we stop (reply to the user)
    return END
```
The `Literal` here is a Python type annotation: it makes explicit, to the type checker and to readers, that once this function returns, control can only flow to one of these two destinations.
Build and compile the agent
This part wires the isolated logic functions (nodes) and transition rules (edges) into one executable agent object.

```python
# Build workflow
agent_builder = StateGraph(MessagesState)

# Add nodes
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("tool_node", tool_node)

# Add edges to connect nodes
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
    "llm_call",
    should_continue,
    ["tool_node", END]
)
agent_builder.add_edge("tool_node", "llm_call")

# Compile the agent
agent = agent_builder.compile()

# Show the agent
from IPython.display import Image, display
display(Image(agent.get_graph(xray=True).draw_mermaid_png()))

# Invoke
from langchain.messages import HumanMessage
messages = [HumanMessage(content="Add 3 and 4.")]
messages = agent.invoke({"messages": messages})
for m in messages["messages"]:
    m.pretty_print()
```
Let's break this down. First, the StateGraph class defines the system's state schema. This is the foundation of the entire graph: the data contract through which nodes communicate.

```python
# Initialize the graph container and inject the global state model
agent_builder = StateGraph(MessagesState)
```

Nodes are the smallest schedulable units of logic. `add_node` maps a logical name to an executable function.

```python
# Register the compute node (LLM decision-making) and the I/O node (tool execution)
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("tool_node", tool_node)
```

Deterministic edges versus dynamic conditional edges:
- Entry point: START designates the system's entry.
- Cyclic edge: add_edge("tool_node", "llm_call") creates a loop back, allowing the agent to reason again over the tool's results.
- Conditional routing: the core routing logic lives in add_conditional_edges. It takes a decision function and, based on the Literal label that function returns, dynamically dispatches control along one of the preset paths.

```python
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
    "llm_call",
    should_continue,
    ["tool_node", END]
)
agent_builder.add_edge("tool_node", "llm_call")
```

Finally, validate the graph's connectivity and convert the static topology into an executable runtime:

```python
# Compile the static topology into an executable runtime
agent = agent_builder.compile()
```
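The durable-execution and memory capabilities from the intro hook in at this compile step: passing a checkpointer to `compile()` makes every invocation with the same `thread_id` resume from saved state. Below is a minimal sketch using LangGraph's built-in in-memory checkpointer (not part of the quickstart above, which compiles without one):

```python
# Sketch: compile with a checkpointer so conversation state survives between
# invocations. InMemorySaver keeps checkpoints in process memory; swap in a
# database-backed saver for real durability.
from langgraph.checkpoint.memory import InMemorySaver

agent = agent_builder.compile(checkpointer=InMemorySaver())
config = {"configurable": {"thread_id": "session-1"}}

agent.invoke({"messages": [HumanMessage(content="Add 3 and 4.")]}, config)
# Same thread_id -> the earlier messages are loaded back into state automatically.
agent.invoke({"messages": [HumanMessage(content="Multiply that by 2.")]}, config)
```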
Summary
According to the official docs, the LangGraph workflow can be summarized as:
1. Define tools and model: instantiate a concrete ChatModel (Claude here) and inject your custom Python functions (tools) via .bind_tools(). This establishes the agent's "brain" and its list of available skills.
2. Define state: build a MessagesState with TypedDict, using the Annotated[list, operator.add] mechanism so that each newly produced message (whether an LLM reply or a tool result) is appended to the history list rather than overwriting it.
3. Define model node: this node runs the tool-bound LLM, takes the current state history as input, and emits one new message, which may be a direct reply to the user or a structured instruction to call tools (tool_calls).
4. Define tool node: this node parses the tool_calls from the previous model output, finds and executes the matching Python function, and wraps the result in a ToolMessage that goes back into state.
5. Define end logic: the should_continue condition function checks whether the latest message contains a tool-call request, deciding whether control jumps to the tool node for more work or heads straight to END.
6. Build and compile the agent: StateGraph connects all the nodes and decision logic (edges) into a complete graph, and .compile() turns it into an executable Runnable, ready to respond to invoke at any time.
Workflows & Agents
The arithmetic assistant we just built is a textbook Agent, because the LLM decides whether to use the add tool. In real development, though, not every scenario needs that much freedom. Sometimes a Workflow (say, a fixed translation pipeline) is more robust. LangGraph's strength is that it lets you mix both modes in the same graph.
Workflows have predetermined code paths designed to run in a specific order. In this mode the developer hard-codes every step and branch, and the LLM is only used to complete individual steps, not to steer the overall flow.
Agents, by contrast, are systems where the LLM dynamically determines the control flow: it decides what to do next, and controls both the loop's termination condition and the path taken.
Workflows and Agents are not a binary opposition but a continuous spectrum, with Workflows at the low-autonomy end and Agents at the high-autonomy end. Based on these two concepts, LangChain describes the following patterns:
Prompt Chaining
Prompt Chaining is the most basic linear workflow pattern. When a task is too complex to complete accurately with a single prompt, we break it into multiple sub-steps, feeding each step's output into the next.
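A minimal sketch of the pattern, assuming the same claude-sonnet model as in the quickstart (the state keys and prompts here are my own illustrative choices):

```python
from typing_extensions import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START, END

llm = init_chat_model("claude-sonnet-4-5-20250929", temperature=0)

class ChainState(TypedDict):
    topic: str
    joke: str
    improved_joke: str

def generate_joke(state: ChainState):
    """First step: draft a joke about the topic."""
    msg = llm.invoke(f"Write a short joke about {state['topic']}")
    return {"joke": msg.content}

def improve_joke(state: ChainState):
    """Second step: refine the previous step's output."""
    msg = llm.invoke(f"Make this joke funnier by adding wordplay: {state['joke']}")
    return {"improved_joke": msg.content}

# Strictly linear wiring: each node's output feeds the next
chain_builder = StateGraph(ChainState)
chain_builder.add_node("generate_joke", generate_joke)
chain_builder.add_node("improve_joke", improve_joke)
chain_builder.add_edge(START, "generate_joke")
chain_builder.add_edge("generate_joke", "improve_joke")
chain_builder.add_edge("improve_joke", END)
chain = chain_builder.compile()

# state = chain.invoke({"topic": "cats"})
```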
Parallelization
Parallelization lets LLMs work on multiple independent subtasks at once. Beyond raw speed, aggregating multiple perspectives also makes results more robust.
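A minimal sketch: two branches fan out from START, and a join node runs only after both have written their keys (the model and prompts are illustrative assumptions):

```python
from typing_extensions import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START, END

llm = init_chat_model("claude-sonnet-4-5-20250929", temperature=0)

class ParallelState(TypedDict):
    topic: str
    joke: str
    poem: str
    combined: str

def write_joke(state: ParallelState):
    return {"joke": llm.invoke(f"Write a joke about {state['topic']}").content}

def write_poem(state: ParallelState):
    return {"poem": llm.invoke(f"Write a two-line poem about {state['topic']}").content}

def aggregate(state: ParallelState):
    """Join step: runs once both branches have completed."""
    return {"combined": f"JOKE:\n{state['joke']}\n\nPOEM:\n{state['poem']}"}

builder = StateGraph(ParallelState)
builder.add_node("write_joke", write_joke)
builder.add_node("write_poem", write_poem)
builder.add_node("aggregate", aggregate)
# Two edges out of START fan the work out; two edges into `aggregate` join it
builder.add_edge(START, "write_joke")
builder.add_edge(START, "write_poem")
builder.add_edge("write_joke", "aggregate")
builder.add_edge("write_poem", "aggregate")
builder.add_edge("aggregate", END)
parallel = builder.compile()
```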
Routing
Routing introduces classification: the system inspects the input's semantics or type and dispatches it to the downstream path best suited to handle it.
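A minimal sketch, using `with_structured_output` so the classifier can only return a valid label (the schema and prompts are illustrative):

```python
from typing_extensions import TypedDict, Literal
from pydantic import BaseModel, Field
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START, END

llm = init_chat_model("claude-sonnet-4-5-20250929", temperature=0)

class Route(BaseModel):
    step: Literal["poem", "joke"] = Field(description="Which specialist should handle the input")

router = llm.with_structured_output(Route)

class RouteState(TypedDict):
    input: str
    decision: str
    output: str

def classify(state: RouteState):
    """LLM classifies the request; structured output guarantees a valid label."""
    return {"decision": router.invoke(state["input"]).step}

def write_poem(state: RouteState):
    return {"output": llm.invoke(f"Write a poem: {state['input']}").content}

def write_joke(state: RouteState):
    return {"output": llm.invoke(f"Write a joke: {state['input']}").content}

def route_decision(state: RouteState):
    # Map the classifier's label to a node name
    return "write_poem" if state["decision"] == "poem" else "write_joke"

builder = StateGraph(RouteState)
builder.add_node("classify", classify)
builder.add_node("write_poem", write_poem)
builder.add_node("write_joke", write_joke)
builder.add_edge(START, "classify")
builder.add_conditional_edges("classify", route_decision, ["write_poem", "write_joke"])
builder.add_edge("write_poem", END)
builder.add_edge("write_joke", END)
routed = builder.compile()
```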
Orchestrator-Worker
Orchestrator-Worker is an advanced form of parallelization. In plain parallelization the number of tasks is usually fixed in code; in this pattern the Orchestrator analyzes the task dynamically, decides how many subtasks it should be split into, and spawns Worker nodes on the fly.
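A minimal sketch; the key piece is `Send`, which spawns one worker per planned section at runtime (the report-writing task and schemas are illustrative assumptions):

```python
import operator
from typing import Annotated, List
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

llm = init_chat_model("claude-sonnet-4-5-20250929", temperature=0)

class Section(BaseModel):
    name: str = Field(description="Section title")
    description: str = Field(description="What the section should cover")

class Plan(BaseModel):
    sections: List[Section]

planner = llm.with_structured_output(Plan)

class ReportState(TypedDict):
    topic: str
    sections: List[Section]
    completed_sections: Annotated[List[str], operator.add]  # workers append here
    final_report: str

class WorkerState(TypedDict):
    section: Section

def orchestrator(state: ReportState):
    """Dynamically decides how many sections (and thus workers) the task needs."""
    plan = planner.invoke(f"Plan the sections of a short report on: {state['topic']}")
    return {"sections": plan.sections}

def worker(state: WorkerState):
    msg = llm.invoke(f"Write the section '{state['section'].name}': {state['section'].description}")
    return {"completed_sections": [msg.content]}

def synthesizer(state: ReportState):
    return {"final_report": "\n\n".join(state["completed_sections"])}

def assign_workers(state: ReportState):
    # One Send per planned section -> N workers spawned at runtime
    return [Send("worker", {"section": s}) for s in state["sections"]]

builder = StateGraph(ReportState)
builder.add_node("orchestrator", orchestrator)
builder.add_node("worker", worker)
builder.add_node("synthesizer", synthesizer)
builder.add_edge(START, "orchestrator")
builder.add_conditional_edges("orchestrator", assign_workers, ["worker"])
builder.add_edge("worker", "synthesizer")
builder.add_edge("synthesizer", END)
report_agent = builder.compile()
```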
Evaluator-Optimizer
Evaluator-Optimizer introduces a feedback loop: one LLM generates, another LLM (or a human) evaluates. If the result doesn't meet the bar, it is sent back to the generator along with the feedback for another attempt.
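A minimal sketch; the evaluator grades with structured output, and failed drafts loop back to the generator together with the feedback (the grading schema and prompts are illustrative):

```python
from typing_extensions import TypedDict, Literal
from pydantic import BaseModel, Field
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START, END

llm = init_chat_model("claude-sonnet-4-5-20250929", temperature=0)

class Feedback(BaseModel):
    grade: Literal["pass", "fail"] = Field(description="Does the draft meet the bar?")
    feedback: str = Field(description="How to improve the draft if it fails")

evaluator = llm.with_structured_output(Feedback)

class LoopState(TypedDict):
    topic: str
    draft: str
    grade: str
    feedback: str

def generate(state: LoopState):
    """Generator: incorporates evaluator feedback on retries."""
    prompt = f"Write a short joke about {state['topic']}"
    if state.get("feedback"):
        prompt += f". Take this feedback into account: {state['feedback']}"
    return {"draft": llm.invoke(prompt).content}

def evaluate(state: LoopState):
    fb = evaluator.invoke(f"Grade this joke strictly: {state['draft']}")
    return {"grade": fb.grade, "feedback": fb.feedback}

def route(state: LoopState):
    return "Accepted" if state["grade"] == "pass" else "Rejected"

builder = StateGraph(LoopState)
builder.add_node("generate", generate)
builder.add_node("evaluate", evaluate)
builder.add_edge(START, "generate")
builder.add_edge("generate", "evaluate")
# Failures loop back to the generator along with the feedback
builder.add_conditional_edges("evaluate", route, {"Accepted": END, "Rejected": "generate"})
loop = builder.compile()
```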
Agents
Agents are the most autonomous of these patterns. Unlike the ones above, an agent's execution path is not predefined; the LLM decides it on its own, relying on Tools to interact with the environment until it judges the task complete.
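The arithmetic assistant built earlier is exactly this pattern, hand-wired. For comparison, LangGraph also ships a prebuilt ReAct-style agent; a minimal sketch reusing the `model` and `tools` defined in the Get Started section:

```python
# Prebuilt ReAct-style agent: the LLM loops over tool calls until it decides
# the task is done. Assumes `model` and `tools` from the Get Started section.
from langgraph.prebuilt import create_react_agent

prebuilt = create_react_agent(model, tools)
result = prebuilt.invoke({"messages": [("user", "Add 3 and 4, then divide by 2.")]})
```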
Hands-On
This time I chose to build a weekend travel advisor on top of LangGraph, responsible for recommending a day-trip destination to the user.
🤖 Weekend Travel Advisor
Recommend: based on the user's current location, come up with 3 nearby destinations
Check weather: query the weather for those 3 places in parallel
Decide: aggregate the weather reports and pick the one with the best weather
Following the conventions from the docs above, we first define a tool, weathertool:
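The tool's full source isn't reproduced in this post, so here is a minimal sketch of what `OpenWeatherTool` might look like; it assumes OpenWeatherMap's current-weather endpoint and the exact fields (`condition`, `temp`, `humidity`, `status`) that `decision_node` consumes later:

```python
import os
import requests
from langchain_core.tools import BaseTool

class OpenWeatherTool(BaseTool):
    """Sketch of a tool wrapping OpenWeatherMap's current-weather endpoint.

    The field names mirror what decision_node expects; the real implementation
    may differ.
    """
    name: str = "openweather"
    description: str = "Query the current weather for a city."

    def _run(self, city: str) -> dict:
        resp = requests.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={
                "q": city,
                "appid": os.environ["OPENWEATHER_API_KEY"],
                "units": "metric",  # Celsius, matching the output shown later
            },
            timeout=10,
        )
        resp.raise_for_status()
        data = resp.json()
        return {
            "city": city,
            "condition": data["weather"][0]["main"],  # e.g. "Clear", "Clouds", "Rain"
            "temp": data["main"]["temp"],
            "humidity": data["main"]["humidity"],
            "status": "ok",
        }
```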
As in the previous Agent post, we use OpenWeatherMap's weather service; the tool itself merely normalizes input/output and wraps the weather query.
Back in the main module, we first define three states, i.e. the schemas that data flows through:

```python
class Destination(TypedDict):
    """A single destination"""
    city: str
    reason: str

class AgentState(TypedDict):
    """Globally shared state"""
    current_location: str
    candidates: List[Destination]  # cities recommended by the LLM
    weather_reports: Annotated[List[Dict], operator.add]
    final_decision: Dict  # the plan finally selected
    final_output: str  # the final advice text for the user

class WeatherWorkerState(TypedDict):
    """Dedicated input state for the worker node"""
    city: str
```
Next we define the LangGraph nodes. The behavior we want from the agent is: pick candidate cities -> query their weather -> choose the best city by weather.
The first node builds a prompt from the user's (IP-derived) location asking the LLM to recommend nearby cities, sends it to the LLM, and gets back a list of cities. The second node calls the wrapped weathertool to fetch the weather for those cities. The third node picks the city with the best weather; here we use plain branching logic and make no LLM call.
```python
# Initialize the Tongyi Qianwen model; the three node implementations
# (recommend_node, weather_worker, decision_node) appear in the full listing below
llm = ChatTongyi(model_name="qwen-max", temperature=0.5)
```
Define the routing logic:

```python
def map_weather_tasks(state: AgentState):
    """
    Map step:
    read the candidates list and create one Send object per city.
    This triggers N weather_worker runs in parallel.
    """
    candidates = state["candidates"]
    # Syntax: Send(target_node_name, input_passed_to_that_node)
    return [Send("weather_worker", {"city": c["city"]}) for c in candidates]
```
Build the graph:

```python
def build_agent():
    workflow = StateGraph(AgentState)

    # Add nodes
    workflow.add_node("recommend_node", recommend_node)
    workflow.add_node("weather_worker", weather_worker)
    workflow.add_node("decision_node", decision_node)

    # Define edges
    workflow.add_edge(START, "recommend_node")
    workflow.add_conditional_edges("recommend_node", map_weather_tasks, ["weather_worker"])
    workflow.add_edge("weather_worker", "decision_node")
    workflow.add_edge("decision_node", END)

    return workflow.compile()
```
Finally, here is the complete, integrated main module:

```python
import logging
import operator
import os
from typing import Annotated, Any, Dict, List, TypedDict

from langchain_community.chat_models import ChatTongyi
from langgraph.graph import END, START, StateGraph
from langgraph.types import Send
from pydantic import BaseModel, Field

from tools.weathertool import OpenWeatherTool

# --- Configuration ---
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger("TravelAgent")

os.environ["DASHSCOPE_API_KEY"] = "**"
os.environ["OPENWEATHER_API_KEY"] = "**"

# --- State Definitions ---
class Destination(TypedDict):
    city: str
    reason: str

class AgentState(TypedDict):
    current_location: str
    candidates: List[Destination]
    # Aggregates the results of the parallel tasks
    weather_reports: Annotated[List[Dict[str, Any]], operator.add]
    final_decision: Dict[str, Any]
    final_output: str

class WeatherWorkerState(TypedDict):
    city: str

class RecommendOutput(BaseModel):
    cities: List[str] = Field(description="List of 3 recommended cities (English names only).")

# --- Service Initialization ---
llm = ChatTongyi(model_name="qwen-max", temperature=0.5)

# --- Node Implementations ---
def recommend_node(state: AgentState) -> Dict[str, Any]:
    """Generates city recommendations based on current location."""
    logger.info(f"Generating recommendations for location: {state['current_location']}")
    structured_llm = llm.with_structured_output(RecommendOutput)
    prompt = f"Located in {state['current_location']}. Recommend 3 suitable cities for a weekend road trip. Return English names only."
    try:
        result = structured_llm.invoke(prompt)
        logger.info(f"Generated candidates: {result.cities}")
        return {"candidates": [{"city": c, "reason": "LLM Recommendation"} for c in result.cities]}
    except Exception as e:
        logger.error(f"Recommendation failed: {e}")
        raise

def weather_worker(state: WeatherWorkerState) -> Dict[str, Any]:
    """Fetches weather data for a specific city."""
    city = state["city"]
    try:
        tool = OpenWeatherTool()
        weather_data = tool.invoke({"city": city})
        return {"weather_reports": [weather_data]}
    except Exception as e:
        logger.error(f"Weather fetch failed for {city}: {e}")
        return {"weather_reports": [{"city": city, "status": "error"}]}

def decision_node(state: AgentState) -> Dict[str, Any]:
    """Selects the best destination based on weather conditions."""
    logger.info("Evaluating weather reports for final decision.")
    reports = state["weather_reports"]

    # Filter out invalid reports
    valid_reports = [r for r in reports if r.get("status") != "error"]
    if not valid_reports:
        raise ValueError("No valid weather reports available.")

    # Decision logic: prefer sunny -> then non-rainy -> finally fall back
    sunny_cities = [r for r in valid_reports if r.get("condition") in ["Sunny", "Clear"]]
    non_rainy = [r for r in valid_reports if "Rain" not in r.get("condition", "")]

    if sunny_cities:
        best_city = max(sunny_cities, key=lambda x: x.get("temp", 0))
        strategy = "Optimal weather conditions (Sunny/Clear)"
    elif non_rainy:
        best_city = max(non_rainy, key=lambda x: x.get("temp", 0))
        strategy = "Acceptable conditions (Non-rainy)"
    else:
        best_city = valid_reports[0]
        strategy = "Fallback selection (Indoor activities recommended)"

    logger.info(f"Selected: {best_city['city']} | Reason: {strategy}")
    final_text = (
        f"Destination: {best_city['city']}\n"
        f"Conditions: {best_city.get('condition', 'N/A')}, {best_city.get('temp', 'N/A')}°C\n"
        f"Humidity: {best_city.get('humidity', 'N/A')}%\n"
        f"Strategy: {strategy}"
    )
    return {"final_decision": best_city, "final_output": final_text}

# --- Routing Logic ---
def map_weather_tasks(state: AgentState) -> List[Send]:
    """Generates parallel tasks for weather checking."""
    return [Send("weather_worker", {"city": c["city"]}) for c in state["candidates"]]

# --- Graph Construction ---
def create_agent() -> Any:
    workflow = StateGraph(AgentState)

    # Nodes
    workflow.add_node("recommend_node", recommend_node)
    workflow.add_node("weather_worker", weather_worker)
    workflow.add_node("decision_node", decision_node)

    # Edges
    workflow.add_edge(START, "recommend_node")
    workflow.add_conditional_edges("recommend_node", map_weather_tasks, ["weather_worker"])
    workflow.add_edge("weather_worker", "decision_node")
    workflow.add_edge("decision_node", END)

    return workflow.compile()

# --- Execution Entry Point ---
if __name__ == "__main__":
    agent = create_agent()
    inputs = {"current_location": "Shanghai"}

    logger.info("Starting Travel Agent execution...")
    try:
        final_state = agent.invoke(inputs)
        print("\n--- Execution Result ---")
        print(final_state["final_output"])
        print("------------------------\n")
        logger.debug("Full weather reports: %s", final_state["weather_reports"])
    except Exception as e:
        logger.critical(f"Execution failed: {e}", exc_info=True)
```
The run produces:

```text
2026-01-11 23:00:39,700 - TravelAgent - INFO - Starting Travel Agent execution...
2026-01-11 23:00:39,700 - TravelAgent - INFO - Generating recommendations for location: Shanghai
2026-01-11 23:00:43,200 - TravelAgent - INFO - Generated candidates: ['Hangzhou', 'Suzhou', 'Nanjing']
2026-01-11 23:00:43,812 - tools.weathertool - INFO - Successfully fetched weather for Hangzhou
2026-01-11 23:00:43,818 - tools.weathertool - INFO - Successfully fetched weather for Suzhou
2026-01-11 23:00:44,667 - tools.weathertool - INFO - Successfully fetched weather for Nanjing
2026-01-11 23:00:44,669 - TravelAgent - INFO - Evaluating weather reports for final decision.
2026-01-11 23:00:44,670 - TravelAgent - INFO - Selected: Nanjing | Reason: Acceptable conditions (Non-rainy)

--- Execution Result ---
Destination: Nanjing
Conditions: Clouds, 2.21°C
Humidity: 19%
Strategy: Acceptable conditions (Non-rainy)
------------------------
```