-
Notifications
You must be signed in to change notification settings - Fork 5.4k
Avoid threadpool hop in fast valuetask source completions. #127202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,8 +34,10 @@ public struct ManualResetValueTaskSourceCore<TResult> | |
| private TResult? _result; | ||
| /// <summary>The current version of this value, used to help prevent misuse.</summary> | ||
| private short _version; | ||
| /// <summary>Whether the current operation has completed.</summary> | ||
| private bool _completed; | ||
| /// <summary>Typical latency between rearming the source and completing it.</summary> | ||
| private long _avgLatencyTicks; | ||
| /// <summary>Ticks at time of rearming the source.</summary> | ||
| private long _startTicks; | ||
| /// <summary>Whether to force continuations to run asynchronously.</summary> | ||
| private bool _runContinuationsAsynchronously; | ||
|
|
||
|
|
@@ -57,7 +59,7 @@ public void Reset() | |
| _capturedContext = null; | ||
| _error = null; | ||
| _result = default; | ||
| _completed = false; | ||
| _startTicks = Stopwatch.GetTimestamp(); | ||
| } | ||
|
|
||
| /// <summary>Completes with a successful result.</summary> | ||
|
|
@@ -79,24 +81,69 @@ public void SetException(Exception error) | |
| /// <summary>Gets the operation version.</summary> | ||
| public short Version => _version; | ||
|
|
||
| private bool IsCompleted() | ||
| { | ||
| if (CheckIsCompleted()) | ||
| { | ||
| return true; | ||
| } | ||
|
|
||
| // Note: long reads can be torn on 32bit, | ||
| // but since this is a statistical approximation it does not matter. | ||
| long avgLatency = _avgLatencyTicks; | ||
| if (avgLatency == 0 || avgLatency > (Stopwatch.Frequency * 2 / 1000000)) | ||
| { | ||
| return false; | ||
| } | ||
|
|
||
| // torn read is not a concern here because _startTicks is set as a part of | ||
| // rearming the source, before possible publishing to other threads. | ||
| long startTicks = _startTicks; | ||
| long spinUntil = startTicks + avgLatency * 2; | ||
| do | ||
| { | ||
| if (CheckIsCompleted()) | ||
| { | ||
| return true; | ||
| } | ||
|
|
||
| Thread.SpinWait(1); | ||
| } | ||
| while (Stopwatch.GetTimestamp() < spinUntil); | ||
|
Comment on lines
+103
to
+112
|
||
|
|
||
|
Comment on lines
+91
to
+113
|
||
| return false; | ||
| } | ||
|
|
||
| private bool CheckIsCompleted() | ||
| { | ||
| return Volatile.Read(ref _continuation) == (object)ManualResetValueTaskSourceCoreShared.s_sentinel; | ||
| } | ||
|
|
||
| /// <summary>Gets the status of the operation.</summary> | ||
| /// <param name="token">Opaque value that was provided to the <see cref="ValueTask"/>'s constructor.</param> | ||
| public ValueTaskSourceStatus GetStatus(short token) | ||
| { | ||
| ValidateToken(token); | ||
|
|
||
| if (!IsCompleted()) | ||
| { | ||
| return ValueTaskSourceStatus.Pending; | ||
| } | ||
|
|
||
| return | ||
| Volatile.Read(ref _continuation) is null || !_completed ? ValueTaskSourceStatus.Pending : | ||
| _error is null ? ValueTaskSourceStatus.Succeeded : | ||
| _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : | ||
| ValueTaskSourceStatus.Faulted; | ||
| _error is null ? | ||
| ValueTaskSourceStatus.Succeeded : | ||
| _error.SourceException is OperationCanceledException ? | ||
| ValueTaskSourceStatus.Canceled : | ||
| ValueTaskSourceStatus.Faulted; | ||
| } | ||
|
|
||
| /// <summary>Gets the result of the operation.</summary> | ||
| /// <param name="token">Opaque value that was provided to the <see cref="ValueTask"/>'s constructor.</param> | ||
| [StackTraceHidden] | ||
| public TResult GetResult(short token) | ||
| { | ||
| if (token != _version || !_completed || _error is not null) | ||
| if (token != _version || !CheckIsCompleted() || _error is not null) | ||
| { | ||
| ThrowForFailedGetResult(); | ||
| } | ||
|
|
@@ -172,9 +219,9 @@ public void OnCompleted(Action<object?> continuation, object? state, short token | |
| } | ||
|
|
||
| // Operation already completed, so we need to queue the supplied callback. | ||
| // At this point the storedContinuation should be the sentinal; if it's not, the instance was misused. | ||
| // At this point the storedContinuation should be the sentinel; if it's not, the instance was misused. | ||
| Debug.Assert(storedContinuation is not null, $"{nameof(storedContinuation)} is null"); | ||
| if (!ReferenceEquals(storedContinuation, ManualResetValueTaskSourceCoreShared.s_sentinel)) | ||
| if (storedContinuation != (object)ManualResetValueTaskSourceCoreShared.s_sentinel) | ||
| { | ||
| ThrowHelper.ThrowInvalidOperationException(); | ||
| } | ||
|
|
@@ -209,11 +256,19 @@ private void ValidateToken(short token) | |
| /// <summary>Signals that the operation has completed. Invoked after the result or error has been set.</summary> | ||
| private void SignalCompletion() | ||
| { | ||
| if (_completed) | ||
| if (CheckIsCompleted()) | ||
| { | ||
| ThrowHelper.ThrowInvalidOperationException(); | ||
| } | ||
| _completed = true; | ||
|
|
||
| // torn read is not a concern here because _startTicks is set as a part of | ||
| // rearming the source, before possible publishing to other threads. | ||
| long startTicks = _startTicks; | ||
| if (startTicks != 0) | ||
| { | ||
| long latencyTicks = Stopwatch.GetTimestamp() - startTicks; | ||
| _avgLatencyTicks = (_avgLatencyTicks + latencyTicks) / 2; | ||
| } | ||
|
|
||
| Action<object?>? continuation = | ||
| Volatile.Read(ref _continuation) ?? | ||
|
|
@@ -225,6 +280,7 @@ private void SignalCompletion() | |
| _continuationState = null; | ||
| object? context = _capturedContext; | ||
| _capturedContext = null; | ||
| _continuation = ManualResetValueTaskSourceCoreShared.s_sentinel; | ||
|
|
||
| if (context is null) | ||
| { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,6 +56,7 @@ System.Threading.Channel<T></PackageDescription> | |
| <ProjectReference Include="$(LibrariesProjectRoot)System.Memory\src\System.Memory.csproj" /> | ||
| <ProjectReference Include="$(LibrariesProjectRoot)System.Runtime\src\System.Runtime.csproj" /> | ||
| <ProjectReference Include="$(LibrariesProjectRoot)System.Threading\src\System.Threading.csproj" /> | ||
| <ProjectReference Include="$(LibrariesProjectRoot)System.Threading.Thread\src\System.Threading.Thread.csproj" /> | ||
| <ProjectReference Include="$(LibrariesProjectRoot)System.Threading.ThreadPool\src\System.Threading.ThreadPool.csproj" /> | ||
|
Comment on lines
56
to
60
|
||
| </ItemGroup> | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Runtime.ExceptionServices; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using System.Threading.Tasks.Sources; | ||
|
|
||
|
|
@@ -92,6 +93,11 @@ private volatile | |
| /// </remarks> | ||
| private protected short _currentId; | ||
|
|
||
| /// <summary>Typical latency between rearming the source and completing it.</summary> | ||
| private protected long _avgLatencyTicks; | ||
| /// <summary>Ticks at time of rearming the source.</summary> | ||
| private protected long _startTicks; | ||
|
|
||
| /// <summary>Initializes the interactor.</summary> | ||
| /// <param name="runContinuationsAsynchronously">true if continuations should be forced to run asynchronously; otherwise, false.</param> | ||
| /// <param name="cancellationToken">The cancellation token used to cancel the operation.</param> | ||
|
|
@@ -139,7 +145,43 @@ private CancellationToken CancellationToken | |
| #endif | ||
|
|
||
| /// <summary>Gets whether the operation has completed.</summary> | ||
| internal bool IsCompleted => ReferenceEquals(_continuation, s_completedSentinel); | ||
| private protected bool IsCompleted() | ||
| { | ||
| if (CheckIsCompleted()) | ||
| { | ||
| return true; | ||
| } | ||
|
|
||
| // Note: long reads can be torn on 32bit, | ||
| // but since this is a statistical approximation it does not matter. | ||
| long avgLatency = _avgLatencyTicks; | ||
| if (avgLatency == 0 || avgLatency > (Stopwatch.Frequency * 2 / 1000000)) | ||
| { | ||
| return false; | ||
| } | ||
|
|
||
| // torn read is not a concern here because _startTicks is set as a part of | ||
| // rearming the source, before possible publishing to other threads. | ||
| long startTicks = _startTicks; | ||
| long spinUntil = startTicks + avgLatency * 2; | ||
| do | ||
| { | ||
| if (CheckIsCompleted()) | ||
| { | ||
| return true; | ||
| } | ||
|
|
||
| Thread.SpinWait(1); | ||
| } | ||
| while (Stopwatch.GetTimestamp() < spinUntil); | ||
|
Comment on lines
+155
to
+176
|
||
|
|
||
| return false; | ||
| } | ||
|
|
||
| private protected bool CheckIsCompleted() | ||
| { | ||
| return Volatile.Read(ref _continuation) == (object)s_completedSentinel; | ||
| } | ||
|
|
||
| /// <summary>Completes the operation with a failed state and the specified error.</summary> | ||
| /// <param name="exception">The error.</param> | ||
|
|
@@ -203,6 +245,15 @@ private protected void SignalCompletion() | |
| // be a nop, as its TrySetCanceled will return false and the callback will exit without doing further work. | ||
| Unregister(_cancellationRegistration); | ||
|
|
||
| // torn read is not a concern here because _startTicks is set as a part of | ||
| // rearming the source, before possible publishing to other threads. | ||
| long startTicks = _startTicks; | ||
| if (startTicks != 0) | ||
| { | ||
| long latencyTicks = Stopwatch.GetTimestamp() - startTicks; | ||
| _avgLatencyTicks = (_avgLatencyTicks + latencyTicks) / 2; | ||
| } | ||
|
|
||
| // NB: Assigning _continuation happens after assigning continuation dependencies (_capturedContext, _continuationState) | ||
| // and effectively "commits" the entire continuation state as ready for invocation. | ||
| // We must read _continuation before accessing its dependencies. | ||
|
|
@@ -364,7 +415,7 @@ public void OnCompleted(Action<object?> continuation, object? state, short token | |
| // If the set failed because there's already a delegate in _continuation, but that delegate is | ||
| // something other than s_completedSentinel, something went wrong, which should only happen if | ||
| // the instance was erroneously used, likely to hook up multiple continuations. | ||
| Debug.Assert(IsCompleted, $"Expected IsCompleted"); | ||
| Debug.Assert(CheckIsCompleted(), $"Expected IsCompleted"); | ||
| if (!ReferenceEquals(prevContinuation, s_completedSentinel)) | ||
| { | ||
| Debug.Assert(prevContinuation != s_availableSentinel, "Continuation was the available sentinel."); | ||
|
|
@@ -444,11 +495,17 @@ public ValueTaskSourceStatus GetStatus(short token) | |
| ThrowIncorrectCurrentIdException(); | ||
| } | ||
|
|
||
| if (!IsCompleted()) | ||
| { | ||
| return ValueTaskSourceStatus.Pending; | ||
| } | ||
|
|
||
| return | ||
| !IsCompleted ? ValueTaskSourceStatus.Pending : | ||
| _error is null ? ValueTaskSourceStatus.Succeeded : | ||
| _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : | ||
| ValueTaskSourceStatus.Faulted; | ||
| _error is null ? | ||
| ValueTaskSourceStatus.Succeeded : | ||
| _error.SourceException is OperationCanceledException ? | ||
| ValueTaskSourceStatus.Canceled : | ||
| ValueTaskSourceStatus.Faulted; | ||
| } | ||
|
|
||
| /// <summary>Gets the result of the operation.</summary> | ||
|
|
@@ -460,7 +517,7 @@ void IValueTaskSource.GetResult(short token) | |
| ThrowIncorrectCurrentIdException(); | ||
| } | ||
|
|
||
| if (!IsCompleted) | ||
| if (!CheckIsCompleted()) | ||
| { | ||
| ThrowIncompleteOperationException(); | ||
| } | ||
|
|
@@ -504,7 +561,7 @@ public TResult GetResult(short token) | |
| ThrowIncorrectCurrentIdException(); | ||
| } | ||
|
|
||
| if (!IsCompleted) | ||
| if (!CheckIsCompleted()) | ||
| { | ||
| ThrowIncompleteOperationException(); | ||
| } | ||
|
|
@@ -533,6 +590,7 @@ public bool TryOwnAndReset() | |
| _result = default; | ||
| _error = null; | ||
| _capturedContext = null; | ||
| _startTicks = Stopwatch.GetTimestamp(); | ||
| return true; | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change replaces the
_completedbool with twolongfields, significantly increasing the size ofManualResetValueTaskSourceCore<TResult>(a public, widely-used struct). Because it’s frequently embedded in other structs/classes and passed by value, the extra 16 bytes can have measurable cache/copying impact. Please include perf data justifying the tradeoff (or consider a smaller representation / feature-gating the latency tracking).