diff --git a/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj index 8a42115792903d..3700e11fc1fc9d 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj +++ b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj @@ -14,6 +14,8 @@ System.Threading.RateLimiting.RateLimitLease + + diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedPartitionedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedPartitionedRateLimiter.cs index 0d74b7f39f7676..2fc7eeafc6788e 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedPartitionedRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedPartitionedRateLimiter.cs @@ -76,7 +76,7 @@ protected override RateLimitLease AttemptAcquireCore(TResource resource, int per exception = ex; } - RateLimitLease? notAcquiredLease = ChainedRateLimiter.CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length); + RateLimitLease? notAcquiredLease = ChainedRateLimiterShared.CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length); if (notAcquiredLease is not null) { @@ -107,7 +107,7 @@ protected override async ValueTask AcquireAsyncCore(TResource re exception = ex; } - RateLimitLease? notAcquiredLease = ChainedRateLimiter.CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length); + RateLimitLease? notAcquiredLease = ChainedRateLimiterShared.CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length); if (notAcquiredLease is not null) { diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs index 1e786cbb70fd62..ce719baa5caeee 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs @@ -1,8 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System.Collections.Generic; -using System.Runtime.ExceptionServices; using System.Threading.Tasks; namespace System.Threading.RateLimiting @@ -11,7 +9,7 @@ namespace System.Threading.RateLimiting /// Acquires leases from rate limiters in the order given. If a lease fails to be acquired (throwing or IsAcquired == false) /// then the already acquired leases are disposed in reverse order and the failing lease is returned or the exception is thrown to user code. /// - internal sealed partial class ChainedRateLimiter : RateLimiter + internal sealed class ChainedRateLimiter : RateLimiter { private readonly RateLimiter[] _limiters; private bool _disposed; @@ -24,34 +22,7 @@ public ChainedRateLimiter(RateLimiter[] limiters) public override RateLimiterStatistics? GetStatistics() { ThrowIfDisposed(); - - long lowestAvailablePermits = long.MaxValue; - long currentQueuedCount = 0; - long totalFailedLeases = 0; - long innerMostSuccessfulLeases = 0; - - foreach (RateLimiter limiter in _limiters) - { - if (limiter.GetStatistics() is { } statistics) - { - if (statistics.CurrentAvailablePermits < lowestAvailablePermits) - { - lowestAvailablePermits = statistics.CurrentAvailablePermits; - } - - currentQueuedCount += statistics.CurrentQueuedCount; - totalFailedLeases += statistics.TotalFailedLeases; - innerMostSuccessfulLeases = statistics.TotalSuccessfulLeases; - } - } - - return new RateLimiterStatistics() - { - CurrentAvailablePermits = lowestAvailablePermits, - CurrentQueuedCount = currentQueuedCount, - TotalFailedLeases = totalFailedLeases, - TotalSuccessfulLeases = innerMostSuccessfulLeases, - }; + return ChainedRateLimiterShared.GetStatisticsCore(_limiters); } public override TimeSpan? IdleDuration @@ -59,84 +30,20 @@ public override TimeSpan? IdleDuration get { ThrowIfDisposed(); - - TimeSpan? lowestIdleDuration = null; - - foreach (RateLimiter limiter in _limiters) - { - if (limiter.IdleDuration is { } idleDuration) - { - if (lowestIdleDuration is null || idleDuration < lowestIdleDuration) - { - lowestIdleDuration = idleDuration; - } - } - } - - return lowestIdleDuration; + return ChainedRateLimiterShared.GetIdleDurationCore(_limiters); } } protected override RateLimitLease AttemptAcquireCore(int permitCount) { ThrowIfDisposed(); - - RateLimitLease[]? leases = null; - - for (int i = 0; i < _limiters.Length; i++) - { - RateLimitLease? lease = null; - Exception? exception = null; - - try - { - lease = _limiters[i].AttemptAcquire(permitCount); - } - catch (Exception ex) - { - exception = ex; - } - - RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length); - - if (notAcquiredLease is not null) - { - return notAcquiredLease; - } - } - - return new CombinedRateLimitLease(leases!); + return ChainedRateLimiterShared.AttemptAcquireChained(_limiters, permitCount); } protected override async ValueTask AcquireAsyncCore(int permitCount, CancellationToken cancellationToken) { ThrowIfDisposed(); - - RateLimitLease[]? leases = null; - - for (int i = 0; i < _limiters.Length; i++) - { - RateLimitLease? lease = null; - Exception? exception = null; - - try - { - lease = await _limiters[i].AcquireAsync(permitCount, cancellationToken).ConfigureAwait(false); - } - catch (Exception ex) - { - exception = ex; - } - - RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length); - - if (notAcquiredLease is not null) - { - return notAcquiredLease; - } - } - - return new CombinedRateLimitLease(leases!); + return await ChainedRateLimiterShared.AcquireAsyncChained(_limiters, permitCount, cancellationToken).ConfigureAwait(false); } protected override void Dispose(bool disposing) @@ -151,60 +58,5 @@ private void ThrowIfDisposed() throw new ObjectDisposedException(nameof(ChainedRateLimiter)); } } - - internal static RateLimitLease? CommonAcquireLogic(Exception? ex, RateLimitLease? lease, ref RateLimitLease[]? leases, int index, int length) - { - if (ex is not null) - { - AggregateException? innerEx = CommonDispose(leases, index); - - if (innerEx is not null) - { - Exception[] exceptions = new Exception[innerEx.InnerExceptions.Count + 1]; - innerEx.InnerExceptions.CopyTo(exceptions, 0); - exceptions[exceptions.Length - 1] = ex; - throw new AggregateException(exceptions); - } - - ExceptionDispatchInfo.Capture(ex).Throw(); - } - - if (!lease!.IsAcquired) - { - AggregateException? innerEx = CommonDispose(leases, index); - return innerEx is not null ? throw innerEx : lease; - } - - leases ??= new RateLimitLease[length]; - leases[index] = lease; - return null; - } - - private static AggregateException? CommonDispose(RateLimitLease[]? leases, int i) - { - List? exceptions = null; - - while (i > 0) - { - i--; - - try - { - leases![i].Dispose(); - } - catch (Exception ex) - { - exceptions ??= []; - exceptions.Add(ex); - } - } - - if (exceptions is not null) - { - return new AggregateException(exceptions); - } - - return null; - } } } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiterShared.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiterShared.cs new file mode 100644 index 00000000000000..8a86fa0fe24a56 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiterShared.cs @@ -0,0 +1,183 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Runtime.ExceptionServices; +using System.Threading.Tasks; + +namespace System.Threading.RateLimiting +{ + /// + /// Shared static methods used by , + /// , and . + /// + internal static class ChainedRateLimiterShared + { + internal static RateLimiterStatistics GetStatisticsCore(RateLimiter[] limiters) + { + long lowestAvailablePermits = long.MaxValue; + long currentQueuedCount = 0; + long totalFailedLeases = 0; + long innerMostSuccessfulLeases = 0; + + foreach (RateLimiter limiter in limiters) + { + if (limiter.GetStatistics() is { } statistics) + { + if (statistics.CurrentAvailablePermits < lowestAvailablePermits) + { + lowestAvailablePermits = statistics.CurrentAvailablePermits; + } + + currentQueuedCount += statistics.CurrentQueuedCount; + totalFailedLeases += statistics.TotalFailedLeases; + innerMostSuccessfulLeases = statistics.TotalSuccessfulLeases; + } + } + + return new RateLimiterStatistics() + { + CurrentAvailablePermits = lowestAvailablePermits, + CurrentQueuedCount = currentQueuedCount, + TotalFailedLeases = totalFailedLeases, + TotalSuccessfulLeases = innerMostSuccessfulLeases, + }; + } + + internal static TimeSpan? GetIdleDurationCore(RateLimiter[] limiters) + { + TimeSpan? lowestIdleDuration = null; + + foreach (RateLimiter limiter in limiters) + { + TimeSpan? idleDuration = limiter.IdleDuration; + if (idleDuration is null) + { + // The chain should not be considered idle if any of its children is not idle. + return null; + } + + if (lowestIdleDuration is null || idleDuration < lowestIdleDuration) + { + lowestIdleDuration = idleDuration; + } + } + + return lowestIdleDuration; + } + + internal static RateLimitLease AttemptAcquireChained(RateLimiter[] limiters, int permitCount) + { + RateLimitLease[]? leases = null; + + for (int i = 0; i < limiters.Length; i++) + { + RateLimitLease? lease = null; + Exception? exception = null; + + try + { + lease = limiters[i].AttemptAcquire(permitCount); + } + catch (Exception ex) + { + exception = ex; + } + + RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, limiters.Length); + + if (notAcquiredLease is not null) + { + return notAcquiredLease; + } + } + + return new CombinedRateLimitLease(leases!); + } + + internal static async ValueTask AcquireAsyncChained(RateLimiter[] limiters, int permitCount, CancellationToken cancellationToken) + { + RateLimitLease[]? leases = null; + + for (int i = 0; i < limiters.Length; i++) + { + RateLimitLease? lease = null; + Exception? exception = null; + + try + { + lease = await limiters[i].AcquireAsync(permitCount, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + exception = ex; + } + + RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, limiters.Length); + + if (notAcquiredLease is not null) + { + return notAcquiredLease; + } + } + + return new CombinedRateLimitLease(leases!); + } + + internal static RateLimitLease? CommonAcquireLogic(Exception? ex, RateLimitLease? lease, ref RateLimitLease[]? leases, int index, int length) + { + if (ex is not null) + { + AggregateException? innerEx = CommonDispose(leases, index); + + if (innerEx is not null) + { + Exception[] exceptions = new Exception[innerEx.InnerExceptions.Count + 1]; + innerEx.InnerExceptions.CopyTo(exceptions, 0); + exceptions[exceptions.Length - 1] = ex; + throw new AggregateException(exceptions); + } + + ExceptionDispatchInfo.Capture(ex).Throw(); + } + + if (!lease!.IsAcquired) + { + AggregateException? innerEx = CommonDispose(leases, index); + return innerEx is not null ? throw innerEx : lease; + } + + leases ??= new RateLimitLease[length]; + leases[index] = lease; + return null; + } + + private static AggregateException? CommonDispose(RateLimitLease[]? leases, int i) + { + List? exceptions = null; + + while (i > 0) + { + i--; + + try + { + leases![i].Dispose(); + } + catch (Exception ex) + { + exceptions ??= []; + exceptions.Add(ex); + } + } + + if (exceptions is not null) + { + return new AggregateException(exceptions); + } + + return null; + } + + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedReplenishingRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedReplenishingRateLimiter.cs new file mode 100644 index 00000000000000..4aaa768a51eaa3 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedReplenishingRateLimiter.cs @@ -0,0 +1,127 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace System.Threading.RateLimiting +{ + /// + /// A chained rate limiter that extends when at least one of the + /// chained limiters is a . + /// + internal sealed class ChainedReplenishingRateLimiter : ReplenishingRateLimiter + { + private readonly RateLimiter[] _limiters; + private readonly ReplenishingRateLimiter[] _replenishingLimiters; + private readonly bool _isAutoReplenishing; + private readonly TimeSpan _replenishmentPeriod; + private bool _disposed; + + public ChainedReplenishingRateLimiter(RateLimiter[] limiters) + { + _limiters = (RateLimiter[])limiters.Clone(); + + var replenishingLimiters = new List(); + bool isAutoReplenishing = true; + TimeSpan lowestPeriod = TimeSpan.MaxValue; + + foreach (RateLimiter limiter in _limiters) + { + if (limiter is ReplenishingRateLimiter replenishing) + { + replenishingLimiters.Add(replenishing); + + if (!replenishing.IsAutoReplenishing) + { + isAutoReplenishing = false; + } + + TimeSpan period = replenishing.ReplenishmentPeriod; + if (period > TimeSpan.Zero && period < lowestPeriod) + { + lowestPeriod = period; + } + } + } + + _replenishingLimiters = replenishingLimiters.ToArray(); + _isAutoReplenishing = isAutoReplenishing; + _replenishmentPeriod = lowestPeriod == TimeSpan.MaxValue ? TimeSpan.Zero : lowestPeriod; + } + + public override bool IsAutoReplenishing => _isAutoReplenishing; + + public override TimeSpan ReplenishmentPeriod => _replenishmentPeriod; + + public override bool TryReplenish() + { + ThrowIfDisposed(); + + bool replenished = false; + List? exceptions = null; + foreach (ReplenishingRateLimiter limiter in _replenishingLimiters) + { + try + { + if (limiter.TryReplenish()) + { + replenished = true; + } + } + catch (Exception ex) + { + exceptions ??= new List(); + exceptions.Add(ex); + } + } + + if (exceptions is not null) + { + throw new AggregateException(exceptions); + } + + return replenished; + } + + public override RateLimiterStatistics? GetStatistics() + { + ThrowIfDisposed(); + return ChainedRateLimiterShared.GetStatisticsCore(_limiters); + } + + public override TimeSpan? IdleDuration + { + get + { + ThrowIfDisposed(); + return ChainedRateLimiterShared.GetIdleDurationCore(_limiters); + } + } + + protected override RateLimitLease AttemptAcquireCore(int permitCount) + { + ThrowIfDisposed(); + return ChainedRateLimiterShared.AttemptAcquireChained(_limiters, permitCount); + } + + protected override async ValueTask AcquireAsyncCore(int permitCount, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + return await ChainedRateLimiterShared.AcquireAsyncChained(_limiters, permitCount, cancellationToken).ConfigureAwait(false); + } + + protected override void Dispose(bool disposing) + { + _disposed = true; + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(ChainedReplenishingRateLimiter)); + } + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs index 5c58a1de5970dd..5988d0b1b59f8e 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs @@ -259,7 +259,7 @@ private async Task Heartbeat() } catch (Exception ex) { - aggregateExceptions ??= new List(); + aggregateExceptions ??= []; aggregateExceptions.Add(ex); } } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs index 560ed4b63bafe8..920c3bfc1fb502 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs @@ -45,6 +45,10 @@ public static PartitionedRateLimiter Create /// /// s returned will aggregate metadata and for duplicates use the value of the first lease with the same metadata name. /// + /// + /// Disposing the returned does not dispose the inner . + /// Callers are expected to dispose the inner limiters themselves once they are no longer in use. + /// /// /// The resource type that is being rate limited. /// The s that will be called in order when acquiring resources. diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimiter.cs index 428c92504f9325..277f08f0a986cf 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimiter.cs @@ -25,6 +25,10 @@ public abstract class RateLimiter : IAsyncDisposable, IDisposable /// /// s returned will aggregate metadata and for duplicates use the value of the first lease with the same metadata name. /// + /// + /// Disposing the returned does not dispose the inner . + /// Callers are expected to dispose the inner limiters themselves once they are no longer in use. + /// /// /// The s that will be called in order when acquiring resources. /// @@ -39,6 +43,14 @@ public static RateLimiter CreateChained(params RateLimiter[] limiters) throw new ArgumentException("Must pass in at least 1 limiter.", nameof(limiters)); } + foreach (RateLimiter limiter in limiters) + { + if (limiter is ReplenishingRateLimiter) + { + return new ChainedReplenishingRateLimiter(limiters); + } + } + return new ChainedRateLimiter(limiters); } diff --git a/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs index 70e90b2ec397c1..b1faea5579af4b 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs @@ -10,6 +10,188 @@ namespace System.Threading.RateLimiting.Tests { public class ChainedLimiterTests { + [Fact] + public void CreateChainedReturnsRateLimiterWhenNoReplenishingLimiters() + { + using var limiter1 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0 + }); + using var limiter2 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0 + }); + + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); + + Assert.False(chainedLimiter is ReplenishingRateLimiter); + } + + [Fact] + public void CreateChainedReturnsReplenishingRateLimiterWhenAnyReplenishingLimiter() + { + using var limiter1 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0 + }); + using var limiter2 = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions + { + TokenLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + ReplenishmentPeriod = TimeSpan.FromSeconds(1), + TokensPerPeriod = 1, + AutoReplenishment = false + }); + + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); + + Assert.IsAssignableFrom(chainedLimiter); + } + + [Fact] + public void ReplenishingChainedLimiter_IsAutoReplenishingTrue_WhenAllReplenishingAreAutoReplenishing() + { + using var limiter1 = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions + { + TokenLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + ReplenishmentPeriod = TimeSpan.FromSeconds(1), + TokensPerPeriod = 1, + AutoReplenishment = true + }); + using var limiter2 = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + Window = TimeSpan.FromSeconds(2), + AutoReplenishment = true + }); + + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); + + var replenishing = Assert.IsAssignableFrom(chainedLimiter); + Assert.True(replenishing.IsAutoReplenishing); + } + + [Fact] + public void ReplenishingChainedLimiter_IsAutoReplenishingFalse_WhenAnyReplenishingIsNotAutoReplenishing() + { + using var limiter1 = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions + { + TokenLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + ReplenishmentPeriod = TimeSpan.FromSeconds(1), + TokensPerPeriod = 1, + AutoReplenishment = true + }); + using var limiter2 = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + Window = TimeSpan.FromSeconds(2), + AutoReplenishment = false + }); + + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); + + var replenishing = Assert.IsAssignableFrom(chainedLimiter); + Assert.False(replenishing.IsAutoReplenishing); + } + + [Fact] + public void ReplenishingChainedLimiter_ReplenishmentPeriod_IsLowestPositiveValue() + { + using var limiter1 = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions + { + TokenLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + ReplenishmentPeriod = TimeSpan.FromSeconds(5), + TokensPerPeriod = 1, + AutoReplenishment = false + }); + using var limiter2 = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + Window = TimeSpan.FromSeconds(2), + AutoReplenishment = false + }); + using var nonReplenishing = new ConcurrencyLimiter(new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0 + }); + + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2, nonReplenishing); + + var replenishing = Assert.IsAssignableFrom(chainedLimiter); + Assert.Equal(TimeSpan.FromSeconds(2), replenishing.ReplenishmentPeriod); + } + + [Fact] + public void ReplenishingChainedLimiter_TryReplenish_CallsAllReplenishingLimiters() + { + var replenishingLimiter1 = new CustomizableReplenishingLimiter(); + bool replenish1Called = false; + replenishingLimiter1.TryReplenishImpl = () => + { + replenish1Called = true; + return true; + }; + + var replenishingLimiter2 = new CustomizableReplenishingLimiter(); + bool replenish2Called = false; + replenishingLimiter2.TryReplenishImpl = () => + { + replenish2Called = true; + return false; + }; + + using var nonReplenishing = new CustomizableLimiter(); + + using var chainedLimiter = RateLimiter.CreateChained(replenishingLimiter1, nonReplenishing, replenishingLimiter2); + + var replenishing = Assert.IsAssignableFrom(chainedLimiter); + bool result = replenishing.TryReplenish(); + + Assert.True(replenish1Called); + Assert.True(replenish2Called); + Assert.True(result); + } + + [Fact] + public void ReplenishingChainedLimiter_TryReplenish_ReturnsFalseWhenNoneReplenished() + { + var replenishingLimiter1 = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => false + }; + + var replenishingLimiter2 = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => false + }; + + using var chainedLimiter = RateLimiter.CreateChained(replenishingLimiter1, replenishingLimiter2); + + var replenishing = Assert.IsAssignableFrom(chainedLimiter); + Assert.False(replenishing.TryReplenish()); + } + [Fact] public void ThrowsWhenNoLimitersProvided() { @@ -209,16 +391,27 @@ public async Task GetStatisticsHasCorrectValues() } [Fact] - public void IdleDurationReturnsLowestValue() + public void IdleDurationReturnsNullWhenAnyChildReturnsNull() { - using var limiter1 = new CustomizableLimiter(); + using var limiter1 = new CustomizableLimiter { IdleDurationImpl = () => null }; using var limiter2 = new CustomizableLimiter { IdleDurationImpl = () => TimeSpan.FromMilliseconds(2) }; using var limiter3 = new CustomizableLimiter { IdleDurationImpl = () => TimeSpan.FromMilliseconds(3) }; + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2, limiter3); + Assert.Null(chainedLimiter.IdleDuration); + } + + [Fact] + public void IdleDurationReturnsLowestValueWhenAllChildrenAreIdle() + { + using var limiter1 = new CustomizableLimiter { IdleDurationImpl = () => TimeSpan.FromMilliseconds(2) }; + using var limiter2 = new CustomizableLimiter { IdleDurationImpl = () => TimeSpan.FromMilliseconds(1) }; + using var limiter3 = new CustomizableLimiter { IdleDurationImpl = () => TimeSpan.FromMilliseconds(3) }; + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2, limiter3); var idleDuration = chainedLimiter.IdleDuration; - Assert.Equal(2, idleDuration.Value.TotalMilliseconds); + Assert.Equal(1, idleDuration.Value.TotalMilliseconds); } [Fact] @@ -809,7 +1002,7 @@ public async Task AcquireAsyncWaitsForResourcesBeforeCallingNextLimiter() QueueProcessingOrder = QueueProcessingOrder.OldestFirst, QueueLimit = 0 }); - + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); var lease = chainedLimiter.AttemptAcquire(); @@ -1040,5 +1233,65 @@ public void DuplicateMetadataUsesFirstOne() Assert.True(lease.TryGetMetadata("3", out obj)); Assert.IsType>(obj); } + + [Fact] + public void DisposeDoesNotDisposeInnerLimiters() + { + var limiter1 = new TrackingRateLimiter(); + var limiter2 = new TrackingRateLimiter(); + var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); + + chainedLimiter.Dispose(); + + Assert.Equal(0, limiter1.DisposeCallCount); + Assert.Equal(0, limiter2.DisposeCallCount); + } + + [Fact] + public async Task DisposeAsyncDoesNotDisposeInnerLimiters() + { + var limiter1 = new TrackingRateLimiter(); + var limiter2 = new TrackingRateLimiter(); + var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); + + await chainedLimiter.DisposeAsync(); + + Assert.Equal(0, limiter1.DisposeCallCount); + Assert.Equal(0, limiter1.DisposeAsyncCallCount); + Assert.Equal(0, limiter2.DisposeCallCount); + Assert.Equal(0, limiter2.DisposeAsyncCallCount); + } + + [Fact] + public void DisposeDoesNotDisposeInnerReplenishingLimiters() + { + var limiter1 = new CustomizableReplenishingLimiter { ReplenishmentPeriodImpl = () => TimeSpan.FromSeconds(1) }; + var limiter2 = new CustomizableReplenishingLimiter { ReplenishmentPeriodImpl = () => TimeSpan.FromSeconds(2) }; + var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); + + Assert.IsAssignableFrom(chainedLimiter); + + chainedLimiter.Dispose(); + + // Inner limiters should still be usable after chained limiter is disposed + Assert.NotNull(limiter1.AttemptAcquire()); + Assert.NotNull(limiter2.AttemptAcquire()); + } + + [Fact] + public async Task DisposeAsyncDoesNotDisposeInnerReplenishingLimiters() + { + var limiter1 = new CustomizableReplenishingLimiter { ReplenishmentPeriodImpl = () => TimeSpan.FromSeconds(1) }; + var limiter2 = new CustomizableReplenishingLimiter { ReplenishmentPeriodImpl = () => TimeSpan.FromSeconds(2) }; + var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); + + Assert.IsAssignableFrom(chainedLimiter); + + await chainedLimiter.DisposeAsync(); + + // Inner limiters should still be usable after chained limiter is disposed + Assert.NotNull(limiter1.AttemptAcquire()); + Assert.NotNull(limiter2.AttemptAcquire()); + } } } diff --git a/src/libraries/System.Threading.RateLimiting/tests/Infrastructure/Utils.cs b/src/libraries/System.Threading.RateLimiting/tests/Infrastructure/Utils.cs index fc8091b21fdc16..c807c197f1ee0a 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/Infrastructure/Utils.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/Infrastructure/Utils.cs @@ -203,7 +203,8 @@ internal sealed class CustomizableReplenishingLimiter : ReplenishingRateLimiter public override bool IsAutoReplenishing => false; - public override TimeSpan ReplenishmentPeriod => throw new NotImplementedException(); + public Func ReplenishmentPeriodImpl { get; set; } = () => TimeSpan.Zero; + public override TimeSpan ReplenishmentPeriod => ReplenishmentPeriodImpl(); public Func TryReplenishImpl { get; set; } = () => true; public override bool TryReplenish() => TryReplenishImpl(); diff --git a/src/libraries/System.Threading.RateLimiting/tests/PartitionedChainedLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/PartitionedChainedLimiterTests.cs index 3b8950e85d7fd0..14f81def49d02b 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/PartitionedChainedLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/PartitionedChainedLimiterTests.cs @@ -1409,5 +1409,79 @@ public void DuplicateMetadataUsesFirstOne() Assert.True(lease.TryGetMetadata("3", out obj)); Assert.IsType>(obj); } + + [Fact] + public void DisposeDoesNotDisposeInnerLimiters() + { + var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.GetConcurrencyLimiter(1, _ => + new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.NewestFirst, + QueueLimit = 0 + }); + }); + var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.GetConcurrencyLimiter(1, _ => + new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.NewestFirst, + QueueLimit = 0 + }); + }); + var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + + chainedLimiter.Dispose(); + + // Inner limiters should still be usable after chained limiter is disposed + using var lease1 = limiter1.AttemptAcquire(""); + Assert.True(lease1.IsAcquired); + using var lease2 = limiter2.AttemptAcquire(""); + Assert.True(lease2.IsAcquired); + + limiter1.Dispose(); + limiter2.Dispose(); + } + + [Fact] + public async Task DisposeAsyncDoesNotDisposeInnerLimiters() + { + var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.GetConcurrencyLimiter(1, _ => + new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.NewestFirst, + QueueLimit = 0 + }); + }); + var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.GetConcurrencyLimiter(1, _ => + new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.NewestFirst, + QueueLimit = 0 + }); + }); + var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + + await chainedLimiter.DisposeAsync(); + + // Inner limiters should still be usable after chained limiter is disposed + using var lease1 = limiter1.AttemptAcquire(""); + Assert.True(lease1.IsAcquired); + using var lease2 = limiter2.AttemptAcquire(""); + Assert.True(lease2.IsAcquired); + + await limiter1.DisposeAsync(); + await limiter2.DisposeAsync(); + } } } diff --git a/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs index b0380a0dcd6a4f..21dbad67c48a22 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs @@ -632,6 +632,149 @@ public async Task ThrowingTryReplenishDoesNotPreventIdleLimiterBeingCleanedUp() await disposeTcs.Task; } + [Fact] + public async Task ChainedLimiter_HeartbeatCallsTryReplenishOnInnerReplenishingLimiters() + { + var replenishCallCount = 0; + CustomizableReplenishingLimiter replenishLimiter = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => { replenishCallCount++; return true; } + }; + CustomizableLimiter nonReplenishLimiter = new CustomizableLimiter(); + + using var limiter = Utils.CreatePartitionedLimiterWithoutTimer(resource => + { + return RateLimitPartition.Get(1, _ => RateLimiter.CreateChained(nonReplenishLimiter, replenishLimiter)); + }); + + limiter.AttemptAcquire(""); + + await Utils.RunTimerFunc(limiter); + + Assert.Equal(1, replenishCallCount); + } + + [Fact] + public async Task ChainedLimiter_HeartbeatCallsTryReplenishOnAllInnerReplenishingLimiters() + { + var replenishCallCount1 = 0; + var replenishCallCount2 = 0; + CustomizableReplenishingLimiter replenishLimiter1 = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => { replenishCallCount1++; return true; } + }; + CustomizableReplenishingLimiter replenishLimiter2 = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => { replenishCallCount2++; return true; } + }; + + using var limiter = Utils.CreatePartitionedLimiterWithoutTimer(resource => + { + return RateLimitPartition.Get(1, _ => RateLimiter.CreateChained(replenishLimiter1, replenishLimiter2)); + }); + + limiter.AttemptAcquire(""); + + await Utils.RunTimerFunc(limiter); + + Assert.Equal(1, replenishCallCount1); + Assert.Equal(1, replenishCallCount2); + } + + [Fact] + public async Task ChainedLimiter_ThrowingTryReplenishStillReplenishesOtherLimiters() + { + var replenishCallCount = 0; + CustomizableReplenishingLimiter throwingLimiter = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => throw new Exception("replenish failed") + }; + CustomizableReplenishingLimiter goodLimiter = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => { replenishCallCount++; return true; } + }; + + using var limiter = Utils.CreatePartitionedLimiterWithoutTimer(resource => + { + return RateLimitPartition.Get(1, _ => RateLimiter.CreateChained(throwingLimiter, goodLimiter)); + }); + + limiter.AttemptAcquire(""); + + var ex = await Assert.ThrowsAsync(() => Utils.RunTimerFunc(limiter)); + Assert.Single(ex.InnerExceptions); + Assert.IsType(ex.InnerExceptions[0]); + + // The good limiter was still replenished despite the first one throwing + Assert.Equal(1, replenishCallCount); + } + + [Fact] + public async Task ChainedLimiter_IdleChainedLimiterWithNullChildIdleDurationNotEvicted() + { + var factoryCallCount = 0; + CustomizableReplenishingLimiter replenishLimiter = null; + CustomizableLimiter idleLimiter = null; + + using var limiter = Utils.CreatePartitionedLimiterWithoutTimer(resource => + { + return RateLimitPartition.Get(1, _ => + { + factoryCallCount++; + replenishLimiter = new CustomizableReplenishingLimiter(); + idleLimiter = new CustomizableLimiter(); + return RateLimiter.CreateChained(idleLimiter, replenishLimiter); + }); + }); + + limiter.AttemptAcquire(""); + Assert.Equal(1, factoryCallCount); + + // Idle limiter reports > 10s idle, but replenishing limiter reports null (still active) + idleLimiter.IdleDurationImpl = () => TimeSpan.FromMinutes(1); + replenishLimiter.IdleDurationImpl = () => null; + + await Utils.RunTimerFunc(limiter); + + // Factory should not have been called again — limiter was not evicted + limiter.AttemptAcquire(""); + Assert.Equal(1, factoryCallCount); + } + + [Fact] + public async Task ChainedLimiter_FullyIdleChainedLimiterIsEvicted() + { + var factoryCallCount = 0; + CustomizableLimiter innerLimiter1 = null; + CustomizableLimiter innerLimiter2 = null; + + using var limiter = Utils.CreatePartitionedLimiterWithoutTimer(resource => + { + return RateLimitPartition.Get(1, _ => + { + factoryCallCount++; + innerLimiter1 = new CustomizableLimiter(); + innerLimiter2 = new CustomizableLimiter(); + return RateLimiter.CreateChained(innerLimiter1, innerLimiter2); + }); + }); + + limiter.AttemptAcquire(""); + Assert.Equal(1, factoryCallCount); + + // Both children report idle > 10s, chain should be evicted + innerLimiter1.IdleDurationImpl = () => TimeSpan.FromMinutes(1); + innerLimiter2.IdleDurationImpl = () => TimeSpan.FromMinutes(1); + innerLimiter1.DisposeAsyncCoreImpl = () => default; + innerLimiter2.DisposeAsyncCoreImpl = () => default; + + await Utils.RunTimerFunc(limiter); + + // Factory should be called again on next acquire — limiter was evicted and recreated + limiter.AttemptAcquire(""); + Assert.Equal(2, factoryCallCount); + } + // Translate [Fact]