[MAF预定义ChatClient中间件-07]PerServiceCallChatHistoryPersistingChatClient——基于ReAct循环的一步一存档
1. 由管道结构决定的对话历史存档方式
默认针对对话历史的存档是由ChatClientAgent管道的结构决定的。在下图所示的管道结构中,实现ReAct的FunctionInvokingChatClient中间件是IChatClient管道的一部分,用来持久化对话历史的ChatHistoryProvider位于IChatClient管道的上游,所以针对Agent的一次调用,ChatHistoryProvider针对现有消息的提取和针对新消息的存储各会执行一次。但是ReAct循环的迭代次数可能会很多,中间产生的消息也可能很多。如果每次迭代都等到ReAct循环结束才存档的话,意味着如果最后存档失败了,那么ReAct循环中产生的所有消息都会丢失。
我们可以通过一个演示程序来说明这一点。为了确定CodeHistoryProvider的两个方法被调用的时机,以及它们各自加载和保存的消息,我们定义了如下这个ChatHistoryProviderWrapper。顾名思义,ChatHistoryProviderWrapper是一个ChatHistoryProvider的包装类,它重写了InvokingCoreAsync和InvokedCoreAsync方法,在调用基类方法之前输出一些日志来显示当前加载和保存的消息列表。
class ChatHistoryProviderWrapper(ChatHistoryProvider chatHistoryProvider) : ChatHistoryProvider { private int _index = 1; protected override async ValueTask<IEnumerable<ChatMessage>> InvokingCoreAsync(InvokingContext context, CancellationToken cancellationToken = default) { Console.WriteLine($"{new string('-', 50)} Round {_index++} {new string('-', 50)}"); var messages = await chatHistoryProvider.InvokingAsync(context, cancellationToken); Console.WriteLine($"{messages.Count()} messages are loaded."); PrintMessages(messages); return messages; } protected override ValueTask InvokedCoreAsync(InvokedContext context, CancellationToken cancellationToken = default) { Console.WriteLine($"Received {context.RequestMessages.Count()} request messages to save."); PrintMessages(context.RequestMessages); Console.WriteLine($"Received {context.ResponseMessages?.Count()??0} response messages to save."); PrintMessages(context.ResponseMessages ?? Array.Empty<ChatMessage>()); return chatHistoryProvider.InvokedAsync(context, cancellationToken); } static void PrintMessages(IEnumerable<ChatMessage> messages) { var index = 1; foreach (var message in messages) { var content = message.Contents.FirstOrDefault(); var contentString = content switch { FunctionResultContent functionResultContent => functionResultContent.Result?.ToString() ?? string.Empty, FunctionCallContent functionCallContent => $"FunctionCall: {functionCallContent.Name}" , TextContent textContent => textContent.Text, _ => content?.ToString() ?? string.Empty }; Console.WriteLine($" Message {index++}: Role={message.Role}, Content={contentString}"); } Console.WriteLine(); } }然后我们编写了如下的演示程序。如代码片段所示,我们针对OpenAIClient创建了一个IChatClient对象,然后调用AsAIgent方法将其转换成一个ChatClientAgent对象。我们自定义的ChatHistoryProviderWrapper包装了一个InMemoryChatHistoryProvider对象,并在创建ChatClientAgent对象的时候通过ChatClientAgentOptions将其注册了进去。此ChatClientAgentOptions的ChatOptions上还注册了LookupLocationCode和GetWeatherForecast两个工具,前者根据用户提供的城市名称返回位置代码,后者根据位置代码返回天气预报信息。
using Azure; using dotenv.net; using Microsoft.Agents.AI; using Microsoft.Extensions.AI; using OpenAI; using System.ComponentModel; DotEnv.Load(); var model = Environment.GetEnvironmentVariable("MODEL")!; var apiKey = Environment.GetEnvironmentVariable("API_KEY")!; var endpoint = Environment.GetEnvironmentVariable("OPENAI_URL")!; ChatHistoryProvider chatHistoryProvider = new InMemoryChatHistoryProvider(); chatHistoryProvider = new ChatHistoryProviderWrapper(chatHistoryProvider); AITool[] tools = [ AIFunctionFactory.Create(LookupLocationCode,nameof(LookupLocationCode)), AIFunctionFactory.Create(GetWeatherForecast,nameof(GetWeatherForecast)) ]; var options = new ChatClientAgentOptions { ChatHistoryProvider = chatHistoryProvider, ChatOptions = new ChatOptions { Tools = tools } }; var agent = new OpenAIClient( credential: new AzureKeyCredential(apiKey), options: new OpenAIClientOptions { Endpoint = new Uri(endpoint) }) .GetChatClient(model:model) .AsIChatClient() .AsAIAgent(options); var session = await agent.CreateSessionAsync(); var response = await agent.RunAsync("根据今天苏州的天气给出穿衣建议。", session); Console.WriteLine(response); [Description("根据用户提供的城市名称,返回位置代码")] static string LookupLocationCode([Description("城市名称")] string city) => city switch { "苏州" => "SZ", "北京" => "BJ", _ => throw new ArgumentException($"Unknown city: {city}") }; [Description("根据位置代码,返回天气预报信息")] static string GetWeatherForecast([Description("位置代码")] string locationCode) { // Simulate looking up weather forecast based on location code return locationCode switch { "SZ" => "苏州今天是晴天,室外温度25度,西北风4级。", "BJ" => "北京今天是多云,室外温度22度,东风3级。", _ => throw new ArgumentException($"Unknown location code: {locationCode}") }; }我们调用AIAgent的RunAsync方法来执行一个任务,任务内容是根据今天苏州的天气给出穿衣建议。按照我们对ReAct的理解,这次调用会经历三轮ReAct迭代,除最后一次针对最终答案的输出外,前两次分别是对两个工具的调用。但是针对CodeHistoryProvider的InvokingAsync和InvokedAsync方法的调用只有一次,如下的输出体现了这一点。
-------------------------------------------------- Round 1 -------------------------------------------------- 1 messages are loaded. Message 1: Role=user, Content=根据今天苏州的天气给出穿衣建议。 Received 1 request messages to save. Message 1: Role=user, Content=根据今天苏州的天气给出穿衣建议。 Received 5 response messages to save. Message 1: Role=assistant, Content=FunctionCall: LookupLocationCode Message 2: Role=tool, Content=SZ Message 3: Role=assistant, Content=FunctionCall: GetWeatherForecast Message 4: Role=tool, Content=苏州今天是晴天,室外温度25度,西北风4级。 Message 5: Role=assistant, Content=苏州今天**晴天,25°C,西北风4级**,体感整体舒适偏暖,稍有风。 ### 👕 穿衣建议: - ✅ **上衣**:短袖、薄衬衫、POLO衫都很合适 - ✅ **下装**:长裤、薄款休闲裤或裙子 - ✅ **外搭**:可以带一件**薄外套/防风外套**(风力4级,早晚或骑车时会有点风感) - ✅ **鞋子**:运动鞋、休闲鞋都很合适 ### ☀️ 其他建议: - 紫外线在晴天会比较强,外出可适当**防晒**(防晒霜/帽子/墨镜) - 空气干燥时可多喝水 如果你是要通勤、出游或运动,我也可以帮你细化穿搭建议 😊 苏州今天**晴天,25°C,西北风4级**,体感整体舒适偏暖,稍有风。 ### 👕 穿衣建议: - ✅ **上衣**:短袖、薄衬衫、POLO衫都很合适 - ✅ **下装**:长裤、薄款休闲裤或裙子 - ✅ **外搭**:可以带一件**薄外套/防风外套**(风力4级,早晚或骑车时会有点风感) - ✅ **鞋子**:运动鞋、休闲鞋都很合适 ### ☀️ 其他建议: - 紫外线在晴天会比较强,外出可适当**防晒**(防晒霜/帽子/墨镜) - 空气干燥时可多喝水 如果你是要通勤、出游或运动,我也可以帮你细化穿搭建议 😊从输出可以看出,本次调用会生成6条消息(包含用户的原始请求),并且要求ChatHistoryProvider存档一次。如果在存档的那一刻出现了异常,所有的消息全部丢失。
2. 利用PerServiceCallChatHistoryPersistingChatClient实现每步一存
PerServiceCallChatHistoryPersistingChatClient旨在基于ReAct循环的多轮迭代中,实现超高频、每步一存的聊天历史持久化与状态同步,防止因为网络崩溃或单步失败导致整个复杂的工具调用链条数据丢失。这是是微软为了打造高可用、高容错、具备生产级弹性的企业级AI Agent而设计的底层机制。它通过在工具调用循环的每一次原子请求中安插持久化哨兵,实现了状态的实时落库与精准路由。
PerServiceCallChatHistoryPersistingChatClient无需采用常规的IChatClient中间件注册方式,而是通过ChatClientAgentOptions中的RequirePerServiceCallChatHistoryPersistence选项来启用的。对于我们前面演示的实例,我们只需要按照如下的方式创建ChatClientAgentOptions对象就可以将PerServiceCallChatHistoryPersistingChatClient中间件注册到ChatClientAgent管道中。
var options = new ChatClientAgentOptions { ChatHistoryProvider = chatHistoryProvider, ChatOptions = new ChatOptions { Tools = tools }, RequirePerServiceCallChatHistoryPersistence = true, };现在我们利用上面定义的ChatClientAgentOptions启用了PerServiceCallChatHistoryPersistingChatClient中间件。再次次运行前面的演示程序,我们就会发现完全不一样的输出:
-------------------------------------------------- Round 1 -------------------------------------------------- 1 messages are loaded. Message 1: Role=user, Content=根据今天苏州的天气给出穿衣建议。 Received 1 request messages to save. Message 1: Role=user, Content=根据今天苏州的天气给出穿衣建议。 Received 1 response messages to save. Message 1: Role=assistant, Content=FunctionCall: LookupLocationCode -------------------------------------------------- Round 2 -------------------------------------------------- 3 messages are loaded. Message 1: Role=user, Content=根据今天苏州的天气给出穿衣建议。 Message 2: Role=assistant, Content=FunctionCall: LookupLocationCode Message 3: Role=tool, Content=SZ Received 1 request messages to save. Message 1: Role=tool, Content=SZ Received 1 response messages to save. Message 1: Role=assistant, Content=FunctionCall: GetWeatherForecast -------------------------------------------------- Round 3 -------------------------------------------------- 5 messages are loaded. Message 1: Role=user, Content=根据今天苏州的天气给出穿衣建议。 Message 2: Role=assistant, Content=FunctionCall: LookupLocationCode Message 3: Role=tool, Content=SZ Message 4: Role=assistant, Content=FunctionCall: GetWeatherForecast Message 5: Role=tool, Content=苏州今天是晴天,室外温度25度,西北风4级。 Received 1 request messages to save. Message 1: Role=tool, Content=苏州今天是晴天,室外温度25度,西北风4级。 Received 1 response messages to save. Message 1: Role=assistant, Content=苏州今天**晴天,25°C,西北风4级**,体感总体比较舒适,略有风感。给你一些穿衣建议: ### 👕 白天穿搭 - ✅ **短袖 / 薄款衬衫 / T恤** 都很合适 - ✅ 下装可选 **牛仔裤、休闲裤、裙子** - ✅ 可搭配 **薄款外套 / 防风外套**(尤其在有风或早晚时段) ### 🌬 关于风 西北风4级风力较明显: - 长发建议扎起来 - 穿裙子的话可选稍有重量的款式 - 体感偏凉时加一件薄外套更舒适 ### 🌞 其他建议 - 紫外线较强,外出可考虑 **防晒霜、太阳镜** - 天气干爽,适合外出活动 整体来说是非常舒适的天气,不冷不热 😊 如果你需要早晚更具体的建议,也可以告诉我出行时间。 苏州今天**晴天,25°C,西北风4级**,体感总体比较舒适,略有风感。给你一些穿衣建议: ### 👕 白天穿搭 - ✅ **短袖 / 薄款衬衫 / T恤** 都很合适 - ✅ 下装可选 **牛仔裤、休闲裤、裙子** - ✅ 可搭配 **薄款外套 / 防风外套**(尤其在有风或早晚时段) ### 🌬 关于风 西北风4级风力较明显: - 长发建议扎起来 - 穿裙子的话可选稍有重量的款式 - 体感偏凉时加一件薄外套更舒适 ### 🌞 其他建议 - 紫外线较强,外出可考虑 **防晒霜、太阳镜** - 天气干爽,适合外出活动 整体来说是非常舒适的天气,不冷不热 😊 如果你需要早晚更具体的建议,也可以告诉我出行时间。输出表明CodeHistoryProvider的InvokingAsync和InvokedAsync方法被调用了三次,分别对应ReAct循环的三轮迭代。每轮迭代中,CodeHistoryProvider都加载了当前最新的消息列表,并且存档了本轮迭代新产生的消息。这样即使在某一轮迭代中出现了异常,之前迭代中存档的消息也不会丢失,从而保证了整个工具调用链条的数据安全和状态同步。
3. PerServiceCallChatHistoryPersistingChatClient
PerServiceCallChatHistoryPersistingChatClient是继承自DelegatingChatClient的一个内部类型。虽然它本质上是一个IChatClient中间件,但是它需要需要从AIAgent上下文中提取用来持久化对话历史的ChatHistoryProvider实例,所以只有它作为AIAgent的一部分参与Agent调用的前提下才能正常工作,这一点于AIContextProviderChatClient类似。
internal sealed class PerServiceCallChatHistoryPersistingChatClient : DelegatingChatClient { public PerServiceCallChatHistoryPersistingChatClient(IChatClient innerClient); public override async Task<ChatResponse> GetResponseAsync( IEnumerable<ChatMessage> messages, ChatOptions? options = null, CancellationToken cancellationToken = default); public override async IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseAsync( IEnumerable<ChatMessage> messages, ChatOptions? options = null, CancellationToken cancellationToken = default); }虽然系统也定义了如下这个UsePerServiceCallChatHistoryPersistence扩展方法来注册PerServiceCallChatHistoryPersistingChatClient中间件。如果我们调用了这个方法,但是ChatClientAgentOptions的RequirePerServiceCallChatHistoryPersistence选项没有被设置为true,那么这明显存在不一致。而且ChatClientAgentOptions的RequirePerServiceCallChatHistoryPersistence这个选项应用到整个管道的处理流程中,这个不一致性影响重大,我个人觉得将这个扩展方法暴露出来就是一个错误。我觉得微软会在后续的某个版本中将它删除掉,毕竟它的存在会导致用户在使用上的混乱和错误的发生。
public static class ChatClientBuilderExtensions { public static ChatClientBuilder UsePerServiceCallChatHistoryPersistence(this ChatClientBuilder builder);