当前位置: 首页 > news >正文

Agent Loop 内核——从 prompt 到多轮对话的完整运转机制

Agent Loop 是什么?

用一句话概括:用户发 prompt → LLM 返回响应 → 如果 LLM 要求调工具就执行 → 把工具结果喂回 LLM → 重复,直到 LLM 说"我说完了"

画成流程图:

end_turn / stop_sequence

max_tokens

tool_use

用户 prompt

构建 messages + tools

调用 LLM API

返回结果

追加'请继续'

提取 tool_use blocks

按只读/变更分桶

只读工具并发执行

变更工具串行执行

微压缩大结果

tool_result 加入 messages

这个循环里有几个关键决策点:

  1. 什么时候停?LLM 返回end_turnstop_sequence时正常结束;到达maxTurns上限时强制停止;超出预算 (maxBudgetUsd) 时中断;用户主动取消时也中断。
  2. 工具怎么执行?只读工具并发跑(最多 10 个),变更工具串行跑——避免并发写文件。
  3. 上下文太长怎么办?自动压缩——用一个 LLM 调用把历史摘要,腾出空间继续。
  4. 中途出错怎么办?内置重试、回退模型、错误隔离(工具报错不会炸掉整个循环)。

两条入口:prompt() 和 stream()

SDK 提供两种方式触发 Agent Loop:

阻塞式 prompt()

let agent = createAgent(options: AgentOptions( apiKey: "sk-...", model: "claude-sonnet-4-6", maxTurns: 10 )) let result = await agent.prompt("Read Package.swift and summarize it.") print(result.text) print("Turns: \(result.numTurns), Cost: $\(String(format: "%.4f", result.totalCostUsd))")

prompt()是"发出去等结果"模式。一次调用跑完所有轮次,返回最终的QueryResult。适合不需要实时看到中间过程的场景——比如后台任务、CLI 工具。

流式 stream()

for await message in agent.stream("Explain this codebase.") { switch message { case .partialMessage(let data): print(data.text, terminator: "") // 实时输出文本 case .toolUse(let data): print("[Using tool: \(data.toolName)]") case .toolResult(let data): print("[Tool done, \(data.content.count) chars]") case .result(let data): print("\nDone: \(data.numTurns) turns, $\(String(format: "%.4f", data.totalCostUsd))") default: break } }

stream()返回AsyncStream<SDKMessage>,在 LLM 处理过程中持续推送事件。SDK 定义了 17 种消息类型,从partialMessage(文本片段)到toolUse(工具调用)到result(最终结果),覆盖了 Agent Loop 的每个阶段。

选择哪种取决于你的 UI 需求:要实时展示就用stream(),不需要就用prompt()

循环体内部:一个 turn 做了什么

不管走哪条入口,每个 turn 的核心逻辑是相同的。让我们跟一遍代码。

1. 检查是否需要压缩

if shouldAutoCompact(messages: messages, model: model, state: compactState) { let (newMessages, _, newState) = await compactConversation( client: client, model: model, messages: messages, state: compactState, fileCache: fileCache, sessionMemory: sessionMemory ) messages = newMessages compactState = newState }

每个 turn 开始前先检查:消息历史估计的 token 数是不是快要撑爆上下文窗口了。如果是,用一个 LLM 调用把历史压缩成摘要,替换掉原始消息。

压缩的阈值是模型上下文窗口 - 10000 tokens(缓冲区)。连续压缩失败 3 次后会停止尝试,避免浪费 token。

2. 发 LLM 请求(带重试和回退)

response = try await withRetry({ try await client.sendMessage( model: model, messages: messages, maxTokens: maxTokens, system: buildSystemPrompt(), tools: apiTools, ... ) }, retryConfig: retryConfig)

所有 LLM 请求都经过withRetry包装,按配置的重试策略处理临时错误(网络超时、429 限流等)。

如果主模型彻底失败,还配置了fallbackModel,SDK 会用备用模型再试一次:

if let fallbackModel = self.options.fallbackModel, fallbackModel != self.model { // 用 fallbackModel 重试... }

3. 处理 stop_reason

LLM 响应里的stop_reason决定了循环的走向:

stop_reason含义循环行为
end_turnLLM 说完了正常退出循环
stop_sequence碰到停止符正常退出循环
tool_useLLM 想调工具执行工具,继续循环
max_tokens输出被截断追加"请继续",继续循环

max_tokens的情况有个保护:最多自动续接 3 次,防止无限循环。

4. 工具执行:分桶并发

当 LLM 返回tool_use时,SDK 不是简单地把工具排着队一个个跑,而是做了分桶:

// ToolExecutor.partitionTools() for block in blocks { let tool = tools.first { $0.name == block.name } if let tool = tool, tool.isReadOnly { readOnly.append(item) // 只读桶 } else { mutations.append(item) // 变更桶 } }

只读工具(Read、Glob、Grep、WebSearch 等)可以安全并发,用TaskGroup跑,最多 10 个一批:

let batchResults = await withTaskGroup(of: ToolResult.self) { group in for item in batchSlice { group.addTask { await executeSingleTool(block: item.block, tool: item.tool, context: ...) } } // 收集结果 }

变更工具(Write、Edit、Bash 等)必须串行执行,一个跑完再跑下一个,避免并发写冲突:

for item in items { let result = await executeSingleTool(...) results.append(result) }

执行顺序:先跑所有只读工具(并发),再跑所有变更工具(串行)。这在 LLM 一次返回多个工具调用时能显著提升性能——比如 LLM 同时要求读 5 个文件,5 个读操作并行完成。

5. 微压缩

工具执行完后,结果在喂回 LLM 之前还要过一道微压缩:

for result in toolResults { let processedContent = await processToolResult(result.content, isError: result.isError) processedResults.append(ToolResult( toolUseId: result.toolUseId, content: processedContent, isError: result.isError )) }

如果一个工具返回的内容超过 50000 字符(比如读了一个大文件),SDK 会用一次额外的 LLM 调用把内容压缩。错误结果不压缩——保留了完整的错误信息供 LLM 诊断。

成本追踪:逐 turn 累加

每一轮 LLM 调用后,SDK 都会更新 token 用量和费用:

let turnCost = estimateCost(model: model, usage: turnUsage) totalCostUsd += turnCost costByModel[model] = CostBreakdownEntry( model: model, inputTokens: turnUsage.inputTokens, outputTokens: turnUsage.outputTokens, costUsd: turnCost )

costByModel按 model 分组记录。这意味着如果你中途切换了模型(通过switchModel()),每个模型的费用是分开计算的。最终result.costBreakdown能告诉你每个模型花了多少钱。

预算检查在每个 turn 后执行:

if let budget = options.maxBudgetUsd, totalCostUsd > budget { status = .errorMaxBudgetUsd break }

超出预算时立即退出循环,但已产生的文本会保留在结果里——你拿到的是部分结果,不是空白的。

取消:协作式取消

Swift 的结构化并发用Task.isCancelled做协作式取消。SDK 在循环的多个检查点都检查了这个标志:

  1. while 循环入口
  2. 只读工具和变更工具之间
  3. SSE 事件循环内部
  4. 工具执行前后
// 循环入口 if Task.isCancelled || _interrupted { status = .cancelled break } // 只读/变更之间 if Task.isCancelled { return results }

stream()还额外支持通过interrupt()方法取消——内部就是 cancel 掉持有 stream 的 Task。

取消后返回的是QueryResult(isCancelled: true),附带截止到取消时刻的部分文本和 token 用量。

错误处理:不炸、不丢

SDK 的错误处理原则是:工具执行错误不传播,API 错误有重试,最终失败保留部分结果

工具执行时,任何错误都被捕获为ToolResult(isError: true)

static func executeSingleTool(...) async -> ToolResult { guard let tool = tool else { return ToolResult(toolUseId: block.id, content: "Error: Unknown tool", isError: true) } // ... try executing let result = await tool.call(input: block.input, context: context) return ToolResult(toolUseId: block.id, content: result.content, isError: result.isError) }

工具报错的结果照样喂回 LLM,LLM 看到错误信息后可以决定换个策略。Agent Loop 不会因为一个工具挂了就崩溃。

API 层面的错误(网络问题、500 等)会触发重试;重试失败后触发 fallback 模型;全挂了才返回errorDuringExecution状态。

Hook 集成:循环的生命周期

Agent Loop 在关键节点触发 Hook 事件:

Hook 事件触发时机
sessionStart循环开始前
preToolUse每个工具执行前
postToolUse工具成功执行后
postToolUseFailure工具执行失败后
stop循环结束时(正常或异常)
sessionEnd返回结果前

Hook 的一个典型用法是在preToolUse拦截危险操作:

await hookRegistry.register(.preToolUse, definition: HookDefinition( matcher: "Bash", handler: { input in return HookOutput(message: "Bash blocked in production", block: true) } ))

被 Hook 拦截的工具不会执行,而是返回一个错误结果——LLM 会看到"Bash blocked in production",可以换个方式完成任务。

还有一个入口:streamInput()

除了prompt()stream(),SDK 还提供了第三种入口——streamInput(),接受一个AsyncStream<String>作为输入:

let input = AsyncStream<String> { continuation in continuation.yield("What's in this project?") continuation.yield("Now explain the test structure.") continuation.finish() } for await message in agent.streamInput(input) { // 处理每条输入对应的响应 }

每个输入元素被视为一条新的用户消息,触发一个完整的 prompt 周期。这适合聊天式交互:用户的每条消息都是输入流的一个元素,Agent 逐条处理并流式输出。

小结

Agent Loop 是整个 SDK 的心脏。理解了它的工作方式,剩下的功能都是在它的基础上叠加的:

  • 工具系统— Loop 里的"执行工具"环节
  • MCP 集成— Loop 启动时连接外部工具服务器
  • 会话持久化— Loop 结束后保存 messages 数组
  • 权限控制— 工具执行前的拦截点
http://www.cnnetsun.cn/news/3094699.html

相关文章:

  • AI+薪酬管理:从“算薪自动化”到“决策智能化”的中大型企业升级路径
  • 张鹏翔在AI营销实战方法论沙龙上详解智能体如何助力企业长效流量增长
  • C# 深度学习框架 TorchSharp 原生训练模型和图像识别-手写数字识别
  • AI工程实战:模型服务化与性能优化关键策略
  • view_source
  • 小月子多久可以洗头洗澡?结合休养禁忌科学把控洗护时
  • 3步掌握UE4SS:虚幻引擎游戏修改的终极解决方案
  • Kubernetes Operator开发教程
  • React性能优化
  • JavaScript原型链
  • CVE-2026-22218 Chainlit 框架任意文件读取漏洞全解析
  • ASP.NET Core 之 Identity 入门(一)
  • MANO手部模型完整指南:如何用Python实现逼真3D手部建模
  • 如何提取 Word 文档中的表格并导出为 Excel(Python 教程)
  • AI编曲工具实战:从入门到专业音乐制作
  • C++集成OpenSSL实现RSA公钥加密:从原理到工程实践
  • 如何彻底解决 AI 编程的连贯性难题
  • 手机磁吸转轴支架出厂检验全解:5 大类必检项目与 4 家厂商品控体系对比
  • Burp Suite安全测试实战:从零掌握Web渗透核心工作流与高阶技巧
  • Frida内存操作避坑指南:从原理到实战的逆向分析核心技能
  • 开源 GR00T N1.7 论文解读:Cosmos-Reason2/Qwen3-VL + DiT 动作头,20K 小时人类视频预训练
  • Banana Pi BpiRouterOS 路由器 官方操作系统,基于Openwrt开发 #路由器
  • 从看图说话到一键出码:2026年多模态AI,最值得普通人立刻用的3个场景
  • 异步并行批处理框架设计的一些思考
  • 01:Agent Loop:Claude Code 的运行时主循环
  • 生成式引擎优化(GEO)在酒店民宿行业的落地实践:对抗 OTA 流量截流
  • 密码学中的欧拉定理研究与应用
  • 小米穿戴表盘设计终极指南:零代码创建个性化智能手表界面
  • 百万路像素并行三维推演,分布式 SpaceOS 图形底座承载城域级实景孪生
  • 微信QQ消息防撤回终极指南:3步揭秘聊天记录永久保存技术