diff --git a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs index b11f450a..d98ac125 100644 --- a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs +++ b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs @@ -1,7 +1,11 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Linq; using Azure.Core; +using Grpc.Net.Client; using Microsoft.DurableTask.Client.Grpc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -99,11 +103,23 @@ static void ConfigureSchedulerOptions( /// /// Configuration class that sets up gRPC channels for client options /// using the provided Durable Task Scheduler options. + /// Channels are cached per configuration key and disposed when the service provider is disposed. /// - /// Monitor for accessing the current scheduler options configuration. - class ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) : - IConfigureNamedOptions + sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IAsyncDisposable { + readonly IOptionsMonitor schedulerOptions; + readonly ConcurrentDictionary> channels = new(); + volatile int disposed; + + /// + /// Initializes a new instance of the class. + /// + /// Monitor for accessing the current scheduler options configuration. + public ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) + { + this.schedulerOptions = schedulerOptions; + } + /// /// Configures the default named options instance. /// @@ -117,8 +133,79 @@ class ConfigureGrpcChannel(IOptionsMonitor sc /// The options instance to configure. public void Configure(string? name, GrpcDurableTaskClientOptions options) { - DurableTaskSchedulerClientOptions source = schedulerOptions.Get(name ?? Options.DefaultName); - options.Channel = source.CreateChannel(); +#if NET7_0_OR_GREATER + ObjectDisposedException.ThrowIf(this.disposed == 1, this); +#else + if (this.disposed == 1) + { + throw new ObjectDisposedException(nameof(ConfigureGrpcChannel)); + } +#endif + + string optionsName = name ?? Options.DefaultName; + DurableTaskSchedulerClientOptions source = this.schedulerOptions.Get(optionsName); + + // Create a cache key that includes all properties that affect CreateChannel behavior. + // This ensures channels are reused for the same configuration + // but separate channels are created when any relevant property changes. + // Use a delimiter character (\u001F) that will not appear in typical endpoint URIs. + string credentialType = source.Credential?.GetType().FullName ?? "null"; + string retryOptionsKey = source.RetryOptions != null + ? $"{source.RetryOptions.MaxRetries}|{source.RetryOptions.InitialBackoffMs}|{source.RetryOptions.MaxBackoffMs}|{source.RetryOptions.BackoffMultiplier}|{(source.RetryOptions.RetryableStatusCodes != null ? string.Join(",", source.RetryOptions.RetryableStatusCodes) : string.Empty)}" + : "null"; + string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}\u001F{source.ResourceId}\u001F{credentialType}\u001F{source.AllowInsecureCredentials}\u001F{retryOptionsKey}"; + options.Channel = this.channels.GetOrAdd( + cacheKey, + _ => new Lazy(source.CreateChannel)).Value; + } + + /// + public async ValueTask DisposeAsync() + { + if (Interlocked.Exchange(ref this.disposed, 1) == 1) + { + return; + } + + foreach (Lazy channel in this.channels.Values.Where(lazy => lazy.IsValueCreated)) + { + try + { + await DisposeChannelAsync(channel.Value).ConfigureAwait(false); + } + catch (Exception ex) when (ex is not OutOfMemoryException + and not StackOverflowException + and not ThreadAbortException) + { + // Swallow disposal exceptions - disposal should be best-effort to ensure + // all channels get a chance to dispose and app shutdown is not blocked. + if (ex is not OperationCanceledException and not ObjectDisposedException) + { + Trace.TraceError( + "Unexpected exception while disposing gRPC channel in DurableTaskSchedulerClientExtensions.DisposeAsync: {0}", + ex); + } + } + } + + this.channels.Clear(); + GC.SuppressFinalize(this); + } + + static async Task DisposeChannelAsync(GrpcChannel channel) + { + // ShutdownAsync is the graceful way to close a gRPC channel. + using (channel) + { + try + { + await channel.ShutdownAsync().ConfigureAwait(false); + } + catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) + { + // Ignore expected shutdown/disposal errors + } + } } } } diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index 7a3baa41..0164d1cc 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -1,7 +1,11 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Linq; using Azure.Core; +using Grpc.Net.Client; using Microsoft.DurableTask.Worker.Grpc; using Microsoft.DurableTask.Worker.Grpc.Internal; using Microsoft.Extensions.DependencyInjection; @@ -101,11 +105,23 @@ static void ConfigureSchedulerOptions( /// /// Configuration class that sets up gRPC channels for worker options /// using the provided Durable Task Scheduler options. + /// Channels are cached per configuration key and disposed when the service provider is disposed. /// - /// Monitor for accessing the current scheduler options configuration. - class ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) : - IConfigureNamedOptions + sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IAsyncDisposable { + readonly IOptionsMonitor schedulerOptions; + readonly ConcurrentDictionary> channels = new(); + volatile int disposed; + + /// + /// Initializes a new instance of the class. + /// + /// Monitor for accessing the current scheduler options configuration. + public ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) + { + this.schedulerOptions = schedulerOptions; + } + /// /// Configures the default named options instance. /// @@ -119,9 +135,77 @@ class ConfigureGrpcChannel(IOptionsMonitor sc /// The options instance to configure. public void Configure(string? name, GrpcDurableTaskWorkerOptions options) { - DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(name ?? Options.DefaultName); - options.Channel = source.CreateChannel(); +#if NET7_0_OR_GREATER + ObjectDisposedException.ThrowIf(this.disposed == 1, this); +#else + if (this.disposed == 1) + { + throw new ObjectDisposedException(nameof(ConfigureGrpcChannel)); + } +#endif + + string optionsName = name ?? Options.DefaultName; + DurableTaskSchedulerWorkerOptions source = this.schedulerOptions.Get(optionsName); + + // Create a cache key that includes all properties that affect CreateChannel behavior. + // This ensures channels are reused for the same configuration + // but separate channels are created when any relevant property changes. + // Use a delimiter character (\u001F) that will not appear in typical endpoint URIs. + string credentialType = source.Credential?.GetType().FullName ?? "null"; + string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}\u001F{source.ResourceId}\u001F{credentialType}\u001F{source.AllowInsecureCredentials}\u001F{source.WorkerId}"; + options.Channel = this.channels.GetOrAdd( + cacheKey, + _ => new Lazy(source.CreateChannel)).Value; options.ConfigureForAzureManaged(); } + + /// + public async ValueTask DisposeAsync() + { + if (Interlocked.Exchange(ref this.disposed, 1) == 1) + { + return; + } + + foreach (Lazy channel in this.channels.Values.Where(lazy => lazy.IsValueCreated)) + { + try + { + await DisposeChannelAsync(channel.Value).ConfigureAwait(false); + } + catch (Exception ex) when (ex is not OutOfMemoryException + and not StackOverflowException + and not ThreadAbortException) + { + // Swallow disposal exceptions - disposal should be best-effort to ensure + // all channels get a chance to dispose and app shutdown is not blocked. + if (ex is not OperationCanceledException and not ObjectDisposedException) + { + Trace.TraceError( + "Unexpected exception while disposing gRPC channel in DurableTaskSchedulerWorkerExtensions.DisposeAsync: {0}", + ex); + } + } + } + + this.channels.Clear(); + GC.SuppressFinalize(this); + } + + static async Task DisposeChannelAsync(GrpcChannel channel) + { + // ShutdownAsync is the graceful way to close a gRPC channel. + using (channel) + { + try + { + await channel.ShutdownAsync().ConfigureAwait(false); + } + catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) + { + // Ignore expected shutdown/disposal errors + } + } + } } } diff --git a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs index 3c760ddf..77c6df66 100644 --- a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs +++ b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs @@ -5,6 +5,7 @@ using Azure.Identity; using FluentAssertions; using Grpc.Core; +using Grpc.Net.Client; using Microsoft.DurableTask.Client.Grpc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -280,4 +281,259 @@ public void UseDurableTaskScheduler_WithConnectionStringAndRetryOptions_ShouldCo clientOptions.RetryOptions.RetryableStatusCodes.Should().Contain(StatusCode.Unknown); } } + + [Fact] + public async Task UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options multiple times to trigger channel configuration + IOptionsFactory optionsFactory = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsFactory.Create(Options.DefaultName); + GrpcDurableTaskClientOptions options2 = optionsFactory.Create(Options.DefaultName); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().BeSameAs(options2.Channel, "same configuration should reuse the same channel"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("client1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("client2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two different named clients with the SAME endpoint and task hub + // This validates that the options name is included in the cache key + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named clients + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1"); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different named options should use different channels even with same configuration"); + } + + [Fact] + public async Task UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + GrpcChannel channel; + await using (ServiceProvider provider = services.BuildServiceProvider()) + { + // Resolve options to trigger channel creation + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options = optionsMonitor.Get(Options.DefaultName); + options.Channel.Should().NotBeNull(); + channel = options.Channel!; + } + + // Assert - verify the channel was disposed by checking it throws ObjectDisposedException + Action action = () => channel.CreateCallInvoker(); + action.Should().Throw("channel should be disposed after provider disposal"); + + // Also verify that creating a new provider and getting options still works + ServiceCollection services2 = new ServiceCollection(); + Mock mockBuilder2 = new Mock(); + mockBuilder2.Setup(b => b.Services).Returns(services2); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + await using ServiceProvider provider2 = services2.BuildServiceProvider(); + + IOptionsMonitor newOptionsMonitor = provider2.GetRequiredService>(); + GrpcDurableTaskClientOptions newOptions = newOptionsMonitor.Get(Options.DefaultName); + newOptions.Channel.Should().NotBeNull(); + newOptions.Channel.Should().NotBeSameAs(channel, "new provider should create a new channel"); + } + + [Fact] + public async Task UseDurableTaskScheduler_ConfigureAfterDispose_ThrowsObjectDisposedException() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options monitor before disposal + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + + // Dispose the service provider + await provider.DisposeAsync(); + + // Assert - attempting to get options after disposal should throw + Action action = () => optionsMonitor.Get(Options.DefaultName); + action.Should().Throw("configuring options after disposal should throw"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentResourceId_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("client1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("client2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two clients with the same endpoint/taskhub but different ResourceId + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.ResourceId = "https://durabletask.io"; + }); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.ResourceId = "https://custom.durabletask.io"; + }); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named clients + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1"); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different ResourceId should use different channels"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentCredentialType_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("client1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("client2"); + + // Act - configure two clients with the same endpoint/taskhub but different credential types + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, new DefaultAzureCredential()); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, new AzureCliCredential()); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named clients + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1"); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different credential type should use different channels"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentAllowInsecureCredentials_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("client1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("client2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two clients with the same endpoint/taskhub but different AllowInsecureCredentials + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.AllowInsecureCredentials = false; + }); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.AllowInsecureCredentials = true; + }); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named clients + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1"); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different AllowInsecureCredentials should use different channels"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentRetryOptions_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("client1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("client2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two clients with the same endpoint/taskhub but different RetryOptions + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.RetryOptions = new DurableTaskSchedulerClientOptions.ClientRetryOptions + { + MaxRetries = 3 + }; + }); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.RetryOptions = new DurableTaskSchedulerClientOptions.ClientRetryOptions + { + MaxRetries = 5 + }; + }); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named clients + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1"); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different RetryOptions should use different channels"); + } } diff --git a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs index a661b53d..6e08347e 100644 --- a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs +++ b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs @@ -4,6 +4,7 @@ using Azure.Core; using Azure.Identity; using FluentAssertions; +using Grpc.Net.Client; using Microsoft.DurableTask.Worker.Grpc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -18,7 +19,7 @@ public class DurableTaskSchedulerWorkerExtensionsTests const string ValidTaskHub = "testhub"; [Fact] - public void UseDurableTaskScheduler_WithEndpointAndCredential_ShouldConfigureCorrectly() + public async Task UseDurableTaskScheduler_WithEndpointAndCredential_ShouldConfigureCorrectly() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -30,7 +31,7 @@ public void UseDurableTaskScheduler_WithEndpointAndCredential_ShouldConfigureCor mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); // Assert - ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); IOptions? options = provider.GetService>(); options.Should().NotBeNull(); @@ -44,7 +45,7 @@ public void UseDurableTaskScheduler_WithEndpointAndCredential_ShouldConfigureCor } [Fact] - public void UseDurableTaskScheduler_WithConnectionString_ShouldConfigureCorrectly() + public async Task UseDurableTaskScheduler_WithConnectionString_ShouldConfigureCorrectly() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -56,7 +57,7 @@ public void UseDurableTaskScheduler_WithConnectionString_ShouldConfigureCorrectl mockBuilder.Object.UseDurableTaskScheduler(connectionString); // Assert - ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); IOptions? options = provider.GetService>(); options.Should().NotBeNull(); @@ -70,7 +71,7 @@ public void UseDurableTaskScheduler_WithConnectionString_ShouldConfigureCorrectl } [Fact] - public void UseDurableTaskScheduler_WithLocalhostConnectionString_ShouldConfigureCorrectly() + public async Task UseDurableTaskScheduler_WithLocalhostConnectionString_ShouldConfigureCorrectly() { // Arrange ServiceCollection services = new(); @@ -82,7 +83,7 @@ public void UseDurableTaskScheduler_WithLocalhostConnectionString_ShouldConfigur mockBuilder.Object.UseDurableTaskScheduler(connectionString); // Assert - ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); IOptions? options = provider.GetService>(); options.Should().NotBeNull(); @@ -98,7 +99,7 @@ public void UseDurableTaskScheduler_WithLocalhostConnectionString_ShouldConfigur [Theory] [InlineData(null, "testhub")] [InlineData("myaccount.westus3.durabletask.io", null)] - public void UseDurableTaskScheduler_WithNullParameters_ShouldThrowOptionsValidationException(string? endpoint, string? taskHub) + public async Task UseDurableTaskScheduler_WithNullParameters_ShouldThrowOptionsValidationException(string? endpoint, string? taskHub) { // Arrange ServiceCollection services = new ServiceCollection(); @@ -108,7 +109,7 @@ public void UseDurableTaskScheduler_WithNullParameters_ShouldThrowOptionsValidat // Act mockBuilder.Object.UseDurableTaskScheduler(endpoint!, taskHub!, credential); - ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); // Assert var action = () => provider.GetRequiredService>().Value; @@ -119,7 +120,7 @@ public void UseDurableTaskScheduler_WithNullParameters_ShouldThrowOptionsValidat } [Fact] - public void UseDurableTaskScheduler_WithNullCredential_ShouldSucceed() + public async Task UseDurableTaskScheduler_WithNullCredential_ShouldSucceed() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -132,7 +133,7 @@ public void UseDurableTaskScheduler_WithNullCredential_ShouldSucceed() action.Should().NotThrow(); // Validate the configured options - ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); var workerOptions = provider.GetRequiredService>().Value; workerOptions.EndpointAddress.Should().Be(ValidEndpoint); workerOptions.TaskHubName.Should().Be(ValidTaskHub); @@ -174,7 +175,7 @@ public void UseDurableTaskScheduler_WithNullOrEmptyConnectionString_ShouldThrowA } [Fact] - public void UseDurableTaskScheduler_WithNamedOptions_ShouldConfigureCorrectly() + public async Task UseDurableTaskScheduler_WithNamedOptions_ShouldConfigureCorrectly() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -187,7 +188,7 @@ public void UseDurableTaskScheduler_WithNamedOptions_ShouldConfigureCorrectly() mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); // Assert - ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); IOptionsMonitor? optionsMonitor = provider.GetService>(); optionsMonitor.Should().NotBeNull(); DurableTaskSchedulerWorkerOptions options = optionsMonitor!.Get("CustomName"); @@ -198,4 +199,254 @@ public void UseDurableTaskScheduler_WithNamedOptions_ShouldConfigureCorrectly() options.ResourceId.Should().Be("https://durabletask.io"); options.AllowInsecureCredentials.Should().BeFalse(); } + + [Fact] + public async Task UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options multiple times to trigger channel configuration via new options instances + IOptionsFactory optionsFactory = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsFactory.Create(Options.DefaultName); + GrpcDurableTaskWorkerOptions options2 = optionsFactory.Create(Options.DefaultName); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().BeSameAs(options2.Channel, "same configuration should reuse the same channel"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("worker1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("worker2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two different named workers with the same endpoint and task hub + mockBuilder1.Object.UseDurableTaskScheduler("endpoint.westus3.durabletask.io", ValidTaskHub, credential); + mockBuilder2.Object.UseDurableTaskScheduler("endpoint.westus3.durabletask.io", ValidTaskHub, credential); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named workers + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get("worker1"); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get("worker2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels"); + } + + [Fact] + public async Task UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + + GrpcChannel channel; + await using (ServiceProvider provider = services.BuildServiceProvider()) + { + // Resolve options to trigger channel creation + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options = optionsMonitor.Get(Options.DefaultName); + options.Channel.Should().NotBeNull(); + channel = options.Channel!; + } + + // Assert - verify the channel was disposed by checking it throws ObjectDisposedException + Action action = () => channel.CreateCallInvoker(); + action.Should().Throw("channel should be disposed after provider disposal"); + + // Also verify that creating a new provider and getting options still works + ServiceCollection services2 = new ServiceCollection(); + Mock mockBuilder2 = new Mock(); + mockBuilder2.Setup(b => b.Services).Returns(services2); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + await using ServiceProvider provider2 = services2.BuildServiceProvider(); + + IOptionsMonitor newOptionsMonitor = provider2.GetRequiredService>(); + GrpcDurableTaskWorkerOptions newOptions = newOptionsMonitor.Get(Options.DefaultName); + newOptions.Channel.Should().NotBeNull(); + newOptions.Channel.Should().NotBeSameAs(channel, "new provider should create a new channel"); + } + + [Fact] + public async Task UseDurableTaskScheduler_ConfigureAfterDispose_ThrowsObjectDisposedException() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + + IOptionsMonitor optionsMonitor; + await using (ServiceProvider provider = services.BuildServiceProvider()) + { + // Resolve options monitor before disposal + optionsMonitor = provider.GetRequiredService>(); + } + + // Assert - attempting to get options after disposal should throw + Action action = () => optionsMonitor.Get(Options.DefaultName); + action.Should().Throw("configuring options after disposal should throw"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentResourceId_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("worker1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("worker2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two workers with the same endpoint/taskhub but different ResourceId + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.ResourceId = "https://durabletask.io"; + }); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.ResourceId = "https://custom.durabletask.io"; + }); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named workers + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get("worker1"); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get("worker2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different ResourceId should use different channels"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentCredentialType_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("worker1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("worker2"); + + // Act - configure two workers with the same endpoint/taskhub but different credential types + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, new DefaultAzureCredential()); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, new AzureCliCredential()); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named workers + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get("worker1"); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get("worker2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different credential type should use different channels"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentAllowInsecureCredentials_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("worker1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("worker2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two workers with the same endpoint/taskhub but different AllowInsecureCredentials + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.AllowInsecureCredentials = false; + }); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.AllowInsecureCredentials = true; + }); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named workers + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get("worker1"); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get("worker2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different AllowInsecureCredentials should use different channels"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentWorkerId_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("worker1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("worker2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two workers with the same endpoint/taskhub but different WorkerId + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.WorkerId = "worker-id-1"; + }); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.WorkerId = "worker-id-2"; + }); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named workers + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get("worker1"); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get("worker2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different WorkerId should use different channels"); + } } +