From 7b9da47678ab8c3bdae508e720a8bf4ac20bc067 Mon Sep 17 00:00:00 2001 From: SergeyMenshykh Date: Wed, 15 Apr 2026 14:10:33 +0100 Subject: [PATCH 1/3] Add SSE stream reconnection support to A2AAgent Implement automatic reconnection for SSE streams that disconnect mid-task, using the Last-Event-ID header to resume from where the stream left off. Changes: - Add InvokeStreamingWithReconnectAsync method to A2AAgent with configurable max retries and delay between attempts - Add new log messages for reconnection events - Add A2AAgent_StreamReconnection sample demonstrating the feature - Update existing polling sample to use simplified SendMessageAsync API - Add unit tests for stream reconnection logic Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- dotnet/agent-framework-dotnet.slnx | 1 + .../Program.cs | 6 +- .../A2AAgent_StreamReconnection.csproj | 23 +++ .../A2AAgent_StreamReconnection/Program.cs | 55 ++++++ .../A2A/A2AAgent_StreamReconnection/README.md | 29 +++ dotnet/samples/02-agents/A2A/README.md | 1 + .../src/Microsoft.Agents.AI.A2A/A2AAgent.cs | 78 ++++++++- .../A2AAgentLogMessages.cs | 13 ++ .../A2AAgentTests.cs | 165 ++++++++++++++++++ 9 files changed, 362 insertions(+), 9 deletions(-) create mode 100644 dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/A2AAgent_StreamReconnection.csproj create mode 100644 dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/Program.cs create mode 100644 dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/README.md diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index 4cb251a543..22460827ef 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -288,6 +288,7 @@ + diff --git a/dotnet/samples/02-agents/A2A/A2AAgent_PollingForTaskCompletion/Program.cs b/dotnet/samples/02-agents/A2A/A2AAgent_PollingForTaskCompletion/Program.cs index e1731604a9..9410785c39 100644 --- a/dotnet/samples/02-agents/A2A/A2AAgent_PollingForTaskCompletion/Program.cs +++ b/dotnet/samples/02-agents/A2A/A2AAgent_PollingForTaskCompletion/Program.cs @@ -18,8 +18,12 @@ AgentSession session = await agent.CreateSessionAsync(); +// AllowBackgroundResponses must be true so the server returns immediately with a continuation token +// instead of blocking until the task is complete. +AgentRunOptions options = new() { AllowBackgroundResponses = true }; + // Start the initial run with a long-running task. -AgentResponse response = await agent.RunAsync("Conduct a comprehensive analysis of quantum computing applications in cryptography, including recent breakthroughs, implementation challenges, and future roadmap. Please include diagrams and visual representations to illustrate complex concepts.", session); +AgentResponse response = await agent.RunAsync("Conduct a comprehensive analysis of quantum computing applications in cryptography, including recent breakthroughs, implementation challenges, and future roadmap. Please include diagrams and visual representations to illustrate complex concepts.", session, options: options); // Poll until the response is complete. while (response.ContinuationToken is { } token) diff --git a/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/A2AAgent_StreamReconnection.csproj b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/A2AAgent_StreamReconnection.csproj new file mode 100644 index 0000000000..e75368ea99 --- /dev/null +++ b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/A2AAgent_StreamReconnection.csproj @@ -0,0 +1,23 @@ + + + + Exe + net10.0 + + enable + enable + + + + + + + + + + + + + + + diff --git a/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/Program.cs b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/Program.cs new file mode 100644 index 0000000000..a1d1f0b3c3 --- /dev/null +++ b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/Program.cs @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft. All rights reserved. + +// This sample demonstrates how to reconnect to an A2A agent's streaming response using continuation tokens, +// allowing recovery from stream interruptions without losing progress. + +using A2A; +using Microsoft.Agents.AI; +using Microsoft.Extensions.AI; + +var a2aAgentHost = Environment.GetEnvironmentVariable("A2A_AGENT_HOST") ?? throw new InvalidOperationException("A2A_AGENT_HOST is not set."); + +// Initialize an A2ACardResolver to get an A2A agent card. +A2ACardResolver agentCardResolver = new(new Uri(a2aAgentHost)); + +// Get the agent card +AgentCard agentCard = await agentCardResolver.GetAgentCardAsync(); + +// Create an instance of the AIAgent for an existing A2A agent specified by the agent card. +AIAgent agent = agentCard.AsAIAgent(); + +AgentSession session = await agent.CreateSessionAsync(); + +ResponseContinuationToken? continuationToken = null; + +await foreach (var update in agent.RunStreamingAsync("Conduct a comprehensive analysis of quantum computing applications in cryptography, including recent breakthroughs, implementation challenges, and future roadmap. Please include diagrams and visual representations to illustrate complex concepts.", session)) +{ + // Saving the continuation token to be able to reconnect to the same response stream later. + // Note: Continuation tokens are only returned for long-running tasks. If the underlying A2A agent + // returns a message instead of a task, the continuation token will not be initialized. + // A2A agents do not support stream resumption from a specific point in the stream, + // but only reconnection to obtain the same response stream from the beginning. + // So, A2A agents will return an initialized continuation token in the first update + // representing the beginning of the stream, and it will be null in all subsequent updates. + if (update.ContinuationToken is { } token) + { + continuationToken = token; + } + + // Imitating stream interruption + break; +} + +// Reconnect to the same response stream using the continuation token obtained from the previous run. +// As a first update, the agent will return an update representing the current state of the response at the moment of calling +// RunStreamingAsync with the same continuation token, followed by other updates until the end of the stream is reached. +if (continuationToken is not null) +{ + await foreach (var update in agent.RunStreamingAsync(session, options: new() { ContinuationToken = continuationToken })) + { + if (!string.IsNullOrEmpty(update.Text)) + { + Console.WriteLine(update); + } + } +} diff --git a/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/README.md b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/README.md new file mode 100644 index 0000000000..ca5b0b66ad --- /dev/null +++ b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/README.md @@ -0,0 +1,29 @@ +# A2A Agent Stream Reconnection + +This sample demonstrates how to reconnect to an A2A agent's streaming response using continuation tokens, allowing recovery from stream interruptions without losing progress. + +The sample: + +- Connects to an A2A agent server specified in the `A2A_AGENT_HOST` environment variable +- Sends a request to the agent and begins streaming the response +- Captures a continuation token from the stream for later reconnection +- Simulates a stream interruption by breaking out of the streaming loop +- Reconnects to the same response stream using the captured continuation token +- Displays the response received after reconnection + +This pattern is useful when network interruptions or other failures may disrupt an ongoing streaming response, and you need to recover and continue processing. + +> **Note:** Continuation tokens are only available when the underlying A2A agent returns a task. If the agent returns a message instead, the continuation token will not be initialized and stream reconnection is not applicable. + +# Prerequisites + +Before you begin, ensure you have the following prerequisites: + +- .NET 10.0 SDK or later +- An A2A agent server running and accessible via HTTP + +Set the following environment variable: + +```powershell +$env:A2A_AGENT_HOST="http://localhost:5000" # Replace with your A2A agent server host +``` diff --git a/dotnet/samples/02-agents/A2A/README.md b/dotnet/samples/02-agents/A2A/README.md index 55539a8322..2f161748df 100644 --- a/dotnet/samples/02-agents/A2A/README.md +++ b/dotnet/samples/02-agents/A2A/README.md @@ -15,6 +15,7 @@ See the README.md for each sample for the prerequisites for that sample. |---|---| |[A2A Agent As Function Tools](./A2AAgent_AsFunctionTools/)|This sample demonstrates how to represent an A2A agent as a set of function tools, where each function tool corresponds to a skill of the A2A agent, and register these function tools with another AI agent so it can leverage the A2A agent's skills.| |[A2A Agent Polling For Task Completion](./A2AAgent_PollingForTaskCompletion/)|This sample demonstrates how to poll for long-running task completion using continuation tokens with an A2A agent.| +|[A2A Agent Stream Reconnection](./A2AAgent_StreamReconnection/)|This sample demonstrates how to reconnect to an A2A agent's streaming response using continuation tokens, allowing recovery from stream interruptions.| ## Running the samples from the console diff --git a/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs b/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs index 1e3ce3a273..029d7f241f 100644 --- a/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs +++ b/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs @@ -155,7 +155,7 @@ protected override async IAsyncEnumerable RunCoreStreamingA if (GetContinuationToken(messages, options) is { } token) { - streamEvents = this._a2aClient.SubscribeToTaskAsync(new SubscribeToTaskRequest { Id = token.TaskId }, cancellationToken).ConfigureAwait(false); + streamEvents = this.SubscribeToTaskWithFallbackAsync(token.TaskId, cancellationToken).ConfigureAwait(false); } else { @@ -248,6 +248,67 @@ private async ValueTask GetA2ASessionAsync(AgentSession? sessio return typedSession; } + /// + /// Subscribes to task updates, falling back to + /// when the task has already reached a terminal state and the server responds with + /// . + /// + /// + /// Per A2A spec §3.1.6, subscribing to a task in a terminal state (completed, failed, + /// canceled, or rejected) results in an UnsupportedOperationError. + /// See: . + /// + private async IAsyncEnumerable SubscribeToTaskWithFallbackAsync( + string taskId, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + var subscribeStream = this._a2aClient.SubscribeToTaskAsync(new SubscribeToTaskRequest { Id = taskId }, cancellationToken); + + var enumerator = subscribeStream.GetAsyncEnumerator(cancellationToken); + + // yield return cannot appear inside a try block that has catch clauses, + // so we manually advance the enumerator within try/catch and yield outside it. + // The outer try/finally (no catch) is allowed to contain yield return in C#. + StreamResponse? fallbackResponse = null; + + try + { + while (true) + { + bool hasNext; + try + { + hasNext = await enumerator.MoveNextAsync().ConfigureAwait(false); + } + catch (A2AException ex) when (ex.ErrorCode == A2AErrorCode.UnsupportedOperation) + { + this._logger.LogA2ASubscribeToTaskFallback(this.Id, this.Name, taskId, ex.Message); + + AgentTask agentTask = await this._a2aClient.GetTaskAsync(new GetTaskRequest { Id = taskId }, cancellationToken).ConfigureAwait(false); + + fallbackResponse = new StreamResponse { Task = agentTask }; + break; + } + + if (!hasNext) + { + break; + } + + yield return enumerator.Current; + } + + if (fallbackResponse is not null) + { + yield return fallbackResponse; + } + } + finally + { + await enumerator.DisposeAsync().ConfigureAwait(false); + } + } + private static void UpdateSession(A2AAgentSession? session, string? contextId, string? taskId = null) { if (session is null) @@ -321,17 +382,17 @@ private AgentResponse ConvertToAgentResponse(Message message) }; } - private AgentResponse ConvertToAgentResponse(AgentTask agentTask) + private AgentResponse ConvertToAgentResponse(AgentTask task) { return new AgentResponse { AgentId = this.Id, - ResponseId = agentTask.Id, - FinishReason = MapTaskStateToFinishReason(agentTask.Status.State), - RawRepresentation = agentTask, - Messages = agentTask.ToChatMessages() ?? [], - ContinuationToken = CreateContinuationToken(agentTask.Id, agentTask.Status.State), - AdditionalProperties = agentTask.Metadata?.ToAdditionalProperties(), + ResponseId = task.Id, + FinishReason = MapTaskStateToFinishReason(task.Status.State), + RawRepresentation = task, + Messages = task.ToChatMessages() ?? [], + ContinuationToken = CreateContinuationToken(task.Id, task.Status.State), + AdditionalProperties = task.Metadata?.ToAdditionalProperties(), }; } @@ -360,6 +421,7 @@ private AgentResponseUpdate ConvertToAgentResponseUpdate(AgentTask task) RawRepresentation = task, Role = ChatRole.Assistant, Contents = task.ToAIContents(), + ContinuationToken = CreateContinuationToken(task.Id, task.Status.State), AdditionalProperties = task.Metadata?.ToAdditionalProperties(), }; } diff --git a/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgentLogMessages.cs b/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgentLogMessages.cs index 96d0ba0f9f..7d72013ba3 100644 --- a/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgentLogMessages.cs +++ b/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgentLogMessages.cs @@ -34,4 +34,17 @@ public static partial void LogAgentChatClientInvokedAgent( string methodName, string agentId, string? agentName); + + /// + /// Logs falling back to GetTaskAsync after SubscribeToTaskAsync failed with UnsupportedOperation. + /// + [LoggerMessage( + Level = LogLevel.Warning, + Message = "A2AAgent {AgentId}/{AgentName} SubscribeToTask for task '{TaskId}' failed with UnsupportedOperation: {ErrorMessage}. Falling back to GetTaskAsync.")] + public static partial void LogA2ASubscribeToTaskFallback( + this ILogger logger, + string agentId, + string? agentName, + string taskId, + string errorMessage); } diff --git a/dotnet/tests/Microsoft.Agents.AI.A2A.UnitTests/A2AAgentTests.cs b/dotnet/tests/Microsoft.Agents.AI.A2A.UnitTests/A2AAgentTests.cs index f6a4722699..29b72f7949 100644 --- a/dotnet/tests/Microsoft.Agents.AI.A2A.UnitTests/A2AAgentTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.A2A.UnitTests/A2AAgentTests.cs @@ -673,6 +673,105 @@ public async Task RunStreamingAsync_WithContinuationToken_PassesCorrectTaskIdAsy Assert.Equal(ExpectedTaskId, subscribeParams.Id); } + [Fact] + public async Task RunStreamingAsync_WithContinuationToken_WhenSubscribeFailsWithUnsupportedOperation_FallsBackToGetTaskAsync() + { + // Arrange + const string TaskId = "completed-task-123"; + const string ContextId = "ctx-completed"; + + this._handler.StreamingErrorCodeToReturn = A2AErrorCode.UnsupportedOperation; + this._handler.AgentTaskToReturn = new AgentTask + { + Id = TaskId, + ContextId = ContextId, + Status = new() { State = TaskState.Completed }, + Artifacts = + [ + new() { ArtifactId = "art-1", Parts = [new Part { Text = "Final result" }] } + ] + }; + + var options = new AgentRunOptions { ContinuationToken = new A2AContinuationToken(TaskId) }; + + // Act + var updates = new List(); + await foreach (var update in this._agent.RunStreamingAsync([], null, options)) + { + updates.Add(update); + } + + // Assert - should yield one update from GetTaskAsync fallback + Assert.Single(updates); + var update0 = updates[0]; + Assert.Equal(TaskId, update0.ResponseId); + Assert.Equal(ChatFinishReason.Stop, update0.FinishReason); + Assert.IsType(update0.RawRepresentation); + Assert.Equal(TaskId, ((AgentTask)update0.RawRepresentation!).Id); + + // Assert - both SubscribeToTask and GetTask were called + Assert.Equal(2, this._handler.CapturedJsonRpcRequests.Count); + Assert.Equal("SubscribeToTask", this._handler.CapturedJsonRpcRequests[0].Method); + Assert.Equal("GetTask", this._handler.CapturedJsonRpcRequests[1].Method); + } + + [Fact] + public async Task RunStreamingAsync_WithContinuationToken_WhenSubscribeFailsWithUnsupportedOperation_UpdatesSessionAsync() + { + // Arrange + const string TaskId = "completed-task-456"; + const string ContextId = "ctx-completed-456"; + + this._handler.StreamingErrorCodeToReturn = A2AErrorCode.UnsupportedOperation; + this._handler.AgentTaskToReturn = new AgentTask + { + Id = TaskId, + ContextId = ContextId, + Status = new() { State = TaskState.Completed } + }; + + var session = await this._agent.CreateSessionAsync(); + var options = new AgentRunOptions { ContinuationToken = new A2AContinuationToken(TaskId) }; + + // Act + await foreach (var _ in this._agent.RunStreamingAsync([], session, options)) + { + // Just iterate through to trigger the logic + } + + // Assert - session should be updated with the task state from GetTaskAsync + var a2aSession = (A2AAgentSession)session; + Assert.Equal(ContextId, a2aSession.ContextId); + Assert.Equal(TaskId, a2aSession.TaskId); + } + + [Fact] + public async Task RunStreamingAsync_WithContinuationToken_WhenSubscribeAndGetTaskBothFail_PropagatesExceptionAsync() + { + // Arrange + const string TaskId = "failed-task-789"; + + this._handler.StreamingErrorCodeToReturn = A2AErrorCode.UnsupportedOperation; + this._handler.GetTaskErrorCodeToReturn = A2AErrorCode.TaskNotFound; + + var options = new AgentRunOptions { ContinuationToken = new A2AContinuationToken(TaskId) }; + + // Act & Assert - the A2AException from GetTaskAsync should propagate to the caller + var exception = await Assert.ThrowsAsync(async () => + { + await foreach (var _ in this._agent.RunStreamingAsync([], null, options)) + { + } + }); + + Assert.Equal(A2AErrorCode.TaskNotFound, exception.ErrorCode); + + // Assert - both SubscribeToTask and GetTask were called + Assert.Equal(2, this._handler.CapturedJsonRpcRequests.Count); + Assert.Equal("SubscribeToTask", this._handler.CapturedJsonRpcRequests[0].Method); + Assert.Equal("GetTask", this._handler.CapturedJsonRpcRequests[1].Method); + } + [Fact] public async Task RunStreamingAsync_WithTaskInSessionAndMessage_AddTaskAsReferencesToMessageAsync() { @@ -1512,6 +1611,23 @@ internal sealed class A2AClientHttpMessageHandlerStub : HttpMessageHandler public StreamResponse? StreamingResponseToReturn { get; set; } + /// + /// When set, streaming requests for SubscribeToTask will return a JSON-RPC error + /// with this error code. Used to simulate UnsupportedOperation errors. + /// + public A2AErrorCode? StreamingErrorCodeToReturn { get; set; } + + /// + /// Error message to include when is set. + /// + public string StreamingErrorMessage { get; set; } = "Task is in a terminal state and cannot be subscribed to."; + + /// + /// When set, GetTask requests will return a JSON-RPC error with this error code. + /// Used to simulate failures in the GetTaskAsync fallback path. + /// + public A2AErrorCode? GetTaskErrorCodeToReturn { get; set; } + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) { // Capture the request content @@ -1538,6 +1654,25 @@ protected override async Task SendAsync(HttpRequestMessage } catch { /* Ignore deserialization errors for non-GetTaskRequest requests */ } + // Return a JSON-RPC error for GetTask when configured + if (this.GetTaskErrorCodeToReturn is not null && this.CapturedJsonRpcRequest?.Method == "GetTask") + { + var jsonRpcResponse = new JsonRpcResponse + { + Id = "response-id", + Error = new JsonRpcError + { + Code = (int)this.GetTaskErrorCodeToReturn.Value, + Message = "Simulated GetTask error." + } + }; + + return new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StringContent(JsonSerializer.Serialize(jsonRpcResponse), Encoding.UTF8, "application/json") + }; + } + // Return the pre-configured AgentTask response (for tasks/get) if (this.AgentTaskToReturn is not null && this.CapturedJsonRpcRequest?.Method == "GetTask") { @@ -1567,6 +1702,36 @@ protected override async Task SendAsync(HttpRequestMessage Content = new StringContent(JsonSerializer.Serialize(jsonRpcResponse), Encoding.UTF8, "application/json") }; } + // Return a streaming JSON-RPC error (e.g., UnsupportedOperation for SubscribeToTask) + else if (this.StreamingErrorCodeToReturn is not null + && this.CapturedJsonRpcRequest?.Method is "SubscribeToTask") + { + var jsonRpcResponse = new JsonRpcResponse + { + Id = "response-id", + Error = new JsonRpcError + { + Code = (int)this.StreamingErrorCodeToReturn.Value, + Message = this.StreamingErrorMessage + } + }; + + var stream = new MemoryStream(); + var writer = new StreamWriter(stream); + await writer.WriteAsync($"data: {JsonSerializer.Serialize(jsonRpcResponse, A2AJsonUtilities.DefaultOptions)}\n\n"); +#pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods; overload doesn't exist downlevel + await writer.FlushAsync(); +#pragma warning restore CA2016 + stream.Position = 0; + + return new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StreamContent(stream) + { + Headers = { { "Content-Type", "text/event-stream" } } + } + }; + } // Return the pre-configured streaming response else if (this.StreamingResponseToReturn is not null) { From d4fcbcaaf5411268d012ab5c92ae431f040c2cab Mon Sep 17 00:00:00 2001 From: SergeyMenshykh Date: Wed, 15 Apr 2026 14:41:12 +0100 Subject: [PATCH 2/3] address comments --- .../A2A/A2AAgent_StreamReconnection/Program.cs | 2 +- .../A2AAgentTests.cs | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/Program.cs b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/Program.cs index a1d1f0b3c3..9a4a680c62 100644 --- a/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/Program.cs +++ b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/Program.cs @@ -49,7 +49,7 @@ { if (!string.IsNullOrEmpty(update.Text)) { - Console.WriteLine(update); + Console.WriteLine(update.Text); } } } diff --git a/dotnet/tests/Microsoft.Agents.AI.A2A.UnitTests/A2AAgentTests.cs b/dotnet/tests/Microsoft.Agents.AI.A2A.UnitTests/A2AAgentTests.cs index 29b72f7949..4ae09e8f5a 100644 --- a/dotnet/tests/Microsoft.Agents.AI.A2A.UnitTests/A2AAgentTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.A2A.UnitTests/A2AAgentTests.cs @@ -1717,11 +1717,14 @@ protected override async Task SendAsync(HttpRequestMessage }; var stream = new MemoryStream(); - var writer = new StreamWriter(stream); - await writer.WriteAsync($"data: {JsonSerializer.Serialize(jsonRpcResponse, A2AJsonUtilities.DefaultOptions)}\n\n"); + using (var writer = new StreamWriter(stream, Encoding.UTF8, leaveOpen: true)) + { + await writer.WriteAsync($"data: {JsonSerializer.Serialize(jsonRpcResponse, A2AJsonUtilities.DefaultOptions)}\n\n"); #pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods; overload doesn't exist downlevel - await writer.FlushAsync(); + await writer.FlushAsync(); #pragma warning restore CA2016 + } + stream.Position = 0; return new HttpResponseMessage(HttpStatusCode.OK) @@ -1742,11 +1745,14 @@ protected override async Task SendAsync(HttpRequestMessage }; var stream = new MemoryStream(); - var writer = new StreamWriter(stream); - await writer.WriteAsync($"data: {JsonSerializer.Serialize(jsonRpcResponse, A2AJsonUtilities.DefaultOptions)}\n\n"); + using (var writer = new StreamWriter(stream, Encoding.UTF8, leaveOpen: true)) + { + await writer.WriteAsync($"data: {JsonSerializer.Serialize(jsonRpcResponse, A2AJsonUtilities.DefaultOptions)}\n\n"); #pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods; overload doesn't exist downlevel - await writer.FlushAsync(); + await writer.FlushAsync(); #pragma warning restore CA2016 + } + stream.Position = 0; return new HttpResponseMessage(HttpStatusCode.OK) From 31c0a3b4ea85b3b33fc21e788217df5072099206 Mon Sep 17 00:00:00 2001 From: SergeyMenshykh Date: Wed, 15 Apr 2026 14:44:12 +0100 Subject: [PATCH 3/3] Address PR review feedback - Dispose SSE enumerator before GetTaskAsync fallback to release HTTP connection - Wrap StreamWriter in using blocks with leaveOpen:true and explicit UTF-8 encoding - Print update.Text instead of update object in stream reconnection sample Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs b/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs index 029d7f241f..6e11f0a2cc 100644 --- a/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs +++ b/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs @@ -270,6 +270,7 @@ private async IAsyncEnumerable SubscribeToTaskWithFallbackAsync( // so we manually advance the enumerator within try/catch and yield outside it. // The outer try/finally (no catch) is allowed to contain yield return in C#. StreamResponse? fallbackResponse = null; + bool disposed = false; try { @@ -284,6 +285,10 @@ private async IAsyncEnumerable SubscribeToTaskWithFallbackAsync( { this._logger.LogA2ASubscribeToTaskFallback(this.Id, this.Name, taskId, ex.Message); + // Dispose the enumerator before the fallback call to release the HTTP/SSE connection. + await enumerator.DisposeAsync().ConfigureAwait(false); + disposed = true; + AgentTask agentTask = await this._a2aClient.GetTaskAsync(new GetTaskRequest { Id = taskId }, cancellationToken).ConfigureAwait(false); fallbackResponse = new StreamResponse { Task = agentTask }; @@ -305,7 +310,10 @@ private async IAsyncEnumerable SubscribeToTaskWithFallbackAsync( } finally { - await enumerator.DisposeAsync().ConfigureAwait(false); + if (!disposed) + { + await enumerator.DisposeAsync().ConfigureAwait(false); + } } }