diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.A2A/A2AAgentHandler.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.A2A/A2AAgentHandler.cs index 65bc4c30bd..2113d273e0 100644 --- a/dotnet/src/Microsoft.Agents.AI.Hosting.A2A/A2AAgentHandler.cs +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.A2A/A2AAgentHandler.cs @@ -42,11 +42,19 @@ public A2AAgentHandler( /// public Task ExecuteAsync(RequestContext context, AgentEventQueue eventQueue, CancellationToken cancellationToken) { + // Handle task updates if (context.IsContinuation) { return this.HandleTaskUpdateAsync(context, eventQueue, cancellationToken); } + // Handle messages received via streaming endpoint + if (context.StreamingResponse) + { + return this.HandleNewMessageStreamingAsync(context, eventQueue, cancellationToken); + } + + // Handle new messages received via non-streaming endpoint return this.HandleNewMessageAsync(context, eventQueue, cancellationToken); } @@ -80,13 +88,19 @@ private async Task HandleNewMessageAsync(RequestContext context, AgentEventQueue ? new AgentRunOptions { AllowBackgroundResponses = allowBackgroundResponses } : new AgentRunOptions { AllowBackgroundResponses = allowBackgroundResponses, AdditionalProperties = context.Metadata.ToAdditionalProperties() }; - var response = await this._hostAgent.RunAsync( - chatMessages, - session: session, - options: options, - cancellationToken: cancellationToken).ConfigureAwait(false); - - await this._hostAgent.SaveSessionAsync(contextId, session, cancellationToken).ConfigureAwait(false); + AgentResponse response; + try + { + response = await this._hostAgent.RunAsync( + chatMessages, + session: session, + options: options, + cancellationToken: cancellationToken).ConfigureAwait(false); + } + finally + { + await this._hostAgent.SaveSessionAsync(contextId, session, CancellationToken.None).ConfigureAwait(false); + } if (response.ContinuationToken is null) { @@ -108,6 +122,39 @@ private async Task HandleNewMessageAsync(RequestContext context, AgentEventQueue } } + private async Task HandleNewMessageStreamingAsync(RequestContext context, AgentEventQueue eventQueue, CancellationToken cancellationToken) + { + var contextId = context.ContextId ?? Guid.NewGuid().ToString("N"); + var session = await this._hostAgent.GetOrCreateSessionAsync(contextId, cancellationToken).ConfigureAwait(false); + + // AIAgent does not support resuming from arbitrary prior tasks. + // Throw explicitly so the client gets a clear error rather than a response + // that silently ignores the referenced task context. + if (context.Message?.ReferenceTaskIds is { Count: > 0 }) + { + throw new NotSupportedException("ReferenceTaskIds is not supported. AIAgent cannot resume from arbitrary prior task context."); + } + + List chatMessages = context.Message is not null ? [context.Message.ToChatMessage()] : []; + + var options = context.Metadata is { Count: > 0 } + ? new AgentRunOptions { AdditionalProperties = context.Metadata.ToAdditionalProperties() } + : null; + + try + { + await foreach (var update in this._hostAgent.RunStreamingAsync(chatMessages, session, options, cancellationToken).ConfigureAwait(false)) + { + var message = CreateMessageFromUpdate(contextId, update); + await eventQueue.EnqueueMessageAsync(message, cancellationToken).ConfigureAwait(false); + } + } + finally + { + await this._hostAgent.SaveSessionAsync(contextId, session, CancellationToken.None).ConfigureAwait(false); + } + } + private async Task HandleTaskUpdateAsync(RequestContext context, AgentEventQueue eventQueue, CancellationToken cancellationToken) { var contextId = context.ContextId ?? Guid.NewGuid().ToString("N"); @@ -141,8 +188,10 @@ private async Task HandleTaskUpdateAsync(RequestContext context, AgentEventQueue await failUpdater.FailAsync(message: null, CancellationToken.None).ConfigureAwait(false); throw; } - - await this._hostAgent.SaveSessionAsync(contextId, session, cancellationToken).ConfigureAwait(false); + finally + { + await this._hostAgent.SaveSessionAsync(contextId, session, CancellationToken.None).ConfigureAwait(false); + } if (response.ContinuationToken is null) { @@ -174,6 +223,16 @@ private static Message CreateMessageFromResponse(string contextId, AgentResponse Metadata = response.AdditionalProperties?.ToA2AMetadata() }; + private static Message CreateMessageFromUpdate(string contextId, AgentResponseUpdate update) => + new() + { + MessageId = update.ResponseId ?? Guid.NewGuid().ToString("N"), + ContextId = contextId, + Role = Role.Agent, + Parts = update.ToParts(), + Metadata = update.AdditionalProperties?.ToA2AMetadata() + }; + private static List ExtractChatMessagesFromTaskHistory(AgentTask? agentTask) { if (agentTask?.History is not { Count: > 0 }) diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.A2A/Converters/MessageConverter.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.A2A/Converters/MessageConverter.cs index b2f57fc09e..a231f322c4 100644 --- a/dotnet/src/Microsoft.Agents.AI.Hosting.A2A/Converters/MessageConverter.cs +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.A2A/Converters/MessageConverter.cs @@ -8,6 +8,26 @@ namespace Microsoft.Agents.AI.Hosting.A2A.Converters; internal static class MessageConverter { + public static List ToParts(this AgentResponseUpdate update) + { + if (update is null || update.Contents is not { Count: > 0 }) + { + return []; + } + + var parts = new List(); + foreach (var content in update.Contents) + { + var part = content.ToPart(); + if (part is not null) + { + parts.Add(part); + } + } + + return parts; + } + public static List ToParts(this IList chatMessages) { if (chatMessages is null || chatMessages.Count == 0) diff --git a/dotnet/tests/Microsoft.Agents.AI.Hosting.A2A.UnitTests/A2AAgentHandlerTests.cs b/dotnet/tests/Microsoft.Agents.AI.Hosting.A2A.UnitTests/A2AAgentHandlerTests.cs index 6558144184..b54b3d7db7 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Hosting.A2A.UnitTests/A2AAgentHandlerTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Hosting.A2A.UnitTests/A2AAgentHandlerTests.cs @@ -586,6 +586,457 @@ await handler.CancelAsync( #pragma warning restore MEAI001 + /// + /// Verifies that in streaming mode, each update from RunStreamingAsync produces a message event. + /// + [Fact] + public async Task ExecuteAsync_Streaming_EnqueuesMessageForEachUpdateAsync() + { + // Arrange + AgentResponseUpdate[] updates = + [ + new AgentResponseUpdate(ChatRole.Assistant, "chunk 1") { ResponseId = "r1" }, + new AgentResponseUpdate(ChatRole.Assistant, "chunk 2") { ResponseId = "r2" } + ]; + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMock(updates)); + + // Act + var events = await CollectEventsAsync(handler, new RequestContext + { + StreamingResponse = true, + TaskId = "", + ContextId = "ctx", + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }); + + // Assert + Assert.Equal(2, events.Messages.Count); + Assert.Equal("chunk 1", events.Messages[0].Parts![0].Text); + Assert.Equal("chunk 2", events.Messages[1].Parts![0].Text); + } + + /// + /// Verifies that in streaming mode, when metadata is present, options with AdditionalProperties + /// are passed to RunStreamingAsync. + /// + [Fact] + public async Task ExecuteAsync_Streaming_WithMetadata_PassesOptionsWithAdditionalPropertiesAsync() + { + // Arrange + AgentRunOptions? capturedOptions = null; + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMockWithOptionsCapture( + options => capturedOptions = options)); + + // Act + await InvokeExecuteAsync(handler, new RequestContext + { + StreamingResponse = true, + TaskId = "", + ContextId = "ctx", + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] }, + Metadata = new Dictionary + { + ["key1"] = JsonSerializer.SerializeToElement("value1") + } + }); + + // Assert + Assert.NotNull(capturedOptions); + Assert.NotNull(capturedOptions.AdditionalProperties); + Assert.Equal("value1", capturedOptions.AdditionalProperties["key1"]?.ToString()); + } + + /// + /// Verifies that in streaming mode, when metadata is null, null options are passed to RunStreamingAsync. + /// + [Fact] + public async Task ExecuteAsync_Streaming_WithNullMetadata_PassesNullOptionsAsync() + { + // Arrange + AgentRunOptions? capturedOptions = null; + bool optionsCaptured = false; + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMockWithOptionsCapture( + options => { capturedOptions = options; optionsCaptured = true; })); + + // Act + await InvokeExecuteAsync(handler, new RequestContext + { + StreamingResponse = true, + TaskId = "", + ContextId = "ctx", + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }); + + // Assert + Assert.True(optionsCaptured); + Assert.Null(capturedOptions); + } + + /// + /// Verifies that in streaming mode, ReferenceTaskIds throws NotSupportedException. + /// + [Fact] + public async Task ExecuteAsync_Streaming_WithReferenceTaskIds_ThrowsNotSupportedExceptionAsync() + { + // Arrange + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMock([])); + + // Act & Assert + var eventQueue = new AgentEventQueue(); + await Assert.ThrowsAsync(() => + handler.ExecuteAsync( + new RequestContext + { + StreamingResponse = true, + TaskId = "", + ContextId = "ctx", + Message = new Message + { + MessageId = "test-id", + Role = Role.User, + Parts = [new Part { Text = "Hello" }], + ReferenceTaskIds = ["other-task-id"] + } + }, + eventQueue, + CancellationToken.None)); + } + + /// + /// Verifies that in streaming mode, when ContextId is null, a new one is generated. + /// + [Fact] + public async Task ExecuteAsync_Streaming_WhenContextIdIsNull_GeneratesContextIdAsync() + { + // Arrange + AgentResponseUpdate[] updates = + [ + new AgentResponseUpdate(ChatRole.Assistant, "Reply") { ResponseId = "r1" } + ]; + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMock(updates)); + + // Act + var events = await CollectEventsAsync(handler, new RequestContext + { + StreamingResponse = true, + TaskId = "", + ContextId = null!, + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }); + + // Assert + Message message = Assert.Single(events.Messages); + Assert.NotNull(message.ContextId); + Assert.NotEmpty(message.ContextId); + } + + /// + /// Verifies that in streaming mode, the provided ContextId is used in the response. + /// + [Fact] + public async Task ExecuteAsync_Streaming_UsesProvidedContextIdAsync() + { + // Arrange + AgentResponseUpdate[] updates = + [ + new AgentResponseUpdate(ChatRole.Assistant, "Reply") { ResponseId = "r1" } + ]; + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMock(updates)); + + // Act + var events = await CollectEventsAsync(handler, new RequestContext + { + StreamingResponse = true, + TaskId = "", + ContextId = "my-streaming-ctx", + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }); + + // Assert + Message message = Assert.Single(events.Messages); + Assert.Equal("my-streaming-ctx", message.ContextId); + } + + /// + /// Verifies that in streaming mode, when Message is null, the handler succeeds with empty messages. + /// + [Fact] + public async Task ExecuteAsync_Streaming_WhenMessageIsNull_SucceedsWithEmptyMessagesAsync() + { + // Arrange + AgentResponseUpdate[] updates = + [ + new AgentResponseUpdate(ChatRole.Assistant, "Reply") { ResponseId = "r1" } + ]; + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMock(updates)); + + // Act + var events = await CollectEventsAsync(handler, new RequestContext + { + StreamingResponse = true, + TaskId = "", + ContextId = "ctx", + Message = null! + }); + + // Assert + Message message = Assert.Single(events.Messages); + Assert.Equal("ctx", message.ContextId); + } + + /// + /// Verifies that in streaming mode, the ResponseId from the update is used as the MessageId in the response. + /// + [Fact] + public async Task ExecuteAsync_Streaming_ResponseIdIsUsedAsMessageIdAsync() + { + // Arrange + AgentResponseUpdate[] updates = + [ + new AgentResponseUpdate(ChatRole.Assistant, "chunk") { ResponseId = "resp-42" } + ]; + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMock(updates)); + + // Act + var events = await CollectEventsAsync(handler, new RequestContext + { + StreamingResponse = true, + TaskId = "", + ContextId = "ctx", + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }); + + // Assert + Message message = Assert.Single(events.Messages); + Assert.Equal("resp-42", message.MessageId); + } + + /// + /// Verifies that in streaming mode, when ResponseId is null, a MessageId is still generated. + /// + [Fact] + public async Task ExecuteAsync_Streaming_WhenResponseIdIsNull_GeneratesMessageIdAsync() + { + // Arrange + AgentResponseUpdate[] updates = + [ + new AgentResponseUpdate(ChatRole.Assistant, "chunk") { ResponseId = null } + ]; + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMock(updates)); + + // Act + var events = await CollectEventsAsync(handler, new RequestContext + { + StreamingResponse = true, + TaskId = "", + ContextId = "ctx", + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }); + + // Assert + Message message = Assert.Single(events.Messages); + Assert.NotNull(message.MessageId); + Assert.NotEmpty(message.MessageId); + } + + /// + /// Verifies that in streaming mode, when the update has AdditionalProperties, the message has metadata. + /// + [Fact] + public async Task ExecuteAsync_Streaming_WithResponseAdditionalProperties_ReturnsMessageWithMetadataAsync() + { + // Arrange + AdditionalPropertiesDictionary additionalProps = new() + { + ["streamKey"] = "streamValue" + }; + AgentResponseUpdate[] updates = + [ + new AgentResponseUpdate(ChatRole.Assistant, "chunk") { ResponseId = "r1", AdditionalProperties = additionalProps } + ]; + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMock(updates)); + + // Act + var events = await CollectEventsAsync(handler, new RequestContext + { + StreamingResponse = true, + TaskId = "", + ContextId = "ctx", + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }); + + // Assert + Message message = Assert.Single(events.Messages); + Assert.NotNull(message.Metadata); + Assert.True(message.Metadata.ContainsKey("streamKey")); + } + + /// + /// Verifies that in streaming mode, when the update has null AdditionalProperties, the message has null metadata. + /// + [Fact] + public async Task ExecuteAsync_Streaming_WithNullAdditionalProperties_ReturnsMessageWithNullMetadataAsync() + { + // Arrange + AgentResponseUpdate[] updates = + [ + new AgentResponseUpdate(ChatRole.Assistant, "chunk") { ResponseId = "r1", AdditionalProperties = null } + ]; + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMock(updates)); + + // Act + var events = await CollectEventsAsync(handler, new RequestContext + { + StreamingResponse = true, + TaskId = "", + ContextId = "ctx", + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }); + + // Assert + Message message = Assert.Single(events.Messages); + Assert.Null(message.Metadata); + } + + /// + /// Verifies that in streaming mode, the session is saved after all updates are processed. + /// + [Fact] + public async Task ExecuteAsync_Streaming_SavesSessionAfterProcessingAsync() + { + // Arrange + var mockSessionStore = new Mock(); + mockSessionStore + .Setup(x => x.GetSessionAsync( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(new TestAgentSession()); + mockSessionStore + .Setup(x => x.SaveSessionAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(ValueTask.CompletedTask); + + AgentResponseUpdate[] updates = + [ + new AgentResponseUpdate(ChatRole.Assistant, "chunk") { ResponseId = "r1" } + ]; + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMock(updates), agentSessionStore: mockSessionStore.Object); + + // Act + await InvokeExecuteAsync(handler, new RequestContext + { + StreamingResponse = true, + TaskId = "", + ContextId = "ctx-stream", + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }); + + // Assert - verify session was saved + mockSessionStore.Verify( + x => x.SaveSessionAsync( + It.IsAny(), + It.Is(s => s == "ctx-stream"), + It.IsAny(), + It.IsAny()), + Times.Once); + } + + /// + /// Verifies that in streaming mode, when RunStreamingAsync yields no updates, + /// no messages are enqueued and the session is still saved. + /// + [Fact] + public async Task ExecuteAsync_Streaming_WhenNoUpdates_EnqueuesNoMessagesAndSavesSessionAsync() + { + // Arrange + var mockSessionStore = new Mock(); + mockSessionStore + .Setup(x => x.GetSessionAsync( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(new TestAgentSession()); + mockSessionStore + .Setup(x => x.SaveSessionAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(ValueTask.CompletedTask); + + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMock([]), agentSessionStore: mockSessionStore.Object); + + // Act + var events = await CollectEventsAsync(handler, new RequestContext + { + StreamingResponse = true, + TaskId = "", + ContextId = "ctx", + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }); + + // Assert + Assert.Empty(events.Messages); + mockSessionStore.Verify( + x => x.SaveSessionAsync( + It.IsAny(), + It.Is(s => s == "ctx"), + It.IsAny(), + It.IsAny()), + Times.Once); + } + + /// + /// Verifies that the CancellationToken is propagated to RunStreamingAsync in the streaming path. + /// + [Fact] + public async Task ExecuteAsync_Streaming_CancellationTokenIsPropagatedToRunStreamingAsync() + { + // Arrange + CancellationToken capturedToken = default; + using var cts = new CancellationTokenSource(); + + Mock agentMock = new() { CallBase = true }; + agentMock.SetupGet(x => x.Name).Returns("TestAgent"); + agentMock + .Protected() + .Setup>("CreateSessionCoreAsync", ItExpr.IsAny()) + .ReturnsAsync(new TestAgentSession()); + agentMock + .Protected() + .Setup>("RunCoreStreamingAsync", + ItExpr.IsAny>(), + ItExpr.IsAny(), + ItExpr.IsAny(), + ItExpr.IsAny()) + .Callback, AgentSession?, AgentRunOptions?, CancellationToken>( + (_, _, _, ct) => capturedToken = ct) + .Returns(() => ToAsyncEnumerableAsync([new AgentResponseUpdate(ChatRole.Assistant, "reply") { ResponseId = "r1" }])); + + A2AAgentHandler handler = CreateHandler(agentMock); + + // Act + var eventQueue = new AgentEventQueue(); + await handler.ExecuteAsync( + new RequestContext + { + TaskId = "", + ContextId = "ctx", + StreamingResponse = true, + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }, + eventQueue, + cts.Token); + eventQueue.Complete(null); + + // Assert + Assert.Equal(cts.Token, capturedToken); + } + /// /// Verifies that when no session store is provided, the handler uses InMemoryAgentSessionStore /// and can execute successfully. @@ -656,169 +1107,471 @@ public async Task Handler_WithCustomSessionStore_UsesProvidedSessionStoreAsync() } }); - // Assert - verify the custom session store was called - mockSessionStore.Verify( - x => x.GetSessionAsync( - It.IsAny(), - It.Is(s => s == "ctx-1"), - It.IsAny()), - Times.Once); + // Assert - verify the custom session store was called + mockSessionStore.Verify( + x => x.GetSessionAsync( + It.IsAny(), + It.Is(s => s == "ctx-1"), + It.IsAny()), + Times.Once); + mockSessionStore.Verify( + x => x.SaveSessionAsync( + It.IsAny(), + It.Is(s => s == "ctx-1"), + It.IsAny(), + It.IsAny()), + Times.Once); + } + + /// + /// Verifies that when no session store is provided, the default InMemoryAgentSessionStore + /// persists sessions across multiple calls with the same context ID. + /// + [Fact] + public async Task Handler_WithNullSessionStore_SessionIsPersistedAcrossCallsAsync() + { + // Arrange - track how many times CreateSessionCoreAsync is called + int createSessionCallCount = 0; + var sessionInstance = new TestAgentSession(); + + Mock agentMock = new() { CallBase = true }; + agentMock.SetupGet(x => x.Name).Returns("TestAgent"); + agentMock + .Protected() + .Setup>("CreateSessionCoreAsync", ItExpr.IsAny()) + .Callback(() => Interlocked.Increment(ref createSessionCallCount)) + .ReturnsAsync(() => new TestAgentSession()); + agentMock + .Protected() + .Setup>("SerializeSessionCoreAsync", + ItExpr.IsAny(), + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync(JsonDocument.Parse("{}").RootElement); + agentMock + .Protected() + .Setup>("DeserializeSessionCoreAsync", + ItExpr.IsAny(), + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync(sessionInstance); + agentMock + .Protected() + .Setup>("RunCoreAsync", + ItExpr.IsAny>(), + ItExpr.IsAny(), + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync(new AgentResponse([new ChatMessage(ChatRole.Assistant, "Reply")])); + + A2AAgentHandler handler = CreateHandler(agentMock, agentSessionStore: null); + + var context = new RequestContext + { + StreamingResponse = false, + TaskId = "", + ContextId = "ctx-persistent", + Message = new Message + { + MessageId = "test-id", + Role = Role.User, + Parts = [new Part { Text = "Hello" }] + } + }; + + // Act - call twice with the same context ID + await InvokeExecuteAsync(handler, context); + await InvokeExecuteAsync(handler, context); + + // Assert - CreateSessionCoreAsync should be called once (first call creates, second retrieves from store) + Assert.Equal(1, createSessionCallCount); + } + + /// + /// Verifies that when the AllowBackgroundWhen delegate throws, the exception propagates + /// and the agent is not invoked. + /// + [Fact] + public async Task ExecuteAsync_DynamicMode_WhenCallbackThrows_PropagatesExceptionAsync() + { + // Arrange + bool agentInvoked = false; + A2AAgentHandler handler = CreateHandler( + CreateAgentMock(_ => agentInvoked = true), + runMode: AgentRunMode.AllowBackgroundWhen((_, _) => + throw new InvalidOperationException("Callback failed"))); + + // Act & Assert + await Assert.ThrowsAsync(() => + InvokeExecuteAsync(handler, new RequestContext + { + TaskId = "", ContextId = "ctx", StreamingResponse = false, Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + })); + + Assert.False(agentInvoked); + } + + /// + /// Verifies that the CancellationToken is propagated to the AllowBackgroundWhen delegate. + /// + [Fact] + public async Task ExecuteAsync_DynamicMode_CancellationTokenIsPropagatedToCallbackAsync() + { + // Arrange + CancellationToken capturedToken = default; + using var cts = new CancellationTokenSource(); + A2AAgentHandler handler = CreateHandler( + CreateAgentMock(_ => { }), + runMode: AgentRunMode.AllowBackgroundWhen((_, ct) => + { + capturedToken = ct; + return ValueTask.FromResult(false); + })); + + // Act + var eventQueue = new AgentEventQueue(); + await handler.ExecuteAsync( + new RequestContext + { + TaskId = "", ContextId = "ctx", StreamingResponse = false, Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }, + eventQueue, + cts.Token); + eventQueue.Complete(null); + + // Assert + Assert.Equal(cts.Token, capturedToken); + } + + /// + /// Verifies that the agent run mode is applied on the continuation/task-update path, + /// not just the new message path. + /// + [Fact] + public async Task ExecuteAsync_OnContinuation_RunModeIsAppliedAsync() + { + // Arrange + AgentRunOptions? capturedOptions = null; + A2AAgentHandler handler = CreateHandler( + CreateAgentMock(options => capturedOptions = options), + runMode: AgentRunMode.AllowBackgroundIfSupported); + + // Act + await InvokeExecuteAsync(handler, new RequestContext + { + StreamingResponse = false, + TaskId = "task-1", + ContextId = "ctx-1", + Message = new Message { MessageId = "empty", Role = Role.User, Parts = [] }, + + Task = new AgentTask { Id = "task-1", ContextId = "ctx-1", History = [new Message { Role = Role.User, Parts = [new Part { Text = "Hello" }] }] } + }); + + // Assert + Assert.NotNull(capturedOptions); + Assert.True(capturedOptions.AllowBackgroundResponses); + } + + /// + /// Verifies that in the non-streaming path, SaveSessionAsync is called with + /// CancellationToken.None even when RunAsync throws an exception. + /// + [Fact] + public async Task ExecuteAsync_NonStreaming_WhenRunAsyncThrows_SavesSessionWithUncancelledTokenAsync() + { + // Arrange + var mockSessionStore = new Mock(); + mockSessionStore + .Setup(x => x.GetSessionAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new TestAgentSession()); + mockSessionStore + .Setup(x => x.SaveSessionAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(ValueTask.CompletedTask); + + Mock agentMock = new() { CallBase = true }; + agentMock.SetupGet(x => x.Name).Returns("TestAgent"); + agentMock.Protected() + .Setup>("CreateSessionCoreAsync", ItExpr.IsAny()) + .ReturnsAsync(new TestAgentSession()); + agentMock.Protected() + .Setup>("RunCoreAsync", + ItExpr.IsAny>(), + ItExpr.IsAny(), + ItExpr.IsAny(), + ItExpr.IsAny()) + .ThrowsAsync(new InvalidOperationException("Agent failed")); + + using var cts = new CancellationTokenSource(); + A2AAgentHandler handler = CreateHandler(agentMock, agentSessionStore: mockSessionStore.Object); + + // Act + var eventQueue = new AgentEventQueue(); + await Assert.ThrowsAsync(() => + handler.ExecuteAsync( + new RequestContext + { + TaskId = "", ContextId = "ctx", StreamingResponse = false, + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }, + eventQueue, + cts.Token)); + + // Assert - SaveSessionAsync was called with CancellationToken.None despite the exception + mockSessionStore.Verify( + x => x.SaveSessionAsync( + It.IsAny(), + It.Is(s => s == "ctx"), + It.IsAny(), + It.Is(ct => ct == CancellationToken.None)), + Times.Once); + } + + /// + /// Verifies that in the streaming path, SaveSessionAsync is called with + /// CancellationToken.None even when RunStreamingAsync throws an exception. + /// + [Fact] + public async Task ExecuteAsync_Streaming_WhenRunStreamingAsyncThrows_SavesSessionWithUncancelledTokenAsync() + { + // Arrange + var mockSessionStore = new Mock(); + mockSessionStore + .Setup(x => x.GetSessionAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new TestAgentSession()); + mockSessionStore + .Setup(x => x.SaveSessionAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(ValueTask.CompletedTask); + + Mock agentMock = new() { CallBase = true }; + agentMock.SetupGet(x => x.Name).Returns("TestAgent"); + agentMock.Protected() + .Setup>("CreateSessionCoreAsync", ItExpr.IsAny()) + .ReturnsAsync(new TestAgentSession()); + agentMock.Protected() + .Setup>("RunCoreStreamingAsync", + ItExpr.IsAny>(), + ItExpr.IsAny(), + ItExpr.IsAny(), + ItExpr.IsAny()) + .Returns(() => ToThrowingAsyncEnumerableAsync(new InvalidOperationException("Stream failed"))); + + using var cts = new CancellationTokenSource(); + A2AAgentHandler handler = CreateHandler(agentMock, agentSessionStore: mockSessionStore.Object); + + // Act + var eventQueue = new AgentEventQueue(); + await Assert.ThrowsAsync(() => + handler.ExecuteAsync( + new RequestContext + { + TaskId = "", ContextId = "ctx-stream", StreamingResponse = true, + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }, + eventQueue, + cts.Token)); + + // Assert - SaveSessionAsync was called with CancellationToken.None despite the exception mockSessionStore.Verify( x => x.SaveSessionAsync( It.IsAny(), - It.Is(s => s == "ctx-1"), + It.Is(s => s == "ctx-stream"), It.IsAny(), - It.IsAny()), + It.Is(ct => ct == CancellationToken.None)), Times.Once); } /// - /// Verifies that when no session store is provided, the default InMemoryAgentSessionStore - /// persists sessions across multiple calls with the same context ID. + /// Verifies that on the continuation path, SaveSessionAsync is called with + /// CancellationToken.None even when RunAsync throws an exception. /// [Fact] - public async Task Handler_WithNullSessionStore_SessionIsPersistedAcrossCallsAsync() + public async Task ExecuteAsync_OnContinuation_WhenRunAsyncThrows_SavesSessionWithUncancelledTokenAsync() { - // Arrange - track how many times CreateSessionCoreAsync is called - int createSessionCallCount = 0; - var sessionInstance = new TestAgentSession(); + // Arrange + var mockSessionStore = new Mock(); + mockSessionStore + .Setup(x => x.GetSessionAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new TestAgentSession()); + mockSessionStore + .Setup(x => x.SaveSessionAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(ValueTask.CompletedTask); Mock agentMock = new() { CallBase = true }; agentMock.SetupGet(x => x.Name).Returns("TestAgent"); - agentMock - .Protected() + agentMock.Protected() .Setup>("CreateSessionCoreAsync", ItExpr.IsAny()) - .Callback(() => Interlocked.Increment(ref createSessionCallCount)) - .ReturnsAsync(() => new TestAgentSession()); - agentMock - .Protected() - .Setup>("SerializeSessionCoreAsync", - ItExpr.IsAny(), - ItExpr.IsAny(), - ItExpr.IsAny()) - .ReturnsAsync(JsonDocument.Parse("{}").RootElement); - agentMock - .Protected() - .Setup>("DeserializeSessionCoreAsync", - ItExpr.IsAny(), - ItExpr.IsAny(), - ItExpr.IsAny()) - .ReturnsAsync(sessionInstance); - agentMock - .Protected() + .ReturnsAsync(new TestAgentSession()); + agentMock.Protected() .Setup>("RunCoreAsync", ItExpr.IsAny>(), ItExpr.IsAny(), ItExpr.IsAny(), ItExpr.IsAny()) - .ReturnsAsync(new AgentResponse([new ChatMessage(ChatRole.Assistant, "Reply")])); - - A2AAgentHandler handler = CreateHandler(agentMock, agentSessionStore: null); + .ThrowsAsync(new InvalidOperationException("Agent failed")); - var context = new RequestContext - { - StreamingResponse = false, - TaskId = "", - ContextId = "ctx-persistent", - Message = new Message - { - MessageId = "test-id", - Role = Role.User, - Parts = [new Part { Text = "Hello" }] - } - }; + using var cts = new CancellationTokenSource(); + A2AAgentHandler handler = CreateHandler(agentMock, agentSessionStore: mockSessionStore.Object); - // Act - call twice with the same context ID - await InvokeExecuteAsync(handler, context); - await InvokeExecuteAsync(handler, context); + // Act + var eventQueue = new AgentEventQueue(); + var events = new EventCollector(); + var readerTask = ReadEventsAsync(eventQueue, events); + await Assert.ThrowsAsync(() => + handler.ExecuteAsync( + new RequestContext + { + StreamingResponse = false, + TaskId = "task-1", ContextId = "ctx-cont", + Message = new Message { MessageId = "empty", Role = Role.User, Parts = [] }, + Task = new AgentTask { Id = "task-1", ContextId = "ctx-cont", History = [new Message { Role = Role.User, Parts = [new Part { Text = "Hello" }] }] } + }, + eventQueue, + cts.Token)); + eventQueue.Complete(null); + await readerTask; - // Assert - CreateSessionCoreAsync should be called once (first call creates, second retrieves from store) - Assert.Equal(1, createSessionCallCount); + // Assert - SaveSessionAsync was called with CancellationToken.None despite the exception + mockSessionStore.Verify( + x => x.SaveSessionAsync( + It.IsAny(), + It.Is(s => s == "ctx-cont"), + It.IsAny(), + It.Is(ct => ct == CancellationToken.None)), + Times.Once); } /// - /// Verifies that when the AllowBackgroundWhen delegate throws, the exception propagates - /// and the agent is not invoked. + /// Verifies that in the non-streaming path, SaveSessionAsync is called with + /// CancellationToken.None rather than the caller's cancellation token. /// [Fact] - public async Task ExecuteAsync_DynamicMode_WhenCallbackThrows_PropagatesExceptionAsync() + public async Task ExecuteAsync_NonStreaming_SavesSessionWithUncancelledTokenAsync() { // Arrange - bool agentInvoked = false; - A2AAgentHandler handler = CreateHandler( - CreateAgentMock(_ => agentInvoked = true), - runMode: AgentRunMode.AllowBackgroundWhen((_, _) => - throw new InvalidOperationException("Callback failed"))); + var mockSessionStore = new Mock(); + mockSessionStore + .Setup(x => x.GetSessionAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new TestAgentSession()); + mockSessionStore + .Setup(x => x.SaveSessionAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(ValueTask.CompletedTask); - // Act & Assert - await Assert.ThrowsAsync(() => - InvokeExecuteAsync(handler, new RequestContext + AgentResponse response = new([new ChatMessage(ChatRole.Assistant, "Reply")]); + A2AAgentHandler handler = CreateHandler(CreateAgentMockWithResponse(response), agentSessionStore: mockSessionStore.Object); + + using var cts = new CancellationTokenSource(); + + // Act + var eventQueue = new AgentEventQueue(); + await handler.ExecuteAsync( + new RequestContext { - TaskId = "", ContextId = "ctx", StreamingResponse = false, Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } - })); + TaskId = "", ContextId = "ctx", StreamingResponse = false, + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + }, + eventQueue, + cts.Token); + eventQueue.Complete(null); - Assert.False(agentInvoked); + // Assert - SaveSessionAsync was called with CancellationToken.None, not the caller's token + mockSessionStore.Verify( + x => x.SaveSessionAsync( + It.IsAny(), + It.Is(s => s == "ctx"), + It.IsAny(), + It.Is(ct => ct == CancellationToken.None)), + Times.Once); } /// - /// Verifies that the CancellationToken is propagated to the AllowBackgroundWhen delegate. + /// Verifies that in the streaming path, SaveSessionAsync is called with + /// CancellationToken.None rather than the caller's cancellation token. /// [Fact] - public async Task ExecuteAsync_DynamicMode_CancellationTokenIsPropagatedToCallbackAsync() + public async Task ExecuteAsync_Streaming_SavesSessionWithUncancelledTokenAsync() { // Arrange - CancellationToken capturedToken = default; + var mockSessionStore = new Mock(); + mockSessionStore + .Setup(x => x.GetSessionAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new TestAgentSession()); + mockSessionStore + .Setup(x => x.SaveSessionAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(ValueTask.CompletedTask); + + AgentResponseUpdate[] updates = [new AgentResponseUpdate(ChatRole.Assistant, "chunk") { ResponseId = "r1" }]; + A2AAgentHandler handler = CreateHandler(CreateStreamingAgentMock(updates), agentSessionStore: mockSessionStore.Object); + using var cts = new CancellationTokenSource(); - A2AAgentHandler handler = CreateHandler( - CreateAgentMock(_ => { }), - runMode: AgentRunMode.AllowBackgroundWhen((_, ct) => - { - capturedToken = ct; - return ValueTask.FromResult(false); - })); // Act var eventQueue = new AgentEventQueue(); await handler.ExecuteAsync( new RequestContext { - TaskId = "", ContextId = "ctx", StreamingResponse = false, Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } + TaskId = "", ContextId = "ctx-stream", StreamingResponse = true, + Message = new Message { MessageId = "test-id", Role = Role.User, Parts = [new Part { Text = "Hello" }] } }, eventQueue, cts.Token); eventQueue.Complete(null); - // Assert - Assert.Equal(cts.Token, capturedToken); + // Assert - SaveSessionAsync was called with CancellationToken.None, not the caller's token + mockSessionStore.Verify( + x => x.SaveSessionAsync( + It.IsAny(), + It.Is(s => s == "ctx-stream"), + It.IsAny(), + It.Is(ct => ct == CancellationToken.None)), + Times.Once); } /// - /// Verifies that the agent run mode is applied on the continuation/task-update path, - /// not just the new message path. + /// Verifies that on the continuation path, SaveSessionAsync is called with + /// CancellationToken.None rather than the caller's cancellation token. /// [Fact] - public async Task ExecuteAsync_OnContinuation_RunModeIsAppliedAsync() + public async Task ExecuteAsync_OnContinuation_SavesSessionWithUncancelledTokenAsync() { // Arrange - AgentRunOptions? capturedOptions = null; - A2AAgentHandler handler = CreateHandler( - CreateAgentMock(options => capturedOptions = options), - runMode: AgentRunMode.AllowBackgroundIfSupported); + var mockSessionStore = new Mock(); + mockSessionStore + .Setup(x => x.GetSessionAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new TestAgentSession()); + mockSessionStore + .Setup(x => x.SaveSessionAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(ValueTask.CompletedTask); - // Act - await InvokeExecuteAsync(handler, new RequestContext - { - StreamingResponse = false, - TaskId = "task-1", - ContextId = "ctx-1", - Message = new Message { MessageId = "empty", Role = Role.User, Parts = [] }, + AgentResponse response = new([new ChatMessage(ChatRole.Assistant, "Done!")]); + A2AAgentHandler handler = CreateHandler(CreateAgentMockWithResponse(response), agentSessionStore: mockSessionStore.Object); - Task = new AgentTask { Id = "task-1", ContextId = "ctx-1", History = [new Message { Role = Role.User, Parts = [new Part { Text = "Hello" }] }] } - }); + using var cts = new CancellationTokenSource(); - // Assert - Assert.NotNull(capturedOptions); - Assert.True(capturedOptions.AllowBackgroundResponses); + // Act + var eventQueue = new AgentEventQueue(); + var events = new EventCollector(); + var readerTask = ReadEventsAsync(eventQueue, events); + await handler.ExecuteAsync( + new RequestContext + { + StreamingResponse = false, + TaskId = "task-1", ContextId = "ctx-cont", + Message = new Message { MessageId = "empty", Role = Role.User, Parts = [] }, + Task = new AgentTask { Id = "task-1", ContextId = "ctx-cont", History = [new Message { Role = Role.User, Parts = [new Part { Text = "Hello" }] }] } + }, + eventQueue, + cts.Token); + eventQueue.Complete(null); + await readerTask; + + // Assert - SaveSessionAsync was called with CancellationToken.None, not the caller's token + mockSessionStore.Verify( + x => x.SaveSessionAsync( + It.IsAny(), + It.Is(s => s == "ctx-cont"), + It.IsAny(), + It.Is(ct => ct == CancellationToken.None)), + Times.Once); } private static A2AAgentHandler CreateHandler( @@ -905,6 +1658,68 @@ private static Mock CreateAgentMockWithCallCount( return agentMock; } + private static Mock CreateStreamingAgentMock(IEnumerable updates) + { + Mock agentMock = new() { CallBase = true }; + agentMock.SetupGet(x => x.Name).Returns("TestAgent"); + agentMock + .Protected() + .Setup>("CreateSessionCoreAsync", ItExpr.IsAny()) + .ReturnsAsync(new TestAgentSession()); + agentMock + .Protected() + .Setup>("RunCoreStreamingAsync", + ItExpr.IsAny>(), + ItExpr.IsAny(), + ItExpr.IsAny(), + ItExpr.IsAny()) + .Returns(() => ToAsyncEnumerableAsync(updates)); + + return agentMock; + } + + private static Mock CreateStreamingAgentMockWithOptionsCapture( + Action optionsCallback) + { + Mock agentMock = new() { CallBase = true }; + agentMock.SetupGet(x => x.Name).Returns("TestAgent"); + agentMock + .Protected() + .Setup>("CreateSessionCoreAsync", ItExpr.IsAny()) + .ReturnsAsync(new TestAgentSession()); + agentMock + .Protected() + .Setup>("RunCoreStreamingAsync", + ItExpr.IsAny>(), + ItExpr.IsAny(), + ItExpr.IsAny(), + ItExpr.IsAny()) + .Callback, AgentSession?, AgentRunOptions?, CancellationToken>( + (_, _, options, _) => optionsCallback(options)) + .Returns(() => ToAsyncEnumerableAsync([new AgentResponseUpdate(ChatRole.Assistant, "reply") { ResponseId = "r1" }])); + + return agentMock; + } + + private static async IAsyncEnumerable ToAsyncEnumerableAsync(IEnumerable items) + { + await Task.Yield(); + foreach (var item in items) + { + yield return item; + } + } + + private static async IAsyncEnumerable ToThrowingAsyncEnumerableAsync(Exception exception) + { + await Task.Yield(); + throw exception; + +#pragma warning disable CS0162 // Unreachable code detected - yield is required for async iterator + yield break; +#pragma warning restore CS0162 + } + private static async Task InvokeExecuteAsync(A2AAgentHandler handler, RequestContext context) { var eventQueue = new AgentEventQueue(); diff --git a/dotnet/tests/Microsoft.Agents.AI.Hosting.A2A.UnitTests/Converters/MessageConverterTests.cs b/dotnet/tests/Microsoft.Agents.AI.Hosting.A2A.UnitTests/Converters/MessageConverterTests.cs index e26189e9e5..9c7b398644 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Hosting.A2A.UnitTests/Converters/MessageConverterTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Hosting.A2A.UnitTests/Converters/MessageConverterTests.cs @@ -147,4 +147,67 @@ public void ToParts_WithMultipleMessages_ReturnsAllParts() Assert.Equal("First message", result[0].Text); Assert.Equal("Second message", result[1].Text); } + + [Fact] + public void ToParts_AgentResponseUpdate_WithNoContents_ReturnsEmptyList() + { + // Arrange + var update = new AgentResponseUpdate(); + + // Act + var result = update.ToParts(); + + // Assert + Assert.NotNull(result); + Assert.Empty(result); + } + + [Fact] + public void ToParts_AgentResponseUpdate_WithTextContent_ReturnsTextPart() + { + // Arrange + var update = new AgentResponseUpdate(ChatRole.Assistant, "Hello from streaming!"); + + // Act + var result = update.ToParts(); + + // Assert + Assert.Single(result); + Assert.Equal("Hello from streaming!", result[0].Text); + } + + [Fact] + public void ToParts_AgentResponseUpdate_WithMultipleContents_ReturnsAllParts() + { + // Arrange + var update = new AgentResponseUpdate(ChatRole.Assistant, [ + new TextContent("First chunk"), + new TextContent("Second chunk") + ]); + + // Act + var result = update.ToParts(); + + // Assert + Assert.Equal(2, result.Count); + Assert.Equal("First chunk", result[0].Text); + Assert.Equal("Second chunk", result[1].Text); + } + + [Fact] + public void ToParts_AgentResponseUpdate_WithUnsupportedContent_FiltersOutNulls() + { + // Arrange - FunctionCallContent maps to null Part since it's not a supported A2A content type + var update = new AgentResponseUpdate(ChatRole.Assistant, [ + new TextContent("Supported text"), + new FunctionCallContent("call-1", "myFunction") + ]); + + // Act + var result = update.ToParts(); + + // Assert - only the text part should be returned + Assert.Single(result); + Assert.Equal("Supported text", result[0].Text); + } }