当前位置: 首页 > news >正文

大模型长期记忆同步:多 Agent 间的消息路由机制设计

大模型长期记忆同步:多 Agent 间的消息路由机制设计

前言

多 Agent 系统里,最头疼的问题就是"Agent 之间信息不同步"。Agent A 记住了用户的需求,Agent B 一无所知。

这个问题本质是消息路由和状态一致性问题。本文设计了一套方案,把长期记忆变成可路由的消息,在 Agent 之间同步。今天聊聊。

一、底层原理

1.1 长期记忆的消息路由

长期记忆在多个 Agent 之间同步,本质是一个消息传递问题:

graph TD A["Agent A 记忆更新"] --> B["生成记忆消息"] B --> C["消息路由"] C --> D["Agent B"] C --> E["Agent C"] C --> F["Agent D"] D --> G["更新本地记忆"] E --> H["更新本地记忆"] F --> I["更新本地记忆"] G --> J{"确认"} J -->|成功| K["ACK"] J -->|失败| L["重试"]

核心挑战:

  • 记忆更新要及时同步
  • 冲突要能解决
  • 记忆不能丢失
  • 性能不能太差

1.2 同步方案对比

方案一致性延迟复杂度
广播
订阅最终
中心化
去中心最终

二、快速上手

2.1 基础消息路由

from typing import Dict, List, Any, Callable from dataclasses import dataclass import uuid import time @dataclass class MemoryMessage: id: str key: str value: Any source_agent: str timestamp: float version: int class MemoryRouter: def __init__(self): self.subscribers: Dict[str, List[Callable]] = {} self.message_log: List[MemoryMessage] = [] def subscribe(self, agent_name: str, callback: Callable): if agent_name not in self.subscribers: self.subscribers[agent_name] = [] self.subscribers[agent_name].append(callback) def publish(self, key: str, value: Any, source: str): msg = MemoryMessage( id=str(uuid.uuid4()), key=key, value=value, source_agent=source, timestamp=time.time(), version=1 ) self.message_log.append(msg) for agent, callbacks in self.subscribers.items(): if agent != source: for cb in callbacks: cb(msg) def get_history(self, key: str) -> List[MemoryMessage]: return [m for m in self.message_log if m.key == key]

三、核心 API / 深水区

3.1 记忆同步机制速查

机制作用实现
事件广播通知更新发布订阅
版本向量冲突检测版本号
最后写入冲突解决时间戳
定期同步保证最终一致后台任务

3.2 版本控制解决冲突

class VersionVector: def __init__(self): self.versions: Dict[str, int] = {} def increment(self, agent: str): self.versions[agent] = self.versions.get(agent, 0) + 1 def get(self, agent: str) -> int: return self.versions.get(agent, 0) def merge(self, other: 'VersionVector'): for agent, ver in other.versions.items(): self.versions[agent] = max( self.versions.get(agent, 0), ver ) class ConflictResolver: def resolve(self, local: Dict, remote: Dict) -> Dict: if remote["version"] > local["version"]: return remote elif local["version"] > remote["version"]: return local else: # 版本相同,以时间戳为准 if remote["timestamp"] > local["timestamp"]: return remote return local

3.3 带重试的路由

import time class ReliableRouter: def __init__(self, max_retries=3): self.max_retries = max_retries self.pending: List[MemoryMessage] = [] def send(self, msg: MemoryMessage, destination: str) -> bool: for attempt in range(self.max_retries): try: self._deliver(msg, destination) return True except Exception: if attempt == self.max_retries - 1: self.pending.append(msg) return False time.sleep(0.1 * (attempt + 1)) return False def _deliver(self, msg, dest): # 实际投递逻辑 pass def retry_pending(self): for msg in self.pending[:]: for dest in self._get_destinations(msg): if self.send(msg, dest): self.pending.remove(msg)

四、实战演练

完整的记忆同步系统:

from typing import Dict, List, Any, Optional from dataclasses import dataclass, field import uuid import time import json @dataclass class SyncEvent: event_id: str agent_name: str memory_key: str memory_value: Any timestamp: float version: int class MemorySyncAgent: def __init__(self, name: str): self.name = name self.local_memory: Dict[str, Any] = {} self.version_vector: Dict[str, int] = {} self.event_log: List[SyncEvent] = [] self.peers: List['MemorySyncAgent'] = [] def add_peer(self, agent: 'MemorySyncAgent'): self.peers.append(agent) def store(self, key: str, value: Any): self.version_vector[key] = self.version_vector.get(key, 0) + 1 self.local_memory[key] = value event = SyncEvent( event_id=str(uuid.uuid4()), agent_name=self.name, memory_key=key, memory_value=value, timestamp=time.time(), version=self.version_vector[key] ) self.event_log.append(event) self._broadcast(event) def _broadcast(self, event: SyncEvent): for peer in self.peers: peer.receive_sync(event) def receive_sync(self, event: SyncEvent): current_version = self.version_vector.get(event.memory_key, 0) if event.version > current_version: self.local_memory[event.memory_key] = event.memory_value self.version_vector[event.memory_key] = event.version self.event_log.append(event) elif event.version == current_version and event.timestamp > self._get_local_ts(event.memory_key): self.local_memory[event.memory_key] = event.memory_value def _get_local_ts(self, key: str) -> float: for event in reversed(self.event_log): if event.memory_key == key: return event.timestamp return 0 def recall(self, key: str) -> Any: return self.local_memory.get(key) def get_state(self) -> Dict: return { "agent": self.name, "memory": self.local_memory, "versions": self.version_vector } class SyncOrchestrator: def __init__(self): self.agents: Dict[str, MemorySyncAgent] = {} def create_agent(self, name: str) -> MemorySyncAgent: agent = MemorySyncAgent(name) self.agents[name] = agent return agent def connect_all(self): names = list(self.agents.keys()) for i, name in enumerate(names): for j in range(i + 1, len(names)): self.agents[name].add_peer(self.agents[names[j]]) self.agents[names[j]].add_peer(self.agents[name]) def sync_all(self): for agent in self.agents.values(): for event in agent.event_log[-5:]: agent._broadcast(event) orchestrator = SyncOrchestrator() agent_a = orchestrator.create_agent("order_agent") agent_b = orchestrator.create_agent("logistics_agent") orchestrator.connect_all() agent_a.store("user_id", "12345") agent_a.store("order_id", "ORDER-001") time.sleep(0.1) print(f"Agent B 记忆: {json.dumps(agent_b.get_state(), ensure_ascii=False)}")

五、避坑指南与最佳实践

💡 **技巧:用版本号解决冲突
谁版本高谁说了算,简单有效。

⚠️ **警告:广播太多会爆炸
每次更新都广播,Agent 多了扛不住。

✅ **推荐:增量同步
只同步变更的内容,不要全量。

六、综合实战演示

生产级记忆同步方案:

from typing import Dict, List, Any, Optional, Set from dataclasses import dataclass, field import json import time @dataclass class MemoryDelta: key: str value: Any version: int timestamp: float class DeltaSyncAgent: def __init__(self, name: str): self.name = name self.memory: Dict[str, Any] = {} self.versions: Dict[str, int] = {} self.deltas: List[MemoryDelta] = [] self.peers: Dict[str, 'DeltaSyncAgent'] = {} self.last_sync: Dict[str, float] = {} def connect(self, name: str, peer: 'DeltaSyncAgent'): self.peers[name] = peer def update(self, key: str, value: Any): ver = self.versions.get(key, 0) + 1 self.memory[key] = value self.versions[key] = ver delta = MemoryDelta( key=key, value=value, version=ver, timestamp=time.time() ) self.deltas.append(delta) self._sync_delta(delta) def _sync_delta(self, delta: MemoryDelta): for name, peer in self.peers.items(): peer.receive_delta(self.name, delta) def receive_delta(self, source: str, delta: MemoryDelta): current = self.versions.get(delta.key, 0) if delta.version > current: self.memory[delta.key] = delta.value self.versions[delta.key] = delta.version self.deltas.append(delta) def get(self, key: str) -> Optional[Any]: return self.memory.get(key) def get_all_deltas(self) -> List[Dict]: return [{ "key": d.key, "value": d.value, "version": d.version } for d in self.deltas[-20:]] class MemoryHub: def __init__(self): self.agents: Dict[str, DeltaSyncAgent] = {} def register(self, name: str) -> DeltaSyncAgent: agent = DeltaSyncAgent(name) self.agents[name] = agent self._connect_new(agent, name) return agent def _connect_new(self, agent: DeltaSyncAgent, name: str): for existing_name, existing in self.agents.items(): if existing_name != name: agent.connect(existing_name, existing) existing.connect(name, agent) def broadcast(self, key: str, value: Any, source: str): agent = self.agents.get(source) if agent: agent.update(key, value) def get_consensus(self, key: str) -> Optional[Any]: values = {} for name, agent in self.agents.items(): v = agent.get(key) if v is not None: values[name] = v if not values: return None # 多数一致 from collections import Counter counter = Counter(str(v) for v in values.values()) most_common = counter.most_common(1) return eval(most_common[0][0]) if most_common else None hub = MemoryHub() agent_a = hub.register("agent_a") agent_b = hub.register("agent_b") agent_c = hub.register("agent_c") hub.broadcast("user_preference", "喜欢红色", "agent_a") time.sleep(0.1) print(f"Agent B 知道: {agent_b.get('user_preference')}") print(f"Agent C 知道: {agent_c.get('user_preference')}") print(f"共识: {hub.get_consensus('user_preference')}")

七、总结

多 Agent 长期记忆的消息路由与同步:

  • 增量同步代替全量
  • 版本号解决冲突
  • 广播 + 订阅
  • 多数一致的保证

做好这些,Agent 之间的记忆就不再"各自为政"了。

http://www.cnnetsun.cn/news/2755316.html

相关文章:

  • IPXWrapper技术方案:为现代Windows系统重构IPX/SPX兼容层,重温经典游戏网络对战
  • YOLOv5视觉瞄准系统架构剖析:基于深度学习的目标检测与实时控制技术实现
  • 2026 论文降AI率工具终极测评:真实体验分享,毕业党生存手册
  • 告别死记硬背:用‘小树’和‘铃儿’轻松搞定三十六计(附110位数字编码表)
  • AI工具链如何接管企业搜索?3步实现语义理解→意图识别→精准召回的闭环升级
  • 【金融级AI质押架构设计指南】:基于FISCO BCOS+LangChain+TEE的三重可信验证体系(附压测QPS 12,800实测报告)
  • HR总监紧急通知:下季度起所有请假系统必须通过ISO/IEC 23894 AI治理认证,你准备好了吗?
  • 别再手动整理了!用WPS宏一键提取汉字拼音首字母,批量处理通讯录超省心
  • Agent“活”起来!企业级动态RAG的可靠记忆与知识进化之路
  • 如何在5分钟内为Windows 11 24H2 LTSC恢复微软应用商店:新手完整指南
  • Qt Quick Canvas 画布实战:手把手教你用QML打造一个可复用的汽车仪表盘组件
  • SuperPNG终极指南:如何用免费插件彻底优化Photoshop PNG导出
  • 从航拍到成图:一次讲透无人机测绘中比例尺、GSD与航高设计的完整工作流(以1:1000地形图为例)
  • Kimi K2.6 AI Agent实战解析:任务拆解、工具调用与自主反思
  • 【仅限Q3开放】AI融资整合能力成熟度测评(含17项技术适配指标+3类企业定制路径),测完即生成金融机构认可的接入资质预评估报告
  • AI Agent(智能体)应用工程师:年薪50W+的AI风口,零基础也能入行
  • 3大突破重构ESP32物联网开发:从零到精通的完整指南
  • 从峰会实践看科技女性职业发展:架构、策略与可持续影响
  • Moneta Markets亿汇:聚焦细节,看看风控思路的关键细节
  • 9V电池转±5V双电源:线性稳压器与电荷泵的工程实践
  • 电路设计入门:从核心概念到实战项目,掌握硬件开发基础
  • 3分钟掌握:告别网盘限速困扰的浏览器脚本终极指南
  • 面试官:agent的三层记忆系统是啥
  • 如何在Windows 10/11上畅玩经典IPX游戏:终极兼容解决方案指南
  • 2020年西安公交线路与站点GIS矢量数据(WGS84坐标,含完整属性)
  • Python+Pygame实现的植物大战僵尸风格塔防游戏源码,含完整资源与运行说明
  • 奥斯卡信封系统:高保密性活动流程设计的极致案例
  • Arduino Grove入门套件实战:从零掌握传感器编程与I2C通信
  • 终极窗口调整方案:3分钟掌握Windows窗口强制调整技巧
  • 如何搭建终极跨平台游戏串流服务器:Sunshine完整配置指南