Agent与工具调用 - 问题与解决方案
问题1:Agent和普通AI对话有什么区别?
核心区别
| 能力 | 普通AI | Agent |
|---|---|---|
| 回答方式 | 被动响应 | 主动规划 |
| 工具调用 | 不能 | 可以 |
| 记忆 | 仅上下文 | 长期+短期 |
| 执行动作 | 不能 | 可以 |
什么时候用Agent?
简单问答:天气查询、常识回答 → 普通AI即可
复杂任务:多步骤、需要操作外部系统 → Agent
自主执行:自动化流程、监控系统 → Agent
Q:什么场景下必须用Agent,不能用普通AI?
A:任务需要:
调用外部API获取实时信息
读写数据库、操作文件系统
多步骤串行执行,中间结果影响后续
长期记忆,不受上下文窗口限制
问题2:Function Calling的工作流程是什么?
标准流程
用户输入 → 模型判断 → 调用工具 → 获取结果 → 模型生成回答详细步骤
步骤1:定义工具
{ "name": "get_weather", "description": "查询城市天气", "parameters": { "type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"] } }步骤2:模型判断模型根据用户输入,决定是否调用工具、调用哪个工具、传什么参数
步骤3:执行工具
result = get_weather(city="北京")步骤4:返回结果
{"temperature": 25, "weather": "晴天", "humidity": 60}步骤5:生成回答模型根据工具返回结果,生成最终回答
Q:Function Calling和普通API调用的区别?
A:
普通API:代码显式调用,固定逻辑
Function Calling:AI决定何时调用、调用什么、传什么参数
问题3:如何设计工具描述(description)?
描述的三大要素
做什么:工具的功能
什么时候用:触发条件
返回什么:输出格式
好描述 vs 差描述
差描述:
"description": "天气工具"好描述:
"description": "查询指定城市和日期的天气预报,返回温度、天气状况、湿度、风力等信息。用于用户询问天气、规划出行、穿衣建议等场景。"Q:工具描述写错了会有什么后果?
A:
描述过于模糊 → 模型不知道什么时候该调用
描述过于具体 → 可能错过适用场景
描述有歧义 → 模型可能调用错误
问题4:多工具协同,串行和并行怎么选?
串行调用
定义:上一个工具的输出是下一个工具的输入
场景:
1. 查订单 → 2. 根据订单查物流 → 3. 根据物流生成通知代码:
order = get_order(user_id) tracking = get_tracking(order['tracking_id']) notify_user(tracking)并行调用
定义:多个工具同时执行,互不依赖
场景:
同时查:北京天气、上海天气、广州天气代码:
import asyncio results = await asyncio.gather( get_weather("北京"), get_weather("上海"), get_weather("广州") )何时用哪种?
| 场景 | 方式 |
|---|---|
| A的输出是B的输入 | 串行 |
| 多个查询互不依赖 | 并行 |
| 主任务+分支任务 | 先并行查主数据,再串行处理 |
Q:为什么并行调用能提升性能?
A:IO等待时间可以叠加,减少总等待时间。
问题5:工具调用失败怎么处理?
5.1 失败类型详解
| 类型 | 特点 | 原因 | 处理 |
|---|---|---|---|
| 临时性失败 | 再试一次可能成功 | 网络抖动、服务器繁忙 | 重试 |
| 永久性失败 | 怎么重试都没用 | 参数错误、权限不足 | 修复代码 |
| 限流失败 | 等一会儿就能恢复 | 请求超过配额 | 等待后重试 |
5.2 重试策略
简单重试(不推荐)
for i in range(3): try: return call_api() except: continue问题:可能让服务器更忙。
指数退避(推荐)
import time import random for attempt in range(5): try: return call_api() except (TimeoutError, ConnectionError): wait_time = (2 ** attempt) + random.uniform(0, 1) time.sleep(wait_time)等待时间指数增长:1s → 2s → 4s → 8s
加随机抖动避免多进程同时重试
5.3 熔断器模式
什么是熔断器?
保险丝断了 → 停电 → 防止电线着火 熔断器跳闸 → 快速失败 → 防止服务崩溃三种状态:
CLOSED(闭合) OPEN(打开) HALF_OPEN(半开) ↓ ↓ ↓ 正常运行 快速失败 尝试恢复代码实现:
class CircuitBreaker: def __init__(self, failure_threshold=3, timeout=60): self.failure_threshold = failure_threshold self.timeout = timeout self.failure_count = 0 self.last_failure_time = None self.state = "CLOSED" def call(self, func): if self.state == "OPEN": # 检查是否超时 if time.time() - self.last_failure_time > self.timeout: self.state = "HALF_OPEN" else: raise Exception("服务暂时不可用(熔断中)") try: result = func() if self.state == "HALF_OPEN": self.state = "CLOSED" self.failure_count = 0 return result except Exception as e: self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = "OPEN" raise5.4 降级策略
缓存降级
cache = {"北京": {"weather": "晴", "temp": 25}} def get_weather_with_cache(city): try: result = call_weather_api(city) cache[city] = result # 更新缓存 return result except: if city in cache: return {**cache[city], "note": "(缓存数据)"} raise功能降级
def smart_reply(user_message): try: return full_feature_reply(user_message) # 完整功能 except AIOverloadError: return basic_reply(user_message) # 降级到基础功能 except: return "服务繁忙,请稍后重试"5.5 完整容错代码
class RobustToolCaller: def __init__(self): self.circuit_breaker = CircuitBreaker(failure_threshold=3) self.cache = {} def call(self, tool_name, func, params): # 1. 检查熔断器 if self.circuit_breaker.state == "OPEN": return self._fallback(tool_name, params) # 2. 带重试的调用 for attempt in range(3): try: return func(**params) except (TimeoutError, ConnectionError): if attempt == 2: self.circuit_breaker.call(lambda: None) return self._fallback(tool_name, params) except Exception as e: raise return self._fallback(tool_name, params) def _fallback(self, tool_name, params): # 降级方案 if tool_name == "weather": return self.cache.get(params.get("city"), {"weather": "未知"}) raise Exception(f"工具{tool_name}暂时不可用")Q:工具返回错误后,Agent应该告诉用户什么?
A:
如果有兜底方案 → 自动切换
如果无法完成 → 诚实地告诉用户哪里出了问题
不要编造结果(幻觉)
给出替代建议(如稍后重试)
问题6:如何防止Agent陷入死循环?
6.1 死循环的本质
Agent的核心逻辑:
while True: 思考下一步 执行工具 检查结果 如果完成就返回,否则继续为什么会死循环?
模型无法判断"什么时候够了"
反思产生新任务,无限循环
工具返回不满意,无限重试
6.2 最大调用次数限制
class Agent: MAX_TOOL_CALLS = 10 def run(self, task): for call_num in range(self.MAX_TOOL_CALLS): result = self.execute() if self.is_complete(result): return result return self.force_complete() # 达到上限强制返回6.3 熔断器防止死循环
class CircuitBreaker: """连续失败N次后,暂时停止调用""" def __init__(self, failure_threshold=5, timeout=30): self.failure_threshold = failure_threshold self.timeout = timeout self.failure_count = 0 self.last_failure_time = None self.state = "CLOSED" def call(self, func): # OPEN状态:直接拒绝 if self.state == "OPEN": if time.time() - self.last_failure_time > self.timeout: self.state = "HALF_OPEN" else: raise Exception("超过调用限制") try: return func() except: self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = "OPEN" raise6.4 资源追踪
class ResourceTracker: def __init__(self): self.tool_calls = 0 self.total_tokens = 0 self.execution_time = 0 def should_stop(self): """多维度判断是否该停止""" if self.tool_calls > 10: return True, "超过最大调用次数" if self.total_tokens > 50000: return True, "超过Token限制" if self.execution_time > 120: return True, "执行超时" return False, None6.5 阶段式Agent设计
class StagedAgent: STAGES = [ {"name": "understand", "max_calls": 2}, {"name": "plan", "max_calls": 3}, {"name": "execute", "max_calls": 5}, {"name": "verify", "max_calls": 2}, ] def run(self, task): for stage in self.STAGES: result = self.run_stage(stage, task) if self.is_complete(result): return result return self.best_effort_result(task)6.6 完整防死循环系统
class RobustAgent: def __init__(self): self.max_tool_calls = 10 self.circuit_breaker = CircuitBreaker(failure_threshold=5) self.tracker = ResourceTracker() def run(self, task): self.tracker.start() for call_num in range(self.max_tool_calls): # 1. 检查是否该停止 should_stop, reason = self.tracker.should_stop() if should_stop: return self.force_complete(reason) # 2. 检查熔断器 if self.circuit_breaker.state == "OPEN": return self.force_complete("熔断器开启") try: result = self.execute(task) self.tracker.track(tool_call=True) if self.is_complete(result): return result except Exception as e: self.circuit_breaker.call(lambda: None) continue return self.force_complete("达到调用上限") def force_complete(self, reason): return {"status": "completed_with_limit", "reason": reason}Q:AutoGPT为什么会陷入死循环?
A:
反思机制没有限制 → 每次反思都产生新任务
缺乏有效的终止条件 → 模型总觉得"还能更好"
没有强制停止机制 → 不会强制返回
改进方法:
加入最大调用次数限制
设置明确的终止条件
熔断器防止雪崩
阶段式设计,分阶段执行
