diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs index 3ce4906d5b2b70..f7972b5e5d0060 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs @@ -34,8 +34,10 @@ public struct ManualResetValueTaskSourceCore private TResult? _result; /// The current version of this value, used to help prevent misuse. private short _version; - /// Whether the current operation has completed. - private bool _completed; + /// Typical latency between rearming the source and completing it. + private long _avgLatencyTicks; + /// Ticks at time of rearming the source. + private long _startTicks; /// Whether to force continuations to run asynchronously. private bool _runContinuationsAsynchronously; @@ -57,7 +59,7 @@ public void Reset() _capturedContext = null; _error = null; _result = default; - _completed = false; + _startTicks = Stopwatch.GetTimestamp(); } /// Completes with a successful result. @@ -79,16 +81,61 @@ public void SetException(Exception error) /// Gets the operation version. public short Version => _version; + private bool IsCompleted() + { + if (CheckIsCompleted()) + { + return true; + } + + // Note: long reads can be torn on 32bit, + // but since this is a statistical approximation it does not matter. + long avgLatency = _avgLatencyTicks; + if (avgLatency == 0 || avgLatency > (Stopwatch.Frequency * 2 / 1000000)) + { + return false; + } + + // torn read is not a concern here because _startTicks is set as a part of + // rearming the source, before possible publishing to other threads. + long startTicks = _startTicks; + long spinUntil = startTicks + avgLatency * 2; + do + { + if (CheckIsCompleted()) + { + return true; + } + + Thread.SpinWait(1); + } + while (Stopwatch.GetTimestamp() < spinUntil); + + return false; + } + + private bool CheckIsCompleted() + { + return Volatile.Read(ref _continuation) == (object)ManualResetValueTaskSourceCoreShared.s_sentinel; + } + /// Gets the status of the operation. /// Opaque value that was provided to the 's constructor. public ValueTaskSourceStatus GetStatus(short token) { ValidateToken(token); + + if (!IsCompleted()) + { + return ValueTaskSourceStatus.Pending; + } + return - Volatile.Read(ref _continuation) is null || !_completed ? ValueTaskSourceStatus.Pending : - _error is null ? ValueTaskSourceStatus.Succeeded : - _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : - ValueTaskSourceStatus.Faulted; + _error is null ? + ValueTaskSourceStatus.Succeeded : + _error.SourceException is OperationCanceledException ? + ValueTaskSourceStatus.Canceled : + ValueTaskSourceStatus.Faulted; } /// Gets the result of the operation. @@ -96,7 +143,7 @@ public ValueTaskSourceStatus GetStatus(short token) [StackTraceHidden] public TResult GetResult(short token) { - if (token != _version || !_completed || _error is not null) + if (token != _version || !CheckIsCompleted() || _error is not null) { ThrowForFailedGetResult(); } @@ -172,9 +219,9 @@ public void OnCompleted(Action continuation, object? state, short token } // Operation already completed, so we need to queue the supplied callback. - // At this point the storedContinuation should be the sentinal; if it's not, the instance was misused. + // At this point the storedContinuation should be the sentinel; if it's not, the instance was misused. Debug.Assert(storedContinuation is not null, $"{nameof(storedContinuation)} is null"); - if (!ReferenceEquals(storedContinuation, ManualResetValueTaskSourceCoreShared.s_sentinel)) + if (storedContinuation != (object)ManualResetValueTaskSourceCoreShared.s_sentinel) { ThrowHelper.ThrowInvalidOperationException(); } @@ -209,11 +256,19 @@ private void ValidateToken(short token) /// Signals that the operation has completed. Invoked after the result or error has been set. private void SignalCompletion() { - if (_completed) + if (CheckIsCompleted()) { ThrowHelper.ThrowInvalidOperationException(); } - _completed = true; + + // torn read is not a concern here because _startTicks is set as a part of + // rearming the source, before possible publishing to other threads. + long startTicks = _startTicks; + if (startTicks != 0) + { + long latencyTicks = Stopwatch.GetTimestamp() - startTicks; + _avgLatencyTicks = (_avgLatencyTicks + latencyTicks) / 2; + } Action? continuation = Volatile.Read(ref _continuation) ?? @@ -225,6 +280,7 @@ private void SignalCompletion() _continuationState = null; object? context = _capturedContext; _capturedContext = null; + _continuation = ManualResetValueTaskSourceCoreShared.s_sentinel; if (context is null) { diff --git a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj index 67fe73b57cd42d..f76023ccd54fd8 100644 --- a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj +++ b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj @@ -56,6 +56,7 @@ System.Threading.Channel<T> + diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs index be40c6d7924131..9083adf98e6ac5 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Runtime.ExceptionServices; +using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Sources; @@ -92,6 +93,11 @@ private volatile /// private protected short _currentId; + /// Typical latency between rearming the source and completing it. + private protected long _avgLatencyTicks; + /// Ticks at time of rearming the source. + private protected long _startTicks; + /// Initializes the interactor. /// true if continuations should be forced to run asynchronously; otherwise, false. /// The cancellation token used to cancel the operation. @@ -139,7 +145,43 @@ private CancellationToken CancellationToken #endif /// Gets whether the operation has completed. - internal bool IsCompleted => ReferenceEquals(_continuation, s_completedSentinel); + private protected bool IsCompleted() + { + if (CheckIsCompleted()) + { + return true; + } + + // Note: long reads can be torn on 32bit, + // but since this is a statistical approximation it does not matter. + long avgLatency = _avgLatencyTicks; + if (avgLatency == 0 || avgLatency > (Stopwatch.Frequency * 2 / 1000000)) + { + return false; + } + + // torn read is not a concern here because _startTicks is set as a part of + // rearming the source, before possible publishing to other threads. + long startTicks = _startTicks; + long spinUntil = startTicks + avgLatency * 2; + do + { + if (CheckIsCompleted()) + { + return true; + } + + Thread.SpinWait(1); + } + while (Stopwatch.GetTimestamp() < spinUntil); + + return false; + } + + private protected bool CheckIsCompleted() + { + return Volatile.Read(ref _continuation) == (object)s_completedSentinel; + } /// Completes the operation with a failed state and the specified error. /// The error. @@ -203,6 +245,15 @@ private protected void SignalCompletion() // be a nop, as its TrySetCanceled will return false and the callback will exit without doing further work. Unregister(_cancellationRegistration); + // torn read is not a concern here because _startTicks is set as a part of + // rearming the source, before possible publishing to other threads. + long startTicks = _startTicks; + if (startTicks != 0) + { + long latencyTicks = Stopwatch.GetTimestamp() - startTicks; + _avgLatencyTicks = (_avgLatencyTicks + latencyTicks) / 2; + } + // NB: Assigning _continuation happens after assigning continuation dependencies (_capturedContext, _continuationState) // and effectively "commits" the entire continuation state as ready for invocation. // We must read _continuation before accessing its dependencies. @@ -364,7 +415,7 @@ public void OnCompleted(Action continuation, object? state, short token // If the set failed because there's already a delegate in _continuation, but that delegate is // something other than s_completedSentinel, something went wrong, which should only happen if // the instance was erroneously used, likely to hook up multiple continuations. - Debug.Assert(IsCompleted, $"Expected IsCompleted"); + Debug.Assert(CheckIsCompleted(), $"Expected IsCompleted"); if (!ReferenceEquals(prevContinuation, s_completedSentinel)) { Debug.Assert(prevContinuation != s_availableSentinel, "Continuation was the available sentinel."); @@ -444,11 +495,17 @@ public ValueTaskSourceStatus GetStatus(short token) ThrowIncorrectCurrentIdException(); } + if (!IsCompleted()) + { + return ValueTaskSourceStatus.Pending; + } + return - !IsCompleted ? ValueTaskSourceStatus.Pending : - _error is null ? ValueTaskSourceStatus.Succeeded : - _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : - ValueTaskSourceStatus.Faulted; + _error is null ? + ValueTaskSourceStatus.Succeeded : + _error.SourceException is OperationCanceledException ? + ValueTaskSourceStatus.Canceled : + ValueTaskSourceStatus.Faulted; } /// Gets the result of the operation. @@ -460,7 +517,7 @@ void IValueTaskSource.GetResult(short token) ThrowIncorrectCurrentIdException(); } - if (!IsCompleted) + if (!CheckIsCompleted()) { ThrowIncompleteOperationException(); } @@ -504,7 +561,7 @@ public TResult GetResult(short token) ThrowIncorrectCurrentIdException(); } - if (!IsCompleted) + if (!CheckIsCompleted()) { ThrowIncompleteOperationException(); } @@ -533,6 +590,7 @@ public bool TryOwnAndReset() _result = default; _error = null; _capturedContext = null; + _startTicks = Stopwatch.GetTimestamp(); return true; }