基于事件驱动架构构建可靠AI Agent:inngest/agent-kit实战指南
1. 项目概述:为什么我们需要一个“事件驱动”的代理框架?
最近在折腾AI应用开发,特别是想把大语言模型(LLM)的能力真正嵌入到业务流程里,而不是仅仅做个聊天机器人。相信很多同行都遇到过类似的困境:你写了一个很酷的Agent(智能体),它能理解用户意图,也能调用工具(比如查数据库、发邮件),但一旦涉及到需要“等待”或“按顺序执行”的复杂工作流,代码就变得一团糟。比如,用户说“帮我订一张下周五去上海的机票,收到确认邮件后提醒我”,这个简单的需求背后,就包含了异步调用、状态持久化、错误重试、超时处理等一系列头疼的问题。
这就是我关注到inngest/agent-kit这个项目的原因。它不是一个具体的AI模型,而是一个专为构建可靠、可扩展的AI Agent应用而设计的后端框架。它的核心思想是“事件驱动”(Event-Driven)。你可以把它想象成一个高度智能、永不宕机的工作流引擎,专门用来编排和管理你的AI Agent。你的Agent不再需要自己操心“任务进行到哪一步了”、“失败了怎么办”、“怎么等外部API回调”这些问题,框架帮你全包了。
简单来说,agent-kit让你能像写同步函数一样,去编写复杂的、异步的、有状态的AI工作流。它把Agent执行过程中的每一步(如“解析用户意图”、“调用工具”、“等待用户输入”、“处理结果”)都抽象成“事件”(Event),然后由框架来可靠地调度和执行。这对于开发需要与外部系统深度集成、执行长时间运行任务(如数据ETL、自动化客服、智能审批)的AI应用来说,简直是雪中送炭。
2. 核心设计理念与架构拆解
2.1 事件驱动架构:从“请求-响应”到“状态机”
传统的Web服务或API调用是“请求-响应”(Request-Response)模式。用户发一个请求,服务器处理,然后返回结果,连接结束。这种模式对于简单的AI问答还行,但对于一个可能持续数小时、涉及多个步骤和外部回调的Agent任务,就力不从心了。你需要自己实现任务队列、状态存储、定时轮询、错误恢复,复杂度陡增。
agent-kit采用了事件驱动架构。在这个模型里,核心不再是“函数调用”,而是“事件”和“状态迁移”。
- 事件(Event):系统中发生的任何值得关注的事情。例如:
user.message.received(用户发送消息)、tool.search.completed(搜索工具完成)、workflow.step.failed(某步骤失败)。事件携带了执行所需的所有数据。 - 函数(Function):你编写的业务逻辑,用于响应特定的事件。一个函数可以监听一个或多个事件类型。当匹配的事件到达时,框架会自动调用你的函数。
- 工作流(Workflow):由多个函数按特定逻辑(顺序、并行、条件分支)组合而成的完整业务流程。框架负责根据事件流来推进工作流的状态。
这种架构的优势非常明显:
- 解耦:事件的产生者和消费者(函数)是分离的。你的AI模型生成一个“调用工具”的事件后,就可以继续处理其他事情,由另一个专门的函数来处理工具调用和结果返回。
- 可靠性:Inngest核心引擎会持久化每一个事件和函数执行状态。即使你的服务进程重启,工作流也能从断点继续执行,不会丢失进度。
- 可观测性:所有事件和历史执行记录都被集中存储,你可以清晰地看到一个工作流实例的完整生命周期,便于调试和审计。
2.2 Agent-Kit的核心抽象:Steps, Tools, 与 Memory
agent-kit在Inngest的事件驱动引擎之上,封装了一层更适合AI Agent开发的抽象。
Step(步骤):这是最核心的抽象。一个
step代表工作流中一个可暂停、可恢复的执行单元。当你调用step.run(“step_name”, fn)时,框架会确保这个函数(fn)只被执行一次,即使其所在的服务器函数被多次调用(由于重试等原因)。这对于需要幂等性的操作(如调用第三方API)至关重要。更重要的是,step提供了step.sleepUntil等功能,可以让工作流“睡眠”直到某个时间点或某个事件发生,完美实现了“等待”逻辑。Tool(工具):Agent需要与外界交互的能力。
agent-kit提供了声明式定义工具的方式。你可以定义一个工具的名称、描述、参数JSON Schema,以及执行函数。框架会自动处理工具的调用请求和结果返回事件。这与你可能用过的LangChain Tools或LLamaIndex Tools概念类似,但它是深度集成到Inngest工作流中的。Memory(记忆):Agent需要有上下文记忆。
agent-kit提供了工作流级别的记忆存储。你可以在一个步骤中存储一些数据(如对话历史、中间结果),在后续的步骤中读取。这些记忆同样由Inngest引擎持久化,保证了Agent在长时间运行中状态的连续性。
2.3 技术栈与部署模型
agent-kit是一个TypeScript/JavaScript框架,与Node.js环境深度集成。它底层依赖@inngest/sdk来与Inngest云服务或自托管的Inngest服务器通信。
部署模型通常是这样的:
- 你的应用服务器:运行着用
agent-kit编写的函数(Agent逻辑)。它向Inngest服务注册这些函数。 - Inngest服务:可以是云服务(Inngest Cloud),也可以是用Docker自托管(Inngest Open Source)。它负责接收事件、排队、调度函数执行,并管理状态。
- 执行器:Inngest服务通过HTTP调用你的应用服务器中对应的函数端点(默认为
/api/inngest)来触发函数执行。你的服务器需要保持在线以接收这些调用。
这种架构将“状态管理”和“工作流编排”这种复杂的、有状态的部分外包给了专门的引擎(Inngest),而你的代码只需专注于无状态的业务逻辑(Agent推理、工具调用),大大简化了开发。
3. 从零开始:构建你的第一个可靠AI Agent
3.1 环境准备与项目初始化
我们从一个实际场景开始:一个“智能旅行规划助手”。用户可以说“我想去杭州玩三天,预算5000元”,Agent需要自动查询天气、查找景点、生成行程草案,然后等待用户确认。
首先,确保你有Node.js环境(建议18+)。创建一个新项目并安装依赖:
mkdir travel-agent && cd travel-agent npm init -y npm install @inngest/agent-kit inngest npm install -D typescript tsx @types/node # 如果你使用某个AI SDK,比如 OpenAI npm install openai创建tsconfig.json和基本的项目结构。核心文件是src/index.ts,我们将在这里定义Inngest客户端和函数。
3.2 定义核心工作流与工具
在src/index.ts中,我们首先初始化客户端和定义工具。
import { Inngest } from "inngest"; import { createAgent, step, tool } from "@inngest/agent-kit"; import OpenAI from "openai"; // 1. 初始化Inngest客户端 // 如果你的环境变量 INNGEST_EVENT_KEY 已设置,可以省略参数 const inngest = new Inngest({ id: "travel-agent" }); // 2. 初始化OpenAI客户端(或其他LLM) const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY }); // 3. 定义工具 // 工具一:查询天气(模拟) const getWeatherTool = tool({ id: "get_weather", description: "获取指定城市未来几天的天气预报", args: { city: { type: "string", description: "城市名,如:杭州" }, days: { type: "number", description: "预报天数,如:3" }, }, execute: async ({ city, days }) => { // 这里应该是真实的API调用,例如调用和风天气、OpenWeatherMap等 console.log(`[Tool] 查询 ${city} 未来 ${days} 天天气`); // 模拟返回 return { city, forecast: [ { day: "Day1", condition: "Sunny", high: 25, low: 18 }, { day: "Day2", condition: "Cloudy", high: 23, low: 17 }, { day: "Day3", condition: "Light Rain", high: 20, low: 16 }, ], }; }, }); // 工具二:搜索旅游景点(模拟) const searchAttractionsTool = tool({ id: "search_attractions", description: "搜索指定城市的旅游景点和门票信息", args: { city: { type: "string", description: "城市名" }, keywords: { type: "string", description: "搜索关键词,如‘古镇’、‘博物馆’", optional: true }, }, execute: async ({ city, keywords }) => { console.log(`[Tool] 在 ${city} 搜索景点: ${keywords || '全部'}`); // 模拟返回 return { city, attractions: [ { name: "西湖", type: "自然风光", fee: "免费", rating: 4.8 }, { name: "灵隐寺", type: "文化古迹", fee: "45元", rating: 4.5 }, { name: "宋城", type: "主题公园", fee: "320元", rating: 4.3 }, ], }; }, }); // 4. 创建Agent,并注册工具 const travelAgent = createAgent({ id: "travel-planner", name: "旅行规划助手", instructions: `你是一个专业的旅行规划助手。根据用户的需求,调用工具查询天气和景点信息,然后为其制定一份详细的行程草案。请确保行程合理,符合预算。`, model: "gpt-4o", // 或 gpt-3.5-turbo tools: [getWeatherTool, searchAttractionsTool], client: openai, // 传入LLM客户端 });注意:工具的定义非常关键。
description和args的description是给LLM看的,必须清晰准确,LLM靠这些描述来决定是否以及如何调用工具。args的JSON Schema定义要严谨,这能有效减少LLM调用错误。
3.3 实现主工作流函数
接下来,我们创建一个Inngest函数,作为整个工作流的入口。这个函数将响应app/user.request.received事件。
// 5. 定义主工作流函数 const planTravelWorkflow = inngest.createFunction( { id: "plan-travel", name: "旅行规划工作流", // 当收到用户旅行规划请求时触发 onEvents: [{ event: "app/user.request.received" }], }, async ({ event, step }) => { const { userId, userMessage, budget } = event.data; // 从事件数据中提取 // 步骤1:使用Agent解析用户意图并规划行动 const plan = await step.run("analyze-and-plan", async () => { console.log(`[Step] 开始为用户 ${userId} 分析请求: ${userMessage}`); // 这里,我们让Agent根据用户输入,决定要调用哪些工具,以及调用的顺序/参数 // agent-kit 的 `run` 方法会处理与LLM的交互和工具调用 const response = await travelAgent.run({ messages: [ { role: "user", content: userMessage } ], // 可以传入额外的上下文,比如用户历史 memory: await step.getState("userContext") || {}, }); // 保存Agent的完整响应,可能包含思考过程和多个工具调用结果 await step.saveState("agentResponse", response); return response; }); // 步骤2:根据Agent的计划,串行执行工具调用(框架已在上一步的run中处理) // 但我们可以显式地检查结果并推进流程 const toolResults = plan?.messages?.filter(m => m.tool_calls) || []; console.log(`[Step] Agent规划完成,建议调用 ${toolResults.length} 个工具`); // 步骤3:整合信息,生成行程草案 const itineraryDraft = await step.run("generate-itinerary", async () => { // 从状态中获取完整的对话和工具结果 const fullContext = await step.getState("agentResponse"); // 再次调用Agent,让其基于所有工具结果,生成一份文本行程 const finalResponse = await travelAgent.run({ messages: [ { role: "user", content: userMessage }, ...(fullContext.messages || []) // 包含之前的对话和工具结果 ], instructions: `请基于之前的工具查询结果(天气、景点),为用户生成一份详细的${budget ? `预算${budget}元以内的` : ''}三日行程草案。格式要清晰,包含每天上下午的安排、景点、大致花费和注意事项。`, }); return finalResponse.messages?.[finalResponse.messages.length - 1]?.content || "生成失败"; }); // 步骤4:模拟“等待用户确认” - 这是一个异步等待的经典场景 // 我们让工作流休眠,直到一个特定的“用户确认”事件发生 console.log(`[Step] 行程草案已生成,等待用户确认...`); const userConfirmation = await step.waitForEvent("app/user.confirmation.received", { timeout: "24h", // 最多等待24小时 match: "data.userId", // 确保事件是针对同一个用户的 }); // 步骤5:处理用户确认结果 if (userConfirmation) { const isConfirmed = userConfirmation.data.confirmed; await step.run("handle-confirmation", async () => { if (isConfirmed) { console.log(`[Step] 用户 ${userId} 已确认行程。开始执行预订流程(模拟)。`); // 这里可以触发下一个工作流,例如预订酒店、机票等 await step.sendEvent({ name: "app/travel.booking.requested", data: { userId, itinerary: itineraryDraft }, }); return { status: "confirmed", nextStep: "booking_triggered" }; } else { console.log(`[Step] 用户 ${userId} 拒绝了行程草案。`); // 可以发送消息通知Agent或客服 return { status: "rejected", message: "用户要求修改" }; } }); } else { // 超时处理 await step.run("handle-timeout", async () => { console.log(`[Step] 用户 ${userId} 在24小时内未确认,行程规划超时。`); return { status: "timeout" }; }); } // 工作流结束 return { status: "workflow_completed", userId, hasConfirmation: !!userConfirmation }; } );这个工作流清晰地展示了agent-kit和 Inngest 的核心能力:
step.run:确保关键操作(如调用Agent、生成行程)的幂等性。step.waitForEvent:优雅地处理异步等待,将“等待用户操作”这个不确定时长的过程,从你的代码逻辑中解耦出去,由引擎管理。step.sendEvent:触发新事件,从而驱动其他工作流,构建起复杂的业务链。
3.4 服务端配置与启动
最后,我们需要一个HTTP服务器来托管这些函数,供Inngest服务调用。
创建一个src/server.ts:
import { serve } from "inngest/next"; // 如果你用Next.js // 或使用通用的Express/Node.js服务 import { createServer } from "http"; import { Inngest } from "inngest"; import { planTravelWorkflow } from "./index"; // 导入刚才定义的函数 // 使用官方推荐的 serve handler (以Next.js为例,其他框架类似) export const { GET, POST, PUT } = serve({ client: new Inngest({ id: "travel-agent" }), functions: [planTravelWorkflow], // 注册所有函数 }); // 如果是简单的Node.js服务器,可以这样: // const inngest = new Inngest({ id: "travel-agent" }); // const handler = createServeHandler({ client: inngest, functions: [planTravelWorkflow] }); // createServer(handler).listen(3000);同时,你需要启动一个进程来发送初始事件,触发工作流。这可以是你现有的API服务器的一部分:
// 在某个API路由中,例如 /api/plan import { inngest } from "./inngest-client"; export async function POST(request) { const { userId, message, budget } = await request.json(); // 发送事件,触发工作流 await inngest.send({ name: "app/user.request.received", data: { userId, userMessage: message, budget }, }); return Response.json({ ok: true, message: "旅行规划请求已接收,处理中..." }); }4. 深入核心:状态管理、错误处理与调试
4.1 可靠的状态持久化与记忆
在长时间运行的工作流中,状态管理是头等大事。agent-kit结合 Inngest,提供了多层状态管理:
- 步骤输出持久化:每次
step.run的返回值都会被Inngest自动持久化。即使函数实例崩溃,重启后重新执行到该step.run时,框架会直接返回之前存储的结果,而不会重复执行函数体。这是实现幂等性的基石。 - 自定义状态存储:使用
step.saveState和step.getState。这是工作流级别的键值存储,非常适合保存Agent的对话历史、中间计算结果等。这些状态与工作流实例的生命周期绑定。 - Agent Memory:
travelAgent.run()方法可以接受一个memory参数,这个记忆会在一次run调用过程中传递给LLM,帮助其维持上下文。对于更复杂的记忆管理(如长期记忆、向量存储检索),你需要自己集成,并将结果通过step.saveState或作为工具参数传入。
实操心得:不要滥用
step.saveState。只存储推进工作流所必需的数据。对于大的、不变的数据(如查询到的景点列表),更适合作为步骤输出存储,而不是状态。状态存储应该小而精,频繁读写大量数据可能会影响性能。
4.2 错误处理、重试与回退策略
生产环境的Agent必须健壮。agent-kit和 Inngest 提供了强大的错误处理机制。
函数级重试:在
createFunction时可以配置重试策略。const planTravelWorkflow = inngest.createFunction( { id: "plan-travel", onEvents: [...], retries: 5, // 最大重试次数 // 指数退避重试 retryDelay: ({ attempt }) => Math.min(1000 * 2 ** attempt, 30000), }, async ({ event, step }) => { ... } );这对于处理短暂的网络波动或第三方API限流非常有效。
步骤级错误处理:
step.run中的代码如果抛出错误,该步骤会被标记为失败。你可以用try...catch包裹step.run,在捕获错误后决定是重试、执行备用逻辑,还是让整个工作流失败。const result = await step.run("call-unstable-api", async () => { try { return await callSomeAPI(); } catch (error) { // 记录错误,返回一个降级结果 console.error("API调用失败,使用降级数据", error); return { status: "fallback", data: defaultData }; } });超时控制:
step.waitForEvent可以设置超时。整个函数也有默认的超时时间(默认5分钟)。对于可能长时间运行的步骤,要合理评估并设置step.run的超时,或者将其拆分为多个更小的步骤。死信队列(DLQ):Inngest支持将最终失败的事件发送到DLQ。你需要配置一个Webhook或云函数来接收这些失败事件,进行人工干预或报警。
4.3 调试与可观测性实践
开发事件驱动的工作流,调试方式和传统应用不同。你不能简单地在代码里打console.log然后刷新页面。Inngest Cloud和自托管版本都提供了强大的Dev Server和UI。
本地开发服务器:运行
npx inngest-cli@latest dev。它会启动一个本地Inngest服务器和Web UI(通常是http://localhost:8288)。你可以在这里:- 实时发送事件:模拟用户请求。
- 流式查看日志:所有函数执行、步骤完成的日志一目了然。
- 检查函数运行历史:查看每个工作流实例的完整时间线,包括每个步骤的输入、输出、耗时和状态。这是最强大的调试工具。
结构化日志:在函数中,使用
console.log或你喜欢的日志库(如Pino、Winston)输出结构化JSON日志。Inngest UI会捕获并展示这些日志。建议为每个日志加上workflowId,stepId,userId等字段,方便追踪。使用“重放”功能:在UI中看到某个运行失败后,你可以直接修改代码,然后在该次运行上点击“重放”。Inngest会用新代码和原有的事件数据重新执行整个工作流,这对于修复bug后重新处理失败任务极其方便。
5. 进阶模式与生产环境考量
5.1 复杂工作流模式:并行、分支与循环
真实场景的工作流很少是简单的直线。agent-kit借助 Inngest SDK,可以轻松实现复杂模式。
并行执行:使用
Promise.all包装多个step.run。const [weather, attractions] = await Promise.all([ step.run("fetch-weather", () => getWeather(city)), step.run("search-attractions", () => searchAttractions(city)), ]); // 后续步骤可以同时使用weather和attractions的结果条件分支:基于步骤结果,使用
if/else或switch来决定发送不同的事件,从而触发不同的下游函数。const sentiment = await step.run("analyze-sentiment", () => analyze(userMessage)); if (sentiment.score < 0) { await step.sendEvent({ name: "app/complaint.received", data: { userId, message } }); } else { await step.sendEvent({ name: "app/normal.request.received", data: { userId, message } }); }循环/动态步骤:虽然不能直接写
for循环来动态创建step.run(因为步骤需要在函数定义时确定),但可以通过“发送事件触发自身”的模式来实现。例如,处理一个待办事项列表:const todos = await step.run("fetch-todos", fetchTodos); for (const todo of todos) { // 为每个待办项发送一个事件,触发同一个或另一个函数来处理 await step.sendEvent({ name: "app/todo.item.process", data: { todoId: todo.id, ... }, }); }
5.2 性能优化与成本控制
当你的Agent应用规模增长时,以下几点至关重要:
- 函数拆分:不要把所有逻辑都写在一个巨大的函数里。将不同的业务阶段拆分成独立的Inngest函数。例如,将“意图解析”、“工具执行”、“结果生成”拆开。这提高了代码可维护性,也允许Inngest更灵活地调度和伸缩。
- 冷启动与超时:如果你的函数部署在Serverless平台(如Vercel、AWS Lambda),需要注意冷启动延迟。确保你的函数包体积尽可能小。同时,合理设置函数和步骤的超时时间,避免因长时间等待LLM响应或第三方API而导致不必要的资源占用和费用。
- LLM调用优化:这是成本大头。
- 缓存:对相似的、确定性的LLM提示词(prompt)结果进行缓存。可以使用
step.run的幂等性,或者外接一个Redis缓存。 - 流式响应:如果最终输出是给用户的,考虑使用LLM的流式响应,并通过Server-Sent Events (SSE) 或WebSocket推送给前端,提升用户体验。
agent-kit本身不直接处理流式,但你可以将流式处理封装成一个工具,或者在生成最终答案的步骤中实现。 - 模型选择:非核心的推理步骤(如文本分类、简单提取)可以使用更便宜、更快的模型(如gpt-3.5-turbo),把gpt-4等大模型留给最需要创造性和复杂推理的环节。
- 缓存:对相似的、确定性的LLM提示词(prompt)结果进行缓存。可以使用
- 监控与告警:除了Inngest UI,你需要将关键指标(工作流执行时长、失败率、工具调用延迟)集成到你的APM工具(如Datadog, Sentry)中。为关键业务工作流的失败设置告警。
5.3 安全与权限
将AI Agent接入业务流程,安全是底线。
- 工具调用沙箱化:工具(Tool)的
execute函数可能执行任意代码(如执行数据库查询、调用外部API)。务必进行严格的输入验证和权限检查。例如,一个“发送邮件”的工具,必须在执行前验证当前工作流关联的用户是否有权向目标地址发送邮件。 - 敏感信息处理:不要在事件数据、步骤状态或日志中明文存储API密钥、用户密码等敏感信息。使用环境变量或安全的密钥管理服务(如AWS Secrets Manager, HashiCorp Vault)。Inngest事件数据在传输和存储时是加密的,但仍需遵循最小权限原则。
- 用户输入净化:传递给LLM的用户输入必须进行适当的清理和检查,防止提示词注入攻击。
- 审计日志:确保所有工具调用、关键决策点(特别是涉及费用、数据变更的)都有清晰的审计日志,并关联到具体的用户和工作流实例。Inngest的执行历史本身就是一份强大的审计日志。
6. 常见问题与排查实录
在实际使用agent-kit的过程中,我遇到并总结了一些典型问题:
问题1:函数注册成功,但发送事件后没有执行。
- 排查:
- 检查Inngest Dev Server UI或Cloud Dashboard,确认事件是否成功送达。事件列表里应该有记录。
- 检查你的函数定义中的
onEvents配置。事件名称必须完全匹配(包括大小写)。app/user.request.received和app/user.request.received.v2是两个不同的事件。 - 检查你的服务端(
servehandler)是否正常运行,并且网络可达。Inngest服务会向你的http://your-server.com/api/inngest发送POST请求来触发函数。 - 查看服务端日志,确认收到了来自Inngest的调用。
问题2:step.run里的代码被重复执行了。
- 原因与解决:这通常是因为函数执行中途失败(如未捕获的异常、超时),触发了Inngest的重试机制。确保
step.run内部的代码是幂等的。如果操作本身不幂等(如发送邮件、创建订单),你需要:- 在操作前检查状态(例如,查询这笔订单是否已创建)。
- 使用一个唯一ID(如
stepId或自己生成的UUID)作为幂等键,确保即使重复执行,外部系统也只会处理一次。 - 或者,将非幂等操作移到函数最后,并减少其重试次数。
问题3:Agent调用工具时,LLM总是传错参数格式。
- 排查:
- 检查工具描述:工具的
description和每个参数的description是否清晰无歧义?LLM完全依赖这些描述来理解工具。 - 检查参数Schema:
args的JSON Schema定义是否正确?特别是type和optional字段。 - 提供Few-shot示例:在给Agent的
instructions或系统提示词中,提供一两个正确调用该工具的示例,能显著提升准确性。 - 使用更强大的模型:如果使用gpt-3.5-turbo经常出错,可以尝试换用gpt-4或claude-3,它们在函数调用/工具调用上通常更可靠。
- 检查工具描述:工具的
问题4:工作流等待用户确认(step.waitForEvent)超时后,后续逻辑不执行。
- 排查:
step.waitForEvent在超时后,返回值是null。你的代码必须处理这种null情况。参考前面示例中的if (userConfirmation) {...} else {...}分支。超时本身不会导致函数失败,你需要显式地处理超时逻辑。
问题5:如何测试复杂的工作流?
- 最佳实践:
- 单元测试:单独测试你的工具函数(
execute方法)和纯业务逻辑函数。这些不依赖Inngest运行时。 - 集成测试:使用
inngestSDK的测试工具。你可以创建一个模拟的Inngest客户端,发送事件,然后直接运行你的函数代码,并断言其发送了特定的事件或调用了特定的服务。这能让你在CI/CD流水线中测试工作流的整体逻辑。 - 端到端测试(E2E):在接近生产的环境(如预发布环境)中,通过真实API触发完整流程,验证从用户请求到最终结果的全链路。Inngest的“重放”功能在这里也很有用。
- 单元测试:单独测试你的工具函数(
