diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index a0479bfc336298..4fb1a4cbcf4704 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using Microsoft.Win32.SafeHandles; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Runtime.CompilerServices; @@ -125,13 +126,17 @@ private enum State #endif public readonly SocketAsyncContext AssociatedContext; - public AsyncOperation Next = null!; // initialized by helper called from ctor + public AsyncOperation? Next; protected object? CallbackOrEvent; public SocketError ErrorCode; public byte[]? SocketAddress; public int SocketAddressLen; public CancellationTokenRegistration CancellationRegistration; + protected bool AllowPooling => _state == (int)State.Complete; + + public bool IsCancelled => _state == (int)State.Cancelled; + public ManualResetEventSlim? Event { get { return CallbackOrEvent as ManualResetEventSlim; } @@ -141,13 +146,12 @@ public ManualResetEventSlim? Event public AsyncOperation(SocketAsyncContext context) { AssociatedContext = context; - Reset(); + Next = this; } public void Reset() { - _state = (int)State.Waiting; - Next = this; + Debug.Assert(Next == this); #if DEBUG _callbackQueued = 0; #endif @@ -192,6 +196,11 @@ public void SetWaiting() Volatile.Write(ref _state, (int)State.Waiting); } + public void PrepareForQueueing() + { + _state = (int)State.Waiting; + } + public bool TryCancel() { Trace("Enter"); @@ -254,7 +263,7 @@ public bool TryCancel() // we can't pool the object, as ProcessQueue may still have a reference to it, due to // using a pattern whereby it takes the lock to grab an item, but then releases the lock // to do further processing on the item that's still in the list. - ThreadPool.UnsafeQueueUserWorkItem(o => ((AsyncOperation)o!).InvokeCallback(allowPooling: false), this); + ThreadPool.UnsafeQueueUserWorkItem(o => ((AsyncOperation)o!).InvokeCallback(), this); } Trace("Exit"); @@ -318,7 +327,7 @@ public void DoAbort() protected abstract bool DoTryComplete(SocketAsyncContext context); - public abstract void InvokeCallback(bool allowPooling); + public abstract void InvokeCallback(); [Conditional("SOCKETASYNCCONTEXT_TRACE")] public void Trace(string message, [CallerMemberName] string? memberName = null) @@ -365,7 +374,7 @@ protected sealed override void Abort() { } set => CallbackOrEvent = value; } - public override void InvokeCallback(bool allowPooling) => + public override void InvokeCallback() => ((Action)CallbackOrEvent!)(BytesTransferred, SocketAddress, SocketAddressLen, SocketFlags.None, ErrorCode); } @@ -381,7 +390,7 @@ protected override bool DoTryComplete(SocketAsyncContext context) return SocketPal.TryCompleteSendTo(context._socket, Buffer.Span, null, ref bufferIndex, ref Offset, ref Count, Flags, SocketAddress, SocketAddressLen, ref BytesTransferred, out ErrorCode); } - public override void InvokeCallback(bool allowPooling) + public override void InvokeCallback() { var cb = (Action)CallbackOrEvent!; int bt = BytesTransferred; @@ -389,7 +398,7 @@ public override void InvokeCallback(bool allowPooling) int sal = SocketAddressLen; SocketError ec = ErrorCode; - if (allowPooling) + if (AllowPooling) { AssociatedContext.ReturnOperation(this); } @@ -410,7 +419,7 @@ protected override bool DoTryComplete(SocketAsyncContext context) return SocketPal.TryCompleteSendTo(context._socket, default(ReadOnlySpan), Buffers, ref BufferIndex, ref Offset, ref Count, Flags, SocketAddress, SocketAddressLen, ref BytesTransferred, out ErrorCode); } - public override void InvokeCallback(bool allowPooling) + public override void InvokeCallback() { var cb = (Action)CallbackOrEvent!; int bt = BytesTransferred; @@ -418,7 +427,7 @@ public override void InvokeCallback(bool allowPooling) int sal = SocketAddressLen; SocketError ec = ErrorCode; - if (allowPooling) + if (AllowPooling) { AssociatedContext.ReturnOperation(this); } @@ -455,7 +464,7 @@ protected sealed override void Abort() { } set => CallbackOrEvent = value; } - public override void InvokeCallback(bool allowPooling) => + public override void InvokeCallback() => ((Action)CallbackOrEvent!)( BytesTransferred, SocketAddress, SocketAddressLen, ReceivedFlags, ErrorCode); } @@ -483,7 +492,7 @@ protected override bool DoTryComplete(SocketAsyncContext context) } } - public override void InvokeCallback(bool allowPooling) + public override void InvokeCallback() { var cb = (Action)CallbackOrEvent!; int bt = BytesTransferred; @@ -492,7 +501,7 @@ public override void InvokeCallback(bool allowPooling) SocketFlags rf = ReceivedFlags; SocketError ec = ErrorCode; - if (allowPooling) + if (AllowPooling) { AssociatedContext.ReturnOperation(this); } @@ -510,7 +519,7 @@ public BufferListReceiveOperation(SocketAsyncContext context) : base(context) { protected override bool DoTryComplete(SocketAsyncContext context) => SocketPal.TryCompleteReceiveFrom(context._socket, default(Span), Buffers, Flags, SocketAddress, ref SocketAddressLen, out BytesTransferred, out ReceivedFlags, out ErrorCode); - public override void InvokeCallback(bool allowPooling) + public override void InvokeCallback() { var cb = (Action)CallbackOrEvent!; int bt = BytesTransferred; @@ -519,7 +528,7 @@ public override void InvokeCallback(bool allowPooling) SocketFlags rf = ReceivedFlags; SocketError ec = ErrorCode; - if (allowPooling) + if (AllowPooling) { AssociatedContext.ReturnOperation(this); } @@ -563,7 +572,7 @@ public Action C protected override bool DoTryComplete(SocketAsyncContext context) => SocketPal.TryCompleteReceiveMessageFrom(context._socket, Buffer.Span, Buffers, Flags, SocketAddress!, ref SocketAddressLen, IsIPv4, IsIPv6, out BytesTransferred, out ReceivedFlags, out IPPacketInformation, out ErrorCode); - public override void InvokeCallback(bool allowPooling) => + public override void InvokeCallback() => ((Action)CallbackOrEvent!)( BytesTransferred, SocketAddress!, SocketAddressLen, ReceivedFlags, IPPacketInformation, ErrorCode); } @@ -589,7 +598,7 @@ protected override bool DoTryComplete(SocketAsyncContext context) return completed; } - public override void InvokeCallback(bool allowPooling) + public override void InvokeCallback() { var cb = (Action)CallbackOrEvent!; IntPtr fd = AcceptedFileDescriptor; @@ -597,7 +606,7 @@ public override void InvokeCallback(bool allowPooling) int sal = SocketAddressLen; SocketError ec = ErrorCode; - if (allowPooling) + if (AllowPooling) { AssociatedContext.ReturnOperation(this); } @@ -624,7 +633,7 @@ protected override bool DoTryComplete(SocketAsyncContext context) return result; } - public override void InvokeCallback(bool allowPooling) => + public override void InvokeCallback() => ((Action)CallbackOrEvent!)(ErrorCode); } @@ -644,264 +653,324 @@ public Action Callback set => CallbackOrEvent = value; } - public override void InvokeCallback(bool allowPooling) => + public override void InvokeCallback() => ((Action)CallbackOrEvent!)(BytesTransferred, ErrorCode); protected override bool DoTryComplete(SocketAsyncContext context) => SocketPal.TryCompleteSendFile(context._socket, FileHandle, ref Offset, ref Count, ref BytesTransferred, out ErrorCode); } - // In debug builds, this struct guards against: - // (1) Unexpected lock reentrancy, which should never happen - // (2) Deadlock, by setting a reasonably large timeout - private readonly struct LockToken : IDisposable - { - private readonly object _lockObject; - - public LockToken(object lockObject) - { - Debug.Assert(lockObject != null); - - _lockObject = lockObject; - - Debug.Assert(!Monitor.IsEntered(_lockObject)); - -#if DEBUG - bool success = Monitor.TryEnter(_lockObject, 10000); - Debug.Assert(success, "Timed out waiting for queue lock"); -#else - Monitor.Enter(_lockObject); -#endif - } - - public void Dispose() - { - Debug.Assert(Monitor.IsEntered(_lockObject)); - Monitor.Exit(_lockObject); - } - } - private struct OperationQueue where TOperation : AsyncOperation { - // Quick overview: - // - // When attempting to perform an IO operation, the caller first checks IsReady, - // and if true, attempts to perform the operation itself. - // If this returns EWOULDBLOCK, or if the queue was not ready, then the operation - // is enqueued by calling StartAsyncOperation and the state becomes Waiting. - // When an epoll notification is received, we check if the state is Waiting, - // and if so, change the state to Processing and enqueue a workitem to the threadpool - // to try to perform the enqueued operations. - // If an operation is successfully performed, we remove it from the queue, - // enqueue another threadpool workitem to process the next item in the queue (if any), - // and call the user's completion callback. - // If we successfully process all enqueued operations, then the state becomes Ready; - // otherwise, the state becomes Waiting and we wait for another epoll notification. - - private enum QueueState : byte - { - Ready = 0, // Indicates that data MAY be available on the socket. - // Queue must be empty. - Waiting = 1, // Indicates that data is definitely not available on the socket. - // Queue must not be empty. - Processing = 2, // Indicates that a thread pool item has been scheduled (and may - // be executing) to process the IO operations in the queue. - // Queue must not be empty. - Stopped = 3, // Indicates that the queue has been stopped because the - // socket has been closed. - // Queue must be empty. - } - - // These fields define the queue state. - - private QueueState _state; // See above + private int _processRequests; // Tracks whether operations in the queue are being processed. private bool _isNextOperationSynchronous; private int _sequenceNumber; // This sequence number is updated when we receive an epoll notification. // It allows us to detect when a new epoll notification has arrived // since the last time we checked the state of the queue. // If this happens, we MUST retry the operation, otherwise we risk // "losing" the notification and causing the operation to pend indefinitely. - private AsyncOperation? _tail; // Queue of pending IO operations to process when data becomes available. - // The _queueLock is used to ensure atomic access to the queue state above. - // The lock is only ever held briefly, to read and/or update queue state, and - // never around any external call, e.g. OS call or user code invocation. - private object _queueLock; - - private LockToken Lock() => new LockToken(_queueLock); + // _queue contains the executing/pending operations. + // Though Sockets can have multiple pending operations, the + // common case is there is only one. + // + // _queue has one of these values: + // + // * null: the queue is empty. + // * DisposedSentinel: the queue was disposed. + // * x: the queue contains a single operation x. + // * is AsyncOperationGate: the queue contained several elements at some point. + // + // _queue is not an operation. It is used to synchronize + // access to the list of operations. + // + // _queue = gate -> last -> first -> second -> ... + // ^ | + // +-------------------------+ + // + private AsyncOperation? _queue; public bool IsNextOperationSynchronous_Speculative => _isNextOperationSynchronous; - public void Init() + private class AsyncOperationGate : AsyncOperation { - Debug.Assert(_queueLock == null); - _queueLock = new object(); + public AsyncOperationGate() : base(null!) { } - _state = QueueState.Ready; - _sequenceNumber = 0; + protected override bool DoTryComplete(SocketAsyncContext context) => throw new InvalidOperationException(); + + public override void InvokeCallback() => throw new InvalidOperationException(); + + protected override void Abort() => throw new InvalidOperationException(); } - // IsReady returns the current _sequenceNumber, which must be passed to StartAsyncOperation below. + private class SentinelOperation : AsyncOperation + { + public SentinelOperation() : base(null!) { } + + protected override bool DoTryComplete(SocketAsyncContext context) => throw new InvalidOperationException(); + + public override void InvokeCallback() => throw new InvalidOperationException(); + + protected override void Abort() => throw new InvalidOperationException(); + } + + private static readonly SentinelOperation DisposedSentinel = new SentinelOperation(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static bool IsDisposed(AsyncOperation? queue) => object.ReferenceEquals(queue, DisposedSentinel); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static bool IsGate(AsyncOperation queue) => queue.GetType() == typeof(AsyncOperationGate); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool IsReady(SocketAsyncContext context, out int observedSequenceNumber) { - using (Lock()) - { - observedSequenceNumber = _sequenceNumber; - bool isReady = (_state == QueueState.Ready) || (_state == QueueState.Stopped); + bool isReady = QueueGetFirst() is null; + observedSequenceNumber = isReady ? Volatile.Read(ref _sequenceNumber) : _sequenceNumber; - Trace(context, $"{isReady}"); + Trace(context, $"{isReady}"); - return isReady; - } + return isReady; } - // Return true for pending, false for completed synchronously (including failure and abort) - public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation, int observedSequenceNumber, CancellationToken cancellationToken = default) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private AsyncOperation? QueueGetFirst() { - Trace(context, $"Enter"); + AsyncOperation? queue = Volatile.Read(ref _queue); + if (queue is null || IsDisposed(queue)) + { + return null; + } - if (!context._registered) + if (IsGate(queue)) { - context.Register(); + lock (queue) + { + return queue.Next?.Next; + } + } + else + { + return queue; + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private (bool isFirst, bool isDisposed) Enqueue(AsyncOperation operation) + { + Debug.Assert(operation.Next == operation); // non-queued point to self. + operation.PrepareForQueueing(); + _isNextOperationSynchronous = operation.Event != null; // assume we'll be first. + AsyncOperation? queue = Interlocked.CompareExchange(ref _queue, operation, null); + if (queue is null) + { + return (true, false); } + return EnqueueSlow(operation, queue); + } + + private (bool isFirst, bool isDisposed) EnqueueSlow(AsyncOperation operation, AsyncOperation? queue) + { + Debug.Assert(queue != null); + + SpinWait spin = default; while (true) { - bool doAbort = false; - using (Lock()) + if (IsDisposed(queue)) + { + _isNextOperationSynchronous = false; + return (false, true); + } + else { - switch (_state) + // Install a gate. + if (!IsGate(queue)) { - case QueueState.Ready: - if (observedSequenceNumber != _sequenceNumber) - { - // The queue has become ready again since we previously checked it. - // So, we need to retry the operation before we enqueue it. - Debug.Assert(observedSequenceNumber - _sequenceNumber < 10000, "Very large sequence number increase???"); - observedSequenceNumber = _sequenceNumber; - break; - } - - // Caller tried the operation and got an EWOULDBLOCK, so we need to transition. - _state = QueueState.Waiting; - goto case QueueState.Waiting; - - case QueueState.Waiting: - case QueueState.Processing: - // Enqueue the operation. - Debug.Assert(operation.Next == operation, "Expected operation.Next == operation"); + AsyncOperation singleOperation = queue; + Debug.Assert(singleOperation.Next == singleOperation); - if (_tail == null) - { - Debug.Assert(!_isNextOperationSynchronous); - _isNextOperationSynchronous = operation.Event != null; - } - else + AsyncOperation gate = new AsyncOperationGate(); + gate.Next = singleOperation; + queue = Interlocked.CompareExchange(ref _queue, gate, singleOperation); + if (queue != singleOperation) + { + if (queue is null) { - operation.Next = _tail.Next; - _tail.Next = operation; + queue = Interlocked.CompareExchange(ref _queue, operation, null); + if (queue is null) + { + return (true, false); + } } + spin.SpinOnce(); + continue; + } + queue = gate; + } - _tail = operation; - Trace(context, $"Leave, enqueued {IdOf(operation)}"); + lock (queue) + { + if (object.ReferenceEquals(_queue, DisposedSentinel)) + { + _isNextOperationSynchronous = false; + return (false, true); + } - // Now that the object is enqueued, hook up cancellation. - // Note that it's possible the call to register itself could - // call TryCancel, so we do this after the op is fully enqueued. - if (cancellationToken.CanBeCanceled) - { - operation.CancellationRegistration = cancellationToken.UnsafeRegister(s => ((TOperation)s!).TryCancel(), operation); - } + AsyncOperation? last = queue.Next; + if (last == null) // empty queue + { + queue.Next = operation; + return (true, false); + } + else + { + AsyncOperation first = last.Next!; + _isNextOperationSynchronous = first.Event != null; + queue.Next = operation; // gate points to new last + operation.Next = first; // new last points to first + last.Next = operation; // previous last points to new last + return (false, false); + } + } + } + } + } - return true; + // Return true for pending, false for completed synchronously (including failure and abort) + public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation, int observedSequenceNumber, CancellationToken cancellationToken = default) + { + Trace(context, $"Enter"); - case QueueState.Stopped: - Debug.Assert(_tail == null); - doAbort = true; - break; + if (!context._registered) + { + context.Register(); + } - default: - Environment.FailFast("unexpected queue state"); - break; - } + // The operation may complete as soon as it gets added to the queue. + // To ensure cancellation registration is disposed, we must register before enqueueing. + // The registration may cause TryCancel to be called if the token was already cancelled. + bool hasCancellableToken = cancellationToken.CanBeCanceled; + if (hasCancellableToken) + { + operation.CancellationRegistration = cancellationToken.UnsafeRegister(s => ((TOperation)s!).TryCancel(), operation); + if (operation.IsCancelled) + { + return true; } + } + + (bool isFirst, bool doAbort) = Enqueue(operation); - if (doAbort) + if (doAbort) + { + if (hasCancellableToken) { - operation.DoAbort(); - Trace(context, $"Leave, queue stopped"); - return false; + operation.CancellationRegistration.Dispose(); } - // Retry the operation. - if (operation.TryComplete(context)) + operation.DoAbort(); + Trace(context, $"Leave, queue stopped"); + + return false; + } + else + { + if (isFirst && observedSequenceNumber != Volatile.Read(ref _sequenceNumber)) { - Trace(context, $"Leave, retry succeeded"); - return false; + EnsureProcessingIfNeeded(context); } + + Trace(context, $"Leave, enqueued {IdOf(operation)}"); + return true; } } public AsyncOperation? ProcessSyncEventOrGetAsyncEvent(SocketAsyncContext context, bool skipAsyncEvents = false) { - AsyncOperation op; - using (Lock()) - { - Trace(context, $"Enter"); + Trace(context, "Enter"); + + Interlocked.Increment(ref _sequenceNumber); - switch (_state) + while (true) + { + AsyncOperation? op; + if (Interlocked.Increment(ref _processRequests) == 1) { - case QueueState.Ready: - Debug.Assert(_tail == null, "State == Ready but queue is not empty!"); - _sequenceNumber++; - Trace(context, $"Exit (previously ready)"); - return null; + op = QueueGetFirst(); + if (op != null) + { + ManualResetEventSlim? e = op.Event; + if (e != null) + { + Trace(context, "Exit (process sync)"); - case QueueState.Waiting: - Debug.Assert(_tail != null, "State == Waiting but queue is empty!"); - op = _tail.Next; - Debug.Assert(_isNextOperationSynchronous == (op.Event != null)); - if (skipAsyncEvents && !_isNextOperationSynchronous) + // Sync operation. Signal waiting thread to continue processing. + e.Set(); + return null; + } + else { - // Return the operation to indicate that the async operation was not processed, without making - // any state changes because async operations are being skipped + // We got called on epoll thread, but processing will be started on ThreadPool. + if (skipAsyncEvents) + { + Volatile.Write(ref _processRequests, 0); + Trace(context, "Exit (process defer)"); + } + else + { + Trace(context, "Exit (process async)"); + } return op; } - - _state = QueueState.Processing; - // Break out and release lock - break; - - case QueueState.Processing: - Debug.Assert(_tail != null, "State == Processing but queue is empty!"); - _sequenceNumber++; - Trace(context, $"Exit (currently processing)"); - return null; - - case QueueState.Stopped: - Debug.Assert(_tail == null); - Trace(context, $"Exit (stopped)"); - return null; - - default: - Environment.FailFast("unexpected queue state"); + } + if (Interlocked.Exchange(ref _processRequests, 0) == 1) + { + Trace(context, "Exit (empty)"); return null; + } + } + else + { + Trace(context, "Exit (already processing)"); + // Already processing. + return null; } } + } - ManualResetEventSlim? e = op.Event; - if (e != null) - { - // Sync operation. Signal waiting thread to continue processing. - e.Set(); - return null; - } - else + private void EnsureProcessingIfNeeded(SocketAsyncContext context) + { + Trace(context, $"Enter"); + + while (true) { - // Async operation. The caller will figure out how to process the IO. - Debug.Assert(!skipAsyncEvents); - return op; + AsyncOperation? op; + if (Interlocked.Increment(ref _processRequests) == 1) + { + op = QueueGetFirst(); + if (op != null) + { + Trace(context, "Exit (processing)"); + + // Dispatch processing. + op.Dispatch(); + return; + } + + if (Interlocked.Exchange(ref _processRequests, 0) == 1) + { + Trace(context, "Exit (empty)"); + return; + } + } + else + { + Trace(context, "Exit (already processing)"); + // Already processing. + return; + } } } @@ -921,7 +990,7 @@ internal void ProcessAsyncOperation(TOperation op) // request for a previous operation could affect a subsequent one) // and here we know the operation has completed. op.CancellationRegistration.Dispose(); - op.InvokeCallback(allowPooling: true); + op.InvokeCallback(); } } @@ -932,31 +1001,47 @@ public enum OperationResult Cancelled = 2 } - public OperationResult ProcessQueuedOperation(TOperation op) + private AsyncOperation? DequeueFirstAndGetNext(AsyncOperation first) { - SocketAsyncContext context = op.AssociatedContext; - - int observedSequenceNumber; - using (Lock()) + AsyncOperation? queue = Interlocked.CompareExchange(ref _queue, null, first); + Debug.Assert(queue != null); + _isNextOperationSynchronous = false; // assume we're the only operation + if (object.ReferenceEquals(queue, first) || IsDisposed(queue)) { - Trace(context, $"Enter"); + return null; + } - if (_state == QueueState.Stopped) + Debug.Assert(IsGate(queue)); + lock (queue) + { + if (queue.Next == first) // we're the last -> single element { - Debug.Assert(_tail == null); - Trace(context, $"Exit (stopped)"); - return OperationResult.Cancelled; + Debug.Assert(first.Next == first); // verify we're a single element list + queue.Next = null; + return null; } else { - Debug.Assert(_state == QueueState.Processing, $"_state={_state} while processing queue!"); - Debug.Assert(_tail != null, "Unexpected empty queue while processing I/O"); - Debug.Assert(op == _tail.Next, "Operation is not at head of queue???"); - observedSequenceNumber = _sequenceNumber; + AsyncOperation? last = queue.Next; + Debug.Assert(last != null); // there is an element + Debug.Assert(last.Next == first); // we're first + AsyncOperation newFirst = first.Next!; + last.Next = newFirst; // skip operation + first.Next = first; // point to self + _isNextOperationSynchronous = newFirst.Event != null; + return newFirst; } } + } + + public OperationResult ProcessQueuedOperation(TOperation op) + { + SocketAsyncContext context = op.AssociatedContext; + Trace(context, $"Enter"); bool wasCompleted = false; + int observedProcessRequests = Volatile.Read(ref _processRequests); + Debug.Assert(observedProcessRequests > 0); while (true) { // Try to change the op state to Running. @@ -975,188 +1060,187 @@ public OperationResult ProcessQueuedOperation(TOperation op) break; } + // Operation indicates we need to wait. op.SetWaiting(); - // Check for retry and reset queue state. - - using (Lock()) + // Try to stop processing until a new event arrives. + int processRequests = Interlocked.CompareExchange(ref _processRequests, 0, observedProcessRequests); + if (processRequests == observedProcessRequests) { - if (_state == QueueState.Stopped) - { - Debug.Assert(_tail == null); - Trace(context, $"Exit (stopped)"); - return OperationResult.Cancelled; - } - else - { - Debug.Assert(_state == QueueState.Processing, $"_state={_state} while processing queue!"); - - if (observedSequenceNumber != _sequenceNumber) - { - // We received another epoll notification since we previously checked it. - // So, we need to retry the operation. - Debug.Assert(observedSequenceNumber - _sequenceNumber < 10000, "Very large sequence number increase???"); - observedSequenceNumber = _sequenceNumber; - } - else - { - _state = QueueState.Waiting; - Trace(context, $"Exit (received EAGAIN)"); - return OperationResult.Pending; - } - } + Trace(context, $"Leave (pending)"); + return OperationResult.Pending; } + observedProcessRequests = processRequests; } // Remove the op from the queue and see if there's more to process. - - AsyncOperation? nextOp = null; - using (Lock()) + AsyncOperation? nextOp = DequeueFirstAndGetNext(op); + while (true) { - if (_state == QueueState.Stopped) + if (nextOp != null) { - Debug.Assert(_tail == null); - Trace(context, $"Exit (stopped)"); + Trace(context, $"Leave (dispatch)"); + nextOp.Dispatch(); + break; } - else + // Try to stop when there are no more operations. + int processRequests = Interlocked.CompareExchange(ref _processRequests, 0, observedProcessRequests); + if (processRequests == observedProcessRequests) { - Debug.Assert(_state == QueueState.Processing, $"_state={_state} while processing queue!"); - Debug.Assert(_tail.Next == op, "Queue modified while processing queue"); - - if (op == _tail) - { - // No more operations to process - _tail = null; - _isNextOperationSynchronous = false; - _state = QueueState.Ready; - _sequenceNumber++; - Trace(context, $"Exit (finished queue)"); - } - else - { - // Pop current operation and advance to next - nextOp = _tail.Next = op.Next; - _isNextOperationSynchronous = nextOp.Event != null; - } + Trace(context, $"Leave (stop)"); + break; } + observedProcessRequests = processRequests; + nextOp = QueueGetFirst(); } - nextOp?.Dispatch(); - return (wasCompleted ? OperationResult.Completed : OperationResult.Cancelled); } - public void CancelAndContinueProcessing(TOperation op) + private void RemoveQueued(AsyncOperation operation) { - // Note, only sync operations use this method. - Debug.Assert(op.Event != null); - - // Remove operation from queue. - // Note it must be there since it can only be processed and removed by the caller. - AsyncOperation? nextOp = null; - using (Lock()) + _isNextOperationSynchronous = false; // assume we're the only + AsyncOperation? queue = Interlocked.CompareExchange(ref _queue, null, operation); + if (object.ReferenceEquals(queue, operation)) { - if (_state == QueueState.Stopped) - { - Debug.Assert(_tail == null); - } - else + return; + } + if (queue is object && IsGate(queue)) + { + lock (queue) { - Debug.Assert(_tail != null, "Unexpected empty queue in CancelAndContinueProcessing"); - - if (_tail.Next == op) + if (queue.Next == operation) // We're the last { - // We're the head of the queue - if (op == _tail) + if (operation.Next == operation) // We're the only { - // No more operations - _tail = null; - _isNextOperationSynchronous = false; + queue.Next = null; // empty } else { - // Pop current operation and advance to next - _tail.Next = op.Next; - _isNextOperationSynchronous = op.Next.Event != null; - } - - // We're the first op in the queue. - if (_state == QueueState.Processing) - { - // The queue has already handed off execution responsibility to us. - // We need to dispatch to the next op. - if (_tail == null) - { - _state = QueueState.Ready; - _sequenceNumber++; - } - else - { - nextOp = _tail.Next; - } - } - else if (_state == QueueState.Waiting) - { - if (_tail == null) + // Find newLast + AsyncOperation newLast = operation.Next!; { - _state = QueueState.Ready; + AsyncOperation newLastNext = newLast.Next!; + while (newLastNext != operation) + { + newLast = newLastNext; + } } + newLast.Next = operation.Next; // last point to first + queue.Next = newLast; // gate points to last + operation.Next = operation; // point to self + AsyncOperation first = newLast.Next!; + _isNextOperationSynchronous = first.Event != null; } } else { - // We're not the head of the queue. - // Just find this op and remove it. - AsyncOperation current = _tail.Next; - while (current.Next != op) + AsyncOperation? last = queue.Next; + if (last != null) { - current = current.Next; - } - - if (current.Next == _tail) - { - _tail = current; + AsyncOperation it = last; + do + { + AsyncOperation next = it.Next!; + if (next == operation) + { + it.Next = operation.Next; // skip operation + operation.Next = operation; // point to self + AsyncOperation first = last.Next!; + _isNextOperationSynchronous = first.Event != null; + return; + } + it = next; + } while (it != last); } - current.Next = current.Next.Next; } } } + } - nextOp?.Dispatch(); + public void CancelAndContinueProcessing(TOperation op, ManualResetEventSlim e) + { + // Note, only sync operations use this method. + Debug.Assert(op.Event != null); + + // Become responsible for processing. + SpinWait spinWait = default; + while (true) + { + if (Interlocked.CompareExchange(ref _processRequests, 1, 0) == 0) + { + break; + } + if (e.IsSet) + { + break; + } + spinWait.SpinOnce(); + } + + // Remove the operation. + RemoveQueued(op); + + // Try to stop when there are no more operations. + int observedProcessRequests = 1; + while (true) + { + int processRequests = Interlocked.CompareExchange(ref _processRequests, 0, observedProcessRequests); + if (processRequests == observedProcessRequests) + { + return; + } + AsyncOperation? first = QueueGetFirst(); + if (first != null) + { + first.Dispatch(); + return; + } + observedProcessRequests = processRequests; + } } // Called when the socket is closed. public bool StopAndAbort(SocketAsyncContext context) { + Trace(context, $"Enter"); bool aborted = false; + _isNextOperationSynchronous = false; + AsyncOperation? queue = Interlocked.Exchange(ref _queue, DisposedSentinel); + // We should be called exactly once, by SafeSocketHandle. - Debug.Assert(_state != QueueState.Stopped); + Debug.Assert(queue != DisposedSentinel); - using (Lock()) + if (queue != null) { - Trace(context, $"Enter"); - - Debug.Assert(_state != QueueState.Stopped); - - _state = QueueState.Stopped; - - if (_tail != null) + AsyncOperation? gate = queue as AsyncOperationGate; + if (gate != null) { - AsyncOperation op = _tail; - do + // Synchronize with Enqueue. + lock (gate) + { } + + AsyncOperation? last = gate.Next; + if (last != null) { - aborted |= op.TryCancel(); - op = op.Next; - } while (op != _tail); + AsyncOperation op = last; + do + { + aborted |= op.TryCancel(); + op = op.Next!; + } while (op != last); + } + } + else + { + // queue is single operation + aborted |= queue.TryCancel(); } - - _tail = null; - _isNextOperationSynchronous = false; - - Trace(context, $"Exit"); } + Trace(context, $"Exit"); + return aborted; } @@ -1168,7 +1252,7 @@ public void Trace(SocketAsyncContext context, string message, [CallerMemberName] typeof(TOperation) == typeof(WriteOperation) ? "send" : "???"; - OutputTrace($"{IdOf(context)}-{queueType}.{memberName}: {message}, {_state}-{_sequenceNumber}, {((_tail == null) ? "empty" : "not empty")}"); + OutputTrace($"{IdOf(context)}-{queueType}.{memberName}: {message}, {(_processRequests > 0 ? "processing" : "not processing")}-{_sequenceNumber}, {((QueueGetFirst() == null) ? "empty" : "not empty")}"); } } @@ -1184,9 +1268,6 @@ public void Trace(SocketAsyncContext context, string message, [CallerMemberName] public SocketAsyncContext(SafeSocketHandle socket) { _socket = socket; - - _receiveQueue.Init(); - _sendQueue.Init(); } private void Register() @@ -1287,6 +1368,12 @@ private void PerformSyncOperation(ref OperationQueue que break; } + // Operation was canceled by aborting the queue. + if (operation.IsCancelled) + { + break; + } + // Reset the event now to avoid lost notifications if the processing is unsuccessful. e.Reset(); @@ -1314,7 +1401,7 @@ private void PerformSyncOperation(ref OperationQueue que if (timeoutExpired) { - queue.CancelAndContinueProcessing(operation); + queue.CancelAndContinueProcessing(operation, e); operation.ErrorCode = SocketError.TimedOut; } } @@ -1393,7 +1480,6 @@ public SocketError AcceptAsync(byte[] socketAddress, ref int socketAddressLen, o acceptedFd = operation.AcceptedFileDescriptor; errorCode = operation.ErrorCode; - ReturnOperation(operation); return errorCode; } @@ -1570,7 +1656,6 @@ public SocketError ReceiveFromAsync(Memory buffer, SocketFlags flags, byt bytesReceived = operation.BytesTransferred; errorCode = operation.ErrorCode; - ReturnOperation(operation); return errorCode; } @@ -1648,7 +1733,6 @@ public SocketError ReceiveFromAsync(IList> buffers, SocketFla bytesReceived = operation.BytesTransferred; errorCode = operation.ErrorCode; - ReturnOperation(operation); return errorCode; } @@ -1840,7 +1924,6 @@ public SocketError SendToAsync(Memory buffer, int offset, int count, Socke bytesSent = operation.BytesTransferred; errorCode = operation.ErrorCode; - ReturnOperation(operation); return errorCode; } @@ -1921,7 +2004,6 @@ public SocketError SendToAsync(IList> buffers, SocketFlags fl bytesSent = operation.BytesTransferred; errorCode = operation.ErrorCode; - ReturnOperation(operation); return errorCode; }