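Python Async Programming in Practice: Building a High-Concurrency AI API Call Pipeline
For an application that calls a hosted AI API, the main performance bottleneck is not model inference but network I/O. This article uses plain Python to show how to build a high-concurrency API call pipeline with asyncio, with complete implementations of batched requests, concurrency control, and result aggregation.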
1. Why AI Applications Need Async
First, a synchronous example:
```python
# Synchronous calls: one request at a time
import time
from openai import OpenAI

client = OpenAI(api_key="your-key")

def process_batch(prompts):
    results = []
    for prompt in prompts:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        results.append(response.choices[0].message.content)
    return results

# 10 requests at ~1.5 s each: ~15 s in total
prompts = [f"Write sentence #{i} about autumn" for i in range(10)]
start = time.time()
results = process_batch(prompts)
print(f"Sync elapsed: {time.time() - start:.1f}s")  # ~15s
```

Processed serially, the 10 requests take about 15 seconds. Yet most of each request's time is spent waiting for the network response (I/O wait) while the CPU sits idle. Asynchronous programming puts that idle time to use.
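The idle-CPU claim can be checked without an API key. The sketch below replaces the network call with a sleep (an assumption: the fake latency `LATENCY` is scaled down from ~1.5 s to 0.1 s, and the helper names are hypothetical). `time.sleep` blocks the thread the way a synchronous HTTP call does, while `asyncio.sleep` hands control back to the event loop, so the concurrent version finishes in roughly one latency instead of ten.

```python
import asyncio
import time

N_REQUESTS = 10
LATENCY = 0.1  # assumed per-request network wait in seconds (scaled down)

def fake_request_sync(i: int) -> str:
    time.sleep(LATENCY)  # blocks the whole thread while "waiting on the network"
    return f"reply-{i}"

async def fake_request_async(i: int) -> str:
    await asyncio.sleep(LATENCY)  # yields to the event loop while waiting
    return f"reply-{i}"

def run_sync() -> float:
    start = time.perf_counter()
    for i in range(N_REQUESTS):
        fake_request_sync(i)
    return time.perf_counter() - start

async def run_async() -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fake_request_async(i) for i in range(N_REQUESTS)))
    return time.perf_counter() - start

sync_elapsed = run_sync()                # about N_REQUESTS * LATENCY
async_elapsed = asyncio.run(run_async())  # about LATENCY
print(f"sync: {sync_elapsed:.2f}s, async: {async_elapsed:.2f}s")
```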
2. asyncio Basics: From Sync to Async
2.1 Converting to async
```python
import asyncio
import time
from openai import AsyncOpenAI  # async client

# To target an OpenAI-compatible endpoint, pass base_url, e.g.:
# client = AsyncOpenAI(
#     api_key="your-key",
#     base_url="https://api.moyu.info/v1"
# )
client = AsyncOpenAI(api_key="your-key")

async def process_one(prompt):
    """Async function for a single request"""
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def process_batch_async(prompts):
    """Process multiple requests concurrently"""
    tasks = [process_one(p) for p in prompts]
    results = await asyncio.gather(*tasks)
    return results

# Run
prompts = [f"Write sentence #{i} about autumn" for i in range(10)]
start = time.time()
results = asyncio.run(process_batch_async(prompts))
print(f"Async elapsed: {time.time() - start:.1f}s")  # ~2s
```

With the 10 requests running concurrently, the total drops from about 15 seconds to about 2: roughly the latency of the slowest single request rather than the sum of all of them. That is the payoff of async.
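`process_batch_async` relies on `results[i]` belonging to `prompts[i]`. That holds because `asyncio.gather` returns results in the order the awaitables were passed, not the order they finished. A small offline check, assuming a hypothetical `fake_call` that sleeps instead of calling the API and delays chosen so later calls finish first:

```python
import asyncio

async def fake_call(i: int, delay: float) -> str:
    # Hypothetical stand-in for process_one: sleeps instead of calling an API
    await asyncio.sleep(delay)
    return f"answer-{i}"

async def main():
    delays = [0.3, 0.2, 0.1]  # later calls finish first
    completion_order = []

    async def tracked(i: int, delay: float) -> str:
        result = await fake_call(i, delay)
        completion_order.append(i)  # record when each call actually finishes
        return result

    results = await asyncio.gather(*(tracked(i, d) for i, d in enumerate(delays)))
    return results, completion_order

results, completion_order = asyncio.run(main())
print("finished in order:", completion_order)  # [2, 1, 0]
print("gather returned:  ", results)           # ['answer-0', 'answer-1', 'answer-2']
```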
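2.2 Core asyncio concepts
Three concepts are enough to get started:
| Concept | Analogy | Notes |
|---|---|---|
| `async def` | Defines a function that "can pause" | `await` can be used inside it |
| `await` | "Pause here and resume when the result comes back" | Only allowed inside `async def` |
| `asyncio.gather` | "Do several things at once" | Runs multiple coroutines concurrently |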
```python
import asyncio

async def fetch_data(item_id):
    print(f"  start fetching {item_id}")
    await asyncio.sleep(1)  # simulate network wait
    print(f"  done {item_id}")
    return f"data-{item_id}"

async def main():
    # gather: the three tasks run concurrently
    results = await asyncio.gather(
        fetch_data(1),
        fetch_data(2),
        fetch_data(3)
    )
    print(results)  # ['data-1', 'data-2', 'data-3']

asyncio.run(main())  # pass the coroutine main(), not the function main

# Output:
#   start fetching 1
#   start fetching 2
#   start fetching 3
#   done 1
#   done 2
#   done 3
# ['data-1', 'data-2', 'data-3']
# Total time is about 1 second (not 3)
```

3. Concurrency Control: Semaphore
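3.1 Why concurrency control is needed
`asyncio.gather` starts every request at once. With 1,000 requests, 1,000 simultaneous calls can trigger the API's rate limit (HTTP 429) and can also exhaust client memory.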
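To see the problem concretely, the sketch below counts how many simulated requests are in flight at the same moment. The request is a hypothetical stand-in that only sleeps for 10 ms. With a bare `gather`, every coroutine starts before any finishes, so the peak equals the number of requests; wrapping each call in `asyncio.Semaphore(5)` caps the peak at 5.

```python
import asyncio
from typing import Optional

class InFlight:
    """Counts how many simulated requests are running at the same time."""
    def __init__(self) -> None:
        self.current = 0
        self.peak = 0

async def fake_request(i: int, counter: InFlight) -> int:
    counter.current += 1
    counter.peak = max(counter.peak, counter.current)
    await asyncio.sleep(0.01)  # simulated network wait
    counter.current -= 1
    return i

async def peak_concurrency(n: int, limit: Optional[int] = None) -> int:
    counter = InFlight()
    sem = asyncio.Semaphore(limit) if limit is not None else None

    async def call(i: int) -> int:
        if sem is None:
            return await fake_request(i, counter)
        async with sem:  # at most `limit` calls get past this line at once
            return await fake_request(i, counter)

    await asyncio.gather(*(call(i) for i in range(n)))
    return counter.peak

unbounded_peak = asyncio.run(peak_concurrency(1000))
bounded_peak = asyncio.run(peak_concurrency(100, limit=5))
print(unbounded_peak, bounded_peak)  # 1000 5
```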
Use a `Semaphore` to cap the maximum concurrency:
```python
import asyncio

async def process_with_concurrency(prompts, max_concurrent=5):
    """Cap the number of requests in flight"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_process(prompt):
        async with semaphore:  # acquire a slot; wait if all are taken
            return await process_one(prompt)  # process_one from section 2.1

    tasks = [limited_process(p) for p in prompts]
    return await asyncio.gather(*tasks)

# 100 requests, at most 5 running at any moment
prompts = [f"Question {i}" for i in range(100)]
results = asyncio.run(process_with_concurrency(prompts, max_concurrent=5))
```

3.2 Adjusting concurrency dynamically
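Adjust concurrency according to how the API is responding: step it up after a run of successful requests, and cut it back when requests fail (for example, when rate limited):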
```python
import asyncio

class AdaptiveConcurrency:
    """Adaptive concurrency controller"""
    def __init__(self, initial=5, min_val=1, max_val=20):
        self.current = initial
        self.min_val = min_val
        self.max_val = max_val
        self.success_count = 0
        self.error_count = 0

    def on_success(self):
        self.success_count += 1
        # After 10 consecutive successes, try one more concurrent slot
        if self.success_count >= 10:
            self.current = min(self.max_val, self.current + 1)
            self.success_count = 0
            print(f"[concurrency up] -> {self.current}")

    def on_error(self):
        self.error_count += 1
        self.success_count = 0
        # On an error, halve immediately
        self.current = max(self.min_val, self.current // 2)
        print(f"[concurrency down] -> {self.current}")

# Usage
controller = AdaptiveConcurrency(initial=5)

async def adaptive_process(prompts):
    # Send prompts in rounds of controller.current requests each; the limit
    # is re-read before every round, so it adapts as successes/errors arrive.
    results = []
    i = 0
    while i < len(prompts):
        chunk = prompts[i:i + controller.current]
        outcomes = await asyncio.gather(
            *(process_one(p) for p in chunk), return_exceptions=True
        )
        for outcome in outcomes:
            if isinstance(outcome, Exception):
                controller.on_error()
                results.append(None)
            else:
                controller.on_success()
                results.append(outcome)
        i += len(chunk)
    return results
```

4. Batch Requests and Result Aggregation
4.1 Batch processing
Send large numbers of requests in batches with a pause between batches, to avoid sustained high concurrency:
```python
import asyncio

async def process_in_batches(prompts, batch_size=10, interval=0.5):
    """Process in batches, pausing `interval` seconds between batches"""
    all_results = []
    for i in range(0, len(prompts), batch_size):
        batch = prompts[i:i + batch_size]

        # Run this batch concurrently
        tasks = [process_one(p) for p in batch]
        batch_results = await asyncio.gather(*tasks, return_exceptions=True)

        # Separate successes from failures
        for prompt, result in zip(batch, batch_results):
            if isinstance(result, Exception):
                print(f"  failed: {prompt[:20]}... - {result}")
                all_results.append(None)
            else:
                all_results.append(result)

        print(f"  batch {i // batch_size + 1} done")
        # Pause between batches (not after the last one)
        if i + batch_size < len(prompts):
            await asyncio.sleep(interval)

    return all_results

# 1000 requests, 10 per batch, 0.5 s between batches
prompts = [f"Question {i}" for i in range(1000)]
results = asyncio.run(process_in_batches(prompts, batch_size=10, interval=0.5))
```

4.2 Real-time aggregation of streaming results
Run several streaming requests at once and merge their output as it arrives:
```python
import asyncio

async def stream_one(client, prompt, queue, index):
    """One streaming request; pushes each chunk into the queue"""
    try:
        stream = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                await queue.put((index, chunk.choices[0].delta.content))
    finally:
        await queue.put((index, None))  # end marker, sent even if the request fails

async def merge_streams(prompts):
    """Merge the output of several streaming requests"""
    queue = asyncio.Queue()

    # Start all streaming requests; keep a reference to the gather future
    producers = asyncio.gather(
        *(stream_one(client, prompt, queue, i) for i, prompt in enumerate(prompts)),
        return_exceptions=True,
    )

    # Read from the queue and merge
    completed = 0
    results = [""] * len(prompts)
    while completed < len(prompts):
        index, content = await queue.get()
        if content is None:
            completed += 1
        else:
            results[index] += content
            # Print in real time (could be pushed to a frontend instead)
            print(f"[{index}] {content}", end="", flush=True)

    await producers  # all producers have finished; exceptions are returned, not raised
    return results
```

4.3 Batch processing with timeouts
Give each request a timeout; requests that time out are skipped without affecting the others:
```python
import asyncio

async def process_with_timeout(prompts, timeout=10):
    """Wait at most `timeout` seconds (10 by default) for each request"""
    async def timed_process(prompt):
        try:
            return await asyncio.wait_for(process_one(prompt), timeout=timeout)
        except asyncio.TimeoutError:
            return f"[timeout] {prompt[:20]}..."

    tasks = [timed_process(p) for p in prompts]
    return await asyncio.gather(*tasks)
```

5. Error Handling and Retries
5.1 Retry with exponential backoff
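The retry helper waits longer after each failed attempt. Written out, with the attempt index $k$ starting at 0 and a multiplier $m$ that doubles the wait when the error looks like a 429 rate limit, the wait before the next attempt is:

$$
w_k = m \cdot 2^{k}, \qquad
m = \begin{cases} 2, & \text{429 rate-limit error} \\ 1, & \text{any other error} \end{cases}
$$

So ordinary failures wait 1 s, 2 s, 4 s, ... and rate-limit failures wait twice as long (2 s, 4 s, 8 s, ...). Doubling gives an overloaded or rate-limited API progressively more time to recover, while the first retry still happens quickly.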
```python
import asyncio

async def process_with_retry(prompt, max_retries=3):
    """Retry with exponential backoff"""
    last_error = None
    for attempt in range(max_retries):
        try:
            return await process_one(prompt)
        except Exception as e:
            last_error = e
            if attempt == max_retries - 1:
                break  # no point sleeping after the final attempt
            wait = 2 ** attempt  # 1s, 2s, 4s, ...
            # Wait longer on 429 rate limiting
            if "429" in str(e):
                wait *= 2
            print(f"  attempt {attempt + 1}/{max_retries} failed, waiting {wait}s: {e}")
            await asyncio.sleep(wait)
    raise last_error
```

5.2 Circuit breaker
When failures pile up back to back, stop sending requests for a while to avoid a cascading failure:
```python
import time

class CircuitBreaker:
    def __init__(self, threshold=5, reset_time=30):
        self.failures = 0
        self.threshold = threshold
        self.reset_time = reset_time
        self.last_failure = 0
        self.state = "closed"  # closed / open / half_open

    def can_proceed(self):
        if self.state == "open":
            # After reset_time seconds, let requests through to probe the service
            if time.time() - self.last_failure > self.reset_time:
                self.state = "half_open"
                return True
            return False
        return True

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure = time.time()
        # failures is not reset in half_open, so one more failure re-opens the breaker
        if self.failures >= self.threshold:
            self.state = "open"
            print(f"[breaker open] {self.failures} consecutive failures, pausing requests")

breaker = CircuitBreaker(threshold=5)

async def protected_process(prompt):
    if not breaker.can_proceed():
        return "Service temporarily unavailable, please retry later"
    try:
        result = await process_one(prompt)
        breaker.record_success()
        return result
    except Exception:
        breaker.record_failure()
        raise
```

6. The Complete Concurrent Pipeline
Combining the components above gives a production-ready concurrent call pipeline:
```python
import asyncio
import time
from dataclasses import dataclass
from typing import List, Optional

# Assumes CircuitBreaker (section 5.2) and client (section 2.1) are defined

@dataclass
class BatchConfig:
    max_concurrent: int = 5         # max concurrent requests
    batch_size: int = 20            # requests per batch
    batch_interval: float = 0.3     # pause between batches (s)
    timeout: float = 15.0           # per-request timeout (s)
    max_retries: int = 3            # max attempts per request
    retry_base_delay: float = 1.0   # base retry delay (s)

class AIPipeline:
    """Concurrent AI API call pipeline"""
    def __init__(self, client, config: BatchConfig):
        self.client = client
        self.config = config
        self.semaphore = asyncio.Semaphore(config.max_concurrent)
        self.breaker = CircuitBreaker(threshold=5)
        self.stats = {"success": 0, "failed": 0, "retried": 0}

    async def _single_call(self, prompt: str) -> Optional[str]:
        """One request: concurrency limit, timeout, retries"""
        if not self.breaker.can_proceed():
            return None
        async with self.semaphore:
            for attempt in range(self.config.max_retries):
                try:
                    response = await asyncio.wait_for(
                        self.client.chat.completions.create(
                            model="gpt-4o-mini",
                            messages=[{"role": "user", "content": prompt}]
                        ),
                        timeout=self.config.timeout
                    )
                    self.breaker.record_success()
                    self.stats["success"] += 1
                    return response.choices[0].message.content
                except Exception as e:  # includes asyncio.TimeoutError
                    if attempt == self.config.max_retries - 1:
                        break  # out of attempts: record the failure below
                    self.stats["retried"] += 1
                    delay = self.config.retry_base_delay * (2 ** attempt)
                    if "429" in str(e):
                        delay = 2 * (2 ** attempt)  # back off harder on rate limits
                    await asyncio.sleep(delay)
            self.breaker.record_failure()
            self.stats["failed"] += 1
            return None

    async def process_batch(self, prompts: List[str]) -> List[Optional[str]]:
        """Batch processing: batches + concurrency + pauses"""
        all_results = []
        total = len(prompts)

        for i in range(0, total, self.config.batch_size):
            batch = prompts[i:i + self.config.batch_size]

            # Run this batch concurrently
            tasks = [self._single_call(p) for p in batch]
            batch_results = await asyncio.gather(*tasks)
            all_results.extend(batch_results)

            # Progress report
            done = min(i + self.config.batch_size, total)
            print(f"  progress: {done}/{total} "
                  f"(success: {self.stats['success']} "
                  f"failed: {self.stats['failed']} "
                  f"retried: {self.stats['retried']})")

            # Pause between batches
            if done < total:
                await asyncio.sleep(self.config.batch_interval)

        return all_results

    def get_stats(self):
        return dict(self.stats)

# Usage example
async def main():
    config = BatchConfig(
        max_concurrent=5,
        batch_size=20,
        batch_interval=0.3,
        timeout=15.0,
        max_retries=3
    )
    pipeline = AIPipeline(client, config)

    # 100 requests
    prompts = [f"Explain in one sentence what {i} is" for i in range(100)]

    start = time.time()
    results = await pipeline.process_batch(prompts)
    elapsed = time.time() - start

    print(f"\nDone! Elapsed: {elapsed:.1f}s")
    print(f"Stats: {pipeline.get_stats()}")
    print(f"Throughput: {len(prompts) / elapsed:.1f} req/s")

asyncio.run(main())
```

7. Performance Comparison
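Approximate timings for the same batch of 100 requests under each approach:
| Approach | Time | Notes |
|---|---|---|
| Synchronous, serial | ~150s | One request at a time |
| Unbounded concurrency | ~2s | But triggers rate limiting |
| Semaphore(5) | ~30s | Stable but not fast |
| Batches + concurrency + retries | ~25s | Production-ready |
| Adaptive concurrency | ~18s | Best |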
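The serial and Semaphore(5) rows follow from simple arithmetic, assuming each request takes about 1.5 s as in section 1 and that per-request latency does not change with concurrency. With $N$ requests, per-request latency $t$, and concurrency limit $c$:

$$
T_{\text{serial}} = N t = 100 \times 1.5\ \text{s} = 150\ \text{s}
$$

$$
T_{\text{Semaphore}(c)} \approx \left\lceil \frac{N}{c} \right\rceil t = \left\lceil \frac{100}{5} \right\rceil \times 1.5\ \text{s} = 30\ \text{s}
$$

For unbounded concurrency the same model gives roughly a single latency $t$, in line with the ~2 s row, as long as the API does not start rejecting requests.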
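8. Summary
Five key points for building a high-concurrency AI API call pipeline:
- Use async: `AsyncOpenAI` + `asyncio.gather`
- Limit concurrency: a `Semaphore` prevents rate limiting and memory exhaustion
- Batch with pauses: more stable than sustained high concurrency
- Retries + circuit breaker: unstable networks are the norm, so fault tolerance is required
- Monitoring: success, failure, and retry counts should be visible
Assembled, the code in this article forms a production-ready concurrent pipeline; tune the `BatchConfig` parameters to your own call volume.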
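Tuning `BatchConfig` against the real API spends quota. One option is an offline fake client with the same attribute path the pipeline uses (`client.chat.completions.create(...)` returning an object with `.choices[0].message.content`). The classes below are hypothetical test doubles, not part of the OpenAI SDK; the simulated failure message contains "429" so the pipeline's rate-limit branch gets exercised.

```python
import asyncio
import random
from types import SimpleNamespace

class FakeCompletions:
    """Hypothetical offline stand-in for client.chat.completions."""
    def __init__(self, latency: float = 0.05, failure_rate: float = 0.0, seed: int = 0):
        self.latency = latency            # simulated seconds per request
        self.failure_rate = failure_rate  # fraction of calls that raise
        self.rng = random.Random(seed)    # seeded so runs are repeatable
        self.calls = 0

    async def create(self, model, messages, **kwargs):
        self.calls += 1
        await asyncio.sleep(self.latency)  # simulated network wait
        if self.rng.random() < self.failure_rate:
            # Message contains "429" so the rate-limit branch is exercised
            raise RuntimeError("simulated 429 rate limit")
        content = f"echo: {messages[-1]['content']}"
        # Same attribute path as the real response: choices[0].message.content
        message = SimpleNamespace(content=content)
        return SimpleNamespace(choices=[SimpleNamespace(message=message)])

class FakeClient:
    """Exposes .chat.completions.create like the OpenAI client does."""
    def __init__(self, **kwargs):
        self.chat = SimpleNamespace(completions=FakeCompletions(**kwargs))

async def demo() -> str:
    client = FakeClient(latency=0.01)
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "hi"}],
    )
    return response.choices[0].message.content

reply = asyncio.run(demo())
print(reply)  # echo: hi
```

Passing something like `FakeClient(latency=1.5, failure_rate=0.05)` to `AIPipeline` shows how `max_concurrent`, `batch_size`, and `batch_interval` trade off against elapsed time. The fake fails at random rather than in response to load, so it cannot reproduce a real provider's rate limiting; final tuning still needs real traffic.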
