当前位置: 首页 > news >正文

LangGraph实战:构建可控、可调试的复杂AI工作流

LangChain 适合快速原型,但生产级 AI 工作流需要更精细的状态管理和流程控制。LangGraph 的图结构为复杂 Agent 提供了清晰的架构范式。本文通过实战案例,掌握 LangGraph 的核心工程模式。

为什么需要 LangGraph传统 LangChain Chain 的执行是线性的:A → B → C。这对简单场景够用,但真实的 AI 应用往往需要:-条件分支:根据判断结果走不同路径-循环重试:失败时重试,成功时跳出-并行执行:多个子任务同时进行-人工介入:关键决策时暂停等待人工确认这些需求用线性 Chain 实现会让代码极其复杂且难以维护。LangGraph 用有向图替代线性链,让复杂工作流的实现变得清晰。## 核心概念LangGraph 的三个核心概念:State(状态):图中流转的数据结构。所有节点共享同一个状态,每个节点读取并更新它。pythonfrom typing import TypedDict, Annotated, Listfrom langgraph.graph import add_messagesclass AgentState(TypedDict): messages: Annotated[List, add_messages] # 对话历史(自动追加) task: str # 当前任务描述 artifacts: List[dict] # 生成的产出物 error: str # 错误信息(如果有) retry_count: int # 重试次数Node(节点):处理状态的函数。每个节点接收状态,返回更新后的状态片段。Edge(边):节点间的连接。可以是固定连接(无条件流转)或条件边(根据状态决定下一个节点)。## 基础:Research Agent 实战下面实现一个研究助手 Agent,能搜索、分析、生成报告:pythonfrom langgraph.graph import StateGraph, ENDfrom langchain_openai import ChatOpenAIfrom langchain_community.tools.tavily_search import TavilySearchResultsfrom langchain_core.messages import HumanMessage, AIMessage, ToolMessageimport json# 初始化工具llm = ChatOpenAI(model="gpt-4o", temperature=0)search_tool = TavilySearchResults(max_results=5)tools = [search_tool]llm_with_tools = llm.bind_tools(tools)# ──── 节点定义 ────def research_agent(state: AgentState) -> dict: """主 Agent 节点:分析任务,决定下一步行动""" system_prompt = """你是一个研究助手。分析用户的研究任务, 使用搜索工具收集信息,然后综合生成高质量报告。 当你认为收集的信息足够生成报告时,直接生成报告内容。""" messages = [{"role": "system", "content": system_prompt}] + state["messages"] response = llm_with_tools.invoke(messages) return {"messages": [response]}def tool_executor(state: AgentState) -> dict: """工具执行节点:执行 Agent 调用的工具""" last_message = state["messages"][-1] tool_results = [] for tool_call in last_message.tool_calls: if tool_call["name"] == "tavily_search_results_json": result = search_tool.invoke(tool_call["args"]) tool_results.append( ToolMessage( content=json.dumps(result, ensure_ascii=False), tool_call_id=tool_call["id"] ) ) return {"messages": tool_results}def generate_report(state: AgentState) -> dict: """报告生成节点:将研究结果整理为结构化报告""" report_prompt = f"""基于以下研究任务和收集的信息,生成一份结构化报告:任务:{state["task"]}请生成包含以下部分的报告:1. 执行摘要(200字)2. 主要发现(3-5点)3. 详细分析4. 结论与建议""" messages = state["messages"] + [HumanMessage(content=report_prompt)] response = llm.invoke(messages) artifact = { "type": "report", "content": response.content, "task": state["task"] } return { "messages": [response], "artifacts": state.get("artifacts", []) + [artifact] }# ──── 条件边函数 ────def should_use_tools(state: AgentState) -> str: """判断是否需要调用工具""" last_message = state["messages"][-1] # 如果最后一条消息有工具调用,进入工具执行节点 if hasattr(last_message, "tool_calls") and last_message.tool_calls: return "use_tools" # 否则,进入报告生成节点 return "generate_report"# ──── 构建图 ────workflow = StateGraph(AgentState)# 添加节点workflow.add_node("research_agent", research_agent)workflow.add_node("tool_executor", tool_executor)workflow.add_node("generate_report", generate_report)# 设置入口workflow.set_entry_point("research_agent")# 添加条件边:research_agent 根据是否有工具调用决定走向workflow.add_conditional_edges( "research_agent", should_use_tools, { "use_tools": "tool_executor", "generate_report": "generate_report" })# 工具执行后回到 research_agentworkflow.add_edge("tool_executor", "research_agent")# 报告生成后结束workflow.add_edge("generate_report", END)# 编译图app = workflow.compile()# 运行示例initial_state = { "messages": [HumanMessage(content="请研究2026年LLM推理优化的最新进展")], "task": "2026年LLM推理优化最新进展", "artifacts": [], "error": "", "retry_count": 0}result = app.invoke(initial_state)print(result["artifacts"][-1]["content"])## 进阶:带人工介入的审批流程生产级 Agent 往往需要"暂停并等待人工确认"的能力:pythonfrom langgraph.checkpoint.memory import MemorySaverfrom langgraph.graph import interrupt_before# 使用 Checkpointer 支持暂停恢复memory = MemorySaver()def draft_action(state: AgentState) -> dict: """生成拟执行的操作(高风险操作,需审批)""" response = llm.invoke(state["messages"]) return { "messages": [response], "pending_action": response.content }def execute_action(state: AgentState) -> dict: """执行已审批的操作""" action = state.get("pending_action", "") # 实际执行逻辑... return {"messages": [AIMessage(content=f"已执行:{action}")]}# 构建带审批的工作流approval_workflow = StateGraph(AgentState)approval_workflow.add_node("draft", draft_action)approval_workflow.add_node("execute", execute_action)approval_workflow.set_entry_point("draft")approval_workflow.add_edge("draft", "execute") # draft → execute(中间会暂停)approval_workflow.add_edge("execute", END)# interrupt_before 指定在哪个节点前暂停app_with_approval = approval_workflow.compile( checkpointer=memory, interrupt_before=["execute"] # 在 execute 前暂停等待人工确认)# 运行到暂停点thread_id = "task-001"config = {"configurable": {"thread_id": thread_id}}result = app_with_approval.invoke(initial_state, config)# 此时图已暂停在 execute 节点之前# 人工审查后,恢复执行print("请审查以下操作:", result["pending_action"])user_approve = input("是否批准?(y/n): ")if user_approve == "y": # 从断点继续执行 final_result = app_with_approval.invoke(None, config)## 并行执行:多任务同时处理pythonfrom langgraph.graph import Senddef parallel_research_router(state: AgentState): """将研究任务拆分为多个并行子任务""" topics = state.get("topics", []) # 使用 Send 将任务分发给并行执行的节点 return [ Send("research_subtask", {"topic": topic, "messages": []}) for topic in topics ]def research_subtask(state: dict) -> dict: """单个子任务的研究执行""" topic = state["topic"] result = search_tool.invoke({"query": topic}) return {"subtask_result": {"topic": topic, "data": result}}def aggregate_results(state: AgentState) -> dict: """聚合所有子任务结果""" # subtask_results 会包含所有并行子任务的输出 all_results = state.get("subtask_results", []) aggregated = "\n\n".join([ f"## {r['topic']}\n{r['data']}" for r in all_results ]) return {"aggregated_research": aggregated}## 错误处理与重试pythondef robust_agent_node(state: AgentState) -> dict: """带错误处理的健壮 Agent 节点""" max_retries = 3 retry_count = state.get("retry_count", 0) if retry_count >= max_retries: return { "error": f"超过最大重试次数({max_retries})", "status": "failed" } try: response = llm_with_tools.invoke(state["messages"]) return { "messages": [response], "error": "", "retry_count": 0 # 成功后重置重试计数 } except Exception as e: return { "error": str(e), "retry_count": retry_count + 1 }def should_retry_or_fail(state: AgentState) -> str: if state.get("error") and state.get("retry_count", 0) < 3: return "retry" elif state.get("error"): return "fail" else: return "continue"## 可观测性:调试复杂图LangGraph 提供内置的追踪能力:python# 启用 LangSmith 追踪import osos.environ["LANGCHAIN_TRACING_V2"] = "true"os.environ["LANGCHAIN_API_KEY"] = "your-api-key"os.environ["LANGCHAIN_PROJECT"] = "my-agent-project"# 流式输出每个节点的状态变化for event in app.stream(initial_state): for node_name, node_output in event.items(): print(f"\n{'='*40}") print(f"节点: {node_name}") if "messages" in node_output: last_msg = node_output["messages"][-1] print(f"输出: {last_msg.content[:200]}...")# 获取图的可视化from IPython.display import ImageImage(app.get_graph().draw_mermaid_png())## 生产部署:LangGraph ServerLangGraph 提供 Server 模式用于生产部署:bash# 安装 LangGraph CLIpip install langgraph-cli# 创建配置文件 langgraph.jsoncat > langgraph.json << 'EOF'{ "dependencies": ["."], "graphs": { "research_agent": "./my_agent.py:app" }}EOF# 启动服务langgraph up服务器提供 REST API 和 WebSocket 接口,支持:- 创建和管理 Thread(对话线程)- 流式获取执行状态- 暂停和恢复执行- 完整的执行历史查询## 最佳实践总结1.状态设计先行:花时间设计好 State 结构,它是整个图的基础2.节点保持单一职责:每个节点只做一件事,便于测试和复用3.条件边逻辑外置:路由逻辑放在独立的判断函数中,不要嵌入节点4.使用 Checkpointer:生产环境必须启用,支持暂停恢复和错误恢复5.流式输出:用app.stream()代替app.invoke(),提升用户体验6.错误不应静默失败:在状态中维护 error 字段,在条件边中处理错误路径LangGraph 的学习曲线比 LangChain 陡,但一旦掌握,你会发现复杂 Agent 系统的开发效率和可维护性都有质的飞跃。

http://www.cnnetsun.cn/news/2516259.html

相关文章:

  • 免费卸载软件再推荐!支持多款软件同时卸载、注册表清理、垃圾文件清理、空文件查找、进程管理、启动管理等等功能!强制卸载+系统清理,绝了
  • 一次性掌握Mapbox地图开发框架
  • web服务器的实验(RHCE)
  • JSON差异对比终极指南:3分钟掌握开源神器操作技巧
  • 条码唯一性比对系统的技术实现与工业落地
  • 国产 AI 漫剧制作工具有哪些?5 款高性价比工具实测,新手也能快速出片
  • 搭建CMake+Ninja+GCC开发GD32
  • Yolov8-pose关键点检测:CVPR2026 UCMNet |FrequencyCM赋能YOLO C2f:从频域增强视角解决感受野与细节瓶颈
  • 视频号视频下载去水印方法全是坑?全网视频一键拿捏!2026封神玩法!
  • 重磅首发|医学文献王Mac版+Office引用加载项同步上线,今晚直播解锁科研高效密码
  • Sora 2动态纹理流送与Unreal Niagara系统深度联调,GPU显存占用降低63%——一线影视工作室内部技术备忘录
  • DeepSeek V2 vs. DeepSeek-R1:参数冻结策略、LoRA适配层、量化精度损失的3维硬核对比
  • 【2024最新】ChatGPT SEO文章写作SOP:含关键词布局模板、EEAT强化话术、结构化Schema注入三步法
  • 【机密级部署白皮书首发】:DeepSeek-V2.5私有化集群在信创环境(鲲鹏920+统信UOS+达梦V8)的12小时极速上线实录
  • 产品经理核心能力,根本不是画原型
  • 终极指南:如何实现《塞尔达传说:旷野之息》Switch与WiiU存档无缝互通
  • Ender-3固件配置:从困惑到精通的完整指南
  • 大数据之安装HBase2.2.6
  • 终极指南:快速完整破解Cursor Pro限制的免费工具
  • Mac Mouse Fix完整指南:让你的普通鼠标变身专业级Mac输入设备
  • AI智能切片不是‘一键分割’就完事:批量口播视频的工程化切片陷阱与工具选型
  • 2026年AI工具格局不会由算法决定,而由这4个非技术变量主导:合规审计周期、客户LTV/CAC比值、模型即服务SLA违约金条款、边缘推理延迟容忍阈值
  • OFC求解工具横评—5款Pineapple EV实测
  • 互联网招聘独角兽拉勾网破产,AI浪潮下在线招聘赛道竞争正当时
  • 企业级Agent架构实战:竞争情报来源分散,无法系统化整理分析怎么办?
  • NVIDIA Profile Inspector完全指南:解锁显卡700+隐藏设置,游戏性能提升30%
  • 观察Taotoken在高峰时段的请求成功率与路由稳定性
  • Esp32Robot入门05-大模型接口对接与配置(实战进阶:对接Qwen3.6-35B本地大模型与API配置实战)
  • 360浏览器隐私怎么清理?【图文讲解】360浏览器缓存清理?360浏览器上网痕迹清除?浏览器删除Cookie密码?共用电脑隐私清理?
  • 独立开发者如何管理多个项目的API Key与访问权限