Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 92 additions & 5 deletions src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using Azure.Core;
using Grpc.Net.Client;
using Microsoft.DurableTask.Client.Grpc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
Expand Down Expand Up @@ -99,11 +103,23 @@ static void ConfigureSchedulerOptions(
/// <summary>
/// Configuration class that sets up gRPC channels for client options
/// using the provided Durable Task Scheduler options.
/// Channels are cached per configuration key and disposed when the service provider is disposed.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions) :
IConfigureNamedOptions<GrpcDurableTaskClientOptions>
sealed class ConfigureGrpcChannel : IConfigureNamedOptions<GrpcDurableTaskClientOptions>, IAsyncDisposable
{
readonly IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions;
readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new();
volatile int disposed;

/// <summary>
/// Initializes a new instance of the <see cref="ConfigureGrpcChannel"/> class.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
public ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions)
{
this.schedulerOptions = schedulerOptions;
}

/// <summary>
/// Configures the default named options instance.
/// </summary>
Expand All @@ -117,8 +133,79 @@ class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerClientOptions> sc
/// <param name="options">The options instance to configure.</param>
public void Configure(string? name, GrpcDurableTaskClientOptions options)
{
DurableTaskSchedulerClientOptions source = schedulerOptions.Get(name ?? Options.DefaultName);
options.Channel = source.CreateChannel();
#if NET7_0_OR_GREATER
ObjectDisposedException.ThrowIf(this.disposed == 1, this);
#else
if (this.disposed == 1)
{
throw new ObjectDisposedException(nameof(ConfigureGrpcChannel));
}
#endif

string optionsName = name ?? Options.DefaultName;
DurableTaskSchedulerClientOptions source = this.schedulerOptions.Get(optionsName);

// Create a cache key that includes all properties that affect CreateChannel behavior.
// This ensures channels are reused for the same configuration
// but separate channels are created when any relevant property changes.
// Use a delimiter character (\u001F) that will not appear in typical endpoint URIs.
string credentialType = source.Credential?.GetType().FullName ?? "null";
string retryOptionsKey = source.RetryOptions != null
? $"{source.RetryOptions.MaxRetries}|{source.RetryOptions.InitialBackoffMs}|{source.RetryOptions.MaxBackoffMs}|{source.RetryOptions.BackoffMultiplier}|{(source.RetryOptions.RetryableStatusCodes != null ? string.Join(",", source.RetryOptions.RetryableStatusCodes) : string.Empty)}"
: "null";
string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}\u001F{source.ResourceId}\u001F{credentialType}\u001F{source.AllowInsecureCredentials}\u001F{retryOptionsKey}";
options.Channel = this.channels.GetOrAdd(
cacheKey,
_ => new Lazy<GrpcChannel>(source.CreateChannel)).Value;
}

/// <inheritdoc/>
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref this.disposed, 1) == 1)
{
return;
}

foreach (Lazy<GrpcChannel> channel in this.channels.Values.Where(lazy => lazy.IsValueCreated))
{
try
{
await DisposeChannelAsync(channel.Value).ConfigureAwait(false);
}
catch (Exception ex) when (ex is not OutOfMemoryException
and not StackOverflowException
and not ThreadAbortException)
{
// Swallow disposal exceptions - disposal should be best-effort to ensure
// all channels get a chance to dispose and app shutdown is not blocked.
if (ex is not OperationCanceledException and not ObjectDisposedException)
{
Trace.TraceError(
"Unexpected exception while disposing gRPC channel in DurableTaskSchedulerClientExtensions.DisposeAsync: {0}",
ex);
}
}
}

this.channels.Clear();
GC.SuppressFinalize(this);
}

static async Task DisposeChannelAsync(GrpcChannel channel)
{
// ShutdownAsync is the graceful way to close a gRPC channel.
using (channel)
{
try
{
await channel.ShutdownAsync().ConfigureAwait(false);
}
catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException)
{
// Ignore expected shutdown/disposal errors
}
}
}
}
}
94 changes: 89 additions & 5 deletions src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using Azure.Core;
using Grpc.Net.Client;
using Microsoft.DurableTask.Worker.Grpc;
using Microsoft.DurableTask.Worker.Grpc.Internal;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -101,11 +105,23 @@ static void ConfigureSchedulerOptions(
/// <summary>
/// Configuration class that sets up gRPC channels for worker options
/// using the provided Durable Task Scheduler options.
/// Channels are cached per configuration key and disposed when the service provider is disposed.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions) :
IConfigureNamedOptions<GrpcDurableTaskWorkerOptions>
sealed class ConfigureGrpcChannel : IConfigureNamedOptions<GrpcDurableTaskWorkerOptions>, IAsyncDisposable
{
readonly IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions;
readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new();
volatile int disposed;

/// <summary>
/// Initializes a new instance of the <see cref="ConfigureGrpcChannel"/> class.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
public ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions)
{
this.schedulerOptions = schedulerOptions;
}

/// <summary>
/// Configures the default named options instance.
/// </summary>
Expand All @@ -119,9 +135,77 @@ class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerWorkerOptions> sc
/// <param name="options">The options instance to configure.</param>
public void Configure(string? name, GrpcDurableTaskWorkerOptions options)
{
DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(name ?? Options.DefaultName);
options.Channel = source.CreateChannel();
#if NET7_0_OR_GREATER
ObjectDisposedException.ThrowIf(this.disposed == 1, this);
#else
if (this.disposed == 1)
{
throw new ObjectDisposedException(nameof(ConfigureGrpcChannel));
}
#endif

string optionsName = name ?? Options.DefaultName;
DurableTaskSchedulerWorkerOptions source = this.schedulerOptions.Get(optionsName);

// Create a cache key that includes all properties that affect CreateChannel behavior.
// This ensures channels are reused for the same configuration
// but separate channels are created when any relevant property changes.
// Use a delimiter character (\u001F) that will not appear in typical endpoint URIs.
string credentialType = source.Credential?.GetType().FullName ?? "null";
string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}\u001F{source.ResourceId}\u001F{credentialType}\u001F{source.AllowInsecureCredentials}\u001F{source.WorkerId}";
options.Channel = this.channels.GetOrAdd(
cacheKey,
_ => new Lazy<GrpcChannel>(source.CreateChannel)).Value;
options.ConfigureForAzureManaged();
}

/// <inheritdoc/>
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref this.disposed, 1) == 1)
{
return;
}

foreach (Lazy<GrpcChannel> channel in this.channels.Values.Where(lazy => lazy.IsValueCreated))
{
try
{
await DisposeChannelAsync(channel.Value).ConfigureAwait(false);
}
catch (Exception ex) when (ex is not OutOfMemoryException
and not StackOverflowException
and not ThreadAbortException)
{
// Swallow disposal exceptions - disposal should be best-effort to ensure
// all channels get a chance to dispose and app shutdown is not blocked.
if (ex is not OperationCanceledException and not ObjectDisposedException)
{
Trace.TraceError(
"Unexpected exception while disposing gRPC channel in DurableTaskSchedulerWorkerExtensions.DisposeAsync: {0}",
ex);
}
}
}

this.channels.Clear();
GC.SuppressFinalize(this);
}

static async Task DisposeChannelAsync(GrpcChannel channel)
{
// ShutdownAsync is the graceful way to close a gRPC channel.
using (channel)
{
try
{
await channel.ShutdownAsync().ConfigureAwait(false);
}
catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException)
{
// Ignore expected shutdown/disposal errors
}
}
}
}
}
Loading
Loading