.NET: Fan-in issue #1863

@donghaiting

I followed the "Create a Simple Concurrent Workflow" tutorial to learn about concurrent workflows, but the aggregationExecutor is never executed. My code is as follows:
Program.cs:
`using AgentFramworkLearning;
using Azure;
using Azure.AI.OpenAI;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

var token = Environment.GetEnvironmentVariable("GITHUB_TOKEN");
var chatClient = new AzureOpenAIClient(new Uri("https://models.inference.ai.azure.com"), new AzureKeyCredential(token)).GetChatClient("gpt-4o-mini").AsIChatClient();

ChatClientAgent physicist = new(
    chatClient,
    name: "Physicist",
    instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
    chatClient,
    name: "Chemist",
    instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
var startExecutor = new ConcurrentStartExecutor();
var aggregationExecutor = new ConcurrentAggregationExecutor();

// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
    .AddFanOutEdge(startExecutor, targets: [physicist, chemist])
    .AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
    .WithOutputFrom(aggregationExecutor)
    .Build();

// Execute the workflow in streaming mode
StreamingRun run = await InProcessExecution.StreamAsync(workflow, input: "What is temperature?");
//await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    Console.WriteLine($"[Event] {evt.GetType().Name}: {evt}");
    if (evt is WorkflowOutputEvent output)
    {
        Console.WriteLine($"Workflow completed with results:\n{output.Data}");
    }
}

Console.ReadLine();`

ConcurrentStartExecutor.cs:
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
    /// <summary>
    /// Starts the concurrent processing by sending messages to the agents.
    /// </summary>
    /// <param name="message">The user message to process</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Broadcast the message to all connected agents. Receiving agents will queue
        // the message but will not start processing until they receive a turn token.
        await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken: cancellationToken);

        // Broadcast the turn token to kick off the agents.
        await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken: cancellationToken);
    }
}

ConcurrentAggregationExecutor.cs:
`internal sealed class ConcurrentAggregationExecutor() : Executor<ChatMessage>("ConcurrentAggregationExecutor")
{
    private readonly List<ChatMessage> _messages = [];

    /// <summary>
    /// Handles incoming messages from the agents and aggregates their responses.
    /// </summary>
    /// <param name="message">The message from the agent</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._messages.Add(message);

        if (this._messages.Count == 2)
        {
            var formattedMessages = string.Join(Environment.NewLine, this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
            await context.YieldOutputAsync(formattedMessages, cancellationToken);
        }
    }
}`

Labels: .NET, workflows (Related to Workflows in agent-framework)
