From 1509904c8f42b5794faf6bcc679b4636835a96ed Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Thu, 12 Mar 2026 14:30:13 -0700 Subject: [PATCH 1/4] Fix race condition issue in FanInEdge while processing messages. --- .../Execution/FanInEdgeState.cs | 40 +++++++++++-------- .../EdgeRunnerTests.cs | 40 +++++++++++++++++++ 2 files changed, 63 insertions(+), 17 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs index db8241c13d..6009511264 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs @@ -3,25 +3,25 @@ using System.Collections.Generic; using System.Linq; using System.Text.Json.Serialization; -using System.Threading; using Microsoft.Agents.AI.Workflows.Checkpointing; namespace Microsoft.Agents.AI.Workflows.Execution; internal sealed class FanInEdgeState { - private List _pendingMessages; + private readonly object _syncLock = new(); + public FanInEdgeState(FanInEdgeData fanInEdge) { this.SourceIds = fanInEdge.SourceIds.ToArray(); this.Unseen = [.. this.SourceIds]; - this._pendingMessages = []; + this.PendingMessages = []; } public string[] SourceIds { get; } public HashSet Unseen { get; private set; } - public List PendingMessages => this._pendingMessages; + public List PendingMessages { get; private set; } [JsonConstructor] public FanInEdgeState(string[] sourceIds, HashSet unseen, List pendingMessages) @@ -29,28 +29,34 @@ public FanInEdgeState(string[] sourceIds, HashSet unseen, List>? ProcessMessage(string sourceId, MessageEnvelope envelope) { - this.PendingMessages.Add(new(envelope)); - this.Unseen.Remove(sourceId); - - if (this.Unseen.Count == 0) + // Serialize concurrent calls from parallel executor tasks during superstep execution. + lock (this._syncLock) { - List takenMessages = Interlocked.Exchange(ref this._pendingMessages, []); - this.Unseen = [.. this.SourceIds]; + this.PendingMessages.Add(new(envelope)); + this.Unseen.Remove(sourceId); - if (takenMessages.Count == 0) + if (this.Unseen.Count == 0) { - return null; + List takenMessages = this.PendingMessages; + this.PendingMessages = []; + this.Unseen = [.. this.SourceIds]; + + if (takenMessages.Count == 0) + { + return null; + } + + return takenMessages.Select(portable => portable.ToMessageEnvelope()) + .GroupBy(keySelector: messageEnvelope => messageEnvelope.Source) + .ToList(); } - return takenMessages.Select(portable => portable.ToMessageEnvelope()) - .GroupBy(keySelector: messageEnvelope => messageEnvelope.Source); + return null; } - - return null; } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/EdgeRunnerTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/EdgeRunnerTests.cs index 70210fac41..cc5d5a3c62 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/EdgeRunnerTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/EdgeRunnerTests.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using FluentAssertions; @@ -199,4 +200,43 @@ async ValueTask RunIterationAsync() mapping.CheckDeliveries(["executor3"], ["part1", "part2", "final part"]); } } + + [Fact] + public async Task Test_FanInEdgeRunner_ConcurrentProcessingAsync() + { + // Arrange + const int SourceCount = 4; + const int Iterations = 50; + + string[] sourceIds = Enumerable.Range(0, SourceCount).Select(i => $"source{i}").ToArray(); + const string SinkId = "sink"; + + TestRunContext runContext = new(); + List executors = [.. sourceIds.Select(id => (Executor)new ForwardMessageExecutor(id)), new ForwardMessageExecutor(SinkId)]; + runContext.ConfigureExecutors(executors); + + FanInEdgeData edgeData = new(sourceIds.ToList(), SinkId, new EdgeId(0), null); + FanInEdgeRunner runner = new(runContext, edgeData); + + for (int iteration = 0; iteration < Iterations; iteration++) + { + // Act: send messages from all sources concurrently + using Barrier barrier = new(SourceCount); + Task[] tasks = sourceIds.Select(sourceId => Task.Run(async () => + { + barrier.SignalAndWait(); + return await runner.ChaseEdgeAsync(new($"msg-from-{sourceId}", sourceId), stepTracer: null, CancellationToken.None); + })).ToArray(); + + DeliveryMapping?[] results = await Task.WhenAll(tasks); + + // Assert: exactly one task should return a non-null mapping with all messages + DeliveryMapping?[] nonNullResults = results.Where(r => r is not null).ToArray(); + nonNullResults.Should().HaveCount(1, $"iteration {iteration}: exactly one thread should release the batch"); + + DeliveryMapping mapping = nonNullResults[0]!; + HashSet expectedMessages = [.. sourceIds.Select(id => (object)$"msg-from-{id}")]; + mapping.CheckDeliveries([SinkId], expectedMessages); + } + } } From fe583a313f704abcd7dace8fe496cd7de4738a67 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Fri, 13 Mar 2026 11:18:13 -0700 Subject: [PATCH 2/4] refactored to limit the code segment under lock. --- .../Execution/FanInEdgeState.cs | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs index 6009511264..0b84bf9492 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs @@ -34,6 +34,8 @@ public FanInEdgeState(string[] sourceIds, HashSet unseen, List>? ProcessMessage(string sourceId, MessageEnvelope envelope) { + List? takenMessages = null; + // Serialize concurrent calls from parallel executor tasks during superstep execution. lock (this._syncLock) { @@ -42,21 +44,20 @@ public FanInEdgeState(string[] sourceIds, HashSet unseen, List takenMessages = this.PendingMessages; + takenMessages = this.PendingMessages; this.PendingMessages = []; this.Unseen = [.. this.SourceIds]; - - if (takenMessages.Count == 0) - { - return null; - } - - return takenMessages.Select(portable => portable.ToMessageEnvelope()) - .GroupBy(keySelector: messageEnvelope => messageEnvelope.Source) - .ToList(); } + } + if (takenMessages is null || takenMessages.Count == 0) + { return null; } + + return takenMessages + .Select(portable => portable.ToMessageEnvelope()) + .GroupBy(messageEnvelope => messageEnvelope.Source) + .ToList(); } } From d70b789480474df4d554a89a0f88eb01200d7b80 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Fri, 13 Mar 2026 16:13:30 -0700 Subject: [PATCH 3/4] Remove extra materialization of the result. --- .../Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs index 0b84bf9492..d4bc47f9fb 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs @@ -57,7 +57,6 @@ public FanInEdgeState(string[] sourceIds, HashSet unseen, List portable.ToMessageEnvelope()) - .GroupBy(messageEnvelope => messageEnvelope.Source) - .ToList(); + .GroupBy(messageEnvelope => messageEnvelope.Source); } } From f4ac0ad570e8e3e5b43933d296f32a24f4f4dda8 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Tue, 17 Mar 2026 10:19:08 -0700 Subject: [PATCH 4/4] Added comment to clarify future changes if process message is made async. --- .../Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs index d4bc47f9fb..c33160d159 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeState.cs @@ -37,6 +37,7 @@ public FanInEdgeState(string[] sourceIds, HashSet unseen, List? takenMessages = null; // Serialize concurrent calls from parallel executor tasks during superstep execution. + // NOTE - IMPORTANT: If this ProcessMessage method ever becomes async, replace this lock with an async friendly solution to avoid deadlocks. lock (this._syncLock) { this.PendingMessages.Add(new(envelope));