Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ internal sealed class StreamingRunEventStream : IRunEventStream
private readonly CancellationTokenSource _runLoopCancellation;
private readonly bool _disableRunLoop;
private Task? _runLoopTask;
private RunStatus _runStatus = RunStatus.NotStarted;
private volatile RunStatus _runStatus = RunStatus.NotStarted;

private int _completionEpoch; // Tracks which completion signal belongs to which consumer iteration

public StreamingRunEventStream(ISuperStepRunner stepRunner, bool disableRunLoop = false)
Expand Down Expand Up @@ -127,7 +128,7 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)

// Wait for next input from the consumer
// Works for both Idle (no work) and PendingRequests (waiting for responses)
await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false);
await this._inputWaiter.WaitForInputAsync(linkedSource.Token).ConfigureAwait(false);

// When signaled, resume running
this._runStatus = RunStatus.Running;
Expand Down Expand Up @@ -209,7 +210,10 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// Get the current epoch - we'll only respond to completion signals from this epoch or later
int myEpoch = Volatile.Read(ref this._completionEpoch) + 1;
int currentEpoch = Volatile.Read(ref this._completionEpoch);

bool expectingFreshWork = this._stepRunner.HasUnprocessedMessages || this._runStatus == RunStatus.Running;
Comment thread
lokitoth marked this conversation as resolved.
int myEpoch = expectingFreshWork ? currentEpoch + 1 : currentEpoch;

// Use custom async enumerable to avoid exceptions on cancellation.
NonThrowingChannelReaderAsyncEnumerable<WorkflowEvent> eventStream = new(this._eventChannel.Reader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,53 @@ public async Task RunAsyncAndStreamAsyncShouldProduceSimilarResultsAsync()
"both versions should produce the same number of agent events");
}

/// <summary>
/// This test checks that the logic around waiting for input and halting appropriately works right when the
/// workflow runs to halting before the EventStream is watched by the user.
/// </summary>
[Fact]
public async Task RunStreamingAsyncWaitToTakeStreamAsync()
{
// Arrange: Create a simple agent that responds to messages
var agent = new SimpleTestAgent("test-agent");
var workflow = AgentWorkflowBuilder.BuildSequential(agent);
var inputMessage = new ChatMessage(ChatRole.User, "Hello");

// Act: Execute using streaming version with TurnToken
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new List<ChatMessage> { inputMessage });

// Send TurnToken to actually trigger execution (this is the key step)
bool messageSent = await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
messageSent.Should().BeTrue("TurnToken should be accepted");
Comment thread
lokitoth marked this conversation as resolved.

while (await run.GetStatusAsync() != RunStatus.Idle)
Comment thread
lokitoth marked this conversation as resolved.
{
await Task.Delay(200);
}

// Collect events
List<WorkflowEvent> events = [];

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
Comment thread
lokitoth marked this conversation as resolved.
{
events.Add(evt);
}

// Assert: The workflow should have executed and produced events
RunStatus status = await run.GetStatusAsync();
status.Should().Be(RunStatus.Idle, "workflow should complete execution");

events.Should().NotBeEmpty("workflow should produce events during execution");

// Check that we have agent execution events
var agentEvents = events.OfType<AgentResponseUpdateEvent>().ToList();
agentEvents.Should().NotBeEmpty("agent should have executed and produced update events");

// Check that we have output events
var outputEvents = events.OfType<WorkflowOutputEvent>().ToList();
outputEvents.Should().NotBeEmpty("workflow should produce output events");
}

/// <summary>
/// Simple test agent that echoes back the input message.
/// </summary>
Expand Down
Loading