.NET: FanInEdge fails to collect all messages in second iteration of loop/cycle workflows #3060

@kinben

Describe the bug

When using FanInEdge in a workflow that contains a loop/cycle, the FanIn mechanism works correctly in the first iteration but fails in subsequent iterations. Specifically, in the second loop iteration, only one of the expected messages is delivered to the target executor, causing the workflow to terminate prematurely.

To Reproduce

Workflow Structure:

StartNode
    │
    ▼
Round_Gate ◄─────────────────────┐
    │                            │
    ├── FanOut ──► Bull_Round ──┐│
    │                           ││
    └── FanOut ──► Bear_Round ──┼┤
                                ││
                    FanIn ◄─────┘│
                      │          │
                      ▼          │
                 Merge_Round     │
                      │          │
                      ▼          │
               FuncAgentAction ──┘ (loop back)
                      │
                      ▼
               Final_Judgment

Edge Configuration:

StartNode --> Round_Gate (Direct)
Round_Gate --> [Bull_Round, Bear_Round] (FanOut)
[Bull_Round, Bear_Round] --> Merge_Round (FanIn)
Merge_Round --> FuncAgentAction (Direct)
FuncAgentAction --> Round_Gate (Direct, loop)
FuncAgentAction --> Final_Judgment (Direct)

Steps to reproduce:

  1. Create a workflow with the above structure
  2. Execute the workflow where the loop runs at least 2 iterations
  3. Observe that the first iteration completes successfully (Merge_Round receives 2 messages)
  4. Observe that the second iteration fails (Merge_Round receives only 1 message)

Expected behavior

In each iteration of the loop, Merge_Round should receive exactly 2 messages (one from Bull_Round and one from Bear_Round), then merge them and continue the workflow.

Actual behavior

  • First iteration (works): Merge_Round is called twice in the same superstep, receives both messages, completes successfully
  • Second iteration (fails): Merge_Round is called only once, receives only one message, workflow terminates with HasPendingMessages: False

Debug Logs

First iteration (Superstep 3-4) - SUCCESS:

--- SUPERSTEP 3 complete ---
  Activated executors: [Bear_Round, Bull_Round, Merge_Round]  ← Merge_Round activated
  Has pending messages: True

= SUPERSTEP 4 start =
  [invoke] Merge_Round    ← First call
  not completed
  [done] Merge_Round
  [invoke] Merge_Round    ← Second call (same superstep)
  completed
  [done] Merge_Round

Second iteration (Superstep 7-8) - FAILURE:

--- SUPERSTEP 7 complete ---
  Activated executors: [Bear_Round, Bull_Round]  ← Merge_Round NOT activated!
  Has pending messages: True

= SUPERSTEP 8 start =
  [invoke] Merge_Round    ← Only ONE call!
  not completed
  [done] Merge_Round

--- SUPERSTEP 8 complete ---
  Has pending messages: False  ← No more messages, workflow terminates

Root Cause Analysis

Looking at FanInEdgeState.cs:

public IEnumerable<MessageEnvelope>? ProcessMessage(string sourceId, MessageEnvelope envelope)
{
    this.PendingMessages.Add(new(envelope));
    this.Unseen.Remove(sourceId);

    if (this.Unseen.Count == 0)
    {
        List<PortableMessageEnvelope> takenMessages = Interlocked.Exchange(ref this._pendingMessages, []);
        this.Unseen = [.. this.SourceIds];  // Reset for next batch
        return takenMessages.Select(portable => portable.ToMessageEnvelope());
    }

    return null;
}

The Unseen HashSet is reset after releasing messages, but in loop scenarios, there appears to be a race condition or state corruption where:

  1. The second iteration's messages are processed
  2. But Unseen doesn't correctly track both sources
  3. Only one message is released to the target executor
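To help narrow this down, here is a minimal, single-threaded model of the same reset logic. It is only a sketch: `FanInBarrier` and `Demo` are hypothetical names, plain strings stand in for message envelopes, and none of it is the framework's actual `FanInEdgeState`. It checks whether the "remove from Unseen, release when empty, then reset" pattern by itself can drop a message in a second round.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the fan-in state; NOT the framework's FanInEdgeState.
public sealed class FanInBarrier
{
    private readonly string[] _sourceIds;
    private HashSet<string> _unseen;
    private List<string> _pending = new();

    public FanInBarrier(params string[] sourceIds)
    {
        _sourceIds = sourceIds;
        _unseen = new HashSet<string>(sourceIds);
    }

    // Buffers the message. Returns the whole batch once every source has
    // reported since the last release; otherwise returns null.
    public IReadOnlyList<string>? ProcessMessage(string sourceId, string message)
    {
        _pending.Add(message);
        _unseen.Remove(sourceId);

        if (_unseen.Count > 0)
        {
            return null;
        }

        List<string> batch = _pending;
        _pending = new List<string>();
        _unseen = new HashSet<string>(_sourceIds); // reset for the next round
        return batch;
    }
}

public static class Demo
{
    // Simulates two loop iterations and records how many messages
    // the barrier released at the end of each one.
    public static List<int> RunTwoRounds()
    {
        var barrier = new FanInBarrier("Bull_Round", "Bear_Round");
        var releasedSizes = new List<int>();

        for (int round = 1; round <= 2; round++)
        {
            barrier.ProcessMessage("Bull_Round", $"bull-{round}"); // held, returns null
            IReadOnlyList<string>? released =
                barrier.ProcessMessage("Bear_Round", $"bear-{round}");
            releasedSizes.Add(released?.Count ?? 0);
        }

        return releasedSizes;
    }

    public static void Main() =>
        Console.WriteLine(string.Join(",", RunTwoRounds())); // prints "2,2"
}
```

In this isolated model both rounds release two messages, which suggests the reset itself may not be the culprit when calls arrive sequentially on a single shared instance. If that holds for the real code, the failure is more likely in how the edge state is carried between supersteps or in how Merge_Round gets activated, though this sketch cannot confirm either.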

Environment

  • agent-framework version: (latest from main branch)
  • Runtime: .NET 8/9
  • OS: Windows

Workaround

Replace FanInEdge with regular DirectEdge and implement manual aggregation in the target executor:

// Instead of a FanInEdge, connect each source with a direct edge
.AddEdge(bullRound, mergeRound)
.AddEdge(bearRound, mergeRound)

// Manual aggregation in Merge_Round using StatefulExecutor
public class MergeExecutor : StatefulExecutor<List<Message>, object, Result?>
{
    public override async ValueTask<Result?> HandleAsync(object message, ...)
    {
        Result? result = null; // stays null until both messages have arrived
        await InvokeWithStateAsync((state, ctx, ct) =>
        {
            state.Add(Unwrap(message));
            if (state.Count >= 2)
            {
                // Both sources have reported: merge, then reset for the next round
                result = Merge(state);
                state.Clear();
            }
            return ValueTask.FromResult(state);
        }, ... );
        return result;
    }
}

Additional context

This bug makes it impossible to use FanInEdge in any workflow that contains loops/cycles, which is a common pattern for iterative processing (e.g., multi-round debate, iterative refinement, retry loops).

Labels

  • .NET
  • v1.0: Features being tracked for the version 1.0 GA
  • workflows: Related to Workflows in agent-framework

Projects

Status

Done