Agentic Design Patterns-模式3:并行化(Parallelization)的代码实现
目录
1. 概述
2. 价值
3. 代码实现
代码说明
实际使用注意事项
1. 概述
许多复杂的智能体任务其实包含多个可以同时执行的子任务,而不是一个接一个地串行处理,这时,并行化设计模式就变得至关重要。并行化模式是一种通过同时执行独立子任务优化计算流程的方法,尤其适用于涉及多次模型推理或外部服务调用的复杂操作,可有效降低整体延迟。
核心思想是识别流程中彼此无依赖的部分,并将它们并行执行。尤其在涉及外部服务(如API或数据库)有延迟时,可以同时发起多个请求,显著提升效率。
图1:并行化设计模式
图2:并行化与子智能体示例
2. 价值
并行化是一种通过并发执行独立任务提升效率的设计模式,尤其适用于涉及外部资源(如API调用)等待的场景,并行化可显著降低整体延迟,让智能体系统在复杂任务下更具响应性。但并发/并行架构也可能会增加设计、调试和日志等开发复杂度与成本。
3. 代码实现
以下是并行化(Parallelization)模式的Python实现,包含完整的使用示例:
import asyncio import concurrent.futures from typing import List, Any, Dict, Callable import time from dataclasses import dataclass import logging # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class ParallelTask: """并行任务数据结构""" task_id: str system_prompt: str user_prompt: str model: str = "gpt-4o" metadata: Dict[str, Any] = None class ParallelProcessor: """并行处理器""" def __init__(self, max_workers: int = 5): """ 初始化并行处理器 Args: max_workers: 最大并发工作线程数 """ self.max_workers = max_workers self.client = None def _init_client(self): """初始化模型客户端(懒加载)""" if self.client is None: # 模拟外部模型客户端获取 # 实际使用时取消下面两行的注释 # from model_factory import get_model_client # self.client = get_model_client() # 模拟客户端(用于演示) class MockClient: class chat: class completions: @staticmethod def create(model, messages, stream=False): class Response: class Choice: class Message: content = f"Mock response for model {model}" choices = [Choice()] return Response() self.client = MockClient() return self.client def _process_single_task_sync(self, task: ParallelTask) -> str: """ 同步处理单个任务 Args: task: 并行任务 Returns: 处理结果 """ try: client = self._init_client() messages = [ {"role": "system", "content": task.system_prompt}, {"role": "user", "content": task.user_prompt}, ] logger.info(f"开始处理任务 {task.task_id},使用模型 {task.model}") response = client.chat.completions.create( model=task.model, messages=messages, stream=False ) result = response.choices[0].message.content logger.info(f"任务 {task.task_id} 处理完成") return result except Exception as e: logger.error(f"处理任务 {task.task_id} 时出错: {e}") return f"Error: {str(e)}" def process_tasks_parallel(self, tasks: List[ParallelTask]) -> Dict[str, str]: """ 并行处理多个任务(线程池方式) Args: tasks: 任务列表 Returns: 任务ID到结果的映射字典 """ results = {} logger.info(f"开始并行处理 {len(tasks)} 个任务,最大并发数: {self.max_workers}") start_time = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: # 提交所有任务 future_to_task = { executor.submit(self._process_single_task_sync, task): task for task in tasks } # 收集结果 for future in concurrent.futures.as_completed(future_to_task): task = future_to_task[future] try: result = future.result() results[task.task_id] = result except Exception as e: logger.error(f"任务 {task.task_id} 执行异常: {e}") results[task.task_id] = f"Error: {str(e)}" elapsed_time = time.time() - start_time logger.info(f"所有任务处理完成,总耗时: {elapsed_time:.2f}秒") return results async def _process_single_task_async(self, task: ParallelTask) -> str: """ 异步处理单个任务 Args: task: 并行任务 Returns: 处理结果 """ try: # 在实际IO操作处使用asyncio.to_thread loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, self._process_single_task_sync, task ) return result except Exception as e: logger.error(f"异步处理任务 {task.task_id} 时出错: {e}") return f"Error: {str(e)}" async def process_tasks_async(self, tasks: List[ParallelTask]) -> Dict[str, str]: """ 异步并行处理多个任务 Args: tasks: 任务列表 Returns: 任务ID到结果的映射字典 """ logger.info(f"开始异步并行处理 {len(tasks)} 个任务") start_time = time.time() # 创建所有异步任务 async_tasks = [ self._process_single_task_async(task) for task in tasks ] # 等待所有任务完成 results_list = await asyncio.gather(*async_tasks, return_exceptions=True) # 整理结果 results = {} for task, result in zip(tasks, results_list): if isinstance(result, Exception): results[task.task_id] = f"Error: {str(result)}" else: results[task.task_id] = result elapsed_time = time.time() - start_time logger.info(f"异步任务处理完成,总耗时: {elapsed_time:.2f}秒") return results class ParallelAgent: """并行智能体示例""" def __init__(self, processor: ParallelProcessor): self.processor = processor def analyze_multiple_documents(self, documents: List[Dict]) -> Dict: """ 并行分析多个文档 Args: documents: 文档列表,每个文档包含内容和分析要求 Returns: 分析结果 """ tasks = [] for i, doc in enumerate(documents): task = ParallelTask( task_id=f"doc_{i}", system_prompt="你是一个文档分析助手。请分析用户提供的文档内容。", user_prompt=f"请分析以下文档:\n\n{doc['content']}\n\n分析要求:{doc['analysis_type']}", model="gpt-4o", metadata={"doc_index": i, "title": doc.get("title", f"文档{i}")} ) tasks.append(task) # 并行处理所有文档分析任务 results = self.processor.process_tasks_parallel(tasks) return { "total_documents": len(documents), "analysis_results": results, "summary": f"成功分析 {len([r for r in results.values() if not r.startswith('Error')])}/{len(documents)} 个文档" } def batch_sentiment_analysis(self, texts: List[str]) -> Dict: """ 批量情感分析 Args: texts: 文本列表 Returns: 情感分析结果 """ tasks = [] for i, text in enumerate(texts): task = ParallelTask( task_id=f"sentiment_{i}", system_prompt="你是一个情感分析助手。请分析文本的情感倾向。", user_prompt=f"请分析以下文本的情感倾向(正面、负面或中性):\n\n{text}", model="gpt-4o" ) tasks.append(task) # 使用异步方式处理 results = asyncio.run(self.processor.process_tasks_async(tasks)) return { "total_texts": len(texts), "sentiment_results": results, "statistics": self._calculate_sentiment_stats(results) } def _calculate_sentiment_stats(self, results: Dict[str, str]) -> Dict: """计算情感统计""" stats = {"positive": 0, "negative": 0, "neutral": 0, "error": 0} for result in results.values(): if "正面" in result: stats["positive"] += 1 elif "负面" in result: stats["negative"] += 1 elif "中性" in result: stats["neutral"] += 1 elif "Error" in result: stats["error"] += 1 return stats # 使用示例 def main(): """主函数示例""" # 创建并行处理器 processor = ParallelProcessor(max_workers=3) # 创建并行智能体 agent = ParallelAgent(processor) # 示例1:并行处理多个独立任务 print("=" * 50) print("示例1:基本并行任务处理") print("=" * 50) tasks = [ ParallelTask( task_id="task_1", system_prompt="你是一个翻译助手。", user_prompt="请将以下英文翻译成中文:Hello, how are you today?", model="gpt-4o" ), ParallelTask( task_id="task_2", system_prompt="你是一个代码助手。", user_prompt="用Python写一个快速排序算法。", model="gpt-4o" ), ParallelTask( task_id="task_3", system_prompt="你是一个知识问答助手。", user_prompt="简述人工智能的发展历史。", model="gpt-4o" ), ParallelTask( task_id="task_4", system_prompt="你是一个诗歌创作助手。", user_prompt="写一首关于春天的五言绝句。", model="gpt-4o" ), ] # 并行处理任务 results = processor.process_tasks_parallel(tasks) for task_id, result in results.items(): print(f"\n任务 {task_id} 结果:") print(f"结果预览: {result[:100]}...") # 示例2:文档分析 print("\n" + "=" * 50) print("示例2:并行文档分析") print("=" * 50) documents = [ { "title": "AI报告", "content": "人工智能正在改变世界...", "analysis_type": "提取关键观点" }, { "title": "技术文章", "content": "深度学习在自然语言处理中的应用...", "analysis_type": "总结主要内容" }, { "title": "市场分析", "content": "2024年科技市场趋势...", "analysis_type": "分析市场机会" } ] doc_results = agent.analyze_multiple_documents(documents) print(f"文档分析完成: {doc_results['summary']}") # 示例3:批量情感分析 print("\n" + "=" * 50) print("示例3:批量情感分析") print("=" * 50) texts = [ "这个产品非常好用,我很满意!", "服务态度很差,不会再来了。", "质量很好,但价格有点高。", "物流很快,包装也很用心。" ] sentiment_results = agent.batch_sentiment_analysis(texts) print(f"情感分析统计: {sentiment_results['statistics']}") # 高级用法示例:带错误处理和超时控制 class RobustParallelProcessor(ParallelProcessor): """增强型并行处理器,带错误处理和超时控制""" def process_tasks_with_timeout(self, tasks: List[ParallelTask], timeout_per_task: float = 30.0) -> Dict[str, str]: """ 带超时控制的并行处理 Args: tasks: 任务列表 timeout_per_task: 每个任务的超时时间(秒) Returns: 任务ID到结果的映射字典 """ results = {} with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: future_to_task = {} # 提交任务,设置超时 for task in tasks: future = executor.submit(self._process_single_task_sync, task) future_to_task[future] = task # 收集结果,处理超时 for future in concurrent.futures.as_completed(future_to_task.keys(), timeout=timeout_per_task * len(tasks)): task = future_to_task[future] try: result = future.result(timeout=timeout_per_task) results[task.task_id] = result except concurrent.futures.TimeoutError: logger.warning(f"任务 {task.task_id} 超时") results[task.task_id] = "Error: Timeout" except Exception as e: logger.error(f"任务 {task.task_id} 执行异常: {e}") results[task.task_id] = f"Error: {str(e)}" return results if __name__ == "__main__": # 运行示例 main() # 使用增强版处理器 print("\n" + "=" * 50) print("增强版并行处理器示例") print("=" * 50) robust_processor = RobustParallelProcessor(max_workers=2) test_tasks = [ ParallelTask( task_id="robust_task_1", system_prompt="测试", user_prompt="简单响应", model="gpt-4o" ) ] robust_results = robust_processor.process_tasks_with_timeout( test_tasks, timeout_per_task=10.0 ) print(f"增强版处理结果: {robust_results}")代码说明
1. 核心组件
- ParallelTask:并行任务数据结构,封装单个任务的所有信息
- ParallelProcessor:核心并行处理器,提供两种并行方式:
process_tasks_parallel():基于线程池的同步并行process_tasks_async():基于asyncio的异步并行
- ParallelAgent:应用示例,展示如何在智能体任务中使用并行化
2. 主要特性
- 并发控制:通过
max_workers限制最大并发数 - 错误处理:每个任务独立处理异常,不影响其他任务
- 结果收集:自动收集并整理所有任务结果
- 性能监控:记录每个任务的执行时间和总耗时
3. 使用场景示例
- 文档批量分析:同时分析多个文档
- 情感批量分析:并行处理大量文本情感分析
- 多任务处理:同时执行翻译、代码生成、问答等独立任务
实际使用注意事项
1. 客户端初始化:
# 取消注释以下代码使用真实模型客户端 # from model_factory import get_model_client # self.client = get_model_client()2. 并发数调整:
- 根据API限制调整
max_workers - 考虑网络带宽和服务器负载
3. 错误处理增强:
- 可添加重试机制
- 实现断路器模式防止级联故障
- 添加更详细的监控和日志
4. 资源管理:
- 使用连接池管理数据库/API连接
- 实现优雅关闭,确保所有任务完成
适用场景总结:
这个并行化设计模式特别适合以下场景:
- 需要同时调用多个外部API
- 批量处理大量独立数据
- 需要优化整体响应时间的智能体系统
- 有多个可独立执行的子任务
通过并行执行独立任务,可以显著减少总体等待时间,特别是在涉及网络延迟的情况下。
参照书籍《Agentic Design Patterns》的基本概念和观点。
