From 5b5960a546bab9a110dc55ebd56463fc0548ab37 Mon Sep 17 00:00:00 2001 From: Jacob Alber Date: Thu, 4 Dec 2025 18:42:24 -0500 Subject: [PATCH 1/2] fix: RunState is sometimes Running when it should be Idle There is a timeout-wait for messages in the inner loop driving the running of the workflow. When it gets cleared, an attempt to run a superstep is made. Currently we set the state to running as soon as the wait clears, even if we have no work to do, and clear it upon discovering this. The fix is to only set the Running state when actual Workflow execution is happening when running Super-Steps, and not until the loop confirms there is work to do. A broader, longer-term fix would be to remove the Semaphore and timeout-wait in it, since it is working around the inability to atomically insert and release the Semaphore. --- .../Execution/StreamingRunEventStream.cs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index 4cce8df844..cf49dceac6 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -69,7 +69,6 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // The consumer will call EnqueueMessageAsync which signals the run loop await this._inputWaiter.WaitForInputAsync(cancellationToken: linkedSource.Token).ConfigureAwait(false); - this._runStatus = RunStatus.Running; activity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted)); while (!linkedSource.Token.IsCancellationRequested) @@ -78,6 +77,9 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // Events are streamed out in real-time as they happen via the event handler while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested) { + this._runStatus
= RunStatus.Running; // Ensure we do not inappropriately signal Running status unless + // messages are being processed. + await this._stepRunner.RunSuperStepAsync(linkedSource.Token).ConfigureAwait(false); } @@ -96,9 +98,6 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // Wait for next input from the consumer // Works for both Idle (no work) and PendingRequests (waiting for responses) await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false); - - // When signaled, resume running - this._runStatus = RunStatus.Running; } } catch (OperationCanceledException) From 4011c97b75e2f135ba133300ea7d78a23091fc2a Mon Sep 17 00:00:00 2001 From: Jacob Alber Date: Fri, 5 Dec 2025 16:55:19 -0500 Subject: [PATCH 2/2] test: WIP --- .../Execution/LockstepRunEventStream.cs | 36 ++- .../Execution/StreamingRunEventStream.cs | 5 +- .../DelayValueTaskSource.cs | 29 ++ .../RunStatusTests.cs | 290 ++++++++++++++++++ .../TestValueTaskSource.cs | 130 ++++++++ 5 files changed, 481 insertions(+), 9 deletions(-) create mode 100644 dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/DelayValueTaskSource.cs create mode 100644 dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RunStatusTests.cs create mode 100644 dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestValueTaskSource.cs diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index 250a9ee612..81a053e129 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -53,6 +53,9 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe using Activity? 
activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.RunId, this._stepRunner.RunId); + bool hadException = false; + bool hadCancellation = false; + try { this.RunStatus = RunStatus.Running; @@ -74,14 +77,21 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe } catch (OperationCanceledException) { + hadCancellation = true; } - catch (Exception ex) when (activity is not null) + catch (Exception ex) { - activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() { - { Tags.ErrorType, ex.GetType().FullName }, - { Tags.BuildErrorMessage, ex.Message }, - })); - activity.CaptureException(ex); + hadException = true; + + if (activity != null) + { + activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() { + { Tags.ErrorType, ex.GetType().FullName }, + { Tags.BuildErrorMessage, ex.Message }, + })); + activity.CaptureException(ex); + } + throw; } @@ -133,7 +143,19 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe } finally { - this.RunStatus = this._stepRunner.HasUnservicedRequests ? 
RunStatus.PendingRequests : RunStatus.Idle; + if (hadException || hadCancellation || linkedSource.Token.IsCancellationRequested) + { + this.RunStatus = RunStatus.Ended; + } + else if (this._stepRunner.HasUnservicedRequests) + { + this.RunStatus = RunStatus.PendingRequests; + } + else + { + this.RunStatus = RunStatus.Idle; + } + this._stepRunner.OutgoingEvents.EventRaised -= OnWorkflowEventAsync; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index cf49dceac6..5063aa33eb 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -118,11 +118,12 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) } finally { + // Mark as ended when run loop exits + this._runStatus = RunStatus.Ended; + this._stepRunner.OutgoingEvents.EventRaised -= OnEventRaisedAsync; this._eventChannel.Writer.Complete(); - // Mark as ended when run loop exits - this._runStatus = RunStatus.Ended; activity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/DelayValueTaskSource.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/DelayValueTaskSource.cs new file mode 100644 index 0000000000..4630588a75 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/DelayValueTaskSource.cs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft. All rights reserved. 
+
+using System;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Sources;
+
+namespace Microsoft.Agents.AI.Workflows.UnitTests;
+
+internal sealed class DelayValueTaskSource<T> : IValueTaskSource<T> // stays pending until a Release* method is called
+{
+    private readonly TestValueTaskSource<T> _innerSource = new();
+    private readonly T _value; // result handed out by ReleaseSucceededAsync
+
+    public DelayValueTaskSource(T value)
+    {
+        this._value = value;
+    }
+
+    public ValueTask ReleaseSucceededAsync() => this._innerSource.SetSucceededAsync(this._value);
+    public ValueTask ReleaseFaultedAsync(Exception exception) => this._innerSource.SetFaultedAsync(exception);
+    public ValueTask ReleaseCanceledAsync() => this._innerSource.SetCanceledAsync();
+
+    public T GetResult(short token) => this._innerSource.GetResult(token);
+
+    public ValueTaskSourceStatus GetStatus(short token) => this._innerSource.GetStatus(token);
+
+    public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
+        => this._innerSource.OnCompleted(continuation, state, token, flags);
+}
diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RunStatusTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RunStatusTests.cs
new file mode 100644
index 0000000000..e3f225548a
--- /dev/null
+++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RunStatusTests.cs
@@ -0,0 +1,307 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Microsoft.Agents.AI.Workflows.Execution;
+
+namespace Microsoft.Agents.AI.Workflows.UnitTests;
+
+/// <summary>
+/// Checks the <see cref="RunStatus"/> reported by each run event stream implementation while a
+/// manually driven <see cref="TestStepRunner"/> stands in for the workflow.
+/// </summary>
+public static class RunStatusTests
+{
+    /// <summary>
+    /// Test double for <see cref="ISuperStepRunner"/> whose super-steps stay pending until the test
+    /// releases them via <see cref="CompleteStepAsync"/>, <see cref="CancelStepAsync"/> or
+    /// <see cref="FailStepAsync"/>.
+    /// </summary>
+    internal sealed class TestStepRunner : ISuperStepRunner
+    {
+        public TestStepRunner([CallerMemberName] string? name = null)
+        {
+            Console.WriteLine($"Starting test {name}");
+        }
+
+        public string RunId { get; } = Guid.NewGuid().ToString("N");
+
+        public string StartExecutorId { get; } = "start";
+
+        public bool HasUnservicedRequests { get; set; }
+        public bool HasUnprocessedMessages { get; set; }
+
+        public ConcurrentEventSink OutgoingEvents { get; } = new();
+
+        public ValueTask<bool> EnqueueMessageAsync<T>(T message, CancellationToken cancellationToken = default)
+        {
+            this.HasUnprocessedMessages = true;
+            return new(true);
+        }
+
+        ValueTask<bool> ISuperStepRunner.EnqueueMessageUntypedAsync(object message, Type declaredType, CancellationToken cancellationToken)
+        {
+            this.HasUnprocessedMessages = true;
+            return new(true);
+        }
+
+        public async ValueTask EnqueueResponseAsync(ExternalResponse response, CancellationToken cancellationToken = default)
+        {
+            this.HasUnservicedRequests = false;
+            await this.EnqueueMessageAsync(response, cancellationToken);
+        }
+
+        public ValueTask<bool> IsValidInputTypeAsync<T>(CancellationToken cancellationToken = default) => new(true);
+
+        public ValueTask RequestEndRunAsync()
+        {
+            if (this._currentStepSource != null)
+            {
+                return this.CancelStepAsync();
+            }
+
+            return new();
+        }
+
+        private DelayValueTaskSource<bool>? _currentStepSource;
+        private CancellationTokenRegistration? _registration;
+
+        /// <summary>
+        /// Starts a super-step that stays pending until the test releases it or
+        /// <paramref name="cancellationToken"/> is cancelled.
+        /// </summary>
+        ValueTask<bool> ISuperStepRunner.RunSuperStepAsync(CancellationToken cancellationToken)
+        {
+            // Install the source outside of Debug.Assert: assert calls are compiled out of non-DEBUG
+            // builds, which would skip the exchange and leave the field null.
+            DelayValueTaskSource<bool> stepSource = new(this.HasUnprocessedMessages);
+            DelayValueTaskSource<bool>? previous = Interlocked.CompareExchange(ref this._currentStepSource, stepSource, null);
+            Debug.Assert(previous is null, "A super-step is already in flight.");
+
+            // Capture the local so the callback cannot observe the field being cleared under it.
+            this._registration = cancellationToken.Register(() => _ = stepSource.ReleaseCanceledAsync().AsTask());
+            this.HasUnprocessedMessages = false;
+
+            return new(stepSource, 0);
+        }
+
+        /// <summary>Detaches the in-flight step source and drops its cancellation registration.</summary>
+        private DelayValueTaskSource<bool> TakeCurrentStepSource()
+        {
+            DelayValueTaskSource<bool>? currentStepSource = Interlocked.Exchange(ref this._currentStepSource, null);
+            Debug.Assert(currentStepSource is not null);
+            this._registration?.Dispose();
+            this._registration = null;
+
+            return currentStepSource;
+        }
+
+        public ValueTask CompleteStepAsync() => this.TakeCurrentStepSource().ReleaseSucceededAsync();
+
+        public ValueTask CompleteStepWithPendingAsync()
+        {
+            this.HasUnservicedRequests = true;
+            return this.CompleteStepAsync();
+        }
+
+        public ValueTask CancelStepAsync() => this.TakeCurrentStepSource().ReleaseCanceledAsync();
+
+        public ValueTask FailStepAsync(Exception exception) => this.TakeCurrentStepSource().ReleaseFaultedAsync(exception);
+    }
+
+    public enum EventStreamKind
+    {
+        OffThread,
+        Lockstep
+    }
+
+    /// <summary>Creates the event stream for <paramref name="kind"/> and starts it.</summary>
+    private static IRunEventStream GetRunStreamForKind(EventStreamKind kind, ISuperStepRunner stepRunner)
+    {
+        IRunEventStream result;
+        switch (kind)
+        {
+            case EventStreamKind.OffThread:
+                result = new StreamingRunEventStream(stepRunner);
+                break;
+            case EventStreamKind.Lockstep:
+                result = new LockstepRunEventStream(stepRunner);
+                break;
+            default:
+                throw new NotSupportedException($"Unsupported RunStream kind: {kind}");
+        }
+
+        result.Start();
+        return result;
+    }
+
+    [Theory]
+    [InlineData(EventStreamKind.OffThread)]
+    [InlineData(EventStreamKind.Lockstep)]
+    public static async Task Test_RunStatus_NotStartedWhenStartingAsync(EventStreamKind mode)
+    {
+        TestStepRunner runner = new();
+        IRunEventStream eventStream = GetRunStreamForKind(mode, runner);
+
+        RunStatus status = await eventStream.GetStatusAsync();
+        status.Should().Be(RunStatus.NotStarted);
+    }
+
+    [Theory]
+    [InlineData(EventStreamKind.OffThread)]
+    [InlineData(EventStreamKind.Lockstep)]
+    public static async Task Test_RunStatus_RunningWhenInSuperstepAsync(EventStreamKind mode)
+    {
+        TestStepRunner runner = new();
+        IRunEventStream eventStream = GetRunStreamForKind(mode, runner);
+
+        await runner.EnqueueMessageAsync(new object());
+        eventStream.SignalInput();
+
+        _ = WatchStreamAsync();
+
+        RunStatus status = await eventStream.GetStatusAsync();
+        status.Should().Be(RunStatus.Running);
+
+        await eventStream.DisposeAsync();
+
+        async Task WatchStreamAsync()
+        {
+            await foreach (var _ in eventStream.TakeEventStreamAsync(false)) { }
+        }
+    }
+
+    [Theory]
+    [InlineData(EventStreamKind.OffThread)]
+    [InlineData(EventStreamKind.Lockstep)]
+    public static async Task Test_RunStatus_IdleWhenFinishedSuperstepsAsync(EventStreamKind mode)
+    {
+        TestStepRunner runner = new();
+        IRunEventStream eventStream = GetRunStreamForKind(mode, runner);
+
+        await runner.EnqueueMessageAsync(new object());
+        eventStream.SignalInput();
+
+        Task watchTask = WatchStreamAsync();
+        await runner.CompleteStepAsync();
+        await watchTask;
+
+        RunStatus status = await eventStream.GetStatusAsync();
+        status.Should().Be(RunStatus.Idle);
+
+        await eventStream.DisposeAsync();
+
+        async Task WatchStreamAsync()
+        {
+            await foreach (var _ in eventStream.TakeEventStreamAsync(false)) { }
+        }
+    }
+
+    [Theory]
+    [InlineData(EventStreamKind.OffThread)]
+    [InlineData(EventStreamKind.Lockstep)]
+    public static async Task Test_RunStatus_EndedWhenCancelledAsync(EventStreamKind mode)
+    {
+        TestStepRunner runner = new();
+        IRunEventStream eventStream = GetRunStreamForKind(mode, runner);
+
+        await runner.EnqueueMessageAsync(new object());
+        eventStream.SignalInput();
+
+        Task watchTask = WatchStreamAsync();
+        await runner.CancelStepAsync();
+        await watchTask;
+
+        RunStatus status = await eventStream.GetStatusAsync();
+        status.Should().Be(RunStatus.Ended);
+
+        await eventStream.DisposeAsync();
+
+        async Task WatchStreamAsync()
+        {
+            await foreach (var _ in eventStream.TakeEventStreamAsync(false)) { }
+        }
+    }
+
+    [Theory]
+    [InlineData(EventStreamKind.OffThread)]
+    [InlineData(EventStreamKind.Lockstep)]
+    public static async Task Test_RunStatus_ExceptionWhenFaultedAsync(EventStreamKind mode)
+    {
+        TestStepRunner runner = new();
+        IRunEventStream eventStream = GetRunStreamForKind(mode, runner);
+
+        await runner.EnqueueMessageAsync(new object());
+        eventStream.SignalInput();
+
+        Task watchTask = WatchStreamAsync();
+        await runner.FailStepAsync(new InvalidOperationException());
+        // NOTE(review): the lockstep stream's catch block rethrows step failures, so this await may
+        // itself throw in Lockstep mode before the status is checked - confirm.
+        await watchTask;
+
+        RunStatus status = await eventStream.GetStatusAsync();
+        status.Should().Be(RunStatus.Ended);
+
+        await eventStream.DisposeAsync();
+
+        async Task WatchStreamAsync()
+        {
+            await foreach (var _ in eventStream.TakeEventStreamAsync(false)) { }
+        }
+    }
+
+    //[Theory]
+    //[InlineData(EventStreamKind.OffThread)]
+    //[InlineData(EventStreamKind.Lockstep)]
+    internal static async Task Test_RunStatus_PendingRequestsAsync(EventStreamKind mode)
+    {
+        TestStepRunner runner = new();
+        IRunEventStream eventStream = GetRunStreamForKind(mode, runner);
+
+        // Act 1: Send the input object, and run the step to PendingRequest
+        await runner.EnqueueMessageAsync(new object());
+        eventStream.SignalInput();
+
+        Task watchTask = WatchStreamAsync();
+        await runner.CompleteStepWithPendingAsync();
+        await watchTask;
+
+        // Assert 1
+        RunStatus status = await eventStream.GetStatusAsync();
+        status.Should().Be(RunStatus.PendingRequests);
+
+        // Act 2: Send the response, check running state
+        await runner.EnqueueResponseAsync(
+            new ExternalResponse(
+                new Checkpointing.RequestPortInfo(new(typeof(object)), new(typeof(object)), "_"),
+                Guid.NewGuid().ToString("N"),
+                new(new())));
+        eventStream.SignalInput();
+
+        watchTask = WatchStreamAsync();
+
+        // Assert 2
+        status = await eventStream.GetStatusAsync();
+        status.Should().Be(RunStatus.Running);
+
+        // Act 3: Process the response, check state is idle
+        await runner.CompleteStepAsync();
+        await watchTask;
+
+        // Assert 3
+        status = await eventStream.GetStatusAsync();
+        status.Should().Be(RunStatus.Idle);
+
+        await eventStream.DisposeAsync();
+
+        async Task WatchStreamAsync()
+        {
+            await foreach (var _ in eventStream.TakeEventStreamAsync(false)) { }
+        }
+    }
+}
diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestValueTaskSource.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestValueTaskSource.cs
new file mode 100644
index 0000000000..2a118a1e90
--- /dev/null
+++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestValueTaskSource.cs
@@ -0,0 +1,134 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Sources;
+
+namespace Microsoft.Agents.AI.Workflows.UnitTests;
+
+/// <summary>
+/// Minimal, single-use <see cref="IValueTaskSource{T}"/> for tests: it stays pending until one of the
+/// <c>Set*Async</c> methods completes it, and only token <c>0</c> is supported.
+/// </summary>
+internal sealed class TestValueTaskSource<T> : IValueTaskSource<T>
+{
+    private int _status = (int)ValueTaskSourceStatus.Pending;
+    private T? _value;
+    private Exception? _exception;
+    private int _continuationScheduled;
+    private readonly object _continuationMutex = new();
+
+    private bool _ranContinuation;
+    private Action _continuationClosure = () => { };
+
+    public TestValueTaskSource()
+    {
+    }
+
+    /// <summary>
+    /// Moves the source out of <see cref="ValueTaskSourceStatus.Pending"/> and runs the continuation.
+    /// Only the first completion wins; later calls are ignored so the continuation runs at most once.
+    /// </summary>
+    private void Complete(ValueTaskSourceStatus status, T? value = default, Exception? exception = null)
+    {
+        lock (this._continuationMutex)
+        {
+            if (this._status != (int)ValueTaskSourceStatus.Pending)
+            {
+                return;
+            }
+
+            // Store the payload before publishing the status, so a consumer that observes completion
+            // through GetStatus never reads a default result.
+            this._value = value;
+            this._exception = exception;
+            Volatile.Write(ref this._status, (int)status);
+
+            Console.WriteLine("Running scheduled continuation");
+            this._ranContinuation = true;
+            this._continuationClosure();
+        }
+    }
+
+    public ValueTask SetSucceededAsync(T value)
+    {
+        Console.WriteLine($"Setting succeeded {value}");
+
+        this.Complete(ValueTaskSourceStatus.Succeeded, value: value);
+        return new();
+    }
+
+    public ValueTask SetFaultedAsync(Exception exception)
+    {
+        Console.WriteLine($"Setting faulted {exception}");
+
+        this.Complete(ValueTaskSourceStatus.Faulted, exception: exception);
+        return new();
+    }
+
+    public ValueTask SetCanceledAsync()
+    {
+        Console.WriteLine("Setting canceled");
+
+        this.Complete(ValueTaskSourceStatus.Canceled);
+        return new();
+    }
+
+    public T GetResult(short token)
+    {
+        Debug.Assert(token == 0);
+
+        switch (this.GetStatus(0))
+        {
+            case ValueTaskSourceStatus.Succeeded:
+                return this._value!;
+            case ValueTaskSourceStatus.Faulted:
+                throw this._exception!;
+            case ValueTaskSourceStatus.Canceled:
+                throw new TaskCanceledException();
+            case ValueTaskSourceStatus.Pending:
+                throw new InvalidOperationException("The operation is not yet completed.");
+            default:
+                throw new NotSupportedException();
+        }
+    }
+
+    public ValueTaskSourceStatus GetStatus(short token)
+    {
+        Debug.Assert(token == 0);
+
+        return (ValueTaskSourceStatus)Volatile.Read(ref this._status);
+    }
+
+    /// <summary>
+    /// Registers the single allowed continuation; <paramref name="flags"/> are ignored and the
+    /// continuation runs inline on whichever thread completes the source (or immediately if it already has).
+    /// </summary>
+    public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
+    {
+        Debug.Assert(token == 0);
+
+        if (Interlocked.Exchange(ref this._continuationScheduled, 1) == 1)
+        {
+            throw new InvalidOperationException("Cannot schedule more than one continuation on ValueTaskSource");
+        }
+
+        lock (this._continuationMutex)
+        {
+            if (this._ranContinuation)
+            {
+                // The source already completed (only the default no-op ran), so run this
+                // continuation immediately.
+                Console.WriteLine("Running continuation");
+                continuation(state);
+            }
+            else
+            {
+                Console.WriteLine("Scheduling continuation");
+                this._continuationClosure = () => continuation(state);
+            }
+        }
+    }
+}