From 59de12cd13284b89b059d8c8f03810d33653dc53 Mon Sep 17 00:00:00 2001 From: westey <164392973+westey-m@users.noreply.github.com> Date: Fri, 13 Mar 2026 17:24:30 +0000 Subject: [PATCH 1/6] Persist messages during the Function Call Loop --- dotnet/agent-framework-dotnet.slnx | 1 + dotnet/global.json | 2 +- ..._Step19_InFunctionLoopCheckpointing.csproj | 20 + .../Program.cs | 128 +++ .../README.md | 62 ++ dotnet/samples/02-agents/Agents/README.md | 1 + .../ChatClient/ChatClientAgent.cs | 145 ++-- .../ChatClient/ChatClientAgentOptions.cs | 31 + .../ChatClient/ChatClientAgentSession.cs | 16 + .../ChatClient/ChatClientExtensions.cs | 11 + .../ChatHistoryPersistingChatClient.cs | 195 +++++ .../SummarizationCompactionStrategy.cs | 2 + .../ChatHistoryPersistingChatClientTests.cs | 733 ++++++++++++++++++ 13 files changed, 1298 insertions(+), 49 deletions(-) create mode 100644 dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Agent_Step19_InFunctionLoopCheckpointing.csproj create mode 100644 dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs create mode 100644 dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md create mode 100644 dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs create mode 100644 dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index 04fbb6cd87..3cfafb9a49 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -57,6 +57,7 @@ + diff --git a/dotnet/global.json b/dotnet/global.json index 42bb8863a3..482aa6b8d3 100644 --- a/dotnet/global.json +++ b/dotnet/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "10.0.200", + "version": "10.0.100", "rollForward": "minor", "allowPrerelease": false }, diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Agent_Step19_InFunctionLoopCheckpointing.csproj b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Agent_Step19_InFunctionLoopCheckpointing.csproj new file mode 100644 index 0000000000..41aafe3437 --- /dev/null +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Agent_Step19_InFunctionLoopCheckpointing.csproj @@ -0,0 +1,20 @@ + + + + Exe + net10.0 + + enable + enable + + + + + + + + + + + + diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs new file mode 100644 index 0000000000..3a16c6a8e7 --- /dev/null +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs @@ -0,0 +1,128 @@ +// Copyright (c) Microsoft. All rights reserved. + +// This sample demonstrates how the PersistChatHistoryAfterEachServiceCall option causes +// chat history to be persisted after each individual call to the AI service, rather than +// only at the end of the full agent run. When an agent uses tools, FunctionInvokingChatClient +// loops multiple times (service call → tool execution → service call), and by default the +// chat history is only persisted once the entire loop finishes. With this option enabled, +// intermediate messages (tool calls and results) are persisted after each service call, +// allowing you to inspect or recover them even if the process is interrupted mid-loop. +// +// The sample uses RunStreamingAsync so that we can observe the chat history growing +// after each service call within a single agent run. + +using System.ComponentModel; +using Azure.AI.OpenAI; +using Azure.Identity; +using Microsoft.Agents.AI; +using Microsoft.Extensions.AI; + +var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set."); +var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini"; + +// WARNING: DefaultAzureCredential is convenient for development but requires careful consideration in production. +// In production, consider using a specific credential (e.g., ManagedIdentityCredential) to avoid +// latency issues, unintended credential probing, and potential security risks from fallback mechanisms. +AzureOpenAIClient openAIClient = new(new Uri(endpoint), new DefaultAzureCredential()); +IChatClient chatClient = openAIClient.GetChatClient(deploymentName).AsIChatClient(); + +// Define multiple tools so the model makes several tool calls in a single run. +[Description("Get the current weather for a city.")] +static string GetWeather([Description("The city name.")] string city) => + city.ToUpperInvariant() switch + { + "SEATTLE" => "Seattle: 55°F, cloudy with light rain.", + "NEW YORK" => "New York: 72°F, sunny and warm.", + "LONDON" => "London: 48°F, overcast with fog.", + _ => $"{city}: weather data not available." + }; + +[Description("Get the current time in a city.")] +static string GetTime([Description("The city name.")] string city) => + city.ToUpperInvariant() switch + { + "SEATTLE" => "Seattle: 9:00 AM PST", + "NEW YORK" => "New York: 12:00 PM EST", + "LONDON" => "London: 5:00 PM GMT", + _ => $"{city}: time data not available." + }; + +// Create the agent with PersistChatHistoryAfterEachServiceCall enabled. +// The in-memory ChatHistoryProvider is used by default when no explicit provider is set, +// so we can inspect the chat history via session.TryGetInMemoryChatHistory(). +AIAgent agent = chatClient.AsAIAgent( + new ChatClientAgentOptions + { + Name = "WeatherAssistant", + ChatOptions = new() + { + Instructions = "You are a helpful assistant. When asked about multiple cities, call the appropriate tool for each city.", + Tools = [AIFunctionFactory.Create(GetWeather), AIFunctionFactory.Create(GetTime)] + }, + PersistChatHistoryAfterEachServiceCall = true, + }); + +AgentSession session = await agent.CreateSessionAsync(); + +// Ask about multiple cities — the model will need to call tools for each city, +// resulting in multiple service calls within a single agent run. +string prompt = "What's the weather and time in Seattle, New York, and London?"; + +Console.ForegroundColor = ConsoleColor.Cyan; +Console.Write("\n[User] "); +Console.ResetColor(); +Console.WriteLine(prompt); + +PrintChatHistory("Before run"); + +Console.ForegroundColor = ConsoleColor.Cyan; +Console.Write("\n[Agent] "); +Console.ResetColor(); + +// Use RunStreamingAsync to observe the response as it streams. +await foreach (var update in agent.RunStreamingAsync(prompt, session)) +{ + Console.Write(update); +} + +Console.WriteLine(); + +PrintChatHistory("After run"); + +// Run a second turn to show that chat history accumulated correctly. +string followUp = "Which city is the warmest?"; +Console.ForegroundColor = ConsoleColor.Cyan; +Console.Write("\n[User] "); +Console.ResetColor(); +Console.WriteLine(followUp); + +Console.ForegroundColor = ConsoleColor.Cyan; +Console.Write("\n[Agent] "); +Console.ResetColor(); + +await foreach (var update in agent.RunStreamingAsync(followUp, session)) +{ + Console.Write(update); +} + +Console.WriteLine(); + +PrintChatHistory("After second run"); + +// Helper to print the current chat history from the session. +void PrintChatHistory(string label) +{ + if (session.TryGetInMemoryChatHistory(out var history)) + { + Console.ForegroundColor = ConsoleColor.DarkGray; + Console.WriteLine($"\n [{label} — Chat history: {history.Count} message(s)]"); + foreach (var msg in history) + { + var preview = msg.Text?.Length > 80 ? msg.Text[..80] + "…" : msg.Text; + var contentTypes = string.Join(", ", msg.Contents.Select(c => c.GetType().Name)); + Console.WriteLine($" {msg.Role,-12} | {preview ?? $"[{contentTypes}]"}"); + } + + Console.ResetColor(); + } +} diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md new file mode 100644 index 0000000000..3c3e8a2c30 --- /dev/null +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md @@ -0,0 +1,62 @@ +# In-Function-Loop Checkpointing + +This sample demonstrates how the `PersistChatHistoryAfterEachServiceCall` option on `ChatClientAgentOptions` causes chat history to be saved after each individual call to the AI service, rather than only at the end of the full agent run. + +## What This Sample Shows + +When an agent uses tools, the `FunctionInvokingChatClient` loops multiple times (service call → tool execution → service call → …). By default, chat history is only persisted once the entire loop finishes. With `PersistChatHistoryAfterEachServiceCall` enabled: + +- A `ChatHistoryPersistingChatClient` decorator is automatically inserted into the chat client pipeline +- After each service call, the decorator notifies the `ChatHistoryProvider` (and any `AIContextProvider` instances) with the new messages +- Only **new** messages are sent to providers on each notification — messages that were already persisted in an earlier call within the same run are deduplicated automatically +- The end-of-run persistence in `ChatClientAgent` is skipped to avoid double-persisting + +This is useful for: +- **Crash recovery** — if the process is interrupted mid-loop, the intermediate tool calls and results are already persisted +- **Observability** — you can inspect the chat history while the agent is still running (e.g., during streaming) +- **Long-running tool loops** — agents with many sequential tool calls benefit from incremental persistence + +## How It Works + +The sample asks the agent about the weather and time in three cities. The model calls the `GetWeather` and `GetTime` tools for each city, resulting in multiple service calls within a single `RunStreamingAsync` invocation. After the run completes, the sample prints the full chat history to show all the intermediate messages that were persisted along the way. + +### Pipeline Architecture + +``` +ChatClientAgent + └─ FunctionInvokingChatClient (handles tool call loop) + └─ ChatHistoryPersistingChatClient (persists after each service call) + └─ Leaf IChatClient (Azure OpenAI) +``` + +## Prerequisites + +- .NET 10 SDK or later +- Azure OpenAI service endpoint and model deployment +- Azure CLI installed and authenticated + +**Note**: This sample uses `DefaultAzureCredential`. Sign in with `az login` before running. For production, prefer a specific credential such as `ManagedIdentityCredential`. For more information, see the [Azure CLI authentication documentation](https://learn.microsoft.com/cli/azure/authenticate-azure-cli-interactively). + +## Environment Variables + +```powershell +$env:AZURE_OPENAI_ENDPOINT="https://your-resource.openai.azure.com/" # Required +$env:AZURE_OPENAI_DEPLOYMENT_NAME="gpt-4o-mini" # Optional, defaults to gpt-4o-mini +``` + +## Running the Sample + +```powershell +cd dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing +dotnet run +``` + +## Expected Behavior + +The sample runs two conversation turns: + +1. **First turn** — asks about weather and time in three cities. The model calls `GetWeather` and `GetTime` tools (potentially in parallel or sequentially), then provides a summary. The chat history dump after the run shows all the intermediate tool call and result messages. + +2. **Second turn** — asks a follow-up question ("Which city is the warmest?") that uses the persisted conversation context. The chat history dump shows the full accumulated conversation. + +The chat history printout uses `session.TryGetInMemoryChatHistory()` to inspect the in-memory storage. diff --git a/dotnet/samples/02-agents/Agents/README.md b/dotnet/samples/02-agents/Agents/README.md index 4ac53ba246..c5258ba9f4 100644 --- a/dotnet/samples/02-agents/Agents/README.md +++ b/dotnet/samples/02-agents/Agents/README.md @@ -45,6 +45,7 @@ Before you begin, ensure you have the following prerequisites: |[Declarative agent](./Agent_Step16_Declarative/)|This sample demonstrates how to declaratively define an agent.| |[Providing additional AI Context to an agent using multiple AIContextProviders](./Agent_Step17_AdditionalAIContext/)|This sample demonstrates how to inject additional AI context into a ChatClientAgent using multiple custom AIContextProvider components that are attached to the agent.| |[Using compaction pipeline with an agent](./Agent_Step18_CompactionPipeline/)|This sample demonstrates how to use a compaction pipeline to efficiently limit the size of the conversation history for an agent.| +|[In-function-loop checkpointing](./Agent_Step19_InFunctionLoopCheckpointing/)|This sample demonstrates how to persist chat history after each service call during a tool-calling loop, enabling crash recovery and mid-run observability.| ## Running the samples from the console diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs index adb6eb9f83..bba8c474ef 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs @@ -212,13 +212,18 @@ protected override async Task RunCoreAsync( await this.PrepareSessionAndMessagesAsync(session, inputMessages, options, cancellationToken).ConfigureAwait(false); var chatClient = this.ChatClient; - chatClient = ApplyRunOptionsTransformations(options, chatClient); var loggingAgentName = this.GetLoggingAgentName(); - this._logger.LogAgentChatClientInvokingAgent(nameof(RunAsync), this.Id, loggingAgentName, this._chatClientType); + // Initialize the per-service-call message tracking if the decorator is being used. + if (this.PersistsChatHistoryPerServiceCall) + { + safeSession.NotifiedMessages ??= new(); + safeSession.NotifiedMessages.Clear(); + } + // Call the IChatClient and notify the AIContextProvider of any failures. ChatResponse chatResponse; try @@ -227,10 +232,14 @@ protected override async Task RunCoreAsync( } catch (Exception ex) { - await this.NotifyChatHistoryProviderOfFailureAsync(safeSession, ex, inputMessagesForChatClient, chatOptions, cancellationToken).ConfigureAwait(false); - await this.NotifyAIContextProviderOfFailureAsync(safeSession, ex, inputMessagesForChatClient, cancellationToken).ConfigureAwait(false); + await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, inputMessagesForChatClient, chatOptions, cancellationToken).ConfigureAwait(false); throw; } + finally + { + // Clear the per-service-call message tracking now that the run is complete (or failed). + safeSession.NotifiedMessages?.Clear(); + } this._logger.LogAgentChatClientInvokedAgent(nameof(RunAsync), this.Id, loggingAgentName, this._chatClientType, inputMessages.Count); @@ -244,11 +253,8 @@ protected override async Task RunCoreAsync( chatResponseMessage.AuthorName ??= this.Name; } - // Only notify the session of new messages if the chatResponse was successful to avoid inconsistent message state in the session. - await this.NotifyChatHistoryProviderOfNewMessagesAsync(safeSession, inputMessagesForChatClient, chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); - - // Notify the AIContextProvider of all new messages. - await this.NotifyAIContextProviderOfSuccessAsync(safeSession, inputMessagesForChatClient, chatResponse.Messages, cancellationToken).ConfigureAwait(false); + // Notify providers of all new messages unless persistence is handled per-service-call by the decorator. + await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, inputMessagesForChatClient, chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); return new AgentResponse(chatResponse) { @@ -304,6 +310,13 @@ protected override async IAsyncEnumerable RunCoreStreamingA this._logger.LogAgentChatClientInvokingAgent(nameof(RunStreamingAsync), this.Id, loggingAgentName, this._chatClientType); + // Initialize the per-service-call message tracking if the decorator is being used. + if (this.PersistsChatHistoryPerServiceCall) + { + safeSession.NotifiedMessages ??= new(); + safeSession.NotifiedMessages.Clear(); + } + List responseUpdates = GetResponseUpdates(continuationToken); IAsyncEnumerator responseUpdatesEnumerator; @@ -315,8 +328,8 @@ protected override async IAsyncEnumerable RunCoreStreamingA } catch (Exception ex) { - await this.NotifyChatHistoryProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); - await this.NotifyAIContextProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), cancellationToken).ConfigureAwait(false); + safeSession.NotifiedMessages?.Clear(); + await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); throw; } @@ -330,8 +343,8 @@ protected override async IAsyncEnumerable RunCoreStreamingA } catch (Exception ex) { - await this.NotifyChatHistoryProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); - await this.NotifyAIContextProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), cancellationToken).ConfigureAwait(false); + safeSession.NotifiedMessages?.Clear(); + await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); throw; } @@ -357,8 +370,8 @@ protected override async IAsyncEnumerable RunCoreStreamingA } catch (Exception ex) { - await this.NotifyChatHistoryProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); - await this.NotifyAIContextProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), cancellationToken).ConfigureAwait(false); + safeSession.NotifiedMessages?.Clear(); + await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); throw; } } @@ -369,11 +382,11 @@ protected override async IAsyncEnumerable RunCoreStreamingA // so let's update it and set the conversation id for the service session case. this.UpdateSessionConversationId(safeSession, chatResponse.ConversationId, cancellationToken); - // To avoid inconsistent state we only notify the session of the input messages if no error occurs after the initial request. - await this.NotifyChatHistoryProviderOfNewMessagesAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); + // Notify providers of all new messages unless persistence is handled per-service-call by the decorator. + await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); - // Notify the AIContextProvider of all new messages. - await this.NotifyAIContextProviderOfSuccessAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, cancellationToken).ConfigureAwait(false); + // Clear the per-service-call message tracking now that the run is complete. + safeSession.NotifiedMessages?.Clear(); } /// @@ -441,17 +454,29 @@ protected override ValueTask DeserializeSessionCoreAsync(JsonEleme #region Private /// - /// Notify the when an agent run succeeded, if there is an . + /// Notifies the and all of successfully completed messages. /// - private async Task NotifyAIContextProviderOfSuccessAsync( + /// + /// This method is also called by to persist messages per-service-call. + /// + internal async Task NotifyProvidersOfNewMessagesAsync( ChatClientAgentSession session, - IEnumerable inputMessages, + IEnumerable requestMessages, IEnumerable responseMessages, + ChatOptions? chatOptions, CancellationToken cancellationToken) { + ChatHistoryProvider? chatHistoryProvider = this.ResolveChatHistoryProvider(chatOptions, session); + + if (chatHistoryProvider is not null) + { + var invokedContext = new ChatHistoryProvider.InvokedContext(this, session, requestMessages, responseMessages); + await chatHistoryProvider.InvokedAsync(invokedContext, cancellationToken).ConfigureAwait(false); + } + if (this.AIContextProviders is { Count: > 0 } contextProviders) { - AIContextProvider.InvokedContext invokedContext = new(this, session, inputMessages, responseMessages); + AIContextProvider.InvokedContext invokedContext = new(this, session, requestMessages, responseMessages); foreach (var contextProvider in contextProviders) { @@ -461,17 +486,29 @@ private async Task NotifyAIContextProviderOfSuccessAsync( } /// - /// Notify the of any failure during an agent run, if there is an . + /// Notifies the and all of a failure during a service call. /// - private async Task NotifyAIContextProviderOfFailureAsync( + /// + /// This method is also called by to report failures per-service-call. + /// + internal async Task NotifyProvidersOfFailureAsync( ChatClientAgentSession session, Exception ex, - IEnumerable inputMessages, + IEnumerable requestMessages, + ChatOptions? chatOptions, CancellationToken cancellationToken) { + ChatHistoryProvider? chatHistoryProvider = this.ResolveChatHistoryProvider(chatOptions, session); + + if (chatHistoryProvider is not null) + { + var invokedContext = new ChatHistoryProvider.InvokedContext(this, session, requestMessages, ex); + await chatHistoryProvider.InvokedAsync(invokedContext, cancellationToken).ConfigureAwait(false); + } + if (this.AIContextProviders is { Count: > 0 } contextProviders) { - AIContextProvider.InvokedContext invokedContext = new(this, session, inputMessages, ex); + AIContextProvider.InvokedContext invokedContext = new(this, session, requestMessages, ex); foreach (var contextProvider in contextProviders) { @@ -798,47 +835,59 @@ private void UpdateSessionConversationId(ChatClientAgentSession session, string? } } - private Task NotifyChatHistoryProviderOfFailureAsync( + /// + /// Notifies providers of successfully completed messages at the end of an agent run. + /// + /// + /// When is , the + /// decorator handles per-service-call notification, + /// so this end-of-run notification is skipped. + /// + private Task NotifyProvidersOfNewMessagesAtEndOfRunAsync( ChatClientAgentSession session, - Exception ex, IEnumerable requestMessages, + IEnumerable responseMessages, ChatOptions? chatOptions, CancellationToken cancellationToken) { - ChatHistoryProvider? provider = this.ResolveChatHistoryProvider(chatOptions, session); - - // Only notify the provider if we have one. - // If we don't have one, it means that the chat history is service managed and the underlying service is responsible for storing messages. - if (provider is not null) + if (this.PersistsChatHistoryPerServiceCall) { - var invokedContext = new ChatHistoryProvider.InvokedContext(this, session, requestMessages, ex); - - return provider.InvokedAsync(invokedContext, cancellationToken).AsTask(); + return Task.CompletedTask; } - return Task.CompletedTask; + return this.NotifyProvidersOfNewMessagesAsync(session, requestMessages, responseMessages, chatOptions, cancellationToken); } - private Task NotifyChatHistoryProviderOfNewMessagesAsync( + /// + /// Notifies providers of a failure at the end of an agent run. + /// + /// + /// When is , the + /// decorator handles per-service-call notification, + /// so this end-of-run notification is skipped. + /// + private Task NotifyProvidersOfFailureAtEndOfRunAsync( ChatClientAgentSession session, + Exception ex, IEnumerable requestMessages, - IEnumerable responseMessages, ChatOptions? chatOptions, CancellationToken cancellationToken) { - ChatHistoryProvider? provider = this.ResolveChatHistoryProvider(chatOptions, session); - - // Only notify the provider if we have one. - // If we don't have one, it means that the chat history is service managed and the underlying service is responsible for storing messages. - if (provider is not null) + if (this.PersistsChatHistoryPerServiceCall) { - var invokedContext = new ChatHistoryProvider.InvokedContext(this, session, requestMessages, responseMessages); - return provider.InvokedAsync(invokedContext, cancellationToken).AsTask(); + return Task.CompletedTask; } - return Task.CompletedTask; + return this.NotifyProvidersOfFailureAsync(session, ex, requestMessages, chatOptions, cancellationToken); } + /// + /// Gets a value indicating whether the agent is configured to persist chat history after each individual service call + /// via a decorator. + /// + private bool PersistsChatHistoryPerServiceCall => + this._agentOptions?.PersistChatHistoryAfterEachServiceCall is true && this._agentOptions?.UseProvidedChatClientAsIs is not true; + private ChatHistoryProvider? ResolveChatHistoryProvider(ChatOptions? chatOptions, ChatClientAgentSession session) { ChatHistoryProvider? provider = session.ConversationId is null ? this.ChatHistoryProvider : null; diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs index 38cad40bbe..b9c23e62ad 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs @@ -1,7 +1,9 @@ // Copyright (c) Microsoft. All rights reserved. using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using Microsoft.Extensions.AI; +using Microsoft.Shared.DiagnosticIds; namespace Microsoft.Agents.AI; @@ -89,6 +91,34 @@ public sealed class ChatClientAgentOptions /// public bool ThrowOnChatHistoryProviderConflict { get; set; } = true; + /// + /// Gets or sets a value indicating whether to persist chat history after each individual service call + /// rather than only at the end of the full agent run. + /// + /// + /// + /// By default, persists request and response messages via the + /// only after the full run completes, which may include multiple + /// iterations of the function invocation loop. Setting this property to causes + /// messages to be persisted after each individual call to the underlying AI service, so that intermediate + /// messages (e.g., tool calls and results) are saved even if the process is interrupted mid-loop. + /// + /// + /// When this option is enabled, a decorator is automatically + /// inserted into the chat client pipeline between the and the + /// leaf , and the will not perform its own + /// end-of-run chat history persistence to avoid double-persisting messages. + /// + /// + /// This option has no effect when is . + /// + /// + /// + /// Default is . + /// + [Experimental(DiagnosticIds.Experiments.AgentsAIExperiments)] + public bool PersistChatHistoryAfterEachServiceCall { get; set; } + /// /// Creates a new instance of with the same values as this instance. /// @@ -105,5 +135,6 @@ public ChatClientAgentOptions Clone() ClearOnChatHistoryProviderConflict = this.ClearOnChatHistoryProviderConflict, WarnOnChatHistoryProviderConflict = this.WarnOnChatHistoryProviderConflict, ThrowOnChatHistoryProviderConflict = this.ThrowOnChatHistoryProviderConflict, + PersistChatHistoryAfterEachServiceCall = this.PersistChatHistoryAfterEachServiceCall, }; } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentSession.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentSession.cs index 400bfbcaf6..f7093472e9 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentSession.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentSession.cs @@ -1,9 +1,11 @@ // Copyright (c) Microsoft. All rights reserved. using System; +using System.Collections.Generic; using System.Diagnostics; using System.Text.Json; using System.Text.Json.Serialization; +using Microsoft.Extensions.AI; using Microsoft.Shared.Diagnostics; namespace Microsoft.Agents.AI; @@ -88,4 +90,18 @@ internal JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptions = nu private string DebuggerDisplay => this.ConversationId is { } conversationId ? $"ConversationId = {conversationId}, StateBag Count = {this.StateBag.Count}" : $"StateBag Count = {this.StateBag.Count}"; + + /// + /// Gets or sets the set of instances that have already been notified to providers + /// during the current agent run. Used by to avoid duplicate + /// notifications when loops cause the same messages to be passed + /// across multiple service calls. + /// + /// + /// This set is cleared at the start and end of each run. It uses reference equality + /// to track message identity since reuses the same message objects + /// across loop iterations. + /// + [JsonIgnore] + internal HashSet? NotifiedMessages { get; set; } } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs index 8290c39974..03c283f74e 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs @@ -63,6 +63,17 @@ internal static IChatClient WithDefaultAgentMiddleware(this IChatClient chatClie }); } + // ChatHistoryPersistingChatClient is registered after FunctionInvokingChatClient so that it sits + // between FIC and the leaf client. ChatClientBuilder.Build applies factories in reverse order, + // making the first Use() call outermost. By adding our decorator second, the resulting pipeline is: + // FunctionInvokingChatClient → ChatHistoryPersistingChatClient → leaf IChatClient + // This allows the decorator to persist messages after each individual service call within + // FIC's function invocation loop. + if (options?.PersistChatHistoryAfterEachServiceCall is true) + { + chatBuilder.Use(innerClient => new ChatHistoryPersistingChatClient(innerClient)); + } + var agentChatClient = chatBuilder.Build(services); if (options?.ChatOptions?.Tools is { Count: > 0 }) diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs new file mode 100644 index 0000000000..0b4d02c95f --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs @@ -0,0 +1,195 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI; + +/// +/// A delegating chat client that notifies and +/// instances of request and response messages after each individual call to the inner chat client. +/// +/// +/// +/// This decorator is intended to operate between the and the leaf +/// in a pipeline. It ensures that providers are notified +/// after each service call rather than only at the end of the full agent run, so that intermediate messages +/// (e.g., tool calls and results) are saved even if the process is interrupted mid-loop. +/// +/// +/// This chat client must be used within the context of a running . It retrieves the +/// current agent and session from , which is set automatically when an agent's +/// or +/// +/// method is called. An is thrown if no run context is available or if the +/// agent is not a . +/// +/// +internal sealed class ChatHistoryPersistingChatClient : DelegatingChatClient +{ + /// + /// Initializes a new instance of the class. + /// + /// The underlying chat client that will handle the core operations. + public ChatHistoryPersistingChatClient(IChatClient innerClient) + : base(innerClient) + { + } + + /// + public override async Task GetResponseAsync( + IEnumerable messages, + ChatOptions? options = null, + CancellationToken cancellationToken = default) + { + var (agent, session) = GetRequiredAgentAndSession(); + + ChatResponse response; + try + { + response = await base.GetResponseAsync(messages, options, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + var newRequestMessagesOnFailure = GetNewMessages(messages, session); + MarkAsNotified(newRequestMessagesOnFailure, session); + await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); + throw; + } + + var newRequestMessages = GetNewMessages(messages, session); + MarkAsNotified(newRequestMessages, session); + MarkAsNotified(response.Messages, session); + await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, response.Messages, options, cancellationToken).ConfigureAwait(false); + + return response; + } + + /// + public override async IAsyncEnumerable GetStreamingResponseAsync( + IEnumerable messages, + ChatOptions? options = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var (agent, session) = GetRequiredAgentAndSession(); + + List responseUpdates = []; + + IAsyncEnumerator enumerator; + try + { + enumerator = base.GetStreamingResponseAsync(messages, options, cancellationToken).GetAsyncEnumerator(cancellationToken); + } + catch (Exception ex) + { + var newRequestMessagesOnFailure = GetNewMessages(messages, session); + MarkAsNotified(newRequestMessagesOnFailure, session); + await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); + throw; + } + + bool hasUpdates; + try + { + hasUpdates = await enumerator.MoveNextAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + var newRequestMessagesOnFailure = GetNewMessages(messages, session); + MarkAsNotified(newRequestMessagesOnFailure, session); + await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); + throw; + } + + while (hasUpdates) + { + var update = enumerator.Current; + responseUpdates.Add(update); + yield return update; + + try + { + hasUpdates = await enumerator.MoveNextAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + var newRequestMessagesOnFailure = GetNewMessages(messages, session); + MarkAsNotified(newRequestMessagesOnFailure, session); + await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); + throw; + } + } + + var chatResponse = responseUpdates.ToChatResponse(); + var newRequestMessages = GetNewMessages(messages, session); + MarkAsNotified(newRequestMessages, session); + MarkAsNotified(chatResponse.Messages, session); + await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, chatResponse.Messages, options, cancellationToken).ConfigureAwait(false); + } + + /// + /// Gets the current and from the run context. + /// + private static (ChatClientAgent Agent, ChatClientAgentSession Session) GetRequiredAgentAndSession() + { + var runContext = AIAgent.CurrentRunContext + ?? throw new InvalidOperationException( + $"{nameof(ChatHistoryPersistingChatClient)} can only be used within the context of a running AIAgent. " + + "Ensure that the chat client is being invoked as part of an AIAgent.RunAsync or AIAgent.RunStreamingAsync call."); + + if (runContext.Agent is not ChatClientAgent chatClientAgent) + { + throw new InvalidOperationException( + $"{nameof(ChatHistoryPersistingChatClient)} can only be used with a {nameof(ChatClientAgent)}. " + + $"The current agent is of type '{runContext.Agent.GetType().Name}'."); + } + + if (runContext.Session is not ChatClientAgentSession chatClientAgentSession) + { + throw new InvalidOperationException( + $"{nameof(ChatHistoryPersistingChatClient)} requires a {nameof(ChatClientAgentSession)}. " + + $"The current session is of type '{runContext.Session?.GetType().Name ?? "null"}'."); + } + + return (chatClientAgent, chatClientAgentSession); + } + + /// + /// Filters the given messages to return only those that have not yet been notified to providers + /// during the current agent run. + /// + /// The full set of messages to filter. + /// The current session containing the set of already-notified messages. + /// A list of messages that have not yet been notified. If no tracking is available, all messages are returned. + private static IReadOnlyList GetNewMessages(IEnumerable messages, ChatClientAgentSession session) + { + HashSet? notifiedMessages = session.NotifiedMessages; + if (notifiedMessages is null or { Count: 0 }) + { + return messages as IReadOnlyList ?? messages.ToList(); + } + + return messages.Where(m => !notifiedMessages.Contains(m)).ToList(); + } + + /// + /// Marks the given messages as notified so they will be excluded from future notifications in the current run. + /// + /// The messages to mark as notified. + /// The current session containing the set of already-notified messages. + private static void MarkAsNotified(IEnumerable messages, ChatClientAgentSession session) + { + if (session.NotifiedMessages is { } notifiedMessages) + { + foreach (var message in messages) + { + notifiedMessages.Add(message); + } + } + } +} diff --git a/dotnet/src/Microsoft.Agents.AI/Compaction/SummarizationCompactionStrategy.cs b/dotnet/src/Microsoft.Agents.AI/Compaction/SummarizationCompactionStrategy.cs index 9ff7ecf405..1f7f486e23 100644 --- a/dotnet/src/Microsoft.Agents.AI/Compaction/SummarizationCompactionStrategy.cs +++ b/dotnet/src/Microsoft.Agents.AI/Compaction/SummarizationCompactionStrategy.cs @@ -11,6 +11,8 @@ using Microsoft.Shared.DiagnosticIds; using Microsoft.Shared.Diagnostics; +#pragma warning disable CA1873 + namespace Microsoft.Agents.AI.Compaction; /// diff --git a/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs new file mode 100644 index 0000000000..54bd42910e --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs @@ -0,0 +1,733 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.AI; +using Microsoft.Extensions.DependencyInjection; +using Moq; +using Moq.Protected; + +namespace Microsoft.Agents.AI.UnitTests; + +/// +/// Contains unit tests for the decorator, +/// verifying that it persists messages via the after each +/// individual service call when the +/// option is enabled. +/// +public class ChatHistoryPersistingChatClientTests +{ + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled, + /// the ChatHistoryProvider receives messages after a successful non-streaming call. + /// + [Fact] + public async Task RunAsync_PersistsMessagesPerServiceCall_WhenOptionEnabledAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — InvokedCoreAsync should be called by the decorator (per service call) + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.RequestMessages.Any(m => m.Text == "test") && + x.ResponseMessages!.Any(m => m.Text == "response")), + ItExpr.IsAny()); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is disabled (default), + /// the ChatHistoryProvider still receives messages at end-of-run as before. + /// + [Fact] + public async Task RunAsync_PersistsMessagesAtEndOfRun_WhenOptionDisabledAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = false, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — InvokedCoreAsync should be called once by the agent (end of run) + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.RequestMessages.Any(m => m.Text == "test") && + x.ResponseMessages!.Any(m => m.Text == "response")), + ItExpr.IsAny()); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled and the service call fails, + /// the ChatHistoryProvider is notified with the exception. + /// + [Fact] + public async Task RunAsync_NotifiesProviderOfFailure_WhenOptionEnabledAndServiceFailsAsync() + { + // Arrange + var expectedException = new InvalidOperationException("Service failed"); + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ThrowsAsync(expectedException); + + Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await Assert.ThrowsAsync(() => agent.RunAsync([new(ChatRole.User, "test")], session)); + + // Assert — the decorator should have notified the provider of the failure + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.InvokeException != null && + x.InvokeException.Message == "Service failed"), + ItExpr.IsAny()); + } + + /// + /// Verifies that the decorator is injected into the pipeline when the option is set + /// and can be discovered via GetService. + /// + [Fact] + public void ChatClient_ContainsDecorator_WhenOptionEnabled() + { + // Arrange + Mock mockService = new(); + + // Act + ChatClientAgent agent = new(mockService.Object, options: new() + { + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Assert + var decorator = agent.ChatClient.GetService(); + Assert.NotNull(decorator); + } + + /// + /// Verifies that the decorator is NOT injected into the pipeline when the option is not set. + /// + [Fact] + public void ChatClient_DoesNotContainDecorator_WhenOptionDisabled() + { + // Arrange + Mock mockService = new(); + + // Act + ChatClientAgent agent = new(mockService.Object, options: new() + { + PersistChatHistoryAfterEachServiceCall = false, + }); + + // Assert + var decorator = agent.ChatClient.GetService(); + Assert.Null(decorator); + } + + /// + /// Verifies that the decorator is NOT injected when UseProvidedChatClientAsIs is true, + /// even if PersistChatHistoryAfterEachServiceCall is also true. + /// + [Fact] + public void ChatClient_DoesNotContainDecorator_WhenUseProvidedChatClientAsIs() + { + // Arrange + Mock mockService = new(); + + // Act + ChatClientAgent agent = new(mockService.Object, options: new() + { + PersistChatHistoryAfterEachServiceCall = true, + UseProvidedChatClientAsIs = true, + }); + + // Assert + var decorator = agent.ChatClient.GetService(); + Assert.Null(decorator); + } + + /// + /// Verifies that the PersistChatHistoryAfterEachServiceCall option is included in Clone(). + /// + [Fact] + public void ChatClientAgentOptions_Clone_IncludesPersistChatHistoryAfterEachServiceCall() + { + // Arrange + var options = new ChatClientAgentOptions + { + PersistChatHistoryAfterEachServiceCall = true, + }; + + // Act + var cloned = options.Clone(); + + // Assert + Assert.True(cloned.PersistChatHistoryAfterEachServiceCall); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled and the service call + /// involves a function invocation loop, the ChatHistoryProvider is called after each individual + /// service call (not just once at the end). + /// + [Fact] + public async Task RunAsync_PersistsPerServiceCall_DuringFunctionInvocationLoopAsync() + { + // Arrange + int serviceCallCount = 0; + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(() => + { + serviceCallCount++; + if (serviceCallCount == 1) + { + // First call returns a tool call + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, [new FunctionCallContent("call1", "myTool", new Dictionary())])])); + } + + // Second call returns a final response + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, "final response")])); + }); + + var invokedContexts = new List(); + + Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Callback((ChatHistoryProvider.InvokedContext ctx, CancellationToken _) => invokedContexts.Add(ctx)) + .Returns(() => new ValueTask()); + + // Define a simple tool + var tool = AIFunctionFactory.Create(() => "tool result", "myTool", "A test tool"); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatOptions = new() { Tools = [tool] }, + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }, services: new ServiceCollection().BuildServiceProvider()); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + Exception? caughtException = null; + try + { + await agent.RunAsync([new(ChatRole.User, "test")], session); + } + catch (Exception ex) + { + caughtException = ex; + } + + // Diagnostic: check if there was an unexpected exception + Assert.Null(caughtException); + + // Assert — the decorator should have been called twice (once per service call in the function invocation loop) + Assert.Equal(2, serviceCallCount); + Assert.Equal(2, invokedContexts.Count); + + // First invocation should have the user message as request and tool call response + Assert.NotNull(invokedContexts[0].ResponseMessages); + var firstRequestMessages = invokedContexts[0].RequestMessages.ToList(); + Assert.Contains(firstRequestMessages, m => m.Text == "test"); + Assert.Contains(invokedContexts[0].ResponseMessages!, m => m.Contents.OfType().Any()); + + // Second invocation: request messages should NOT include the original user message (already notified). + // It should only include messages added since the first call (assistant tool call + tool result). + Assert.NotNull(invokedContexts[1].ResponseMessages); + var secondRequestMessages = invokedContexts[1].RequestMessages.ToList(); + Assert.DoesNotContain(secondRequestMessages, m => m.Text == "test"); + Assert.Contains(invokedContexts[1].ResponseMessages!, m => m.Text == "final response"); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled with streaming, + /// the ChatHistoryProvider receives messages after the stream completes. + /// + [Fact] + public async Task RunStreamingAsync_PersistsMessagesPerServiceCall_WhenOptionEnabledAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetStreamingResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(CreateAsyncEnumerableAsync( + new ChatResponseUpdate(ChatRole.Assistant, "streaming "), + new ChatResponseUpdate(ChatRole.Assistant, "response"))); + + Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await foreach (var _ in agent.RunStreamingAsync([new(ChatRole.User, "test")], session)) + { + // Consume stream + } + + // Assert — InvokedCoreAsync should be called by the decorator + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.RequestMessages.Any(m => m.Text == "test") && + x.ResponseMessages != null), + ItExpr.IsAny()); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled, + /// AIContextProviders are also notified of new messages after a successful call. + /// + [Fact] + public async Task RunAsync_NotifiesAIContextProviders_WhenOptionEnabledAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockContextProvider = new((object?)null, (object?)null, (object?)null); + mockContextProvider.SetupGet(p => p.StateKeys).Returns(["TestAIContextProvider"]); + mockContextProvider + .Protected() + .Setup>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask(new AIContext())); + mockContextProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + AIContextProviders = [mockContextProvider.Object], + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — InvokedCoreAsync should be called by the decorator for the AIContextProvider + mockContextProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.ResponseMessages != null && + x.ResponseMessages.Any(m => m.Text == "response")), + ItExpr.IsAny()); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled and the service fails, + /// AIContextProviders are notified of the failure. + /// + [Fact] + public async Task RunAsync_NotifiesAIContextProvidersOfFailure_WhenOptionEnabledAsync() + { + // Arrange + var expectedException = new InvalidOperationException("Service failed"); + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ThrowsAsync(expectedException); + + Mock mockContextProvider = new((object?)null, (object?)null, (object?)null); + mockContextProvider.SetupGet(p => p.StateKeys).Returns(["TestAIContextProvider"]); + mockContextProvider + .Protected() + .Setup>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask(new AIContext())); + mockContextProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + AIContextProviders = [mockContextProvider.Object], + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await Assert.ThrowsAsync(() => agent.RunAsync([new(ChatRole.User, "test")], session)); + + // Assert — the decorator should have notified the AIContextProvider of the failure + mockContextProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.InvokeException != null && + x.InvokeException.Message == "Service failed"), + ItExpr.IsAny()); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled, + /// both ChatHistoryProvider and AIContextProviders are notified together. + /// + [Fact] + public async Task RunAsync_NotifiesBothProviders_WhenOptionEnabledAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + Mock mockContextProvider = new((object?)null, (object?)null, (object?)null); + mockContextProvider.SetupGet(p => p.StateKeys).Returns(["TestAIContextProvider"]); + mockContextProvider + .Protected() + .Setup>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask(new AIContext())); + mockContextProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + AIContextProviders = [mockContextProvider.Object], + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — both providers should have been notified + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.ResponseMessages != null && + x.ResponseMessages.Any(m => m.Text == "response")), + ItExpr.IsAny()); + + mockContextProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.ResponseMessages != null && + x.ResponseMessages.Any(m => m.Text == "response")), + ItExpr.IsAny()); + } + + /// + /// Verifies that during a FIC loop, response messages from the first call are not + /// re-notified as request messages on the second call. + /// + [Fact] + public async Task RunAsync_DoesNotReNotifyResponseMessagesAsRequestMessages_DuringFicLoopAsync() + { + // Arrange + int serviceCallCount = 0; + var assistantToolCallMessage = new ChatMessage(ChatRole.Assistant, [new FunctionCallContent("call1", "myTool", new Dictionary())]); + + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(() => + { + serviceCallCount++; + if (serviceCallCount == 1) + { + return Task.FromResult(new ChatResponse([assistantToolCallMessage])); + } + + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, "final response")])); + }); + + var invokedContexts = new List(); + + Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Callback((ChatHistoryProvider.InvokedContext ctx, CancellationToken _) => invokedContexts.Add(ctx)) + .Returns(() => new ValueTask()); + + var tool = AIFunctionFactory.Create(() => "tool result", "myTool", "A test tool"); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatOptions = new() { Tools = [tool] }, + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }, services: new ServiceCollection().BuildServiceProvider()); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert + Assert.Equal(2, invokedContexts.Count); + + // The assistant tool call message was a response in call 1 + Assert.Contains(invokedContexts[0].ResponseMessages!, m => ReferenceEquals(m, assistantToolCallMessage)); + + // It should NOT appear as a request in call 2 (it was already notified as a response) + var secondRequestMessages = invokedContexts[1].RequestMessages.ToList(); + Assert.DoesNotContain(secondRequestMessages, m => ReferenceEquals(m, assistantToolCallMessage)); + } + + /// + /// Verifies that when a failure occurs on the second call in a FIC loop, + /// only new request messages (not previously notified) are sent in the failure notification. + /// + [Fact] + public async Task RunAsync_DeduplicatesRequestMessages_OnFailureDuringFicLoopAsync() + { + // Arrange + int serviceCallCount = 0; + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(() => + { + serviceCallCount++; + if (serviceCallCount == 1) + { + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, [new FunctionCallContent("call1", "myTool", new Dictionary())])])); + } + + throw new InvalidOperationException("Service failure on second call"); + }); + + var invokedContexts = new List(); + + Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Callback((ChatHistoryProvider.InvokedContext ctx, CancellationToken _) => invokedContexts.Add(ctx)) + .Returns(() => new ValueTask()); + + var tool = AIFunctionFactory.Create(() => "tool result", "myTool", "A test tool"); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatOptions = new() { Tools = [tool] }, + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }, services: new ServiceCollection().BuildServiceProvider()); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await Assert.ThrowsAsync(() => + agent.RunAsync([new(ChatRole.User, "test")], session)); + + // Assert — should have 2 notifications: success on call 1, failure on call 2 + Assert.Equal(2, invokedContexts.Count); + + // First notification: success, has user message as request + Assert.Null(invokedContexts[0].InvokeException); + Assert.Contains(invokedContexts[0].RequestMessages, m => m.Text == "test"); + + // Second notification: failure, should NOT include the user message (already notified) + Assert.NotNull(invokedContexts[1].InvokeException); + var failureRequestMessages = invokedContexts[1].RequestMessages.ToList(); + Assert.DoesNotContain(failureRequestMessages, m => m.Text == "test"); + } + + /// + /// Verifies that the NotifiedMessages set on the session is properly cleaned up after + /// a successful run completes. + /// + [Fact] + public async Task RunAsync_CleansUpNotifiedMessages_AfterRunCompletesAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — NotifiedMessages should be empty (cleared) after the run completes + Assert.NotNull(session!.NotifiedMessages); + Assert.Empty(session.NotifiedMessages); + } + + private static async IAsyncEnumerable CreateAsyncEnumerableAsync(params ChatResponseUpdate[] updates) + { + foreach (var update in updates) + { + yield return update; + } + + await Task.CompletedTask; + } +} From 6909191a13a32aa04ea3e5c7f2ca5f5999ffe048 Mon Sep 17 00:00:00 2001 From: westey <164392973+westey-m@users.noreply.github.com> Date: Fri, 13 Mar 2026 17:26:02 +0000 Subject: [PATCH 2/6] Revert version reset --- dotnet/global.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dotnet/global.json b/dotnet/global.json index 482aa6b8d3..42bb8863a3 100644 --- a/dotnet/global.json +++ b/dotnet/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "10.0.100", + "version": "10.0.200", "rollForward": "minor", "allowPrerelease": false }, From 65764197f8c31a106b4e7cda09c6590560105852 Mon Sep 17 00:00:00 2001 From: westey <164392973+westey-m@users.noreply.github.com> Date: Wed, 18 Mar 2026 11:30:08 +0000 Subject: [PATCH 3/6] Fix bugs and improve sample --- .../Program.cs | 156 +++++++++++++----- .../ChatClient/ChatClientAgent.cs | 25 --- .../ChatClient/ChatClientAgentSession.cs | 14 -- .../ChatHistoryPersistingChatClient.cs | 105 ++++++++---- .../ChatHistoryPersistingChatClientTests.cs | 40 ++--- 5 files changed, 214 insertions(+), 126 deletions(-) diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs index 3a16c6a8e7..333d92d838 100644 --- a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs @@ -8,8 +8,8 @@ // intermediate messages (tool calls and results) are persisted after each service call, // allowing you to inspect or recover them even if the process is interrupted mid-loop. // -// The sample uses RunStreamingAsync so that we can observe the chat history growing -// after each service call within a single agent run. +// The sample runs two multi-turn conversations: one using non-streaming (RunAsync) and one +// using streaming (RunStreamingAsync), to demonstrate correct behavior in both modes. using System.ComponentModel; using Azure.AI.OpenAI; @@ -34,6 +34,7 @@ static string GetWeather([Description("The city name.")] string city) => "SEATTLE" => "Seattle: 55°F, cloudy with light rain.", "NEW YORK" => "New York: 72°F, sunny and warm.", "LONDON" => "London: 48°F, overcast with fog.", + "DUBLIN" => "Dublin: 43°F, overcast with fog.", _ => $"{city}: weather data not available." }; @@ -44,6 +45,7 @@ static string GetTime([Description("The city name.")] string city) => "SEATTLE" => "Seattle: 9:00 AM PST", "NEW YORK" => "New York: 12:00 PM EST", "LONDON" => "London: 5:00 PM GMT", + "DUBLIN" => "Dublin: 5:00 PM GMT", _ => $"{city}: time data not available." }; @@ -62,57 +64,135 @@ static string GetTime([Description("The city name.")] string city) => PersistChatHistoryAfterEachServiceCall = true, }); -AgentSession session = await agent.CreateSessionAsync(); +await RunNonStreamingAsync(); +await RunStreamingAsync(); -// Ask about multiple cities — the model will need to call tools for each city, -// resulting in multiple service calls within a single agent run. -string prompt = "What's the weather and time in Seattle, New York, and London?"; +async Task RunNonStreamingAsync() +{ + int lastChatHistorySize = 0; -Console.ForegroundColor = ConsoleColor.Cyan; -Console.Write("\n[User] "); -Console.ResetColor(); -Console.WriteLine(prompt); + Console.ForegroundColor = ConsoleColor.Yellow; + Console.WriteLine("\n=== Non-Streaming Mode ==="); + Console.ResetColor(); -PrintChatHistory("Before run"); + AgentSession session = await agent.CreateSessionAsync(); -Console.ForegroundColor = ConsoleColor.Cyan; -Console.Write("\n[Agent] "); -Console.ResetColor(); + // First turn — ask about multiple cities so the model calls tools. + const string Prompt = "What's the weather and time in Seattle, New York, and London?"; + PrintUserMessage(Prompt); -// Use RunStreamingAsync to observe the response as it streams. -await foreach (var update in agent.RunStreamingAsync(prompt, session)) -{ - Console.Write(update); -} + var response = await agent.RunAsync(Prompt, session); + PrintAgentResponse(response.Text); + PrintChatHistory(session, "After run", ref lastChatHistorySize); -Console.WriteLine(); + // Second turn — follow-up to verify chat history is correct. + const string FollowUp1 = "And Dublin?"; + PrintUserMessage(FollowUp1); -PrintChatHistory("After run"); + response = await agent.RunAsync(FollowUp1, session); + PrintAgentResponse(response.Text); + PrintChatHistory(session, "After second run", ref lastChatHistorySize); -// Run a second turn to show that chat history accumulated correctly. -string followUp = "Which city is the warmest?"; -Console.ForegroundColor = ConsoleColor.Cyan; -Console.Write("\n[User] "); -Console.ResetColor(); -Console.WriteLine(followUp); + // Third turn — follow-up to verify chat history is correct. + const string FollowUp2 = "Which city is the warmest?"; + PrintUserMessage(FollowUp2); -Console.ForegroundColor = ConsoleColor.Cyan; -Console.Write("\n[Agent] "); -Console.ResetColor(); + response = await agent.RunAsync(FollowUp2, session); + PrintAgentResponse(response.Text); + PrintChatHistory(session, "After third run", ref lastChatHistorySize); +} -await foreach (var update in agent.RunStreamingAsync(followUp, session)) +async Task RunStreamingAsync() { - Console.Write(update); + int lastChatHistorySize = 0; + + Console.ForegroundColor = ConsoleColor.Yellow; + Console.WriteLine("\n=== Streaming Mode ==="); + Console.ResetColor(); + + AgentSession session = await agent.CreateSessionAsync(); + + // First turn — ask about multiple cities so the model calls tools. + const string Prompt = "What's the weather and time in Seattle, New York, and London?"; + PrintUserMessage(Prompt); + + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[Agent] "); + Console.ResetColor(); + + await foreach (var update in agent.RunStreamingAsync(Prompt, session)) + { + Console.Write(update); + + // During streaming we should be able to see updates to the chat history + // before the full run completes, as each service call is made and persisted. + PrintChatHistory(session, "During run", ref lastChatHistorySize); + } + + Console.WriteLine(); + PrintChatHistory(session, "After run", ref lastChatHistorySize); + + // Second turn — follow-up to verify chat history is correct. + const string FollowUp1 = "And Dublin?"; + PrintUserMessage(FollowUp1); + + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[Agent] "); + Console.ResetColor(); + + await foreach (var update in agent.RunStreamingAsync(FollowUp1, session)) + { + Console.Write(update); + + // During streaming we should be able to see updates to the chat history + // before the full run completes, as each service call is made and persisted. + PrintChatHistory(session, "During second run", ref lastChatHistorySize); + } + + Console.WriteLine(); + PrintChatHistory(session, "After second run", ref lastChatHistorySize); + + // Third turn — follow-up to verify chat history is correct. + const string FollowUp2 = "Which city is the warmest?"; + PrintUserMessage(FollowUp2); + + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[Agent] "); + Console.ResetColor(); + + await foreach (var update in agent.RunStreamingAsync(FollowUp2, session)) + { + Console.Write(update); + + // During streaming we should be able to see updates to the chat history + // before the full run completes, as each service call is made and persisted. + PrintChatHistory(session, "During third run", ref lastChatHistorySize); + } + + Console.WriteLine(); + PrintChatHistory(session, "After third run", ref lastChatHistorySize); } -Console.WriteLine(); +void PrintUserMessage(string message) +{ + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[User] "); + Console.ResetColor(); + Console.WriteLine(message); +} -PrintChatHistory("After second run"); +void PrintAgentResponse(string? text) +{ + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[Agent] "); + Console.ResetColor(); + Console.WriteLine(text); +} // Helper to print the current chat history from the session. -void PrintChatHistory(string label) +void PrintChatHistory(AgentSession session, string label, ref int lastChatHistorySize) { - if (session.TryGetInMemoryChatHistory(out var history)) + if (session.TryGetInMemoryChatHistory(out var history) && history.Count != lastChatHistorySize) { Console.ForegroundColor = ConsoleColor.DarkGray; Console.WriteLine($"\n [{label} — Chat history: {history.Count} message(s)]"); @@ -120,9 +200,11 @@ void PrintChatHistory(string label) { var preview = msg.Text?.Length > 80 ? msg.Text[..80] + "…" : msg.Text; var contentTypes = string.Join(", ", msg.Contents.Select(c => c.GetType().Name)); - Console.WriteLine($" {msg.Role,-12} | {preview ?? $"[{contentTypes}]"}"); + Console.WriteLine($" {msg.Role,-12} | {(string.IsNullOrWhiteSpace(preview) ? $"[{contentTypes}]" : preview)}"); } Console.ResetColor(); + + lastChatHistorySize = history.Count; } } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs index bba8c474ef..a3f49810b8 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs @@ -217,13 +217,6 @@ protected override async Task RunCoreAsync( var loggingAgentName = this.GetLoggingAgentName(); this._logger.LogAgentChatClientInvokingAgent(nameof(RunAsync), this.Id, loggingAgentName, this._chatClientType); - // Initialize the per-service-call message tracking if the decorator is being used. - if (this.PersistsChatHistoryPerServiceCall) - { - safeSession.NotifiedMessages ??= new(); - safeSession.NotifiedMessages.Clear(); - } - // Call the IChatClient and notify the AIContextProvider of any failures. ChatResponse chatResponse; try @@ -235,11 +228,6 @@ protected override async Task RunCoreAsync( await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, inputMessagesForChatClient, chatOptions, cancellationToken).ConfigureAwait(false); throw; } - finally - { - // Clear the per-service-call message tracking now that the run is complete (or failed). - safeSession.NotifiedMessages?.Clear(); - } this._logger.LogAgentChatClientInvokedAgent(nameof(RunAsync), this.Id, loggingAgentName, this._chatClientType, inputMessages.Count); @@ -310,13 +298,6 @@ protected override async IAsyncEnumerable RunCoreStreamingA this._logger.LogAgentChatClientInvokingAgent(nameof(RunStreamingAsync), this.Id, loggingAgentName, this._chatClientType); - // Initialize the per-service-call message tracking if the decorator is being used. - if (this.PersistsChatHistoryPerServiceCall) - { - safeSession.NotifiedMessages ??= new(); - safeSession.NotifiedMessages.Clear(); - } - List responseUpdates = GetResponseUpdates(continuationToken); IAsyncEnumerator responseUpdatesEnumerator; @@ -328,7 +309,6 @@ protected override async IAsyncEnumerable RunCoreStreamingA } catch (Exception ex) { - safeSession.NotifiedMessages?.Clear(); await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); throw; } @@ -343,7 +323,6 @@ protected override async IAsyncEnumerable RunCoreStreamingA } catch (Exception ex) { - safeSession.NotifiedMessages?.Clear(); await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); throw; } @@ -370,7 +349,6 @@ protected override async IAsyncEnumerable RunCoreStreamingA } catch (Exception ex) { - safeSession.NotifiedMessages?.Clear(); await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); throw; } @@ -384,9 +362,6 @@ protected override async IAsyncEnumerable RunCoreStreamingA // Notify providers of all new messages unless persistence is handled per-service-call by the decorator. await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); - - // Clear the per-service-call message tracking now that the run is complete. - safeSession.NotifiedMessages?.Clear(); } /// diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentSession.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentSession.cs index f7093472e9..574d54700c 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentSession.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentSession.cs @@ -90,18 +90,4 @@ internal JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptions = nu private string DebuggerDisplay => this.ConversationId is { } conversationId ? $"ConversationId = {conversationId}, StateBag Count = {this.StateBag.Count}" : $"StateBag Count = {this.StateBag.Count}"; - - /// - /// Gets or sets the set of instances that have already been notified to providers - /// during the current agent run. Used by to avoid duplicate - /// notifications when loops cause the same messages to be passed - /// across multiple service calls. - /// - /// - /// This set is cleared at the start and end of each run. It uses reference equality - /// to track message identity since reuses the same message objects - /// across loop iterations. - /// - [JsonIgnore] - internal HashSet? NotifiedMessages { get; set; } } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs index 0b4d02c95f..733e0458c4 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs @@ -32,6 +32,12 @@ namespace Microsoft.Agents.AI; /// internal sealed class ChatHistoryPersistingChatClient : DelegatingChatClient { + /// + /// The key used in and + /// to mark messages and their content as already persisted to chat history. + /// + internal const string PersistedMarkerKey = "_chatHistoryPersisted"; + /// /// Initializes a new instance of the class. /// @@ -56,16 +62,15 @@ public override async Task GetResponseAsync( } catch (Exception ex) { - var newRequestMessagesOnFailure = GetNewMessages(messages, session); - MarkAsNotified(newRequestMessagesOnFailure, session); + var newRequestMessagesOnFailure = GetNewRequestMessages(messages); await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); throw; } - var newRequestMessages = GetNewMessages(messages, session); - MarkAsNotified(newRequestMessages, session); - MarkAsNotified(response.Messages, session); + var newRequestMessages = GetNewRequestMessages(messages); await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, response.Messages, options, cancellationToken).ConfigureAwait(false); + MarkAsPersisted(newRequestMessages); + MarkAsPersisted(response.Messages); return response; } @@ -87,8 +92,7 @@ public override async IAsyncEnumerable GetStreamingResponseA } catch (Exception ex) { - var newRequestMessagesOnFailure = GetNewMessages(messages, session); - MarkAsNotified(newRequestMessagesOnFailure, session); + var newRequestMessagesOnFailure = GetNewRequestMessages(messages); await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); throw; } @@ -100,8 +104,7 @@ public override async IAsyncEnumerable GetStreamingResponseA } catch (Exception ex) { - var newRequestMessagesOnFailure = GetNewMessages(messages, session); - MarkAsNotified(newRequestMessagesOnFailure, session); + var newRequestMessagesOnFailure = GetNewRequestMessages(messages); await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); throw; } @@ -118,18 +121,17 @@ public override async IAsyncEnumerable GetStreamingResponseA } catch (Exception ex) { - var newRequestMessagesOnFailure = GetNewMessages(messages, session); - MarkAsNotified(newRequestMessagesOnFailure, session); + var newRequestMessagesOnFailure = GetNewRequestMessages(messages); await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); throw; } } var chatResponse = responseUpdates.ToChatResponse(); - var newRequestMessages = GetNewMessages(messages, session); - MarkAsNotified(newRequestMessages, session); - MarkAsNotified(chatResponse.Messages, session); + var newRequestMessages = GetNewRequestMessages(messages); await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, chatResponse.Messages, options, cancellationToken).ConfigureAwait(false); + MarkAsPersisted(newRequestMessages); + MarkAsPersisted(chatResponse.Messages); } /// @@ -160,35 +162,76 @@ private static (ChatClientAgent Agent, ChatClientAgentSession Session) GetRequir } /// - /// Filters the given messages to return only those that have not yet been notified to providers - /// during the current agent run. + /// Returns only the request messages that have not yet been persisted to chat history. + /// + /// + /// A message is considered already persisted if any of the following is true: + /// + /// It has the in its . + /// It has an of + /// (indicating it was loaded from chat history and does not need to be re-persisted). + /// It has and all of its items have the + /// in their . This handles the + /// streaming case where reconstructs objects + /// independently via ToChatResponse(), producing different object references that share the same + /// underlying instances. + /// + /// + /// A list of request messages that have not yet been persisted. + /// The full set of request messages to filter. + private static List GetNewRequestMessages(IEnumerable messages) + { + return messages.Where(m => !IsAlreadyPersisted(m)).ToList(); + } + + /// + /// Determines whether a message has already been persisted to chat history by this decorator. /// - /// The full set of messages to filter. - /// The current session containing the set of already-notified messages. - /// A list of messages that have not yet been notified. If no tracking is available, all messages are returned. - private static IReadOnlyList GetNewMessages(IEnumerable messages, ChatClientAgentSession session) + private static bool IsAlreadyPersisted(ChatMessage message) { - HashSet? notifiedMessages = session.NotifiedMessages; - if (notifiedMessages is null or { Count: 0 }) + if (message.AdditionalProperties?.TryGetValue(PersistedMarkerKey, out var value) == true && value is true) { - return messages as IReadOnlyList ?? messages.ToList(); + return true; } - return messages.Where(m => !notifiedMessages.Contains(m)).ToList(); + if (message.GetAgentRequestMessageSourceType() == AgentRequestMessageSourceType.ChatHistory) + { + return true; + } + + // In streaming mode, FunctionInvokingChatClient reconstructs ChatMessage objects via ToChatResponse() + // independently, producing different ChatMessage instances. However, the underlying AIContent objects + // (e.g., FunctionCallContent, FunctionResultContent) are shared references. Checking for markers on + // AIContent handles dedup in this case. + if (message.Contents.Count > 0 && message.Contents.All(c => c.AdditionalProperties?.TryGetValue(PersistedMarkerKey, out var value) == true && value is true)) + { + return true; + } + + return false; } /// - /// Marks the given messages as notified so they will be excluded from future notifications in the current run. + /// Marks the given messages as persisted by setting a marker on both the + /// and each of its items. /// - /// The messages to mark as notified. - /// The current session containing the set of already-notified messages. - private static void MarkAsNotified(IEnumerable messages, ChatClientAgentSession session) + /// + /// Both levels are marked because may reconstruct + /// objects in streaming mode (losing the message-level marker), + /// but the references are shared and retain their markers. + /// + /// The messages to mark as persisted. + private static void MarkAsPersisted(IEnumerable messages) { - if (session.NotifiedMessages is { } notifiedMessages) + foreach (var message in messages) { - foreach (var message in messages) + message.AdditionalProperties ??= new(); + message.AdditionalProperties[PersistedMarkerKey] = true; + + foreach (var content in message.Contents) { - notifiedMessages.Add(message); + content.AdditionalProperties ??= new(); + content.AdditionalProperties[PersistedMarkerKey] = true; } } } diff --git a/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs index 54bd42910e..717e38f549 100644 --- a/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs @@ -35,7 +35,7 @@ public async Task RunAsync_PersistsMessagesPerServiceCall_WhenOptionEnabledAsync It.IsAny(), It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); - Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + Mock mockChatHistoryProvider = new(null, null, null); mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); mockChatHistoryProvider .Protected() @@ -82,7 +82,7 @@ public async Task RunAsync_PersistsMessagesAtEndOfRun_WhenOptionDisabledAsync() It.IsAny(), It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); - Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + Mock mockChatHistoryProvider = new(null, null, null); mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); mockChatHistoryProvider .Protected() @@ -130,7 +130,7 @@ public async Task RunAsync_NotifiesProviderOfFailure_WhenOptionEnabledAndService It.IsAny(), It.IsAny())).ThrowsAsync(expectedException); - Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + Mock mockChatHistoryProvider = new(null, null, null); mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); mockChatHistoryProvider .Protected() @@ -275,7 +275,7 @@ public async Task RunAsync_PersistsPerServiceCall_DuringFunctionInvocationLoopAs var invokedContexts = new List(); - Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + Mock mockChatHistoryProvider = new(null, null, null); mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); mockChatHistoryProvider .Protected() @@ -349,7 +349,7 @@ public async Task RunStreamingAsync_PersistsMessagesPerServiceCall_WhenOptionEna new ChatResponseUpdate(ChatRole.Assistant, "streaming "), new ChatResponseUpdate(ChatRole.Assistant, "response"))); - Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + Mock mockChatHistoryProvider = new(null, null, null); mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); mockChatHistoryProvider .Protected() @@ -399,7 +399,7 @@ public async Task RunAsync_NotifiesAIContextProviders_WhenOptionEnabledAsync() It.IsAny(), It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); - Mock mockContextProvider = new((object?)null, (object?)null, (object?)null); + Mock mockContextProvider = new(null, null, null); mockContextProvider.SetupGet(p => p.StateKeys).Returns(["TestAIContextProvider"]); mockContextProvider .Protected() @@ -446,7 +446,7 @@ public async Task RunAsync_NotifiesAIContextProvidersOfFailure_WhenOptionEnabled It.IsAny(), It.IsAny())).ThrowsAsync(expectedException); - Mock mockContextProvider = new((object?)null, (object?)null, (object?)null); + Mock mockContextProvider = new(null, null, null); mockContextProvider.SetupGet(p => p.StateKeys).Returns(["TestAIContextProvider"]); mockContextProvider .Protected() @@ -492,7 +492,7 @@ public async Task RunAsync_NotifiesBothProviders_WhenOptionEnabledAsync() It.IsAny(), It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); - Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + Mock mockChatHistoryProvider = new(null, null, null); mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); mockChatHistoryProvider .Protected() @@ -504,7 +504,7 @@ public async Task RunAsync_NotifiesBothProviders_WhenOptionEnabledAsync() .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) .Returns(() => new ValueTask()); - Mock mockContextProvider = new((object?)null, (object?)null, (object?)null); + Mock mockContextProvider = new(null, null, null); mockContextProvider.SetupGet(p => p.StateKeys).Returns(["TestAIContextProvider"]); mockContextProvider .Protected() @@ -574,7 +574,7 @@ public async Task RunAsync_DoesNotReNotifyResponseMessagesAsRequestMessages_Duri var invokedContexts = new List(); - Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + Mock mockChatHistoryProvider = new(null, null, null); mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); mockChatHistoryProvider .Protected() @@ -639,7 +639,7 @@ public async Task RunAsync_DeduplicatesRequestMessages_OnFailureDuringFicLoopAsy var invokedContexts = new List(); - Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + Mock mockChatHistoryProvider = new(null, null, null); mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); mockChatHistoryProvider .Protected() @@ -680,11 +680,11 @@ await Assert.ThrowsAsync(() => } /// - /// Verifies that the NotifiedMessages set on the session is properly cleaned up after - /// a successful run completes. + /// Verifies that after a successful run with per-service-call persistence, the notified + /// messages are stamped with the persisted marker so they are not re-notified. /// [Fact] - public async Task RunAsync_CleansUpNotifiedMessages_AfterRunCompletesAsync() + public async Task RunAsync_MarksNotifiedMessages_WithPersistedMarkerAsync() { // Arrange Mock mockService = new(); @@ -694,7 +694,7 @@ public async Task RunAsync_CleansUpNotifiedMessages_AfterRunCompletesAsync() It.IsAny(), It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); - Mock mockChatHistoryProvider = new((object?)null, (object?)null, (object?)null); + Mock mockChatHistoryProvider = new(null, null, null); mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); mockChatHistoryProvider .Protected() @@ -713,12 +713,14 @@ public async Task RunAsync_CleansUpNotifiedMessages_AfterRunCompletesAsync() }); // Act + var inputMessage = new ChatMessage(ChatRole.User, "test"); var session = await agent.CreateSessionAsync() as ChatClientAgentSession; - await agent.RunAsync([new(ChatRole.User, "test")], session); + await agent.RunAsync([inputMessage], session); - // Assert — NotifiedMessages should be empty (cleared) after the run completes - Assert.NotNull(session!.NotifiedMessages); - Assert.Empty(session.NotifiedMessages); + // Assert — input message should be marked as persisted + Assert.True( + inputMessage.AdditionalProperties?.ContainsKey(ChatHistoryPersistingChatClient.PersistedMarkerKey) == true, + "Input message should be marked as persisted after a successful run."); } private static async IAsyncEnumerable CreateAsyncEnumerableAsync(params ChatResponseUpdate[] updates) From 13d2cb1768c43a02ffbd26add9d2da9d91a4e94e Mon Sep 17 00:00:00 2001 From: westey <164392973+westey-m@users.noreply.github.com> Date: Wed, 18 Mar 2026 11:51:02 +0000 Subject: [PATCH 4/6] Fix formatting issues --- .../Microsoft.Agents.AI/ChatClient/ChatClientAgentSession.cs | 2 -- .../Compaction/SummarizationCompactionStrategy.cs | 2 -- 2 files changed, 4 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentSession.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentSession.cs index 574d54700c..400bfbcaf6 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentSession.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentSession.cs @@ -1,11 +1,9 @@ // Copyright (c) Microsoft. All rights reserved. using System; -using System.Collections.Generic; using System.Diagnostics; using System.Text.Json; using System.Text.Json.Serialization; -using Microsoft.Extensions.AI; using Microsoft.Shared.Diagnostics; namespace Microsoft.Agents.AI; diff --git a/dotnet/src/Microsoft.Agents.AI/Compaction/SummarizationCompactionStrategy.cs b/dotnet/src/Microsoft.Agents.AI/Compaction/SummarizationCompactionStrategy.cs index f1dd0bd2cd..1a5d35144d 100644 --- a/dotnet/src/Microsoft.Agents.AI/Compaction/SummarizationCompactionStrategy.cs +++ b/dotnet/src/Microsoft.Agents.AI/Compaction/SummarizationCompactionStrategy.cs @@ -11,8 +11,6 @@ using Microsoft.Shared.DiagnosticIds; using Microsoft.Shared.Diagnostics; -#pragma warning disable CA1873 - namespace Microsoft.Agents.AI.Compaction; /// From 29e7fb01a60708f22d0fb89f6344673544126be0 Mon Sep 17 00:00:00 2001 From: westey <164392973+westey-m@users.noreply.github.com> Date: Wed, 18 Mar 2026 19:13:22 +0000 Subject: [PATCH 5/6] Also updating conversation id during run --- .../Program.cs | 40 +++++++++++++------ .../ChatClient/ChatClientAgent.cs | 24 +++++++++-- .../ChatClient/ChatClientAgentOptions.cs | 23 +++++++++-- .../ChatHistoryPersistingChatClient.cs | 2 + .../ChatHistoryPersistingChatClientTests.cs | 34 ++++++++++++++++ 5 files changed, 104 insertions(+), 19 deletions(-) diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs index 333d92d838..3b886a55e6 100644 --- a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs @@ -16,15 +16,16 @@ using Azure.Identity; using Microsoft.Agents.AI; using Microsoft.Extensions.AI; +using OpenAI.Responses; var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set."); var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini"; +var store = Environment.GetEnvironmentVariable("AZURE_OPENAI_RESPONSES_STORE") ?? "false"; // WARNING: DefaultAzureCredential is convenient for development but requires careful consideration in production. // In production, consider using a specific credential (e.g., ManagedIdentityCredential) to avoid // latency issues, unintended credential probing, and potential security risks from fallback mechanisms. AzureOpenAIClient openAIClient = new(new Uri(endpoint), new DefaultAzureCredential()); -IChatClient chatClient = openAIClient.GetChatClient(deploymentName).AsIChatClient(); // Define multiple tools so the model makes several tool calls in a single run. [Description("Get the current weather for a city.")] @@ -50,8 +51,11 @@ static string GetTime([Description("The city name.")] string city) => }; // Create the agent with PersistChatHistoryAfterEachServiceCall enabled. -// The in-memory ChatHistoryProvider is used by default when no explicit provider is set, -// so we can inspect the chat history via session.TryGetInMemoryChatHistory(). +// The in-memory ChatHistoryProvider is used by default when the service does not require service stored chat +// history, so for those cases, we can inspect the chat history via session.TryGetInMemoryChatHistory(). +IChatClient chatClient = string.Equals(store, "TRUE", StringComparison.OrdinalIgnoreCase) ? + openAIClient.GetResponsesClient(deploymentName).AsIChatClient() : + openAIClient.GetResponsesClient(deploymentName).AsIChatClientWithStoredOutputDisabled(); AIAgent agent = chatClient.AsAIAgent( new ChatClientAgentOptions { @@ -70,6 +74,7 @@ static string GetTime([Description("The city name.")] string city) => async Task RunNonStreamingAsync() { int lastChatHistorySize = 0; + string lastConversationId = string.Empty; Console.ForegroundColor = ConsoleColor.Yellow; Console.WriteLine("\n=== Non-Streaming Mode ==="); @@ -83,7 +88,7 @@ async Task RunNonStreamingAsync() var response = await agent.RunAsync(Prompt, session); PrintAgentResponse(response.Text); - PrintChatHistory(session, "After run", ref lastChatHistorySize); + PrintChatHistory(session, "After run", ref lastChatHistorySize, ref lastConversationId); // Second turn — follow-up to verify chat history is correct. const string FollowUp1 = "And Dublin?"; @@ -91,7 +96,7 @@ async Task RunNonStreamingAsync() response = await agent.RunAsync(FollowUp1, session); PrintAgentResponse(response.Text); - PrintChatHistory(session, "After second run", ref lastChatHistorySize); + PrintChatHistory(session, "After second run", ref lastChatHistorySize, ref lastConversationId); // Third turn — follow-up to verify chat history is correct. const string FollowUp2 = "Which city is the warmest?"; @@ -99,12 +104,13 @@ async Task RunNonStreamingAsync() response = await agent.RunAsync(FollowUp2, session); PrintAgentResponse(response.Text); - PrintChatHistory(session, "After third run", ref lastChatHistorySize); + PrintChatHistory(session, "After third run", ref lastChatHistorySize, ref lastConversationId); } async Task RunStreamingAsync() { int lastChatHistorySize = 0; + string lastConversationId = string.Empty; Console.ForegroundColor = ConsoleColor.Yellow; Console.WriteLine("\n=== Streaming Mode ==="); @@ -126,11 +132,11 @@ async Task RunStreamingAsync() // During streaming we should be able to see updates to the chat history // before the full run completes, as each service call is made and persisted. - PrintChatHistory(session, "During run", ref lastChatHistorySize); + PrintChatHistory(session, "During run", ref lastChatHistorySize, ref lastConversationId); } Console.WriteLine(); - PrintChatHistory(session, "After run", ref lastChatHistorySize); + PrintChatHistory(session, "After run", ref lastChatHistorySize, ref lastConversationId); // Second turn — follow-up to verify chat history is correct. const string FollowUp1 = "And Dublin?"; @@ -146,11 +152,11 @@ async Task RunStreamingAsync() // During streaming we should be able to see updates to the chat history // before the full run completes, as each service call is made and persisted. - PrintChatHistory(session, "During second run", ref lastChatHistorySize); + PrintChatHistory(session, "During second run", ref lastChatHistorySize, ref lastConversationId); } Console.WriteLine(); - PrintChatHistory(session, "After second run", ref lastChatHistorySize); + PrintChatHistory(session, "After second run", ref lastChatHistorySize, ref lastConversationId); // Third turn — follow-up to verify chat history is correct. const string FollowUp2 = "Which city is the warmest?"; @@ -166,11 +172,11 @@ async Task RunStreamingAsync() // During streaming we should be able to see updates to the chat history // before the full run completes, as each service call is made and persisted. - PrintChatHistory(session, "During third run", ref lastChatHistorySize); + PrintChatHistory(session, "During third run", ref lastChatHistorySize, ref lastConversationId); } Console.WriteLine(); - PrintChatHistory(session, "After third run", ref lastChatHistorySize); + PrintChatHistory(session, "After third run", ref lastChatHistorySize, ref lastConversationId); } void PrintUserMessage(string message) @@ -190,7 +196,7 @@ void PrintAgentResponse(string? text) } // Helper to print the current chat history from the session. -void PrintChatHistory(AgentSession session, string label, ref int lastChatHistorySize) +void PrintChatHistory(AgentSession session, string label, ref int lastChatHistorySize, ref string lastConversationId) { if (session.TryGetInMemoryChatHistory(out var history) && history.Count != lastChatHistorySize) { @@ -207,4 +213,12 @@ void PrintChatHistory(AgentSession session, string label, ref int lastChatHistor lastChatHistorySize = history.Count; } + + if (session is ChatClientAgentSession ccaSession && ccaSession.ConversationId is not null && ccaSession.ConversationId != lastConversationId) + { + Console.ForegroundColor = ConsoleColor.DarkGray; + Console.WriteLine($" [{label} — Conversation ID: {ccaSession.ConversationId}]"); + Console.ResetColor(); + lastConversationId = ccaSession.ConversationId; + } } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs index a3f49810b8..eb37e853c3 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs @@ -233,7 +233,7 @@ protected override async Task RunCoreAsync( // We can derive the type of supported session from whether we have a conversation id, // so let's update it and set the conversation id for the service session case. - this.UpdateSessionConversationId(safeSession, chatResponse.ConversationId, cancellationToken); + this.UpdateSessionConversationIdAtEndOfRun(safeSession, chatResponse.ConversationId, cancellationToken); // Ensure that the author name is set for each message in the response. foreach (ChatMessage chatResponseMessage in chatResponse.Messages) @@ -358,7 +358,7 @@ protected override async IAsyncEnumerable RunCoreStreamingA // We can derive the type of supported session from whether we have a conversation id, // so let's update it and set the conversation id for the service session case. - this.UpdateSessionConversationId(safeSession, chatResponse.ConversationId, cancellationToken); + this.UpdateSessionConversationIdAtEndOfRun(safeSession, chatResponse.ConversationId, cancellationToken); // Notify providers of all new messages unless persistence is handled per-service-call by the decorator. await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); @@ -766,7 +766,7 @@ private async Task return (typedSession, chatOptions, messagesList, continuationToken); } - private void UpdateSessionConversationId(ChatClientAgentSession session, string? responseConversationId, CancellationToken cancellationToken) + internal void UpdateSessionConversationId(ChatClientAgentSession session, string? responseConversationId, CancellationToken cancellationToken) { if (string.IsNullOrWhiteSpace(responseConversationId) && !string.IsNullOrWhiteSpace(session.ConversationId)) { @@ -818,6 +818,24 @@ private void UpdateSessionConversationId(ChatClientAgentSession session, string? /// decorator handles per-service-call notification, /// so this end-of-run notification is skipped. /// + /// + /// Updates the session conversation ID at the end of an agent run. + /// + /// + /// When is , the + /// decorator handles per-service-call conversation ID updates, + /// so this end-of-run update is skipped. + /// + private void UpdateSessionConversationIdAtEndOfRun(ChatClientAgentSession session, string? responseConversationId, CancellationToken cancellationToken) + { + if (this.PersistsChatHistoryPerServiceCall) + { + return; + } + + this.UpdateSessionConversationId(session, responseConversationId, cancellationToken); + } + private Task NotifyProvidersOfNewMessagesAtEndOfRunAsync( ChatClientAgentSession session, IEnumerable requestMessages, diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs index b9c23e62ad..d4e5fe58c0 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs @@ -98,10 +98,27 @@ public sealed class ChatClientAgentOptions /// /// /// By default, persists request and response messages via the - /// only after the full run completes, which may include multiple + /// or persists the latest service provided + /// conversation id only after the full run completes, which may include multiple /// iterations of the function invocation loop. Setting this property to causes - /// messages to be persisted after each individual call to the underlying AI service, so that intermediate - /// messages (e.g., tool calls and results) are saved even if the process is interrupted mid-loop. + /// messages or conversation ids to be persisted after each individual call to the underlying AI service, so that intermediate + /// progress (e.g., tool calls and results) is saved even if the process is interrupted mid-loop. + /// + /// + /// Note that when using an AI service with built in chat history storage, which uses a single threaded conversation model (e.g. OpenAI Responses with the Conversations API) + /// setting this setting to will have no effect. This type of service updates the single conversation with each service call, + /// and there is no way to revert to a previous state. + /// + /// + /// On the other hand, when using an AI service with built in chat history storage, which supports forking, (e.g. OpenAI Responses with Response Ids) + /// setting this setting to will mean that the will only persist the last returned response id at + /// the end of the run, whereas setting this setting to will mean that the will persist each returned + /// response id after each service call. This means that the last successful response id will always be available in the . + /// + /// + /// It's important to note that enabling this setting may leave your chat history in a state where is required to start a new run. + /// If the last successful service call returned it is not possible to continue the session until a + /// is provided as input for a subsequent run. /// /// /// When this option is enabled, a decorator is automatically diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs index 733e0458c4..0bff151710 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs @@ -68,6 +68,7 @@ public override async Task GetResponseAsync( } var newRequestMessages = GetNewRequestMessages(messages); + agent.UpdateSessionConversationId(session, response.ConversationId, cancellationToken); await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, response.Messages, options, cancellationToken).ConfigureAwait(false); MarkAsPersisted(newRequestMessages); MarkAsPersisted(response.Messages); @@ -129,6 +130,7 @@ public override async IAsyncEnumerable GetStreamingResponseA var chatResponse = responseUpdates.ToChatResponse(); var newRequestMessages = GetNewRequestMessages(messages); + agent.UpdateSessionConversationId(session, chatResponse.ConversationId, cancellationToken); await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, chatResponse.Messages, options, cancellationToken).ConfigureAwait(false); MarkAsPersisted(newRequestMessages); MarkAsPersisted(chatResponse.Messages); diff --git a/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs index 717e38f549..32bd718a1c 100644 --- a/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs @@ -723,6 +723,40 @@ public async Task RunAsync_MarksNotifiedMessages_WithPersistedMarkerAsync() "Input message should be marked as persisted after a successful run."); } + /// + /// Verifies that when per-service-call persistence is enabled and the inner client returns a + /// conversation ID, the session's ConversationId is updated after the service call. + /// + [Fact] + public async Task RunAsync_UpdatesSessionConversationId_WhenPerServiceCallPersistenceEnabledAsync() + { + // Arrange + const string ExpectedConversationId = "conv-123"; + + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")]) + { + ConversationId = ExpectedConversationId, + }); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — session should have the conversation ID returned by the inner client + Assert.Equal(ExpectedConversationId, session!.ConversationId); + } + private static async IAsyncEnumerable CreateAsyncEnumerableAsync(params ChatResponseUpdate[] updates) { foreach (var update in updates) From bc681923f3e93891fab84c4648cca2a502c20da7 Mon Sep 17 00:00:00 2001 From: westey <164392973+westey-m@users.noreply.github.com> Date: Tue, 24 Mar 2026 18:12:38 +0000 Subject: [PATCH 6/6] Update based on ADR feedback --- .../Program.cs | 24 +-- .../README.md | 9 +- .../ChatClient/ChatClientAgent.cs | 167 +++++++++++++++--- .../ChatClient/ChatClientAgentLogMessages.cs | 28 +++ .../ChatClient/ChatClientAgentOptions.cs | 59 ++++--- .../ChatClient/ChatClientBuilderExtensions.cs | 44 +++++ .../ChatClient/ChatClientExtensions.cs | 8 +- .../ChatHistoryPersistingChatClient.cs | 111 ++++++++++-- .../ChatHistoryPersistingChatClientTests.cs | 91 +++++----- 9 files changed, 402 insertions(+), 139 deletions(-) diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs index 3b886a55e6..07382e6417 100644 --- a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs @@ -1,12 +1,15 @@ // Copyright (c) Microsoft. All rights reserved. -// This sample demonstrates how the PersistChatHistoryAfterEachServiceCall option causes -// chat history to be persisted after each individual call to the AI service, rather than -// only at the end of the full agent run. When an agent uses tools, FunctionInvokingChatClient -// loops multiple times (service call → tool execution → service call), and by default the -// chat history is only persisted once the entire loop finishes. With this option enabled, -// intermediate messages (tool calls and results) are persisted after each service call, -// allowing you to inspect or recover them even if the process is interrupted mid-loop. +// This sample demonstrates how the ChatClientAgent persists chat history after each individual +// call to the AI service. +// When an agent uses tools, FunctionInvokingChatClient may loop multiple times +// (service call → tool execution → service call), and intermediate messages (tool calls and +// results) are persisted after each service call. This allows you to inspect or recover them +// even if the process is interrupted mid-loop, but may also result in chat history that is not +// yet finalized (e.g., tool calls without results) being persisted, which may be undesirable in some cases. +// +// To opt into end-of-run persistence instead (atomic run semantics), set +// PersistChatHistoryAtEndOfRun = true on ChatClientAgentOptions. // // The sample runs two multi-turn conversations: one using non-streaming (RunAsync) and one // using streaming (RunStreamingAsync), to demonstrate correct behavior in both modes. @@ -50,12 +53,12 @@ static string GetTime([Description("The city name.")] string city) => _ => $"{city}: time data not available." }; -// Create the agent with PersistChatHistoryAfterEachServiceCall enabled. +// Create the agent — per-service-call persistence is the default behavior. // The in-memory ChatHistoryProvider is used by default when the service does not require service stored chat // history, so for those cases, we can inspect the chat history via session.TryGetInMemoryChatHistory(). IChatClient chatClient = string.Equals(store, "TRUE", StringComparison.OrdinalIgnoreCase) ? - openAIClient.GetResponsesClient(deploymentName).AsIChatClient() : - openAIClient.GetResponsesClient(deploymentName).AsIChatClientWithStoredOutputDisabled(); + openAIClient.GetResponsesClient().AsIChatClient(deploymentName) : + openAIClient.GetResponsesClient().AsIChatClientWithStoredOutputDisabled(deploymentName); AIAgent agent = chatClient.AsAIAgent( new ChatClientAgentOptions { @@ -65,7 +68,6 @@ static string GetTime([Description("The city name.")] string city) => Instructions = "You are a helpful assistant. When asked about multiple cities, call the appropriate tool for each city.", Tools = [AIFunctionFactory.Create(GetWeather), AIFunctionFactory.Create(GetTime)] }, - PersistChatHistoryAfterEachServiceCall = true, }); await RunNonStreamingAsync(); diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md index 3c3e8a2c30..d6157586f0 100644 --- a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md @@ -1,17 +1,18 @@ # In-Function-Loop Checkpointing -This sample demonstrates how the `PersistChatHistoryAfterEachServiceCall` option on `ChatClientAgentOptions` causes chat history to be saved after each individual call to the AI service, rather than only at the end of the full agent run. +This sample demonstrates how `ChatClientAgent` persists chat history after each individual call to the AI service by default. This per-service-call persistence ensures intermediate progress is saved during the function invocation loop. ## What This Sample Shows -When an agent uses tools, the `FunctionInvokingChatClient` loops multiple times (service call → tool execution → service call → …). By default, chat history is only persisted once the entire loop finishes. With `PersistChatHistoryAfterEachServiceCall` enabled: +When an agent uses tools, the `FunctionInvokingChatClient` loops multiple times (service call → tool execution → service call → …). By default, chat history is persisted after each service call via the `ChatHistoryPersistingChatClient` decorator: - A `ChatHistoryPersistingChatClient` decorator is automatically inserted into the chat client pipeline - After each service call, the decorator notifies the `ChatHistoryProvider` (and any `AIContextProvider` instances) with the new messages - Only **new** messages are sent to providers on each notification — messages that were already persisted in an earlier call within the same run are deduplicated automatically -- The end-of-run persistence in `ChatClientAgent` is skipped to avoid double-persisting -This is useful for: +To opt into end-of-run persistence instead (atomic run semantics), set `PersistChatHistoryAtEndOfRun = true` on `ChatClientAgentOptions`. In that mode, the decorator marks messages with metadata rather than persisting them immediately, and `ChatClientAgent` persists only the marked messages at the end of the run. + +Per-service-call persistence is useful for: - **Crash recovery** — if the process is interrupted mid-loop, the intermediate tool calls and results are already persisted - **Observability** — you can inspect the chat history while the agent is still running (e.g., during streaming) - **Long-running tool loops** — agents with many sequential tool calls benefit from incremental persistence diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs index eb37e853c3..6722bd8738 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs @@ -138,6 +138,9 @@ public ChatClientAgent(IChatClient chatClient, ChatClientAgentOptions? options, this._aiContextProviderStateKeys = ValidateAndCollectStateKeys(this._agentOptions?.AIContextProviders, this.ChatHistoryProvider); this._logger = (loggerFactory ?? chatClient.GetService() ?? NullLoggerFactory.Instance).CreateLogger(); + + // Warn if using a custom chat client stack with end-of-run persistence but no ChatHistoryPersistingChatClient. + this.WarnOnMissingPersistingClient(); } /// @@ -211,6 +214,10 @@ protected override async Task RunCoreAsync( ChatClientAgentContinuationToken? _) = await this.PrepareSessionAndMessagesAsync(session, inputMessages, options, cancellationToken).ConfigureAwait(false); + // Update the run context with the resolved session so any downstream classes + // always have a valid session, even when the caller passed null. + EnsureRunContextHasSession(safeSession); + var chatClient = this.ChatClient; chatClient = ApplyRunOptionsTransformations(options, chatClient); @@ -233,7 +240,8 @@ protected override async Task RunCoreAsync( // We can derive the type of supported session from whether we have a conversation id, // so let's update it and set the conversation id for the service session case. - this.UpdateSessionConversationIdAtEndOfRun(safeSession, chatResponse.ConversationId, cancellationToken); + var forceEndOfRunPersistence = chatOptions?.ContinuationToken is not null || chatOptions?.AllowBackgroundResponses is true; + this.UpdateSessionConversationIdAtEndOfRun(safeSession, chatResponse.ConversationId, cancellationToken, forceUpdate: forceEndOfRunPersistence); // Ensure that the author name is set for each message in the response. foreach (ChatMessage chatResponseMessage in chatResponse.Messages) @@ -242,7 +250,9 @@ protected override async Task RunCoreAsync( } // Notify providers of all new messages unless persistence is handled per-service-call by the decorator. - await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, inputMessagesForChatClient, chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); + // When background responses are allowed, force notification since per-service-call persistence + // is unreliable (the caller may stop consuming the stream before the decorator can persist). + await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, inputMessagesForChatClient, chatResponse.Messages, chatOptions, cancellationToken, forceNotify: forceEndOfRunPersistence).ConfigureAwait(false); return new AgentResponse(chatResponse) { @@ -290,6 +300,10 @@ protected override async IAsyncEnumerable RunCoreStreamingA ChatClientAgentContinuationToken? continuationToken) = await this.PrepareSessionAndMessagesAsync(session, inputMessages, options, cancellationToken).ConfigureAwait(false); + // Update the run context with the resolved session so any downstream classes + // always have a valid session, even when the caller passed null. + EnsureRunContextHasSession(safeSession); + var chatClient = this.ChatClient; chatClient = ApplyRunOptionsTransformations(options, chatClient); @@ -345,6 +359,10 @@ protected override async IAsyncEnumerable RunCoreStreamingA try { + // Re-ensure the run context has the resolved session before each MoveNextAsync. + // The base class RunStreamingAsync restores the original context (potentially with + // null session) after each yield, so we must re-establish it for the decorator. + EnsureRunContextHasSession(safeSession); hasUpdates = await responseUpdatesEnumerator.MoveNextAsync().ConfigureAwait(false); } catch (Exception ex) @@ -356,12 +374,16 @@ protected override async IAsyncEnumerable RunCoreStreamingA var chatResponse = responseUpdates.ToChatResponse(); + var forceEndOfRunPersistence = continuationToken is not null || chatOptions?.AllowBackgroundResponses is true; + // We can derive the type of supported session from whether we have a conversation id, // so let's update it and set the conversation id for the service session case. - this.UpdateSessionConversationIdAtEndOfRun(safeSession, chatResponse.ConversationId, cancellationToken); + this.UpdateSessionConversationIdAtEndOfRun(safeSession, chatResponse.ConversationId, cancellationToken, forceUpdate: forceEndOfRunPersistence); // Notify providers of all new messages unless persistence is handled per-service-call by the decorator. - await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); + // When resuming from a continuation token or using background responses, force notification + // to send the combined data (per-service-call persistence is unreliable for these scenarios). + await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, chatOptions, cancellationToken, forceNotify: forceEndOfRunPersistence).ConfigureAwait(false); } /// @@ -679,6 +701,12 @@ private async Task throw new InvalidOperationException("A session must be provided when continuing a background response with a continuation token."); } + if ((continuationToken is not null || chatOptions?.AllowBackgroundResponses is true) && this.PersistsChatHistoryPerServiceCall && this._logger.IsEnabled(LogLevel.Warning)) + { + var warningAgentName = this.GetLoggingAgentName(); + this._logger.LogAgentChatClientBackgroundResponseFallback(this.Id, warningAgentName); + } + session ??= await this.CreateSessionAsync(cancellationToken).ConfigureAwait(false); if (session is not ChatClientAgentSession typedSession) { @@ -810,25 +838,18 @@ internal void UpdateSessionConversationId(ChatClientAgentSession session, string } } - /// - /// Notifies providers of successfully completed messages at the end of an agent run. - /// - /// - /// When is , the - /// decorator handles per-service-call notification, - /// so this end-of-run notification is skipped. - /// /// /// Updates the session conversation ID at the end of an agent run. /// /// - /// When is , the - /// decorator handles per-service-call conversation ID updates, - /// so this end-of-run update is skipped. + /// When a in persist mode handles per-service-call + /// conversation ID updates, this end-of-run update is skipped. When the decorator is in mark-only + /// mode or absent, the update is performed here. When is + /// (continuation token scenarios), the update is always performed. /// - private void UpdateSessionConversationIdAtEndOfRun(ChatClientAgentSession session, string? responseConversationId, CancellationToken cancellationToken) + private void UpdateSessionConversationIdAtEndOfRun(ChatClientAgentSession session, string? responseConversationId, CancellationToken cancellationToken, bool forceUpdate = false) { - if (this.PersistsChatHistoryPerServiceCall) + if (!forceUpdate && this.PersistsChatHistoryPerServiceCall) { return; } @@ -836,18 +857,39 @@ private void UpdateSessionConversationIdAtEndOfRun(ChatClientAgentSession sessio this.UpdateSessionConversationId(session, responseConversationId, cancellationToken); } + /// + /// Notifies providers of successfully completed messages at the end of an agent run. + /// + /// + /// When a in persist mode handles per-service-call + /// notification, this end-of-run notification is skipped. When the decorator is in mark-only mode, + /// only the marked messages are persisted. When no decorator is present (custom stack with + /// ), all messages are persisted. + /// When is (continuation token or + /// background response scenarios), notification is always performed with all messages because + /// per-service-call persistence is unreliable in these scenarios. + /// private Task NotifyProvidersOfNewMessagesAtEndOfRunAsync( ChatClientAgentSession session, IEnumerable requestMessages, IEnumerable responseMessages, ChatOptions? chatOptions, - CancellationToken cancellationToken) + CancellationToken cancellationToken, + bool forceNotify = false) { - if (this.PersistsChatHistoryPerServiceCall) + if (!forceNotify && this.PersistsChatHistoryPerServiceCall) { return Task.CompletedTask; } + if (!forceNotify && this.HasMarkOnlyChatHistoryPersistingClient) + { + // In mark-only mode, persist only messages that were marked by the decorator. + var markedRequestMessages = GetMarkedMessages(requestMessages); + var markedResponseMessages = GetMarkedMessages(responseMessages); + return this.NotifyProvidersOfNewMessagesAsync(session, markedRequestMessages, markedResponseMessages, chatOptions, cancellationToken); + } + return this.NotifyProvidersOfNewMessagesAsync(session, requestMessages, responseMessages, chatOptions, cancellationToken); } @@ -855,9 +897,9 @@ private Task NotifyProvidersOfNewMessagesAtEndOfRunAsync( /// Notifies providers of a failure at the end of an agent run. /// /// - /// When is , the - /// decorator handles per-service-call notification, - /// so this end-of-run notification is skipped. + /// When a in persist mode handles per-service-call + /// notification (including failure), this end-of-run notification is skipped to avoid + /// duplicate notification. In all other cases, failure is reported at the end of the run. /// private Task NotifyProvidersOfFailureAtEndOfRunAsync( ChatClientAgentSession session, @@ -875,11 +917,84 @@ private Task NotifyProvidersOfFailureAtEndOfRunAsync( } /// - /// Gets a value indicating whether the agent is configured to persist chat history after each individual service call - /// via a decorator. + /// Gets a value indicating whether the agent has a + /// decorator in persist mode (not mark-only), which handles per-service-call persistence. + /// + private bool PersistsChatHistoryPerServiceCall + { + get + { + var persistingClient = this.ChatClient.GetService(); + return persistingClient?.MarkOnly == false; + } + } + + /// + /// Gets a value indicating whether the agent has a + /// decorator in mark-only mode, which marks messages for later persistence at the end of the run. /// - private bool PersistsChatHistoryPerServiceCall => - this._agentOptions?.PersistChatHistoryAfterEachServiceCall is true && this._agentOptions?.UseProvidedChatClientAsIs is not true; + private bool HasMarkOnlyChatHistoryPersistingClient + { + get + { + var persistingClient = this.ChatClient.GetService(); + return persistingClient?.MarkOnly == true; + } + } + + /// + /// Returns only the messages that have been marked as persisted by a in mark-only mode. + /// + private static List GetMarkedMessages(IEnumerable messages) + { + return messages.Where(m => + m.AdditionalProperties?.TryGetValue(ChatHistoryPersistingChatClient.PersistedMarkerKey, out var value) == true && value is true).ToList(); + } + + /// + /// Ensures that contains the resolved session. + /// + /// + /// The base class sets with the raw session parameter + /// (which may be null) and restores it after each yield in streaming scenarios. After + /// resolves or creates a session, we update the + /// context so the decorator always has a valid session. + /// The original agent from the context is preserved to maintain the top-of-stack agent in + /// decorated agent scenarios. + /// + private static void EnsureRunContextHasSession(ChatClientAgentSession safeSession) + { + var context = CurrentRunContext; + if (context is not null && context.Session != safeSession) + { + CurrentRunContext = new(context.Agent, safeSession, context.RequestMessages, context.RunOptions); + } + } + + /// + /// Checks for potential misconfiguration when using a custom chat client stack and logs warnings. + /// + private void WarnOnMissingPersistingClient() + { + if (this._agentOptions?.UseProvidedChatClientAsIs is not true) + { + return; + } + + if (this._agentOptions?.PersistChatHistoryAtEndOfRun is not true) + { + return; + } + + var persistingClient = this.ChatClient.GetService(); + if (persistingClient is null && this._logger.IsEnabled(LogLevel.Warning)) + { + var loggingAgentName = this.GetLoggingAgentName(); + this._logger.LogAgentChatClientMissingPersistingClient( + this.Id, + loggingAgentName); + } + } private ChatHistoryProvider? ResolveChatHistoryProvider(ChatOptions? chatOptions, ChatClientAgentSession session) { diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentLogMessages.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentLogMessages.cs index 98ff4583dc..2a324522a4 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentLogMessages.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentLogMessages.cs @@ -69,4 +69,32 @@ public static partial void LogAgentChatClientHistoryProviderConflict( string chatHistoryProviderName, string agentId, string agentName); + + /// + /// Logs a warning when is + /// and is , + /// but no is found in the custom chat client stack. + /// + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Agent {AgentId}/{AgentName}: PersistChatHistoryAtEndOfRun is enabled with a custom chat client stack (UseProvidedChatClientAsIs), but no ChatHistoryPersistingChatClient was found in the pipeline. All messages will be persisted at the end of the run without marking. This setup is not supported with some other features, e.g. handoffs. Consider adding a ChatHistoryPersistingChatClient to the pipeline using the UseChatHistoryPersisting extension method.")] + public static partial void LogAgentChatClientMissingPersistingClient( + this ILogger logger, + string agentId, + string agentName); + + /// + /// Logs a warning when per-service-call persistence falls back to end-of-run persistence + /// because the run involves background responses (continuation token resumption or + /// AllowBackgroundResponses). Per-service-call persistence is + /// unreliable in these scenarios because the caller may stop consuming the stream before + /// the decorator's post-stream persistence code can execute. + /// + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Agent {AgentId}/{AgentName}: Per-service-call persistence is falling back to end-of-run persistence because the run involves background responses. Messages will be marked during the run and persisted at the end.")] + public static partial void LogAgentChatClientBackgroundResponseFallback( + this ILogger logger, + string agentId, + string agentName); } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs index d4e5fe58c0..8df9112446 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs @@ -92,49 +92,54 @@ public sealed class ChatClientAgentOptions public bool ThrowOnChatHistoryProviderConflict { get; set; } = true; /// - /// Gets or sets a value indicating whether to persist chat history after each individual service call - /// rather than only at the end of the full agent run. + /// Gets or sets a value indicating whether to persist chat history only at the end of the full agent run + /// rather than after each individual service call. /// /// /// - /// By default, persists request and response messages via the - /// or persists the latest service provided - /// conversation id only after the full run completes, which may include multiple - /// iterations of the function invocation loop. Setting this property to causes - /// messages or conversation ids to be persisted after each individual call to the underlying AI service, so that intermediate - /// progress (e.g., tool calls and results) is saved even if the process is interrupted mid-loop. + /// By default, persists request and response messages either via + /// a , or the underlying AI service's chat history storage. + /// Persistence is done immediately after each call to the AI service within the function invocation loop. + /// When storing in the underlying AI service, the session's + /// is also updated after each service call, keeping it in sync with the service-side conversation state. /// /// - /// Note that when using an AI service with built in chat history storage, which uses a single threaded conversation model (e.g. OpenAI Responses with the Conversations API) - /// setting this setting to will have no effect. This type of service updates the single conversation with each service call, - /// and there is no way to revert to a previous state. + /// Setting this property to causes messages to be marked during the function + /// invocation loop but persisted only at the end of the full agent run, providing atomic run semantics. + /// Updating the is likewise deferred and + /// updated only at the end of the run, consistent with atomic run semantics. + /// A decorator is inserted into the chat client pipeline + /// in mark-only mode, and the persists only the marked messages at the + /// end of the run. /// /// - /// On the other hand, when using an AI service with built in chat history storage, which supports forking, (e.g. OpenAI Responses with Response Ids) - /// setting this setting to will mean that the will only persist the last returned response id at - /// the end of the run, whereas setting this setting to will mean that the will persist each returned - /// response id after each service call. This means that the last successful response id will always be available in the . + /// When this option is (the default), the + /// decorator persists messages and updates the + /// immediately after each service call. This may leave chat history in a state where + /// is required to start a new run if the last successful service + /// call returned . /// /// - /// It's important to note that enabling this setting may leave your chat history in a state where is required to start a new run. - /// If the last successful service call returned it is not possible to continue the session until a - /// is provided as input for a subsequent run. - /// - /// - /// When this option is enabled, a decorator is automatically - /// inserted into the chat client pipeline between the and the - /// leaf , and the will not perform its own - /// end-of-run chat history persistence to avoid double-persisting messages. + /// This option has no effect when is . + /// When using a custom chat client stack, you can add a + /// manually via the + /// extension method. /// /// - /// This option has no effect when is . + /// Note that when using single threaded service stored chat history, like OpenAI Conversations, + /// there is only one id, so even if the conversation id is not updated after each service call, + /// the chat history will still contain intermediate messages. Setting this property to + /// in this case will therefore have no real effect. Setting this property to when using + /// OpenAI Responses with response ids on the other hand, allows atomic run semantics, since + /// each service request produces a new response id, and if the run fails mid-loop, the session will + /// still contain the pre-run respnose id, allowing the next run to start with a clean slate. /// /// /// /// Default is . /// [Experimental(DiagnosticIds.Experiments.AgentsAIExperiments)] - public bool PersistChatHistoryAfterEachServiceCall { get; set; } + public bool PersistChatHistoryAtEndOfRun { get; set; } /// /// Creates a new instance of with the same values as this instance. @@ -152,6 +157,6 @@ public ChatClientAgentOptions Clone() ClearOnChatHistoryProviderConflict = this.ClearOnChatHistoryProviderConflict, WarnOnChatHistoryProviderConflict = this.WarnOnChatHistoryProviderConflict, ThrowOnChatHistoryProviderConflict = this.ThrowOnChatHistoryProviderConflict, - PersistChatHistoryAfterEachServiceCall = this.PersistChatHistoryAfterEachServiceCall, + PersistChatHistoryAtEndOfRun = this.PersistChatHistoryAtEndOfRun, }; } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientBuilderExtensions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientBuilderExtensions.cs index ee782dce52..a1e8b5f8a5 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientBuilderExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientBuilderExtensions.cs @@ -2,8 +2,10 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using Microsoft.Agents.AI; using Microsoft.Extensions.Logging; +using Microsoft.Shared.DiagnosticIds; using Microsoft.Shared.Diagnostics; namespace Microsoft.Extensions.AI; @@ -82,4 +84,46 @@ public static ChatClientAgent BuildAIAgent( options: options, loggerFactory: loggerFactory, services: services); + + /// + /// Adds a to the chat client pipeline. + /// + /// + /// + /// This decorator should be positioned between the and the leaf + /// in the pipeline. It intercepts service calls to either persist messages + /// immediately or mark them for later persistence, depending on the parameter. + /// + /// + /// If is set to , the + /// should be configured with set to + /// as without this combination, messages will never be persisted when using a for + /// chat history persistence. + /// + /// + /// This extension method is intended for use with custom chat client stacks when + /// is . + /// When is (the default), + /// the automatically injects this decorator. + /// + /// + /// This decorator only works within the context of a running and will throw an + /// exception if used in any other stack. + /// + /// + /// The to add the decorator to. + /// + /// When , messages are marked with metadata but not persisted immediately, + /// and the session's is not updated. + /// The will persist only the marked messages and update the + /// conversation ID at the end of the run. + /// When (the default), messages are persisted and the conversation ID + /// is updated immediately after each service call. + /// + /// The for chaining. + [Experimental(DiagnosticIds.Experiments.AgentsAIExperiments)] + public static ChatClientBuilder UseChatHistoryPersisting(this ChatClientBuilder builder, bool markOnly = false) + { + return builder.Use(innerClient => new ChatHistoryPersistingChatClient(innerClient, markOnly)); + } } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs index 03c283f74e..fffac628a6 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs @@ -68,11 +68,9 @@ internal static IChatClient WithDefaultAgentMiddleware(this IChatClient chatClie // making the first Use() call outermost. By adding our decorator second, the resulting pipeline is: // FunctionInvokingChatClient → ChatHistoryPersistingChatClient → leaf IChatClient // This allows the decorator to persist messages after each individual service call within - // FIC's function invocation loop. - if (options?.PersistChatHistoryAfterEachServiceCall is true) - { - chatBuilder.Use(innerClient => new ChatHistoryPersistingChatClient(innerClient)); - } + // FIC's function invocation loop, or to mark them for later persistence at the end of the run. + bool markOnly = options?.PersistChatHistoryAtEndOfRun is true; + chatBuilder.Use(innerClient => new ChatHistoryPersistingChatClient(innerClient, markOnly)); var agentChatClient = chatBuilder.Build(services); diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs index 0bff151710..0085afbdd5 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs @@ -12,22 +12,34 @@ namespace Microsoft.Agents.AI; /// /// A delegating chat client that notifies and -/// instances of request and response messages after each individual call to the inner chat client. +/// instances of request and response messages after each individual call to the inner chat client, +/// or marks messages for later persistence depending on the configured mode. /// /// /// /// This decorator is intended to operate between the and the leaf -/// in a pipeline. It ensures that providers are notified -/// after each service call rather than only at the end of the full agent run, so that intermediate messages -/// (e.g., tool calls and results) are saved even if the process is interrupted mid-loop. +/// in a pipeline. +/// +/// +/// In persist mode (the default), it ensures that providers are notified and the session's +/// is updated after each service call, so that +/// intermediate messages (e.g., tool calls and results) are saved even if the process is interrupted +/// mid-loop. +/// +/// +/// In mark-only mode ( is ), it marks messages with metadata +/// but does not notify providers or update the . +/// Both are deferred to the at the end of the run, providing atomic +/// run semantics. /// /// /// This chat client must be used within the context of a running . It retrieves the /// current agent and session from , which is set automatically when an agent's /// or /// -/// method is called. An is thrown if no run context is available or if the -/// agent is not a . +/// method is called. The ensures the run context always contains a resolved session, +/// even when the caller passes null. An is thrown if no run context is +/// available or if the agent is not a . /// /// internal sealed class ChatHistoryPersistingChatClient : DelegatingChatClient @@ -42,11 +54,32 @@ internal sealed class ChatHistoryPersistingChatClient : DelegatingChatClient /// Initializes a new instance of the class. /// /// The underlying chat client that will handle the core operations. - public ChatHistoryPersistingChatClient(IChatClient innerClient) + /// + /// When , messages are marked with metadata but not persisted immediately, + /// and the session's is not updated. + /// The will persist only the marked messages and update the + /// conversation ID at the end of the run. + /// When (the default), messages are persisted and the conversation ID + /// is updated immediately after each service call. + /// + public ChatHistoryPersistingChatClient(IChatClient innerClient, bool markOnly = false) : base(innerClient) { + this.MarkOnly = markOnly; } + /// + /// Gets a value indicating whether this decorator is in mark-only mode. + /// + /// + /// When , messages are marked with metadata but not persisted immediately, + /// and the session's is not updated. + /// Both are deferred to the at the end of the run. + /// When , messages are persisted and the conversation ID is updated + /// after each service call. + /// + public bool MarkOnly { get; } + /// public override async Task GetResponseAsync( IEnumerable messages, @@ -68,10 +101,24 @@ public override async Task GetResponseAsync( } var newRequestMessages = GetNewRequestMessages(messages); - agent.UpdateSessionConversationId(session, response.ConversationId, cancellationToken); - await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, response.Messages, options, cancellationToken).ConfigureAwait(false); - MarkAsPersisted(newRequestMessages); - MarkAsPersisted(response.Messages); + + if (this.ShouldDeferPersistence(options)) + { + // In mark-only mode or when resuming from a continuation token, just mark messages + // for later persistence by ChatClientAgent. Conversation ID and provider notification + // are deferred to end-of-run. For continuation tokens, the end-of-run handler needs + // to send the combined data from both the previous and current runs. + MarkAsPersisted(newRequestMessages); + MarkAsPersisted(response.Messages); + } + else + { + // In persist mode, persist immediately and update conversation ID. + agent.UpdateSessionConversationId(session, response.ConversationId, cancellationToken); + await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, response.Messages, options, cancellationToken).ConfigureAwait(false); + MarkAsPersisted(newRequestMessages); + MarkAsPersisted(response.Messages); + } return response; } @@ -130,10 +177,24 @@ public override async IAsyncEnumerable GetStreamingResponseA var chatResponse = responseUpdates.ToChatResponse(); var newRequestMessages = GetNewRequestMessages(messages); - agent.UpdateSessionConversationId(session, chatResponse.ConversationId, cancellationToken); - await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, chatResponse.Messages, options, cancellationToken).ConfigureAwait(false); - MarkAsPersisted(newRequestMessages); - MarkAsPersisted(chatResponse.Messages); + + if (this.ShouldDeferPersistence(options)) + { + // In mark-only mode or when resuming from a continuation token, just mark messages + // for later persistence by ChatClientAgent. Conversation ID and provider notification + // are deferred to end-of-run. For continuation tokens, the end-of-run handler needs + // to send the combined data from both the previous and current runs. + MarkAsPersisted(newRequestMessages); + MarkAsPersisted(chatResponse.Messages); + } + else + { + // In persist mode, persist immediately and update conversation ID. + agent.UpdateSessionConversationId(session, chatResponse.ConversationId, cancellationToken); + await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, chatResponse.Messages, options, cancellationToken).ConfigureAwait(false); + MarkAsPersisted(newRequestMessages); + MarkAsPersisted(chatResponse.Messages); + } } /// @@ -146,12 +207,10 @@ private static (ChatClientAgent Agent, ChatClientAgentSession Session) GetRequir $"{nameof(ChatHistoryPersistingChatClient)} can only be used within the context of a running AIAgent. " + "Ensure that the chat client is being invoked as part of an AIAgent.RunAsync or AIAgent.RunStreamingAsync call."); - if (runContext.Agent is not ChatClientAgent chatClientAgent) - { - throw new InvalidOperationException( + var chatClientAgent = runContext.Agent.GetService() + ?? throw new InvalidOperationException( $"{nameof(ChatHistoryPersistingChatClient)} can only be used with a {nameof(ChatClientAgent)}. " + $"The current agent is of type '{runContext.Agent.GetType().Name}'."); - } if (runContext.Session is not ChatClientAgentSession chatClientAgentSession) { @@ -163,6 +222,20 @@ private static (ChatClientAgent Agent, ChatClientAgentSession Session) GetRequir return (chatClientAgent, chatClientAgentSession); } + /// + /// Determines whether persistence should be deferred to end-of-run instead of happening immediately. + /// + /// + /// when in mode, when the call is resuming from + /// a continuation token (since the end-of-run handler needs to combine data from the previous + /// and current runs), or when background responses are allowed (since the caller may stop + /// consuming the stream mid-run, preventing the post-stream persistence code from executing). + /// + private bool ShouldDeferPersistence(ChatOptions? options) + { + return this.MarkOnly || options?.ContinuationToken is not null || options?.AllowBackgroundResponses is true; + } + /// /// Returns only the request messages that have not yet been persisted to chat history. /// diff --git a/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs index 32bd718a1c..459859224f 100644 --- a/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs @@ -15,17 +15,17 @@ namespace Microsoft.Agents.AI.UnitTests; /// /// Contains unit tests for the decorator, /// verifying that it persists messages via the after each -/// individual service call when the -/// option is enabled. +/// individual service call by default, or marks messages for end-of-run persistence when the +/// option is enabled. /// public class ChatHistoryPersistingChatClientTests { /// - /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled, + /// Verifies that by default (PersistChatHistoryAtEndOfRun is false), /// the ChatHistoryProvider receives messages after a successful non-streaming call. /// [Fact] - public async Task RunAsync_PersistsMessagesPerServiceCall_WhenOptionEnabledAsync() + public async Task RunAsync_PersistsMessagesPerServiceCall_ByDefaultAsync() { // Arrange Mock mockService = new(); @@ -50,7 +50,7 @@ public async Task RunAsync_PersistsMessagesPerServiceCall_WhenOptionEnabledAsync ChatClientAgent agent = new(mockService.Object, options: new() { ChatHistoryProvider = mockChatHistoryProvider.Object, - PersistChatHistoryAfterEachServiceCall = true, + PersistChatHistoryAtEndOfRun = false, }); // Act @@ -68,11 +68,11 @@ public async Task RunAsync_PersistsMessagesPerServiceCall_WhenOptionEnabledAsync } /// - /// Verifies that when PersistChatHistoryAfterEachServiceCall is disabled (default), - /// the ChatHistoryProvider still receives messages at end-of-run as before. + /// Verifies that when per-service-call persistence is active (default), + /// the ChatHistoryProvider receives messages at the end of the run. /// [Fact] - public async Task RunAsync_PersistsMessagesAtEndOfRun_WhenOptionDisabledAsync() + public async Task RunAsync_PersistsMessagesAtEndOfRun_WhenOptionEnabledAsync() { // Arrange Mock mockService = new(); @@ -97,7 +97,7 @@ public async Task RunAsync_PersistsMessagesAtEndOfRun_WhenOptionDisabledAsync() ChatClientAgent agent = new(mockService.Object, options: new() { ChatHistoryProvider = mockChatHistoryProvider.Object, - PersistChatHistoryAfterEachServiceCall = false, + PersistChatHistoryAtEndOfRun = true, }); // Act @@ -115,11 +115,11 @@ public async Task RunAsync_PersistsMessagesAtEndOfRun_WhenOptionDisabledAsync() } /// - /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled and the service call fails, + /// Verifies that when per-service-call persistence is active (default) and the service call fails, /// the ChatHistoryProvider is notified with the exception. /// [Fact] - public async Task RunAsync_NotifiesProviderOfFailure_WhenOptionEnabledAndServiceFailsAsync() + public async Task RunAsync_NotifiesProviderOfFailure_WhenPerServiceCallPersistenceActiveAsync() { // Arrange var expectedException = new InvalidOperationException("Service failed"); @@ -145,7 +145,7 @@ public async Task RunAsync_NotifiesProviderOfFailure_WhenOptionEnabledAndService ChatClientAgent agent = new(mockService.Object, options: new() { ChatHistoryProvider = mockChatHistoryProvider.Object, - PersistChatHistoryAfterEachServiceCall = true, + PersistChatHistoryAtEndOfRun = false, }); // Act @@ -163,31 +163,29 @@ public async Task RunAsync_NotifiesProviderOfFailure_WhenOptionEnabledAndService } /// - /// Verifies that the decorator is injected into the pipeline when the option is set + /// Verifies that the decorator is injected in persist mode by default /// and can be discovered via GetService. /// [Fact] - public void ChatClient_ContainsDecorator_WhenOptionEnabled() + public void ChatClient_ContainsDecorator_InPersistMode_ByDefault() { // Arrange Mock mockService = new(); // Act - ChatClientAgent agent = new(mockService.Object, options: new() - { - PersistChatHistoryAfterEachServiceCall = true, - }); + ChatClientAgent agent = new(mockService.Object, options: new()); // Assert var decorator = agent.ChatClient.GetService(); Assert.NotNull(decorator); + Assert.False(decorator.MarkOnly); } /// - /// Verifies that the decorator is NOT injected into the pipeline when the option is not set. + /// Verifies that the decorator is injected in mark-only mode when PersistChatHistoryAtEndOfRun is true. /// [Fact] - public void ChatClient_DoesNotContainDecorator_WhenOptionDisabled() + public void ChatClient_ContainsDecorator_InMarkOnlyMode_WhenPersistAtEndOfRun() { // Arrange Mock mockService = new(); @@ -195,17 +193,17 @@ public void ChatClient_DoesNotContainDecorator_WhenOptionDisabled() // Act ChatClientAgent agent = new(mockService.Object, options: new() { - PersistChatHistoryAfterEachServiceCall = false, + PersistChatHistoryAtEndOfRun = true, }); // Assert var decorator = agent.ChatClient.GetService(); - Assert.Null(decorator); + Assert.NotNull(decorator); + Assert.True(decorator.MarkOnly); } /// - /// Verifies that the decorator is NOT injected when UseProvidedChatClientAsIs is true, - /// even if PersistChatHistoryAfterEachServiceCall is also true. + /// Verifies that the decorator is NOT injected when UseProvidedChatClientAsIs is true. /// [Fact] public void ChatClient_DoesNotContainDecorator_WhenUseProvidedChatClientAsIs() @@ -216,7 +214,6 @@ public void ChatClient_DoesNotContainDecorator_WhenUseProvidedChatClientAsIs() // Act ChatClientAgent agent = new(mockService.Object, options: new() { - PersistChatHistoryAfterEachServiceCall = true, UseProvidedChatClientAsIs = true, }); @@ -226,26 +223,26 @@ public void ChatClient_DoesNotContainDecorator_WhenUseProvidedChatClientAsIs() } /// - /// Verifies that the PersistChatHistoryAfterEachServiceCall option is included in Clone(). + /// Verifies that the PersistChatHistoryAtEndOfRun option is included in Clone(). /// [Fact] - public void ChatClientAgentOptions_Clone_IncludesPersistChatHistoryAfterEachServiceCall() + public void ChatClientAgentOptions_Clone_IncludesPersistChatHistoryAtEndOfRun() { // Arrange var options = new ChatClientAgentOptions { - PersistChatHistoryAfterEachServiceCall = true, + PersistChatHistoryAtEndOfRun = true, }; // Act var cloned = options.Clone(); // Assert - Assert.True(cloned.PersistChatHistoryAfterEachServiceCall); + Assert.True(cloned.PersistChatHistoryAtEndOfRun); } /// - /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled and the service call + /// Verifies that when per-service-call persistence is active (default) and the service call /// involves a function invocation loop, the ChatHistoryProvider is called after each individual /// service call (not just once at the end). /// @@ -295,7 +292,7 @@ public async Task RunAsync_PersistsPerServiceCall_DuringFunctionInvocationLoopAs { ChatOptions = new() { Tools = [tool] }, ChatHistoryProvider = mockChatHistoryProvider.Object, - PersistChatHistoryAfterEachServiceCall = true, + PersistChatHistoryAtEndOfRun = false, }, services: new ServiceCollection().BuildServiceProvider()); // Act @@ -332,11 +329,11 @@ public async Task RunAsync_PersistsPerServiceCall_DuringFunctionInvocationLoopAs } /// - /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled with streaming, + /// Verifies that when per-service-call persistence is active (default) with streaming, /// the ChatHistoryProvider receives messages after the stream completes. /// [Fact] - public async Task RunStreamingAsync_PersistsMessagesPerServiceCall_WhenOptionEnabledAsync() + public async Task RunStreamingAsync_PersistsMessagesPerServiceCall_ByDefaultAsync() { // Arrange Mock mockService = new(); @@ -364,7 +361,7 @@ public async Task RunStreamingAsync_PersistsMessagesPerServiceCall_WhenOptionEna ChatClientAgent agent = new(mockService.Object, options: new() { ChatHistoryProvider = mockChatHistoryProvider.Object, - PersistChatHistoryAfterEachServiceCall = true, + PersistChatHistoryAtEndOfRun = false, }); // Act @@ -385,11 +382,11 @@ public async Task RunStreamingAsync_PersistsMessagesPerServiceCall_WhenOptionEna } /// - /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled, + /// Verifies that when per-service-call persistence is active (default), /// AIContextProviders are also notified of new messages after a successful call. /// [Fact] - public async Task RunAsync_NotifiesAIContextProviders_WhenOptionEnabledAsync() + public async Task RunAsync_NotifiesAIContextProviders_ByDefaultAsync() { // Arrange Mock mockService = new(); @@ -413,7 +410,7 @@ public async Task RunAsync_NotifiesAIContextProviders_WhenOptionEnabledAsync() ChatClientAgent agent = new(mockService.Object, options: new() { AIContextProviders = [mockContextProvider.Object], - PersistChatHistoryAfterEachServiceCall = true, + PersistChatHistoryAtEndOfRun = false, }); // Act @@ -431,11 +428,11 @@ public async Task RunAsync_NotifiesAIContextProviders_WhenOptionEnabledAsync() } /// - /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled and the service fails, + /// Verifies that when per-service-call persistence is active (default) and the service fails, /// AIContextProviders are notified of the failure. /// [Fact] - public async Task RunAsync_NotifiesAIContextProvidersOfFailure_WhenOptionEnabledAsync() + public async Task RunAsync_NotifiesAIContextProvidersOfFailure_ByDefaultAsync() { // Arrange var expectedException = new InvalidOperationException("Service failed"); @@ -460,7 +457,7 @@ public async Task RunAsync_NotifiesAIContextProvidersOfFailure_WhenOptionEnabled ChatClientAgent agent = new(mockService.Object, options: new() { AIContextProviders = [mockContextProvider.Object], - PersistChatHistoryAfterEachServiceCall = true, + PersistChatHistoryAtEndOfRun = false, }); // Act @@ -478,11 +475,11 @@ public async Task RunAsync_NotifiesAIContextProvidersOfFailure_WhenOptionEnabled } /// - /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled, + /// Verifies that when per-service-call persistence is active (default), /// both ChatHistoryProvider and AIContextProviders are notified together. /// [Fact] - public async Task RunAsync_NotifiesBothProviders_WhenOptionEnabledAsync() + public async Task RunAsync_NotifiesBothProviders_ByDefaultAsync() { // Arrange Mock mockService = new(); @@ -519,7 +516,7 @@ public async Task RunAsync_NotifiesBothProviders_WhenOptionEnabledAsync() { ChatHistoryProvider = mockChatHistoryProvider.Object, AIContextProviders = [mockContextProvider.Object], - PersistChatHistoryAfterEachServiceCall = true, + PersistChatHistoryAtEndOfRun = false, }); // Act @@ -593,7 +590,7 @@ public async Task RunAsync_DoesNotReNotifyResponseMessagesAsRequestMessages_Duri { ChatOptions = new() { Tools = [tool] }, ChatHistoryProvider = mockChatHistoryProvider.Object, - PersistChatHistoryAfterEachServiceCall = true, + PersistChatHistoryAtEndOfRun = false, }, services: new ServiceCollection().BuildServiceProvider()); // Act @@ -658,7 +655,7 @@ public async Task RunAsync_DeduplicatesRequestMessages_OnFailureDuringFicLoopAsy { ChatOptions = new() { Tools = [tool] }, ChatHistoryProvider = mockChatHistoryProvider.Object, - PersistChatHistoryAfterEachServiceCall = true, + PersistChatHistoryAtEndOfRun = false, }, services: new ServiceCollection().BuildServiceProvider()); // Act @@ -709,7 +706,7 @@ public async Task RunAsync_MarksNotifiedMessages_WithPersistedMarkerAsync() ChatClientAgent agent = new(mockService.Object, options: new() { ChatHistoryProvider = mockChatHistoryProvider.Object, - PersistChatHistoryAfterEachServiceCall = true, + PersistChatHistoryAtEndOfRun = false, }); // Act @@ -746,7 +743,7 @@ public async Task RunAsync_UpdatesSessionConversationId_WhenPerServiceCallPersis ChatClientAgent agent = new(mockService.Object, options: new() { - PersistChatHistoryAfterEachServiceCall = true, + PersistChatHistoryAtEndOfRun = false, }); // Act