From b9822b96bfc5934b39d07357896748ed751e4b61 Mon Sep 17 00:00:00 2001 From: vsadov <8218165+VSadov@users.noreply.github.com> Date: Mon, 20 Apr 2026 17:54:39 -0700 Subject: [PATCH 1/3] reset _continuation on completion --- .../Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs index 3ce4906d5b2b70..8ba828efe51751 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs @@ -225,6 +225,7 @@ private void SignalCompletion() _continuationState = null; object? context = _capturedContext; _capturedContext = null; + _continuation = ManualResetValueTaskSourceCoreShared.s_sentinel; if (context is null) { From 8549f9662b47f4f10357f75bb046362d7a39fb63 Mon Sep 17 00:00:00 2001 From: vsadov <8218165+VSadov@users.noreply.github.com> Date: Mon, 20 Apr 2026 21:41:44 -0700 Subject: [PATCH 2/3] spin in IsCompleted --- .../Sources/ManualResetValueTaskSourceCore.cs | 74 ++++++++++++++++--- .../src/System.Threading.Channels.csproj | 1 + .../Threading/Channels/AsyncOperation.cs | 71 ++++++++++++++++-- 3 files changed, 127 insertions(+), 19 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs index 8ba828efe51751..0238edaa5a15c2 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs @@ -34,8 +34,10 @@ public struct ManualResetValueTaskSourceCore private TResult? _result; /// The current version of this value, used to help prevent misuse. private short _version; - /// Whether the current operation has completed. - private bool _completed; + /// Typical latency between rearming the source and completing it. + private long _avgLatencyTicks; + /// Ticks at time of rearming the source. + private long _startTicks; /// Whether to force continuations to run asynchronously. private bool _runContinuationsAsynchronously; @@ -57,7 +59,7 @@ public void Reset() _capturedContext = null; _error = null; _result = default; - _completed = false; + _startTicks = Stopwatch.GetTimestamp(); } /// Completes with a successful result. @@ -79,16 +81,61 @@ public void SetException(Exception error) /// Gets the operation version. public short Version => _version; + private bool IsCompleted() + { + if (Volatile.Read(ref _continuation) == (object)ManualResetValueTaskSourceCoreShared.s_sentinel) + { + return true; + } + + // Note: long reads can be torn on 32bit, + // but since this is a statistical approximation it does not matter. + long avgLatency = _avgLatencyTicks; + if (avgLatency == 0 || avgLatency > (Stopwatch.Frequency * 2 / 1000000)) + { + return false; + } + + long start = _startTicks; + long spinUntil = start + avgLatency * 2; + do + { + // Note: reading _continuation will not be hoisted out of the loop because + // we make an opaque SpinWait call. + if (_continuation == (object)ManualResetValueTaskSourceCoreShared.s_sentinel) + { + return true; + } + + Thread.SpinWait(1); + } + while (Stopwatch.GetTimestamp() < spinUntil); + + return false; + } + + private bool CheckIsCompleted() + { + return Volatile.Read(ref _continuation) == (object)ManualResetValueTaskSourceCoreShared.s_sentinel; + } + /// Gets the status of the operation. /// Opaque value that was provided to the 's constructor. public ValueTaskSourceStatus GetStatus(short token) { ValidateToken(token); + + if (!IsCompleted()) + { + return ValueTaskSourceStatus.Pending; + } + return - Volatile.Read(ref _continuation) is null || !_completed ? ValueTaskSourceStatus.Pending : - _error is null ? ValueTaskSourceStatus.Succeeded : - _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : - ValueTaskSourceStatus.Faulted; + _error is null ? + ValueTaskSourceStatus.Succeeded : + _error.SourceException is OperationCanceledException ? + ValueTaskSourceStatus.Canceled : + ValueTaskSourceStatus.Faulted; } /// Gets the result of the operation. @@ -96,7 +143,7 @@ public ValueTaskSourceStatus GetStatus(short token) [StackTraceHidden] public TResult GetResult(short token) { - if (token != _version || !_completed || _error is not null) + if (token != _version || !CheckIsCompleted() || _error is not null) { ThrowForFailedGetResult(); } @@ -174,7 +221,7 @@ public void OnCompleted(Action continuation, object? state, short token // Operation already completed, so we need to queue the supplied callback. // At this point the storedContinuation should be the sentinal; if it's not, the instance was misused. Debug.Assert(storedContinuation is not null, $"{nameof(storedContinuation)} is null"); - if (!ReferenceEquals(storedContinuation, ManualResetValueTaskSourceCoreShared.s_sentinel)) + if (!CheckIsCompleted()) { ThrowHelper.ThrowInvalidOperationException(); } @@ -209,11 +256,16 @@ private void ValidateToken(short token) /// Signals that the operation has completed. Invoked after the result or error has been set. private void SignalCompletion() { - if (_completed) + if (CheckIsCompleted()) { ThrowHelper.ThrowInvalidOperationException(); } - _completed = true; + + if (_startTicks != 0) + { + long latencyTicks = Stopwatch.GetTimestamp() - _startTicks; + _avgLatencyTicks = (_avgLatencyTicks + latencyTicks) / 2; + } Action? continuation = Volatile.Read(ref _continuation) ?? diff --git a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj index 67fe73b57cd42d..f76023ccd54fd8 100644 --- a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj +++ b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj @@ -56,6 +56,7 @@ System.Threading.Channel<T> + diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs index be40c6d7924131..2bcd1fefc422a6 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Runtime.ExceptionServices; +using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Sources; @@ -92,6 +93,11 @@ private volatile /// private protected short _currentId; + /// Typical latency between rearming the source and completing it. + private protected long _avgLatencyTicks; + /// Ticks at time of rearming the source. + private protected long _startTicks; + /// Initializes the interactor. /// true if continuations should be forced to run asynchronously; otherwise, false. /// The cancellation token used to cancel the operation. @@ -139,7 +145,43 @@ private CancellationToken CancellationToken #endif /// Gets whether the operation has completed. - internal bool IsCompleted => ReferenceEquals(_continuation, s_completedSentinel); + private protected bool IsCompleted() + { + if (Volatile.Read(ref _continuation) == (object)s_completedSentinel) + { + return true; + } + + // Note: long reads can be torn on 32bit, + // but since this is a statistical approximation it does not matter. + long avgLatency = _avgLatencyTicks; + if (avgLatency == 0 || avgLatency > (Stopwatch.Frequency * 2 / 1000000)) + { + return false; + } + + long start = _startTicks; + long spinUntil = start + avgLatency * 2; + do + { + // Note: reading _continuation will not be hoisted out of the loop because + // we make an opaque SpinWait call. + if (_continuation == (object)s_completedSentinel) + { + return true; + } + + Thread.SpinWait(1); + } + while (Stopwatch.GetTimestamp() < spinUntil); + + return false; + } + + private protected bool CheckIsCompleted() + { + return Volatile.Read(ref _continuation) == (object)s_completedSentinel; + } /// Completes the operation with a failed state and the specified error. /// The error. @@ -203,6 +245,12 @@ private protected void SignalCompletion() // be a nop, as its TrySetCanceled will return false and the callback will exit without doing further work. Unregister(_cancellationRegistration); + if (_startTicks != 0) + { + long latencyTicks = Stopwatch.GetTimestamp() - _startTicks; + _avgLatencyTicks = (_avgLatencyTicks + latencyTicks) / 2; + } + // NB: Assigning _continuation happens after assigning continuation dependencies (_capturedContext, _continuationState) // and effectively "commits" the entire continuation state as ready for invocation. // We must read _continuation before accessing its dependencies. @@ -364,7 +412,7 @@ public void OnCompleted(Action continuation, object? state, short token // If the set failed because there's already a delegate in _continuation, but that delegate is // something other than s_completedSentinel, something went wrong, which should only happen if // the instance was erroneously used, likely to hook up multiple continuations. - Debug.Assert(IsCompleted, $"Expected IsCompleted"); + Debug.Assert(CheckIsCompleted(), $"Expected IsCompleted"); if (!ReferenceEquals(prevContinuation, s_completedSentinel)) { Debug.Assert(prevContinuation != s_availableSentinel, "Continuation was the available sentinel."); @@ -444,11 +492,17 @@ public ValueTaskSourceStatus GetStatus(short token) ThrowIncorrectCurrentIdException(); } + if (!IsCompleted()) + { + return ValueTaskSourceStatus.Pending; + } + return - !IsCompleted ? ValueTaskSourceStatus.Pending : - _error is null ? ValueTaskSourceStatus.Succeeded : - _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : - ValueTaskSourceStatus.Faulted; + _error is null ? + ValueTaskSourceStatus.Succeeded : + _error.SourceException is OperationCanceledException ? + ValueTaskSourceStatus.Canceled : + ValueTaskSourceStatus.Faulted; } /// Gets the result of the operation. @@ -460,7 +514,7 @@ void IValueTaskSource.GetResult(short token) ThrowIncorrectCurrentIdException(); } - if (!IsCompleted) + if (!CheckIsCompleted()) { ThrowIncompleteOperationException(); } @@ -504,7 +558,7 @@ public TResult GetResult(short token) ThrowIncorrectCurrentIdException(); } - if (!IsCompleted) + if (!CheckIsCompleted()) { ThrowIncompleteOperationException(); } @@ -533,6 +587,7 @@ public bool TryOwnAndReset() _result = default; _error = null; _capturedContext = null; + _startTicks = Stopwatch.GetTimestamp(); return true; } From a731b336b3801ed8ba42eb4785b1d98e07ab6a11 Mon Sep 17 00:00:00 2001 From: vsadov <8218165+VSadov@users.noreply.github.com> Date: Tue, 21 Apr 2026 10:08:11 -0700 Subject: [PATCH 3/3] PR feedback --- .../Sources/ManualResetValueTaskSourceCore.cs | 23 +++++++++++-------- .../Threading/Channels/AsyncOperation.cs | 19 ++++++++------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs index 0238edaa5a15c2..f7972b5e5d0060 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs @@ -83,7 +83,7 @@ public void SetException(Exception error) private bool IsCompleted() { - if (Volatile.Read(ref _continuation) == (object)ManualResetValueTaskSourceCoreShared.s_sentinel) + if (CheckIsCompleted()) { return true; } @@ -96,13 +96,13 @@ private bool IsCompleted() return false; } - long start = _startTicks; - long spinUntil = start + avgLatency * 2; + // torn read is not a concern here because _startTicks is set as a part of + // rearming the source, before possible publishing to other threads. + long startTicks = _startTicks; + long spinUntil = startTicks + avgLatency * 2; do { - // Note: reading _continuation will not be hoisted out of the loop because - // we make an opaque SpinWait call. - if (_continuation == (object)ManualResetValueTaskSourceCoreShared.s_sentinel) + if (CheckIsCompleted()) { return true; } @@ -219,9 +219,9 @@ public void OnCompleted(Action continuation, object? state, short token } // Operation already completed, so we need to queue the supplied callback. - // At this point the storedContinuation should be the sentinal; if it's not, the instance was misused. + // At this point the storedContinuation should be the sentinel; if it's not, the instance was misused. Debug.Assert(storedContinuation is not null, $"{nameof(storedContinuation)} is null"); - if (!CheckIsCompleted()) + if (storedContinuation != (object)ManualResetValueTaskSourceCoreShared.s_sentinel) { ThrowHelper.ThrowInvalidOperationException(); } @@ -261,9 +261,12 @@ private void SignalCompletion() ThrowHelper.ThrowInvalidOperationException(); } - if (_startTicks != 0) + // torn read is not a concern here because _startTicks is set as a part of + // rearming the source, before possible publishing to other threads. + long startTicks = _startTicks; + if (startTicks != 0) { - long latencyTicks = Stopwatch.GetTimestamp() - _startTicks; + long latencyTicks = Stopwatch.GetTimestamp() - startTicks; _avgLatencyTicks = (_avgLatencyTicks + latencyTicks) / 2; } diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs index 2bcd1fefc422a6..9083adf98e6ac5 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs @@ -147,7 +147,7 @@ private CancellationToken CancellationToken /// Gets whether the operation has completed. private protected bool IsCompleted() { - if (Volatile.Read(ref _continuation) == (object)s_completedSentinel) + if (CheckIsCompleted()) { return true; } @@ -160,13 +160,13 @@ private protected bool IsCompleted() return false; } - long start = _startTicks; - long spinUntil = start + avgLatency * 2; + // torn read is not a concern here because _startTicks is set as a part of + // rearming the source, before possible publishing to other threads. + long startTicks = _startTicks; + long spinUntil = startTicks + avgLatency * 2; do { - // Note: reading _continuation will not be hoisted out of the loop because - // we make an opaque SpinWait call. - if (_continuation == (object)s_completedSentinel) + if (CheckIsCompleted()) { return true; } @@ -245,9 +245,12 @@ private protected void SignalCompletion() // be a nop, as its TrySetCanceled will return false and the callback will exit without doing further work. Unregister(_cancellationRegistration); - if (_startTicks != 0) + // torn read is not a concern here because _startTicks is set as a part of + // rearming the source, before possible publishing to other threads. + long startTicks = _startTicks; + if (startTicks != 0) { - long latencyTicks = Stopwatch.GetTimestamp() - _startTicks; + long latencyTicks = Stopwatch.GetTimestamp() - startTicks; _avgLatencyTicks = (_avgLatencyTicks + latencyTicks) / 2; }