From 9efecdd0e219676ba7fd7b26dd4b3a0dc21b4e2c Mon Sep 17 00:00:00 2001 From: Jacob Alber Date: Tue, 31 Mar 2026 09:30:39 -0400 Subject: [PATCH 1/4] fix: Remove Timeout from InputWait in StreamingRunEventStream --- .../Execution/StreamingRunEventStream.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index 6278f3446b..8737bc0f72 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -123,7 +123,7 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // Wait for next input from the consumer // Works for both Idle (no work) and PendingRequests (waiting for responses) - await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false); + await this._inputWaiter.WaitForInputAsync(linkedSource.Token).ConfigureAwait(false); // When signaled, resume running this._runStatus = RunStatus.Running; From 9db1bf8b6187d15c6d1196da24caec2756c520b8 Mon Sep 17 00:00:00 2001 From: Jacob Alber Date: Tue, 31 Mar 2026 09:26:30 -0400 Subject: [PATCH 2/4] fix: Race condition when the workflow executes to halt before TakeEventStream --- .../Execution/StreamingRunEventStream.cs | 5 ++- .../InProcessExecutionTests.cs | 44 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index 8737bc0f72..d39b1e6300 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -24,6 +24,7 @@ internal sealed class StreamingRunEventStream : IRunEventStream private readonly bool _disableRunLoop; private Task? _runLoopTask; private RunStatus _runStatus = RunStatus.NotStarted; + private int _completionEpoch; // Tracks which completion signal belongs to which consumer iteration public StreamingRunEventStream(ISuperStepRunner stepRunner, bool disableRunLoop = false) @@ -205,7 +206,9 @@ public async IAsyncEnumerable TakeEventStreamAsync( [EnumeratorCancellation] CancellationToken cancellationToken = default) { // Get the current epoch - we'll only respond to completion signals from this epoch or later - int myEpoch = Volatile.Read(ref this._completionEpoch) + 1; + int currentEpoch = Volatile.Read(ref this._completionEpoch); + bool expectingFreshWork = this._stepRunner.HasUnprocessedMessages || this._runStatus == RunStatus.Running; + int myEpoch = expectingFreshWork ? currentEpoch + 1 : currentEpoch; // Use custom async enumerable to avoid exceptions on cancellation. NonThrowingChannelReaderAsyncEnumerable eventStream = new(this._eventChannel.Reader); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessExecutionTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessExecutionTests.cs index 76f37714fd..e01feaaf9f 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessExecutionTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessExecutionTests.cs @@ -132,6 +132,50 @@ public async Task RunAsyncAndStreamAsyncShouldProduceSimilarResultsAsync() "both versions should produce the same number of agent events"); } + /// + /// This test checks that the logic around waiting for input and halting appropriately works right when the + /// workflow runs to halting before the EventStream is watched by the user. + /// + [Fact] + public async Task RunStreamingAsyncWaitToTakeStreamAsync() + { + // Arrange: Create a simple agent that responds to messages + var agent = new SimpleTestAgent("test-agent"); + var workflow = AgentWorkflowBuilder.BuildSequential(agent); + var inputMessage = new ChatMessage(ChatRole.User, "Hello"); + + // Act: Execute using streaming version with TurnToken + await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new List { inputMessage }); + + // Send TurnToken to actually trigger execution (this is the key step) + bool messageSent = await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); + messageSent.Should().BeTrue("TurnToken should be accepted"); + + Thread.Sleep(TimeSpan.FromSeconds(2)); + + // Collect events + List events = []; + + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert: The workflow should have executed and produced events + RunStatus status = await run.GetStatusAsync(); + status.Should().Be(RunStatus.Idle, "workflow should complete execution"); + + events.Should().NotBeEmpty("workflow should produce events during execution"); + + // Check that we have agent execution events + var agentEvents = events.OfType().ToList(); + agentEvents.Should().NotBeEmpty("agent should have executed and produced update events"); + + // Check that we have output events + var outputEvents = events.OfType().ToList(); + outputEvents.Should().NotBeEmpty("workflow should produce output events"); + } + /// /// Simple test agent that echoes back the input message. /// From 74c2ab8588fea48ab5abe70967205187e95db0ae Mon Sep 17 00:00:00 2001 From: Jacob Alber Date: Tue, 31 Mar 2026 09:59:55 -0400 Subject: [PATCH 3/4] test: Make the OffThread Delay test more nimble --- .../InProcessExecutionTests.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessExecutionTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessExecutionTests.cs index e01feaaf9f..870d100d76 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessExecutionTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessExecutionTests.cs @@ -151,7 +151,10 @@ public async Task RunStreamingAsyncWaitToTakeStreamAsync() bool messageSent = await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); messageSent.Should().BeTrue("TurnToken should be accepted"); - Thread.Sleep(TimeSpan.FromSeconds(2)); + while (await run.GetStatusAsync() != RunStatus.Idle) + { + await Task.Delay(200); + } // Collect events List events = []; From f98d969d22ac09113ee1e65da7dd4a36270e6868 Mon Sep 17 00:00:00 2001 From: Jacob Alber Date: Tue, 31 Mar 2026 10:02:59 -0400 Subject: [PATCH 4/4] fix: Remove slight window where runStatus could be stale --- .../Execution/StreamingRunEventStream.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index d39b1e6300..aaa6564b2c 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -23,7 +23,7 @@ internal sealed class StreamingRunEventStream : IRunEventStream private readonly CancellationTokenSource _runLoopCancellation; private readonly bool _disableRunLoop; private Task? _runLoopTask; - private RunStatus _runStatus = RunStatus.NotStarted; + private volatile RunStatus _runStatus = RunStatus.NotStarted; private int _completionEpoch; // Tracks which completion signal belongs to which consumer iteration @@ -207,6 +207,7 @@ public async IAsyncEnumerable TakeEventStreamAsync( { // Get the current epoch - we'll only respond to completion signals from this epoch or later int currentEpoch = Volatile.Read(ref this._completionEpoch); + bool expectingFreshWork = this._stepRunner.HasUnprocessedMessages || this._runStatus == RunStatus.Running; int myEpoch = expectingFreshWork ? currentEpoch + 1 : currentEpoch;