
LangChain Learning Log: LangGraph

Trying out LangGraph.

Intro

Traditional development patterns tend to treat the LLM as a stateless processor, but production-grade applications usually need systems that run for long periods, carry memory, and interact with their environment. This is the gap LangGraph was built to fill. (https://docs.langchain.com/oss/python/langgraph/overview)

LangGraph is a low-level orchestration framework designed for building, managing, and deploying long-running, stateful agents.

Unlike frameworks that only offer high-level wrappers, LangGraph gives developers a great deal of control. It is not just about wiring models to tools; it focuses on the core capability of agent orchestration.

According to the official docs, it offers the following advantages:

  • Durable execution: Build agents that persist through failures and can run for extended periods, resuming from where they left off.
  • Human-in-the-loop: Incorporate human oversight by inspecting and modifying agent state at any point.
  • Comprehensive memory: Create stateful agents with both short-term working memory for ongoing reasoning and long-term memory across sessions.
  • Debugging with LangSmith: Gain deep visibility into complex agent behavior with visualization tools that trace execution paths, capture state transitions, and provide detailed runtime metrics.
  • Production-ready deployment: Deploy sophisticated agent systems confidently with scalable infrastructure designed to handle the unique challenges of stateful, long-running workflows.
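
Several of these advantages (durable execution, short-term memory, human-in-the-loop inspection) hang off LangGraph's checkpointer mechanism: compile a graph with a checkpointer and pass a thread_id, and the state of every step is persisted and resumable. Below is a minimal sketch, assuming the agent_builder we construct later in this post; MemorySaver is LangGraph's in-memory checkpointer, and a production deployment would swap in a database-backed one:

from langgraph.checkpoint.memory import MemorySaver

# Compile with a checkpointer: state is saved after every step,
# so a run can survive failures and resume where it left off.
agent = agent_builder.compile(checkpointer=MemorySaver())

# The thread_id scopes the persisted history (short-term memory).
config = {"configurable": {"thread_id": "demo-thread"}}
agent.invoke({"messages": [("user", "Add 3 and 4.")]}, config)

# A later call on the same thread sees the earlier messages.
agent.invoke({"messages": [("user", "Now multiply that by 2.")]}, config)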

Get Started

Install LangGraph

pip install -U langgraph langchain
# Requires Python 3.10+

Define tools and model

In the previous post we looked at how an Agent calls tools. The official LangGraph example defines a few simple arithmetic tools:

from langchain.tools import tool
from langchain.chat_models import init_chat_model

model = init_chat_model(
    "claude-sonnet-4-5-20250929",
    temperature=0
)

# Define tools
@tool
def multiply(a: int, b: int) -> int:
    """Multiply `a` and `b`.

    Args:
        a: First int
        b: Second int
    """
    return a * b

@tool
def add(a: int, b: int) -> int:
    """Adds `a` and `b`.

    Args:
        a: First int
        b: Second int
    """
    return a + b

@tool
def divide(a: int, b: int) -> float:
    """Divide `a` and `b`.

    Args:
        a: First int
        b: Second int
    """
    return a / b

# Augment the LLM with tools
tools = [add, multiply, divide]
tools_by_name = {tool.name: tool for tool in tools}
model_with_tools = model.bind_tools(tools)

Define state

All nodes in the graph communicate through state. In this example we define a MessagesState that mirrors the built-in one: essentially a dict holding a messages list (plus an llm_calls counter). As control flows between nodes, new messages are appended to the list, forming the conversation history.

from langchain.messages import AnyMessage
from typing_extensions import TypedDict, Annotated
import operator

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]
    llm_calls: int
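
The Annotated[..., operator.add] part is what makes this work: it registers a reducer telling LangGraph how to merge a node's partial update into the existing value, rather than overwriting it. A toy illustration of the merge rule (plain Python, no LangGraph involved):

import operator

history = ["msg-1"]   # existing state value
update = ["msg-2"]    # value a node returns for the same key

# LangGraph applies the reducer instead of replacing the key:
merged = operator.add(history, update)
print(merged)  # ['msg-1', 'msg-2'] -- the update is appended

# A key without a reducer (like llm_calls here) is simply overwritten.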

Define model node

The model node not only generates replies but also decides whether a tool needs to be called; invoking the model_with_tools defined above is all it takes.

from langchain.messages import SystemMessage

def llm_call(state: dict):
    """LLM decides whether to call a tool or not"""

    return {
        "messages": [
            model_with_tools.invoke(
                [
                    SystemMessage(
                        content="You are a helpful assistant tasked with performing arithmetic on a set of inputs."
                    )
                ]
                + state["messages"]
            )
        ],
        "llm_calls": state.get('llm_calls', 0) + 1
    }

Define tool node

When the LLM decides to use a tool, the tool node runs the actual computation or query and writes the result back to state as a ToolMessage.

from langchain.messages import ToolMessage

def tool_node(state: dict):
    """Performs the tool call"""

    result = []
    for tool_call in state["messages"][-1].tool_calls:
        tool = tools_by_name[tool_call["name"]]
        observation = tool.invoke(tool_call["args"])
        result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
    return {"messages": result}

Once tools are bound to the model (via model.bind_tools(tools) above), it no longer has to answer in plain text; it can emit a message containing tool_calls. A typical tool_calls structure looks like this:

{
    "tool_calls": [
        {
            "name": "get_weather",            // name of the tool to call
            "args": {"location": "Beijing"},  // arguments extracted by the model
            "id": "call_123abcd",             // ID of this call
            "type": "tool_call"
        }
    ]
}

So in the logic above, tool_node iterates over this list, looks up the matching Python function by name, and passes args to it.

Define end logic

A conditional edge function routes to the tool node or to END, depending on whether the LLM made a tool call.

from typing import Literal
from langgraph.graph import StateGraph, START, END


def should_continue(state: MessagesState) -> Literal["tool_node", END]:
    """Decide if we should continue the loop or stop based upon whether the LLM made a tool call"""

    messages = state["messages"]
    last_message = messages[-1]

    # If the LLM makes a tool call, then perform an action
    if last_message.tool_calls:
        return "tool_node"

    # Otherwise, we stop (reply to the user)
    return END

The Literal here is a Python type annotation: it tells the type checker (and LangGraph, which inspects it when rendering the graph) that control can only flow to one of these two destinations.

Build and compile the agent

This step wires the isolated logic functions (nodes) and transition rules (edges) into a single executable agent object.

# Build workflow
agent_builder = StateGraph(MessagesState)

# Add nodes
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("tool_node", tool_node)

# Add edges to connect nodes
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
    "llm_call",
    should_continue,
    ["tool_node", END]
)
agent_builder.add_edge("tool_node", "llm_call")

# Compile the agent
agent = agent_builder.compile()

# Show the agent
from IPython.display import Image, display
display(Image(agent.get_graph(xray=True).draw_mermaid_png()))

# Invoke
from langchain.messages import HumanMessage
messages = [HumanMessage(content="Add 3 and 4.")]
messages = agent.invoke({"messages": messages})
for m in messages["messages"]:
    m.pretty_print()

Let's break this down. First, the StateGraph class fixes the system's state schema. This is the foundation of the whole graph: it defines the data contract between nodes.

# Initialize the graph container, injecting the global state model
agent_builder = StateGraph(MessagesState)

Nodes are the smallest units of scheduled logic. add_node maps a logical name to an execution function.

# Register the compute node (LLM decision) and the I/O node (tool execution)
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("tool_node", tool_node)

Deterministic edges vs. dynamic conditional edges:

  • Entry point: START marks the system's entry.
  • Cyclic edge: add_edge("tool_node", "llm_call") creates the loop back, letting the agent reason again over the tool's results.
  • Conditional routing: the core routing logic lives in add_conditional_edges. It takes a decision function and, based on the Literal label the function returns, dispatches control along one of the preset paths.
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
    "llm_call",
    should_continue,
    ["tool_node", END]
)
agent_builder.add_edge("tool_node", "llm_call")

Finally, validate the graph's connectivity and compile it into something executable:

# Compile the static topology into an executable runtime
agent = agent_builder.compile()
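
Besides invoke, the compiled graph is a standard Runnable, so intermediate state can also be streamed step by step. A small sketch using the stream API:

from langchain.messages import HumanMessage

# stream_mode="values" yields the full state after each step
for step in agent.stream(
    {"messages": [HumanMessage(content="Add 3 and 4.")]},
    stream_mode="values",
):
    step["messages"][-1].pretty_print()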

Summary

Following the official docs, the LangGraph workflow can be summarized as:

1. Define tools and model: instantiate a concrete ChatModel (Claude here) and inject custom Python functions (tools) via .bind_tools(). This establishes the agent's "brain" and its list of skills.

2. Define state: build MessagesState with TypedDict and use the Annotated[list, operator.add] mechanism so that every new message (LLM reply or tool result) is appended to the history rather than overwriting it.

3. Define model node: this node runs the tool-bound LLM, takes the current message history, and outputs one new message, which may be a direct reply to the user or a structured instruction to call tools (tool_calls).

4. Define tool node: this node parses the tool_calls from the previous model output, finds and executes the matching Python functions, and wraps the results as ToolMessage objects written back to state.

5. Define end logic: the should_continue condition function checks whether the latest message contains a tool-call request. It decides whether control jumps to the tool node to keep working or goes straight to END.

6. Build and compile the agent: connect all the nodes and decision logic (edges) into a complete graph with StateGraph, then call .compile() to turn it into a Runnable object, ready to serve invoke calls.

Workflows & Agents

The arithmetic assistant we just built is a typical Agent, because the LLM decides whether to use the add tool. In practice, though, not every scenario needs that much freedom; sometimes a Workflow (say, a fixed translation pipeline) is more robust. LangGraph's strength is that it lets you mix both styles in the same graph.

Workflows have predefined code paths designed to run in a specific order. The developer hard-codes every step and branch, and the LLM only fills in individual steps; it does not control the overall flow.

Agents are systems where the LLM dynamically decides the control flow: it chooses the next step and controls the loop's termination and path selection.

Workflows and Agents are not a black-and-white dichotomy but a continuous spectrum, with Workflows at the low-autonomy end and Agents at the high-autonomy end. Building on these two concepts, LangChain describes the following patterns:

Prompt Chaining

Prompt Chaining is the most basic linear workflow pattern. When a task is too complex to do well with a single prompt, we split it into sub-steps, feeding each step's output into the next.

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display

# Assumes `llm` is an initialized chat model, e.g. llm = init_chat_model(...)


# Graph state
class State(TypedDict):
    topic: str
    joke: str
    improved_joke: str
    final_joke: str


# Nodes
def generate_joke(state: State):
    """First LLM call to generate initial joke"""

    msg = llm.invoke(f"Write a short joke about {state['topic']}")
    return {"joke": msg.content}


def check_punchline(state: State):
    """Gate function to check if the joke has a punchline"""

    # Simple check - does the joke contain "?" or "!"
    if "?" in state["joke"] or "!" in state["joke"]:
        return "Pass"
    return "Fail"


def improve_joke(state: State):
    """Second LLM call to improve the joke"""

    msg = llm.invoke(f"Make this joke funnier by adding wordplay: {state['joke']}")
    return {"improved_joke": msg.content}


def polish_joke(state: State):
    """Third LLM call for final polish"""

    msg = llm.invoke(f"Add a surprising twist to this joke: {state['improved_joke']}")
    return {"final_joke": msg.content}


# Build workflow
workflow = StateGraph(State)

# Add nodes
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)

# Add edges to connect nodes
workflow.add_edge(START, "generate_joke")
workflow.add_conditional_edges(
    "generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END}
)
workflow.add_edge("improve_joke", "polish_joke")
workflow.add_edge("polish_joke", END)

# Compile
chain = workflow.compile()

# Show workflow
display(Image(chain.get_graph().draw_mermaid_png()))

# Invoke
state = chain.invoke({"topic": "cats"})
print("Initial joke:")
print(state["joke"])
print("\n--- --- ---\n")
if "improved_joke" in state:
    print("Improved joke:")
    print(state["improved_joke"])
    print("\n--- --- ---\n")

    print("Final joke:")
    print(state["final_joke"])
else:
    print("Final joke:")
    print(state["joke"])

Parallelization

Parallelization lets LLMs work on several independent subtasks at once. This not only speeds things up but also makes results more robust by covering multiple angles.

# Graph state
class State(TypedDict):
    topic: str
    joke: str
    story: str
    poem: str
    combined_output: str


# Nodes
def call_llm_1(state: State):
    """First LLM call to generate initial joke"""

    msg = llm.invoke(f"Write a joke about {state['topic']}")
    return {"joke": msg.content}


def call_llm_2(state: State):
    """Second LLM call to generate story"""

    msg = llm.invoke(f"Write a story about {state['topic']}")
    return {"story": msg.content}


def call_llm_3(state: State):
    """Third LLM call to generate poem"""

    msg = llm.invoke(f"Write a poem about {state['topic']}")
    return {"poem": msg.content}


def aggregator(state: State):
    """Combine the joke, story and poem into a single output"""

    combined = f"Here's a story, joke, and poem about {state['topic']}!\n\n"
    combined += f"STORY:\n{state['story']}\n\n"
    combined += f"JOKE:\n{state['joke']}\n\n"
    combined += f"POEM:\n{state['poem']}"
    return {"combined_output": combined}


# Build workflow
parallel_builder = StateGraph(State)

# Add nodes
parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)

# Add edges to connect nodes
parallel_builder.add_edge(START, "call_llm_1")
parallel_builder.add_edge(START, "call_llm_2")
parallel_builder.add_edge(START, "call_llm_3")
parallel_builder.add_edge("call_llm_1", "aggregator")
parallel_builder.add_edge("call_llm_2", "aggregator")
parallel_builder.add_edge("call_llm_3", "aggregator")
parallel_builder.add_edge("aggregator", END)
parallel_workflow = parallel_builder.compile()

# Show workflow
display(Image(parallel_workflow.get_graph().draw_mermaid_png()))

# Invoke
state = parallel_workflow.invoke({"topic": "cats"})
print(state["combined_output"])

Routing

Routing introduces classification: the system inspects the input's meaning or type and dispatches it to the downstream path best suited to handle it.

from typing_extensions import Literal
from pydantic import BaseModel, Field
from langchain.messages import HumanMessage, SystemMessage


# Schema for structured output to use as routing logic
class Route(BaseModel):
    step: Literal["poem", "story", "joke"] = Field(
        None, description="The next step in the routing process"
    )


# Augment the LLM with schema for structured output
router = llm.with_structured_output(Route)


# State
class State(TypedDict):
    input: str
    decision: str
    output: str


# Nodes
def llm_call_1(state: State):
    """Write a story"""

    result = llm.invoke(state["input"])
    return {"output": result.content}


def llm_call_2(state: State):
    """Write a joke"""

    result = llm.invoke(state["input"])
    return {"output": result.content}


def llm_call_3(state: State):
    """Write a poem"""

    result = llm.invoke(state["input"])
    return {"output": result.content}


def llm_call_router(state: State):
    """Route the input to the appropriate node"""

    # Run the augmented LLM with structured output to serve as routing logic
    decision = router.invoke(
        [
            SystemMessage(
                content="Route the input to story, joke, or poem based on the user's request."
            ),
            HumanMessage(content=state["input"]),
        ]
    )

    return {"decision": decision.step}


# Conditional edge function to route to the appropriate node
def route_decision(state: State):
    # Return the node name you want to visit next
    if state["decision"] == "story":
        return "llm_call_1"
    elif state["decision"] == "joke":
        return "llm_call_2"
    elif state["decision"] == "poem":
        return "llm_call_3"


# Build workflow
router_builder = StateGraph(State)

# Add nodes
router_builder.add_node("llm_call_1", llm_call_1)
router_builder.add_node("llm_call_2", llm_call_2)
router_builder.add_node("llm_call_3", llm_call_3)
router_builder.add_node("llm_call_router", llm_call_router)

# Add edges to connect nodes
router_builder.add_edge(START, "llm_call_router")
router_builder.add_conditional_edges(
    "llm_call_router",
    route_decision,
    {  # Name returned by route_decision : Name of next node to visit
        "llm_call_1": "llm_call_1",
        "llm_call_2": "llm_call_2",
        "llm_call_3": "llm_call_3",
    },
)
router_builder.add_edge("llm_call_1", END)
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)

# Compile workflow
router_workflow = router_builder.compile()

# Show the workflow
display(Image(router_workflow.get_graph().draw_mermaid_png()))

# Invoke
state = router_workflow.invoke({"input": "Write me a joke about cats"})
print(state["output"])

Orchestrator-Worker

Orchestrator-Worker is an advanced form of parallelization. In plain parallelization the number of tasks is hard-coded; here, the orchestrator analyzes the task dynamically, decides how many subtasks to split it into, and spawns worker nodes on the fly.

from typing import Annotated, List
import operator


# Schema for structured output to use in planning
class Section(BaseModel):
    name: str = Field(
        description="Name for this section of the report.",
    )
    description: str = Field(
        description="Brief overview of the main topics and concepts to be covered in this section.",
    )


class Sections(BaseModel):
    sections: List[Section] = Field(
        description="Sections of the report.",
    )


# Augment the LLM with schema for structured output
planner = llm.with_structured_output(Sections)
from langgraph.types import Send


# Graph state
class State(TypedDict):
    topic: str  # Report topic
    sections: list[Section]  # List of report sections
    completed_sections: Annotated[
        list, operator.add
    ]  # All workers write to this key in parallel
    final_report: str  # Final report


# Worker state
class WorkerState(TypedDict):
    section: Section
    completed_sections: Annotated[list, operator.add]


# Nodes
def orchestrator(state: State):
    """Orchestrator that generates a plan for the report"""

    # Generate queries
    report_sections = planner.invoke(
        [
            SystemMessage(content="Generate a plan for the report."),
            HumanMessage(content=f"Here is the report topic: {state['topic']}"),
        ]
    )

    return {"sections": report_sections.sections}


def llm_call(state: WorkerState):
    """Worker writes a section of the report"""

    # Generate section
    section = llm.invoke(
        [
            SystemMessage(
                content="Write a report section following the provided name and description. Include no preamble for each section. Use markdown formatting."
            ),
            HumanMessage(
                content=f"Here is the section name: {state['section'].name} and description: {state['section'].description}"
            ),
        ]
    )

    # Write the updated section to completed sections
    return {"completed_sections": [section.content]}


def synthesizer(state: State):
    """Synthesize full report from sections"""

    # List of completed sections
    completed_sections = state["completed_sections"]

    # Format completed section to str to use as context for final sections
    completed_report_sections = "\n\n---\n\n".join(completed_sections)

    return {"final_report": completed_report_sections}


# Conditional edge function to create llm_call workers that each write a section of the report
def assign_workers(state: State):
    """Assign a worker to each section in the plan"""

    # Kick off section writing in parallel via Send() API
    return [Send("llm_call", {"section": s}) for s in state["sections"]]


# Build workflow
orchestrator_worker_builder = StateGraph(State)

# Add the nodes
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)

# Add edges to connect nodes
orchestrator_worker_builder.add_edge(START, "orchestrator")
orchestrator_worker_builder.add_conditional_edges(
    "orchestrator", assign_workers, ["llm_call"]
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)

# Compile the workflow
orchestrator_worker = orchestrator_worker_builder.compile()

# Show the workflow
display(Image(orchestrator_worker.get_graph().draw_mermaid_png()))

# Invoke
state = orchestrator_worker.invoke({"topic": "Create a report on LLM scaling laws"})

from IPython.display import Markdown
Markdown(state["final_report"])

Evaluator-Optimizer

The Evaluator-Optimizer pattern introduces a feedback loop. One LLM generates, and another LLM (or a human) evaluates. If the result falls short, it is sent back to the generator along with feedback for another attempt.

# Graph state
class State(TypedDict):
    joke: str
    topic: str
    feedback: str
    funny_or_not: str


# Schema for structured output to use in evaluation
class Feedback(BaseModel):
    grade: Literal["funny", "not funny"] = Field(
        description="Decide if the joke is funny or not.",
    )
    feedback: str = Field(
        description="If the joke is not funny, provide feedback on how to improve it.",
    )


# Augment the LLM with schema for structured output
evaluator = llm.with_structured_output(Feedback)


# Nodes
def llm_call_generator(state: State):
    """LLM generates a joke"""

    if state.get("feedback"):
        msg = llm.invoke(
            f"Write a joke about {state['topic']} but take into account the feedback: {state['feedback']}"
        )
    else:
        msg = llm.invoke(f"Write a joke about {state['topic']}")
    return {"joke": msg.content}


def llm_call_evaluator(state: State):
    """LLM evaluates the joke"""

    grade = evaluator.invoke(f"Grade the joke {state['joke']}")
    return {"funny_or_not": grade.grade, "feedback": grade.feedback}


# Conditional edge function to route back to joke generator or end based upon feedback from the evaluator
def route_joke(state: State):
    """Route back to joke generator or end based upon feedback from the evaluator"""

    if state["funny_or_not"] == "funny":
        return "Accepted"
    elif state["funny_or_not"] == "not funny":
        return "Rejected + Feedback"


# Build workflow
optimizer_builder = StateGraph(State)

# Add the nodes
optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)

# Add edges to connect nodes
optimizer_builder.add_edge(START, "llm_call_generator")
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator")
optimizer_builder.add_conditional_edges(
    "llm_call_evaluator",
    route_joke,
    {  # Name returned by route_joke : Name of next node to visit
        "Accepted": END,
        "Rejected + Feedback": "llm_call_generator",
    },
)

# Compile the workflow
optimizer_workflow = optimizer_builder.compile()

# Show the workflow
display(Image(optimizer_workflow.get_graph().draw_mermaid_png()))

# Invoke
state = optimizer_workflow.invoke({"topic": "Cats"})
print(state["joke"])

Agents

Agents are the most autonomous pattern. Unlike the patterns above, an agent's execution path is not predefined; the LLM decides it on its own, relying on tools to interact with the environment until it judges the task complete.
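
Beyond hand-building the loop as we did at the start of this post, LangGraph also ships a prebuilt helper for this pattern. A minimal sketch reusing the model and arithmetic tools from earlier (create_react_agent is LangGraph's prebuilt API; the prompt is my own):

from langgraph.prebuilt import create_react_agent

# The prebuilt helper wires up the same llm_call -> tool_node loop:
# the LLM picks the tools and decides when it is done.
agent = create_react_agent(model, tools=[add, multiply, divide])

result = agent.invoke(
    {"messages": [("user", "Add 3 and 4, then divide the result by 2.")]}
)
print(result["messages"][-1].content)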

Hands-On

This time I used LangGraph to build a weekend travel advisor that recommends a same-day trip destination to the user.

🤖 Weekend Travel Advisor

Recommend: based on the user's current location, come up with 3 nearby destinations

Check weather: query the weather for those 3 places in parallel

Decide: aggregate the weather reports and pick the place with the best weather

Following the conventions from the docs above, we first define a weathertool:

import os
import requests
import logging
from typing import Type, Dict, Any
from pydantic import BaseModel, Field, SecretStr
from langchain_core.tools import BaseTool

# Configure logging
logger = logging.getLogger(__name__)

# --- 1. Define the input schema ---
class WeatherInput(BaseModel):
    city: str = Field(
        description="English name of the city to query (e.g., 'Beijing', 'Shanghai', 'London')"
    )

# --- 2. Wrap the logic in a proper tool class ---
class OpenWeatherTool(BaseTool):
    """
    Weather lookup tool backed by OpenWeatherMap.
    """

    name: str = "get_weather_data"
    description: str = "Fetch real-time weather data for a target city, including temperature, condition, humidity, etc."
    args_schema: Type[BaseModel] = WeatherInput

    api_key: SecretStr = Field(default_factory=lambda: SecretStr(os.environ.get("OPENWEATHER_API_KEY", "")))
    base_url: str = "http://api.openweathermap.org/data/2.5/weather"

    def _run(self, city: str) -> Dict[str, Any]:

        if not self.api_key.get_secret_value():
            return {"error": "ConfigurationError", "message": "Missing OPENWEATHER_API_KEY"}

        params = {
            "q": city,
            "appid": self.api_key.get_secret_value(),
            "units": "metric",
            "lang": "en"
        }

        try:
            # Set a timeout on the request
            response = requests.get(self.base_url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()

            result = {
                "city": data.get("name", city),
                "temp": data["main"]["temp"],
                "feels_like": data["main"]["feels_like"],
                "humidity": data["main"]["humidity"],
                "condition": data["weather"][0]["main"],
                "description": data["weather"][0]["description"],
                "wind_speed": data["wind"]["speed"],
                "status": "success"
            }
            logger.info(f"Successfully fetched weather for {city}")
            return result

        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP Error for {city}: {e}")
            if response.status_code == 404:
                return {"status": "error", "message": "City not found", "city": city}
            return {"status": "error", "message": f"API Error: {response.status_code}"}

        except requests.exceptions.RequestException as e:
            logger.error(f"Connection Error for {city}: {e}")
            return {"status": "error", "message": "Connection timeout or failure"}

        except Exception as e:
            logger.error(f"Unknown Error for {city}: {e}")
            return {"status": "error", "message": str(e)}

# --- 3. Turn the structured data into natural language ---
def format_weather_for_llm(weather_data: Dict[str, Any]) -> str:
    if weather_data.get("status") == "error":
        return f"Lookup failed: {weather_data.get('message')}"

    return (
        f"[{weather_data['city']}]\n"
        f"Condition: {weather_data['condition']} ({weather_data['description']})\n"
        f"Temperature: {weather_data['temp']}°C (feels like {weather_data['feels_like']}°C)\n"
        f"Humidity: {weather_data['humidity']}%\n"
        f"Wind speed: {weather_data['wind_speed']} m/s"
    )

As in the previous Agent post, we use OpenWeatherMap's weather API; this tool really just normalizes input/output and wraps the weather query.
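
Before wiring it into the graph, the tool can be smoke-tested on its own (assuming OPENWEATHER_API_KEY is set in the environment):

tool = OpenWeatherTool()
data = tool.invoke({"city": "Hangzhou"})  # BaseTool.invoke validates against WeatherInput
print(format_weather_for_llm(data))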

Back in the main module, we first define three state classes, i.e., the schemas the data flows through:

class Destination(TypedDict):
    """A single destination"""
    city: str
    reason: str

class AgentState(TypedDict):
    """Global shared state"""
    current_location: str
    candidates: List[Destination]  # Cities recommended by the LLM
    weather_reports: Annotated[List[Dict], operator.add]
    final_decision: Dict  # The chosen destination
    final_output: str  # Final recommendation text for the user

class WeatherWorkerState(TypedDict):
    """Dedicated input state for the worker node"""
    city: str

Next we define the graph's nodes. The agent should act in this order: pick candidate cities -> query their weather -> choose the best city by weather.

The first node builds a prompt from the user's current location asking the LLM to recommend nearby cities, and gets back a list of cities. The second node calls the wrapped weathertool to fetch the weather for those cities. The third node picks the city with the best weather; here it is plain logic, with no LLM call.

# Initialize the Tongyi Qianwen (Qwen) model
llm = ChatTongyi(model_name="qwen-max", temperature=0.5)

# 3.1 Recommendation node
class RecommendOutput(BaseModel):
    cities: List[str] = Field(description="English names of 3 recommended nearby cities")

def recommend_node(state: AgentState):
    structured_llm = llm.with_structured_output(RecommendOutput)

    prompt = f"I am in {state['current_location']}. Recommend 3 nearby cities suitable for a weekend road trip. Return English city names only."
    result = structured_llm.invoke(prompt)

    return {"candidates": [{"city": c, "reason": "LLM recommendation"} for c in result.cities]}

# 3.2 Weather query node
def weather_worker(state: WeatherWorkerState):
    city = state["city"]

    # Instantiate and invoke the tool
    tool = OpenWeatherTool()
    weather_data = tool.invoke({"city": city})

    return {"weather_reports": [weather_data]}

# 3.3 Decision node
def decision_node(state: AgentState):
    reports = state["weather_reports"]
    best_city = None

    # Strategy 1: prefer sunny cities
    sunny_cities = [r for r in reports if r.get("condition") in ["Sunny", "Clear"]]

    if sunny_cities:
        # If several are sunny, pick the warmest
        best_city = max(sunny_cities, key=lambda x: x.get("temp", 0))
        strategy = "Clear skies"
    else:
        # Strategy 2: no sunny city, pick the warmest non-rainy one
        non_rainy = [r for r in reports if "Rain" not in r.get("condition", "")]
        if non_rainy:
            best_city = max(non_rainy, key=lambda x: x.get("temp", 0))
            strategy = "Cloudy but warm"
        else:
            # Strategy 3: rain everywhere, just pick the first
            best_city = reports[0]
            strategy = "Indoor activities recommended"

    # Compose the final recommendation text
    final_text = (
        f"Destination: {best_city['city']}\n"
        f"Conditions: {best_city['condition']}, {best_city['temp']}°C\n"
        f"Humidity: {best_city['humidity']}%\n"
        f"Why: {strategy}, a good fit for the weekend."
    )

    return {"final_decision": best_city, "final_output": final_text}

Define the routing logic:

def map_weather_tasks(state: AgentState):
    """
    Map step:
    read the candidates list and produce one Send object per city.
    This triggers N weather_worker runs in parallel.
    """
    candidates = state["candidates"]
    # Syntax: Send(target node name, input passed to that node)
    return [Send("weather_worker", {"city": c["city"]}) for c in candidates]

Build the graph:

def build_agent():
    workflow = StateGraph(AgentState)

    # Add nodes
    workflow.add_node("recommend_node", recommend_node)
    workflow.add_node("weather_worker", weather_worker)
    workflow.add_node("decision_node", decision_node)

    # Define edges
    workflow.add_edge(START, "recommend_node")
    workflow.add_conditional_edges("recommend_node", map_weather_tasks, ["weather_worker"])
    workflow.add_edge("weather_worker", "decision_node")
    workflow.add_edge("decision_node", END)

    return workflow.compile()

Finally, the full assembled main module:

import logging
import operator
import os
from typing import Annotated, Any, Dict, List, TypedDict
from langchain_community.chat_models import ChatTongyi
from langgraph.graph import END, START, StateGraph
from langgraph.types import Send
from pydantic import BaseModel, Field
from tools.weathertool import OpenWeatherTool

# --- Configuration ---
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger("TravelAgent")
os.environ["DASHSCOPE_API_KEY"] = "**"
os.environ["OPENWEATHER_API_KEY"] = "**"

# --- State Definitions ---
class Destination(TypedDict):
    city: str
    reason: str

class AgentState(TypedDict):
    current_location: str
    candidates: List[Destination]
    # Aggregates results from the parallel workers
    weather_reports: Annotated[List[Dict[str, Any]], operator.add]
    final_decision: Dict[str, Any]
    final_output: str

class WeatherWorkerState(TypedDict):
    city: str

class RecommendOutput(BaseModel):
    cities: List[str] = Field(description="List of 3 recommended cities (English names only).")

# --- Service Initialization ---

llm = ChatTongyi(model_name="qwen-max", temperature=0.5)

# --- Node Implementations ---

def recommend_node(state: AgentState) -> Dict[str, Any]:
    """Generates city recommendations based on current location."""
    logger.info(f"Generating recommendations for location: {state['current_location']}")

    structured_llm = llm.with_structured_output(RecommendOutput)
    prompt = f"Located in {state['current_location']}. Recommend 3 suitable cities for a weekend road trip. Return English names only."

    try:
        result = structured_llm.invoke(prompt)
        logger.info(f"Generated candidates: {result.cities}")
        return {"candidates": [{"city": c, "reason": "LLM Recommendation"} for c in result.cities]}
    except Exception as e:
        logger.error(f"Recommendation failed: {e}")
        raise

def weather_worker(state: WeatherWorkerState) -> Dict[str, Any]:
    """Fetches weather data for a specific city."""
    city = state["city"]

    try:
        tool = OpenWeatherTool()
        weather_data = tool.invoke({"city": city})
        return {"weather_reports": [weather_data]}
    except Exception as e:
        logger.error(f"Weather fetch failed for {city}: {e}")
        return {"weather_reports": [{"city": city, "status": "error"}]}

def decision_node(state: AgentState) -> Dict[str, Any]:
    """Selects the best destination based on weather conditions."""
    logger.info("Evaluating weather reports for final decision.")
    reports = state["weather_reports"]

    # Filter out failed reports
    valid_reports = [r for r in reports if r.get("status") != "error"]
    if not valid_reports:
        raise ValueError("No valid weather reports available.")

    # Decision logic: prefer sunny -> then non-rainy -> finally fall back
    sunny_cities = [r for r in valid_reports if r.get("condition") in ["Sunny", "Clear"]]
    non_rainy = [r for r in valid_reports if "Rain" not in r.get("condition", "")]

    if sunny_cities:
        best_city = max(sunny_cities, key=lambda x: x.get("temp", 0))
        strategy = "Optimal weather conditions (Sunny/Clear)"
    elif non_rainy:
        best_city = max(non_rainy, key=lambda x: x.get("temp", 0))
        strategy = "Acceptable conditions (Non-rainy)"
    else:
        best_city = valid_reports[0]
        strategy = "Fallback selection (Indoor activities recommended)"

    logger.info(f"Selected: {best_city['city']} | Reason: {strategy}")

    final_text = (
        f"Destination: {best_city['city']}\n"
        f"Conditions: {best_city.get('condition', 'N/A')}, {best_city.get('temp', 'N/A')}°C\n"
        f"Humidity: {best_city.get('humidity', 'N/A')}%\n"
        f"Strategy: {strategy}"
    )

    return {"final_decision": best_city, "final_output": final_text}

# --- Routing Logic ---

def map_weather_tasks(state: AgentState) -> List[Send]:
    """Generates parallel tasks for weather checking."""
    return [Send("weather_worker", {"city": c["city"]}) for c in state["candidates"]]

# --- Graph Construction ---
def create_agent() -> Any:
    workflow = StateGraph(AgentState)

    # Nodes
    workflow.add_node("recommend_node", recommend_node)
    workflow.add_node("weather_worker", weather_worker)
    workflow.add_node("decision_node", decision_node)

    # Edges
    workflow.add_edge(START, "recommend_node")
    workflow.add_conditional_edges("recommend_node", map_weather_tasks, ["weather_worker"])
    workflow.add_edge("weather_worker", "decision_node")
    workflow.add_edge("decision_node", END)

    return workflow.compile()

# --- Execution Entry Point ---

if __name__ == "__main__":
    agent = create_agent()
    inputs = {"current_location": "Shanghai"}

    logger.info("Starting Travel Agent execution...")

    try:
        final_state = agent.invoke(inputs)

        print("\n--- Execution Result ---")
        print(final_state["final_output"])
        print("------------------------\n")

        logger.debug("Full weather reports: %s", final_state["weather_reports"])

    except Exception as e:
        logger.critical(f"Execution failed: {e}", exc_info=True)

Running it produces:

2026-01-11 23:00:39,700 - TravelAgent - INFO - Starting Travel Agent execution...
2026-01-11 23:00:39,700 - TravelAgent - INFO - Generating recommendations for location: Shanghai
2026-01-11 23:00:43,200 - TravelAgent - INFO - Generated candidates: ['Hangzhou', 'Suzhou', 'Nanjing']
2026-01-11 23:00:43,812 - tools.weathertool - INFO - Successfully fetched weather for Hangzhou
2026-01-11 23:00:43,818 - tools.weathertool - INFO - Successfully fetched weather for Suzhou
2026-01-11 23:00:44,667 - tools.weathertool - INFO - Successfully fetched weather for Nanjing
2026-01-11 23:00:44,669 - TravelAgent - INFO - Evaluating weather reports for final decision.
2026-01-11 23:00:44,670 - TravelAgent - INFO - Selected: Nanjing | Reason: Acceptable conditions (Non-rainy)

--- Execution Result ---
Destination: Nanjing
Conditions: Clouds, 2.21°C
Humidity: 19%
Strategy: Acceptable conditions (Non-rainy)
------------------------