diff --git a/dotnet/src/Agents/Runtime/InProcess.Tests/PublishMessageTests.cs b/dotnet/src/Agents/Runtime/InProcess.Tests/PublishMessageTests.cs index c81a80ba1d86..c73070ff6763 100644 --- a/dotnet/src/Agents/Runtime/InProcess.Tests/PublishMessageTests.cs +++ b/dotnet/src/Agents/Runtime/InProcess.Tests/PublishMessageTests.cs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. using System; -using System.Linq; using System.Threading.Tasks; using FluentAssertions; using Xunit; @@ -56,10 +55,7 @@ public async Task Test_PublishMessage_MultipleFailures() Func publishTask = async () => await fixture.RunPublishTestAsync(new TopicId("TestTopic"), new BasicMessage { Content = "1" }); // What we are really testing here is that a single exception does not prevent sending to the remaining agents - (await publishTask.Should().ThrowAsync()) - .Which.Should().Match( - exception => exception.InnerExceptions.Count == 2 && - exception.InnerExceptions.All(exception => exception is TestException)); + await publishTask.Should().ThrowAsync(); fixture.GetAgentInstances().Values .Should().HaveCount(2) @@ -81,11 +77,7 @@ public async Task Test_PublishMessage_MixedSuccessFailure() Func publicTask = async () => await fixture.RunPublishTestAsync(new TopicId("TestTopic"), new BasicMessage { Content = "1" }); // What we are really testing here is that raising exceptions does not prevent sending to the remaining agents - (await publicTask.Should().ThrowAsync()) - .Which.Should().Match( - exception => exception.InnerExceptions.Count == 2 && - exception.InnerExceptions.All( - exception => exception is TestException)); + await publicTask.Should().ThrowAsync(); fixture.GetAgentInstances().Values .Should().HaveCount(2, "Two ReceiverAgents should have been created") diff --git a/dotnet/src/Agents/Runtime/InProcess/InProcessRuntime.cs b/dotnet/src/Agents/Runtime/InProcess/InProcessRuntime.cs index 93e3ec89144b..2ab31ab3f9ab 100644 --- a/dotnet/src/Agents/Runtime/InProcess/InProcessRuntime.cs +++ b/dotnet/src/Agents/Runtime/InProcess/InProcessRuntime.cs @@ -352,45 +352,41 @@ private async ValueTask PublishMessageServicerAsync(MessageEnvelope envelope, Ca throw new InvalidOperationException("Message must have a topic to be published."); } - List exceptions = []; + List? tasks = null; TopicId topic = envelope.Topic.Value; foreach (ISubscriptionDefinition subscription in this._subscriptions.Values.Where(subscription => subscription.Matches(topic))) { - try - { - deliveryToken.ThrowIfCancellationRequested(); + (tasks ??= []).Add(ProcessSubscriptionAsync(envelope, topic, subscription, deliveryToken)); + } - AgentId? sender = envelope.Sender; + if (tasks is not null) + { + await Task.WhenAll(tasks).ConfigureAwait(false); + } - using CancellationTokenSource combinedSource = CancellationTokenSource.CreateLinkedTokenSource(envelope.Cancellation, deliveryToken); - MessageContext messageContext = new(envelope.MessageId, combinedSource.Token) - { - Sender = sender, - Topic = topic, - IsRpc = false - }; + async Task ProcessSubscriptionAsync(MessageEnvelope envelope, TopicId topic, ISubscriptionDefinition subscription, CancellationToken deliveryToken) + { + deliveryToken.ThrowIfCancellationRequested(); - AgentId agentId = subscription.MapToAgent(topic); - if (!this.DeliverToSelf && sender.HasValue && sender == agentId) - { - continue; - } + AgentId? sender = envelope.Sender; - IHostableAgent agent = await this.EnsureAgentAsync(agentId).ConfigureAwait(false); + using CancellationTokenSource combinedSource = CancellationTokenSource.CreateLinkedTokenSource(envelope.Cancellation, deliveryToken); + MessageContext messageContext = new(envelope.MessageId, combinedSource.Token) + { + Sender = sender, + Topic = topic, + IsRpc = false + }; - // TODO: Cancellation propagation! - await agent.OnMessageAsync(envelope.Message, messageContext).ConfigureAwait(false); - } - catch (Exception ex) when (!ex.IsCriticalException()) + AgentId agentId = subscription.MapToAgent(topic); + if (!this.DeliverToSelf && sender.HasValue && sender == agentId) { - exceptions.Add(ex); + return; } - } - if (exceptions.Count > 0) - { - // TODO: Unwrap TargetInvocationException? - throw new AggregateException("One or more exceptions occurred while processing the message.", exceptions); + IHostableAgent agent = await this.EnsureAgentAsync(agentId).ConfigureAwait(false); + + await agent.OnMessageAsync(envelope.Message, messageContext).ConfigureAwait(false); } }