2026年LangGraph 深度实战:从状态图到生产级 Agent 系统——用图结构重新定义 AI 工程边界

LangGraph 深度实战:从状态图到生产级 Agent 系统——用图结构重新定义 AI 工程边界2026 年 AI Agent 已经从实验室走进了每个工程团队 LangChain 的 链式调用 时代正在谢幕 而 LangGraph 用 图结构 接管了复杂 Agent 编排的话语权 本文是一篇 5000 字的深度实战指南 带你从原理到生产落地 彻底搞懂 LangGraph 2022 年 LangChain 横空出世 用

大家好,我是讯享网,很高兴认识大家。这里提供最前沿的Ai技术和互联网信息。



2026 年,AI Agent 已经从实验室走进了每个工程团队。LangChain 的”链式调用”时代正在谢幕,而 LangGraph 用”图结构”接管了复杂 Agent 编排的话语权。本文是一篇 5000+ 字的深度实战指南,带你从原理到生产落地,彻底搞懂 LangGraph。


2022 年,LangChain 横空出世,用 Chain(链)的思路串联 LLM 调用。那时候大多数应用都是:输入 → Prompt → LLM → 输出,线性的,可预期的。

但 2025 年之后,业务需求变了:

  • 用户要求 Agent 能”回头重来”:如果搜索结果不好,重新搜索
  • 要求多个 Agent 并发协作,互相通信
  • 要求在关键决策点”暂停等人审批”(Human-in-the-loop)
  • 要求 Agent 的执行过程可回溯、可 replay

线性 Chain 处理这些需求非常痛苦——你得手动管理状态、手写循环、自己实现分支逻辑。代码很快变成意大利面。

LangGraph 的核心洞察:AI Agent 的执行流程本质上是一个有向图。节点是执行单元,边是流转逻辑,State 是全局共享的上下文。把业务流程画成图,比手写 if-else 直观 10 倍,维护 10 倍容易。


LangGraph 的核心就三个概念:

State 是贯穿整个图的”公共黑板”,所有节点都可以读写它。

from typing import TypedDict, Annotated from langgraph.graph import StateGraph, add_messages

class AgentState(TypedDict):

# messages 字段使用 add_messages reducer,自动追加而非覆盖 messages: Annotated[list, add_messages] # 自定义业务字段 search_results: list[str] retry_count: int final_answer: str 

这里有个关键点:Reducer。默认情况下,节点返回的值会”覆盖”State 对应字段。但对于 messages(消息历史),我们想要追加而不是覆盖,所以用 add_messages 这个内置 Reducer。

你也可以自定义 Reducer:

from typing import Annotated import operator

def merge_results(left: list, right: list) -> list:

"""去重合并搜索结果""" return list(set(left + right)) 

class ResearchState(TypedDict):

# 自定义 reducer,搜索结果去重合并 search_results: Annotated[list, merge_results] # 普通字段,直接覆盖 current_query: str is_done: bool 

节点是执行单元,本质是一个函数:接收 State,返回更新后的 State(部分更新即可)。

from langchain_openai import ChatOpenAI from langchain_core.messages import HumanMessage, SystemMessage

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def research_planner(state: AgentState) -> dict:

"""规划节点:分析用户问题,制定研究计划""" system_prompt = """你是一个研究规划专家。 分析用户的研究问题,制定一个清晰的调研计划。 返回 3-5 个具体的搜索查询词,用于后续信息收集。""" response = llm.invoke([ SystemMessage(content=system_prompt), *state["messages"] ]) return {"messages": [response]} 

def web_searcher(state: AgentState) -> dict:

"""搜索节点:执行网络搜索""" # 从最新消息中提取搜索词 last_message = state["messages"][-1].content # 模拟搜索(实际替换为 SerpAPI、Tavily 等) results = mock_search(last_message) return 

def quality_checker(state: AgentState) -> dict:

"""质量检查节点:判断搜索结果是否足够""" results = state.get("search_results", []) retry_count = state.get("retry_count", 0) if len(results) < 3 and retry_count < 2: # 结果不够,需要重新搜索 return {"retry_count": retry_count + 1} else: return {"is_done": True} 

def report_writer(state: AgentState) -> dict:

"""报告生成节点:基于搜索结果撰写报告""" system_prompt = """基于提供的搜索结果,撰写一份详细的研究报告。 要求:逻辑清晰,数据准确,有独特见解。""" context = " 

".join(state.get("search_results", []))

response = llm.invoke([ SystemMessage(content=f"{system_prompt} 

参考资料: {context}"),

 *state["messages"] ]) return { "messages": [response], "final_answer": response.content } 

边决定执行流转逻辑。LangGraph 支持三种边:

普通边:无条件跳转

graph.add_edge("planner", "searcher") 

条件边:根据 State 动态决定下一个节点

def route_after_check(state: AgentState) -> str:

"""根据质量检查结果决定流转方向""" if state.get("is_done"): return "writer" # 结果够了,写报告 elif state.get("retry_count", 0) < 2: return "searcher" # 重试搜索 else: return "writer" # 超过最大重试次数,强制写报告 

graph.add_conditional_edges(

"quality_check", route_after_check, { "writer": "writer", "searcher": "searcher" } 

)

入口和出口

from langgraph.graph import START, END

graph.add_edge(START, "planner") # 图的入口 graph.add_edge("writer", END) # 图的出口


把上面三个概念组合起来,搭建一个完整的”超级研究员” Agent:

from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import MemorySaver

def build_research_agent():

# 1. 初始化图 workflow = StateGraph(AgentState) # 2. 添加节点 workflow.add_node("planner", research_planner) workflow.add_node("searcher", web_searcher) workflow.add_node("quality_check", quality_checker) workflow.add_node("writer", report_writer) # 3. 定义边(流转逻辑) workflow.add_edge(START, "planner") # 入口 → 规划 workflow.add_edge("planner", "searcher") # 规划 → 搜索 workflow.add_edge("searcher", "quality_check") # 搜索 → 质量检查 # 条件边:根据质量检查结果决定 workflow.add_conditional_edges( "quality_check", route_after_check, {"writer": "writer", "searcher": "searcher"} ) workflow.add_edge("writer", END) # 写报告 → 结束 # 4. 添加 checkpoint(持久化 State,支持 Human-in-the-loop) memory = MemorySaver() # 5. 编译图 agent = workflow.compile(checkpointer=memory) return agent 

运行 Agent

agent = build_research_agent()

配置线程 ID(每个对话独立的上下文)

config = {"configurable": {"thread_id": "research-001"}}

result = agent.invoke(

{"messages": [HumanMessage(content="分析 2026 年 AI Agent 技术的发展趋势")]}, config=config 

)

print(result["final_answer"])

LangGraph 支持一行代码生成图的可视化:

# 生成 Mermaid 图表 from IPython.display import display, Image

display(Image(agent.get_graph().draw_mermaid_png()))

输出类似:

START → planner → searcher → quality_check

 ↑ ↓ └──────────────┘ (retry) ↓ writer → END 


这是 LangGraph 最强大的功能之一——在执行到关键节点时,暂停等待人类输入,然后继续执行。

# 编译时指定需要中断的节点 agent = workflow.compile(

checkpointer=memory, interrupt_before=["writer"] # 在写报告前中断,让人类审核搜索结果 

)

config = {"configurable": {"thread_id": "research-002"}}

第一次运行:执行到 writer 节点前暂停

result = agent.invoke(

{"messages": [HumanMessage(content="分析量子计算的商业化进展")]}, config=config 

)

此时 result 包含已完成的搜索结果

print("搜索结果:") for r in result["search_results"]:

print(f" - {r}") 

可以检查当前状态

current_state = agent.get_state(config) print(f"下一步:{current_state.next}") # (‘writer’,)

# 人类可以修改状态(例如补充额外信息) agent.update_state(

config, {"search_results": result["search_results"] + ["额外补充:量子纠错突破最新进展..."]} 

)

继续执行(从中断点恢复)

final_result = agent.invoke(None, config=config) print(final_result["final_answer"])

一个更实际的例子——邮件发送审批工作流:

from typing import TypedDict, Annotated from langgraph.graph import StateGraph, START, END, add_messages from langchain_core.messages import HumanMessage, AIMessage

class EmailWorkflowState(TypedDict):

messages: Annotated[list, add_messages] draft_email: str approved: bool recipient: str 

def draft_email_node(state: EmailWorkflowState) -> dict:

"""起草邮件""" prompt = f"为 {state['recipient']} 起草一封专业的商务邮件,内容:{state['messages'][-1].content}" response = llm.invoke([HumanMessage(content=prompt)]) return { "messages": [response], "draft_email": response.content } 

def send_email_node(state: EmailWorkflowState) -> dict:

"""发送邮件(只有 approved=True 才会到这里)""" # 实际发送逻辑 print(f"✅ 邮件已发送给 {state['recipient']}") return {"messages": [AIMessage(content="邮件发送成功!")]} 

def reject_email_node(state: EmailWorkflowState) -> dict:

"""拒绝发送""" return {"messages": [AIMessage(content="邮件发送已取消。")]} 

def route_after_approval(state: EmailWorkflowState) -> str:

if state.get("approved"): return "send" return "reject" 

构建工作流

workflow = StateGraph(EmailWorkflowState) workflow.add_node("draft", draft_email_node) workflow.add_node("send", send_email_node) workflow.add_node("reject", reject_email_node)

workflow.add_edge(START, "draft") workflow.add_conditional_edges(

"draft", route_after_approval, {"send": "send", "reject": "reject"} 

) workflow.add_edge("send", END) workflow.add_edge("reject", END)

memory = MemorySaver() email_agent = workflow.compile(

checkpointer=memory, interrupt_after=["draft"] # 起草后中断等待审批 

)

config = {"configurable": {"thread_id": "email-001"}}

触发工作流

email_agent.invoke(

{ "messages": [HumanMessage(content="通知项目团队明天下午3点开会")], "recipient": "", "approved": False }, config=config 

)

查看草稿

state = email_agent.get_state(config) print("📧 邮件草稿:") print(state.values["draft_email"])

人类审批(批准发送)

email_agent.update_state(config, {"approved": True})

继续执行

email_agent.invoke(None, config=config)


LangGraph 支持构建多 Agent 系统,最常见的是 Supervisor 架构:一个 Supervisor Agent 负责分配任务,多个 Worker Agent 负责执行。

from langgraph.graph import StateGraph, START, END from langgraph_supervisor import create_supervisor from langgraph.prebuilt import create_react_agent

创建专门的 Worker Agent

research_agent = create_react_agent(

llm, tools=[web_search_tool, arxiv_search_tool], name="researcher", prompt="你是研究专家,负责搜集和分析信息。" 

)

code_agent = create_react_agent(

llm, tools=[python_repl_tool, code_sandbox_tool], name="coder", prompt="你是代码专家,负责编写和调试代码。" 

)

writing_agent = create_react_agent(

llm, tools=[], name="writer", prompt="你是写作专家,负责将信息整理成清晰的文章。" 

)

创建 Supervisor

supervisor = create_supervisor(

agents=[research_agent, code_agent, writing_agent], model=llm, prompt="""你是一个项目经理。根据任务需求,决定让哪个专家来处理: - 需要搜索信息 → researcher - 需要写代码 → coder - 需要写文档 → writer 当任务完成时,汇总结果并结束。""" 

)

multi_agent = supervisor.compile(checkpointer=MemorySaver())

另一种模式是 Swarm(蜂群):Agent 之间可以互相”移交”控制权,不需要中央 Supervisor。

from langgraph_swarm import create_swarm, create_handoff_tool

定义移交工具

transfer_to_coder = create_handoff_tool(

agent_name="coder", description="当需要写代码或调试程序时,移交给代码专家" 

)

transfer_to_writer = create_handoff_tool(

agent_name="writer", description="当需要写文档或报告时,移交给写作专家" 

)

研究 Agent(可以移交给 coder 或 writer)

research_agent = create_react_agent(

llm, tools=[web_search_tool, transfer_to_coder, transfer_to_writer], name="researcher" 

)

代码 Agent(可以移交回 researcher 或给 writer)

transfer_to_researcher = create_handoff_tool(agent_name="researcher") code_agent = create_react_agent(

llm, tools=[python_repl_tool, transfer_to_researcher, transfer_to_writer], name="coder" 

)

组装 Swarm

swarm = create_swarm(

agents=[research_agent, code_agent, writing_agent], default_active_agent="researcher" # 从 researcher 开始 

)


内存版 MemorySaver 不适合生产,进程重启就丢失了。生产环境需要持久化:

# PostgreSQL 持久化(推荐生产使用) from langgraph.checkpoint.postgres import PostgresSaver

connection_string = "postgresql://user:password@localhost:5432/langgraph_db" checkpointer = PostgresSaver.from_conn_string(connection_string)

初始化数据库表(只需执行一次)

checkpointer.setup()

agent = workflow.compile(checkpointer=checkpointer)

# Redis 持久化(适合高并发场景) from langgraph.checkpoint.redis import RedisSaver

checkpointer = RedisSaver.from_conn_string("redis://localhost:6379") agent = workflow.compile(checkpointer=checkpointer)

LangGraph 原生支持流式输出,适合 Web 应用实时展示进度:

import asyncio

async def stream_research(query: str):

config = {"configurable": {"thread_id": f"stream-{query[:10]}"}} async for chunk in agent.astream( {"messages": [HumanMessage(content=query)]}, config=config, stream_mode="updates" # 每次节点更新都推送 ): node_name = list(chunk.keys())[0] node_output = chunk[node_name] # 实时打印节点执行情况 print(f" 

[{node_name}] 执行完成")

 if "messages" in node_output: last_msg = node_output["messages"][-1] if hasattr(last_msg, "content"): print(f"输出:{last_msg.content[:200]}...") 

运行

asyncio.run(stream_research("分析 Rust 在系统编程中的最新进展"))

也可以流式获取 Token 级别的输出:

async for chunk in agent.astream(

{"messages": [HumanMessage(content=query)]}, config=config, stream_mode="messages" # Token 级流式 

):

if isinstance(chunk, tuple): message, metadata = chunk if hasattr(message, "content") and message.content: print(message.content, end="", flush=True) 

LangChain 官方提供了 LangGraph Platform,一个专门用于部署 LangGraph 应用的托管服务:

# langgraph.json 配置文件 { "dependencies": ["."], "graphs": {

"research_agent": "./src/agent.py:graph", "email_workflow": "./src/email.py:workflow" 

}, "env": ".env" }

部署后会自动提供:

  • REST API(POST /runs, GET /runs/{run_id}/stream
  • WebSocket 支持
  • 水平扩展
  • 内置监控和可观测性

LangGraph 支持节点并发——没有依赖关系的节点可以并行执行:

from langgraph.graph import StateGraph, START, END from typing import TypedDict, Annotated import operator

class ParallelState(TypedDict):

query: str # 三个并发搜索结果,用 operator.add 合并 web_results: Annotated[list, operator.add] arxiv_results: Annotated[list, operator.add] news_results: Annotated[list, operator.add] final_report: str 

def web_search(state: ParallelState) -> dict:

"""并发节点1:网页搜索""" results = search_web(state["query"]) return {"web_results": results} 

def arxiv_search(state: ParallelState) -> dict:

"""并发节点2:学术搜索""" results = search_arxiv(state["query"]) return {"arxiv_results": results} 

def news_search(state: ParallelState) -> dict:

"""并发节点3:新闻搜索""" results = search_news(state["query"]) return {"news_results": results} 

def synthesize(state: ParallelState) -> dict:

"""汇总节点:在所有并发节点完成后执行""" all_results = state["web_results"] + state["arxiv_results"] + state["news_results"] report = generate_report(state["query"], all_results) return {"final_report": report} 

构建并发图

workflow = StateGraph(ParallelState) workflow.add_node("web_search", web_search) workflow.add_node("arxiv_search", arxiv_search) workflow.add_node("news_search", news_search) workflow.add_node("synthesize", synthesize)

从 START 同时触发三个搜索节点(并发)

workflow.add_edge(START, "web_search") workflow.add_edge(START, "arxiv_search") workflow.add_edge(START, "news_search")

三个节点都完成后执行汇总

workflow.add_edge("web_search", "synthesize") workflow.add_edge("arxiv_search", "synthesize") workflow.add_edge("news_search", "synthesize") workflow.add_edge("synthesize", END)

parallel_agent = workflow.compile()

这样三个搜索任务并发执行,总耗时从串行的 3 × T 降低到约 1 × T

from functools import lru_cache import hashlib

def cached_search(state: AgentState) -> dict:

"""带缓存的搜索节点""" query = state["messages"][-1].content cache_key = hashlib.md5(query.encode()).hexdigest() # 检查缓存 cached = redis_client.get(f"search:{cache_key}") if cached: return {"search_results": json.loads(cached)} # 执行搜索 results = actual_search(query) # 写入缓存(1小时过期) redis_client.setex(f"search:{cache_key}", 3600, json.dumps(results)) return {"search_results": results} 

import asyncio from collections import deque from datetime import datetime, timedelta

class RateLimiter:

def __init__(self, max_calls: int, period: float): self.max_calls = max_calls self.period = period self.calls = deque() async def acquire(self): now = datetime.now() # 清理过期的调用记录 while self.calls and self.calls[0] < now - timedelta(seconds=self.period): self.calls.popleft() if len(self.calls) >= self.max_calls: # 等待直到有空位 wait_time = (self.calls[0] + timedelta(seconds=self.period) - now).total_seconds() await asyncio.sleep(wait_time) self.calls.append(now) 

每分钟最多 60 次 LLM 调用

llm_limiter = RateLimiter(max_calls=60, period=60)

async def rate_limited_llm_node(state: AgentState) -> dict:

await llm_limiter.acquire() response = await llm.ainvoke(state["messages"]) return {"messages": [response]} 


import os os.environ["LANGCHAIN_TRACING_V2"] = "true" os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key" os.environ["LANGCHAIN_PROJECT"] = "research-agent-prod"

之后所有的 LangGraph 执行都会自动上报到 LangSmith

可以看到每个节点的输入输出、耗时、Token 消耗等

from langchain_core.callbacks import BaseCallbackHandler from datetime import datetime

class AgentMonitorCallback(BaseCallbackHandler):

def __init__(self, metrics_client): self.metrics = metrics_client def on_chain_start(self, serialized, inputs, kwargs): chain_name = serialized.get("name", "unknown") self.metrics.increment(f"agent.node.start.{chain_name}") self.start_time = datetime.now() def on_chain_end(self, outputs, kwargs): duration = (datetime.now() - self.start_time).total_seconds() self.metrics.histogram("agent.node.duration", duration) def on_chain_error(self, error, kwargs): self.metrics.increment("agent.node.error") logger.error(f"节点执行错误: {error}") 

使用回调

monitor = AgentMonitorCallback(prometheus_client) result = agent.invoke(

{"messages": [HumanMessage(content=query)]}, config={ "configurable": {"thread_id": "prod-001"}, "callbacks": [monitor] } 

)


综合以上技术,实现一个完整的代码审查 Agent:

from typing import TypedDict, Annotated, Literal from langgraph.graph import StateGraph, START, END, add_messages from langgraph.checkpoint.postgres import PostgresSaver from langchain_openai import ChatOpenAI from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

llm = ChatOpenAI(model="gpt-4o", temperature=0)

class CodeReviewState(TypedDict):

messages: Annotated[list, add_messages] code_snippet: str language: str security_issues: list[str] performance_issues: list[str] style_issues: list[str] overall_score: int # 0-100 review_approved: bool revision_count: int 

===== 节点实现 =====

def detect_language(state: CodeReviewState) -> dict:

"""检测编程语言""" response = llm.invoke([ SystemMessage(content="判断代码使用的编程语言,只返回语言名称,如 Python、JavaScript、Go 等"), HumanMessage(content=state["code_snippet"]) ]) return {"language": response.content.strip()} 

def security_audit(state: CodeReviewState) -> dict:

"""安全审查节点""" response = llm.invoke([ SystemMessage(content=f"""你是 {state['language']} 安全专家。 审查代码中的安全问题,包括:SQL注入、XSS、命令注入、敏感信息泄露、权限漏洞等。 以 JSON 列表形式返回问题,如:["存在 SQL 注入风险:第3行", "硬编码密码:第7行"] 如果没有安全问题,返回 []"""), HumanMessage(content=state["code_snippet"]) ]) try: import json issues = json.loads(response.content) except: issues = [response.content] if response.content != "[]" else [] return {"security_issues": issues} 

def performance_audit(state: CodeReviewState) -> dict:

"""性能审查节点(与安全审查并发执行)""" response = llm.invoke([ SystemMessage(content=f"""你是 {state['language']} 性能优化专家。 审查代码中的性能问题,包括:N+1查询、不必要的循环、内存泄漏、阻塞I/O等。 以 JSON 列表形式返回问题。"""), HumanMessage(content=state["code_snippet"]) ]) try: import json issues = json.loads(response.content) except: issues = [response.content] if response.content.strip() else [] return {"performance_issues": issues} 

def style_audit(state: CodeReviewState) -> dict:

"""代码风格审查节点(与前两个并发执行)""" response = llm.invoke([ SystemMessage(content=f"""你是 {state['language']} 代码风格专家。 审查代码风格问题:命名规范、注释质量、函数长度、复杂度等。 以 JSON 列表形式返回问题。"""), HumanMessage(content=state["code_snippet"]) ]) try: import json issues = json.loads(response.content) except: issues = [] return {"style_issues": issues} 

def score_and_summarize(state: CodeReviewState) -> dict:

"""汇总评分节点""" security = state.get("security_issues", []) performance = state.get("performance_issues", []) style = state.get("style_issues", []) # 计算分数(安全问题扣分最多) base_score = 100 base_score -= len(security) * 15 # 每个安全问题扣15分 base_score -= len(performance) * 8 # 每个性能问题扣8分 base_score -= len(style) * 3 # 每个风格问题扣3分 score = max(0, min(100, base_score)) # 生成总结报告 summary_prompt = f"""基于以下审查结果,生成一份简洁的代码审查报告: 

安全问题({len(security)}个):{security} 性能问题({len(performance)}个):{performance} 风格问题({len(style)}个):{style} 综合评分:{score}/100

要求:

  1. 指出最关键的问题(按优先级)
  2. 给出具体的修改建议
  3. 语气专业但友好"""

    response = llm.invoke([HumanMessage(content=summary_prompt)])

    return {

    "messages": [AIMessage(content=response.content)], "overall_score": score 

    }

def check_approval(state: CodeReviewState) -> Literal["approve", "request_revision"]:

"""决策节点:是否通过审查""" score = state.get("overall_score", 0) security_issues = state.get("security_issues", []) # 安全问题零容忍,或分数低于 60 都需要修改 if security_issues or score < 60: return "request_revision" return "approve" 

def request_revision(state: CodeReviewState) -> dict:

"""请求修改节点""" revision_count = state.get("revision_count", 0) + 1 msg = f"代码审查未通过(第{revision_count}次)。评分:{state['overall_score']}/100。请根据上述问题修改后重新提交。" return { "messages": [AIMessage(content=msg)], "review_approved": False, "revision_count": revision_count } 

def approve_code(state: CodeReviewState) -> dict:

"""批准节点""" msg = f"✅ 代码审查通过!评分:{state['overall_score']}/100。代码质量良好,可以合并。" return { "messages": [AIMessage(content=msg)], "review_approved": True } 

===== 构建图 =====

def build_code_review_agent():

workflow = StateGraph(CodeReviewState) # 添加节点 workflow.add_node("detect_language", detect_language) workflow.add_node("security_audit", security_audit) workflow.add_node("performance_audit", performance_audit) workflow.add_node("style_audit", style_audit) workflow.add_node("score_and_summarize", score_and_summarize) workflow.add_node("request_revision", request_revision) workflow.add_node("approve_code", approve_code) # 定义边 workflow.add_edge(START, "detect_language") # 语言检测后,并发执行三个审查 workflow.add_edge("detect_language", "security_audit") workflow.add_edge("detect_language", "performance_audit") workflow.add_edge("detect_language", "style_audit") # 三个审查完成后汇总 workflow.add_edge("security_audit", "score_and_summarize") workflow.add_edge("performance_audit", "score_and_summarize") workflow.add_edge("style_audit", "score_and_summarize") # 条件判断 workflow.add_conditional_edges( "score_and_summarize", check_approval, { "approve": "approve_code", "request_revision": "request_revision" } ) workflow.add_edge("approve_code", END) workflow.add_edge("request_revision", END) # 生产级 PostgreSQL checkpoint checkpointer = PostgresSaver.from_conn_string( "postgresql://localhost/code_review_db" ) checkpointer.setup() return workflow.compile(checkpointer=checkpointer) 

===== 使用示例 =====

review_agent = build_code_review_agent()

test_code = """ import sqlite3

def get_user(username):

conn = sqlite3.connect('users.db') cursor = conn.cursor() # 危险:直接拼接SQL,存在注入风险 query = f"SELECT * FROM users WHERE username = '{username}'" cursor.execute(query) user = cursor.fetchone() conn.close() return user 

def process_users():

users = get_all_users() result = [] for user in users: # N+1 问题:每个用户单独查询订单 orders = get_user_orders(user['id']) result.append({user, 'orders': orders}) return result 

"""

config = {"configurable": {"thread_id": "review-001"}}

result = review_agent.invoke(

{ "code_snippet": test_code, "messages": [HumanMessage(content="请审查这段代码")], "security_issues": [], "performance_issues": [], "style_issues": [], "revision_count": 0 }, config=config 

)

print(result["messages"][-1].content) print(f" 综合评分:{result[‘overall_score’]}/100") print(f"审查结果:")


维度LangGraphCrewAIAutoGen直接手写 学习曲线中等低中等低(但维护成本高) 灵活性⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐ Human-in-the-loop原生支持有限支持支持需手写 持久化多种后端有限有限需手写 可视化内置无无无 生产成熟度高中中取决于实现 并发支持原生无有限需手写 社区活跃非常活跃活跃活跃N/A

结论:如果你要在生产环境部署复杂的 Agent 系统,LangGraph 是目前最成熟的选择。如果只是快速原型,CrewAI 更简单。AutoGen 适合需要 Agent 间”自由对话”的场景。


# ❌ 错误:每次更新都会覆盖 messages,只保留最新一条 class BadState(TypedDict):

messages: list # 没有 reducer 

✅ 正确:使用 add_messages reducer,自动追加

class GoodState(TypedDict):

messages: Annotated[list, add_messages] 

# ❌ 危险:没有终止条件的条件边可能死循环 def route(state):

if not state["is_done"]: return "retry_node" # 如果 retry_node 不更新 is_done,永远循环! 

✅ 安全:添加最大重试次数限制

def route(state):

if not state["is_done"] and state["retry_count"] < 3: return "retry_node" return "end_node" 

# ❌ 同一个 thread_id 重复使用会叠加历史状态 config = {"configurable": {"thread_id": "fixed-id"}} # 每次都用同一个

✅ 每个独立任务用唯一的 thread_id

import uuid config = {"configurable": {"thread_id": str(uuid.uuid4())}}

# ❌ 全局状态导致并发问题 counter = 0 def bad_node(state):

global counter counter += 1 # 并发时会竞争 return {"count": counter} 

✅ 状态都放 State 里,节点是纯函数

def good_node(state):

return 

  1. State 设计优先:在写任何节点之前,先设计好 State 结构
  2. 节点保持纯函数:不要在节点里维护全局状态
  3. 条件边要有兜底:所有条件分支都要有默认情况
  4. 生产用 PostgreSQL checkpoint:内存 Saver 不适合生产
  5. 流式输出改善用户体验:长任务一定要用 astream
  6. 为重要节点加日志:方便排查 Agent 行为

LangGraph 的出现代表了 AI Agent 工程化的一次重要进步:

它解决的核心问题

  • 复杂 Agent 流程的可维护性(图 vs 意大利面代码)
  • Human-in-the-Loop 的生产级支持
  • 多 Agent 协作的标准化接口
  • 状态持久化和可恢复性

它的局限

  • 对于简单的单轮问答,它是过度设计
  • 学习曲线比 CrewAI 更陡
  • 图结构对动态变化的流程支持有限(节点和边是静态的)

2026 年的展望:随着 AI Agent 从实验走向大规模生产,LangGraph 这类”Agent 操作系统”会变得越来越重要。下一步值得关注的方向:

  • LangGraph + 向量数据库:长期记忆的标准解决方案
  • 跨语言 Agent 互操作:不同框架的 Agent 能互相调用
  • Agent 评估框架:自动化测试 Agent 行为的基准体系

工具只是手段,搞清楚你的业务流程,才是用好 LangGraph 的关键。


本文代码示例基于 LangGraph 0.2.x,依赖:pip install langgraph langchain langchain-openai

小讯
上一篇 2026-04-16 15:42
下一篇 2026-04-16 15:40

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/262348.html