diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx
index 4cb251a543..22460827ef 100644
--- a/dotnet/agent-framework-dotnet.slnx
+++ b/dotnet/agent-framework-dotnet.slnx
@@ -288,6 +288,7 @@
+
diff --git a/dotnet/samples/02-agents/A2A/A2AAgent_PollingForTaskCompletion/Program.cs b/dotnet/samples/02-agents/A2A/A2AAgent_PollingForTaskCompletion/Program.cs
index e1731604a9..9410785c39 100644
--- a/dotnet/samples/02-agents/A2A/A2AAgent_PollingForTaskCompletion/Program.cs
+++ b/dotnet/samples/02-agents/A2A/A2AAgent_PollingForTaskCompletion/Program.cs
@@ -18,8 +18,12 @@
AgentSession session = await agent.CreateSessionAsync();
+// AllowBackgroundResponses must be true so the server returns immediately with a continuation token
+// instead of blocking until the task is complete.
+AgentRunOptions options = new() { AllowBackgroundResponses = true };
+
// Start the initial run with a long-running task.
-AgentResponse response = await agent.RunAsync("Conduct a comprehensive analysis of quantum computing applications in cryptography, including recent breakthroughs, implementation challenges, and future roadmap. Please include diagrams and visual representations to illustrate complex concepts.", session);
+AgentResponse response = await agent.RunAsync("Conduct a comprehensive analysis of quantum computing applications in cryptography, including recent breakthroughs, implementation challenges, and future roadmap. Please include diagrams and visual representations to illustrate complex concepts.", session, options: options);
// Poll until the response is complete.
while (response.ContinuationToken is { } token)
diff --git a/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/A2AAgent_StreamReconnection.csproj b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/A2AAgent_StreamReconnection.csproj
new file mode 100644
index 0000000000..e75368ea99
--- /dev/null
+++ b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/A2AAgent_StreamReconnection.csproj
@@ -0,0 +1,23 @@
+
+
+
+ Exe
+ net10.0
+
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/Program.cs b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/Program.cs
new file mode 100644
index 0000000000..9a4a680c62
--- /dev/null
+++ b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/Program.cs
@@ -0,0 +1,55 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+// This sample demonstrates how to reconnect to an A2A agent's streaming response using continuation tokens,
+// allowing recovery from stream interruptions without losing progress.
+
+using A2A;
+using Microsoft.Agents.AI;
+using Microsoft.Extensions.AI;
+
+var a2aAgentHost = Environment.GetEnvironmentVariable("A2A_AGENT_HOST") ?? throw new InvalidOperationException("A2A_AGENT_HOST is not set.");
+
+// Initialize an A2ACardResolver to get an A2A agent card.
+A2ACardResolver agentCardResolver = new(new Uri(a2aAgentHost));
+
+// Get the agent card
+AgentCard agentCard = await agentCardResolver.GetAgentCardAsync();
+
+// Create an instance of the AIAgent for an existing A2A agent specified by the agent card.
+AIAgent agent = agentCard.AsAIAgent();
+
+AgentSession session = await agent.CreateSessionAsync();
+
+ResponseContinuationToken? continuationToken = null;
+
+await foreach (var update in agent.RunStreamingAsync("Conduct a comprehensive analysis of quantum computing applications in cryptography, including recent breakthroughs, implementation challenges, and future roadmap. Please include diagrams and visual representations to illustrate complex concepts.", session))
+{
+ // Saving the continuation token to be able to reconnect to the same response stream later.
+ // Note: Continuation tokens are only returned for long-running tasks. If the underlying A2A agent
+ // returns a message instead of a task, the continuation token will not be initialized.
+ // A2A agents do not support stream resumption from a specific point in the stream,
+ // but only reconnection to obtain the same response stream from the beginning.
+ // So, A2A agents will return an initialized continuation token in the first update
+ // representing the beginning of the stream, and it will be null in all subsequent updates.
+ if (update.ContinuationToken is { } token)
+ {
+ continuationToken = token;
+ }
+
+ // Imitating stream interruption
+ break;
+}
+
+// Reconnect to the same response stream using the continuation token obtained from the previous run.
+// As a first update, the agent will return an update representing the current state of the response at the moment of calling
+// RunStreamingAsync with the same continuation token, followed by other updates until the end of the stream is reached.
+if (continuationToken is not null)
+{
+ await foreach (var update in agent.RunStreamingAsync(session, options: new() { ContinuationToken = continuationToken }))
+ {
+ if (!string.IsNullOrEmpty(update.Text))
+ {
+ Console.WriteLine(update.Text);
+ }
+ }
+}
diff --git a/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/README.md b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/README.md
new file mode 100644
index 0000000000..ca5b0b66ad
--- /dev/null
+++ b/dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/README.md
@@ -0,0 +1,29 @@
+# A2A Agent Stream Reconnection
+
+This sample demonstrates how to reconnect to an A2A agent's streaming response using continuation tokens, allowing recovery from stream interruptions without losing progress.
+
+The sample:
+
+- Connects to an A2A agent server specified in the `A2A_AGENT_HOST` environment variable
+- Sends a request to the agent and begins streaming the response
+- Captures a continuation token from the stream for later reconnection
+- Simulates a stream interruption by breaking out of the streaming loop
+- Reconnects to the same response stream using the captured continuation token
+- Displays the response received after reconnection
+
+This pattern is useful when network interruptions or other failures may disrupt an ongoing streaming response, and you need to recover and continue processing.
+
+> **Note:** Continuation tokens are only available when the underlying A2A agent returns a task. If the agent returns a message instead, the continuation token will not be initialized and stream reconnection is not applicable.
+
+# Prerequisites
+
+Before you begin, ensure you have the following prerequisites:
+
+- .NET 10.0 SDK or later
+- An A2A agent server running and accessible via HTTP
+
+Set the following environment variable:
+
+```powershell
+$env:A2A_AGENT_HOST="http://localhost:5000" # Replace with your A2A agent server host
+```
diff --git a/dotnet/samples/02-agents/A2A/README.md b/dotnet/samples/02-agents/A2A/README.md
index 55539a8322..2f161748df 100644
--- a/dotnet/samples/02-agents/A2A/README.md
+++ b/dotnet/samples/02-agents/A2A/README.md
@@ -15,6 +15,7 @@ See the README.md for each sample for the prerequisites for that sample.
|---|---|
|[A2A Agent As Function Tools](./A2AAgent_AsFunctionTools/)|This sample demonstrates how to represent an A2A agent as a set of function tools, where each function tool corresponds to a skill of the A2A agent, and register these function tools with another AI agent so it can leverage the A2A agent's skills.|
|[A2A Agent Polling For Task Completion](./A2AAgent_PollingForTaskCompletion/)|This sample demonstrates how to poll for long-running task completion using continuation tokens with an A2A agent.|
+|[A2A Agent Stream Reconnection](./A2AAgent_StreamReconnection/)|This sample demonstrates how to reconnect to an A2A agent's streaming response using continuation tokens, allowing recovery from stream interruptions.|
## Running the samples from the console
diff --git a/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs b/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs
index 1e3ce3a273..6e11f0a2cc 100644
--- a/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs
+++ b/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs
@@ -155,7 +155,7 @@ protected override async IAsyncEnumerable RunCoreStreamingA
if (GetContinuationToken(messages, options) is { } token)
{
- streamEvents = this._a2aClient.SubscribeToTaskAsync(new SubscribeToTaskRequest { Id = token.TaskId }, cancellationToken).ConfigureAwait(false);
+ streamEvents = this.SubscribeToTaskWithFallbackAsync(token.TaskId, cancellationToken).ConfigureAwait(false);
}
else
{
@@ -248,6 +248,75 @@ private async ValueTask GetA2ASessionAsync(AgentSession? sessio
return typedSession;
}
+ ///
+ /// Subscribes to task updates, falling back to
+ /// when the task has already reached a terminal state and the server responds with
+ /// .
+ ///
+ ///
+ /// Per A2A spec §3.1.6, subscribing to a task in a terminal state (completed, failed,
+ /// canceled, or rejected) results in an UnsupportedOperationError.
+ /// See: .
+ ///
+ private async IAsyncEnumerable SubscribeToTaskWithFallbackAsync(
+ string taskId,
+ [EnumeratorCancellation] CancellationToken cancellationToken)
+ {
+ var subscribeStream = this._a2aClient.SubscribeToTaskAsync(new SubscribeToTaskRequest { Id = taskId }, cancellationToken);
+
+ var enumerator = subscribeStream.GetAsyncEnumerator(cancellationToken);
+
+ // yield return cannot appear inside a try block that has catch clauses,
+ // so we manually advance the enumerator within try/catch and yield outside it.
+ // The outer try/finally (no catch) is allowed to contain yield return in C#.
+ StreamResponse? fallbackResponse = null;
+ bool disposed = false;
+
+ try
+ {
+ while (true)
+ {
+ bool hasNext;
+ try
+ {
+ hasNext = await enumerator.MoveNextAsync().ConfigureAwait(false);
+ }
+ catch (A2AException ex) when (ex.ErrorCode == A2AErrorCode.UnsupportedOperation)
+ {
+ this._logger.LogA2ASubscribeToTaskFallback(this.Id, this.Name, taskId, ex.Message);
+
+ // Dispose the enumerator before the fallback call to release the HTTP/SSE connection.
+ await enumerator.DisposeAsync().ConfigureAwait(false);
+ disposed = true;
+
+ AgentTask agentTask = await this._a2aClient.GetTaskAsync(new GetTaskRequest { Id = taskId }, cancellationToken).ConfigureAwait(false);
+
+ fallbackResponse = new StreamResponse { Task = agentTask };
+ break;
+ }
+
+ if (!hasNext)
+ {
+ break;
+ }
+
+ yield return enumerator.Current;
+ }
+
+ if (fallbackResponse is not null)
+ {
+ yield return fallbackResponse;
+ }
+ }
+ finally
+ {
+ if (!disposed)
+ {
+ await enumerator.DisposeAsync().ConfigureAwait(false);
+ }
+ }
+ }
+
private static void UpdateSession(A2AAgentSession? session, string? contextId, string? taskId = null)
{
if (session is null)
@@ -321,17 +390,17 @@ private AgentResponse ConvertToAgentResponse(Message message)
};
}
- private AgentResponse ConvertToAgentResponse(AgentTask agentTask)
+ private AgentResponse ConvertToAgentResponse(AgentTask task)
{
return new AgentResponse
{
AgentId = this.Id,
- ResponseId = agentTask.Id,
- FinishReason = MapTaskStateToFinishReason(agentTask.Status.State),
- RawRepresentation = agentTask,
- Messages = agentTask.ToChatMessages() ?? [],
- ContinuationToken = CreateContinuationToken(agentTask.Id, agentTask.Status.State),
- AdditionalProperties = agentTask.Metadata?.ToAdditionalProperties(),
+ ResponseId = task.Id,
+ FinishReason = MapTaskStateToFinishReason(task.Status.State),
+ RawRepresentation = task,
+ Messages = task.ToChatMessages() ?? [],
+ ContinuationToken = CreateContinuationToken(task.Id, task.Status.State),
+ AdditionalProperties = task.Metadata?.ToAdditionalProperties(),
};
}
@@ -360,6 +429,7 @@ private AgentResponseUpdate ConvertToAgentResponseUpdate(AgentTask task)
RawRepresentation = task,
Role = ChatRole.Assistant,
Contents = task.ToAIContents(),
+ ContinuationToken = CreateContinuationToken(task.Id, task.Status.State),
AdditionalProperties = task.Metadata?.ToAdditionalProperties(),
};
}
diff --git a/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgentLogMessages.cs b/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgentLogMessages.cs
index 96d0ba0f9f..7d72013ba3 100644
--- a/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgentLogMessages.cs
+++ b/dotnet/src/Microsoft.Agents.AI.A2A/A2AAgentLogMessages.cs
@@ -34,4 +34,17 @@ public static partial void LogAgentChatClientInvokedAgent(
string methodName,
string agentId,
string? agentName);
+
+ ///
+ /// Logs falling back to GetTaskAsync after SubscribeToTaskAsync failed with UnsupportedOperation.
+ ///
+ [LoggerMessage(
+ Level = LogLevel.Warning,
+ Message = "A2AAgent {AgentId}/{AgentName} SubscribeToTask for task '{TaskId}' failed with UnsupportedOperation: {ErrorMessage}. Falling back to GetTaskAsync.")]
+ public static partial void LogA2ASubscribeToTaskFallback(
+ this ILogger logger,
+ string agentId,
+ string? agentName,
+ string taskId,
+ string errorMessage);
}
diff --git a/dotnet/tests/Microsoft.Agents.AI.A2A.UnitTests/A2AAgentTests.cs b/dotnet/tests/Microsoft.Agents.AI.A2A.UnitTests/A2AAgentTests.cs
index f6a4722699..4ae09e8f5a 100644
--- a/dotnet/tests/Microsoft.Agents.AI.A2A.UnitTests/A2AAgentTests.cs
+++ b/dotnet/tests/Microsoft.Agents.AI.A2A.UnitTests/A2AAgentTests.cs
@@ -673,6 +673,105 @@ public async Task RunStreamingAsync_WithContinuationToken_PassesCorrectTaskIdAsy
Assert.Equal(ExpectedTaskId, subscribeParams.Id);
}
+ [Fact]
+ public async Task RunStreamingAsync_WithContinuationToken_WhenSubscribeFailsWithUnsupportedOperation_FallsBackToGetTaskAsync()
+ {
+ // Arrange
+ const string TaskId = "completed-task-123";
+ const string ContextId = "ctx-completed";
+
+ this._handler.StreamingErrorCodeToReturn = A2AErrorCode.UnsupportedOperation;
+ this._handler.AgentTaskToReturn = new AgentTask
+ {
+ Id = TaskId,
+ ContextId = ContextId,
+ Status = new() { State = TaskState.Completed },
+ Artifacts =
+ [
+ new() { ArtifactId = "art-1", Parts = [new Part { Text = "Final result" }] }
+ ]
+ };
+
+ var options = new AgentRunOptions { ContinuationToken = new A2AContinuationToken(TaskId) };
+
+ // Act
+ var updates = new List();
+ await foreach (var update in this._agent.RunStreamingAsync([], null, options))
+ {
+ updates.Add(update);
+ }
+
+ // Assert - should yield one update from GetTaskAsync fallback
+ Assert.Single(updates);
+ var update0 = updates[0];
+ Assert.Equal(TaskId, update0.ResponseId);
+ Assert.Equal(ChatFinishReason.Stop, update0.FinishReason);
+ Assert.IsType(update0.RawRepresentation);
+ Assert.Equal(TaskId, ((AgentTask)update0.RawRepresentation!).Id);
+
+ // Assert - both SubscribeToTask and GetTask were called
+ Assert.Equal(2, this._handler.CapturedJsonRpcRequests.Count);
+ Assert.Equal("SubscribeToTask", this._handler.CapturedJsonRpcRequests[0].Method);
+ Assert.Equal("GetTask", this._handler.CapturedJsonRpcRequests[1].Method);
+ }
+
+ [Fact]
+ public async Task RunStreamingAsync_WithContinuationToken_WhenSubscribeFailsWithUnsupportedOperation_UpdatesSessionAsync()
+ {
+ // Arrange
+ const string TaskId = "completed-task-456";
+ const string ContextId = "ctx-completed-456";
+
+ this._handler.StreamingErrorCodeToReturn = A2AErrorCode.UnsupportedOperation;
+ this._handler.AgentTaskToReturn = new AgentTask
+ {
+ Id = TaskId,
+ ContextId = ContextId,
+ Status = new() { State = TaskState.Completed }
+ };
+
+ var session = await this._agent.CreateSessionAsync();
+ var options = new AgentRunOptions { ContinuationToken = new A2AContinuationToken(TaskId) };
+
+ // Act
+ await foreach (var _ in this._agent.RunStreamingAsync([], session, options))
+ {
+ // Just iterate through to trigger the logic
+ }
+
+ // Assert - session should be updated with the task state from GetTaskAsync
+ var a2aSession = (A2AAgentSession)session;
+ Assert.Equal(ContextId, a2aSession.ContextId);
+ Assert.Equal(TaskId, a2aSession.TaskId);
+ }
+
+ [Fact]
+ public async Task RunStreamingAsync_WithContinuationToken_WhenSubscribeAndGetTaskBothFail_PropagatesExceptionAsync()
+ {
+ // Arrange
+ const string TaskId = "failed-task-789";
+
+ this._handler.StreamingErrorCodeToReturn = A2AErrorCode.UnsupportedOperation;
+ this._handler.GetTaskErrorCodeToReturn = A2AErrorCode.TaskNotFound;
+
+ var options = new AgentRunOptions { ContinuationToken = new A2AContinuationToken(TaskId) };
+
+ // Act & Assert - the A2AException from GetTaskAsync should propagate to the caller
+ var exception = await Assert.ThrowsAsync(async () =>
+ {
+ await foreach (var _ in this._agent.RunStreamingAsync([], null, options))
+ {
+ }
+ });
+
+ Assert.Equal(A2AErrorCode.TaskNotFound, exception.ErrorCode);
+
+ // Assert - both SubscribeToTask and GetTask were called
+ Assert.Equal(2, this._handler.CapturedJsonRpcRequests.Count);
+ Assert.Equal("SubscribeToTask", this._handler.CapturedJsonRpcRequests[0].Method);
+ Assert.Equal("GetTask", this._handler.CapturedJsonRpcRequests[1].Method);
+ }
+
[Fact]
public async Task RunStreamingAsync_WithTaskInSessionAndMessage_AddTaskAsReferencesToMessageAsync()
{
@@ -1512,6 +1611,23 @@ internal sealed class A2AClientHttpMessageHandlerStub : HttpMessageHandler
public StreamResponse? StreamingResponseToReturn { get; set; }
+ ///
+ /// When set, streaming requests for SubscribeToTask will return a JSON-RPC error
+ /// with this error code. Used to simulate UnsupportedOperation errors.
+ ///
+ public A2AErrorCode? StreamingErrorCodeToReturn { get; set; }
+
+ ///
+ /// Error message to include when is set.
+ ///
+ public string StreamingErrorMessage { get; set; } = "Task is in a terminal state and cannot be subscribed to.";
+
+ ///
+ /// When set, GetTask requests will return a JSON-RPC error with this error code.
+ /// Used to simulate failures in the GetTaskAsync fallback path.
+ ///
+ public A2AErrorCode? GetTaskErrorCodeToReturn { get; set; }
+
protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
// Capture the request content
@@ -1538,6 +1654,25 @@ protected override async Task SendAsync(HttpRequestMessage
}
catch { /* Ignore deserialization errors for non-GetTaskRequest requests */ }
+ // Return a JSON-RPC error for GetTask when configured
+ if (this.GetTaskErrorCodeToReturn is not null && this.CapturedJsonRpcRequest?.Method == "GetTask")
+ {
+ var jsonRpcResponse = new JsonRpcResponse
+ {
+ Id = "response-id",
+ Error = new JsonRpcError
+ {
+ Code = (int)this.GetTaskErrorCodeToReturn.Value,
+ Message = "Simulated GetTask error."
+ }
+ };
+
+ return new HttpResponseMessage(HttpStatusCode.OK)
+ {
+ Content = new StringContent(JsonSerializer.Serialize(jsonRpcResponse), Encoding.UTF8, "application/json")
+ };
+ }
+
// Return the pre-configured AgentTask response (for tasks/get)
if (this.AgentTaskToReturn is not null && this.CapturedJsonRpcRequest?.Method == "GetTask")
{
@@ -1567,6 +1702,39 @@ protected override async Task SendAsync(HttpRequestMessage
Content = new StringContent(JsonSerializer.Serialize(jsonRpcResponse), Encoding.UTF8, "application/json")
};
}
+ // Return a streaming JSON-RPC error (e.g., UnsupportedOperation for SubscribeToTask)
+ else if (this.StreamingErrorCodeToReturn is not null
+ && this.CapturedJsonRpcRequest?.Method is "SubscribeToTask")
+ {
+ var jsonRpcResponse = new JsonRpcResponse
+ {
+ Id = "response-id",
+ Error = new JsonRpcError
+ {
+ Code = (int)this.StreamingErrorCodeToReturn.Value,
+ Message = this.StreamingErrorMessage
+ }
+ };
+
+ var stream = new MemoryStream();
+ using (var writer = new StreamWriter(stream, Encoding.UTF8, leaveOpen: true))
+ {
+ await writer.WriteAsync($"data: {JsonSerializer.Serialize(jsonRpcResponse, A2AJsonUtilities.DefaultOptions)}\n\n");
+#pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods; overload doesn't exist downlevel
+ await writer.FlushAsync();
+#pragma warning restore CA2016
+ }
+
+ stream.Position = 0;
+
+ return new HttpResponseMessage(HttpStatusCode.OK)
+ {
+ Content = new StreamContent(stream)
+ {
+ Headers = { { "Content-Type", "text/event-stream" } }
+ }
+ };
+ }
// Return the pre-configured streaming response
else if (this.StreamingResponseToReturn is not null)
{
@@ -1577,11 +1745,14 @@ protected override async Task SendAsync(HttpRequestMessage
};
var stream = new MemoryStream();
- var writer = new StreamWriter(stream);
- await writer.WriteAsync($"data: {JsonSerializer.Serialize(jsonRpcResponse, A2AJsonUtilities.DefaultOptions)}\n\n");
+ using (var writer = new StreamWriter(stream, Encoding.UTF8, leaveOpen: true))
+ {
+ await writer.WriteAsync($"data: {JsonSerializer.Serialize(jsonRpcResponse, A2AJsonUtilities.DefaultOptions)}\n\n");
#pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods; overload doesn't exist downlevel
- await writer.FlushAsync();
+ await writer.FlushAsync();
#pragma warning restore CA2016
+ }
+
stream.Position = 0;
return new HttpResponseMessage(HttpStatusCode.OK)