-
Notifications
You must be signed in to change notification settings - Fork 1.6k
.NET: Fix CheckpointInfo.Parent always null in InProcessRunner #3812
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
177 changes: 177 additions & 0 deletions
177
dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,177 @@ | ||
| // Copyright (c) Microsoft. All rights reserved. | ||
|
|
||
| using System.Collections.Generic; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using FluentAssertions; | ||
| using Microsoft.Agents.AI.Workflows.Checkpointing; | ||
|
|
||
| namespace Microsoft.Agents.AI.Workflows.UnitTests; | ||
|
|
||
| /// <summary> | ||
| /// Tests for verifying that CheckpointInfo.Parent is properly populated | ||
| /// when checkpoints are created during workflow execution (GH #3796). | ||
| /// </summary> | ||
| public class CheckpointParentTests | ||
| { | ||
| [Theory] | ||
| [InlineData(ExecutionEnvironment.InProcess_Lockstep)] | ||
| [InlineData(ExecutionEnvironment.InProcess_OffThread)] | ||
| internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(ExecutionEnvironment environment) | ||
| { | ||
| // Arrange: A simple two-step workflow that will produce at least one checkpoint. | ||
| ForwardMessageExecutor<string> executorA = new("A"); | ||
| ForwardMessageExecutor<string> executorB = new("B"); | ||
|
|
||
| Workflow workflow = new WorkflowBuilder(executorA) | ||
| .AddEdge(executorA, executorB) | ||
| .Build(); | ||
|
|
||
| CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); | ||
| IWorkflowExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); | ||
|
|
||
| // Act | ||
| Checkpointed<StreamingRun> checkpointed = | ||
| await env.StreamAsync(workflow, "Hello", checkpointManager); | ||
|
|
||
| List<CheckpointInfo> checkpoints = []; | ||
| await foreach (WorkflowEvent evt in checkpointed.Run.WatchStreamAsync()) | ||
| { | ||
| if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) | ||
| { | ||
| checkpoints.Add(cp); | ||
| } | ||
| } | ||
|
|
||
| // Assert: The first checkpoint should have been created and stored with a null parent. | ||
| checkpoints.Should().NotBeEmpty("at least one checkpoint should have been created"); | ||
|
|
||
| CheckpointInfo firstCheckpoint = checkpoints[0]; | ||
| Checkpoint storedFirst = await ((ICheckpointManager)checkpointManager) | ||
| .LookupCheckpointAsync(firstCheckpoint.RunId, firstCheckpoint); | ||
| storedFirst.Parent.Should().BeNull("the first checkpoint should have no parent"); | ||
| } | ||
|
|
||
| [Theory] | ||
| [InlineData(ExecutionEnvironment.InProcess_Lockstep)] | ||
| [InlineData(ExecutionEnvironment.InProcess_OffThread)] | ||
| internal async Task Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync(ExecutionEnvironment environment) | ||
| { | ||
| // Arrange: A workflow with a loop that will produce multiple checkpoints. | ||
| ForwardMessageExecutor<string> executorA = new("A"); | ||
| ForwardMessageExecutor<string> executorB = new("B"); | ||
|
|
||
| // A -> B -> A (loop) to generate multiple supersteps/checkpoints. | ||
| Workflow workflow = new WorkflowBuilder(executorA) | ||
| .AddEdge(executorA, executorB) | ||
| .AddEdge(executorB, executorA) | ||
| .Build(); | ||
|
|
||
| CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); | ||
| IWorkflowExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); | ||
|
|
||
| // Act | ||
| await using Checkpointed<StreamingRun> checkpointed = | ||
| await env.StreamAsync(workflow, "Hello", checkpointManager); | ||
|
|
||
| List<CheckpointInfo> checkpoints = []; | ||
| using CancellationTokenSource cts = new(); | ||
|
|
||
| await foreach (WorkflowEvent evt in checkpointed.Run.WatchStreamAsync(cts.Token)) | ||
| { | ||
| if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) | ||
| { | ||
| checkpoints.Add(cp); | ||
| if (checkpoints.Count >= 3) | ||
| { | ||
| cts.Cancel(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Assert: We should have at least 3 checkpoints | ||
| checkpoints.Should().HaveCountGreaterThanOrEqualTo(3); | ||
|
|
||
| // Verify the parent chain | ||
| Checkpoint stored0 = await ((ICheckpointManager)checkpointManager) | ||
| .LookupCheckpointAsync(checkpoints[0].RunId, checkpoints[0]); | ||
| stored0.Parent.Should().BeNull("the first checkpoint should have no parent"); | ||
|
|
||
| Checkpoint stored1 = await ((ICheckpointManager)checkpointManager) | ||
| .LookupCheckpointAsync(checkpoints[1].RunId, checkpoints[1]); | ||
| stored1.Parent.Should().NotBeNull("the second checkpoint should have a parent"); | ||
| stored1.Parent.Should().Be(checkpoints[0], "the second checkpoint's parent should be the first checkpoint"); | ||
|
|
||
| Checkpoint stored2 = await ((ICheckpointManager)checkpointManager) | ||
| .LookupCheckpointAsync(checkpoints[2].RunId, checkpoints[2]); | ||
| stored2.Parent.Should().NotBeNull("the third checkpoint should have a parent"); | ||
| stored2.Parent.Should().Be(checkpoints[1], "the third checkpoint's parent should be the second checkpoint"); | ||
| } | ||
|
|
||
| [Theory] | ||
| [InlineData(ExecutionEnvironment.InProcess_Lockstep)] | ||
| [InlineData(ExecutionEnvironment.InProcess_OffThread)] | ||
| internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAsync(ExecutionEnvironment environment) | ||
| { | ||
| // Arrange: A looping workflow that produces checkpoints. | ||
| ForwardMessageExecutor<string> executorA = new("A"); | ||
| ForwardMessageExecutor<string> executorB = new("B"); | ||
|
|
||
| Workflow workflow = new WorkflowBuilder(executorA) | ||
| .AddEdge(executorA, executorB) | ||
| .AddEdge(executorB, executorA) | ||
| .Build(); | ||
|
|
||
| CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); | ||
| IWorkflowExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); | ||
|
|
||
| // First run: collect a checkpoint to resume from | ||
| await using Checkpointed<StreamingRun> checkpointed = | ||
| await env.StreamAsync(workflow, "Hello", checkpointManager); | ||
|
|
||
| List<CheckpointInfo> firstRunCheckpoints = []; | ||
| using CancellationTokenSource cts = new(); | ||
| await foreach (WorkflowEvent evt in checkpointed.Run.WatchStreamAsync(cts.Token)) | ||
| { | ||
| if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) | ||
| { | ||
| firstRunCheckpoints.Add(cp); | ||
| if (firstRunCheckpoints.Count >= 2) | ||
| { | ||
| cts.Cancel(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| firstRunCheckpoints.Should().HaveCountGreaterThanOrEqualTo(2); | ||
| CheckpointInfo resumePoint = firstRunCheckpoints[0]; | ||
|
|
||
| // Dispose the first run to release workflow ownership before resuming. | ||
| await checkpointed.DisposeAsync(); | ||
|
|
||
| // Act: Resume from the first checkpoint | ||
| Checkpointed<StreamingRun> resumed = | ||
| await env.ResumeStreamAsync(workflow, resumePoint, checkpointManager); | ||
|
|
||
| List<CheckpointInfo> resumedCheckpoints = []; | ||
| using CancellationTokenSource cts2 = new(); | ||
| await foreach (WorkflowEvent evt in resumed.Run.WatchStreamAsync(cts2.Token)) | ||
| { | ||
| if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) | ||
| { | ||
| resumedCheckpoints.Add(cp); | ||
| if (resumedCheckpoints.Count >= 1) | ||
| { | ||
| cts2.Cancel(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Assert: The first checkpoint after resume should have the resume point as its parent. | ||
| resumedCheckpoints.Should().NotBeEmpty(); | ||
| Checkpoint storedResumed = await ((ICheckpointManager)checkpointManager) | ||
| .LookupCheckpointAsync(resumedCheckpoints[0].RunId, resumedCheckpoints[0]); | ||
| storedResumed.Parent.Should().NotBeNull("checkpoint created after resume should have a parent"); | ||
| storedResumed.Parent.Should().Be(resumePoint, "checkpoint after resume should reference the checkpoint we resumed from"); | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.