Can't Get Your Fine-Tuning Data Aligned? Multi-Agent Collaboration Is the Way Out
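Preface
Our team builds AI customer service, and we have to reconcile the data-annotation standards of several business stakeholders. Operations wants it one way, product wants it another, and the annotators are left not knowing whom to follow.
We eventually built a unified fine-tuning data alignment workflow on top of a multi-agent collaborative architecture, and it improved our efficiency considerably. This post walks through that approach.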
1. Underlying Principles
1.1 Why fine-tuning data alignment is hard
In a multi-agent system, each Agent has its own data-annotation standard:
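```mermaid
graph TD
    A["Business data"] --> B["Agent 1 annotation"]
    A --> C["Agent 2 annotation"]
    A --> D["Agent 3 annotation"]
    B --> E{"Format alignment"}
    C --> E
    D --> E
    E --> F["Unified standard"]
    F --> G["Message routing"]
    G --> H["State consistency"]
    H --> I["Fine-tuning data"]
```

The core difficulties: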
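- Different Agents use different annotation formats
- Data versions are inconsistent
- Messages get lost in transit
- State synchronization lags behind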
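To make the first difficulty concrete, here is a minimal sketch, assuming two hypothetical Agents that record the same annotation under different field names and different label vocabularies. `FIELD_MAPS`, `LABEL_MAPS` and `normalize` are illustrative names, not part of the original system; the point is only that each Agent's schema has to be translated into one shared schema before anything can be compared.

```python
from typing import Dict

# Hypothetical per-Agent schemas: which raw field holds the text / the label
FIELD_MAPS: Dict[str, Dict[str, str]] = {
    "ops_agent": {"text": "content", "label": "sentiment"},
    "product_agent": {"text": "utterance", "label": "tag"},
}

# Hypothetical per-Agent label vocabularies mapped onto one unified label set
LABEL_MAPS: Dict[str, Dict[str, str]] = {
    "ops_agent": {"正向": "positive", "负向": "negative", "中性": "neutral"},
    "product_agent": {"good": "positive", "bad": "negative", "ok": "neutral"},
}


def normalize(agent: str, raw: Dict[str, str]) -> Dict[str, str]:
    """Translate one Agent-specific record into the unified schema."""
    fields = FIELD_MAPS[agent]
    raw_label = raw.get(fields["label"], "")
    return {
        "text": raw.get(fields["text"], ""),
        # Unknown labels are passed through unchanged so a validator can flag them
        "label": LABEL_MAPS[agent].get(raw_label, raw_label),
        "source": agent,
    }


a = normalize("ops_agent", {"content": "服务态度很差", "sentiment": "负向"})
b = normalize("product_agent", {"utterance": "服务态度很差", "tag": "bad"})
print(a["label"], b["label"])  # prints: negative negative
```

Once both records share a schema, disagreements become visible as plain label differences, which is what the later alignment steps vote on.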
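1.2 Comparison of alignment approaches
| Approach | Consistency | Efficiency | Implementation difficulty |
|---|---|---|---|
| Fully manual alignment | High | Very low | Low |
| Rule engine | Medium | Medium | Medium |
| Multi-agent collaboration | High | High | High |
| End-to-end model | Low | High | High |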
2. Quick Start
Let's start with a simple data alignment Agent:
```python
from typing import Dict, List, Any
from dataclasses import dataclass


@dataclass
class DataItem:
    text: str
    label: str
    source: str   # name of the Agent that produced this item
    version: int


class AlignmentAgent:
    def __init__(self, name: str, schema: Dict):
        self.name = name
        self.schema = schema  # per-Agent annotation schema (not used yet in this simple version)
        self.data_store = {}

    def format_data(self, raw_data: Dict) -> DataItem:
        # Convert one raw record into the unified DataItem format
        return DataItem(
            text=raw_data.get("text", ""),
            label=self._convert_label(raw_data.get("label", "")),
            source=self.name,
            version=1,
        )

    def _convert_label(self, label: str) -> str:
        # Unify label format: map Chinese sentiment labels to English ones;
        # unknown labels pass through unchanged
        label_map = {
            "正向": "positive",
            "负向": "negative",
            "中性": "neutral",
        }
        return label_map.get(label, label)
```

3. Core API / Deep Dive
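3.1 Message routing and state sync at a glance
| Component | Responsibility | Implementation |
|---|---|---|
| Message queue | Delivers data | Guarantees no message is lost |
| State store | Records versions | Version-number control |
| Aligner | Format conversion | Unified schema |
| Validator | Data validation | Quality checks |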
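The Validator row is the only component above without code in this article. Here is a minimal sketch of what its quality check could look like, assuming a record is a plain dict with `text` and `label` fields and that the unified label set is the three sentiment labels used in the examples. `validate_record` and `ALLOWED_LABELS` are hypothetical names.

```python
from typing import Any, Dict, List

ALLOWED_LABELS = {"positive", "negative", "neutral"}  # assumed unified label set


def validate_record(record: Dict[str, Any]) -> List[str]:
    """Return the problems found in a record; an empty list means it passes."""
    problems: List[str] = []
    text = record.get("text") or ""
    if not str(text).strip():
        problems.append("empty text")
    label = record.get("label")
    if label not in ALLOWED_LABELS:
        # Typically a label that was never mapped into the unified schema
        problems.append(f"unknown label: {label!r}")
    return problems


print(validate_record({"text": "服务态度很差", "label": "negative"}))  # []
print(validate_record({"text": " ", "label": "负向"}))
```

Returning a list of problems rather than a single boolean keeps the reason for rejection, which is useful when the record is routed to human review; the second call above reports both a blank text and an unmapped label.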
3.2 Versioned state synchronization
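```python
from typing import Any, Dict, List, Optional


class VersionedDataStore:
    def __init__(self):
        self.store: Dict[str, Any] = {}            # "{key}_{version}" -> data
        self.version_counter: Dict[str, int] = {}  # key -> latest version number

    def save(self, key: str, data: Any) -> int:
        # Every write creates a new version instead of overwriting the old one
        if key not in self.version_counter:
            self.version_counter[key] = 0
        self.version_counter[key] += 1
        self.store[f"{key}_{self.version_counter[key]}"] = data
        return self.version_counter[key]

    def get_latest(self, key: str) -> Optional[Any]:
        version = self.version_counter.get(key, 0)
        return self.store.get(f"{key}_{version}")

    def resolve_conflict(self, key: str, versions: List[int]) -> Optional[Any]:
        # Simplest strategy: the highest version wins (last-writer-wins)
        if not versions:
            return None
        latest_version = max(versions)
        return self.store.get(f"{key}_{latest_version}")
```

3.3 Message routing implementation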
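```python
import uuid
from datetime import datetime
from typing import Dict, Optional


class MessageRouter:
    def __init__(self):
        self.queues: Dict[str, list] = {}  # agent name -> pending messages (FIFO)
        self.processed = set()             # IDs of messages already delivered

    def send(self, to_agent: str, data: Dict) -> str:
        if to_agent not in self.queues:
            self.queues[to_agent] = []
        message = {
            "id": str(uuid.uuid4()),
            "data": data,
            "timestamp": datetime.now().isoformat(),
            "retry": 0,
        }
        self.queues[to_agent].append(message)
        return message["id"]

    def receive(self, agent_name: str) -> Optional[Dict]:
        # Pops the oldest message; it is removed before the Agent has actually
        # processed it, so this in-memory queue alone cannot prevent loss
        queue = self.queues.get(agent_name, [])
        if queue:
            msg = queue.pop(0)
            self.processed.add(msg["id"])
            return msg
        return None

    def retry_failed(self, agent_name: str) -> None:
        # Only bumps the retry counter of messages still queued;
        # actual redelivery is left to the caller
        queue = self.queues.get(agent_name, [])
        for msg in queue:
            msg["retry"] += 1
```

4. Hands-On Practice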
A multi-Agent data alignment system:
```python
from collections import Counter
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

# DataItem, AlignmentAgent, VersionedDataStore and MessageRouter are the
# classes defined in the previous sections (assumed to be in the same module).


@dataclass
class AlignedRecord:
    text: str
    label: str
    confidence: float  # share of Agents that agree with the final label
    sources: List[str]
    version: int


class DataAlignmentOrchestrator:
    def __init__(self):
        self.agents: Dict[str, AlignmentAgent] = {}
        self.router = MessageRouter()
        self.store = VersionedDataStore()
        self.aligned_records: List[AlignedRecord] = []

    def register_agent(self, name: str, schema: Dict) -> None:
        self.agents[name] = AlignmentAgent(name, schema)

    def process_data(self, raw_data: Dict) -> Optional[AlignedRecord]:
        # 1. Dispatch the raw record to every Agent
        for agent_name in self.agents:
            self.router.send(agent_name, raw_data)

        # 2. Collect each Agent's annotation
        results = []
        for agent_name, agent in self.agents.items():
            msg = self.router.receive(agent_name)
            if msg:
                results.append(agent.format_data(msg["data"]))

        # 3. Align the annotations into a single record
        if not results:
            return None
        aligned = self._align_results(results)
        self.aligned_records.append(aligned)
        return aligned

    def _align_results(self, results: List[DataItem]) -> AlignedRecord:
        # If Agents disagree on the text, keep the most common variant
        text = self._majority_vote([r.text for r in results])
        labels = [r.label for r in results]
        label = self._majority_vote(labels)
        return AlignedRecord(
            text=text,
            label=label,
            confidence=labels.count(label) / len(labels),
            sources=[r.source for r in results],
            version=max(r.version for r in results),
        )

    def _majority_vote(self, items: List) -> Any:
        # On a tie, Counter.most_common returns the value seen first
        return Counter(items).most_common(1)[0][0]

    def get_stats(self) -> Dict[str, float]:
        n = len(self.aligned_records)
        return {
            "total_records": n,
            "avg_confidence": sum(r.confidence for r in self.aligned_records) / max(n, 1),
        }


orchestrator = DataAlignmentOrchestrator()
orchestrator.register_agent("agent_a", {})
orchestrator.register_agent("agent_b", {})

# "This restaurant's food is bad", labeled negative
data = {"text": "这家餐厅不好吃", "label": "负向"}
result = orchestrator.process_data(data)
print(f"Alignment result: {result}")
```

5. Pitfalls and Best Practices
💡 **Tip: use queues for message routing so nothing gets lost**
Data alignment cannot afford to drop data, so a message queue is a must.
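Note that the in-memory `MessageRouter` in Section 3.3 removes a message inside `receive`, before the Agent has processed it, so a crash at that moment loses the message. A common remedy is acknowledgement-based delivery: a received message stays "in flight" until the consumer acks it, and unacked messages are redelivered after a timeout (at-least-once delivery). The sketch below is a single-process illustration of that idea; `AckQueue`, `visibility_timeout` and its methods are hypothetical names, and a real deployment would rely on a message broker that supports acknowledgements.

```python
import time
import uuid
from collections import deque
from typing import Any, Deque, Dict, Optional, Tuple


class AckQueue:
    """In-memory queue with at-least-once delivery via explicit acks (sketch)."""

    def __init__(self, visibility_timeout: float = 30.0):
        self.visibility_timeout = visibility_timeout
        self.ready: Deque[Dict[str, Any]] = deque()
        # message id -> (message, deadline by which it must be acked)
        self.in_flight: Dict[str, Tuple[Dict[str, Any], float]] = {}

    def send(self, data: Any) -> str:
        msg = {"id": str(uuid.uuid4()), "data": data, "attempts": 0}
        self.ready.append(msg)
        return msg["id"]

    def receive(self) -> Optional[Dict[str, Any]]:
        self._requeue_expired()
        if not self.ready:
            return None
        msg = self.ready.popleft()
        msg["attempts"] += 1
        # Not deleted yet: the message stays in flight until ack() is called
        self.in_flight[msg["id"]] = (msg, time.monotonic() + self.visibility_timeout)
        return msg

    def ack(self, msg_id: str) -> None:
        # Only an explicit ack removes the message for good
        self.in_flight.pop(msg_id, None)

    def _requeue_expired(self) -> None:
        now = time.monotonic()
        expired = [mid for mid, (_, deadline) in self.in_flight.items() if deadline <= now]
        for mid in expired:
            msg, _ = self.in_flight.pop(mid)
            self.ready.append(msg)  # redelivery means consumers must be idempotent


q = AckQueue(visibility_timeout=0.0)
q.send({"text": "服务态度很差", "label": "负向"})
first = q.receive()   # consumer "crashes" without acking
second = q.receive()  # the same message is delivered again
q.ack(second["id"])
print(first["id"] == second["id"], second["attempts"], q.receive())  # True 2 None
```

The trade-off is duplicates instead of losses: a message may be processed twice, so the alignment step should deduplicate by message ID (the original router's `processed` set is one place to do that).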
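⚠️ **Warning: version conflicts must be merged**
When multiple Agents annotate in parallel, you need an explicit strategy for merging their results.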
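One common strategy, sketched below under the assumption of a single-process in-memory store, is optimistic concurrency: each writer states the version it read, and a write based on a stale version is rejected instead of silently overwriting the other Agent's work. The rejected writer then reads the latest value, merges, and retries. `CASStore`, `save_if_version` and `merge_labels` are hypothetical names, and the merge rule here (majority vote, ties flagged for review) is only one possible choice.

```python
from collections import Counter
from typing import Any, Dict, List, Optional, Tuple


class CASStore:
    """Versioned store that rejects writes based on a stale version (sketch)."""

    def __init__(self):
        self.data: Dict[str, Tuple[int, Any]] = {}  # key -> (version, value)

    def read(self, key: str) -> Tuple[int, Optional[Any]]:
        return self.data.get(key, (0, None))

    def save_if_version(self, key: str, value: Any, expected_version: int) -> bool:
        current_version, _ = self.read(key)
        if current_version != expected_version:
            return False  # someone wrote first: caller must merge and retry
        self.data[key] = (current_version + 1, value)
        return True


def merge_labels(labels: List[str]) -> Optional[str]:
    """Majority vote; returns None on a tie so the record can go to review."""
    counts = Counter(labels).most_common()
    if not counts:
        return None
    if len(counts) > 1 and counts[0][1] == counts[1][1]:
        return None  # no clear majority
    return counts[0][0]


store = CASStore()
version, _ = store.read("msg-1")                       # both Agents read version 0
store.save_if_version("msg-1", ["negative"], version)  # Agent A writes first
if not store.save_if_version("msg-1", ["neutral"], version):  # Agent B is stale
    latest_version, current = store.read("msg-1")
    store.save_if_version("msg-1", current + ["neutral"], latest_version)  # merge, retry
print(store.read("msg-1"))                 # (2, ['negative', 'neutral'])
print(merge_labels(store.read("msg-1")[1]))  # None: a 1-1 tie goes to review
```

Compared with the last-writer-wins `resolve_conflict` in Section 3.2, this keeps both annotations and makes the disagreement explicit instead of discarding one side.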
✅ **Recommended: use confidence to filter out low-quality data**
Any record with a confidence below 0.7 should go to human review.
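A minimal sketch of that routing rule, assuming records are dicts carrying the agreement-based `confidence` computed in Section 4 (the share of Agents that agree with the majority label). `route_by_confidence` is a hypothetical helper name.

```python
from typing import Dict, List, Tuple


def route_by_confidence(
    records: List[Dict], threshold: float = 0.7
) -> Tuple[List[Dict], List[Dict]]:
    """Split records into (accepted, needs_review) by their confidence score."""
    accepted: List[Dict] = []
    needs_review: List[Dict] = []
    for record in records:
        target = accepted if record["confidence"] >= threshold else needs_review
        target.append(record)
    return accepted, needs_review


records = [
    {"text": "服务态度很差", "label": "negative", "confidence": 1.0},     # 2 of 2 Agents agree
    {"text": "一般般吧", "label": "neutral", "confidence": 0.5},          # 1 of 2 Agents agree
    {"text": "这个产品很好用", "label": "positive", "confidence": 2 / 3},  # 2 of 3 Agents agree
]
accepted, needs_review = route_by_confidence(records)
print(len(accepted), len(needs_review))  # 1 2
```

Because confidence here is a vote share, its possible values depend on the number of Agents: with two Agents it can only be 1.0 or 0.5, so a 0.7 threshold means "any disagreement goes to review", and with three Agents even a 2-to-1 majority (about 0.67) falls below it. Pick the threshold with the Agent count in mind.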
6. End-to-End Demo
A production-grade data alignment pipeline:
```python
import json
from enum import Enum
from typing import Any, Dict, List

# AlignedRecord and DataAlignmentOrchestrator come from Section 4 (same module).


class DataStatus(Enum):
    # Lifecycle states for a record (not yet wired into the pipeline below)
    PENDING = "pending"
    ALIGNED = "aligned"
    CONFLICT = "conflict"
    REVIEWED = "reviewed"


class QualityController:
    def __init__(self, min_confidence: float = 0.7):
        self.min_confidence = min_confidence

    def check_quality(self, record: AlignedRecord) -> bool:
        return record.confidence >= self.min_confidence

    def check_completeness(self, record: AlignedRecord) -> bool:
        return bool(record.text) and bool(record.label)


class DataPipeline:
    def __init__(self):
        self.orchestrator = DataAlignmentOrchestrator()
        self.quality = QualityController()
        self.pipeline_log: List[Dict[str, Any]] = []

    def add_source(self, name: str, schema: Dict) -> None:
        self.orchestrator.register_agent(name, schema)

    def process_batch(self, raw_batch: List[Dict]) -> Dict[str, Any]:
        results = {"aligned": [], "conflict": [], "failed": []}
        for i, raw in enumerate(raw_batch):
            try:
                aligned = self.orchestrator.process_data(raw)
                # No result, or missing text/label, counts as a failure
                if not aligned or not self.quality.check_completeness(aligned):
                    results["failed"].append({"index": i, "error": "incomplete record"})
                    continue
                # Low-confidence records go to the conflict bucket for human review
                if self.quality.check_quality(aligned):
                    results["aligned"].append(aligned)
                else:
                    results["conflict"].append(aligned)
            except Exception as e:
                # Same entry shape for every failure
                results["failed"].append({"index": i, "error": str(e)})
        self.pipeline_log.append(results)
        return results

    def export_aligned_data(self, format: str = "json"):
        aligned = [
            {"text": r.text, "label": r.label, "confidence": r.confidence}
            for batch in self.pipeline_log
            for r in batch.get("aligned", [])
        ]
        if format == "json":
            return json.dumps(aligned, ensure_ascii=False)
        return aligned


pipeline = DataPipeline()
pipeline.add_source("agent_a", {})
pipeline.add_source("agent_b", {})

batch = [
    {"text": "这个产品很好用", "label": "正向"},  # "This product is great"
    {"text": "服务态度很差", "label": "负向"},    # "The service attitude is terrible"
    {"text": "一般般吧", "label": "中性"},        # "It's so-so"
]
result = pipeline.process_batch(batch)
print(f"aligned: {len(result['aligned'])}, conflict: {len(result['conflict'])}, failed: {len(result['failed'])}")
```

7. Summary
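Multi-Agent collaborative fine-tuning data alignment comes down to:
- Unified message routing
- Versioned state management
- Confidence-based quality control
- Automatic conflict merging
Get these right, and fine-tuning data alignment stops being a hard problem.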
