Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;
Expand Down Expand Up @@ -56,10 +55,7 @@ public async Task Test_PublishMessage_MultipleFailures()
Func<Task> publishTask = async () => await fixture.RunPublishTestAsync(new TopicId("TestTopic"), new BasicMessage { Content = "1" });

// What we are really testing here is that a single exception does not prevent sending to the remaining agents
(await publishTask.Should().ThrowAsync<AggregateException>())
.Which.Should().Match<AggregateException>(
exception => exception.InnerExceptions.Count == 2 &&
exception.InnerExceptions.All(exception => exception is TestException));
await publishTask.Should().ThrowAsync<TestException>();

fixture.GetAgentInstances<ErrorAgent>().Values
.Should().HaveCount(2)
Expand All @@ -81,11 +77,7 @@ public async Task Test_PublishMessage_MixedSuccessFailure()
Func<Task> publicTask = async () => await fixture.RunPublishTestAsync(new TopicId("TestTopic"), new BasicMessage { Content = "1" });

// What we are really testing here is that raising exceptions does not prevent sending to the remaining agents
(await publicTask.Should().ThrowAsync<AggregateException>())
.Which.Should().Match<AggregateException>(
exception => exception.InnerExceptions.Count == 2 &&
exception.InnerExceptions.All(
exception => exception is TestException));
await publicTask.Should().ThrowAsync<TestException>();

fixture.GetAgentInstances<ReceiverAgent>().Values
.Should().HaveCount(2, "Two ReceiverAgents should have been created")
Expand Down
52 changes: 24 additions & 28 deletions dotnet/src/Agents/Runtime/InProcess/InProcessRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -352,45 +352,41 @@ private async ValueTask PublishMessageServicerAsync(MessageEnvelope envelope, Ca
throw new InvalidOperationException("Message must have a topic to be published.");
}

List<Exception> exceptions = [];
List<Task>? tasks = null;
TopicId topic = envelope.Topic.Value;
foreach (ISubscriptionDefinition subscription in this._subscriptions.Values.Where(subscription => subscription.Matches(topic)))
{
try
{
deliveryToken.ThrowIfCancellationRequested();
(tasks ??= []).Add(ProcessSubscriptionAsync(envelope, topic, subscription, deliveryToken));
}

AgentId? sender = envelope.Sender;
if (tasks is not null)
{
await Task.WhenAll(tasks).ConfigureAwait(false);
}

using CancellationTokenSource combinedSource = CancellationTokenSource.CreateLinkedTokenSource(envelope.Cancellation, deliveryToken);
MessageContext messageContext = new(envelope.MessageId, combinedSource.Token)
{
Sender = sender,
Topic = topic,
IsRpc = false
};
async Task ProcessSubscriptionAsync(MessageEnvelope envelope, TopicId topic, ISubscriptionDefinition subscription, CancellationToken deliveryToken)
{
deliveryToken.ThrowIfCancellationRequested();

AgentId agentId = subscription.MapToAgent(topic);
if (!this.DeliverToSelf && sender.HasValue && sender == agentId)
{
continue;
}
AgentId? sender = envelope.Sender;

IHostableAgent agent = await this.EnsureAgentAsync(agentId).ConfigureAwait(false);
using CancellationTokenSource combinedSource = CancellationTokenSource.CreateLinkedTokenSource(envelope.Cancellation, deliveryToken);
MessageContext messageContext = new(envelope.MessageId, combinedSource.Token)
{
Sender = sender,
Topic = topic,
IsRpc = false
};

// TODO: Cancellation propagation!
await agent.OnMessageAsync(envelope.Message, messageContext).ConfigureAwait(false);
}
catch (Exception ex) when (!ex.IsCriticalException())
AgentId agentId = subscription.MapToAgent(topic);
if (!this.DeliverToSelf && sender.HasValue && sender == agentId)
{
exceptions.Add(ex);
return;
}
}

if (exceptions.Count > 0)
{
// TODO: Unwrap TargetInvocationException?
throw new AggregateException("One or more exceptions occurred while processing the message.", exceptions);
IHostableAgent agent = await this.EnsureAgentAsync(agentId).ConfigureAwait(false);

await agent.OnMessageAsync(envelope.Message, messageContext).ConfigureAwait(false);
}
}

Expand Down
Loading