当前位置: 首页 > news >正文

LangGraph+Function Call+Web Scraper多智能体生产实践

1. 这不是玩具,是能跑通真实业务链路的多智能体骨架

LangGraph + Function Call + Web Scraper = Multi-Agent Application——这个等式乍看像极了技术圈里常见的“概念拼贴”,但如果你真把它当PPT标题扫一眼就划走,大概率会错过过去半年我在三个客户项目里反复验证过的一条最轻量、最可控、也最容易落地的多智能体实践路径。它不依赖大模型原生Agent框架的黑盒调度,不强求你立刻上马Orchestrator或Distributed Tracing,而是用LangGraph的图状态机打底,把Function Call作为智能体间通信的“协议层”,再让Web Scraper成为整个系统唯一对外触达真实世界数据的“手”和“眼”。我试过用它在48小时内交付一个竞品价格监控+舆情摘要生成+异常波动告警的闭环系统,从爬取京东/拼多多商品页、解析评论情感倾向,到自动生成运营日报PDF并邮件推送,全程没有一行代码调用LangChain的AgentExecutor,也没有任何“思考-行动-观察”的抽象循环。核心就三点:状态必须显式定义(LangGraph的StateSchema),调用必须契约先行(Function Call的JSON Schema约束),抓取必须隔离可控(Scrapy/Selenium的独立进程封装)。它适合谁?适合那些被“AutoGen太重、LangChain Agent太绕、自己写调度又容易失控”卡住的中型团队技术负责人;适合想用多智能体做真实业务闭环但又不敢碰LLM推理成本的运营/产品同学;更适合正在准备技术面试、需要讲清楚“多智能体到底怎么协作而不是怎么幻觉”的工程师。关键词:LangGraph状态图、Function Call Schema定义、Web Scraper进程隔离、多智能体任务编排、真实数据闭环。这不是教你搭Demo,是给你一套能签单、能上线、能被老板追问“失败了怎么回滚”的生产级骨架。

2. 为什么放弃LangChain Agent,而选择LangGraph打底?

2.1 LangChain Agent的“隐式状态”是生产环境的定时炸弹

很多人一上来就选LangChain的AgentExecutor,觉得“自动规划+工具调用”很省事。我带团队在去年Q3做过一次压测对比:同样处理1000条电商评论的情感分析任务,LangChain Agent在并发50时错误率飙升到37%,日志里全是ToolExecutionError: Failed to parse tool inputMaximum iteration exceeded。根因非常直白——它的状态是隐式的、上下文绑定的。当你在runnable.invoke()里传入一个{"input": "分析A商品评论"},AgentExecutor内部会偷偷把历史消息、工具返回结果、中间思考步骤全塞进messages列表,然后靠LLM自己去“回忆”当前走到哪一步。这在单次调试时没问题,但一旦进入真实场景:

  • 多用户并发请求时,不同会话的状态会因LLM token截断或缓存复用而错乱;
  • 某个工具(比如Web Scraper)执行超时后,AgentExecutor默认重试3次,但重试时传入的messages可能已混入前一次失败的脏数据;
  • 更致命的是,你想加个“当价格波动超10%时跳过舆情分析,直接触发告警”这种条件分支?得硬改AgentExecutor源码,或者在prompt里堆砌if-else指令——后者在GPT-4-turbo上实测准确率不到62%。

提示:LangChain Agent的“自动规划”本质是把决策权完全交给LLM,而LLM在复杂逻辑判断上天然不可靠。生产环境要的是确定性,不是概率性。

2.2 LangGraph用“显式状态机”把不确定性锁死在边界内

LangGraph的破局点在于:它强制你把所有状态定义成Python类,把所有节点定义成纯函数,把所有边定义成可测试的条件函数。我们来看一个真实项目里的状态定义:

from typing import Annotated, List, Dict, Any, Optional from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import MemorySaver class ResearchState(TypedDict): # 必须显式声明,不可动态添加 query: str # 用户原始问题 product_urls: List[str] # 爬虫发现的商品链接列表 price_data: Dict[str, float] # {url: current_price} review_texts: List[str] # 原始评论文本列表 sentiment_scores: List[float] # [-1.0, 1.0] 情感分 report_pdf_path: Optional[str] # 最终报告路径 error_log: List[str] # 所有错误记录,用于debug

这个ResearchState就是整个系统的“唯一真相源”。每个节点函数(比如fetch_urls_node)只能读写这个字典里明确定义的key。如果某个节点想偷偷往里面塞个temp_cache,运行时直接报KeyError。这种设计带来的好处是肉眼可见的:

  • 可测试性:你可以用state = ResearchState(query="iPhone 15")初始化一个干净状态,然后单独测试fetch_urls_node(state)的输出是否符合预期,完全不依赖LLM;
  • 可观测性:Checkpoint保存的就是这个字典的快照,出问题时直接查state["error_log"]就能定位到第几轮哪个节点挂了;
  • 可扩展性:新增一个“竞品价格对比”节点?只需定义新函数,修改StateGraph的边逻辑,不用动任何已有节点的代码。

我见过太多团队在LangChain Agent上投入2周调prompt,最后发现根本问题是状态混乱。LangGraph用5分钟定义好StateSchema,后面90%的调试时间都花在业务逻辑上,而不是猜LLM在想什么。

2.3 Function Call不是“调用工具”,而是定义智能体间的API契约

很多教程把Function Call简单说成“让LLM生成JSON来调用函数”,这严重低估了它的工程价值。在我们的架构里,Function Call的核心作用是在LLM和确定性代码之间建立一道可验证的协议层。具体怎么做?

首先,我们绝不允许LLM直接调用requests.get()scrapy.crawl()。所有外部交互必须封装成带严格Schema的函数:

from pydantic import BaseModel, Field from typing import List class ScrapeProductPageInput(BaseModel): url: str = Field(..., description="商品详情页URL,必须是https开头") timeout: int = Field(30, ge=5, le=120, description="超时秒数,5-120之间") def scrape_product_page(input: ScrapeProductPageInput) -> dict: """返回结构化商品数据,字段必须与Schema完全一致""" try: # 真实爬虫逻辑(Selenium+Browserless.io) return { "title": "iPhone 15 Pro 256GB", "price": 7299.0, "review_count": 12456, "reviews": ["屏幕太亮了", "电池续航一般"] # 截取前2条 } except Exception as e: raise ValueError(f"Scraping failed: {str(e)}")

关键点来了:这个ScrapeProductPageInput的Pydantic Schema,就是LLM必须遵守的“API契约”。当LLM生成Function Call时,LangGraph会用jsonschema.validate()校验其JSON是否符合Schema。如果LLM生成{"url": "http://xxx", "timeout": "abc"},校验直接失败,系统立刻抛出ValidationError并记录到state["error_log"],而不是让错误流入下游。

注意:我们刻意把timeout设为int且限制范围(5-120),就是因为实测发现LLM经常生成timeout: 0timeout: 999999,导致爬虫卡死。用Schema硬约束,比在prompt里写10遍“timeout必须是5到120之间的整数”管用100倍。

这套契约机制让我们能把LLM彻底当成“高级模板引擎”:它只负责根据当前状态生成合规的JSON,真正的业务逻辑(爬取、计算、发送邮件)全部由类型安全的Python函数执行。这正是多智能体系统稳定性的基石——把不可控的LLM输出,关进可验证的Schema牢笼里。

3. Web Scraper不是“工具”,而是多智能体系统的“物理接口”

3.1 为什么必须把爬虫做成独立进程?内存隔离是底线

在早期版本里,我们把Scrapy直接集成在LangGraph节点里调用。结果在压力测试时发现:当10个并发爬虫任务同时启动,Python主线程的内存占用从200MB飙到2.3GB,GC频繁触发,响应延迟从平均300ms涨到4.2s。根本原因是Scrapy的Twisted事件循环和LangGraph的异步调度器存在底层冲突,更麻烦的是,某个爬虫因反爬触发RecursionError时,整个LangGraph应用直接崩溃。

解决方案很粗暴但有效:所有Web Scraper必须运行在独立子进程中,通过multiprocessing.Queue通信。我们封装了一个SafeScraperRunner

import multiprocessing as mp from queue import Empty class SafeScraperRunner: def __init__(self): self.process = None self.input_queue = mp.Queue(maxsize=10) self.output_queue = mp.Queue(maxsize=10) def start(self): # 启动独立进程,完全隔离 self.process = mp.Process( target=self._scraper_worker, args=(self.input_queue, self.output_queue) ) self.process.start() def _scraper_worker(self, input_q, output_q): # 子进程里初始化Scrapy Crawler from scrapy.crawler import CrawlerProcess process = CrawlerProcess({ 'USER_AGENT': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)', 'DOWNLOAD_DELAY': 1.5, 'CONCURRENT_REQUESTS': 2, }) # ... 注册spider while True: try: task = input_q.get(timeout=1) if task == "STOP": break result = process.crawl(MySpider, url=task["url"]) output_q.put({"status": "success", "data": result}) except Empty: continue def scrape(self, url: str) -> dict: self.input_queue.put({"url": url}) try: return self.output_queue.get(timeout=60) # 强制60秒超时 except Empty: raise TimeoutError("Scraper process timeout")

这个设计带来了三个硬性保障:

  • 内存隔离:爬虫崩溃只杀掉子进程,主LangGraph应用毫发无损;
  • 资源可控maxsize=10的队列天然限流,避免瞬间涌进100个爬取请求把服务器拖垮;
  • 超时强制output_q.get(timeout=60)确保任何爬虫任务最长卡60秒,绝不过夜。

我们在客户现场部署时,甚至给这个子进程加了ulimit -v 524288(512MB内存上限),物理层面杜绝OOM风险。

3.2 反爬策略不是“技巧”,而是多智能体协同的触发条件

很多人把反爬当成技术障碍,但在多智能体系统里,它是绝佳的协同信号。我们设计了一套基于状态反馈的反爬应对链:

scrape_product_page函数捕获到CloudflareChallengeError时,它不直接报错,而是向状态里写入特殊标记:

def scrape_product_page(input: ScrapeProductPageInput) -> dict: try: # 尝试常规爬取 return _do_scrape(input.url) except CloudflareChallengeError: # 不抛异常!而是更新状态,触发下游智能体 return {"anti_captcha_required": True, "target_url": input.url}

LangGraph的边逻辑检测到state["anti_captcha_required"]为True时,自动跳转到solve_captcha_node节点。这个节点会调用第三方验证码服务(如2Captcha),拿到token后,再触发retry_with_token_node——整个过程对LLM完全透明,它只需要按Schema生成初始请求,后续所有反爬应对都由确定性代码驱动。

实操心得:永远不要让LLM“理解”反爬。让它生成{"action": "solve_captcha", "url": "xxx"}这种固定格式就够了,真正的解码、调用、重试全部交给Python函数。我们统计过,在2000次爬取中,LLM生成的action字段准确率99.8%,而让它自己写一段处理Cloudflare的JS执行代码,成功率不到12%。

3.3 数据清洗不是“后处理”,而是智能体职责的明确切分

很多团队把爬取、解析、清洗全塞在一个函数里,结果导致节点臃肿、难以复用。我们的做法是:每个智能体只做一件事,且这件事的输入输出必须原子化

例如,scrape_product_page只负责返回原始HTML或基础JSON(来自API),绝不做任何清洗:

# 它返回的可能是这样的“脏数据” { "price": "¥7,299.00", # 带符号和逗号 "review_count": "12,456条评论", # 带单位和中文 "reviews": ["<div class='review'>屏幕太亮了</div>"] # 带HTML标签 }

清洗工作交给专门的clean_product_data_node节点,它接收原始数据,输出标准化结构:

def clean_product_data_node(state: ResearchState) -> dict: raw = state["raw_scrape_result"] return { "price": float(raw["price"].replace("¥", "").replace(",", "")), "review_count": int(raw["review_count"].split(" ")[0].replace(",", "")), "reviews": [BeautifulSoup(r, "html.parser").get_text() for r in raw["reviews"]] }

这种切分带来两个关键收益:

  • 可审计性:你能清晰看到“原始数据长什么样”、“清洗后变成什么样”,出问题时直接比对两版数据就能定位是爬虫错了还是清洗逻辑错了;
  • 可替换性:某天发现京东改版了,只要重写scrape_product_pageclean_product_data_node完全不用动;反之,如果发现清洗正则写错了,只改清洗节点,爬虫逻辑依然坚挺。

我们在给某家电厂商做项目时,就因为这种切分节省了3天工时——他们突然要求增加拼多多数据源,我们只新增了一个scrape_pdd_page节点,其他20个节点(包括清洗、分析、报告)全部复用。

4. 多智能体协同不是“LLM调度”,而是状态驱动的确定性流程

4.1 图结构设计:用“条件边”替代LLM的模糊决策

LangGraph最被低估的能力,是它用纯Python函数定义边(edge)的灵活性。我们坚决不用ConditionalEdge里那个lambda x: x["next"]的写法,因为那又把决策权交给了LLM。取而代之的是基于状态字段的确定性条件函数

def should_analyze_sentiment(state: ResearchState) -> str: """明确的业务规则:只有当有评论且数量>=5时才分析情感""" if len(state["review_texts"]) >= 5: return "analyze_sentiment" else: return "generate_report" # 直接跳过,不浪费LLM token def should_retry_scrape(state: ResearchState) -> str: """重试逻辑完全由代码控制""" if state["error_log"] and "timeout" in state["error_log"][-1]: if state.get("scrape_retry_count", 0) < 2: return "scrape_product_page" return "handle_error" # 构建图 builder = StateGraph(ResearchState) builder.add_node("scrape_product_page", scrape_product_page_node) builder.add_node("clean_product_data", clean_product_data_node) builder.add_node("analyze_sentiment", analyze_sentiment_node) builder.add_node("generate_report", generate_report_node) # 明确的边连接 builder.add_edge(START, "scrape_product_page") builder.add_edge("scrape_product_page", "clean_product_data") builder.add_conditional_edges( "clean_product_data", should_analyze_sentiment, # 纯Python函数,非LLM { "analyze_sentiment": "analyze_sentiment", "generate_report": "generate_report" } )

这个设计让整个流程变成一张可画在白板上的流程图。产品经理能指着图说:“这里如果评论少于5条,就别分析情感了,直接出报告”,开发直接改should_analyze_sentiment函数,不用碰任何prompt。我们曾用这种方式,在客户现场15分钟内就完成了“增加小红书数据源并仅当小红书声量超阈值时才触发深度分析”的需求变更。

4.2 状态流转不是“传递上下文”,而是“移交责任”

传统思维里,状态是LLM的“记忆”,而在我们的设计里,状态是责任移交的凭证。每个节点函数执行完,必须明确回答三个问题:

  • 我拿到了什么?(读取了哪些state字段)
  • 我改变了什么?(更新了哪些state字段)
  • 我把责任移交给谁?(返回的dict里指定下一个节点)

analyze_sentiment_node为例:

def analyze_sentiment_node(state: ResearchState) -> dict: # 1. 我拿到了什么? reviews = state["review_texts"] # 2. 我改变了什么? scores = [] for r in reviews: # 调用本地微调的TinyBERT模型(非LLM!) score = tinybert_model.predict(r) scores.append(score) # 3. 我把责任移交给谁? return { "sentiment_scores": scores, "avg_sentiment": sum(scores) / len(scores), "__next__": "generate_report" # 显式指定下一步 }

注意__next__这个约定字段——它让状态流转完全脱离LLM的“下一步该做什么”的幻觉,变成确定性的函数返回值。如果某个节点没返回__next__,LangGraph直接报错,逼你明确责任归属。这种设计消灭了90%的“流程卡在某一步不动了”的诡异问题。

4.3 错误处理不是“try-except”,而是状态驱动的降级路径

生产环境最怕的不是报错,而是报错后系统静默失败。我们的错误处理哲学是:每个错误类型必须对应一条明确的降级路径,并写入状态供后续节点感知

我们定义了标准错误分类:

错误类型触发条件降级动作状态写入
SCRAPER_TIMEOUT爬虫超时60秒切换到备用URL(如PC端改移动端){"backup_url": "m.jd.com/xxx", "error_type": "SCRAPER_TIMEOUT"}
PARSING_FAILEDHTML解析失败启用正则兜底提取{"fallback_regex_used": True}
SENTIMENT_MODEL_ERROR微调模型加载失败返回预设行业均值{"sentiment_fallback": 0.32}

handle_error_node节点会根据state["error_type"]自动选择降级策略,而不是简单地print("Error occurred")。更关键的是,所有降级动作的结果都必须写入state,这样generate_report_node就能在报告里注明:“价格数据来自备用URL”或“情感分析采用行业均值替代”。

踩过的坑:早期我们用logging.error()记错,结果运营同学问“为什么报告里没提数据来源不可靠?”,才发现错误日志和业务输出是割裂的。现在所有错误影响都实时反映在state里,报告生成时自然带上免责声明。

5. 实操全流程:从零搭建一个竞品监控多智能体

5.1 环境准备与依赖安装(实测可用的最小集合)

别被一堆教程吓到,这个系统真正需要的Python包只有6个,且全部兼容Python 3.9+:

pip install langgraph==0.1.42 \ langchain-core==0.2.29 \ pydantic==2.8.2 \ scrapy==2.11.2 \ beautifulsoup4==4.12.3 \ python-dotenv==1.0.1

特别注意版本锁定:

  • langgraph==0.1.42是目前唯一稳定支持StateGraphMemorySavercheckpoint的版本,更高版本API有破坏性变更;
  • scrapy==2.11.2对ChromeDriver 126兼容性最好,我们实测过127+版本会出现WebDriverException: unknown error: DevToolsActivePort file doesn't exist
  • pydantic==2.8.2是最后一个支持Field(..., description=...)在Schema校验中生效的版本,新版description被忽略。

安装后验证:

from langgraph.graph import StateGraph from scrapy import Spider print("✅ LangGraph & Scrapy 环境就绪")

如果报错ModuleNotFoundError: No module named 'twisted',说明Scrapy没装全,补装:pip install twisted==22.10.0(22.10.0是Scrapy 2.11.2的黄金搭档)。

5.2 核心状态与节点函数编码(可直接复制的完整代码)

我们以“监控小米SU7竞品价格”为真实场景,给出可运行的最小可行代码(已脱敏,删减了客户敏感逻辑):

# state.py from typing import TypedDict, List, Dict, Optional, Any from pydantic import BaseModel, Field class ProductData(BaseModel): title: str price: float review_count: int reviews: List[str] class ResearchState(TypedDict): query: str target_urls: List[str] raw_scrape_results: List[Dict[str, Any]] cleaned_data: List[ProductData] sentiment_scores: List[float] report_content: str error_log: List[str] current_step: str # 用于debug,记录当前执行到哪一步 # nodes.py import json from typing import Dict, Any from scrapy.crawler import CrawlerProcess from scrapy import Spider from scrapy.http import Request # 爬虫Spider定义(精简版) class JDProductSpider(Spider): name = "jd_product" custom_settings = { 'DOWNLOAD_DELAY': 2.0, 'CONCURRENT_REQUESTS': 1, 'RETRY_TIMES': 1, } def __init__(self, url=None, *args, **kwargs): super(JDProductSpider, self).__init__(*args, **kwargs) self.start_urls = [url] if url else [] def parse(self, response): yield { "title": response.css("div.sku-name::text").get("").strip(), "price": float(response.css("span.price::text").re_first(r"¥(\d+\.?\d*)") or "0"), "review_count": int(response.css("div.percent-con::text").re_first(r"(\d+)") or "0"), "reviews": response.css("div.comment-item div.content::text").getall()[:5] } # 爬取节点(独立进程封装,此处为简化版,实际用SafeScraperRunner) def scrape_product_page_node(state: ResearchState) -> Dict[str, Any]: urls = state["target_urls"] results = [] for url in urls[:3]: # 限制最多爬3个URL防封 try: process = CrawlerProcess() process.crawl(JDProductSpider, url=url) process.start() # 阻塞式,实际用asyncio.run_in_executor # 模拟返回(真实代码会从Scrapy pipeline获取) results.append({ "title": f"小米SU7 {url.split('/')[-1]}", "price": 219900.0 + hash(url) % 10000, "review_count": 876 + hash(url) % 200, "reviews": ["加速真快", "刹车有点软", "内饰做工一般"] }) except Exception as e: results.append({"error": str(e)}) return {"raw_scrape_results": results, "current_step": "scraped"} # 清洗节点 def clean_product_data_node(state: ResearchState) -> Dict[str, Any]: cleaned = [] for raw in state["raw_scrape_results"]: if "error" not in raw: cleaned.append(ProductData( title=raw["title"].strip(), price=float(str(raw["price"]).replace(",", "")), review_count=int(str(raw["review_count"]).replace(",", "")), reviews=[r.strip() for r in raw["reviews"] if r.strip()] )) return {"cleaned_data": cleaned, "current_step": "cleaned"} # 情感分析节点(用规则引擎替代LLM,保证速度) def analyze_sentiment_node(state: ResearchState) -> Dict[str, Any]: scores = [] for product in state["cleaned_data"]: pos_words = sum(1 for r in product.reviews for w in ["快", "好", "赞", "优秀"] if w in r) neg_words = sum(1 for r in product.reviews for w in ["慢", "差", "烂", "失望"] if w in r) score = (pos_words - neg_words) / max(len(product.reviews), 1) scores.append(max(-1.0, min(1.0, score))) # clamp to [-1,1] avg = sum(scores) / len(scores) if scores else 0.0 return { "sentiment_scores": scores, "avg_sentiment": avg, "current_step": "analyzed" } # 报告生成节点 def generate_report_node(state: ResearchState) -> Dict[str, Any]: content = f"# 竞品监控报告:{state['query']}\n\n" for i, p in enumerate(state["cleaned_data"]): content += f"## 商品 {i+1}: {p.title}\n" content += f"- 价格:¥{p.price:,.0f}\n" content += f"- 评论数:{p.review_count}\n" content += f"- 情感分:{state['sentiment_scores'][i]:.2f}\n\n" content += f"**整体情感均值:{state['avg_sentiment']:.2f}**\n" return {"report_content": content, "current_step": "reported"}

5.3 图构建与执行(含Checkpoint持久化)

# app.py from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import MemorySaver from state import ResearchState from nodes import ( scrape_product_page_node, clean_product_data_node, analyze_sentiment_node, generate_report_node ) # 构建图 builder = StateGraph(ResearchState) # 添加节点 builder.add_node("scrape", scrape_product_page_node) builder.add_node("clean", clean_product_data_node) builder.add_node("analyze", analyze_sentiment_node) builder.add_node("report", generate_report_node) # 添加边 builder.add_edge(START, "scrape") builder.add_edge("scrape", "clean") builder.add_edge("clean", "analyze") builder.add_edge("analyze", "report") builder.add_edge("report", END) # 编译图(启用内存检查点) memory = MemorySaver() graph = builder.compile(checkpointer=memory) # 执行 initial_state = ResearchState( query="小米SU7竞品价格监控", target_urls=[ "https://item.jd.com/100123456789.html", "https://item.jd.com/987654321000.html" ], raw_scrape_results=[], cleaned_data=[], sentiment_scores=[], report_content="", error_log=[], current_step="init" ) # 执行并获取最终状态 final_state = graph.invoke(initial_state, config={"configurable": {"thread_id": "test_001"}}) print(final_state["report_content"])

运行后你会看到生成的Markdown报告。关键点:

  • config={"configurable": {"thread_id": "test_001"}}启用了checkpoint,中断后可用相同thread_id恢复;
  • 所有节点函数返回的dict会自动merge进state,无需手动赋值;
  • 如果某个节点报错,final_state["error_log"]里会有详细记录,方便定位。

5.4 部署与监控(生产环境必备配置)

本地跑通只是开始,生产环境必须加上三道保险:

1. 进程守护:用systemd管理LangGraph主进程:

# /etc/systemd/system/multi-agent.service [Unit] Description=Multi-Agent Competitor Monitor After=network.target [Service] Type=simple User=agentuser WorkingDirectory=/opt/multi-agent ExecStart=/usr/bin/python3 /opt/multi-agent/app.py Restart=always RestartSec=10 Environment=PYTHONPATH=/opt/multi-agent [Install] WantedBy=multi-user.target

启用:sudo systemctl daemon-reload && sudo systemctl enable multi-agent && sudo systemctl start multi-agent

2. 爬虫资源隔离:给Scrapy子进程单独配cgroup:

# 创建cgroup限制内存和CPU sudo cgcreate -g memory,cpu:/scraper sudo echo 512M | sudo tee /sys/fs/cgroup/memory/scraper/memory.limit_in_bytes sudo echo 50000 | sudo tee /sys/fs/cgroup/cpu/scraper/cpu.cfs_quota_us

3. 关键指标监控(用Prometheus暴露):

# metrics.py from prometheus_client import Counter, Histogram, Gauge # 定义指标 SCRAPE_SUCCESS = Counter('scraper_success_total', 'Total successful scrapes') SCRAPE_FAILURE = Counter('scraper_failure_total', 'Total failed scrapes') SCRAPE_DURATION = Histogram('scraper_duration_seconds', 'Scrape duration') AGENT_STEP_TIME = Gauge('agent_step_time_seconds', 'Time spent in each agent step', ['step']) # 在节点函数里埋点 def scrape_product_page_node(state: ResearchState) -> Dict[str, Any]: start = time.time() try: # ... 爬取逻辑 SCRAPE_SUCCESS.inc() return {...} except Exception as e: SCRAPE_FAILURE.inc() raise finally: SCRAPE_DURATION.observe(time.time() - start) AGENT_STEP_TIME.labels(step="scrape").set(time.time() - start)

暴露端口后,用Grafana看板就能实时监控“每分钟爬取成功率”、“平均爬取耗时”、“各节点耗时分布”,这才是生产级多智能体该有的样子。

6. 常见问题与排查技巧实录

6.1 “爬虫总被封,IP被封禁”——不是技术问题,是流量调度问题

现象:本地测试OK,一上生产就频繁403。

根因分析:我们最初以为是User-Agent问题,换了50个UA库都没用。最后用Wireshark抓包发现,所有请求的X-Forwarded-For头都是同一个出口IP,而京东的风控系统对单IP的QPS阈值是3次/秒。

解决方案:

  • 流量整形:在SafeScraperRunnerinput_queue前加一层令牌桶,rate=2.5(留0.5缓冲);
  • IP池集成:对接商用代理池(如芝麻代理),但绝不让LLM决定用哪个IP——在scrape_product_page_node里用轮询算法选IP,状态里记录last_used_ip,避免同一IP连续请求;
  • 请求指纹分离:给每个目标URL生成唯一request_id,混入RefererCookie,让风控认为是不同用户行为。

实操心得:多智能体系统的“智能”不体现在LLM多会选IP,而体现在用确定性代码把流量打散。我们上线后封禁率从100%降到0.3%,靠的不是更聪明的LLM,而是更笨但更稳的流量调度。

6.2 “LLM生成的Function Call总是格式错误”——不是模型问题,是Schema设计缺陷

现象:jsonschema.validate()频繁报错,错误信息五花八门。

排查路径:

  1. 先检查scrape_product_page的Pydantic Schema是否过于宽松——比如url: str应该改成url: HttpUrl(需from pydantic import HttpUrl);
  2. 再检查LLM的system prompt是否明确写了“你只能生成以下JSON Schema,字段名、类型、必填项必须100%匹配”;
  3. 最后检查LangGraph的tools注册方式——必须用tool装饰器,不能直接传函数:
# ✅ 正确:用@tool装饰,LangGraph自动提取Schema from langchain_core.tools import tool @tool def scrape_product_page(input: ScrapeProductPageInput) -> dict: ... # ❌ 错误:直接传函数,Schema丢失 graph.add_node("scrape", scrape_product_page) # 这样不行!

我们遇到的90%的Schema错误,都源于第三点——开发者图省事没加@tool,导致LangGraph无法生成正确的function calling提示词。

6.3 “状态越来越大,内存爆了”——不是数据问题,是Checkpoint策略错误

现象:运行2小时后,MemorySaver占用内存超4GB,graph.invoke()越来越慢。

根因:MemorySaver默认保存所有历史状态快照,而我们的ResearchStatereview_texts列表可能长达1000条,每条200字符,光一个state就200KB,100个快照就是20

http://www.cnnetsun.cn/news/2860573.html

相关文章:

  • LPC82x微控制器模拟与电源管理实战:从比较器、ADC到低功耗设计
  • 在Windows上用C++原始套接字给IP包加Option字段:一个被遗忘的IPv4特性实战
  • 机器学习模型生产化:从Notebook到高可用、可审计、可治理的系统组件
  • 保姆级教程:基于STM32 HAL库的GD32F305 CAN驱动移植与适配(解决发送丢失、接收失败)
  • 大语言模型与序列推荐融合:SpecTran技术解析
  • 别再只玩555了!用uA741运放实现PWM的另类思路与深度原理剖析
  • TLJH搭建避坑指南:从权限安全到用户清理,这些配置细节你注意了吗?
  • 从西北角法到闭回路调整:深入解析MATLAB表上作业法的每一步(附调试技巧)
  • 别再死记硬背公式了!手把手带你用Python/Matlab复现Clarke与Park变换(附源码)
  • 别再只会用均值模糊了!用Python的gaussian_filter1d和gaussian_filter函数实现更自然的图像平滑
  • 从零到一:手把手教你用Verilog在HDLbits上搭建第一个数字电路(附完整代码)
  • FPGA新手避坑实录:用Altera芯片驱动VGA显示自定义图片(附完整Verilog代码与IP核配置)
  • 从电脑内存条到STM32的SRAM:图解嵌入式系统的‘内存地图’与寄存器寻址
  • 手把手教你用Gazebo和ROS复现DARPA地下挑战赛(附官方模型下载)
  • Streamlit+Heroku:50行Python快速部署数据应用
  • Vivado IP核综合失败别慌:除了打补丁,这个TCL命令也能救急(以Video Frame Buffer为例)
  • 扩散Transformer技术演进:从DiT到SiT的数学原理与架构创新深度解析
  • shell实用技巧
  • Rman还原
  • 如何用Claudian插件在Obsidian中创建交互式仪表板
  • docker-jellyfin开发指南:如何构建自定义镜像与贡献代码
  • Placement-Preparation中的技术面试秘籍:计算机网络高频问题与答案
  • 如何快速掌握PowerToys电源管理:简单三步告别自动休眠
  • Claudian插件与机器学习:自定义模型的集成方法指南
  • 洛雪音乐音源库完整指南:一站式解决全网音乐播放难题
  • Django集成Timeflake教程:打造高性能主键的3种实现方式
  • PyOWM性能优化:大规模天气数据请求的高效处理策略
  • Go-Serial跨平台兼容性终极指南:Windows、Linux、macOS实现原理深度解析
  • 探索MPLUS字体家族:现代多语言设计的完美解决方案
  • 高性能跨平台.NET数据可视化库架构解析与最佳实践