From 345f6165ca35f8bdd26a320062f4c66e0d64dc51 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Fri, 27 Feb 2026 14:38:36 -0800 Subject: [PATCH 1/9] Handle external input request and response conversion for workflow as agent scenario --- .../WorkflowSession.cs | 133 +++++++++++- .../WorkflowHostSmokeTests.cs | 198 ++++++++++++++++++ 2 files changed, 328 insertions(+), 3 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs index 40a18dbadb..a045c6de8a 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs @@ -25,6 +25,10 @@ internal sealed class WorkflowSession : AgentSession private InMemoryCheckpointManager? _inMemoryCheckpointManager; + // Key prefix used in StateBag to track pending external requests by their content ID + // This enables converting incoming response content back to ExternalResponse when resuming. + private const string PendingRequestKeyPrefix = "workflow.pendingRequest:"; + internal static bool VerifyCheckpointingConfiguration(IWorkflowExecutionEnvironment executionEnvironment, [NotNullWhen(true)] out InProcessExecutionEnvironment? inProcEnv) { inProcEnv = null; @@ -154,7 +158,8 @@ await this._executionEnvironment cancellationToken) .ConfigureAwait(false); - await run.TrySendMessageAsync(messages).ConfigureAwait(false); + // Process messages: convert response content to ExternalResponse, send regular messages as-is + await this.SendMessagesWithResponseConversionAsync(run, messages, cancellationToken).ConfigureAwait(false); return run; } @@ -166,6 +171,116 @@ await this._executionEnvironment .ConfigureAwait(false); } + /// + /// Sends messages to the run, converting FunctionResultContent and UserInputResponseContent + /// to ExternalResponse when there's a matching pending request. + /// + private async ValueTask SendMessagesWithResponseConversionAsync(StreamingRun run, List messages, CancellationToken cancellationToken) + { + List regularMessages = []; + + foreach (ChatMessage message in messages) + { + List regularContents = []; + + foreach (AIContent content in message.Contents) + { + if (this.TryCreateExternalResponse(content) is ExternalResponse response) + { + await run.SendResponseAsync(response).ConfigureAwait(false); + + if (GetResponseContentId(content) is string contentId) + { + this.RemovePendingRequest(contentId); + } + } + else + { + regularContents.Add(content); + } + } + + if (regularContents.Count > 0) + { + regularMessages.Add(new ChatMessage(message.Role, regularContents) + { + AuthorName = message.AuthorName, + MessageId = message.MessageId, + CreatedAt = message.CreatedAt + }); + } + } + + // Send any remaining regular messages + if (regularMessages.Count > 0) + { + await run.TrySendMessageAsync(regularMessages).ConfigureAwait(false); + } + } + + /// + /// Attempts to create an ExternalResponse from response content (FunctionResultContent or UserInputResponseContent) + /// by matching it to a pending request. + /// + private ExternalResponse? TryCreateExternalResponse(AIContent content) + { + string? contentId = GetResponseContentId(content); + if (contentId == null) + { + return null; + } + + ExternalRequest? pendingRequest = this.TryGetPendingRequest(contentId); + if (pendingRequest == null) + { + return null; + } + + // Create the response data based on content type + object? responseData = content switch + { + FunctionResultContent functionResultContent => functionResultContent, + UserInputResponseContent userInputResponseContent => userInputResponseContent, + _ => null + }; + + if (responseData == null) + { + return null; + } + + // Create ExternalResponse using the pending request's port info + return new ExternalResponse(pendingRequest.PortInfo, pendingRequest.RequestId, new PortableValue(responseData)); + } + + /// + /// Gets the content ID from response content types. + /// + private static string? GetResponseContentId(AIContent content) => content switch + { + FunctionResultContent functionResultContent => functionResultContent.CallId, + UserInputResponseContent userInputResponseContent => userInputResponseContent.Id, + _ => null + }; + + /// + /// Tries to get a pending request from the state bag by content ID. + /// + private ExternalRequest? TryGetPendingRequest(string contentId) => + this.StateBag.GetValue(PendingRequestKeyPrefix + contentId); + + /// + /// Adds a pending request to the state bag. + /// + private void AddPendingRequest(string contentId, ExternalRequest request) => + this.StateBag.SetValue(PendingRequestKeyPrefix + contentId, request); + + /// + /// Removes a pending request from the state bag. + /// + private void RemovePendingRequest(string contentId) => + this.StateBag.TryRemoveValue(PendingRequestKeyPrefix + contentId); + internal async IAsyncEnumerable InvokeStageAsync( [EnumeratorCancellation] CancellationToken cancellationToken = default) @@ -192,8 +307,20 @@ IAsyncEnumerable InvokeStageAsync( break; case RequestInfoEvent requestInfo: - FunctionCallContent fcContent = requestInfo.Request.ToFunctionCall(); - AgentResponseUpdate update = this.CreateUpdate(this.LastResponseId, evt, fcContent); + (AIContent requestContent, string? contentId) = requestInfo.Request switch + { + ExternalRequest externalRequest when externalRequest.TryGetDataAs(out FunctionCallContent? fcc) => (fcc, fcc.CallId), + ExternalRequest externalRequest when externalRequest.TryGetDataAs(out UserInputRequestContent? uic) => (uic, uic.Id), + ExternalRequest externalRequest => ((AIContent)externalRequest.ToFunctionCall(), externalRequest.RequestId) + }; + + // Track the pending request so we can convert incoming responses back to ExternalResponse + if (contentId != null) + { + this.AddPendingRequest(contentId, requestInfo.Request); + } + + AgentResponseUpdate update = this.CreateUpdate(this.LastResponseId, evt, requestContent); yield return update; break; diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs index 40e4f2098f..f8184ee2c9 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs @@ -28,6 +28,43 @@ public ExpectedException(string? message, Exception? innerException) : base(mess } } +/// +/// A simple agent that emits a FunctionCallContent or UserInputRequestContent request. +/// Used to test that RequestInfoEvent handling preserves the original content type. +/// +internal sealed class RequestEmittingAgent : AIAgent +{ + private readonly AIContent _requestContent; + + public RequestEmittingAgent(AIContent requestContent) + { + this._requestContent = requestContent; + } + + private sealed class Session : AgentSession + { + public Session() { } + } + + protected override ValueTask DeserializeSessionCoreAsync(JsonElement serializedState, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default) + => new(new Session()); + + protected override ValueTask CreateSessionCoreAsync(CancellationToken cancellationToken = default) + => new(new Session()); + + protected override ValueTask SerializeSessionCoreAsync(AgentSession session, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default) + => default; + + protected override Task RunCoreAsync(IEnumerable messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + => this.RunStreamingAsync(messages, session, options, cancellationToken).ToAgentResponseAsync(cancellationToken); + + protected override async IAsyncEnumerable RunCoreStreamingAsync(IEnumerable messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + // Emit the request content + yield return new AgentResponseUpdate(ChatRole.Assistant, [this._requestContent]); + } +} + public class WorkflowHostSmokeTests { private sealed class AlwaysFailsAIAgent(bool failByThrowing) : AIAgent @@ -112,4 +149,165 @@ public async Task Test_AsAgent_ErrorContentStreamedOutAsync(bool includeExceptio hadErrorContent.Should().BeTrue(); } + + /// + /// Tests that when a workflow emits a RequestInfoEvent with FunctionCallContent data, + /// the AgentResponseUpdate preserves the original FunctionCallContent type. + /// Regression test for GitHub issue #3029. + /// + [Fact] + public async Task Test_AsAgent_FunctionCallContentPreservedInRequestInfoAsync() + { + // Arrange + const string CallId = "test-call-id"; + const string FunctionName = "testFunction"; + FunctionCallContent originalContent = new(CallId, FunctionName); + RequestEmittingAgent requestAgent = new(originalContent); + ExecutorBinding agentBinding = requestAgent.BindAsExecutor( + new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true }); + Workflow workflow = new WorkflowBuilder(agentBinding).Build(); + + // Act + List updates = await workflow.AsAIAgent("WorkflowAgent") + .RunStreamingAsync(new ChatMessage(ChatRole.User, "Hello")) + .ToListAsync(); + + // Assert + AgentResponseUpdate? updateWithFunctionCall = updates.FirstOrDefault(u => + u.Contents.Any(c => c is FunctionCallContent)); + + updateWithFunctionCall.Should().NotBeNull("a FunctionCallContent should be present in the response updates"); + FunctionCallContent retrievedContent = updateWithFunctionCall!.Contents + .OfType() + .Should().ContainSingle() + .Which; + + retrievedContent.CallId.Should().Be(CallId); + retrievedContent.Name.Should().Be(FunctionName); + } + + /// + /// Tests that when a workflow emits a RequestInfoEvent with UserInputRequestContent data, + /// the AgentResponseUpdate preserves the original UserInputRequestContent type. + /// Regression test for GitHub issue #3029. + /// + [Fact] + public async Task Test_AsAgent_UserInputRequestContentPreservedInRequestInfoAsync() + { + // Arrange + const string RequestId = "test-request-id"; + McpServerToolCallContent mcpCalll = new("call-id", "testToolName", "http://localhost"); + UserInputRequestContent originalContent = new McpServerToolApprovalRequestContent(RequestId, mcpCalll); + RequestEmittingAgent requestAgent = new(originalContent); + ExecutorBinding agentBinding = requestAgent.BindAsExecutor( + new AIAgentHostOptions { InterceptUserInputRequests = false, EmitAgentUpdateEvents = true }); + Workflow workflow = new WorkflowBuilder(agentBinding).Build(); + + // Act + List updates = await workflow.AsAIAgent("WorkflowAgent") + .RunStreamingAsync(new ChatMessage(ChatRole.User, "Hello")) + .ToListAsync(); + + // Assert + AgentResponseUpdate? updateWithUserInput = updates.FirstOrDefault(u => + u.Contents.Any(c => c is UserInputRequestContent)); + + updateWithUserInput.Should().NotBeNull("a UserInputRequestContent should be present in the response updates"); + UserInputRequestContent retrievedContent = updateWithUserInput!.Contents + .OfType() + .Should().ContainSingle() + .Which; + + retrievedContent.Should().BeOfType(); + retrievedContent.Id.Should().Be(RequestId); + } + + /// + /// Tests the full roundtrip: workflow emits a request, external caller responds, workflow processes response. + /// + [Fact] + public async Task Test_AsAgent_FunctionCallRoundtrip_ResponseIsProcessedAsync() + { + // Arrange: Create an agent that emits a FunctionCallContent request + const string CallId = "roundtrip-call-id"; + const string FunctionName = "testFunction"; + FunctionCallContent requestContent = new(CallId, FunctionName); + RequestEmittingAgent requestAgent = new(requestContent); + ExecutorBinding agentBinding = requestAgent.BindAsExecutor( + new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true }); + Workflow workflow = new WorkflowBuilder(agentBinding).Build(); + AIAgent agent = workflow.AsAIAgent("WorkflowAgent"); + + // Act 1: First call - should receive the FunctionCallContent request + AgentSession session = await agent.CreateSessionAsync(); + List firstCallUpdates = await agent.RunStreamingAsync( + new ChatMessage(ChatRole.User, "Start"), + session).ToListAsync(); + + // Assert 1: We should have received a FunctionCallContent + AgentResponseUpdate? updateWithRequest = firstCallUpdates.FirstOrDefault(u => + u.Contents.Any(c => c is FunctionCallContent)); + updateWithRequest.Should().NotBeNull("a FunctionCallContent should be present in the response updates"); + + FunctionCallContent receivedRequest = updateWithRequest!.Contents + .OfType() + .First(); + receivedRequest.CallId.Should().Be(CallId); + + // Act 2: Send the response back + FunctionResultContent responseContent = new(CallId, "test result"); + ChatMessage responseMessage = new(ChatRole.Tool, [responseContent]); + + // This should work without throwing - the response should be converted to ExternalResponse + // and processed by the workflow + Func sendResponse = () => agent.RunStreamingAsync(responseMessage, session).ToListAsync().AsTask(); + + // Assert 2: The response should be accepted without error + await sendResponse.Should().NotThrowAsync("the response should be converted to ExternalResponse and processed"); + } + + /// + /// Tests the full roundtrip for UserInputRequestContent: workflow emits request, external caller responds. + /// Verifying inbound UserInputResponseContent conversion. + /// + [Fact] + public async Task Test_AsAgent_UserInputRoundtrip_ResponseIsProcessedAsync() + { + // Arrange: Create an agent that emits a UserInputRequestContent request + const string RequestId = "roundtrip-request-id"; + McpServerToolCallContent mcpCall = new("mcp-call-id", "testMcpTool", "http://localhost"); + McpServerToolApprovalRequestContent requestContent = new(RequestId, mcpCall); + RequestEmittingAgent requestAgent = new(requestContent); + ExecutorBinding agentBinding = requestAgent.BindAsExecutor( + new AIAgentHostOptions { InterceptUserInputRequests = false, EmitAgentUpdateEvents = true }); + Workflow workflow = new WorkflowBuilder(agentBinding).Build(); + AIAgent agent = workflow.AsAIAgent("WorkflowAgent"); + + // Act 1: First call - should receive the UserInputRequestContent request + AgentSession session = await agent.CreateSessionAsync(); + List firstCallUpdates = await agent.RunStreamingAsync( + new ChatMessage(ChatRole.User, "Start"), + session).ToListAsync(); + + // Assert 1: We should have received a UserInputRequestContent + AgentResponseUpdate? updateWithRequest = firstCallUpdates.FirstOrDefault(u => + u.Contents.Any(c => c is UserInputRequestContent)); + updateWithRequest.Should().NotBeNull("a UserInputRequestContent should be present in the response updates"); + + UserInputRequestContent receivedRequest = updateWithRequest!.Contents + .OfType() + .First(); + receivedRequest.Id.Should().Be(RequestId); + + // Act 2: Send the response back - use CreateResponse to get the right response type + UserInputResponseContent responseContent = requestContent.CreateResponse(approved: true); + ChatMessage responseMessage = new(ChatRole.User, [responseContent]); + + // This should work without throwing - the response should be converted to ExternalResponse + // and processed by the workflow + Func sendResponse = () => agent.RunStreamingAsync(responseMessage, session).ToListAsync().AsTask(); + + // Assert 2: The response should be accepted without error + await sendResponse.Should().NotThrowAsync("the response should be converted to ExternalResponse and processed"); + } } From 9ed290d617ed70c6c9ae66590df8d2e2cf5696fe Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Fri, 27 Feb 2026 14:57:41 -0800 Subject: [PATCH 2/9] Remove unnecessary test comment --- .../WorkflowHostSmokeTests.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs index f8184ee2c9..8f1c5f1c89 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs @@ -153,7 +153,6 @@ public async Task Test_AsAgent_ErrorContentStreamedOutAsync(bool includeExceptio /// /// Tests that when a workflow emits a RequestInfoEvent with FunctionCallContent data, /// the AgentResponseUpdate preserves the original FunctionCallContent type. - /// Regression test for GitHub issue #3029. /// [Fact] public async Task Test_AsAgent_FunctionCallContentPreservedInRequestInfoAsync() @@ -189,7 +188,6 @@ public async Task Test_AsAgent_FunctionCallContentPreservedInRequestInfoAsync() /// /// Tests that when a workflow emits a RequestInfoEvent with UserInputRequestContent data, /// the AgentResponseUpdate preserves the original UserInputRequestContent type. - /// Regression test for GitHub issue #3029. /// [Fact] public async Task Test_AsAgent_UserInputRequestContentPreservedInRequestInfoAsync() From 7dbd68ea9719bd0d57f98bfbf19cf84ae9fbfc2c Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Mon, 2 Mar 2026 14:16:58 -0800 Subject: [PATCH 3/9] Fix PR comments --- .../WorkflowSession.cs | 13 +++++-------- .../WorkflowHostSmokeTests.cs | 4 ++-- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs index a045c6de8a..8041429a47 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs @@ -159,7 +159,7 @@ await this._executionEnvironment .ConfigureAwait(false); // Process messages: convert response content to ExternalResponse, send regular messages as-is - await this.SendMessagesWithResponseConversionAsync(run, messages, cancellationToken).ConfigureAwait(false); + await this.SendMessagesWithResponseConversionAsync(run, messages).ConfigureAwait(false); return run; } @@ -175,7 +175,7 @@ await this._executionEnvironment /// Sends messages to the run, converting FunctionResultContent and UserInputResponseContent /// to ExternalResponse when there's a matching pending request. /// - private async ValueTask SendMessagesWithResponseConversionAsync(StreamingRun run, List messages, CancellationToken cancellationToken) + private async ValueTask SendMessagesWithResponseConversionAsync(StreamingRun run, List messages) { List regularMessages = []; @@ -202,12 +202,9 @@ private async ValueTask SendMessagesWithResponseConversionAsync(StreamingRun run if (regularContents.Count > 0) { - regularMessages.Add(new ChatMessage(message.Role, regularContents) - { - AuthorName = message.AuthorName, - MessageId = message.MessageId, - CreatedAt = message.CreatedAt - }); + ChatMessage cloned = message.Clone(); + cloned.Contents = regularContents; + regularMessages.Add(cloned); } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs index 8f1c5f1c89..35db39b963 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs @@ -194,8 +194,8 @@ public async Task Test_AsAgent_UserInputRequestContentPreservedInRequestInfoAsyn { // Arrange const string RequestId = "test-request-id"; - McpServerToolCallContent mcpCalll = new("call-id", "testToolName", "http://localhost"); - UserInputRequestContent originalContent = new McpServerToolApprovalRequestContent(RequestId, mcpCalll); + McpServerToolCallContent mcpCall = new("call-id", "testToolName", "http://localhost"); + UserInputRequestContent originalContent = new McpServerToolApprovalRequestContent(RequestId, mcpCall); RequestEmittingAgent requestAgent = new(originalContent); ExecutorBinding agentBinding = requestAgent.BindAsExecutor( new AIAgentHostOptions { InterceptUserInputRequests = false, EmitAgentUpdateEvents = true }); From e143c3de15c19e5248fa572b89347cade1b6762c Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Tue, 3 Mar 2026 14:07:24 -0800 Subject: [PATCH 4/9] Updated to fix edge cases, and add more tests. --- .../Specialized/AIAgentHostExecutor.cs | 32 ++- .../Specialized/AIContentExternalHandler.cs | 4 +- .../WorkflowSession.cs | 117 +++++++---- .../WorkflowHostSmokeTests.cs | 198 ++++++++++++++++-- 4 files changed, 285 insertions(+), 66 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs index a38f49681a..d1737d62f8 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs @@ -68,10 +68,17 @@ private ValueTask HandleUserInputResponseAsync( throw new InvalidOperationException($"No pending UserInputRequest found with id '{response.Id}'."); } - List implicitTurnMessages = [new ChatMessage(ChatRole.User, [response])]; + // Merge the external response with any already-buffered regular messages so mixed-content + // resumes can be processed in one invocation. + return this.ProcessTurnMessagesAsync(async (pendingMessages, ctx, ct) => + { + pendingMessages.Add(new ChatMessage(ChatRole.User, [response])); + + await this.ContinueTurnAsync(pendingMessages, ctx, this._currentTurnEmitEvents ?? false, ct).ConfigureAwait(false); - // ContinueTurnAsync owns failing to emit a TurnToken if this response does not clear up all remaining outstanding requests. - return this.ContinueTurnAsync(implicitTurnMessages, context, this._currentTurnEmitEvents ?? false, cancellationToken); + // Clear the buffered turn messages because they were consumed by ContinueTurnAsync. + return null; + }, context, cancellationToken); } private ValueTask HandleFunctionResultAsync( @@ -84,8 +91,17 @@ private ValueTask HandleFunctionResultAsync( throw new InvalidOperationException($"No pending FunctionCall found with id '{result.CallId}'."); } - List implicitTurnMessages = [new ChatMessage(ChatRole.Tool, [result])]; - return this.ContinueTurnAsync(implicitTurnMessages, context, this._currentTurnEmitEvents ?? false, cancellationToken); + // Merge the external response with any already-buffered regular messages so mixed-content + // resumes can be processed in one invocation. + return this.ProcessTurnMessagesAsync(async (pendingMessages, ctx, ct) => + { + pendingMessages.Add(new ChatMessage(ChatRole.Tool, [result])); + + await this.ContinueTurnAsync(pendingMessages, ctx, this._currentTurnEmitEvents ?? false, ct).ConfigureAwait(false); + + // Clear the buffered turn messages because they were consumed by ContinueTurnAsync. + return null; + }, context, cancellationToken); } public bool ShouldEmitStreamingEvents(bool? emitEvents) @@ -164,8 +180,8 @@ protected override ValueTask TakeTurnAsync(List messages, IWorkflow private async ValueTask InvokeAgentAsync(IEnumerable messages, IWorkflowContext context, bool emitEvents, CancellationToken cancellationToken = default) { #pragma warning disable MEAI001 - Dictionary userInputRequests = new(); - Dictionary functionCalls = new(); + Dictionary userInputRequests = []; + Dictionary functionCalls = []; AgentResponse response; if (emitEvents) @@ -198,7 +214,7 @@ await this.EnsureSessionAsync(context, cancellationToken).ConfigureAwait(false), ExtractUnservicedRequests(response.Messages.SelectMany(message => message.Contents)); } - if (this._options.EmitAgentResponseEvents == true) + if (this._options.EmitAgentResponseEvents) { await context.YieldOutputAsync(response, cancellationToken).ConfigureAwait(false); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIContentExternalHandler.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIContentExternalHandler.cs index 9173100b3e..c19fab62bb 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIContentExternalHandler.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIContentExternalHandler.cs @@ -58,7 +58,9 @@ public ValueTask ProcessRequestContentAsync(string id, TRequestContent requestCo { if (!this._pendingRequests.TryAdd(id, requestContent)) { - throw new InvalidOperationException($"A pending request with ID '{id}' already exists."); + // Request is already pending; treat as an idempotent re-emission. + // Do not repost to the sink because request IDs must remain unique while pending. + return default; } return this.IsIntercepted diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs index 8041429a47..64354c58c4 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs @@ -145,7 +145,7 @@ public AgentResponseUpdate CreateUpdate(string responseId, object raw, ChatMessa return update; } - private async ValueTask CreateOrResumeRunAsync(List messages, CancellationToken cancellationToken = default) + private async ValueTask CreateOrResumeRunAsync(List messages, CancellationToken cancellationToken = default) { // The workflow is validated to be a ChatProtocol workflow by the WorkflowHostAgent before creating the session, // and does not need to be checked again here. @@ -159,46 +159,38 @@ await this._executionEnvironment .ConfigureAwait(false); // Process messages: convert response content to ExternalResponse, send regular messages as-is - await this.SendMessagesWithResponseConversionAsync(run, messages).ConfigureAwait(false); - return run; + bool hasMatchedExternalResponses = await this.SendMessagesWithResponseConversionAsync(run, messages).ConfigureAwait(false); + return new ResumeRunResult(run, hasMatchedExternalResponses: hasMatchedExternalResponses); } - return await this._executionEnvironment + StreamingRun newRun = await this._executionEnvironment .RunStreamingAsync(this._workflow, messages, this.SessionId, cancellationToken) .ConfigureAwait(false); + return new ResumeRunResult(newRun); } /// /// Sends messages to the run, converting FunctionResultContent and UserInputResponseContent /// to ExternalResponse when there's a matching pending request. /// - private async ValueTask SendMessagesWithResponseConversionAsync(StreamingRun run, List messages) + /// + /// if any external responses were sent; otherwise, . + /// + private async ValueTask SendMessagesWithResponseConversionAsync(StreamingRun run, List messages) { List regularMessages = []; + // Responses are deferred until after regular messages are queued so response handlers + // can merge buffered regular content in the same continuation turn. + List<(ExternalResponse Response, string? ContentId)> externalResponses = []; + bool hasMatchedExternalResponses = false; foreach (ChatMessage message in messages) { List regularContents = []; - - foreach (AIContent content in message.Contents) - { - if (this.TryCreateExternalResponse(content) is ExternalResponse response) - { - await run.SendResponseAsync(response).ConfigureAwait(false); - - if (GetResponseContentId(content) is string contentId) - { - this.RemovePendingRequest(contentId); - } - } - else - { - regularContents.Add(content); - } - } + PartitionMessageContents(message, regularContents); if (regularContents.Count > 0) { @@ -208,11 +200,41 @@ private async ValueTask SendMessagesWithResponseConversionAsync(StreamingRun run } } - // Send any remaining regular messages + // Send regular messages first so response handlers can merge them with responses. if (regularMessages.Count > 0) { await run.TrySendMessageAsync(regularMessages).ConfigureAwait(false); } + + // Send external responses after regular messages. + foreach ((ExternalResponse response, string? contentId) in externalResponses) + { + await run.SendResponseAsync(response).ConfigureAwait(false); + hasMatchedExternalResponses = true; + + if (contentId is string id) + { + this.RemovePendingRequest(id); + } + } + + return hasMatchedExternalResponses; + + void PartitionMessageContents(ChatMessage message, List regularContents) + { + foreach (AIContent content in message.Contents) + { + string? contentId = GetResponseContentId(content); + if (this.TryCreateExternalResponse(content) is ExternalResponse response) + { + externalResponses.Add((response, contentId)); + } + else + { + regularContents.Add(content); + } + } + } } /// @@ -233,21 +255,8 @@ private async ValueTask SendMessagesWithResponseConversionAsync(StreamingRun run return null; } - // Create the response data based on content type - object? responseData = content switch - { - FunctionResultContent functionResultContent => functionResultContent, - UserInputResponseContent userInputResponseContent => userInputResponseContent, - _ => null - }; - - if (responseData == null) - { - return null; - } - - // Create ExternalResponse using the pending request's port info - return new ExternalResponse(pendingRequest.PortInfo, pendingRequest.RequestId, new PortableValue(responseData)); + // Create ExternalResponse via the pending request to ensure proper validation and wrapping + return pendingRequest.CreateResponse(content); } /// @@ -287,12 +296,19 @@ IAsyncEnumerable InvokeStageAsync( this.LastResponseId = Guid.NewGuid().ToString("N"); List messages = this.ChatHistoryProvider.GetFromBookmark(this).ToList(); -#pragma warning disable CA2007 // Analyzer misfiring and not seeing .ConfigureAwait(false) below. - await using StreamingRun run = + ResumeRunResult resumeResult = await this.CreateOrResumeRunAsync(messages, cancellationToken).ConfigureAwait(false); +#pragma warning disable CA2007 // Analyzer misfiring. + await using StreamingRun run = resumeResult.Run; #pragma warning restore CA2007 - await run.TrySendMessageAsync(new TurnToken(emitEvents: true)).ConfigureAwait(false); + // Send a TurnToken only when no external responses were delivered. + // External response handlers already drive continuation turns and can merge + // buffered regular messages, so an extra TurnToken would cause a redundant turn. + if (!resumeResult.HasMatchedExternalResponses) + { + await run.TrySendMessageAsync(new TurnToken(emitEvents: true)).ConfigureAwait(false); + } await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false, cancellationToken) .ConfigureAwait(false) .WithCancellation(cancellationToken)) @@ -391,6 +407,25 @@ ExternalRequest externalRequest when externalRequest.TryGetDataAs(out UserInputR /// public WorkflowChatHistoryProvider ChatHistoryProvider { get; } + /// + /// Captures the outcome of creating or resuming a workflow run, + /// indicating what types of messages were sent during resume. + /// + private readonly struct ResumeRunResult + { + /// The streaming run that was created or resumed. + public StreamingRun Run { get; } + + /// Whether any external responses (e.g., ) were delivered. + public bool HasMatchedExternalResponses { get; } + + public ResumeRunResult(StreamingRun run, bool hasMatchedExternalResponses = false) + { + this.Run = Throw.IfNull(run); + this.HasMatchedExternalResponses = hasMatchedExternalResponses; + } + } + internal sealed class SessionState( string sessionId, CheckpointInfo? lastCheckpoint, diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs index 35db39b963..bd35e9825d 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs @@ -35,10 +35,22 @@ public ExpectedException(string? message, Exception? innerException) : base(mess internal sealed class RequestEmittingAgent : AIAgent { private readonly AIContent _requestContent; + private readonly bool _completeOnResponse; - public RequestEmittingAgent(AIContent requestContent) + /// + /// Creates a new that emits the given request content. + /// + /// The content to emit on each turn. + /// + /// When , the agent emits a text completion instead of re-emitting + /// the request when the incoming messages contain a + /// or . This models realistic agent behaviour + /// where the agent processes the tool result and produces a final answer. + /// + public RequestEmittingAgent(AIContent requestContent, bool completeOnResponse = false) { this._requestContent = requestContent; + this._completeOnResponse = completeOnResponse; } private sealed class Session : AgentSession @@ -60,8 +72,16 @@ protected override Task RunCoreAsync(IEnumerable mes protected override async IAsyncEnumerable RunCoreStreamingAsync(IEnumerable messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - // Emit the request content - yield return new AgentResponseUpdate(ChatRole.Assistant, [this._requestContent]); + if (this._completeOnResponse && messages.Any(m => m.Contents.Any(c => + c is FunctionResultContent || c is UserInputResponseContent))) + { + yield return new AgentResponseUpdate(ChatRole.Assistant, [new TextContent("Request processed")]); + } + else + { + // Emit the request content + yield return new AgentResponseUpdate(ChatRole.Assistant, [this._requestContent]); + } } } @@ -230,7 +250,7 @@ public async Task Test_AsAgent_FunctionCallRoundtrip_ResponseIsProcessedAsync() const string CallId = "roundtrip-call-id"; const string FunctionName = "testFunction"; FunctionCallContent requestContent = new(CallId, FunctionName); - RequestEmittingAgent requestAgent = new(requestContent); + RequestEmittingAgent requestAgent = new(requestContent, completeOnResponse: true); ExecutorBinding agentBinding = requestAgent.BindAsExecutor( new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true }); Workflow workflow = new WorkflowBuilder(agentBinding).Build(); @@ -256,12 +276,17 @@ public async Task Test_AsAgent_FunctionCallRoundtrip_ResponseIsProcessedAsync() FunctionResultContent responseContent = new(CallId, "test result"); ChatMessage responseMessage = new(ChatRole.Tool, [responseContent]); - // This should work without throwing - the response should be converted to ExternalResponse - // and processed by the workflow - Func sendResponse = () => agent.RunStreamingAsync(responseMessage, session).ToListAsync().AsTask(); - - // Assert 2: The response should be accepted without error - await sendResponse.Should().NotThrowAsync("the response should be converted to ExternalResponse and processed"); + // Act 2: Run the workflow with the response and capture the resulting updates + List secondCallUpdates = await agent.RunStreamingAsync(responseMessage, session).ToListAsync(); + + // Assert 2: The response should be processed and the original request should no longer be pending. + // Concretely, the workflow should not re-emit a FunctionCallContent with the same CallId. + secondCallUpdates.Should().NotBeNull("processing the response should produce updates"); + secondCallUpdates.Should().NotBeEmpty("processing the response should progress the workflow"); + secondCallUpdates + .SelectMany(u => u.Contents.OfType()) + .Should() + .NotContain(c => c.CallId == CallId, "the original FunctionCallContent request should be cleared after processing the response"); } /// @@ -275,7 +300,7 @@ public async Task Test_AsAgent_UserInputRoundtrip_ResponseIsProcessedAsync() const string RequestId = "roundtrip-request-id"; McpServerToolCallContent mcpCall = new("mcp-call-id", "testMcpTool", "http://localhost"); McpServerToolApprovalRequestContent requestContent = new(RequestId, mcpCall); - RequestEmittingAgent requestAgent = new(requestContent); + RequestEmittingAgent requestAgent = new(requestContent, completeOnResponse: true); ExecutorBinding agentBinding = requestAgent.BindAsExecutor( new AIAgentHostOptions { InterceptUserInputRequests = false, EmitAgentUpdateEvents = true }); Workflow workflow = new WorkflowBuilder(agentBinding).Build(); @@ -301,11 +326,152 @@ public async Task Test_AsAgent_UserInputRoundtrip_ResponseIsProcessedAsync() UserInputResponseContent responseContent = requestContent.CreateResponse(approved: true); ChatMessage responseMessage = new(ChatRole.User, [responseContent]); - // This should work without throwing - the response should be converted to ExternalResponse - // and processed by the workflow - Func sendResponse = () => agent.RunStreamingAsync(responseMessage, session).ToListAsync().AsTask(); + // Act 2: Run the workflow again with the response and capture the updates + List secondCallUpdates = await agent.RunStreamingAsync(responseMessage, session).ToListAsync(); + + // Assert 2: The response should be applied so that the original request is no longer pending + secondCallUpdates.Should().NotBeEmpty("handling the user input response should produce follow-up updates"); + bool requestStillPresent = secondCallUpdates.Any(u => u.Contents.OfType().Any(r => r.Id == RequestId)); + requestStillPresent.Should().BeFalse("the original UserInputRequestContent should not be re-emitted after its response is processed"); + } + + /// + /// Tests the mixed-message scenario: resume contains both an external response + /// (FunctionResultContent matching a pending request) and regular non-response content + /// in the same message. + /// Verifies that regular content is still processed and that no duplicate + /// pending-request errors, redundant FunctionCallContent re-emissions, + /// or workflow errors occur. + /// + [Fact] + public async Task Test_AsAgent_MixedResponseAndRegularMessage_BothProcessedAsync() + { + // Arrange: Create an agent that emits a FunctionCallContent request + const string CallId = "mixed-call-id"; + const string FunctionName = "mixedTestFunction"; + FunctionCallContent requestContent = new(CallId, FunctionName); + RequestEmittingAgent requestAgent = new(requestContent, completeOnResponse: true); + ExecutorBinding agentBinding = requestAgent.BindAsExecutor( + new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true }); + Workflow workflow = new WorkflowBuilder(agentBinding).Build(); + AIAgent agent = workflow.AsAIAgent("WorkflowAgent"); + + // Act 1: First call - should receive the FunctionCallContent request + AgentSession session = await agent.CreateSessionAsync(); + List firstCallUpdates = await agent.RunStreamingAsync( + new ChatMessage(ChatRole.User, "Start"), + session).ToListAsync(); + + // Assert 1: We should have received a FunctionCallContent + firstCallUpdates.Should().Contain(u => u.Contents.Any(c => c is FunctionCallContent), + "the first call should emit a FunctionCallContent request"); + + // Act 2: Send a mixed message containing both the function result AND regular non-response content + FunctionResultContent responseContent = new(CallId, "tool output"); + ChatMessage mixedMessage = new(ChatRole.Tool, [responseContent, new TextContent("additional context")]); + + List secondCallUpdates = await agent.RunStreamingAsync(mixedMessage, session).ToListAsync(); + + // Assert 2: The workflow should have processed both parts without errors + secondCallUpdates.Should().NotBeEmpty("the mixed message should produce follow-up updates"); + secondCallUpdates + .SelectMany(u => u.Contents.OfType()) + .Should() + .NotContain(c => c.CallId == CallId, "the original FunctionCallContent should be cleared after the response is processed"); + secondCallUpdates + .SelectMany(u => u.Contents.OfType()) + .Should() + .BeEmpty("no workflow errors should occur when processing a mixed response-and-regular message"); + } + + [Fact] + public async Task Test_AsAgent_ResponseThenRegularAcrossMessages_NoDuplicateFunctionCallAsync() + { + const string CallId = "mixed-separate-call-id"; + const string FunctionName = "mixedSeparateTestFunction"; + + RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: true); + ExecutorBinding agentBinding = requestAgent.BindAsExecutor( + new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true }); + Workflow workflow = new WorkflowBuilder(agentBinding).Build(); + AIAgent agent = workflow.AsAIAgent("WorkflowAgent"); + + AgentSession session = await agent.CreateSessionAsync(); + List firstCallUpdates = await agent.RunStreamingAsync(new ChatMessage(ChatRole.User, "Start"), session).ToListAsync(); + firstCallUpdates.Should().Contain(u => u.Contents.Any(c => c is FunctionCallContent)); + + ChatMessage[] resumeMessages = + [ + new(ChatRole.Tool, [new FunctionResultContent(CallId, "tool output")]), + new(ChatRole.Tool, [new TextContent("extra context in separate message")]) + ]; + + List secondCallUpdates = await agent.RunStreamingAsync(resumeMessages, session).ToListAsync(); + + secondCallUpdates.Should().NotBeEmpty(); + secondCallUpdates + .SelectMany(u => u.Contents.OfType()) + .Should() + .NotContain(c => c.CallId == CallId, "response+regular content split across messages should not re-emit the handled request"); + secondCallUpdates + .SelectMany(u => u.Contents.OfType()) + .Should() + .BeEmpty(); + } + + [Fact] + public async Task Test_AsAgent_MatchingResponse_DoesNotCauseExtraTurnAsync() + { + const string CallId = "matching-response-call-id"; + const string FunctionName = "matchingResponseFunction"; + + RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: false); + ExecutorBinding agentBinding = requestAgent.BindAsExecutor( + new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true }); + Workflow workflow = new WorkflowBuilder(agentBinding).Build(); + AIAgent agent = workflow.AsAIAgent("WorkflowAgent"); + + AgentSession session = await agent.CreateSessionAsync(); + List firstCallUpdates = await agent.RunStreamingAsync(new ChatMessage(ChatRole.User, "Start"), session).ToListAsync(); + firstCallUpdates.Should().Contain(u => u.Contents.Any(c => c is FunctionCallContent)); + + List secondCallUpdates = await agent.RunStreamingAsync( + new ChatMessage(ChatRole.Tool, [new FunctionResultContent(CallId, "tool output")]), + session).ToListAsync(); + + int functionCallCount = secondCallUpdates + .Where(u => u.RawRepresentation?.GetType().Name == "RequestInfoEvent") + .SelectMany(u => u.Contents.OfType()) + .Count(c => c.CallId == CallId); + + functionCallCount.Should().Be(1, "a matching external response should not trigger an extra TurnToken-driven turn"); + } + + [Fact] + public async Task Test_AsAgent_UnmatchedResponse_TriggersTurnAndKeepsProgressingAsync() + { + const string CallId = "unmatched-response-call-id"; + const string FunctionName = "unmatchedResponseFunction"; + + RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: false); + ExecutorBinding agentBinding = requestAgent.BindAsExecutor( + new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true }); + Workflow workflow = new WorkflowBuilder(agentBinding).Build(); + AIAgent agent = workflow.AsAIAgent("WorkflowAgent"); + + AgentSession session = await agent.CreateSessionAsync(); + List firstCallUpdates = await agent.RunStreamingAsync(new ChatMessage(ChatRole.User, "Start"), session).ToListAsync(); + firstCallUpdates.Should().Contain(u => u.Contents.Any(c => c is FunctionCallContent)); + + List secondCallUpdates = await agent.RunStreamingAsync( + new ChatMessage(ChatRole.Tool, [new FunctionResultContent("different-call-id", "tool output")]), + session).ToListAsync(); + + int functionCallCount = secondCallUpdates + .SelectMany(u => u.Contents.OfType()) + .Count(c => c.CallId == CallId); - // Assert 2: The response should be accepted without error - await sendResponse.Should().NotThrowAsync("the response should be converted to ExternalResponse and processed"); + functionCallCount.Should().Be(1, "an unmatched response should be treated as regular input and still drive a TurnToken continuation without workflow errors"); + secondCallUpdates.SelectMany(u => u.Contents.OfType()).Should().BeEmpty(); } } From 8db651ac2b431a4a333557acdeb83336d0069bf7 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Mon, 9 Mar 2026 17:33:49 -0700 Subject: [PATCH 5/9] Update pending requests to use typed properties instead of relying on StateBag. replying to PR feedback. --- .../WorkflowSession.cs | 41 +++++++++++----- .../WorkflowsJsonUtilities.cs | 1 + .../JsonSerializationTests.cs | 49 +++++++++++++++++++ 3 files changed, 80 insertions(+), 11 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs index 64354c58c4..f08f0c85a3 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs @@ -25,9 +25,24 @@ internal sealed class WorkflowSession : AgentSession private InMemoryCheckpointManager? _inMemoryCheckpointManager; - // Key prefix used in StateBag to track pending external requests by their content ID - // This enables converting incoming response content back to ExternalResponse when resuming. - private const string PendingRequestKeyPrefix = "workflow.pendingRequest:"; + /// + /// Tracks pending external requests by their content ID (e.g., + /// or ). This mapping enables converting incoming response + /// content back to when resuming a workflow from a checkpoint. + /// + /// + /// + /// Entries are added when a is received during workflow execution, + /// and removed when a matching response is delivered via . + /// + /// + /// The number of entries is bounded by the number of outstanding external requests in a single workflow run. + /// When a session is abandoned, all pending requests are released with the session object. + /// Request-level timeouts, if needed, should be implemented in the workflow definition itself + /// (e.g., using a timer racing against an external event). + /// + /// + private readonly Dictionary _pendingRequests = []; internal static bool VerifyCheckpointingConfiguration(IWorkflowExecutionEnvironment executionEnvironment, [NotNullWhen(true)] out InProcessExecutionEnvironment? inProcEnv) { @@ -94,6 +109,7 @@ public WorkflowSession(Workflow workflow, JsonElement serializedSession, IWorkfl this.LastCheckpoint = sessionState.LastCheckpoint; this.StateBag = sessionState.StateBag; + this._pendingRequests = sessionState.PendingRequests ?? []; } public CheckpointInfo? LastCheckpoint { get; set; } @@ -105,7 +121,8 @@ internal JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptions = nu this.SessionId, this.LastCheckpoint, this._inMemoryCheckpointManager, - this.StateBag); + this.StateBag, + this._pendingRequests); return marshaller.Marshal(info); } @@ -270,22 +287,22 @@ void PartitionMessageContents(ChatMessage message, List regularConten }; /// - /// Tries to get a pending request from the state bag by content ID. + /// Tries to get a pending request by content ID. /// private ExternalRequest? TryGetPendingRequest(string contentId) => - this.StateBag.GetValue(PendingRequestKeyPrefix + contentId); + this._pendingRequests.TryGetValue(contentId, out ExternalRequest? request) ? request : null; /// - /// Adds a pending request to the state bag. + /// Adds a pending request indexed by content ID. /// private void AddPendingRequest(string contentId, ExternalRequest request) => - this.StateBag.SetValue(PendingRequestKeyPrefix + contentId, request); + this._pendingRequests[contentId] = request; /// - /// Removes a pending request from the state bag. + /// Removes a pending request by content ID. /// private void RemovePendingRequest(string contentId) => - this.StateBag.TryRemoveValue(PendingRequestKeyPrefix + contentId); + this._pendingRequests.Remove(contentId); internal async IAsyncEnumerable InvokeStageAsync( @@ -430,11 +447,13 @@ internal sealed class SessionState( string sessionId, CheckpointInfo? lastCheckpoint, InMemoryCheckpointManager? checkpointManager = null, - AgentSessionStateBag? stateBag = null) + AgentSessionStateBag? stateBag = null, + Dictionary? pendingRequests = null) { public string SessionId { get; } = sessionId; public CheckpointInfo? LastCheckpoint { get; } = lastCheckpoint; public InMemoryCheckpointManager? CheckpointManager { get; } = checkpointManager; public AgentSessionStateBag StateBag { get; } = stateBag ?? new(); + public Dictionary? PendingRequests { get; } = pendingRequests; } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowsJsonUtilities.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowsJsonUtilities.cs index 08d1dcbcbb..5e9a86fbec 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowsJsonUtilities.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowsJsonUtilities.cs @@ -90,6 +90,7 @@ private static JsonSerializerOptions CreateDefaultOptions() [JsonSerializable(typeof(ChatMessage))] [JsonSerializable(typeof(ExternalRequest))] [JsonSerializable(typeof(ExternalResponse))] + [JsonSerializable(typeof(Dictionary))] [JsonSerializable(typeof(TurnToken))] // Built-in Executor State Types diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/JsonSerializationTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/JsonSerializationTests.cs index c2a538b302..572f858f13 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/JsonSerializationTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/JsonSerializationTests.cs @@ -673,6 +673,55 @@ public async Task Test_InMemoryCheckpointManager_JsonRoundTripAsync() ValidateCheckpoint(retrievedCheckpoint, prototype); } + [Fact] + public void Test_SessionState_JsonRoundtrip_WithPendingRequests() + { + // Arrange + Dictionary pendingRequests = new() + { + ["call-1"] = TestExternalRequest, + ["call-2"] = ExternalRequest.Create(TestPort, "Request2", "OtherData"), + }; + + WorkflowSession.SessionState prototype = new( + sessionId: "test-session-123", + lastCheckpoint: TestParentCheckpointInfo, + pendingRequests: pendingRequests); + + // Act + WorkflowSession.SessionState result = RunJsonRoundtrip(prototype); + + // Assert + result.SessionId.Should().Be(prototype.SessionId); + result.LastCheckpoint.Should().Be(prototype.LastCheckpoint); + result.StateBag.Should().NotBeNull(); + result.PendingRequests.Should().NotBeNull() + .And.HaveCount(pendingRequests.Count); + + foreach (string key in pendingRequests.Keys) + { + result.PendingRequests.Should().ContainKey(key); + ValidateExternalRequest(result.PendingRequests![key], pendingRequests[key]); + } + } + + [Fact] + public void Test_SessionState_JsonRoundtrip_WithoutPendingRequests() + { + // Arrange + WorkflowSession.SessionState prototype = new( + sessionId: "test-session-456", + lastCheckpoint: null); + + // Act + WorkflowSession.SessionState result = RunJsonRoundtrip(prototype); + + // Assert + result.SessionId.Should().Be(prototype.SessionId); + result.LastCheckpoint.Should().BeNull(); + result.PendingRequests.Should().BeNull(); + } + /// /// Verifies that the default behavior (without AllowOutOfOrderMetadataProperties) fails /// when $type metadata is not the first property, demonstrating the PostgreSQL jsonb issue. From 335d59841e2d9b39fe26be05a3ab783c59f5006c Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Mon, 9 Mar 2026 22:58:36 -0700 Subject: [PATCH 6/9] Fixed external response de-dup and updated possible brittle test. --- .../WorkflowSession.cs | 65 ++++++++----------- .../WorkflowHostSmokeTests.cs | 2 +- 2 files changed, 27 insertions(+), 40 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs index f08f0c85a3..c26c9b1985 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs @@ -204,10 +204,35 @@ private async ValueTask SendMessagesWithResponseConversionAsync(StreamingR List<(ExternalResponse Response, string? ContentId)> externalResponses = []; bool hasMatchedExternalResponses = false; + // Tracks content IDs already matched to pending requests within this invocation, + // preventing duplicate responses for the same ID from being sent to the workflow engine. + HashSet? matchedContentIds = null; + foreach (ChatMessage message in messages) { List regularContents = []; - PartitionMessageContents(message, regularContents); + + foreach (AIContent content in message.Contents) + { + string? contentId = GetResponseContentId(content); + + // Skip duplicate response content for an already-matched content ID + if (contentId != null && matchedContentIds?.Contains(contentId) == true) + { + continue; + } + + if (contentId != null + && this.TryGetPendingRequest(contentId) is ExternalRequest pendingRequest) + { + externalResponses.Add((pendingRequest.CreateResponse(content), contentId)); + (matchedContentIds ??= new(StringComparer.OrdinalIgnoreCase)).Add(contentId); + } + else + { + regularContents.Add(content); + } + } if (regularContents.Count > 0) { @@ -236,44 +261,6 @@ private async ValueTask SendMessagesWithResponseConversionAsync(StreamingR } return hasMatchedExternalResponses; - - void PartitionMessageContents(ChatMessage message, List regularContents) - { - foreach (AIContent content in message.Contents) - { - string? contentId = GetResponseContentId(content); - if (this.TryCreateExternalResponse(content) is ExternalResponse response) - { - externalResponses.Add((response, contentId)); - } - else - { - regularContents.Add(content); - } - } - } - } - - /// - /// Attempts to create an ExternalResponse from response content (FunctionResultContent or UserInputResponseContent) - /// by matching it to a pending request. - /// - private ExternalResponse? TryCreateExternalResponse(AIContent content) - { - string? contentId = GetResponseContentId(content); - if (contentId == null) - { - return null; - } - - ExternalRequest? pendingRequest = this.TryGetPendingRequest(contentId); - if (pendingRequest == null) - { - return null; - } - - // Create ExternalResponse via the pending request to ensure proper validation and wrapping - return pendingRequest.CreateResponse(content); } /// diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs index bd35e9825d..1ee9f748a8 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs @@ -440,7 +440,7 @@ public async Task Test_AsAgent_MatchingResponse_DoesNotCauseExtraTurnAsync() session).ToListAsync(); int functionCallCount = secondCallUpdates - .Where(u => u.RawRepresentation?.GetType().Name == "RequestInfoEvent") + .Where(u => u.RawRepresentation is RequestInfoEvent) .SelectMany(u => u.Contents.OfType()) .Count(c => c.CallId == CallId); From 2ccb06383a6ca52690dbb263ca273c0ededafe20 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Mon, 16 Mar 2026 21:44:36 -0700 Subject: [PATCH 7/9] Address PR comments on sending turn token for normal messages and handle contentId collision by source agent --- .../Execution/AsyncRunHandle.cs | 3 + .../Execution/EdgeMap.cs | 13 ++ .../Execution/ISuperStepRunner.cs | 1 + .../InProc/InProcessRunner.cs | 2 + .../InProc/InProcessRunnerContext.cs | 3 + .../Specialized/AIContentExternalHandler.cs | 6 +- .../StreamingRun.cs | 3 + .../WorkflowSession.cs | 218 ++++++++++++++---- .../WorkflowHostSmokeTests.cs | 182 +++++++++++++-- 9 files changed, 363 insertions(+), 68 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/AsyncRunHandle.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/AsyncRunHandle.cs index f5d1d40370..bda7e61a38 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/AsyncRunHandle.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/AsyncRunHandle.cs @@ -53,6 +53,9 @@ internal AsyncRunHandle(ISuperStepRunner stepRunner, ICheckpointingHandle checkp public ValueTask GetStatusAsync(CancellationToken cancellationToken = default) => this._eventStream.GetStatusAsync(cancellationToken); + internal bool TryGetResponsePortExecutorId(string portId, out string? executorId) + => this._stepRunner.TryGetResponsePortExecutorId(portId, out executorId); + public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPendingRequest, [EnumeratorCancellation] CancellationToken cancellationToken = default) { //Debug.Assert(breakOnHalt); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeMap.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeMap.cs index 8c2162508d..6e3f2af5e6 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeMap.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeMap.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -95,6 +96,18 @@ public bool TryRegisterPort(IRunnerContext runContext, string executorId, Reques return portRunner.ChaseEdgeAsync(new MessageEnvelope(response, ExecutorIdentity.None), this._stepTracer, cancellationToken); } + internal bool TryGetResponsePortExecutorId(string portId, [NotNullWhen(true)] out string? executorId) + { + if (this._portEdgeRunners.TryGetValue(portId, out ResponseEdgeRunner? portRunner)) + { + executorId = portRunner.ExecutorId; + return true; + } + + executorId = null; + return false; + } + internal async ValueTask> ExportStateAsync() { Dictionary exportedStates = []; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs index 9b8c3c460c..8de0dbd5e2 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs @@ -19,6 +19,7 @@ internal interface ISuperStepRunner bool HasUnprocessedMessages { get; } ValueTask EnqueueResponseAsync(ExternalResponse response, CancellationToken cancellationToken = default); + bool TryGetResponsePortExecutorId(string portId, out string? executorId); ValueTask IsValidInputTypeAsync(CancellationToken cancellationToken = default); ValueTask EnqueueMessageAsync(T message, CancellationToken cancellationToken = default); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs index 2a61f80ced..f93b09ddf3 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs @@ -160,6 +160,8 @@ public async ValueTask ResumeStreamAsync(ExecutionMode mode, Che bool ISuperStepRunner.HasUnservicedRequests => this.RunContext.HasUnservicedRequests; bool ISuperStepRunner.HasUnprocessedMessages => this.RunContext.NextStepHasActions; + bool ISuperStepRunner.TryGetResponsePortExecutorId(string portId, out string? executorId) + => this.RunContext.TryGetResponsePortExecutorId(portId, out executorId); public bool IsCheckpointingEnabled => this.RunContext.IsCheckpointingEnabled; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index eda7b90a80..f0bb8cac26 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -296,6 +296,9 @@ public bool CompleteRequest(string requestId) return this._externalRequests.TryRemove(requestId, out _); } + internal bool TryGetResponsePortExecutorId(string portId, [NotNullWhen(true)] out string? executorId) + => this._edgeMap.TryGetResponsePortExecutorId(portId, out executorId); + private IEventSink OutgoingEvents { get; } internal StateManager StateManager { get; } = new(); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIContentExternalHandler.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIContentExternalHandler.cs index c19fab62bb..15203fd5dc 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIContentExternalHandler.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIContentExternalHandler.cs @@ -16,10 +16,12 @@ internal sealed class AIContentExternalHandler _pendingRequests = new(); public AIContentExternalHandler(ref ProtocolBuilder protocolBuilder, string portId, bool intercepted, Func handler) { + this._portId = portId; PortBinding? portBinding = null; protocolBuilder = protocolBuilder.ConfigureRoutes(routeBuilder => ConfigureRoutes(routeBuilder, out portBinding)); this._portBinding = portBinding; @@ -65,7 +67,7 @@ public ValueTask ProcessRequestContentAsync(string id, TRequestContent requestCo return this.IsIntercepted ? context.SendMessageAsync(requestContent, cancellationToken: cancellationToken) - : this._portBinding.PostRequestAsync(requestContent, id, cancellationToken); + : this._portBinding.PostRequestAsync(requestContent, this.CreateExternalRequestId(id), cancellationToken); } public bool MarkRequestAsHandled(string id) @@ -76,6 +78,8 @@ public bool MarkRequestAsHandled(string id) [MemberNotNullWhen(false, nameof(_portBinding))] private bool IsIntercepted => this._portBinding == null; + private string CreateExternalRequestId(string requestId) => $"{this._portId.Length}:{this._portId}:{requestId}"; + private static string MakeKey(string id) => $"{id}_PendingRequests"; public async ValueTask OnCheckpointingAsync(string id, IWorkflowContext context, CancellationToken cancellationToken = default) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/StreamingRun.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/StreamingRun.cs index b479cae75e..c7a19380b2 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/StreamingRun.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/StreamingRun.cs @@ -60,6 +60,9 @@ public ValueTask TrySendMessageAsync(TMessage message) internal ValueTask TrySendMessageUntypedAsync(object message, Type? declaredType = null) => this._runHandle.EnqueueMessageUntypedAsync(message, declaredType); + internal bool TryGetResponsePortExecutorId(string portId, out string? executorId) + => this._runHandle.TryGetResponsePortExecutorId(portId, out executorId); + /// /// Asynchronously streams workflow events as they occur during workflow execution. /// diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs index c26c9b1985..76c66df6ec 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs @@ -26,9 +26,9 @@ internal sealed class WorkflowSession : AgentSession private InMemoryCheckpointManager? _inMemoryCheckpointManager; /// - /// Tracks pending external requests by their content ID (e.g., - /// or ). This mapping enables converting incoming response - /// content back to when resuming a workflow from a checkpoint. + /// Tracks pending external requests by their workflow-facing request ID. + /// This mapping enables converting incoming response content back to + /// when resuming a workflow from a checkpoint. /// /// /// @@ -176,8 +176,8 @@ await this._executionEnvironment .ConfigureAwait(false); // Process messages: convert response content to ExternalResponse, send regular messages as-is - bool hasMatchedExternalResponses = await this.SendMessagesWithResponseConversionAsync(run, messages).ConfigureAwait(false); - return new ResumeRunResult(run, hasMatchedExternalResponses: hasMatchedExternalResponses); + ResumeDispatchInfo dispatchInfo = await this.SendMessagesWithResponseConversionAsync(run, messages).ConfigureAwait(false); + return new ResumeRunResult(run, dispatchInfo); } StreamingRun newRun = await this._executionEnvironment @@ -194,15 +194,15 @@ await this._executionEnvironment /// to ExternalResponse when there's a matching pending request. /// /// - /// if any external responses were sent; otherwise, . + /// Structured information about how resume content was dispatched. /// - private async ValueTask SendMessagesWithResponseConversionAsync(StreamingRun run, List messages) + private async ValueTask SendMessagesWithResponseConversionAsync(StreamingRun run, List messages) { List regularMessages = []; // Responses are deferred until after regular messages are queued so response handlers // can merge buffered regular content in the same continuation turn. - List<(ExternalResponse Response, string? ContentId)> externalResponses = []; - bool hasMatchedExternalResponses = false; + List<(ExternalResponse Response, string RequestId)> externalResponses = []; + bool hasMatchedResponseForStartExecutor = false; // Tracks content IDs already matched to pending requests within this invocation, // preventing duplicate responses for the same ID from being sent to the workflow engine. @@ -225,8 +225,16 @@ private async ValueTask SendMessagesWithResponseConversionAsync(StreamingR if (contentId != null && this.TryGetPendingRequest(contentId) is ExternalRequest pendingRequest) { - externalResponses.Add((pendingRequest.CreateResponse(content), contentId)); - (matchedContentIds ??= new(StringComparer.OrdinalIgnoreCase)).Add(contentId); + if (!run.TryGetResponsePortExecutorId(pendingRequest.PortInfo.PortId, out string? responseExecutorId)) + { + throw new InvalidOperationException( + $"Matched pending request '{pendingRequest.RequestId}' refers to unknown response port '{pendingRequest.PortInfo.PortId}'."); + } + + AIContent normalizedResponseContent = NormalizeResponseContentForDelivery(content, pendingRequest); + externalResponses.Add((pendingRequest.CreateResponse(normalizedResponseContent), pendingRequest.RequestId)); + (matchedContentIds ??= new(StringComparer.Ordinal)).Add(contentId); + hasMatchedResponseForStartExecutor |= string.Equals(responseExecutorId, this._workflow.StartExecutorId, StringComparison.Ordinal); } else { @@ -243,28 +251,54 @@ private async ValueTask SendMessagesWithResponseConversionAsync(StreamingR } // Send regular messages first so response handlers can merge them with responses. - if (regularMessages.Count > 0) + bool hasRegularMessages = regularMessages.Count > 0; + if (hasRegularMessages) { await run.TrySendMessageAsync(regularMessages).ConfigureAwait(false); } // Send external responses after regular messages. - foreach ((ExternalResponse response, string? contentId) in externalResponses) + bool hasMatchedExternalResponses = false; + foreach ((ExternalResponse response, string requestId) in externalResponses) { await run.SendResponseAsync(response).ConfigureAwait(false); hasMatchedExternalResponses = true; - - if (contentId is string id) - { - this.RemovePendingRequest(id); - } + this.RemovePendingRequest(requestId); } - return hasMatchedExternalResponses; + return new ResumeDispatchInfo( + hasRegularMessages, + hasMatchedExternalResponses, + hasMatchedResponseForStartExecutor); } /// - /// Gets the content ID from response content types. + /// Creates the workflow-facing request content surfaced in response updates. + /// + private static AIContent CreateRequestContentForDelivery(ExternalRequest request) => request switch + { + ExternalRequest externalRequest when externalRequest.TryGetDataAs(out FunctionCallContent? functionCallContent) + => CloneFunctionCallContent(functionCallContent, externalRequest.RequestId), + ExternalRequest externalRequest when externalRequest.TryGetDataAs(out UserInputRequestContent? userInputRequestContent) + => CloneUserInputRequestContent(userInputRequestContent, externalRequest.RequestId), + ExternalRequest externalRequest + => externalRequest.ToFunctionCall(), + }; + + /// + /// Rewrites workflow-facing response content back to the original agent-owned content ID. + /// + private static AIContent NormalizeResponseContentForDelivery(AIContent content, ExternalRequest request) => content switch + { + FunctionResultContent functionResultContent when request.TryGetDataAs(out FunctionCallContent? functionCallContent) + => CloneFunctionResultContent(functionResultContent, functionCallContent.CallId), + UserInputResponseContent userInputResponseContent when request.TryGetDataAs(out UserInputRequestContent? userInputRequestContent) + => CloneUserInputResponseContent(userInputResponseContent, userInputRequestContent.Id), + _ => content, + }; + + /// + /// Gets the workflow-facing request ID from response content types. /// private static string? GetResponseContentId(AIContent content) => content switch { @@ -274,22 +308,21 @@ private async ValueTask SendMessagesWithResponseConversionAsync(StreamingR }; /// - /// Tries to get a pending request by content ID. + /// Tries to get a pending request by workflow-facing request ID. /// - private ExternalRequest? TryGetPendingRequest(string contentId) => - this._pendingRequests.TryGetValue(contentId, out ExternalRequest? request) ? request : null; + private ExternalRequest? TryGetPendingRequest(string requestId) => + this._pendingRequests.TryGetValue(requestId, out ExternalRequest? request) ? request : null; /// - /// Adds a pending request indexed by content ID. + /// Adds a pending request indexed by workflow-facing request ID. /// - private void AddPendingRequest(string contentId, ExternalRequest request) => - this._pendingRequests[contentId] = request; + private void AddPendingRequest(string requestId, ExternalRequest request) => this._pendingRequests[requestId] = request; /// - /// Removes a pending request by content ID. + /// Removes a pending request by workflow-facing request ID. /// - private void RemovePendingRequest(string contentId) => - this._pendingRequests.Remove(contentId); + private void RemovePendingRequest(string requestId) => + this._pendingRequests.Remove(requestId); internal async IAsyncEnumerable InvokeStageAsync( @@ -306,10 +339,11 @@ IAsyncEnumerable InvokeStageAsync( await using StreamingRun run = resumeResult.Run; #pragma warning restore CA2007 - // Send a TurnToken only when no external responses were delivered. - // External response handlers already drive continuation turns and can merge - // buffered regular messages, so an extra TurnToken would cause a redundant turn. - if (!resumeResult.HasMatchedExternalResponses) + ResumeDispatchInfo dispatchInfo = resumeResult.DispatchInfo; + bool shouldSendTurnToken = + !dispatchInfo.HasMatchedExternalResponses + || (dispatchInfo.HasRegularMessages && !dispatchInfo.HasMatchedResponseForStartExecutor); + if (shouldSendTurnToken) { await run.TrySendMessageAsync(new TurnToken(emitEvents: true)).ConfigureAwait(false); } @@ -324,18 +358,11 @@ IAsyncEnumerable InvokeStageAsync( break; case RequestInfoEvent requestInfo: - (AIContent requestContent, string? contentId) = requestInfo.Request switch - { - ExternalRequest externalRequest when externalRequest.TryGetDataAs(out FunctionCallContent? fcc) => (fcc, fcc.CallId), - ExternalRequest externalRequest when externalRequest.TryGetDataAs(out UserInputRequestContent? uic) => (uic, uic.Id), - ExternalRequest externalRequest => ((AIContent)externalRequest.ToFunctionCall(), externalRequest.RequestId) - }; + AIContent requestContent = CreateRequestContentForDelivery(requestInfo.Request); - // Track the pending request so we can convert incoming responses back to ExternalResponse - if (contentId != null) - { - this.AddPendingRequest(contentId, requestInfo.Request); - } + // Track the pending request so we can convert incoming responses back to ExternalResponse. + // External callers respond using the workflow-facing request ID, which is always RequestId. + this.AddPendingRequest(requestInfo.Request.RequestId, requestInfo.Request); AgentResponseUpdate update = this.CreateUpdate(this.LastResponseId, evt, requestContent); yield return update; @@ -420,14 +447,111 @@ private readonly struct ResumeRunResult /// The streaming run that was created or resumed. public StreamingRun Run { get; } - /// Whether any external responses (e.g., ) were delivered. - public bool HasMatchedExternalResponses { get; } + /// How resume-time content was dispatched into the workflow runtime. + public ResumeDispatchInfo DispatchInfo { get; } - public ResumeRunResult(StreamingRun run, bool hasMatchedExternalResponses = false) + public ResumeRunResult(StreamingRun run, ResumeDispatchInfo dispatchInfo = default) { this.Run = Throw.IfNull(run); + this.DispatchInfo = dispatchInfo; + } + } + + /// + /// Captures how resumed input was split across regular-message and external-response delivery paths. + /// + private readonly struct ResumeDispatchInfo + { + public ResumeDispatchInfo(bool hasRegularMessages, bool hasMatchedExternalResponses, bool hasMatchedResponseForStartExecutor) + { + this.HasRegularMessages = hasRegularMessages; this.HasMatchedExternalResponses = hasMatchedExternalResponses; + this.HasMatchedResponseForStartExecutor = hasMatchedResponseForStartExecutor; } + + public bool HasRegularMessages { get; } + + public bool HasMatchedExternalResponses { get; } + + public bool HasMatchedResponseForStartExecutor { get; } + } + + /// + /// Clones a with a workflow-facing call ID. + /// + private static FunctionCallContent CloneFunctionCallContent(FunctionCallContent content, string callId) + { + FunctionCallContent clone = new(callId, content.Name, content.Arguments) + { + Exception = content.Exception, + InformationalOnly = content.InformationalOnly, + }; + + return CopyContentMetadata(content, clone); + } + + /// + /// Clones a with an agent-owned call ID. + /// + private static FunctionResultContent CloneFunctionResultContent(FunctionResultContent content, string callId) + { + FunctionResultContent clone = new(callId, content.Result) + { + Exception = content.Exception, + }; + + return CopyContentMetadata(content, clone); + } + + /// + /// Clones a with a workflow-facing request ID. + /// + private static UserInputRequestContent CloneUserInputRequestContent(UserInputRequestContent content, string id) + { + UserInputRequestContent clone = content switch + { + FunctionApprovalRequestContent functionApprovalRequestContent => + new FunctionApprovalRequestContent(id, functionApprovalRequestContent.FunctionCall), + McpServerToolApprovalRequestContent mcpApprovalRequestContent => + new McpServerToolApprovalRequestContent(id, mcpApprovalRequestContent.ToolCall), + _ => throw new NotSupportedException( + $"Unsupported user input request content type '{content.GetType().Name}' for workflow request ID rewriting."), + }; + + return CopyContentMetadata(content, clone); + } + + /// + /// Clones a with an agent-owned request ID. + /// + private static UserInputResponseContent CloneUserInputResponseContent(UserInputResponseContent content, string id) + { + UserInputResponseContent clone = content switch + { + FunctionApprovalResponseContent functionApprovalResponseContent => + new FunctionApprovalResponseContent(id, functionApprovalResponseContent.Approved, functionApprovalResponseContent.FunctionCall) + { + Reason = functionApprovalResponseContent.Reason, + }, + McpServerToolApprovalResponseContent mcpApprovalResponseContent => + new McpServerToolApprovalResponseContent(id, mcpApprovalResponseContent.Approved), + _ => throw new NotSupportedException( + $"Unsupported user input response content type '{content.GetType().Name}' for workflow response ID rewriting."), + }; + + return CopyContentMetadata(content, clone); + } + + /// + /// Copies shared metadata to a cloned content instance. + /// + private static TContent CopyContentMetadata(AIContent source, TContent target) + where TContent : AIContent + { + target.AdditionalProperties = source.AdditionalProperties; + target.Annotations = source.Annotations; + target.RawRepresentation = source.RawRepresentation; + return target; } internal sealed class SessionState( diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs index 1ee9f748a8..013c0eb2ea 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs @@ -85,6 +85,71 @@ protected override async IAsyncEnumerable RunCoreStreamingA } } +internal sealed class KickoffOnStartExecutor : ChatProtocolExecutor +{ + private static readonly ChatProtocolExecutorOptions s_options = new() + { + AutoSendTurnToken = false, + }; + + private readonly string _downstreamExecutorId; + private readonly string _kickoffInputText; + private readonly string _kickoffMessageText; + private readonly string _regularResumeText; + private readonly string _regularProcessedText; + + public KickoffOnStartExecutor( + string id, + string downstreamExecutorId, + string kickoffInputText, + string kickoffMessageText, + string regularResumeText, + string regularProcessedText) + : base(id, s_options) + { + this._downstreamExecutorId = downstreamExecutorId; + this._kickoffInputText = kickoffInputText; + this._kickoffMessageText = kickoffMessageText; + this._regularResumeText = regularResumeText; + this._regularProcessedText = regularProcessedText; + } + + protected override async ValueTask TakeTurnAsync(List messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default) + { + List textContents = + [ + .. messages + .SelectMany(message => message.Contents.OfType()) + .Select(content => content.Text) + ]; + + if (textContents.Contains(this._kickoffInputText, StringComparer.Ordinal)) + { + await context.SendMessageAsync( + new List { new(ChatRole.User, this._kickoffMessageText) }, + this._downstreamExecutorId, + cancellationToken).ConfigureAwait(false); + await context.SendMessageAsync( + new TurnToken(emitEvents), + this._downstreamExecutorId, + cancellationToken).ConfigureAwait(false); + } + + if (textContents.Contains(this._regularResumeText, StringComparer.Ordinal)) + { + AgentResponseUpdate update = new(ChatRole.Assistant, [new TextContent(this._regularProcessedText)]) + { + CreatedAt = DateTimeOffset.UtcNow, + MessageId = Guid.NewGuid().ToString("N"), + ResponseId = Guid.NewGuid().ToString("N"), + Role = ChatRole.Assistant, + }; + + await context.AddEventAsync(new AgentResponseUpdateEvent(this.Id, update), cancellationToken).ConfigureAwait(false); + } + } +} + public class WorkflowHostSmokeTests { private sealed class AlwaysFailsAIAgent(bool failByThrowing) : AIAgent @@ -193,7 +258,7 @@ public async Task Test_AsAgent_FunctionCallContentPreservedInRequestInfoAsync() // Assert AgentResponseUpdate? updateWithFunctionCall = updates.FirstOrDefault(u => - u.Contents.Any(c => c is FunctionCallContent)); + u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is FunctionCallContent)); updateWithFunctionCall.Should().NotBeNull("a FunctionCallContent should be present in the response updates"); FunctionCallContent retrievedContent = updateWithFunctionCall!.Contents @@ -201,7 +266,8 @@ public async Task Test_AsAgent_FunctionCallContentPreservedInRequestInfoAsync() .Should().ContainSingle() .Which; - retrievedContent.CallId.Should().Be(CallId); + retrievedContent.CallId.Should().NotBe(CallId); + retrievedContent.CallId.Should().EndWith($":{CallId}"); retrievedContent.Name.Should().Be(FunctionName); } @@ -228,7 +294,7 @@ public async Task Test_AsAgent_UserInputRequestContentPreservedInRequestInfoAsyn // Assert AgentResponseUpdate? updateWithUserInput = updates.FirstOrDefault(u => - u.Contents.Any(c => c is UserInputRequestContent)); + u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is UserInputRequestContent)); updateWithUserInput.Should().NotBeNull("a UserInputRequestContent should be present in the response updates"); UserInputRequestContent retrievedContent = updateWithUserInput!.Contents @@ -237,7 +303,8 @@ public async Task Test_AsAgent_UserInputRequestContentPreservedInRequestInfoAsyn .Which; retrievedContent.Should().BeOfType(); - retrievedContent.Id.Should().Be(RequestId); + retrievedContent.Id.Should().NotBe(RequestId); + retrievedContent.Id.Should().EndWith($":{RequestId}"); } /// @@ -264,16 +331,16 @@ public async Task Test_AsAgent_FunctionCallRoundtrip_ResponseIsProcessedAsync() // Assert 1: We should have received a FunctionCallContent AgentResponseUpdate? updateWithRequest = firstCallUpdates.FirstOrDefault(u => - u.Contents.Any(c => c is FunctionCallContent)); + u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is FunctionCallContent)); updateWithRequest.Should().NotBeNull("a FunctionCallContent should be present in the response updates"); FunctionCallContent receivedRequest = updateWithRequest!.Contents .OfType() .First(); - receivedRequest.CallId.Should().Be(CallId); + receivedRequest.CallId.Should().EndWith($":{CallId}"); // Act 2: Send the response back - FunctionResultContent responseContent = new(CallId, "test result"); + FunctionResultContent responseContent = new(receivedRequest.CallId, "test result"); ChatMessage responseMessage = new(ChatRole.Tool, [responseContent]); // Act 2: Run the workflow with the response and capture the resulting updates @@ -284,9 +351,10 @@ public async Task Test_AsAgent_FunctionCallRoundtrip_ResponseIsProcessedAsync() secondCallUpdates.Should().NotBeNull("processing the response should produce updates"); secondCallUpdates.Should().NotBeEmpty("processing the response should progress the workflow"); secondCallUpdates + .Where(u => u.RawRepresentation is RequestInfoEvent) .SelectMany(u => u.Contents.OfType()) .Should() - .NotContain(c => c.CallId == CallId, "the original FunctionCallContent request should be cleared after processing the response"); + .NotContain(c => c.CallId == receivedRequest.CallId, "the external FunctionCallContent request should be cleared after processing the response"); } /// @@ -314,16 +382,17 @@ public async Task Test_AsAgent_UserInputRoundtrip_ResponseIsProcessedAsync() // Assert 1: We should have received a UserInputRequestContent AgentResponseUpdate? updateWithRequest = firstCallUpdates.FirstOrDefault(u => - u.Contents.Any(c => c is UserInputRequestContent)); + u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is UserInputRequestContent)); updateWithRequest.Should().NotBeNull("a UserInputRequestContent should be present in the response updates"); UserInputRequestContent receivedRequest = updateWithRequest!.Contents .OfType() .First(); - receivedRequest.Id.Should().Be(RequestId); + receivedRequest.Id.Should().EndWith($":{RequestId}"); + receivedRequest.Should().BeOfType(); // Act 2: Send the response back - use CreateResponse to get the right response type - UserInputResponseContent responseContent = requestContent.CreateResponse(approved: true); + UserInputResponseContent responseContent = ((McpServerToolApprovalRequestContent)receivedRequest).CreateResponse(approved: true); ChatMessage responseMessage = new(ChatRole.User, [responseContent]); // Act 2: Run the workflow again with the response and capture the updates @@ -331,7 +400,9 @@ public async Task Test_AsAgent_UserInputRoundtrip_ResponseIsProcessedAsync() // Assert 2: The response should be applied so that the original request is no longer pending secondCallUpdates.Should().NotBeEmpty("handling the user input response should produce follow-up updates"); - bool requestStillPresent = secondCallUpdates.Any(u => u.Contents.OfType().Any(r => r.Id == RequestId)); + bool requestStillPresent = secondCallUpdates.Any(u => + u.RawRepresentation is RequestInfoEvent + && u.Contents.OfType().Any(r => r.Id == receivedRequest.Id)); requestStillPresent.Should().BeFalse("the original UserInputRequestContent should not be re-emitted after its response is processed"); } @@ -363,11 +434,15 @@ public async Task Test_AsAgent_MixedResponseAndRegularMessage_BothProcessedAsync session).ToListAsync(); // Assert 1: We should have received a FunctionCallContent + AgentResponseUpdate requestUpdate = firstCallUpdates.First(u => + u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is FunctionCallContent)); + FunctionCallContent emittedRequest = requestUpdate.Contents.OfType().Single(); + firstCallUpdates.Should().Contain(u => u.Contents.Any(c => c is FunctionCallContent), "the first call should emit a FunctionCallContent request"); // Act 2: Send a mixed message containing both the function result AND regular non-response content - FunctionResultContent responseContent = new(CallId, "tool output"); + FunctionResultContent responseContent = new(emittedRequest.CallId, "tool output"); ChatMessage mixedMessage = new(ChatRole.Tool, [responseContent, new TextContent("additional context")]); List secondCallUpdates = await agent.RunStreamingAsync(mixedMessage, session).ToListAsync(); @@ -375,9 +450,10 @@ public async Task Test_AsAgent_MixedResponseAndRegularMessage_BothProcessedAsync // Assert 2: The workflow should have processed both parts without errors secondCallUpdates.Should().NotBeEmpty("the mixed message should produce follow-up updates"); secondCallUpdates + .Where(u => u.RawRepresentation is RequestInfoEvent) .SelectMany(u => u.Contents.OfType()) .Should() - .NotContain(c => c.CallId == CallId, "the original FunctionCallContent should be cleared after the response is processed"); + .NotContain(c => c.CallId == emittedRequest.CallId, "the external FunctionCallContent should be cleared after the response is processed"); secondCallUpdates .SelectMany(u => u.Contents.OfType()) .Should() @@ -398,11 +474,14 @@ public async Task Test_AsAgent_ResponseThenRegularAcrossMessages_NoDuplicateFunc AgentSession session = await agent.CreateSessionAsync(); List firstCallUpdates = await agent.RunStreamingAsync(new ChatMessage(ChatRole.User, "Start"), session).ToListAsync(); - firstCallUpdates.Should().Contain(u => u.Contents.Any(c => c is FunctionCallContent)); + FunctionCallContent emittedRequest = firstCallUpdates + .Where(u => u.RawRepresentation is RequestInfoEvent) + .SelectMany(u => u.Contents.OfType()) + .Single(); ChatMessage[] resumeMessages = [ - new(ChatRole.Tool, [new FunctionResultContent(CallId, "tool output")]), + new(ChatRole.Tool, [new FunctionResultContent(emittedRequest.CallId, "tool output")]), new(ChatRole.Tool, [new TextContent("extra context in separate message")]) ]; @@ -410,9 +489,10 @@ public async Task Test_AsAgent_ResponseThenRegularAcrossMessages_NoDuplicateFunc secondCallUpdates.Should().NotBeEmpty(); secondCallUpdates + .Where(u => u.RawRepresentation is RequestInfoEvent) .SelectMany(u => u.Contents.OfType()) .Should() - .NotContain(c => c.CallId == CallId, "response+regular content split across messages should not re-emit the handled request"); + .NotContain(c => c.CallId == emittedRequest.CallId, "response+regular content split across messages should not re-emit the handled external request"); secondCallUpdates .SelectMany(u => u.Contents.OfType()) .Should() @@ -433,20 +513,82 @@ public async Task Test_AsAgent_MatchingResponse_DoesNotCauseExtraTurnAsync() AgentSession session = await agent.CreateSessionAsync(); List firstCallUpdates = await agent.RunStreamingAsync(new ChatMessage(ChatRole.User, "Start"), session).ToListAsync(); - firstCallUpdates.Should().Contain(u => u.Contents.Any(c => c is FunctionCallContent)); + FunctionCallContent emittedRequest = firstCallUpdates + .Where(u => u.RawRepresentation is RequestInfoEvent) + .SelectMany(u => u.Contents.OfType()) + .Single(); List secondCallUpdates = await agent.RunStreamingAsync( - new ChatMessage(ChatRole.Tool, [new FunctionResultContent(CallId, "tool output")]), + new ChatMessage(ChatRole.Tool, [new FunctionResultContent(emittedRequest.CallId, "tool output")]), session).ToListAsync(); int functionCallCount = secondCallUpdates .Where(u => u.RawRepresentation is RequestInfoEvent) .SelectMany(u => u.Contents.OfType()) - .Count(c => c.CallId == CallId); + .Count(c => c.CallId == emittedRequest.CallId); functionCallCount.Should().Be(1, "a matching external response should not trigger an extra TurnToken-driven turn"); } + [Fact] + public async Task Test_AsAgent_MixedResponseAndRegularMessage_CrossExecutorStartExecutorIsReawakenedAsync() + { + const string StartExecutorId = "start-executor"; + const string KickoffInputText = "Start"; + const string KickoffMessageText = "kickoff downstream"; + const string ResumeRegularText = "resume regular"; + const string ResumeProcessedText = "regular message processed"; + const string CallId = "cross-executor-call-id"; + const string FunctionName = "crossExecutorFunction"; + + RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: true); + ExecutorBinding requestBinding = requestAgent.BindAsExecutor( + new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true }); + + KickoffOnStartExecutor startExecutor = new( + StartExecutorId, + requestBinding.Id, + KickoffInputText, + KickoffMessageText, + ResumeRegularText, + ResumeProcessedText); + ExecutorBinding startBinding = startExecutor.BindExecutor(); + + Workflow workflow = new WorkflowBuilder(startBinding) + .AddEdge>(startBinding, requestBinding, messages => + messages?.Any(message => message.Contents.OfType().Any(content => content.Text == KickoffMessageText)) == true) + .AddEdge(startBinding, requestBinding, _ => true) + .Build(); + AIAgent agent = workflow.AsAIAgent("WorkflowAgent"); + + AgentSession session = await agent.CreateSessionAsync(); + List firstCallUpdates = await agent.RunStreamingAsync( + new ChatMessage(ChatRole.User, KickoffInputText), + session).ToListAsync(); + FunctionCallContent emittedRequest = firstCallUpdates + .Where(u => u.RawRepresentation is RequestInfoEvent) + .SelectMany(u => u.Contents.OfType()) + .Single(); + + ChatMessage[] resumeMessages = + [ + new(ChatRole.Tool, [new FunctionResultContent(emittedRequest.CallId, "tool output")]), + new(ChatRole.User, ResumeRegularText) + ]; + + List secondCallUpdates = await agent.RunStreamingAsync(resumeMessages, session).ToListAsync(); + List textContents = [.. secondCallUpdates.SelectMany(update => update.Contents.OfType()).Select(content => content.Text)]; + + textContents.Should().Contain(ResumeProcessedText, "the start executor should receive an explicit TurnToken when the matched response wakes a different executor"); + textContents.Should().Contain("Request processed", "the matched external response should still be delivered to the downstream request owner"); + secondCallUpdates + .Where(u => u.RawRepresentation is RequestInfoEvent) + .SelectMany(u => u.Contents.OfType()) + .Should() + .NotContain(c => c.CallId == emittedRequest.CallId, "the handled external request should not be re-emitted while waking the start executor"); + secondCallUpdates.SelectMany(u => u.Contents.OfType()).Should().BeEmpty(); + } + [Fact] public async Task Test_AsAgent_UnmatchedResponse_TriggersTurnAndKeepsProgressingAsync() { From 551ce8d761dd4726fed00f987ac8cc30281b3381 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Mon, 23 Mar 2026 11:01:30 -0700 Subject: [PATCH 8/9] Remove unnecessary serialization element and address pr comment on intercepted outgoing requests --- .../WorkflowSession.cs | 15 ++- .../WorkflowsJsonUtilities.cs | 1 - .../WorkflowHostSmokeTests.cs | 116 ++++++++++++++++++ 3 files changed, 126 insertions(+), 6 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs index 76c66df6ec..fcdefcfd22 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs @@ -225,16 +225,16 @@ private async ValueTask SendMessagesWithResponseConversionAs if (contentId != null && this.TryGetPendingRequest(contentId) is ExternalRequest pendingRequest) { - if (!run.TryGetResponsePortExecutorId(pendingRequest.PortInfo.PortId, out string? responseExecutorId)) + // For intercepted/complex topologies the port may not be registered in the EdgeMap. + // Treat unknown port as non-start-executor (conservative): TurnToken will still be sent. + if (run.TryGetResponsePortExecutorId(pendingRequest.PortInfo.PortId, out string? responseExecutorId)) { - throw new InvalidOperationException( - $"Matched pending request '{pendingRequest.RequestId}' refers to unknown response port '{pendingRequest.PortInfo.PortId}'."); + hasMatchedResponseForStartExecutor |= string.Equals(responseExecutorId, this._workflow.StartExecutorId, StringComparison.Ordinal); } AIContent normalizedResponseContent = NormalizeResponseContentForDelivery(content, pendingRequest); externalResponses.Add((pendingRequest.CreateResponse(normalizedResponseContent), pendingRequest.RequestId)); (matchedContentIds ??= new(StringComparer.Ordinal)).Add(contentId); - hasMatchedResponseForStartExecutor |= string.Equals(responseExecutorId, this._workflow.StartExecutorId, StringComparison.Ordinal); } else { @@ -340,9 +340,14 @@ IAsyncEnumerable InvokeStageAsync( #pragma warning restore CA2007 ResumeDispatchInfo dispatchInfo = resumeResult.DispatchInfo; + + // Send a TurnToken to the start executor unless the only activity is an external + // response directed at the start executor itself (which self-emits a TurnToken via + // ContinueTurnAsync). Non-start executors (e.g., RequestInfoExecutor) do not emit + // TurnTokens after processing responses, so the session must always provide one. bool shouldSendTurnToken = !dispatchInfo.HasMatchedExternalResponses - || (dispatchInfo.HasRegularMessages && !dispatchInfo.HasMatchedResponseForStartExecutor); + || !dispatchInfo.HasMatchedResponseForStartExecutor; if (shouldSendTurnToken) { await run.TrySendMessageAsync(new TurnToken(emitEvents: true)).ConfigureAwait(false); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowsJsonUtilities.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowsJsonUtilities.cs index 5e9a86fbec..08d1dcbcbb 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowsJsonUtilities.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowsJsonUtilities.cs @@ -90,7 +90,6 @@ private static JsonSerializerOptions CreateDefaultOptions() [JsonSerializable(typeof(ChatMessage))] [JsonSerializable(typeof(ExternalRequest))] [JsonSerializable(typeof(ExternalResponse))] - [JsonSerializable(typeof(Dictionary))] [JsonSerializable(typeof(TurnToken))] // Built-in Executor State Types diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs index 013c0eb2ea..0d543acec3 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs @@ -150,6 +150,62 @@ await context.SendMessageAsync( } } +/// +/// A start executor that always emits a response update on every turn, +/// useful for verifying that a TurnToken was delivered by the session. +/// On the first turn (user messages present), it kicks off a downstream executor. +/// +internal sealed class TurnTrackingStartExecutor : ChatProtocolExecutor +{ + private static readonly ChatProtocolExecutorOptions s_options = new() + { + AutoSendTurnToken = false, + }; + + private readonly string _downstreamExecutorId; + private readonly string _activatedMarker; + private int _activationCount; + + /// Gets the number of times this executor has been activated (i.e., called). + public int ActivationCount => this._activationCount; + + public TurnTrackingStartExecutor(string id, string downstreamExecutorId, string activatedMarker) + : base(id, s_options) + { + this._downstreamExecutorId = downstreamExecutorId; + this._activatedMarker = activatedMarker; + } + + protected override async ValueTask TakeTurnAsync(List messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default) + { + Interlocked.Increment(ref this._activationCount); + + // On the first turn, forward user messages and a TurnToken to the downstream executor. + if (messages.Any(m => m.Role == ChatRole.User)) + { + await context.SendMessageAsync( + messages, + this._downstreamExecutorId, + cancellationToken).ConfigureAwait(false); + await context.SendMessageAsync( + new TurnToken(emitEvents), + this._downstreamExecutorId, + cancellationToken).ConfigureAwait(false); + } + + // Always emit a marker to prove this executor was activated. + AgentResponseUpdate update = new(ChatRole.Assistant, [new TextContent(this._activatedMarker)]) + { + CreatedAt = DateTimeOffset.UtcNow, + MessageId = Guid.NewGuid().ToString("N"), + ResponseId = Guid.NewGuid().ToString("N"), + Role = ChatRole.Assistant, + }; + + await context.AddEventAsync(new AgentResponseUpdateEvent(this.Id, update), cancellationToken).ConfigureAwait(false); + } +} + public class WorkflowHostSmokeTests { private sealed class AlwaysFailsAIAgent(bool failByThrowing) : AIAgent @@ -616,4 +672,64 @@ public async Task Test_AsAgent_UnmatchedResponse_TriggersTurnAndKeepsProgressing functionCallCount.Should().Be(1, "an unmatched response should be treated as regular input and still drive a TurnToken continuation without workflow errors"); secondCallUpdates.SelectMany(u => u.Contents.OfType()).Should().BeEmpty(); } + + /// + /// Tests that when a resume contains only an external response directed at a non-start executor + /// (no regular messages), the start executor still receives a TurnToken and is activated. + /// This is a regression test for the case where the TurnToken was previously skipped because + /// HasRegularMessages was , leaving the start executor dormant. + /// + [Fact] + public async Task Test_AsAgent_ResponseOnlyToNonStartExecutor_StartExecutorIsStillActivatedAsync() + { + // Arrange + const string StartExecutorId = "start-executor"; + const string ActivatedMarker = "start-executor-activated"; + const string CallId = "response-only-call-id"; + const string FunctionName = "responseOnlyFunction"; + + RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: true); + ExecutorBinding requestBinding = requestAgent.BindAsExecutor( + new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true }); + + TurnTrackingStartExecutor startExecutor = new(StartExecutorId, requestBinding.Id, ActivatedMarker); + ExecutorBinding startBinding = startExecutor.BindExecutor(); + + Workflow workflow = new WorkflowBuilder(startBinding) + .AddEdge>(startBinding, requestBinding, messages => + messages?.Any(m => m.Contents.OfType().Any()) == true) + .AddEdge(startBinding, requestBinding, _ => true) + .Build(); + AIAgent agent = workflow.AsAIAgent("WorkflowAgent"); + + // Act 1: First call triggers the downstream FunctionCallContent request + AgentSession session = await agent.CreateSessionAsync(); + List firstCallUpdates = await agent.RunStreamingAsync( + new ChatMessage(ChatRole.User, "Start"), + session).ToListAsync(); + + FunctionCallContent emittedRequest = firstCallUpdates + .Where(u => u.RawRepresentation is RequestInfoEvent) + .SelectMany(u => u.Contents.OfType()) + .Single(); + + // Act 2: Resume with ONLY the external response (no regular messages) + List secondCallUpdates = await agent.RunStreamingAsync( + new ChatMessage(ChatRole.Tool, [new FunctionResultContent(emittedRequest.CallId, "tool output")]), + session).ToListAsync(); + + // Assert: Both the downstream and start executor should have been activated + List textContents = [.. secondCallUpdates + .SelectMany(u => u.Contents.OfType()) + .Select(c => c.Text)]; + + textContents.Should().Contain("Request processed", + "the downstream executor should process the external response"); + textContents.Should().Contain(ActivatedMarker, + "the start executor should receive a TurnToken and be activated even when resume contains only an external response"); + secondCallUpdates + .SelectMany(u => u.Contents.OfType()) + .Should() + .BeEmpty(); + } } From ed5adee38df10f8ea8ad9d4c67c401acb2a0d9d6 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Mon, 23 Mar 2026 15:34:17 -0700 Subject: [PATCH 9/9] Updated MEAI changes for UserInput request and response abstractions. --- .../WorkflowSession.cs | 41 ++++--------- .../WorkflowHostSmokeTests.cs | 59 +++++++++---------- 2 files changed, 41 insertions(+), 59 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs index fcdefcfd22..db3d299ee9 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs @@ -279,8 +279,8 @@ private async ValueTask SendMessagesWithResponseConversionAs { ExternalRequest externalRequest when externalRequest.TryGetDataAs(out FunctionCallContent? functionCallContent) => CloneFunctionCallContent(functionCallContent, externalRequest.RequestId), - ExternalRequest externalRequest when externalRequest.TryGetDataAs(out UserInputRequestContent? userInputRequestContent) - => CloneUserInputRequestContent(userInputRequestContent, externalRequest.RequestId), + ExternalRequest externalRequest when externalRequest.TryGetDataAs(out ToolApprovalRequestContent? toolApprovalRequestContent) + => CloneToolApprovalRequestContent(toolApprovalRequestContent, externalRequest.RequestId), ExternalRequest externalRequest => externalRequest.ToFunctionCall(), }; @@ -292,8 +292,8 @@ ExternalRequest externalRequest { FunctionResultContent functionResultContent when request.TryGetDataAs(out FunctionCallContent? functionCallContent) => CloneFunctionResultContent(functionResultContent, functionCallContent.CallId), - UserInputResponseContent userInputResponseContent when request.TryGetDataAs(out UserInputRequestContent? userInputRequestContent) - => CloneUserInputResponseContent(userInputResponseContent, userInputRequestContent.Id), + ToolApprovalResponseContent toolApprovalResponseContent when request.TryGetDataAs(out ToolApprovalRequestContent? toolApprovalRequestContent) + => CloneToolApprovalResponseContent(toolApprovalResponseContent, toolApprovalRequestContent.RequestId), _ => content, }; @@ -303,7 +303,7 @@ UserInputResponseContent userInputResponseContent when request.TryGetDataAs(out private static string? GetResponseContentId(AIContent content) => content switch { FunctionResultContent functionResultContent => functionResultContent.CallId, - UserInputResponseContent userInputResponseContent => userInputResponseContent.Id, + ToolApprovalResponseContent toolApprovalResponseContent => toolApprovalResponseContent.RequestId, _ => null }; @@ -509,39 +509,22 @@ private static FunctionResultContent CloneFunctionResultContent(FunctionResultCo } /// - /// Clones a with a workflow-facing request ID. + /// Clones a with a workflow-facing request ID. /// - private static UserInputRequestContent CloneUserInputRequestContent(UserInputRequestContent content, string id) + private static ToolApprovalRequestContent CloneToolApprovalRequestContent(ToolApprovalRequestContent content, string id) { - UserInputRequestContent clone = content switch - { - FunctionApprovalRequestContent functionApprovalRequestContent => - new FunctionApprovalRequestContent(id, functionApprovalRequestContent.FunctionCall), - McpServerToolApprovalRequestContent mcpApprovalRequestContent => - new McpServerToolApprovalRequestContent(id, mcpApprovalRequestContent.ToolCall), - _ => throw new NotSupportedException( - $"Unsupported user input request content type '{content.GetType().Name}' for workflow request ID rewriting."), - }; - + ToolApprovalRequestContent clone = new(id, content.ToolCall); return CopyContentMetadata(content, clone); } /// - /// Clones a with an agent-owned request ID. + /// Clones a with an agent-owned request ID. /// - private static UserInputResponseContent CloneUserInputResponseContent(UserInputResponseContent content, string id) + private static ToolApprovalResponseContent CloneToolApprovalResponseContent(ToolApprovalResponseContent content, string id) { - UserInputResponseContent clone = content switch + ToolApprovalResponseContent clone = new(id, content.Approved, content.ToolCall) { - FunctionApprovalResponseContent functionApprovalResponseContent => - new FunctionApprovalResponseContent(id, functionApprovalResponseContent.Approved, functionApprovalResponseContent.FunctionCall) - { - Reason = functionApprovalResponseContent.Reason, - }, - McpServerToolApprovalResponseContent mcpApprovalResponseContent => - new McpServerToolApprovalResponseContent(id, mcpApprovalResponseContent.Approved), - _ => throw new NotSupportedException( - $"Unsupported user input response content type '{content.GetType().Name}' for workflow response ID rewriting."), + Reason = content.Reason, }; return CopyContentMetadata(content, clone); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs index 0d543acec3..1920a57cb1 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs @@ -29,7 +29,7 @@ public ExpectedException(string? message, Exception? innerException) : base(mess } /// -/// A simple agent that emits a FunctionCallContent or UserInputRequestContent request. +/// A simple agent that emits a FunctionCallContent or ToolApprovalRequestContent request. /// Used to test that RequestInfoEvent handling preserves the original content type. /// internal sealed class RequestEmittingAgent : AIAgent @@ -44,7 +44,7 @@ internal sealed class RequestEmittingAgent : AIAgent /// /// When , the agent emits a text completion instead of re-emitting /// the request when the incoming messages contain a - /// or . This models realistic agent behaviour + /// or . This models realistic agent behaviour /// where the agent processes the tool result and produces a final answer. /// public RequestEmittingAgent(AIContent requestContent, bool completeOnResponse = false) @@ -73,7 +73,7 @@ protected override Task RunCoreAsync(IEnumerable mes protected override async IAsyncEnumerable RunCoreStreamingAsync(IEnumerable messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { if (this._completeOnResponse && messages.Any(m => m.Contents.Any(c => - c is FunctionResultContent || c is UserInputResponseContent))) + c is FunctionResultContent || c is ToolApprovalResponseContent))) { yield return new AgentResponseUpdate(ChatRole.Assistant, [new TextContent("Request processed")]); } @@ -328,16 +328,16 @@ public async Task Test_AsAgent_FunctionCallContentPreservedInRequestInfoAsync() } /// - /// Tests that when a workflow emits a RequestInfoEvent with UserInputRequestContent data, - /// the AgentResponseUpdate preserves the original UserInputRequestContent type. + /// Tests that when a workflow emits a RequestInfoEvent with ToolApprovalRequestContent data, + /// the AgentResponseUpdate preserves the original ToolApprovalRequestContent type. /// [Fact] - public async Task Test_AsAgent_UserInputRequestContentPreservedInRequestInfoAsync() + public async Task Test_AsAgent_ToolApprovalRequestContentPreservedInRequestInfoAsync() { // Arrange const string RequestId = "test-request-id"; McpServerToolCallContent mcpCall = new("call-id", "testToolName", "http://localhost"); - UserInputRequestContent originalContent = new McpServerToolApprovalRequestContent(RequestId, mcpCall); + ToolApprovalRequestContent originalContent = new(RequestId, mcpCall); RequestEmittingAgent requestAgent = new(originalContent); ExecutorBinding agentBinding = requestAgent.BindAsExecutor( new AIAgentHostOptions { InterceptUserInputRequests = false, EmitAgentUpdateEvents = true }); @@ -350,17 +350,17 @@ public async Task Test_AsAgent_UserInputRequestContentPreservedInRequestInfoAsyn // Assert AgentResponseUpdate? updateWithUserInput = updates.FirstOrDefault(u => - u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is UserInputRequestContent)); + u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is ToolApprovalRequestContent)); - updateWithUserInput.Should().NotBeNull("a UserInputRequestContent should be present in the response updates"); - UserInputRequestContent retrievedContent = updateWithUserInput!.Contents - .OfType() + updateWithUserInput.Should().NotBeNull("a ToolApprovalRequestContent should be present in the response updates"); + ToolApprovalRequestContent retrievedContent = updateWithUserInput!.Contents + .OfType() .Should().ContainSingle() .Which; - retrievedContent.Should().BeOfType(); - retrievedContent.Id.Should().NotBe(RequestId); - retrievedContent.Id.Should().EndWith($":{RequestId}"); + retrievedContent.Should().NotBeNull(); + retrievedContent.RequestId.Should().NotBe(RequestId); + retrievedContent.RequestId.Should().EndWith($":{RequestId}"); } /// @@ -414,41 +414,40 @@ public async Task Test_AsAgent_FunctionCallRoundtrip_ResponseIsProcessedAsync() } /// - /// Tests the full roundtrip for UserInputRequestContent: workflow emits request, external caller responds. - /// Verifying inbound UserInputResponseContent conversion. + /// Tests the full roundtrip for ToolApprovalRequestContent: workflow emits request, external caller responds. + /// Verifying inbound ToolApprovalResponseContent conversion. /// [Fact] - public async Task Test_AsAgent_UserInputRoundtrip_ResponseIsProcessedAsync() + public async Task Test_AsAgent_ToolApprovalRoundtrip_ResponseIsProcessedAsync() { - // Arrange: Create an agent that emits a UserInputRequestContent request + // Arrange: Create an agent that emits a ToolApprovalRequestContent request const string RequestId = "roundtrip-request-id"; McpServerToolCallContent mcpCall = new("mcp-call-id", "testMcpTool", "http://localhost"); - McpServerToolApprovalRequestContent requestContent = new(RequestId, mcpCall); + ToolApprovalRequestContent requestContent = new(RequestId, mcpCall); RequestEmittingAgent requestAgent = new(requestContent, completeOnResponse: true); ExecutorBinding agentBinding = requestAgent.BindAsExecutor( new AIAgentHostOptions { InterceptUserInputRequests = false, EmitAgentUpdateEvents = true }); Workflow workflow = new WorkflowBuilder(agentBinding).Build(); AIAgent agent = workflow.AsAIAgent("WorkflowAgent"); - // Act 1: First call - should receive the UserInputRequestContent request + // Act 1: First call - should receive the ToolApprovalRequestContent request AgentSession session = await agent.CreateSessionAsync(); List firstCallUpdates = await agent.RunStreamingAsync( new ChatMessage(ChatRole.User, "Start"), session).ToListAsync(); - // Assert 1: We should have received a UserInputRequestContent + // Assert 1: We should have received a ToolApprovalRequestContent AgentResponseUpdate? updateWithRequest = firstCallUpdates.FirstOrDefault(u => - u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is UserInputRequestContent)); - updateWithRequest.Should().NotBeNull("a UserInputRequestContent should be present in the response updates"); + u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is ToolApprovalRequestContent)); + updateWithRequest.Should().NotBeNull("a ToolApprovalRequestContent should be present in the response updates"); - UserInputRequestContent receivedRequest = updateWithRequest!.Contents - .OfType() + ToolApprovalRequestContent receivedRequest = updateWithRequest!.Contents + .OfType() .First(); - receivedRequest.Id.Should().EndWith($":{RequestId}"); - receivedRequest.Should().BeOfType(); + receivedRequest.RequestId.Should().EndWith($":{RequestId}"); // Act 2: Send the response back - use CreateResponse to get the right response type - UserInputResponseContent responseContent = ((McpServerToolApprovalRequestContent)receivedRequest).CreateResponse(approved: true); + ToolApprovalResponseContent responseContent = receivedRequest.CreateResponse(approved: true); ChatMessage responseMessage = new(ChatRole.User, [responseContent]); // Act 2: Run the workflow again with the response and capture the updates @@ -458,8 +457,8 @@ public async Task Test_AsAgent_UserInputRoundtrip_ResponseIsProcessedAsync() secondCallUpdates.Should().NotBeEmpty("handling the user input response should produce follow-up updates"); bool requestStillPresent = secondCallUpdates.Any(u => u.RawRepresentation is RequestInfoEvent - && u.Contents.OfType().Any(r => r.Id == receivedRequest.Id)); - requestStillPresent.Should().BeFalse("the original UserInputRequestContent should not be re-emitted after its response is processed"); + && u.Contents.OfType().Any(r => r.RequestId == receivedRequest.RequestId)); + requestStillPresent.Should().BeFalse("the original ToolApprovalRequestContent should not be re-emitted after its response is processed"); } ///