diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index c6492270d8..4eb1290961 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -23,7 +23,8 @@ internal sealed class StreamingRunEventStream : IRunEventStream private readonly CancellationTokenSource _runLoopCancellation; private readonly bool _disableRunLoop; private Task? _runLoopTask; - private RunStatus _runStatus = RunStatus.NotStarted; + private volatile RunStatus _runStatus = RunStatus.NotStarted; + private int _completionEpoch; // Tracks which completion signal belongs to which consumer iteration public StreamingRunEventStream(ISuperStepRunner stepRunner, bool disableRunLoop = false) @@ -127,7 +128,7 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // Wait for next input from the consumer // Works for both Idle (no work) and PendingRequests (waiting for responses) - await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false); + await this._inputWaiter.WaitForInputAsync(linkedSource.Token).ConfigureAwait(false); // When signaled, resume running this._runStatus = RunStatus.Running; @@ -209,7 +210,10 @@ public async IAsyncEnumerable TakeEventStreamAsync( [EnumeratorCancellation] CancellationToken cancellationToken = default) { // Get the current epoch - we'll only respond to completion signals from this epoch or later - int myEpoch = Volatile.Read(ref this._completionEpoch) + 1; + int currentEpoch = Volatile.Read(ref this._completionEpoch); + + bool expectingFreshWork = this._stepRunner.HasUnprocessedMessages || this._runStatus == RunStatus.Running; + int myEpoch = expectingFreshWork ? currentEpoch + 1 : currentEpoch; // Use custom async enumerable to avoid exceptions on cancellation. NonThrowingChannelReaderAsyncEnumerable eventStream = new(this._eventChannel.Reader); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessExecutionTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessExecutionTests.cs index 76f37714fd..870d100d76 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessExecutionTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessExecutionTests.cs @@ -132,6 +132,53 @@ public async Task RunAsyncAndStreamAsyncShouldProduceSimilarResultsAsync() "both versions should produce the same number of agent events"); } + /// + /// This test checks that the logic around waiting for input and halting appropriately works right when the + /// workflow runs to halting before the EventStream is watched by the user. + /// + [Fact] + public async Task RunStreamingAsyncWaitToTakeStreamAsync() + { + // Arrange: Create a simple agent that responds to messages + var agent = new SimpleTestAgent("test-agent"); + var workflow = AgentWorkflowBuilder.BuildSequential(agent); + var inputMessage = new ChatMessage(ChatRole.User, "Hello"); + + // Act: Execute using streaming version with TurnToken + await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new List { inputMessage }); + + // Send TurnToken to actually trigger execution (this is the key step) + bool messageSent = await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); + messageSent.Should().BeTrue("TurnToken should be accepted"); + + while (await run.GetStatusAsync() != RunStatus.Idle) + { + await Task.Delay(200); + } + + // Collect events + List events = []; + + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert: The workflow should have executed and produced events + RunStatus status = await run.GetStatusAsync(); + status.Should().Be(RunStatus.Idle, "workflow should complete execution"); + + events.Should().NotBeEmpty("workflow should produce events during execution"); + + // Check that we have agent execution events + var agentEvents = events.OfType().ToList(); + agentEvents.Should().NotBeEmpty("agent should have executed and produced update events"); + + // Check that we have output events + var outputEvents = events.OfType().ToList(); + outputEvents.Should().NotBeEmpty("workflow should produce output events"); + } + /// /// Simple test agent that echoes back the input message. ///