Durable execution: Build agents that persist through failures and can run for extended periods, resuming from where they left off.
Human-in-the-loop: Incorporate human oversight by inspecting and modifying agent state at any point.
Comprehensive memory: Create stateful agents with both short-term working memory for ongoing reasoning and long-term memory across sessions.
Debugging with LangSmith: Gain deep visibility into complex agent behavior with visualization tools that trace execution paths, capture state transitions, and provide detailed runtime metrics.
Production-ready deployment: Deploy sophisticated agent systems confidently with scalable infrastructure designed to handle the unique challenges of stateful, long-running workflows.
def llm_call(state: dict):
    """LLM decides whether to call a tool or not.

    Prepends the arithmetic system prompt to the conversation, invokes the
    tool-bound model, and increments the running LLM-call counter.
    """
    return {
        "messages": [
            model_with_tools.invoke(
                [
                    SystemMessage(
                        content="You are a helpful assistant tasked with performing arithmetic on a set of inputs."
                    )
                ]
                + state["messages"]
            )
        ],
        # llm_calls may be absent on the first turn; default to 0.
        "llm_calls": state.get("llm_calls", 0) + 1,
    }
from typing import Literal

from langgraph.graph import StateGraph, START, END
def should_continue(state: MessagesState) -> Literal["tool_node", END]:
    """Decide if we should continue the loop or stop based upon whether the LLM made a tool call."""
    # NOTE(review): the body was lost in extraction; restored from the
    # standard LangGraph tool-calling loop — confirm against the original.
    last_message = state["messages"][-1]
    # If the LLM made a tool call, run the tool node; otherwise end the loop.
    if last_message.tool_calls:
        return "tool_node"
    return END
# Compile the agent
agent = agent_builder.compile()

# Show the agent
from IPython.display import Image, display

display(Image(agent.get_graph(xray=True).draw_mermaid_png()))

# Invoke
from langchain.messages import HumanMessage

messages = [HumanMessage(content="Add 3 and 4.")]
messages = agent.invoke({"messages": messages})
for m in messages["messages"]:
    m.pretty_print()
msg = llm.invoke(f"Write a short joke about {state['topic']}") return {"joke": msg.content}
def check_punchline(state: "State"):
    """Gate function to check if the joke has a punchline.

    Returns "Pass" when the joke text contains a "?" or "!", else "Fail".
    """
    # Simple check - does the joke contain "?" or "!"
    if "?" in state["joke"] or "!" in state["joke"]:
        return "Pass"
    return "Fail"
def improve_joke(state: "State"):
    """Second LLM call to improve the joke."""
    msg = llm.invoke(f"Make this joke funnier by adding wordplay: {state['joke']}")
    return {"improved_joke": msg.content}
def polish_joke(state: "State"):
    """Third LLM call for final polish."""
    msg = llm.invoke(f"Add a surprising twist to this joke: {state['improved_joke']}")
    return {"final_joke": msg.content}
from typing_extensions import Literal

from langchain.messages import HumanMessage, SystemMessage
# Schema for structured output to use as routing logic
class Route(BaseModel):
    # NOTE(review): the None default on a non-Optional Literal is kept as-is
    # to preserve behavior (pydantic does not validate defaults by default).
    step: Literal["poem", "story", "joke"] = Field(
        None, description="The next step in the routing process"
    )
# Augment the LLM with schema for structured output
router = llm.with_structured_output(Route)
# State shared by all nodes in the routing workflow.
class State(TypedDict):
    input: str  # Raw user request
    decision: str  # Router's chosen branch: "story", "joke", or "poem"
    output: str  # Text produced by the selected LLM node
# Nodes
def llm_call_1(state: "State"):
    """Write a story."""
    result = llm.invoke(state["input"])
    return {"output": result.content}
def llm_call_2(state: "State"):
    """Write a joke."""
    result = llm.invoke(state["input"])
    return {"output": result.content}
def llm_call_3(state: "State"):
    """Write a poem."""
    result = llm.invoke(state["input"])
    return {"output": result.content}
def llm_call_router(state: "State"):
    """Route the input to the appropriate node."""
    # Run the augmented LLM with structured output to serve as routing logic
    decision = router.invoke(
        [
            SystemMessage(
                content="Route the input to story, joke, or poem based on the user's request."
            ),
            HumanMessage(content=state["input"]),
        ]
    )
    return {"decision": decision.step}
# Conditional edge function to route to the appropriate node
def route_decision(state: "State"):
    """Return the name of the next node based on the router's decision.

    Unknown decisions fall through to None, matching the original
    if/elif chain with no else branch.
    """
    return {
        "story": "llm_call_1",
        "joke": "llm_call_2",
        "poem": "llm_call_3",
    }.get(state["decision"])
# Add edges to connect nodes
router_builder.add_edge(START, "llm_call_router")
router_builder.add_conditional_edges(
    "llm_call_router",
    route_decision,
    {
        # Name returned by route_decision : Name of next node to visit
        "llm_call_1": "llm_call_1",
        "llm_call_2": "llm_call_2",
        "llm_call_3": "llm_call_3",
    },
)
router_builder.add_edge("llm_call_1", END)
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)
import operator
from typing import Annotated, List
# Schema for structured output to use in planning
class Section(BaseModel):
    name: str = Field(
        description="Name for this section of the report.",
    )
    description: str = Field(
        description="Brief overview of the main topics and concepts to be covered in this section.",
    )
class Sections(BaseModel):
    sections: List[Section] = Field(
        description="Sections of the report.",
    )
# Augment the LLM with schema for structured output
planner = llm.with_structured_output(Sections)
# Graph state
class State(TypedDict):
    topic: str  # Report topic
    sections: list["Section"]  # List of report sections
    completed_sections: Annotated[
        list, operator.add
    ]  # All workers write to this key in parallel
    final_report: str  # Final report
# Worker state
class WorkerState(TypedDict):
    section: "Section"  # The single section this worker is responsible for
    completed_sections: Annotated[list, operator.add]  # Merged into the parent state
# Nodes
def orchestrator(state: "State"):
    """Orchestrator that generates a plan for the report."""
    # Generate queries
    report_sections = planner.invoke(
        [
            SystemMessage(content="Generate a plan for the report."),
            HumanMessage(content=f"Here is the report topic: {state['topic']}"),
        ]
    )
    return {"sections": report_sections.sections}
def llm_call(state: "WorkerState"):
    """Worker writes a section of the report."""
    # Generate section
    section = llm.invoke(
        [
            SystemMessage(
                content="Write a report section following the provided name and description. Include no preamble for each section. Use markdown formatting."
            ),
            HumanMessage(
                content=f"Here is the section name: {state['section'].name} and description: {state['section'].description}"
            ),
        ]
    )

    # Write the updated section to completed sections
    return {"completed_sections": [section.content]}
def synthesizer(state: "State"):
    """Synthesize full report from sections."""
    # List of completed sections
    completed_sections = state["completed_sections"]

    # Format completed section to str to use as context for final sections
    completed_report_sections = "\n\n---\n\n".join(completed_sections)

    # NOTE(review): the return was lost in extraction; restored from the
    # standard orchestrator-worker example — confirm against the original.
    return {"final_report": completed_report_sections}
# Conditional edge function to create llm_call workers that each write a section of the report
def assign_workers(state: "State"):
    """Assign a worker to each section in the plan."""
    # Kick off section writing in parallel via Send() API
    return [Send("llm_call", {"section": s}) for s in state["sections"]]
# Schema for structured output to use in evaluation
class Feedback(BaseModel):
    grade: Literal["funny", "not funny"] = Field(
        description="Decide if the joke is funny or not.",
    )
    feedback: str = Field(
        description="If the joke is not funny, provide feedback on how to improve it.",
    )
# Augment the LLM with schema for structured output
evaluator = llm.with_structured_output(Feedback)
# Nodes
def llm_call_generator(state: "State"):
    """LLM generates a joke."""
    # Regenerate with feedback when the evaluator rejected a previous attempt.
    if state.get("feedback"):
        msg = llm.invoke(
            f"Write a joke about {state['topic']} but take into account the feedback: {state['feedback']}"
        )
    else:
        msg = llm.invoke(f"Write a joke about {state['topic']}")
    return {"joke": msg.content}
def llm_call_evaluator(state: "State"):
    """LLM evaluates the joke."""
    # NOTE(review): the body was lost in extraction; restored from the
    # standard evaluator-optimizer example — confirm against the original.
    grade = evaluator.invoke(f"Grade the joke {state['joke']}")
    return {"funny_or_not": grade.grade, "feedback": grade.feedback}
# Conditional edge function to route back to joke generator or end based upon feedback from the evaluator
def route_joke(state: "State"):
    """Route back to joke generator or end based upon feedback from the evaluator."""
    # NOTE(review): the body was lost in extraction; restored from the
    # standard evaluator-optimizer example (and the edge mapping below,
    # which expects "Accepted" / "Rejected + Feedback") — confirm.
    if state["funny_or_not"] == "funny":
        return "Accepted"
    elif state["funny_or_not"] == "not funny":
        return "Rejected + Feedback"
# Add the nodes
optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)

# Add edges to connect nodes
optimizer_builder.add_edge(START, "llm_call_generator")
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator")
optimizer_builder.add_conditional_edges(
    "llm_call_evaluator",
    route_joke,
    {
        # Name returned by route_joke : Name of next node to visit
        "Accepted": END,
        "Rejected + Feedback": "llm_call_generator",
    },
)

# Compile the workflow
optimizer_workflow = optimizer_builder.compile()

# Show the workflow
display(Image(optimizer_workflow.get_graph().draw_mermaid_png()))

# Invoke
state = optimizer_workflow.invoke({"topic": "Cats"})
print(state["joke"])
import logging
import operator
import os
from typing import Annotated, Any, Dict, List, TypedDict

from langchain_community.chat_models import ChatTongyi
from langgraph.graph import END, START, StateGraph
from langgraph.types import Send
from pydantic import BaseModel, Field

from tools.weathertool import OpenWeatherTool
def recommend_node(state: "AgentState") -> Dict[str, Any]:
    """Generates city recommendations based on current location."""
    logger.info(f"Generating recommendations for location: {state['current_location']}")
    structured_llm = llm.with_structured_output(RecommendOutput)
    prompt = f"Located in {state['current_location']}. Recommend 3 suitable cities for a weekend road trip. Return English names only."
    try:
        result = structured_llm.invoke(prompt)
        logger.info(f"Generated candidates: {result.cities}")
        return {"candidates": [{"city": c, "reason": "LLM Recommendation"} for c in result.cities]}
    except Exception as e:
        # Log and re-raise: recommendations are required for the rest of the graph.
        logger.error(f"Recommendation failed: {e}")
        raise
def weather_worker(state: "WeatherWorkerState") -> Dict[str, Any]:
    """Fetches weather data for a specific city."""
    city = state["city"]
    try:
        tool = OpenWeatherTool()
        weather_data = tool.invoke({"city": city})
        return {"weather_reports": [weather_data]}
    except Exception as e:
        # Best-effort: report the failure as data instead of aborting the fan-out.
        logger.error(f"Weather fetch failed for {city}: {e}")
        return {"weather_reports": [{"city": city, "status": "error"}]}
def decision_node(state: "AgentState") -> Dict[str, Any]:
    """Selects the best destination based on weather conditions."""
    logger.info("Evaluating weather reports for final decision.")
    reports = state["weather_reports"]
    # Drop reports marked as fetch failures before ranking.
    valid_reports = [r for r in reports if r.get("status") != "error"]
    if not valid_reports:
        raise ValueError("No valid weather reports available.")

    # Decision logic: prefer sunny -> then no rain -> finally fall back.
    sunny_cities = [r for r in valid_reports if r.get("condition") in ["Sunny", "Clear"]]
    non_rainy = [r for r in valid_reports if "Rain" not in r.get("condition", "")]

    # NOTE(review): the final selection/return was lost in extraction;
    # restored per the priority stated above — confirm key name and
    # tie-breaking against the original source.
    if sunny_cities:
        best = sunny_cities[0]
    elif non_rainy:
        best = non_rainy[0]
    else:
        best = valid_reports[0]
    logger.info(f"Selected destination: {best.get('city')}")
    return {"decision": best}
def map_weather_tasks(state: "AgentState") -> List[Send]:
    """Generates parallel tasks for weather checking."""
    # Fan out one weather_worker per candidate city via the Send API.
    return [Send("weather_worker", {"city": c["city"]}) for c in state["candidates"]]