diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index af7ca9f0be..6e98d5f95c 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -57,6 +57,7 @@ + diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Agent_Step19_InFunctionLoopCheckpointing.csproj b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Agent_Step19_InFunctionLoopCheckpointing.csproj new file mode 100644 index 0000000000..41aafe3437 --- /dev/null +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Agent_Step19_InFunctionLoopCheckpointing.csproj @@ -0,0 +1,20 @@ + + + + Exe + net10.0 + + enable + enable + + + + + + + + + + + + diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs new file mode 100644 index 0000000000..07382e6417 --- /dev/null +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs @@ -0,0 +1,226 @@ +// Copyright (c) Microsoft. All rights reserved. + +// This sample demonstrates how the ChatClientAgent persists chat history after each individual +// call to the AI service. +// When an agent uses tools, FunctionInvokingChatClient may loop multiple times +// (service call → tool execution → service call), and intermediate messages (tool calls and +// results) are persisted after each service call. This allows you to inspect or recover them +// even if the process is interrupted mid-loop, but may also result in chat history that is not +// yet finalized (e.g., tool calls without results) being persisted, which may be undesirable in some cases. +// +// To opt into end-of-run persistence instead (atomic run semantics), set +// PersistChatHistoryAtEndOfRun = true on ChatClientAgentOptions. +// +// The sample runs two multi-turn conversations: one using non-streaming (RunAsync) and one +// using streaming (RunStreamingAsync), to demonstrate correct behavior in both modes. + +using System.ComponentModel; +using Azure.AI.OpenAI; +using Azure.Identity; +using Microsoft.Agents.AI; +using Microsoft.Extensions.AI; +using OpenAI.Responses; + +var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set."); +var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini"; +var store = Environment.GetEnvironmentVariable("AZURE_OPENAI_RESPONSES_STORE") ?? "false"; + +// WARNING: DefaultAzureCredential is convenient for development but requires careful consideration in production. +// In production, consider using a specific credential (e.g., ManagedIdentityCredential) to avoid +// latency issues, unintended credential probing, and potential security risks from fallback mechanisms. +AzureOpenAIClient openAIClient = new(new Uri(endpoint), new DefaultAzureCredential()); + +// Define multiple tools so the model makes several tool calls in a single run. +[Description("Get the current weather for a city.")] +static string GetWeather([Description("The city name.")] string city) => + city.ToUpperInvariant() switch + { + "SEATTLE" => "Seattle: 55°F, cloudy with light rain.", + "NEW YORK" => "New York: 72°F, sunny and warm.", + "LONDON" => "London: 48°F, overcast with fog.", + "DUBLIN" => "Dublin: 43°F, overcast with fog.", + _ => $"{city}: weather data not available." + }; + +[Description("Get the current time in a city.")] +static string GetTime([Description("The city name.")] string city) => + city.ToUpperInvariant() switch + { + "SEATTLE" => "Seattle: 9:00 AM PST", + "NEW YORK" => "New York: 12:00 PM EST", + "LONDON" => "London: 5:00 PM GMT", + "DUBLIN" => "Dublin: 5:00 PM GMT", + _ => $"{city}: time data not available." + }; + +// Create the agent — per-service-call persistence is the default behavior. +// The in-memory ChatHistoryProvider is used by default when the service does not require service stored chat +// history, so for those cases, we can inspect the chat history via session.TryGetInMemoryChatHistory(). +IChatClient chatClient = string.Equals(store, "TRUE", StringComparison.OrdinalIgnoreCase) ? + openAIClient.GetResponsesClient().AsIChatClient(deploymentName) : + openAIClient.GetResponsesClient().AsIChatClientWithStoredOutputDisabled(deploymentName); +AIAgent agent = chatClient.AsAIAgent( + new ChatClientAgentOptions + { + Name = "WeatherAssistant", + ChatOptions = new() + { + Instructions = "You are a helpful assistant. When asked about multiple cities, call the appropriate tool for each city.", + Tools = [AIFunctionFactory.Create(GetWeather), AIFunctionFactory.Create(GetTime)] + }, + }); + +await RunNonStreamingAsync(); +await RunStreamingAsync(); + +async Task RunNonStreamingAsync() +{ + int lastChatHistorySize = 0; + string lastConversationId = string.Empty; + + Console.ForegroundColor = ConsoleColor.Yellow; + Console.WriteLine("\n=== Non-Streaming Mode ==="); + Console.ResetColor(); + + AgentSession session = await agent.CreateSessionAsync(); + + // First turn — ask about multiple cities so the model calls tools. + const string Prompt = "What's the weather and time in Seattle, New York, and London?"; + PrintUserMessage(Prompt); + + var response = await agent.RunAsync(Prompt, session); + PrintAgentResponse(response.Text); + PrintChatHistory(session, "After run", ref lastChatHistorySize, ref lastConversationId); + + // Second turn — follow-up to verify chat history is correct. + const string FollowUp1 = "And Dublin?"; + PrintUserMessage(FollowUp1); + + response = await agent.RunAsync(FollowUp1, session); + PrintAgentResponse(response.Text); + PrintChatHistory(session, "After second run", ref lastChatHistorySize, ref lastConversationId); + + // Third turn — follow-up to verify chat history is correct. + const string FollowUp2 = "Which city is the warmest?"; + PrintUserMessage(FollowUp2); + + response = await agent.RunAsync(FollowUp2, session); + PrintAgentResponse(response.Text); + PrintChatHistory(session, "After third run", ref lastChatHistorySize, ref lastConversationId); +} + +async Task RunStreamingAsync() +{ + int lastChatHistorySize = 0; + string lastConversationId = string.Empty; + + Console.ForegroundColor = ConsoleColor.Yellow; + Console.WriteLine("\n=== Streaming Mode ==="); + Console.ResetColor(); + + AgentSession session = await agent.CreateSessionAsync(); + + // First turn — ask about multiple cities so the model calls tools. + const string Prompt = "What's the weather and time in Seattle, New York, and London?"; + PrintUserMessage(Prompt); + + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[Agent] "); + Console.ResetColor(); + + await foreach (var update in agent.RunStreamingAsync(Prompt, session)) + { + Console.Write(update); + + // During streaming we should be able to see updates to the chat history + // before the full run completes, as each service call is made and persisted. + PrintChatHistory(session, "During run", ref lastChatHistorySize, ref lastConversationId); + } + + Console.WriteLine(); + PrintChatHistory(session, "After run", ref lastChatHistorySize, ref lastConversationId); + + // Second turn — follow-up to verify chat history is correct. + const string FollowUp1 = "And Dublin?"; + PrintUserMessage(FollowUp1); + + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[Agent] "); + Console.ResetColor(); + + await foreach (var update in agent.RunStreamingAsync(FollowUp1, session)) + { + Console.Write(update); + + // During streaming we should be able to see updates to the chat history + // before the full run completes, as each service call is made and persisted. + PrintChatHistory(session, "During second run", ref lastChatHistorySize, ref lastConversationId); + } + + Console.WriteLine(); + PrintChatHistory(session, "After second run", ref lastChatHistorySize, ref lastConversationId); + + // Third turn — follow-up to verify chat history is correct. + const string FollowUp2 = "Which city is the warmest?"; + PrintUserMessage(FollowUp2); + + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[Agent] "); + Console.ResetColor(); + + await foreach (var update in agent.RunStreamingAsync(FollowUp2, session)) + { + Console.Write(update); + + // During streaming we should be able to see updates to the chat history + // before the full run completes, as each service call is made and persisted. + PrintChatHistory(session, "During third run", ref lastChatHistorySize, ref lastConversationId); + } + + Console.WriteLine(); + PrintChatHistory(session, "After third run", ref lastChatHistorySize, ref lastConversationId); +} + +void PrintUserMessage(string message) +{ + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[User] "); + Console.ResetColor(); + Console.WriteLine(message); +} + +void PrintAgentResponse(string? text) +{ + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[Agent] "); + Console.ResetColor(); + Console.WriteLine(text); +} + +// Helper to print the current chat history from the session. +void PrintChatHistory(AgentSession session, string label, ref int lastChatHistorySize, ref string lastConversationId) +{ + if (session.TryGetInMemoryChatHistory(out var history) && history.Count != lastChatHistorySize) + { + Console.ForegroundColor = ConsoleColor.DarkGray; + Console.WriteLine($"\n [{label} — Chat history: {history.Count} message(s)]"); + foreach (var msg in history) + { + var preview = msg.Text?.Length > 80 ? msg.Text[..80] + "…" : msg.Text; + var contentTypes = string.Join(", ", msg.Contents.Select(c => c.GetType().Name)); + Console.WriteLine($" {msg.Role,-12} | {(string.IsNullOrWhiteSpace(preview) ? $"[{contentTypes}]" : preview)}"); + } + + Console.ResetColor(); + + lastChatHistorySize = history.Count; + } + + if (session is ChatClientAgentSession ccaSession && ccaSession.ConversationId is not null && ccaSession.ConversationId != lastConversationId) + { + Console.ForegroundColor = ConsoleColor.DarkGray; + Console.WriteLine($" [{label} — Conversation ID: {ccaSession.ConversationId}]"); + Console.ResetColor(); + lastConversationId = ccaSession.ConversationId; + } +} diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md new file mode 100644 index 0000000000..d6157586f0 --- /dev/null +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md @@ -0,0 +1,63 @@ +# In-Function-Loop Checkpointing + +This sample demonstrates how `ChatClientAgent` persists chat history after each individual call to the AI service by default. This per-service-call persistence ensures intermediate progress is saved during the function invocation loop. + +## What This Sample Shows + +When an agent uses tools, the `FunctionInvokingChatClient` loops multiple times (service call → tool execution → service call → …). By default, chat history is persisted after each service call via the `ChatHistoryPersistingChatClient` decorator: + +- A `ChatHistoryPersistingChatClient` decorator is automatically inserted into the chat client pipeline +- After each service call, the decorator notifies the `ChatHistoryProvider` (and any `AIContextProvider` instances) with the new messages +- Only **new** messages are sent to providers on each notification — messages that were already persisted in an earlier call within the same run are deduplicated automatically + +To opt into end-of-run persistence instead (atomic run semantics), set `PersistChatHistoryAtEndOfRun = true` on `ChatClientAgentOptions`. In that mode, the decorator marks messages with metadata rather than persisting them immediately, and `ChatClientAgent` persists only the marked messages at the end of the run. + +Per-service-call persistence is useful for: +- **Crash recovery** — if the process is interrupted mid-loop, the intermediate tool calls and results are already persisted +- **Observability** — you can inspect the chat history while the agent is still running (e.g., during streaming) +- **Long-running tool loops** — agents with many sequential tool calls benefit from incremental persistence + +## How It Works + +The sample asks the agent about the weather and time in three cities. The model calls the `GetWeather` and `GetTime` tools for each city, resulting in multiple service calls within a single `RunStreamingAsync` invocation. After the run completes, the sample prints the full chat history to show all the intermediate messages that were persisted along the way. + +### Pipeline Architecture + +``` +ChatClientAgent + └─ FunctionInvokingChatClient (handles tool call loop) + └─ ChatHistoryPersistingChatClient (persists after each service call) + └─ Leaf IChatClient (Azure OpenAI) +``` + +## Prerequisites + +- .NET 10 SDK or later +- Azure OpenAI service endpoint and model deployment +- Azure CLI installed and authenticated + +**Note**: This sample uses `DefaultAzureCredential`. Sign in with `az login` before running. For production, prefer a specific credential such as `ManagedIdentityCredential`. For more information, see the [Azure CLI authentication documentation](https://learn.microsoft.com/cli/azure/authenticate-azure-cli-interactively). + +## Environment Variables + +```powershell +$env:AZURE_OPENAI_ENDPOINT="https://your-resource.openai.azure.com/" # Required +$env:AZURE_OPENAI_DEPLOYMENT_NAME="gpt-4o-mini" # Optional, defaults to gpt-4o-mini +``` + +## Running the Sample + +```powershell +cd dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing +dotnet run +``` + +## Expected Behavior + +The sample runs two conversation turns: + +1. **First turn** — asks about weather and time in three cities. The model calls `GetWeather` and `GetTime` tools (potentially in parallel or sequentially), then provides a summary. The chat history dump after the run shows all the intermediate tool call and result messages. + +2. **Second turn** — asks a follow-up question ("Which city is the warmest?") that uses the persisted conversation context. The chat history dump shows the full accumulated conversation. + +The chat history printout uses `session.TryGetInMemoryChatHistory()` to inspect the in-memory storage. diff --git a/dotnet/samples/02-agents/Agents/README.md b/dotnet/samples/02-agents/Agents/README.md index 4ac53ba246..c5258ba9f4 100644 --- a/dotnet/samples/02-agents/Agents/README.md +++ b/dotnet/samples/02-agents/Agents/README.md @@ -45,6 +45,7 @@ Before you begin, ensure you have the following prerequisites: |[Declarative agent](./Agent_Step16_Declarative/)|This sample demonstrates how to declaratively define an agent.| |[Providing additional AI Context to an agent using multiple AIContextProviders](./Agent_Step17_AdditionalAIContext/)|This sample demonstrates how to inject additional AI context into a ChatClientAgent using multiple custom AIContextProvider components that are attached to the agent.| |[Using compaction pipeline with an agent](./Agent_Step18_CompactionPipeline/)|This sample demonstrates how to use a compaction pipeline to efficiently limit the size of the conversation history for an agent.| +|[In-function-loop checkpointing](./Agent_Step19_InFunctionLoopCheckpointing/)|This sample demonstrates how to persist chat history after each service call during a tool-calling loop, enabling crash recovery and mid-run observability.| ## Running the samples from the console diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs index adb6eb9f83..6722bd8738 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs @@ -138,6 +138,9 @@ public ChatClientAgent(IChatClient chatClient, ChatClientAgentOptions? options, this._aiContextProviderStateKeys = ValidateAndCollectStateKeys(this._agentOptions?.AIContextProviders, this.ChatHistoryProvider); this._logger = (loggerFactory ?? chatClient.GetService() ?? NullLoggerFactory.Instance).CreateLogger(); + + // Warn if using a custom chat client stack with end-of-run persistence but no ChatHistoryPersistingChatClient. + this.WarnOnMissingPersistingClient(); } /// @@ -211,12 +214,14 @@ protected override async Task RunCoreAsync( ChatClientAgentContinuationToken? _) = await this.PrepareSessionAndMessagesAsync(session, inputMessages, options, cancellationToken).ConfigureAwait(false); - var chatClient = this.ChatClient; + // Update the run context with the resolved session so any downstream classes + // always have a valid session, even when the caller passed null. + EnsureRunContextHasSession(safeSession); + var chatClient = this.ChatClient; chatClient = ApplyRunOptionsTransformations(options, chatClient); var loggingAgentName = this.GetLoggingAgentName(); - this._logger.LogAgentChatClientInvokingAgent(nameof(RunAsync), this.Id, loggingAgentName, this._chatClientType); // Call the IChatClient and notify the AIContextProvider of any failures. @@ -227,8 +232,7 @@ protected override async Task RunCoreAsync( } catch (Exception ex) { - await this.NotifyChatHistoryProviderOfFailureAsync(safeSession, ex, inputMessagesForChatClient, chatOptions, cancellationToken).ConfigureAwait(false); - await this.NotifyAIContextProviderOfFailureAsync(safeSession, ex, inputMessagesForChatClient, cancellationToken).ConfigureAwait(false); + await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, inputMessagesForChatClient, chatOptions, cancellationToken).ConfigureAwait(false); throw; } @@ -236,7 +240,8 @@ protected override async Task RunCoreAsync( // We can derive the type of supported session from whether we have a conversation id, // so let's update it and set the conversation id for the service session case. - this.UpdateSessionConversationId(safeSession, chatResponse.ConversationId, cancellationToken); + var forceEndOfRunPersistence = chatOptions?.ContinuationToken is not null || chatOptions?.AllowBackgroundResponses is true; + this.UpdateSessionConversationIdAtEndOfRun(safeSession, chatResponse.ConversationId, cancellationToken, forceUpdate: forceEndOfRunPersistence); // Ensure that the author name is set for each message in the response. foreach (ChatMessage chatResponseMessage in chatResponse.Messages) @@ -244,11 +249,10 @@ protected override async Task RunCoreAsync( chatResponseMessage.AuthorName ??= this.Name; } - // Only notify the session of new messages if the chatResponse was successful to avoid inconsistent message state in the session. - await this.NotifyChatHistoryProviderOfNewMessagesAsync(safeSession, inputMessagesForChatClient, chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); - - // Notify the AIContextProvider of all new messages. - await this.NotifyAIContextProviderOfSuccessAsync(safeSession, inputMessagesForChatClient, chatResponse.Messages, cancellationToken).ConfigureAwait(false); + // Notify providers of all new messages unless persistence is handled per-service-call by the decorator. + // When background responses are allowed, force notification since per-service-call persistence + // is unreliable (the caller may stop consuming the stream before the decorator can persist). + await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, inputMessagesForChatClient, chatResponse.Messages, chatOptions, cancellationToken, forceNotify: forceEndOfRunPersistence).ConfigureAwait(false); return new AgentResponse(chatResponse) { @@ -296,6 +300,10 @@ protected override async IAsyncEnumerable RunCoreStreamingA ChatClientAgentContinuationToken? continuationToken) = await this.PrepareSessionAndMessagesAsync(session, inputMessages, options, cancellationToken).ConfigureAwait(false); + // Update the run context with the resolved session so any downstream classes + // always have a valid session, even when the caller passed null. + EnsureRunContextHasSession(safeSession); + var chatClient = this.ChatClient; chatClient = ApplyRunOptionsTransformations(options, chatClient); @@ -315,8 +323,7 @@ protected override async IAsyncEnumerable RunCoreStreamingA } catch (Exception ex) { - await this.NotifyChatHistoryProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); - await this.NotifyAIContextProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), cancellationToken).ConfigureAwait(false); + await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); throw; } @@ -330,8 +337,7 @@ protected override async IAsyncEnumerable RunCoreStreamingA } catch (Exception ex) { - await this.NotifyChatHistoryProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); - await this.NotifyAIContextProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), cancellationToken).ConfigureAwait(false); + await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); throw; } @@ -353,27 +359,31 @@ protected override async IAsyncEnumerable RunCoreStreamingA try { + // Re-ensure the run context has the resolved session before each MoveNextAsync. + // The base class RunStreamingAsync restores the original context (potentially with + // null session) after each yield, so we must re-establish it for the decorator. + EnsureRunContextHasSession(safeSession); hasUpdates = await responseUpdatesEnumerator.MoveNextAsync().ConfigureAwait(false); } catch (Exception ex) { - await this.NotifyChatHistoryProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); - await this.NotifyAIContextProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), cancellationToken).ConfigureAwait(false); + await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); throw; } } var chatResponse = responseUpdates.ToChatResponse(); + var forceEndOfRunPersistence = continuationToken is not null || chatOptions?.AllowBackgroundResponses is true; + // We can derive the type of supported session from whether we have a conversation id, // so let's update it and set the conversation id for the service session case. - this.UpdateSessionConversationId(safeSession, chatResponse.ConversationId, cancellationToken); + this.UpdateSessionConversationIdAtEndOfRun(safeSession, chatResponse.ConversationId, cancellationToken, forceUpdate: forceEndOfRunPersistence); - // To avoid inconsistent state we only notify the session of the input messages if no error occurs after the initial request. - await this.NotifyChatHistoryProviderOfNewMessagesAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); - - // Notify the AIContextProvider of all new messages. - await this.NotifyAIContextProviderOfSuccessAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, cancellationToken).ConfigureAwait(false); + // Notify providers of all new messages unless persistence is handled per-service-call by the decorator. + // When resuming from a continuation token or using background responses, force notification + // to send the combined data (per-service-call persistence is unreliable for these scenarios). + await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, chatOptions, cancellationToken, forceNotify: forceEndOfRunPersistence).ConfigureAwait(false); } /// @@ -441,17 +451,29 @@ protected override ValueTask DeserializeSessionCoreAsync(JsonEleme #region Private /// - /// Notify the when an agent run succeeded, if there is an . + /// Notifies the and all of successfully completed messages. /// - private async Task NotifyAIContextProviderOfSuccessAsync( + /// + /// This method is also called by to persist messages per-service-call. + /// + internal async Task NotifyProvidersOfNewMessagesAsync( ChatClientAgentSession session, - IEnumerable inputMessages, + IEnumerable requestMessages, IEnumerable responseMessages, + ChatOptions? chatOptions, CancellationToken cancellationToken) { + ChatHistoryProvider? chatHistoryProvider = this.ResolveChatHistoryProvider(chatOptions, session); + + if (chatHistoryProvider is not null) + { + var invokedContext = new ChatHistoryProvider.InvokedContext(this, session, requestMessages, responseMessages); + await chatHistoryProvider.InvokedAsync(invokedContext, cancellationToken).ConfigureAwait(false); + } + if (this.AIContextProviders is { Count: > 0 } contextProviders) { - AIContextProvider.InvokedContext invokedContext = new(this, session, inputMessages, responseMessages); + AIContextProvider.InvokedContext invokedContext = new(this, session, requestMessages, responseMessages); foreach (var contextProvider in contextProviders) { @@ -461,17 +483,29 @@ private async Task NotifyAIContextProviderOfSuccessAsync( } /// - /// Notify the of any failure during an agent run, if there is an . + /// Notifies the and all of a failure during a service call. /// - private async Task NotifyAIContextProviderOfFailureAsync( + /// + /// This method is also called by to report failures per-service-call. + /// + internal async Task NotifyProvidersOfFailureAsync( ChatClientAgentSession session, Exception ex, - IEnumerable inputMessages, + IEnumerable requestMessages, + ChatOptions? chatOptions, CancellationToken cancellationToken) { + ChatHistoryProvider? chatHistoryProvider = this.ResolveChatHistoryProvider(chatOptions, session); + + if (chatHistoryProvider is not null) + { + var invokedContext = new ChatHistoryProvider.InvokedContext(this, session, requestMessages, ex); + await chatHistoryProvider.InvokedAsync(invokedContext, cancellationToken).ConfigureAwait(false); + } + if (this.AIContextProviders is { Count: > 0 } contextProviders) { - AIContextProvider.InvokedContext invokedContext = new(this, session, inputMessages, ex); + AIContextProvider.InvokedContext invokedContext = new(this, session, requestMessages, ex); foreach (var contextProvider in contextProviders) { @@ -667,6 +701,12 @@ private async Task throw new InvalidOperationException("A session must be provided when continuing a background response with a continuation token."); } + if ((continuationToken is not null || chatOptions?.AllowBackgroundResponses is true) && this.PersistsChatHistoryPerServiceCall && this._logger.IsEnabled(LogLevel.Warning)) + { + var warningAgentName = this.GetLoggingAgentName(); + this._logger.LogAgentChatClientBackgroundResponseFallback(this.Id, warningAgentName); + } + session ??= await this.CreateSessionAsync(cancellationToken).ConfigureAwait(false); if (session is not ChatClientAgentSession typedSession) { @@ -754,7 +794,7 @@ private async Task return (typedSession, chatOptions, messagesList, continuationToken); } - private void UpdateSessionConversationId(ChatClientAgentSession session, string? responseConversationId, CancellationToken cancellationToken) + internal void UpdateSessionConversationId(ChatClientAgentSession session, string? responseConversationId, CancellationToken cancellationToken) { if (string.IsNullOrWhiteSpace(responseConversationId) && !string.IsNullOrWhiteSpace(session.ConversationId)) { @@ -798,45 +838,162 @@ private void UpdateSessionConversationId(ChatClientAgentSession session, string? } } - private Task NotifyChatHistoryProviderOfFailureAsync( + /// + /// Updates the session conversation ID at the end of an agent run. + /// + /// + /// When a in persist mode handles per-service-call + /// conversation ID updates, this end-of-run update is skipped. When the decorator is in mark-only + /// mode or absent, the update is performed here. When is + /// (continuation token scenarios), the update is always performed. + /// + private void UpdateSessionConversationIdAtEndOfRun(ChatClientAgentSession session, string? responseConversationId, CancellationToken cancellationToken, bool forceUpdate = false) + { + if (!forceUpdate && this.PersistsChatHistoryPerServiceCall) + { + return; + } + + this.UpdateSessionConversationId(session, responseConversationId, cancellationToken); + } + + /// + /// Notifies providers of successfully completed messages at the end of an agent run. + /// + /// + /// When a in persist mode handles per-service-call + /// notification, this end-of-run notification is skipped. When the decorator is in mark-only mode, + /// only the marked messages are persisted. When no decorator is present (custom stack with + /// ), all messages are persisted. + /// When is (continuation token or + /// background response scenarios), notification is always performed with all messages because + /// per-service-call persistence is unreliable in these scenarios. + /// + private Task NotifyProvidersOfNewMessagesAtEndOfRunAsync( ChatClientAgentSession session, - Exception ex, IEnumerable requestMessages, + IEnumerable responseMessages, ChatOptions? chatOptions, - CancellationToken cancellationToken) + CancellationToken cancellationToken, + bool forceNotify = false) { - ChatHistoryProvider? provider = this.ResolveChatHistoryProvider(chatOptions, session); - - // Only notify the provider if we have one. - // If we don't have one, it means that the chat history is service managed and the underlying service is responsible for storing messages. - if (provider is not null) + if (!forceNotify && this.PersistsChatHistoryPerServiceCall) { - var invokedContext = new ChatHistoryProvider.InvokedContext(this, session, requestMessages, ex); + return Task.CompletedTask; + } - return provider.InvokedAsync(invokedContext, cancellationToken).AsTask(); + if (!forceNotify && this.HasMarkOnlyChatHistoryPersistingClient) + { + // In mark-only mode, persist only messages that were marked by the decorator. + var markedRequestMessages = GetMarkedMessages(requestMessages); + var markedResponseMessages = GetMarkedMessages(responseMessages); + return this.NotifyProvidersOfNewMessagesAsync(session, markedRequestMessages, markedResponseMessages, chatOptions, cancellationToken); } - return Task.CompletedTask; + return this.NotifyProvidersOfNewMessagesAsync(session, requestMessages, responseMessages, chatOptions, cancellationToken); } - private Task NotifyChatHistoryProviderOfNewMessagesAsync( + /// + /// Notifies providers of a failure at the end of an agent run. + /// + /// + /// When a in persist mode handles per-service-call + /// notification (including failure), this end-of-run notification is skipped to avoid + /// duplicate notification. In all other cases, failure is reported at the end of the run. + /// + private Task NotifyProvidersOfFailureAtEndOfRunAsync( ChatClientAgentSession session, + Exception ex, IEnumerable requestMessages, - IEnumerable responseMessages, ChatOptions? chatOptions, CancellationToken cancellationToken) { - ChatHistoryProvider? provider = this.ResolveChatHistoryProvider(chatOptions, session); + if (this.PersistsChatHistoryPerServiceCall) + { + return Task.CompletedTask; + } + + return this.NotifyProvidersOfFailureAsync(session, ex, requestMessages, chatOptions, cancellationToken); + } - // Only notify the provider if we have one. - // If we don't have one, it means that the chat history is service managed and the underlying service is responsible for storing messages. - if (provider is not null) + /// + /// Gets a value indicating whether the agent has a + /// decorator in persist mode (not mark-only), which handles per-service-call persistence. + /// + private bool PersistsChatHistoryPerServiceCall + { + get { - var invokedContext = new ChatHistoryProvider.InvokedContext(this, session, requestMessages, responseMessages); - return provider.InvokedAsync(invokedContext, cancellationToken).AsTask(); + var persistingClient = this.ChatClient.GetService(); + return persistingClient?.MarkOnly == false; + } + } + + /// + /// Gets a value indicating whether the agent has a + /// decorator in mark-only mode, which marks messages for later persistence at the end of the run. + /// + private bool HasMarkOnlyChatHistoryPersistingClient + { + get + { + var persistingClient = this.ChatClient.GetService(); + return persistingClient?.MarkOnly == true; + } + } + + /// + /// Returns only the messages that have been marked as persisted by a in mark-only mode. + /// + private static List GetMarkedMessages(IEnumerable messages) + { + return messages.Where(m => + m.AdditionalProperties?.TryGetValue(ChatHistoryPersistingChatClient.PersistedMarkerKey, out var value) == true && value is true).ToList(); + } + + /// + /// Ensures that contains the resolved session. + /// + /// + /// The base class sets with the raw session parameter + /// (which may be null) and restores it after each yield in streaming scenarios. After + /// resolves or creates a session, we update the + /// context so the decorator always has a valid session. + /// The original agent from the context is preserved to maintain the top-of-stack agent in + /// decorated agent scenarios. + /// + private static void EnsureRunContextHasSession(ChatClientAgentSession safeSession) + { + var context = CurrentRunContext; + if (context is not null && context.Session != safeSession) + { + CurrentRunContext = new(context.Agent, safeSession, context.RequestMessages, context.RunOptions); + } + } + + /// + /// Checks for potential misconfiguration when using a custom chat client stack and logs warnings. + /// + private void WarnOnMissingPersistingClient() + { + if (this._agentOptions?.UseProvidedChatClientAsIs is not true) + { + return; + } + + if (this._agentOptions?.PersistChatHistoryAtEndOfRun is not true) + { + return; } - return Task.CompletedTask; + var persistingClient = this.ChatClient.GetService(); + if (persistingClient is null && this._logger.IsEnabled(LogLevel.Warning)) + { + var loggingAgentName = this.GetLoggingAgentName(); + this._logger.LogAgentChatClientMissingPersistingClient( + this.Id, + loggingAgentName); + } } private ChatHistoryProvider? ResolveChatHistoryProvider(ChatOptions? chatOptions, ChatClientAgentSession session) diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentLogMessages.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentLogMessages.cs index 98ff4583dc..2a324522a4 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentLogMessages.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentLogMessages.cs @@ -69,4 +69,32 @@ public static partial void LogAgentChatClientHistoryProviderConflict( string chatHistoryProviderName, string agentId, string agentName); + + /// + /// Logs a warning when is + /// and is , + /// but no is found in the custom chat client stack. + /// + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Agent {AgentId}/{AgentName}: PersistChatHistoryAtEndOfRun is enabled with a custom chat client stack (UseProvidedChatClientAsIs), but no ChatHistoryPersistingChatClient was found in the pipeline. All messages will be persisted at the end of the run without marking. This setup is not supported with some other features, e.g. handoffs. Consider adding a ChatHistoryPersistingChatClient to the pipeline using the UseChatHistoryPersisting extension method.")] + public static partial void LogAgentChatClientMissingPersistingClient( + this ILogger logger, + string agentId, + string agentName); + + /// + /// Logs a warning when per-service-call persistence falls back to end-of-run persistence + /// because the run involves background responses (continuation token resumption or + /// AllowBackgroundResponses). Per-service-call persistence is + /// unreliable in these scenarios because the caller may stop consuming the stream before + /// the decorator's post-stream persistence code can execute. + /// + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Agent {AgentId}/{AgentName}: Per-service-call persistence is falling back to end-of-run persistence because the run involves background responses. Messages will be marked during the run and persisted at the end.")] + public static partial void LogAgentChatClientBackgroundResponseFallback( + this ILogger logger, + string agentId, + string agentName); } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs index 38cad40bbe..8df9112446 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs @@ -1,7 +1,9 @@ // Copyright (c) Microsoft. All rights reserved. using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using Microsoft.Extensions.AI; +using Microsoft.Shared.DiagnosticIds; namespace Microsoft.Agents.AI; @@ -89,6 +91,56 @@ public sealed class ChatClientAgentOptions /// public bool ThrowOnChatHistoryProviderConflict { get; set; } = true; + /// + /// Gets or sets a value indicating whether to persist chat history only at the end of the full agent run + /// rather than after each individual service call. + /// + /// + /// + /// By default, persists request and response messages either via + /// a , or the underlying AI service's chat history storage. + /// Persistence is done immediately after each call to the AI service within the function invocation loop. + /// When storing in the underlying AI service, the session's + /// is also updated after each service call, keeping it in sync with the service-side conversation state. + /// + /// + /// Setting this property to causes messages to be marked during the function + /// invocation loop but persisted only at the end of the full agent run, providing atomic run semantics. + /// Updating the is likewise deferred and + /// updated only at the end of the run, consistent with atomic run semantics. + /// A decorator is inserted into the chat client pipeline + /// in mark-only mode, and the persists only the marked messages at the + /// end of the run. + /// + /// + /// When this option is (the default), the + /// decorator persists messages and updates the + /// immediately after each service call. This may leave chat history in a state where + /// is required to start a new run if the last successful service + /// call returned . + /// + /// + /// This option has no effect when is . + /// When using a custom chat client stack, you can add a + /// manually via the + /// extension method. + /// + /// + /// Note that when using single threaded service stored chat history, like OpenAI Conversations, + /// there is only one id, so even if the conversation id is not updated after each service call, + /// the chat history will still contain intermediate messages. Setting this property to + /// in this case will therefore have no real effect. Setting this property to when using + /// OpenAI Responses with response ids on the other hand, allows atomic run semantics, since + /// each service request produces a new response id, and if the run fails mid-loop, the session will + /// still contain the pre-run respnose id, allowing the next run to start with a clean slate. + /// + /// + /// + /// Default is . + /// + [Experimental(DiagnosticIds.Experiments.AgentsAIExperiments)] + public bool PersistChatHistoryAtEndOfRun { get; set; } + /// /// Creates a new instance of with the same values as this instance. /// @@ -105,5 +157,6 @@ public ChatClientAgentOptions Clone() ClearOnChatHistoryProviderConflict = this.ClearOnChatHistoryProviderConflict, WarnOnChatHistoryProviderConflict = this.WarnOnChatHistoryProviderConflict, ThrowOnChatHistoryProviderConflict = this.ThrowOnChatHistoryProviderConflict, + PersistChatHistoryAtEndOfRun = this.PersistChatHistoryAtEndOfRun, }; } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientBuilderExtensions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientBuilderExtensions.cs index ee782dce52..a1e8b5f8a5 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientBuilderExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientBuilderExtensions.cs @@ -2,8 +2,10 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using Microsoft.Agents.AI; using Microsoft.Extensions.Logging; +using Microsoft.Shared.DiagnosticIds; using Microsoft.Shared.Diagnostics; namespace Microsoft.Extensions.AI; @@ -82,4 +84,46 @@ public static ChatClientAgent BuildAIAgent( options: options, loggerFactory: loggerFactory, services: services); + + /// + /// Adds a to the chat client pipeline. + /// + /// + /// + /// This decorator should be positioned between the and the leaf + /// in the pipeline. It intercepts service calls to either persist messages + /// immediately or mark them for later persistence, depending on the parameter. + /// + /// + /// If is set to , the + /// should be configured with set to + /// as without this combination, messages will never be persisted when using a for + /// chat history persistence. + /// + /// + /// This extension method is intended for use with custom chat client stacks when + /// is . + /// When is (the default), + /// the automatically injects this decorator. + /// + /// + /// This decorator only works within the context of a running and will throw an + /// exception if used in any other stack. + /// + /// + /// The to add the decorator to. + /// + /// When , messages are marked with metadata but not persisted immediately, + /// and the session's is not updated. + /// The will persist only the marked messages and update the + /// conversation ID at the end of the run. + /// When (the default), messages are persisted and the conversation ID + /// is updated immediately after each service call. + /// + /// The for chaining. + [Experimental(DiagnosticIds.Experiments.AgentsAIExperiments)] + public static ChatClientBuilder UseChatHistoryPersisting(this ChatClientBuilder builder, bool markOnly = false) + { + return builder.Use(innerClient => new ChatHistoryPersistingChatClient(innerClient, markOnly)); + } } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs index 8290c39974..fffac628a6 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs @@ -63,6 +63,15 @@ internal static IChatClient WithDefaultAgentMiddleware(this IChatClient chatClie }); } + // ChatHistoryPersistingChatClient is registered after FunctionInvokingChatClient so that it sits + // between FIC and the leaf client. ChatClientBuilder.Build applies factories in reverse order, + // making the first Use() call outermost. By adding our decorator second, the resulting pipeline is: + // FunctionInvokingChatClient → ChatHistoryPersistingChatClient → leaf IChatClient + // This allows the decorator to persist messages after each individual service call within + // FIC's function invocation loop, or to mark them for later persistence at the end of the run. + bool markOnly = options?.PersistChatHistoryAtEndOfRun is true; + chatBuilder.Use(innerClient => new ChatHistoryPersistingChatClient(innerClient, markOnly)); + var agentChatClient = chatBuilder.Build(services); if (options?.ChatOptions?.Tools is { Count: > 0 }) diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs new file mode 100644 index 0000000000..0085afbdd5 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs @@ -0,0 +1,313 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI; + +/// +/// A delegating chat client that notifies and +/// instances of request and response messages after each individual call to the inner chat client, +/// or marks messages for later persistence depending on the configured mode. +/// +/// +/// +/// This decorator is intended to operate between the and the leaf +/// in a pipeline. +/// +/// +/// In persist mode (the default), it ensures that providers are notified and the session's +/// is updated after each service call, so that +/// intermediate messages (e.g., tool calls and results) are saved even if the process is interrupted +/// mid-loop. +/// +/// +/// In mark-only mode ( is ), it marks messages with metadata +/// but does not notify providers or update the . +/// Both are deferred to the at the end of the run, providing atomic +/// run semantics. +/// +/// +/// This chat client must be used within the context of a running . It retrieves the +/// current agent and session from , which is set automatically when an agent's +/// or +/// +/// method is called. The ensures the run context always contains a resolved session, +/// even when the caller passes null. An is thrown if no run context is +/// available or if the agent is not a . +/// +/// +internal sealed class ChatHistoryPersistingChatClient : DelegatingChatClient +{ + /// + /// The key used in and + /// to mark messages and their content as already persisted to chat history. + /// + internal const string PersistedMarkerKey = "_chatHistoryPersisted"; + + /// + /// Initializes a new instance of the class. + /// + /// The underlying chat client that will handle the core operations. + /// + /// When , messages are marked with metadata but not persisted immediately, + /// and the session's is not updated. + /// The will persist only the marked messages and update the + /// conversation ID at the end of the run. + /// When (the default), messages are persisted and the conversation ID + /// is updated immediately after each service call. + /// + public ChatHistoryPersistingChatClient(IChatClient innerClient, bool markOnly = false) + : base(innerClient) + { + this.MarkOnly = markOnly; + } + + /// + /// Gets a value indicating whether this decorator is in mark-only mode. + /// + /// + /// When , messages are marked with metadata but not persisted immediately, + /// and the session's is not updated. + /// Both are deferred to the at the end of the run. + /// When , messages are persisted and the conversation ID is updated + /// after each service call. + /// + public bool MarkOnly { get; } + + /// + public override async Task GetResponseAsync( + IEnumerable messages, + ChatOptions? options = null, + CancellationToken cancellationToken = default) + { + var (agent, session) = GetRequiredAgentAndSession(); + + ChatResponse response; + try + { + response = await base.GetResponseAsync(messages, options, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + var newRequestMessagesOnFailure = GetNewRequestMessages(messages); + await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); + throw; + } + + var newRequestMessages = GetNewRequestMessages(messages); + + if (this.ShouldDeferPersistence(options)) + { + // In mark-only mode or when resuming from a continuation token, just mark messages + // for later persistence by ChatClientAgent. Conversation ID and provider notification + // are deferred to end-of-run. For continuation tokens, the end-of-run handler needs + // to send the combined data from both the previous and current runs. + MarkAsPersisted(newRequestMessages); + MarkAsPersisted(response.Messages); + } + else + { + // In persist mode, persist immediately and update conversation ID. + agent.UpdateSessionConversationId(session, response.ConversationId, cancellationToken); + await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, response.Messages, options, cancellationToken).ConfigureAwait(false); + MarkAsPersisted(newRequestMessages); + MarkAsPersisted(response.Messages); + } + + return response; + } + + /// + public override async IAsyncEnumerable GetStreamingResponseAsync( + IEnumerable messages, + ChatOptions? options = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var (agent, session) = GetRequiredAgentAndSession(); + + List responseUpdates = []; + + IAsyncEnumerator enumerator; + try + { + enumerator = base.GetStreamingResponseAsync(messages, options, cancellationToken).GetAsyncEnumerator(cancellationToken); + } + catch (Exception ex) + { + var newRequestMessagesOnFailure = GetNewRequestMessages(messages); + await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); + throw; + } + + bool hasUpdates; + try + { + hasUpdates = await enumerator.MoveNextAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + var newRequestMessagesOnFailure = GetNewRequestMessages(messages); + await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); + throw; + } + + while (hasUpdates) + { + var update = enumerator.Current; + responseUpdates.Add(update); + yield return update; + + try + { + hasUpdates = await enumerator.MoveNextAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + var newRequestMessagesOnFailure = GetNewRequestMessages(messages); + await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); + throw; + } + } + + var chatResponse = responseUpdates.ToChatResponse(); + var newRequestMessages = GetNewRequestMessages(messages); + + if (this.ShouldDeferPersistence(options)) + { + // In mark-only mode or when resuming from a continuation token, just mark messages + // for later persistence by ChatClientAgent. Conversation ID and provider notification + // are deferred to end-of-run. For continuation tokens, the end-of-run handler needs + // to send the combined data from both the previous and current runs. + MarkAsPersisted(newRequestMessages); + MarkAsPersisted(chatResponse.Messages); + } + else + { + // In persist mode, persist immediately and update conversation ID. + agent.UpdateSessionConversationId(session, chatResponse.ConversationId, cancellationToken); + await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, chatResponse.Messages, options, cancellationToken).ConfigureAwait(false); + MarkAsPersisted(newRequestMessages); + MarkAsPersisted(chatResponse.Messages); + } + } + + /// + /// Gets the current and from the run context. + /// + private static (ChatClientAgent Agent, ChatClientAgentSession Session) GetRequiredAgentAndSession() + { + var runContext = AIAgent.CurrentRunContext + ?? throw new InvalidOperationException( + $"{nameof(ChatHistoryPersistingChatClient)} can only be used within the context of a running AIAgent. " + + "Ensure that the chat client is being invoked as part of an AIAgent.RunAsync or AIAgent.RunStreamingAsync call."); + + var chatClientAgent = runContext.Agent.GetService() + ?? throw new InvalidOperationException( + $"{nameof(ChatHistoryPersistingChatClient)} can only be used with a {nameof(ChatClientAgent)}. " + + $"The current agent is of type '{runContext.Agent.GetType().Name}'."); + + if (runContext.Session is not ChatClientAgentSession chatClientAgentSession) + { + throw new InvalidOperationException( + $"{nameof(ChatHistoryPersistingChatClient)} requires a {nameof(ChatClientAgentSession)}. " + + $"The current session is of type '{runContext.Session?.GetType().Name ?? "null"}'."); + } + + return (chatClientAgent, chatClientAgentSession); + } + + /// + /// Determines whether persistence should be deferred to end-of-run instead of happening immediately. + /// + /// + /// when in mode, when the call is resuming from + /// a continuation token (since the end-of-run handler needs to combine data from the previous + /// and current runs), or when background responses are allowed (since the caller may stop + /// consuming the stream mid-run, preventing the post-stream persistence code from executing). + /// + private bool ShouldDeferPersistence(ChatOptions? options) + { + return this.MarkOnly || options?.ContinuationToken is not null || options?.AllowBackgroundResponses is true; + } + + /// + /// Returns only the request messages that have not yet been persisted to chat history. + /// + /// + /// A message is considered already persisted if any of the following is true: + /// + /// It has the in its . + /// It has an of + /// (indicating it was loaded from chat history and does not need to be re-persisted). + /// It has and all of its items have the + /// in their . This handles the + /// streaming case where reconstructs objects + /// independently via ToChatResponse(), producing different object references that share the same + /// underlying instances. + /// + /// + /// A list of request messages that have not yet been persisted. + /// The full set of request messages to filter. + private static List GetNewRequestMessages(IEnumerable messages) + { + return messages.Where(m => !IsAlreadyPersisted(m)).ToList(); + } + + /// + /// Determines whether a message has already been persisted to chat history by this decorator. + /// + private static bool IsAlreadyPersisted(ChatMessage message) + { + if (message.AdditionalProperties?.TryGetValue(PersistedMarkerKey, out var value) == true && value is true) + { + return true; + } + + if (message.GetAgentRequestMessageSourceType() == AgentRequestMessageSourceType.ChatHistory) + { + return true; + } + + // In streaming mode, FunctionInvokingChatClient reconstructs ChatMessage objects via ToChatResponse() + // independently, producing different ChatMessage instances. However, the underlying AIContent objects + // (e.g., FunctionCallContent, FunctionResultContent) are shared references. Checking for markers on + // AIContent handles dedup in this case. + if (message.Contents.Count > 0 && message.Contents.All(c => c.AdditionalProperties?.TryGetValue(PersistedMarkerKey, out var value) == true && value is true)) + { + return true; + } + + return false; + } + + /// + /// Marks the given messages as persisted by setting a marker on both the + /// and each of its items. + /// + /// + /// Both levels are marked because may reconstruct + /// objects in streaming mode (losing the message-level marker), + /// but the references are shared and retain their markers. + /// + /// The messages to mark as persisted. + private static void MarkAsPersisted(IEnumerable messages) + { + foreach (var message in messages) + { + message.AdditionalProperties ??= new(); + message.AdditionalProperties[PersistedMarkerKey] = true; + + foreach (var content in message.Contents) + { + content.AdditionalProperties ??= new(); + content.AdditionalProperties[PersistedMarkerKey] = true; + } + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs new file mode 100644 index 0000000000..459859224f --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs @@ -0,0 +1,766 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.AI; +using Microsoft.Extensions.DependencyInjection; +using Moq; +using Moq.Protected; + +namespace Microsoft.Agents.AI.UnitTests; + +/// +/// Contains unit tests for the decorator, +/// verifying that it persists messages via the after each +/// individual service call by default, or marks messages for end-of-run persistence when the +/// option is enabled. +/// +public class ChatHistoryPersistingChatClientTests +{ + /// + /// Verifies that by default (PersistChatHistoryAtEndOfRun is false), + /// the ChatHistoryProvider receives messages after a successful non-streaming call. + /// + [Fact] + public async Task RunAsync_PersistsMessagesPerServiceCall_ByDefaultAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAtEndOfRun = false, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — InvokedCoreAsync should be called by the decorator (per service call) + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.RequestMessages.Any(m => m.Text == "test") && + x.ResponseMessages!.Any(m => m.Text == "response")), + ItExpr.IsAny()); + } + + /// + /// Verifies that when per-service-call persistence is active (default), + /// the ChatHistoryProvider receives messages at the end of the run. + /// + [Fact] + public async Task RunAsync_PersistsMessagesAtEndOfRun_WhenOptionEnabledAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAtEndOfRun = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — InvokedCoreAsync should be called once by the agent (end of run) + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.RequestMessages.Any(m => m.Text == "test") && + x.ResponseMessages!.Any(m => m.Text == "response")), + ItExpr.IsAny()); + } + + /// + /// Verifies that when per-service-call persistence is active (default) and the service call fails, + /// the ChatHistoryProvider is notified with the exception. + /// + [Fact] + public async Task RunAsync_NotifiesProviderOfFailure_WhenPerServiceCallPersistenceActiveAsync() + { + // Arrange + var expectedException = new InvalidOperationException("Service failed"); + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ThrowsAsync(expectedException); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAtEndOfRun = false, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await Assert.ThrowsAsync(() => agent.RunAsync([new(ChatRole.User, "test")], session)); + + // Assert — the decorator should have notified the provider of the failure + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.InvokeException != null && + x.InvokeException.Message == "Service failed"), + ItExpr.IsAny()); + } + + /// + /// Verifies that the decorator is injected in persist mode by default + /// and can be discovered via GetService. + /// + [Fact] + public void ChatClient_ContainsDecorator_InPersistMode_ByDefault() + { + // Arrange + Mock mockService = new(); + + // Act + ChatClientAgent agent = new(mockService.Object, options: new()); + + // Assert + var decorator = agent.ChatClient.GetService(); + Assert.NotNull(decorator); + Assert.False(decorator.MarkOnly); + } + + /// + /// Verifies that the decorator is injected in mark-only mode when PersistChatHistoryAtEndOfRun is true. + /// + [Fact] + public void ChatClient_ContainsDecorator_InMarkOnlyMode_WhenPersistAtEndOfRun() + { + // Arrange + Mock mockService = new(); + + // Act + ChatClientAgent agent = new(mockService.Object, options: new() + { + PersistChatHistoryAtEndOfRun = true, + }); + + // Assert + var decorator = agent.ChatClient.GetService(); + Assert.NotNull(decorator); + Assert.True(decorator.MarkOnly); + } + + /// + /// Verifies that the decorator is NOT injected when UseProvidedChatClientAsIs is true. + /// + [Fact] + public void ChatClient_DoesNotContainDecorator_WhenUseProvidedChatClientAsIs() + { + // Arrange + Mock mockService = new(); + + // Act + ChatClientAgent agent = new(mockService.Object, options: new() + { + UseProvidedChatClientAsIs = true, + }); + + // Assert + var decorator = agent.ChatClient.GetService(); + Assert.Null(decorator); + } + + /// + /// Verifies that the PersistChatHistoryAtEndOfRun option is included in Clone(). + /// + [Fact] + public void ChatClientAgentOptions_Clone_IncludesPersistChatHistoryAtEndOfRun() + { + // Arrange + var options = new ChatClientAgentOptions + { + PersistChatHistoryAtEndOfRun = true, + }; + + // Act + var cloned = options.Clone(); + + // Assert + Assert.True(cloned.PersistChatHistoryAtEndOfRun); + } + + /// + /// Verifies that when per-service-call persistence is active (default) and the service call + /// involves a function invocation loop, the ChatHistoryProvider is called after each individual + /// service call (not just once at the end). + /// + [Fact] + public async Task RunAsync_PersistsPerServiceCall_DuringFunctionInvocationLoopAsync() + { + // Arrange + int serviceCallCount = 0; + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(() => + { + serviceCallCount++; + if (serviceCallCount == 1) + { + // First call returns a tool call + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, [new FunctionCallContent("call1", "myTool", new Dictionary())])])); + } + + // Second call returns a final response + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, "final response")])); + }); + + var invokedContexts = new List(); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Callback((ChatHistoryProvider.InvokedContext ctx, CancellationToken _) => invokedContexts.Add(ctx)) + .Returns(() => new ValueTask()); + + // Define a simple tool + var tool = AIFunctionFactory.Create(() => "tool result", "myTool", "A test tool"); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatOptions = new() { Tools = [tool] }, + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAtEndOfRun = false, + }, services: new ServiceCollection().BuildServiceProvider()); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + Exception? caughtException = null; + try + { + await agent.RunAsync([new(ChatRole.User, "test")], session); + } + catch (Exception ex) + { + caughtException = ex; + } + + // Diagnostic: check if there was an unexpected exception + Assert.Null(caughtException); + + // Assert — the decorator should have been called twice (once per service call in the function invocation loop) + Assert.Equal(2, serviceCallCount); + Assert.Equal(2, invokedContexts.Count); + + // First invocation should have the user message as request and tool call response + Assert.NotNull(invokedContexts[0].ResponseMessages); + var firstRequestMessages = invokedContexts[0].RequestMessages.ToList(); + Assert.Contains(firstRequestMessages, m => m.Text == "test"); + Assert.Contains(invokedContexts[0].ResponseMessages!, m => m.Contents.OfType().Any()); + + // Second invocation: request messages should NOT include the original user message (already notified). + // It should only include messages added since the first call (assistant tool call + tool result). + Assert.NotNull(invokedContexts[1].ResponseMessages); + var secondRequestMessages = invokedContexts[1].RequestMessages.ToList(); + Assert.DoesNotContain(secondRequestMessages, m => m.Text == "test"); + Assert.Contains(invokedContexts[1].ResponseMessages!, m => m.Text == "final response"); + } + + /// + /// Verifies that when per-service-call persistence is active (default) with streaming, + /// the ChatHistoryProvider receives messages after the stream completes. + /// + [Fact] + public async Task RunStreamingAsync_PersistsMessagesPerServiceCall_ByDefaultAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetStreamingResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(CreateAsyncEnumerableAsync( + new ChatResponseUpdate(ChatRole.Assistant, "streaming "), + new ChatResponseUpdate(ChatRole.Assistant, "response"))); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAtEndOfRun = false, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await foreach (var _ in agent.RunStreamingAsync([new(ChatRole.User, "test")], session)) + { + // Consume stream + } + + // Assert — InvokedCoreAsync should be called by the decorator + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.RequestMessages.Any(m => m.Text == "test") && + x.ResponseMessages != null), + ItExpr.IsAny()); + } + + /// + /// Verifies that when per-service-call persistence is active (default), + /// AIContextProviders are also notified of new messages after a successful call. + /// + [Fact] + public async Task RunAsync_NotifiesAIContextProviders_ByDefaultAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockContextProvider = new(null, null, null); + mockContextProvider.SetupGet(p => p.StateKeys).Returns(["TestAIContextProvider"]); + mockContextProvider + .Protected() + .Setup>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask(new AIContext())); + mockContextProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + AIContextProviders = [mockContextProvider.Object], + PersistChatHistoryAtEndOfRun = false, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — InvokedCoreAsync should be called by the decorator for the AIContextProvider + mockContextProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.ResponseMessages != null && + x.ResponseMessages.Any(m => m.Text == "response")), + ItExpr.IsAny()); + } + + /// + /// Verifies that when per-service-call persistence is active (default) and the service fails, + /// AIContextProviders are notified of the failure. + /// + [Fact] + public async Task RunAsync_NotifiesAIContextProvidersOfFailure_ByDefaultAsync() + { + // Arrange + var expectedException = new InvalidOperationException("Service failed"); + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ThrowsAsync(expectedException); + + Mock mockContextProvider = new(null, null, null); + mockContextProvider.SetupGet(p => p.StateKeys).Returns(["TestAIContextProvider"]); + mockContextProvider + .Protected() + .Setup>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask(new AIContext())); + mockContextProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + AIContextProviders = [mockContextProvider.Object], + PersistChatHistoryAtEndOfRun = false, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await Assert.ThrowsAsync(() => agent.RunAsync([new(ChatRole.User, "test")], session)); + + // Assert — the decorator should have notified the AIContextProvider of the failure + mockContextProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.InvokeException != null && + x.InvokeException.Message == "Service failed"), + ItExpr.IsAny()); + } + + /// + /// Verifies that when per-service-call persistence is active (default), + /// both ChatHistoryProvider and AIContextProviders are notified together. + /// + [Fact] + public async Task RunAsync_NotifiesBothProviders_ByDefaultAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + Mock mockContextProvider = new(null, null, null); + mockContextProvider.SetupGet(p => p.StateKeys).Returns(["TestAIContextProvider"]); + mockContextProvider + .Protected() + .Setup>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask(new AIContext())); + mockContextProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + AIContextProviders = [mockContextProvider.Object], + PersistChatHistoryAtEndOfRun = false, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — both providers should have been notified + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.ResponseMessages != null && + x.ResponseMessages.Any(m => m.Text == "response")), + ItExpr.IsAny()); + + mockContextProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.ResponseMessages != null && + x.ResponseMessages.Any(m => m.Text == "response")), + ItExpr.IsAny()); + } + + /// + /// Verifies that during a FIC loop, response messages from the first call are not + /// re-notified as request messages on the second call. + /// + [Fact] + public async Task RunAsync_DoesNotReNotifyResponseMessagesAsRequestMessages_DuringFicLoopAsync() + { + // Arrange + int serviceCallCount = 0; + var assistantToolCallMessage = new ChatMessage(ChatRole.Assistant, [new FunctionCallContent("call1", "myTool", new Dictionary())]); + + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(() => + { + serviceCallCount++; + if (serviceCallCount == 1) + { + return Task.FromResult(new ChatResponse([assistantToolCallMessage])); + } + + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, "final response")])); + }); + + var invokedContexts = new List(); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Callback((ChatHistoryProvider.InvokedContext ctx, CancellationToken _) => invokedContexts.Add(ctx)) + .Returns(() => new ValueTask()); + + var tool = AIFunctionFactory.Create(() => "tool result", "myTool", "A test tool"); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatOptions = new() { Tools = [tool] }, + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAtEndOfRun = false, + }, services: new ServiceCollection().BuildServiceProvider()); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert + Assert.Equal(2, invokedContexts.Count); + + // The assistant tool call message was a response in call 1 + Assert.Contains(invokedContexts[0].ResponseMessages!, m => ReferenceEquals(m, assistantToolCallMessage)); + + // It should NOT appear as a request in call 2 (it was already notified as a response) + var secondRequestMessages = invokedContexts[1].RequestMessages.ToList(); + Assert.DoesNotContain(secondRequestMessages, m => ReferenceEquals(m, assistantToolCallMessage)); + } + + /// + /// Verifies that when a failure occurs on the second call in a FIC loop, + /// only new request messages (not previously notified) are sent in the failure notification. + /// + [Fact] + public async Task RunAsync_DeduplicatesRequestMessages_OnFailureDuringFicLoopAsync() + { + // Arrange + int serviceCallCount = 0; + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(() => + { + serviceCallCount++; + if (serviceCallCount == 1) + { + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, [new FunctionCallContent("call1", "myTool", new Dictionary())])])); + } + + throw new InvalidOperationException("Service failure on second call"); + }); + + var invokedContexts = new List(); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Callback((ChatHistoryProvider.InvokedContext ctx, CancellationToken _) => invokedContexts.Add(ctx)) + .Returns(() => new ValueTask()); + + var tool = AIFunctionFactory.Create(() => "tool result", "myTool", "A test tool"); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatOptions = new() { Tools = [tool] }, + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAtEndOfRun = false, + }, services: new ServiceCollection().BuildServiceProvider()); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await Assert.ThrowsAsync(() => + agent.RunAsync([new(ChatRole.User, "test")], session)); + + // Assert — should have 2 notifications: success on call 1, failure on call 2 + Assert.Equal(2, invokedContexts.Count); + + // First notification: success, has user message as request + Assert.Null(invokedContexts[0].InvokeException); + Assert.Contains(invokedContexts[0].RequestMessages, m => m.Text == "test"); + + // Second notification: failure, should NOT include the user message (already notified) + Assert.NotNull(invokedContexts[1].InvokeException); + var failureRequestMessages = invokedContexts[1].RequestMessages.ToList(); + Assert.DoesNotContain(failureRequestMessages, m => m.Text == "test"); + } + + /// + /// Verifies that after a successful run with per-service-call persistence, the notified + /// messages are stamped with the persisted marker so they are not re-notified. + /// + [Fact] + public async Task RunAsync_MarksNotifiedMessages_WithPersistedMarkerAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAtEndOfRun = false, + }); + + // Act + var inputMessage = new ChatMessage(ChatRole.User, "test"); + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([inputMessage], session); + + // Assert — input message should be marked as persisted + Assert.True( + inputMessage.AdditionalProperties?.ContainsKey(ChatHistoryPersistingChatClient.PersistedMarkerKey) == true, + "Input message should be marked as persisted after a successful run."); + } + + /// + /// Verifies that when per-service-call persistence is enabled and the inner client returns a + /// conversation ID, the session's ConversationId is updated after the service call. + /// + [Fact] + public async Task RunAsync_UpdatesSessionConversationId_WhenPerServiceCallPersistenceEnabledAsync() + { + // Arrange + const string ExpectedConversationId = "conv-123"; + + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")]) + { + ConversationId = ExpectedConversationId, + }); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + PersistChatHistoryAtEndOfRun = false, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — session should have the conversation ID returned by the inner client + Assert.Equal(ExpectedConversationId, session!.ConversationId); + } + + private static async IAsyncEnumerable CreateAsyncEnumerableAsync(params ChatResponseUpdate[] updates) + { + foreach (var update in updates) + { + yield return update; + } + + await Task.CompletedTask; + } +}