From fc66d5ed0eef98875857bc7c602e43c4867096e3 Mon Sep 17 00:00:00 2001 From: Geoff Kizer Date: Mon, 17 Jul 2017 15:28:05 -0700 Subject: [PATCH 1/3] perform queued IO operations om threadpool, and rework queue locking --- src/Common/src/System/Net/SafeCloseSocket.cs | 1 - .../Net/Sockets/MultipleConnectAsync.cs | 7 +- .../Net/Sockets/SocketAsyncContext.Unix.cs | 1827 ++++++++--------- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 15 +- .../tests/FunctionalTests/DisconnectTest.cs | 6 + .../FunctionalTests/DualModeSocketTest.cs | 12 +- .../SocketAsyncEventArgsTest.cs | 3 +- 7 files changed, 905 insertions(+), 966 deletions(-) diff --git a/src/Common/src/System/Net/SafeCloseSocket.cs b/src/Common/src/System/Net/SafeCloseSocket.cs index 6e0d5b845702..036ae305c8dc 100644 --- a/src/Common/src/System/Net/SafeCloseSocket.cs +++ b/src/Common/src/System/Net/SafeCloseSocket.cs @@ -182,7 +182,6 @@ internal void CloseAsIs() // Now free it with blocking. innerSocket.BlockingRelease(); } - #if DEBUG } catch (Exception exception) when (!ExceptionCheck.IsFatal(exception)) diff --git a/src/System.Net.Sockets/src/System/Net/Sockets/MultipleConnectAsync.cs b/src/System.Net.Sockets/src/System/Net/Sockets/MultipleConnectAsync.cs index 29080a03761c..953aaa2479e2 100644 --- a/src/System.Net.Sockets/src/System/Net/Sockets/MultipleConnectAsync.cs +++ b/src/System.Net.Sockets/src/System/Net/Sockets/MultipleConnectAsync.cs @@ -254,7 +254,7 @@ private Exception AttemptConnection() } } - private static Exception AttemptConnection(Socket attemptSocket, SocketAsyncEventArgs args) + private Exception AttemptConnection(Socket attemptSocket, SocketAsyncEventArgs args) { try { @@ -263,9 +263,10 @@ private static Exception AttemptConnection(Socket attemptSocket, SocketAsyncEven NetEventSource.Fail(null, "attemptSocket is null!"); } - if (!attemptSocket.ConnectAsync(args)) + bool pending = attemptSocket.ConnectAsync(args); + if (!pending) { - return new SocketException((int)args.SocketError); + InternalConnectCallback(null, args); } } catch (ObjectDisposedException) diff --git a/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index db4604fe4171..34723bb7651c 100644 --- a/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -5,14 +5,18 @@ using Microsoft.Win32.SafeHandles; using System.Collections.Generic; using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Threading; +// Disable unreachable code warning for trace code +#pragma warning disable CS0162 + namespace System.Net.Sockets { // Note on asynchronous behavior here: // The asynchronous socket operations here generally do the following: - // (1) If the operation queue is empty, try to perform the operation immediately, non-blocking. + // (1) If the operation queue is Ready (queue is empty), try to perform the operation immediately, non-blocking. // If this completes (i.e. does not return EWOULDBLOCK), then we return the results immediately // for both success (SocketError.Success) or failure. // No callback will happen; callers are expected to handle these synchronous completions themselves. @@ -20,13 +24,13 @@ namespace System.Net.Sockets // appropriate queue and return SocketError.IOPending. // Enqueuing itself may fail because the socket is closed before the operation can be enqueued; // in this case, we return SocketError.OperationAborted (which matches what Winsock would return in this case). - // (3) When the queue completes the operation, it will post a work item to the threadpool - // to call the callback with results (either success or failure). + // (3) When we receive an epoll notification for the socket, we post a work item to the threadpool + // to perform the I/O and invoke the callback with the I/O result. // Synchronous operations generally do the same, except that instead of returning IOPending, // they block on an event handle until the operation is processed by the queue. - // Also, synchronous methods return SocketError.Interrupted when enqueuing fails - // (which again matches Winsock behavior). + + // See comments on OperationQueue below for more details of how the queue coordination works. internal sealed class SocketAsyncContext { @@ -66,101 +70,135 @@ public AsyncOperation() public bool TryComplete(SocketAsyncContext context) { - Debug.Assert(_state == (int)State.Waiting, $"Unexpected _state: {_state}"); + if (TraceEnabled) TraceWithContext(context, "Enter"); - return DoTryComplete(context); - } + bool result = DoTryComplete(context); - public bool TryCompleteAsync(SocketAsyncContext context) - { - return TryCompleteOrAbortAsync(context, abort: false); - } + if (TraceEnabled) TraceWithContext(context, $"Exit, result={result}"); - public void AbortAsync() - { - bool completed = TryCompleteOrAbortAsync(null, abort: true); - Debug.Assert(completed, $"Expected TryCompleteOrAbortAsync to return true"); + return result; } - private bool TryCompleteOrAbortAsync(SocketAsyncContext context, bool abort) + public bool TrySetRunning() { - Debug.Assert(context != null || abort, $"Unexpected values: context={context}, abort={abort}"); - State oldState = (State)Interlocked.CompareExchange(ref _state, (int)State.Running, (int)State.Waiting); if (oldState == State.Cancelled) { - // This operation has been cancelled. The canceller is responsible for - // correctly updating any state that would have been handled by - // AsyncOperation.Abort. - return true; + // This operation has already been cancelled, and had its completion processed. + // Simply return true to indicate no further processing is needed. + return false; } - Debug.Assert(oldState != State.Complete && oldState != State.Running, $"Unexpected oldState: {oldState}"); + Debug.Assert(oldState == (int)State.Waiting); + return true; + } + + public bool SetComplete() + { + Debug.Assert(Volatile.Read(ref _state) == (int)State.Running); + + Volatile.Write(ref _state, (int)State.Complete); - bool completed; - if (abort) + if (CallbackOrEvent is ManualResetEventSlim e) { - Abort(); - ErrorCode = SocketError.OperationAborted; - completed = true; + e.Set(); + + // No callback needed + return false; } else { - completed = DoTryComplete(context); - } - - if (completed) - { - var @event = CallbackOrEvent as ManualResetEventSlim; - if (@event != null) - { - @event.Set(); - } - else - { - Debug.Assert(_state != (int)State.Cancelled, $"Unexpected _state: {_state}"); #if DEBUG - Debug.Assert(Interlocked.CompareExchange(ref _callbackQueued, 1, 0) == 0, $"Unexpected _callbackQueued: {_callbackQueued}"); + Debug.Assert(Interlocked.CompareExchange(ref _callbackQueued, 1, 0) == 0, $"Unexpected _callbackQueued: {_callbackQueued}"); #endif - ThreadPool.QueueUserWorkItem(o => ((AsyncOperation)o).InvokeCallback(), this); - } - - Volatile.Write(ref _state, (int)State.Complete); + // Indicate callback is needed return true; } + } + + public void SetWaiting() + { + Debug.Assert(Volatile.Read(ref _state) == (int)State.Running); Volatile.Write(ref _state, (int)State.Waiting); - return false; } - public bool Wait(int timeout) + public void DoCallback() { - if (Event.Wait(timeout)) - { - return true; - } + InvokeCallback(); + } + + public bool TryCancel() + { + if (TraceEnabled) Trace("Enter"); + // Try to transition from Waiting to Cancelled var spinWait = new SpinWait(); - for (;;) + bool keepWaiting = true; + while (keepWaiting) { int state = Interlocked.CompareExchange(ref _state, (int)State.Cancelled, (int)State.Waiting); switch ((State)state) { case State.Running: // A completion attempt is in progress. Keep busy-waiting. + if (TraceEnabled) Trace("Busy wait"); spinWait.SpinOnce(); break; case State.Complete: // A completion attempt succeeded. Consider this operation as having completed within the timeout. - return true; + if (TraceEnabled) Trace("Exit, previously completed"); + return false; case State.Waiting: // This operation was successfully cancelled. - return false; + // Break out of the loop to handle the cancellation + keepWaiting = false; + break; + + case State.Cancelled: + // Someone else cancelled the operation. + // Just return true to indicate the operation was cancelled. + // The previous canceller will have fired the completion, etc. + if (TraceEnabled) Trace("Exit, previously cancelled"); + return true; } } + + if (TraceEnabled) Trace("Cancelled, processing completion"); + + // The operation successfully cancelled. + // It's our responsibility to set the error code and queue the completion. + DoAbort(); + + var @event = CallbackOrEvent as ManualResetEventSlim; + if (@event != null) + { + @event.Set(); + } + else + { +#if DEBUG + Debug.Assert(Interlocked.CompareExchange(ref _callbackQueued, 1, 0) == 0, $"Unexpected _callbackQueued: {_callbackQueued}"); +#endif + + ThreadPool.QueueUserWorkItem(o => ((AsyncOperation)o).InvokeCallback(), this); + } + + if (TraceEnabled) Trace("Exit"); + + // Note, we leave the operation in the OperationQueue. + // When we get around to processing it, we'll see it's cancelled and skip it. + return true; + } + + // Called when op is not in the queue yet, so can't be otherwise executing + public void DoAbort() + { + Abort(); + ErrorCode = SocketError.OperationAborted; } protected abstract void Abort(); @@ -168,6 +206,16 @@ public bool Wait(int timeout) protected abstract bool DoTryComplete(SocketAsyncContext context); protected abstract void InvokeCallback(); + + public void Trace(string message, [CallerMemberName] string memberName = null) + { + OutputTrace($"{IdOf(this)}.{memberName}: {message}"); + } + + public void TraceWithContext(SocketAsyncContext context, string message, [CallerMemberName] string memberName = null) + { + OutputTrace($"{IdOf(context)}, {IdOf(this)}.{memberName}: {message}"); + } } // These two abstract classes differentiate the operations that go in the @@ -370,118 +418,402 @@ protected override bool DoTryComplete(SocketAsyncContext context) => SocketPal.TryCompleteSendFile(context._socket, FileHandle, ref Offset, ref Count, ref BytesTransferred, out ErrorCode); } - private enum QueueState + // In debug builds, this struct guards against: + // (1) Unexpected lock reentrancy, which should never happen + // (2) Deadlock, by setting a reasonably large timeout + private struct LockToken : IDisposable { - Clear = 0, - Set = 1, - Stopped = 2, + private object _lockObject; + + public LockToken(object lockObject) + { + Debug.Assert(lockObject != null); + + _lockObject = lockObject; + + Debug.Assert(!Monitor.IsEntered(_lockObject)); + +#if DEBUG + bool success = Monitor.TryEnter(_lockObject, 10000); + Debug.Assert(success); +#else + Monitor.Enter(_lockObject); +#endif + } + + public void Dispose() + { + Debug.Assert(Monitor.IsEntered(_lockObject)); + Monitor.Exit(_lockObject); + } } private struct OperationQueue where TOperation : AsyncOperation { + // Quick overview: + // + // When attempting to perform an IO operation, the caller first checks IsReady, + // and if true, attempts to perform the operation itself. + // If this returns EWOULDBLOCK, or if the queue was not ready, then the operation + // is enqueued by calling StartAsyncOperation and the state becomes Waiting. + // When an epoll notification is received, we check if the state is Waiting, + // and if so, change the state to Processing and enqueue a workitem to the threadpool + // to try to perform the enqueued operations. + // If an operation is successfully performed, we remove it from the queue, + // enqueue another threadpool workitem to process the next item in the queue (if any), + // and call the user's completion callback. + // If we successfully process all enqueued operations, then the state becomes Ready; + // otherwise, the state becomes Waiting and we wait for another epoll notification. + + private enum QueueState + { + Ready = 0, // Indicates that data MAY be available on the socket. + // Queue must be empty. + Waiting = 1, // Indicates that data is definitely not available on the socket. + // Queue must not be empty. + Processing = 2, // Indicates that a thread pool item has been scheduled (and may + // be executing) to process the IO operations in the queue. + // Queue must not be empty. + Stopped = 3, // Indicates that the queue has been stopped because the + // socket has been closed. + // Queue must be empty. + } + + // These fields define the queue state. + + private QueueState _state; // See above + private int _sequenceNumber; // This sequence number is updated when we receive an epoll notification. + // It allows us to detect when a new epoll notification has arrived + // since the last time we checked the state of the queue. + // If this happens, we MUST retry the operation, otherwise we risk + // "losing" the notification and causing the operation to pend indefinitely. + private AsyncOperation _tail; // Queue of pending IO operations to process when data becomes available. + + // The _queueLock is used to ensure atomic access to the queue state above. + // The lock is only ever held briefly, to read and/or update queue state, and + // never around any external call, e.g. OS call or user code invocation. private object _queueLock; - private AsyncOperation _tail; - public QueueState State { get; set; } - public bool IsStopped { get { return State == QueueState.Stopped; } } - public bool IsEmpty { get { return _tail == null; } } - public object QueueLock { get { return _queueLock; } } + private LockToken Lock() => new LockToken(_queueLock); + + private static WaitCallback s_processingCallback = + typeof(TOperation) == typeof(ReadOperation) ? ((o) => { var context = ((SocketAsyncContext)o); context._receiveQueue.ProcessQueue(context); }) : + typeof(TOperation) == typeof(WriteOperation) ? ((o) => { var context = ((SocketAsyncContext)o); context._sendQueue.ProcessQueue(context); }) : + (WaitCallback)null; public void Init() { Debug.Assert(_queueLock == null); _queueLock = new object(); + + _state = QueueState.Ready; + _sequenceNumber = 0; } - public void Enqueue(TOperation operation) + // IsReady returns the current _sequenceNumber, which must be passed to StartAsyncOperation below. + public bool IsReady(SocketAsyncContext context, out int observedSequenceNumber) { - Debug.Assert(!IsStopped, "Expected !IsStopped"); - Debug.Assert(operation.Next == operation, "Expected operation.Next == operation"); - - if (!IsEmpty) + using (Lock()) { - operation.Next = _tail.Next; - _tail.Next = operation; - } + observedSequenceNumber = _sequenceNumber; + bool isReady = (_state == QueueState.Ready); - _tail = operation; + if (TraceEnabled) Trace(context, $"{isReady}"); + + return isReady; + } } - private bool TryDequeue(out TOperation operation) + // Return true for pending, false for completed synchronously (including failure and abort) + public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation, int observedSequenceNumber) { - if (_tail == null) + if (TraceEnabled) Trace(context, $"Enter"); + + if (!context._registered) { - operation = null; - return false; + context.Register(); } - AsyncOperation head = _tail.Next; - if (head == _tail) + while (true) { - _tail = null; + bool doAbort = false; + using (Lock()) + { + switch (_state) + { + case QueueState.Ready: + if (observedSequenceNumber != _sequenceNumber) + { + // The queue has become ready again since we previously checked it. + // So, we need to retry the operation before we enqueue it. + Debug.Assert(observedSequenceNumber - _sequenceNumber < 10000, "Very large sequence number increase???"); + observedSequenceNumber = _sequenceNumber; + break; + } + + // Caller tried the operation and got an EWOULDBLOCK, so we need to transition. + _state = QueueState.Waiting; + goto case QueueState.Waiting; + + case QueueState.Waiting: + case QueueState.Processing: + // Enqueue the operation. + Debug.Assert(operation.Next == operation, "Expected operation.Next == operation"); + + if (_tail != null) + { + operation.Next = _tail.Next; + _tail.Next = operation; + } + + _tail = operation; + + if (TraceEnabled) Trace(context, $"Leave, enqueued {IdOf(operation)}"); + return true; + + case QueueState.Stopped: + Debug.Assert(_tail == null); + doAbort = true; + break; + + default: + Environment.FailFast("unexpected queue state"); + break; + } + } + + if (doAbort) + { + operation.DoAbort(); + if (TraceEnabled) Trace(context, $"Leave, queue stopped"); + return false; + } + + // Retry the operation. + if (operation.TryComplete(context)) + { + if (TraceEnabled) Trace(context, $"Leave, retry succeeded"); + return false; + } } - else + } + + // Called on the epoll thread whenever we receive an epoll notification. + public void HandleEvent(SocketAsyncContext context) + { + using (Lock()) { - _tail.Next = head.Next; + if (TraceEnabled) Trace(context, $"Enter"); + + switch (_state) + { + case QueueState.Ready: + Debug.Assert(_tail == null, "State == Ready but queue is not empty!"); + _sequenceNumber++; + if (TraceEnabled) Trace(context, $"Exit (previously ready)"); + return; + + case QueueState.Waiting: + Debug.Assert(_tail != null, "State == Waiting but queue is empty!"); + _state = QueueState.Processing; + // Break out and release lock + break; + + case QueueState.Processing: + Debug.Assert(_tail != null, "State == Processing but queue is empty!"); + _sequenceNumber++; + if (TraceEnabled) Trace(context, $"Exit (currently processing)"); + return; + + case QueueState.Stopped: + Debug.Assert(_tail == null); + if (TraceEnabled) Trace(context, $"Exit (stopped)"); + return; + + default: + Environment.FailFast("unexpected queue state"); + return; + } } - head.Next = null; - operation = (TOperation)head; - return true; + // We just transitioned from Waiting to Processing. + // Spawn a work item to do the actual processing. + ThreadPool.QueueUserWorkItem(s_processingCallback, context); } - private void Requeue(TOperation operation) + // Called on the threadpool when data may be available. + public void ProcessQueue(SocketAsyncContext context) { - // Insert at the head of the queue - Debug.Assert(!IsStopped, "Expected !IsStopped"); - Debug.Assert(operation.Next == null, "Operation already in queue"); + int observedSequenceNumber; + AsyncOperation op; + using (Lock()) + { + if (TraceEnabled) Trace(context, $"Enter"); + + if (_state == QueueState.Stopped) + { + Debug.Assert(_tail == null); + if (TraceEnabled) Trace(context, $"Exit (stopped)"); + return; + } + else + { + Debug.Assert(_state == QueueState.Processing, $"_state={_state} while processing queue!"); + Debug.Assert(_tail != null, "Unexpected empty queue while processing I/O"); + observedSequenceNumber = _sequenceNumber; + op = _tail.Next; // head of queue + } + } - if (IsEmpty) + bool needCallback = false; + AsyncOperation nextOp; + while (true) + { + bool wasCompleted = false; + bool wasCancelled = !op.TrySetRunning(); + if (!wasCancelled) + { + // Try to perform the IO + wasCompleted = op.TryComplete(context); + if (wasCompleted) + { + needCallback = op.SetComplete(); + } + else + { + op.SetWaiting(); + } + } + + nextOp = null; + if (wasCompleted || wasCancelled) + { + // Remove the op from the queue and see if there's more to process. + + using (Lock()) + { + if (_state == QueueState.Stopped) + { + Debug.Assert(_tail == null); + if (TraceEnabled) Trace(context, $"Exit (stopped)"); + } + else + { + Debug.Assert(_state == QueueState.Processing, $"_state={_state} while processing queue!"); + + if (op == _tail) + { + // No more operations to process + _tail = null; + _state = QueueState.Ready; + _sequenceNumber++; + if (TraceEnabled) Trace(context, $"Exit (finished queue)"); + } + else + { + // Pop current operation and advance to next + nextOp = _tail.Next = op.Next; + } + } + } + } + else + { + // Check for retry and reset queue state. + + using (Lock()) + { + if (_state == QueueState.Stopped) + { + Debug.Assert(_tail == null); + if (TraceEnabled) Trace(context, $"Exit (stopped)"); + } + else + { + Debug.Assert(_state == QueueState.Processing, $"_state={_state} while processing queue!"); + + if (observedSequenceNumber != _sequenceNumber) + { + // We received another epoll notification since we previously checked it. + // So, we need to retry the operation. + Debug.Assert(observedSequenceNumber - _sequenceNumber < 10000, "Very large sequence number increase???"); + observedSequenceNumber = _sequenceNumber; + nextOp = op; + } + else + { + _state = QueueState.Waiting; + if (TraceEnabled) Trace(context, $"Exit (received EAGAIN)"); + } + } + } + } + + if (needCallback || nextOp == null) + { + break; + } + + op = nextOp; + } + + if (needCallback) { - operation.Next = operation; - _tail = operation; + if (nextOp != null) + { + Debug.Assert(_state == QueueState.Processing); + + // Spawn a new work item to continue processing the queue. + ThreadPool.QueueUserWorkItem(s_processingCallback, context); + } + + op.DoCallback(); } else { - operation.Next = _tail.Next; - _tail.Next = operation; + Debug.Assert(nextOp == null); } } - public void Complete(SocketAsyncContext context) + // Called when the socket is closed. + public void StopAndAbort(SocketAsyncContext context) { - lock (_queueLock) + // We should be called exactly once, by SafeCloseSocket. + Debug.Assert(_state != QueueState.Stopped); + + using (Lock()) { - if (IsStopped) - return; + if (TraceEnabled) Trace(context, $"Enter"); + + Debug.Assert(_state != QueueState.Stopped); - State = QueueState.Set; + _state = QueueState.Stopped; - TOperation op; - while (TryDequeue(out op)) + if (_tail != null) { - if (!op.TryCompleteAsync(context)) + AsyncOperation op = _tail; + do { - Requeue(op); - return; - } + op.TryCancel(); + op = op.Next; + } while (op != _tail); } + + _tail = null; + + if (TraceEnabled) Trace(context, $"Exit"); } } - public void StopAndAbort() + public void Trace(SocketAsyncContext context, string message, [CallerMemberName] string memberName = null) { - lock (_queueLock) - { - State = QueueState.Stopped; + string queueType = + typeof(TOperation) == typeof(ReadOperation) ? "recv" : + typeof(TOperation) == typeof(WriteOperation) ? "send" : + "???"; - TOperation op; - while (TryDequeue(out op)) - { - op.AbortAsync(); - } - } + OutputTrace($"{IdOf(context)}-{queueType}.{memberName}: {message}, {_state}-{_sequenceNumber}, {((_tail == null) ? "empty" : "not empty")}"); } } @@ -489,7 +821,7 @@ public void StopAndAbort() private OperationQueue _receiveQueue; private OperationQueue _sendQueue; private SocketAsyncEngine.Token _asyncEngineToken; - private Interop.Sys.SocketEvents _registeredEvents; + private bool _registered; private bool _nonBlockingSet; private readonly object _registerLock = new object(); @@ -502,41 +834,43 @@ public SocketAsyncContext(SafeCloseSocket socket) _sendQueue.Init(); } - private void Register(Interop.Sys.SocketEvents events) + private void Register() { + Debug.Assert(_nonBlockingSet); lock (_registerLock) { - Debug.Assert((_registeredEvents & events) == Interop.Sys.SocketEvents.None, $"Unexpected values: _registeredEvents={_registeredEvents}, events={events}"); - - if (!_asyncEngineToken.WasAllocated) + if (!_registered) { - _asyncEngineToken = new SocketAsyncEngine.Token(this); - } - - events |= _registeredEvents; + Debug.Assert(!_asyncEngineToken.WasAllocated); + var token = new SocketAsyncEngine.Token(this); - Interop.Error errorCode; - if (!_asyncEngineToken.TryRegister(_socket, _registeredEvents, events, out errorCode)) - { - if (errorCode == Interop.Error.ENOMEM || errorCode == Interop.Error.ENOSPC) - { - throw new OutOfMemoryException(); - } - else + Interop.Error errorCode; + if (!token.TryRegister(_socket, out errorCode)) { - throw new InternalException(); + token.Free(); + if (errorCode == Interop.Error.ENOMEM || errorCode == Interop.Error.ENOSPC) + { + throw new OutOfMemoryException(); + } + else + { + throw new InternalException(); + } } - } - _registeredEvents = events; + _asyncEngineToken = token; + _registered = true; + + if (TraceEnabled) Trace("Registered"); + } } } public void Close() { // Drain queues - _sendQueue.StopAndAbort(); - _receiveQueue.StopAndAbort(); + _sendQueue.StopAndAbort(this); + _receiveQueue.StopAndAbort(this); lock (_registerLock) { @@ -568,39 +902,31 @@ public void SetNonBlocking() } } - private bool TryBeginOperation(ref OperationQueue queue, TOperation operation, Interop.Sys.SocketEvents events, bool maintainOrder, out bool isStopped) + private void PerformSyncOperation(ref OperationQueue queue, TOperation operation, int timeout, int observedSequenceNumber) where TOperation : AsyncOperation { - // Exactly one of the two queue locks must be held by the caller - Debug.Assert(Monitor.IsEntered(_sendQueue.QueueLock) ^ Monitor.IsEntered(_receiveQueue.QueueLock)); - - switch (queue.State) + using (var e = new ManualResetEventSlim(false, 0)) { - case QueueState.Stopped: - isStopped = true; - return false; + operation.Event = e; - case QueueState.Clear: - break; + if (!queue.StartAsyncOperation(this, operation, observedSequenceNumber)) + { + // Completed synchronously + return; + } - case QueueState.Set: - if (queue.IsEmpty || !maintainOrder) - { - isStopped = false; - queue.State = QueueState.Clear; - return false; - } - break; - } + if (e.Wait(timeout)) + { + // Completed within timeout + return; + } - if ((_registeredEvents & events) == Interop.Sys.SocketEvents.None) - { - Register(events); + bool cancelled = operation.TryCancel(); + if (cancelled) + { + operation.ErrorCode = SocketError.TimedOut; + } } - - queue.Enqueue(operation); - isStopped = false; - return true; } public SocketError Accept(byte[] socketAddress, ref int socketAddressLen, int timeout, out IntPtr acceptedFd) @@ -610,55 +936,25 @@ public SocketError Accept(byte[] socketAddress, ref int socketAddressLen, int ti Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}"); SocketError errorCode; - if (SocketPal.TryCompleteAccept(_socket, socketAddress, ref socketAddressLen, out acceptedFd, out errorCode)) + int observedSequenceNumber; + if (_receiveQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteAccept(_socket, socketAddress, ref socketAddressLen, out acceptedFd, out errorCode)) { Debug.Assert(errorCode == SocketError.Success || acceptedFd == (IntPtr)(-1), $"Unexpected values: errorCode={errorCode}, acceptedFd={acceptedFd}"); return errorCode; } - using (var @event = new ManualResetEventSlim(false, 0)) + var operation = new AcceptOperation { - var operation = new AcceptOperation { - Event = @event, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen - }; - - bool isStopped; - while (true) - { - lock (_receiveQueue.QueueLock) - { - if (TryBeginOperation(ref _receiveQueue, operation, Interop.Sys.SocketEvents.Read, maintainOrder: false, isStopped: out isStopped)) - { - break; - } - } - - if (isStopped) - { - acceptedFd = (IntPtr)(-1); - return SocketError.Interrupted; - } - - if (operation.TryComplete(this)) - { - socketAddressLen = operation.SocketAddressLen; - acceptedFd = operation.AcceptedFileDescriptor; - return operation.ErrorCode; - } - } + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen + }; - if (!operation.Wait(timeout)) - { - acceptedFd = (IntPtr)(-1); - return SocketError.TimedOut; - } + PerformSyncOperation(ref _receiveQueue, operation, timeout, observedSequenceNumber); - socketAddressLen = operation.SocketAddressLen; - acceptedFd = operation.AcceptedFileDescriptor; - return operation.ErrorCode; - } + socketAddressLen = operation.SocketAddressLen; + acceptedFd = operation.AcceptedFileDescriptor; + return operation.ErrorCode; } public SocketError AcceptAsync(byte[] socketAddress, ref int socketAddressLen, out IntPtr acceptedFd, Action callback) @@ -670,7 +966,9 @@ public SocketError AcceptAsync(byte[] socketAddress, ref int socketAddressLen, o SetNonBlocking(); SocketError errorCode; - if (SocketPal.TryCompleteAccept(_socket, socketAddress, ref socketAddressLen, out acceptedFd, out errorCode)) + int observedSequenceNumber; + if (_receiveQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteAccept(_socket, socketAddress, ref socketAddressLen, out acceptedFd, out errorCode)) { Debug.Assert(errorCode == SocketError.Success || acceptedFd == (IntPtr)(-1), $"Unexpected values: errorCode={errorCode}, acceptedFd={acceptedFd}"); @@ -683,29 +981,14 @@ public SocketError AcceptAsync(byte[] socketAddress, ref int socketAddressLen, o SocketAddressLen = socketAddressLen }; - bool isStopped; - while (true) + if (!_receiveQueue.StartAsyncOperation(this, operation, observedSequenceNumber)) { - lock (_receiveQueue.QueueLock) - { - if (TryBeginOperation(ref _receiveQueue, operation, Interop.Sys.SocketEvents.Read, maintainOrder: false, isStopped: out isStopped)) - { - break; - } - } - - if (isStopped) - { - return SocketError.OperationAborted; - } - - if (operation.TryComplete(this)) - { - socketAddressLen = operation.SocketAddressLen; - acceptedFd = operation.AcceptedFileDescriptor; - return operation.ErrorCode; - } + socketAddressLen = operation.SocketAddressLen; + acceptedFd = operation.AcceptedFileDescriptor; + return operation.ErrorCode; } + + acceptedFd = (IntPtr)(-1); return SocketError.IOPending; } @@ -715,45 +998,27 @@ public SocketError Connect(byte[] socketAddress, int socketAddressLen, int timeo Debug.Assert(socketAddressLen > 0, $"Unexpected socketAddressLen: {socketAddressLen}"); Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}"); + // Connect is different than the usual "readiness" pattern of other operations. + // We need to initiate the connect before we try to complete it. + // Thus, always call TryStartConnect regardless of readiness. SocketError errorCode; + int observedSequenceNumber; + _sendQueue.IsReady(this, out observedSequenceNumber); if (SocketPal.TryStartConnect(_socket, socketAddress, socketAddressLen, out errorCode)) { _socket.RegisterConnectResult(errorCode); return errorCode; } - using (var @event = new ManualResetEventSlim(false, 0)) + var operation = new ConnectOperation { - var operation = new ConnectOperation { - Event = @event, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen - }; + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen + }; - bool isStopped; - while (true) - { - lock (_sendQueue.QueueLock) - { - if (TryBeginOperation(ref _sendQueue, operation, Interop.Sys.SocketEvents.Write, maintainOrder: false, isStopped: out isStopped)) - { - break; - } - } - - if (isStopped) - { - return SocketError.Interrupted; - } + PerformSyncOperation(ref _sendQueue, operation, timeout, observedSequenceNumber); - if (operation.TryComplete(this)) - { - return operation.ErrorCode; - } - } - - return operation.Wait(timeout) ? operation.ErrorCode : SocketError.TimedOut; - } + return operation.ErrorCode; } public SocketError ConnectAsync(byte[] socketAddress, int socketAddressLen, Action callback) @@ -764,11 +1029,15 @@ public SocketError ConnectAsync(byte[] socketAddress, int socketAddressLen, Acti SetNonBlocking(); + // Connect is different than the usual "readiness" pattern of other operations. + // We need to initiate the connect before we try to complete it. + // Thus, always call TryStartConnect regardless of readiness. SocketError errorCode; + int observedSequenceNumber; + _sendQueue.IsReady(this, out observedSequenceNumber); if (SocketPal.TryStartConnect(_socket, socketAddress, socketAddressLen, out errorCode)) { _socket.RegisterConnectResult(errorCode); - return errorCode; } @@ -778,27 +1047,11 @@ public SocketError ConnectAsync(byte[] socketAddress, int socketAddressLen, Acti SocketAddressLen = socketAddressLen }; - bool isStopped; - while (true) + if (!_sendQueue.StartAsyncOperation(this, operation, observedSequenceNumber)) { - lock (_sendQueue.QueueLock) - { - if (TryBeginOperation(ref _sendQueue, operation, Interop.Sys.SocketEvents.Write, maintainOrder: false, isStopped: out isStopped)) - { - break; - } - } - - if (isStopped) - { - return SocketError.OperationAborted; - } - - if (operation.TryComplete(this)) - { - return operation.ErrorCode; - } + return operation.ErrorCode; } + return SocketError.IOPending; } @@ -824,184 +1077,97 @@ public SocketError ReceiveFrom(byte[] buffer, int offset, int count, ref SocketF { Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}"); - ManualResetEventSlim @event = null; - try + SocketFlags receivedFlags; + SocketError errorCode; + int observedSequenceNumber; + if (_receiveQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteReceiveFrom(_socket, buffer, offset, count, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode)) { - ReceiveOperation operation; - lock (_receiveQueue.QueueLock) - { - SocketFlags receivedFlags; - SocketError errorCode; - - if (_receiveQueue.IsEmpty && - SocketPal.TryCompleteReceiveFrom(_socket, buffer, offset, count, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode)) - { - flags = receivedFlags; - return errorCode; - } + flags = receivedFlags; + return errorCode; + } - @event = new ManualResetEventSlim(false, 0); + var operation = new BufferArrayReceiveOperation + { + Buffer = buffer, + Offset = offset, + Count = count, + Flags = flags, + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen, + }; - operation = new BufferArrayReceiveOperation - { - Event = @event, - Buffer = buffer, - Offset = offset, - Count = count, - Flags = flags, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen, - }; - - bool isStopped; - while (!TryBeginOperation(ref _receiveQueue, operation, Interop.Sys.SocketEvents.Read, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - flags = operation.ReceivedFlags; - bytesReceived = operation.BytesTransferred; - return SocketError.Interrupted; - } + PerformSyncOperation(ref _receiveQueue, operation, timeout, observedSequenceNumber); - if (operation.TryComplete(this)) - { - socketAddressLen = operation.SocketAddressLen; - flags = operation.ReceivedFlags; - bytesReceived = operation.BytesTransferred; - return operation.ErrorCode; - } - } - } - - bool signaled = operation.Wait(timeout); - socketAddressLen = operation.SocketAddressLen; - flags = operation.ReceivedFlags; - bytesReceived = operation.BytesTransferred; - return signaled ? operation.ErrorCode : SocketError.TimedOut; - } - finally - { - @event?.Dispose(); - } + flags = operation.ReceivedFlags; + bytesReceived = operation.BytesTransferred; + return operation.ErrorCode; } public unsafe SocketError ReceiveFrom(Span buffer, ref SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, int timeout, out int bytesReceived) { - Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}"); + SocketFlags receivedFlags; + SocketError errorCode; + int observedSequenceNumber; + if (_receiveQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteReceiveFrom(_socket, buffer, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode)) + { + flags = receivedFlags; + return errorCode; + } fixed (byte* bufferPtr = &buffer.DangerousGetPinnableReference()) { - ManualResetEventSlim @event = null; - try + var operation = new BufferPtrReceiveOperation { - ReceiveOperation operation; - lock (_receiveQueue.QueueLock) - { - SocketFlags receivedFlags; - SocketError errorCode; - - if (_receiveQueue.IsEmpty && - SocketPal.TryCompleteReceiveFrom(_socket, buffer, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode)) - { - flags = receivedFlags; - return errorCode; - } - - @event = new ManualResetEventSlim(false, 0); - - operation = new BufferPtrReceiveOperation - { - Event = @event, - BufferPtr = bufferPtr, - Length = buffer.Length, - Flags = flags, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen, - }; - - bool isStopped; - while (!TryBeginOperation(ref _receiveQueue, operation, Interop.Sys.SocketEvents.Read, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - flags = operation.ReceivedFlags; - bytesReceived = operation.BytesTransferred; - return SocketError.Interrupted; - } + BufferPtr = bufferPtr, + Length = buffer.Length, + Flags = flags, + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen, + }; - if (operation.TryComplete(this)) - { - socketAddressLen = operation.SocketAddressLen; - flags = operation.ReceivedFlags; - bytesReceived = operation.BytesTransferred; - return operation.ErrorCode; - } - } - } + PerformSyncOperation(ref _receiveQueue, operation, timeout, observedSequenceNumber); - bool signaled = operation.Wait(timeout); - socketAddressLen = operation.SocketAddressLen; - flags = operation.ReceivedFlags; - bytesReceived = operation.BytesTransferred; - return signaled ? operation.ErrorCode : SocketError.TimedOut; - } - finally - { - @event?.Dispose(); - } + flags = operation.ReceivedFlags; + bytesReceived = operation.BytesTransferred; + return operation.ErrorCode; } } - public SocketError ReceiveFromAsync(byte[] buffer, int offset, int count, SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, Action callback) { SetNonBlocking(); - lock (_receiveQueue.QueueLock) + SocketError errorCode; + int observedSequenceNumber; + if (_receiveQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteReceiveFrom(_socket, buffer, offset, count, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode)) { - SocketError errorCode; - - if (_receiveQueue.IsEmpty && - SocketPal.TryCompleteReceiveFrom(_socket, buffer, offset, count, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode)) - { - // Synchronous success or failure - return errorCode; - } - - var operation = new BufferArrayReceiveOperation - { - Callback = callback, - Buffer = buffer, - Offset = offset, - Count = count, - Flags = flags, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen, - }; - - bool isStopped; - while (!TryBeginOperation(ref _receiveQueue, operation, Interop.Sys.SocketEvents.Read, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - bytesReceived = 0; - receivedFlags = SocketFlags.None; - return SocketError.OperationAborted; - } + return errorCode; + } - if (operation.TryComplete(this)) - { - socketAddressLen = operation.SocketAddressLen; - receivedFlags = operation.ReceivedFlags; - bytesReceived = operation.BytesTransferred; - return operation.ErrorCode; - } - } + var operation = new BufferArrayReceiveOperation + { + Callback = callback, + Buffer = buffer, + Offset = offset, + Count = count, + Flags = flags, + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen, + }; - bytesReceived = 0; - receivedFlags = SocketFlags.None; - return SocketError.IOPending; + if (!_receiveQueue.StartAsyncOperation(this, operation, observedSequenceNumber)) + { + receivedFlags = operation.ReceivedFlags; + bytesReceived = operation.BytesTransferred; + return operation.ErrorCode; } + + bytesReceived = 0; + receivedFlags = SocketFlags.None; + return SocketError.IOPending; } public SocketError Receive(IList> buffers, ref SocketFlags flags, int timeout, out int bytesReceived) @@ -1019,114 +1185,65 @@ public SocketError ReceiveFrom(IList> buffers, ref SocketFlag { Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}"); - ManualResetEventSlim @event = null; - try + SocketFlags receivedFlags; + SocketError errorCode; + int observedSequenceNumber; + if (_receiveQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteReceiveFrom(_socket, buffers, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode)) { - ReceiveOperation operation; - - lock (_receiveQueue.QueueLock) - { - SocketFlags receivedFlags; - SocketError errorCode; - if (_receiveQueue.IsEmpty && - SocketPal.TryCompleteReceiveFrom(_socket, buffers, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode)) - { - flags = receivedFlags; - return errorCode; - } - - @event = new ManualResetEventSlim(false, 0); - - operation = new BufferListReceiveOperation - { - Event = @event, - Buffers = buffers, - Flags = flags, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen, - }; - - bool isStopped; - while (!TryBeginOperation(ref _receiveQueue, operation, Interop.Sys.SocketEvents.Read, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - flags = operation.ReceivedFlags; - bytesReceived = operation.BytesTransferred; - return SocketError.Interrupted; - } + flags = receivedFlags; + return errorCode; + } - if (operation.TryComplete(this)) - { - socketAddressLen = operation.SocketAddressLen; - flags = operation.ReceivedFlags; - bytesReceived = operation.BytesTransferred; - return operation.ErrorCode; - } - } + var operation = new BufferListReceiveOperation + { + Buffers = buffers, + Flags = flags, + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen, + }; - } + PerformSyncOperation(ref _receiveQueue, operation, timeout, observedSequenceNumber); - bool signaled = operation.Wait(timeout); - socketAddressLen = operation.SocketAddressLen; - flags = operation.ReceivedFlags; - bytesReceived = operation.BytesTransferred; - return signaled ? operation.ErrorCode : SocketError.TimedOut; - } - finally - { - @event?.Dispose(); - } + socketAddressLen = operation.SocketAddressLen; + flags = operation.ReceivedFlags; + bytesReceived = operation.BytesTransferred; + return operation.ErrorCode; } public SocketError ReceiveFromAsync(IList> buffers, SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, Action callback) { SetNonBlocking(); - ReceiveOperation operation; - - lock (_receiveQueue.QueueLock) + SocketError errorCode; + int observedSequenceNumber; + if (_receiveQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteReceiveFrom(_socket, buffers, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode)) { - SocketError errorCode; - if (_receiveQueue.IsEmpty && - SocketPal.TryCompleteReceiveFrom(_socket, buffers, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode)) - { - // Synchronous success or failure - return errorCode; - } - - operation = new BufferListReceiveOperation - { - Callback = callback, - Buffers = buffers, - Flags = flags, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen, - }; - - bool isStopped; - while (!TryBeginOperation(ref _receiveQueue, operation, Interop.Sys.SocketEvents.Read, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - bytesReceived = 0; - receivedFlags = SocketFlags.None; - return SocketError.OperationAborted; - } + // Synchronous success or failure + return errorCode; + } - if (operation.TryComplete(this)) - { - socketAddressLen = operation.SocketAddressLen; - receivedFlags = operation.ReceivedFlags; - bytesReceived = operation.BytesTransferred; - return operation.ErrorCode; - } - } + var operation = new BufferListReceiveOperation + { + Callback = callback, + Buffers = buffers, + Flags = flags, + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen, + }; - bytesReceived = 0; - receivedFlags = SocketFlags.None; - return SocketError.IOPending; + if (!_receiveQueue.StartAsyncOperation(this, operation, observedSequenceNumber)) + { + socketAddressLen = operation.SocketAddressLen; + receivedFlags = operation.ReceivedFlags; + bytesReceived = operation.BytesTransferred; + return operation.ErrorCode; } + + receivedFlags = SocketFlags.None; + bytesReceived = 0; + return SocketError.IOPending; } public SocketError ReceiveMessageFrom( @@ -1134,129 +1251,77 @@ public SocketError ReceiveMessageFrom( { Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}"); - ManualResetEventSlim @event = null; - try + SocketFlags receivedFlags; + SocketError errorCode; + int observedSequenceNumber; + if (_receiveQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteReceiveMessageFrom(_socket, buffer, buffers, offset, count, flags, socketAddress, ref socketAddressLen, isIPv4, isIPv6, out bytesReceived, out receivedFlags, out ipPacketInformation, out errorCode)) { - ReceiveMessageFromOperation operation; - - lock (_receiveQueue.QueueLock) - { - SocketFlags receivedFlags; - SocketError errorCode; - if (_receiveQueue.IsEmpty && - SocketPal.TryCompleteReceiveMessageFrom(_socket, buffer, buffers, offset, count, flags, socketAddress, ref socketAddressLen, isIPv4, isIPv6, out bytesReceived, out receivedFlags, out ipPacketInformation, out errorCode)) - { - flags = receivedFlags; - return errorCode; - } - - @event = new ManualResetEventSlim(false, 0); + flags = receivedFlags; + return errorCode; + } - operation = new ReceiveMessageFromOperation - { - Event = @event, - Buffer = buffer, - Buffers = buffers, - Offset = offset, - Count = count, - Flags = flags, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen, - IsIPv4 = isIPv4, - IsIPv6 = isIPv6, - }; - - bool isStopped; - while (!TryBeginOperation(ref _receiveQueue, operation, Interop.Sys.SocketEvents.Read, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - socketAddressLen = operation.SocketAddressLen; - flags = operation.ReceivedFlags; - ipPacketInformation = operation.IPPacketInformation; - bytesReceived = operation.BytesTransferred; - return SocketError.Interrupted; - } + var operation = new ReceiveMessageFromOperation + { + Buffer = buffer, + Buffers = buffers, + Offset = offset, + Count = count, + Flags = flags, + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen, + IsIPv4 = isIPv4, + IsIPv6 = isIPv6, + }; - if (operation.TryComplete(this)) - { - socketAddressLen = operation.SocketAddressLen; - flags = operation.ReceivedFlags; - ipPacketInformation = operation.IPPacketInformation; - bytesReceived = operation.BytesTransferred; - return operation.ErrorCode; - } - } - } + PerformSyncOperation(ref _receiveQueue, operation, timeout, observedSequenceNumber); - bool signaled = operation.Wait(timeout); - socketAddressLen = operation.SocketAddressLen; - flags = operation.ReceivedFlags; - ipPacketInformation = operation.IPPacketInformation; - bytesReceived = operation.BytesTransferred; - return signaled ? operation.ErrorCode : SocketError.TimedOut; - } - finally - { - @event?.Dispose(); - } + socketAddressLen = operation.SocketAddressLen; + flags = operation.ReceivedFlags; + ipPacketInformation = operation.IPPacketInformation; + bytesReceived = operation.BytesTransferred; + return operation.ErrorCode; } public SocketError ReceiveMessageFromAsync(byte[] buffer, IList> buffers, int offset, int count, SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, bool isIPv4, bool isIPv6, out int bytesReceived, out SocketFlags receivedFlags, out IPPacketInformation ipPacketInformation, Action callback) { SetNonBlocking(); - lock (_receiveQueue.QueueLock) + SocketError errorCode; + int observedSequenceNumber; + if (_receiveQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteReceiveMessageFrom(_socket, buffer, buffers, offset, count, flags, socketAddress, ref socketAddressLen, isIPv4, isIPv6, out bytesReceived, out receivedFlags, out ipPacketInformation, out errorCode)) { - SocketError errorCode; - - if (_receiveQueue.IsEmpty && - SocketPal.TryCompleteReceiveMessageFrom(_socket, buffer, buffers, offset, count, flags, socketAddress, ref socketAddressLen, isIPv4, isIPv6, out bytesReceived, out receivedFlags, out ipPacketInformation, out errorCode)) - { - // Synchronous success or failure - return errorCode; - } - - var operation = new ReceiveMessageFromOperation - { - Callback = callback, - Buffer = buffer, - Buffers = buffers, - Offset = offset, - Count = count, - Flags = flags, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen, - IsIPv4 = isIPv4, - IsIPv6 = isIPv6, - }; - - bool isStopped; - while (!TryBeginOperation(ref _receiveQueue, operation, Interop.Sys.SocketEvents.Read, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - ipPacketInformation = default(IPPacketInformation); - bytesReceived = 0; - receivedFlags = SocketFlags.None; - return SocketError.OperationAborted; - } + return errorCode; + } - if (operation.TryComplete(this)) - { - socketAddressLen = operation.SocketAddressLen; - receivedFlags = operation.ReceivedFlags; - ipPacketInformation = operation.IPPacketInformation; - bytesReceived = operation.BytesTransferred; - return operation.ErrorCode; - } - } + var operation = new ReceiveMessageFromOperation + { + Callback = callback, + Buffer = buffer, + Buffers = buffers, + Offset = offset, + Count = count, + Flags = flags, + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen, + IsIPv4 = isIPv4, + IsIPv6 = isIPv6, + }; - ipPacketInformation = default(IPPacketInformation); - bytesReceived = 0; - receivedFlags = SocketFlags.None; - return SocketError.IOPending; + if (!_receiveQueue.StartAsyncOperation(this, operation, observedSequenceNumber)) + { + socketAddressLen = operation.SocketAddressLen; + receivedFlags = operation.ReceivedFlags; + ipPacketInformation = operation.IPPacketInformation; + bytesReceived = operation.BytesTransferred; + return operation.ErrorCode; } + + ipPacketInformation = default(IPPacketInformation); + bytesReceived = 0; + receivedFlags = SocketFlags.None; + return SocketError.IOPending; } public SocketError Send(ReadOnlySpan buffer, SocketFlags flags, int timeout, out int bytesSent) => @@ -1277,125 +1342,63 @@ public SocketError SendTo(byte[] buffer, int offset, int count, SocketFlags flag { Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}"); - ManualResetEventSlim @event = null; - try + bytesSent = 0; + SocketError errorCode; + int observedSequenceNumber; + if (_sendQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteSendTo(_socket, buffer, ref offset, ref count, flags, socketAddress, socketAddressLen, ref bytesSent, out errorCode)) { - BufferArraySendOperation operation; - - lock (_sendQueue.QueueLock) - { - bytesSent = 0; - SocketError errorCode; - - if (_sendQueue.IsEmpty && - SocketPal.TryCompleteSendTo(_socket, buffer, ref offset, ref count, flags, socketAddress, socketAddressLen, ref bytesSent, out errorCode)) - { - return errorCode; - } - - @event = new ManualResetEventSlim(false, 0); + return errorCode; + } - operation = new BufferArraySendOperation - { - Event = @event, - Buffer = buffer, - Offset = offset, - Count = count, - Flags = flags, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen, - BytesTransferred = bytesSent - }; - - bool isStopped; - while (!TryBeginOperation(ref _sendQueue, operation, Interop.Sys.SocketEvents.Write, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - bytesSent = operation.BytesTransferred; - return SocketError.Interrupted; - } + var operation = new BufferArraySendOperation + { + Buffer = buffer, + Offset = offset, + Count = count, + Flags = flags, + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen, + BytesTransferred = bytesSent + }; - if (operation.TryComplete(this)) - { - bytesSent = operation.BytesTransferred; - return operation.ErrorCode; - } - } - } + PerformSyncOperation(ref _sendQueue, operation, timeout, observedSequenceNumber); - bool signaled = operation.Wait(timeout); - bytesSent = operation.BytesTransferred; - return signaled ? operation.ErrorCode : SocketError.TimedOut; - } - finally - { - @event?.Dispose(); - } + bytesSent = operation.BytesTransferred; + return operation.ErrorCode; } public unsafe SocketError SendTo(ReadOnlySpan buffer, SocketFlags flags, byte[] socketAddress, int socketAddressLen, int timeout, out int bytesSent) { Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}"); + bytesSent = 0; + SocketError errorCode; + int bufferIndexIgnored = 0, offset = 0, count = buffer.Length; + int observedSequenceNumber; + if (_sendQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteSendTo(_socket, buffer, null, ref bufferIndexIgnored, ref offset, ref count, flags, socketAddress, socketAddressLen, ref bytesSent, out errorCode)) + { + return errorCode; + } + fixed (byte* bufferPtr = &buffer.DangerousGetPinnableReference()) { - ManualResetEventSlim @event = null; - try + var operation = new BufferPtrSendOperation { - BufferPtrSendOperation operation; - - lock (_sendQueue.QueueLock) - { - bytesSent = 0; - SocketError errorCode; - - int bufferIndexIgnored = 0, offset = 0, count = buffer.Length; - if (_sendQueue.IsEmpty && - SocketPal.TryCompleteSendTo(_socket, buffer, null, ref bufferIndexIgnored, ref offset, ref count, flags, socketAddress, socketAddressLen, ref bytesSent, out errorCode)) - { - return errorCode; - } - - @event = new ManualResetEventSlim(false, 0); - - operation = new BufferPtrSendOperation - { - Event = @event, - BufferPtr = bufferPtr, - Offset = offset, - Count = count, - Flags = flags, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen, - BytesTransferred = bytesSent - }; - - bool isStopped; - while (!TryBeginOperation(ref _sendQueue, operation, Interop.Sys.SocketEvents.Write, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - bytesSent = operation.BytesTransferred; - return SocketError.Interrupted; - } + BufferPtr = bufferPtr, + Offset = offset, + Count = count, + Flags = flags, + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen, + BytesTransferred = bytesSent + }; - if (operation.TryComplete(this)) - { - bytesSent = operation.BytesTransferred; - return operation.ErrorCode; - } - } - } + PerformSyncOperation(ref _sendQueue, operation, timeout, observedSequenceNumber); - bool signaled = operation.Wait(timeout); - bytesSent = operation.BytesTransferred; - return signaled ? operation.ErrorCode : SocketError.TimedOut; - } - finally - { - @event?.Dispose(); - } + bytesSent = operation.BytesTransferred; + return operation.ErrorCode; } } @@ -1403,47 +1406,34 @@ public SocketError SendToAsync(byte[] buffer, int offset, int count, SocketFlags { SetNonBlocking(); - lock (_sendQueue.QueueLock) + bytesSent = 0; + SocketError errorCode; + int observedSequenceNumber; + if (_sendQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteSendTo(_socket, buffer, ref offset, ref count, flags, socketAddress, socketAddressLen, ref bytesSent, out errorCode)) { - bytesSent = 0; - SocketError errorCode; - - if (_sendQueue.IsEmpty && - SocketPal.TryCompleteSendTo(_socket, buffer, ref offset, ref count, flags, socketAddress, socketAddressLen, ref bytesSent, out errorCode)) - { - // Synchronous success or failure - return errorCode; - } - - var operation = new BufferArraySendOperation - { - Callback = callback, - Buffer = buffer, - Offset = offset, - Count = count, - Flags = flags, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen, - BytesTransferred = bytesSent - }; + return errorCode; + } - bool isStopped; - while (!TryBeginOperation(ref _sendQueue, operation, Interop.Sys.SocketEvents.Write, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - return SocketError.OperationAborted; - } + var operation = new BufferArraySendOperation + { + Callback = callback, + Buffer = buffer, + Offset = offset, + Count = count, + Flags = flags, + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen, + BytesTransferred = bytesSent + }; - if (operation.TryComplete(this)) - { - bytesSent = operation.BytesTransferred; - return operation.ErrorCode; - } - } - - return SocketError.IOPending; + if (!_sendQueue.StartAsyncOperation(this, operation, observedSequenceNumber)) + { + bytesSent = operation.BytesTransferred; + return operation.ErrorCode; } + + return SocketError.IOPending; } public SocketError Send(IList> buffers, SocketFlags flags, int timeout, out int bytesSent) @@ -1461,214 +1451,126 @@ public SocketError SendTo(IList> buffers, SocketFlags flags, { Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}"); - ManualResetEventSlim @event = null; - try + bytesSent = 0; + int bufferIndex = 0; + int offset = 0; + SocketError errorCode; + int observedSequenceNumber; + if (_sendQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteSendTo(_socket, buffers, ref bufferIndex, ref offset, flags, socketAddress, socketAddressLen, ref bytesSent, out errorCode)) { - BufferListSendOperation operation; - - lock (_sendQueue.QueueLock) - { - bytesSent = 0; - int bufferIndex = 0; - int offset = 0; - SocketError errorCode; - - if (_sendQueue.IsEmpty && - SocketPal.TryCompleteSendTo(_socket, buffers, ref bufferIndex, ref offset, flags, socketAddress, socketAddressLen, ref bytesSent, out errorCode)) - { - return errorCode; - } - - @event = new ManualResetEventSlim(false, 0); + return errorCode; + } - operation = new BufferListSendOperation - { - Event = @event, - Buffers = buffers, - BufferIndex = bufferIndex, - Offset = offset, - Flags = flags, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen, - BytesTransferred = bytesSent - }; - - bool isStopped; - while (!TryBeginOperation(ref _sendQueue, operation, Interop.Sys.SocketEvents.Write, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - bytesSent = operation.BytesTransferred; - return SocketError.Interrupted; - } + var operation = new BufferListSendOperation + { + Buffers = buffers, + BufferIndex = bufferIndex, + Offset = offset, + Flags = flags, + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen, + BytesTransferred = bytesSent + }; - if (operation.TryComplete(this)) - { - bytesSent = operation.BytesTransferred; - return operation.ErrorCode; - } - } - } + PerformSyncOperation(ref _sendQueue, operation, timeout, observedSequenceNumber); - bool signaled = operation.Wait(timeout); - bytesSent = operation.BytesTransferred; - return signaled ? operation.ErrorCode : SocketError.TimedOut; - } - finally - { - @event?.Dispose(); - } + bytesSent = operation.BytesTransferred; + return operation.ErrorCode; } public SocketError SendToAsync(IList> buffers, SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesSent, Action callback) { SetNonBlocking(); - lock (_sendQueue.QueueLock) + bytesSent = 0; + int bufferIndex = 0; + int offset = 0; + SocketError errorCode; + int observedSequenceNumber; + if (_sendQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteSendTo(_socket, buffers, ref bufferIndex, ref offset, flags, socketAddress, socketAddressLen, ref bytesSent, out errorCode)) { - bytesSent = 0; - int bufferIndex = 0; - int offset = 0; - SocketError errorCode; - - if (_sendQueue.IsEmpty && - SocketPal.TryCompleteSendTo(_socket, buffers, ref bufferIndex, ref offset, flags, socketAddress, socketAddressLen, ref bytesSent, out errorCode)) - { - // Synchronous success or failure - return errorCode; - } + return errorCode; + } var operation = new BufferListSendOperation - { - Callback = callback, - Buffers = buffers, - BufferIndex = bufferIndex, - Offset = offset, - Flags = flags, - SocketAddress = socketAddress, - SocketAddressLen = socketAddressLen, - BytesTransferred = bytesSent - }; - - bool isStopped; - while (!TryBeginOperation(ref _sendQueue, operation, Interop.Sys.SocketEvents.Write, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - return SocketError.OperationAborted; - } - - if (operation.TryComplete(this)) - { - bytesSent = operation.BytesTransferred; - return operation.ErrorCode; - } - } + { + Callback = callback, + Buffers = buffers, + BufferIndex = bufferIndex, + Offset = offset, + Flags = flags, + SocketAddress = socketAddress, + SocketAddressLen = socketAddressLen, + BytesTransferred = bytesSent + }; - return SocketError.IOPending; + if (!_sendQueue.StartAsyncOperation(this, operation, observedSequenceNumber)) + { + bytesSent = operation.BytesTransferred; + return operation.ErrorCode; } + + return SocketError.IOPending; } public SocketError SendFile(SafeFileHandle fileHandle, long offset, long count, int timeout, out long bytesSent) { Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}"); - ManualResetEventSlim @event = null; - try + bytesSent = 0; + SocketError errorCode; + int observedSequenceNumber; + if (_sendQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteSendFile(_socket, fileHandle, ref offset, ref count, ref bytesSent, out errorCode)) { - SendFileOperation operation; - - lock (_sendQueue.QueueLock) - { - bytesSent = 0; - SocketError errorCode; - - if (_sendQueue.IsEmpty && - SocketPal.TryCompleteSendFile(_socket, fileHandle, ref offset, ref count, ref bytesSent, out errorCode)) - { - return errorCode; - } - - @event = new ManualResetEventSlim(false, 0); + return errorCode; + } - operation = new SendFileOperation - { - Event = @event, - FileHandle = fileHandle, - Offset = offset, - Count = count, - BytesTransferred = bytesSent - }; - - bool isStopped; - while (!TryBeginOperation(ref _sendQueue, operation, Interop.Sys.SocketEvents.Write, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - bytesSent = operation.BytesTransferred; - return SocketError.Interrupted; - } + var operation = new SendFileOperation + { + FileHandle = fileHandle, + Offset = offset, + Count = count, + BytesTransferred = bytesSent + }; - if (operation.TryComplete(this)) - { - bytesSent = operation.BytesTransferred; - return operation.ErrorCode; - } - } - } + PerformSyncOperation(ref _sendQueue, operation, timeout, observedSequenceNumber); - bool signaled = operation.Wait(timeout); - bytesSent = operation.BytesTransferred; - return signaled ? operation.ErrorCode : SocketError.TimedOut; - } - finally - { - @event?.Dispose(); - } + bytesSent = operation.BytesTransferred; + return operation.ErrorCode; } public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long count, out long bytesSent, Action callback) { SetNonBlocking(); - lock (_sendQueue.QueueLock) + bytesSent = 0; + SocketError errorCode; + int observedSequenceNumber; + if (_sendQueue.IsReady(this, out observedSequenceNumber) && + SocketPal.TryCompleteSendFile(_socket, fileHandle, ref offset, ref count, ref bytesSent, out errorCode)) { - bytesSent = 0; - SocketError errorCode; - - if (_sendQueue.IsEmpty && - SocketPal.TryCompleteSendFile(_socket, fileHandle, ref offset, ref count, ref bytesSent, out errorCode)) - { - // Synchronous success or failure - return errorCode; - } - - var operation = new SendFileOperation - { - Callback = callback, - FileHandle = fileHandle, - Offset = offset, - Count = count, - BytesTransferred = bytesSent - }; - - bool isStopped; - while (!TryBeginOperation(ref _sendQueue, operation, Interop.Sys.SocketEvents.Write, maintainOrder: true, isStopped: out isStopped)) - { - if (isStopped) - { - return SocketError.OperationAborted; - } + return errorCode; + } - if (operation.TryComplete(this)) - { - bytesSent = operation.BytesTransferred; - return operation.ErrorCode; - } - } + var operation = new SendFileOperation + { + Callback = callback, + FileHandle = fileHandle, + Offset = offset, + Count = count, + BytesTransferred = bytesSent + }; - return SocketError.IOPending; + if (!_sendQueue.StartAsyncOperation(this, operation, observedSequenceNumber)) + { + bytesSent = operation.BytesTransferred; + return operation.ErrorCode; } + + return SocketError.IOPending; } public unsafe void HandleEvents(Interop.Sys.SocketEvents events) @@ -1682,13 +1584,42 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events) if ((events & Interop.Sys.SocketEvents.Read) != 0) { - _receiveQueue.Complete(this); + _receiveQueue.HandleEvent(this); } if ((events & Interop.Sys.SocketEvents.Write) != 0) { - _sendQueue.Complete(this); + _sendQueue.HandleEvent(this); } } + + // + // Tracing stuff + // + + // To enabled tracing: + // (1) Add reference to System.Console in the csproj + // (2) #define SOCKETASYNCCONTEXT_TRACE + +#if SOCKETASYNCCONTEXT_TRACE + public const bool TraceEnabled = true; +#else + public const bool TraceEnabled = false; +#endif + + public void Trace(string message, [CallerMemberName] string memberName = null) + { + OutputTrace($"{IdOf(this)}.{memberName}: {message}"); + } + + public static void OutputTrace(string s) + { + // CONSIDER: Change to NetEventSource +#if SOCKETASYNCCONTEXT_TRACE + Console.WriteLine(s); +#endif + } + + public static string IdOf(object o) => o == null ? "(null)" : $"{o.GetType().Name}#{o.GetHashCode():X2}"; } } diff --git a/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index f9aec91675c7..d8a107b88010 100644 --- a/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -39,10 +39,10 @@ public void Free() } } - public bool TryRegister(SafeCloseSocket socket, Interop.Sys.SocketEvents current, Interop.Sys.SocketEvents events, out Interop.Error error) + public bool TryRegister(SafeCloseSocket socket, out Interop.Error error) { Debug.Assert(WasAllocated, "Expected WasAllocated to be true"); - return _engine.TryRegister(socket, current, events, _handle, out error); + return _engine.TryRegister(socket, _handle, out error); } } @@ -375,15 +375,10 @@ private void FreeNativeResources() } } - private bool TryRegister(SafeCloseSocket socket, Interop.Sys.SocketEvents current, Interop.Sys.SocketEvents events, IntPtr handle, out Interop.Error error) + private bool TryRegister(SafeCloseSocket socket, IntPtr handle, out Interop.Error error) { - if (current == events) - { - error = Interop.Error.SUCCESS; - return true; - } - - error = Interop.Sys.TryChangeSocketEventRegistration(_port, socket, current, events, handle); + error = Interop.Sys.TryChangeSocketEventRegistration(_port, socket, Interop.Sys.SocketEvents.None, + Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, handle); return error == Interop.Error.SUCCESS; } } diff --git a/src/System.Net.Sockets/tests/FunctionalTests/DisconnectTest.cs b/src/System.Net.Sockets/tests/FunctionalTests/DisconnectTest.cs index f886b66f01ac..a22d8b8051be 100644 --- a/src/System.Net.Sockets/tests/FunctionalTests/DisconnectTest.cs +++ b/src/System.Net.Sockets/tests/FunctionalTests/DisconnectTest.cs @@ -69,6 +69,8 @@ public void Disconnect_Success(bool reuseSocket) client.Disconnect(reuseSocket); + Assert.False(client.Connected); + args.RemoteEndPoint = server2.EndPoint; if (client.ConnectAsync(args)) @@ -114,6 +116,7 @@ public void DisconnectAsync_Success(bool reuseSocket) } Assert.Equal(SocketError.Success, args.SocketError); + Assert.False(client.Connected); args.RemoteEndPoint = server2.EndPoint; @@ -155,6 +158,9 @@ public void BeginDisconnect_Success(bool reuseSocket) IAsyncResult ar = client.BeginDisconnect(reuseSocket, null, null); client.EndDisconnect(ar); + + Assert.False(client.Connected); + Assert.Throws(() => client.EndDisconnect(ar)); args.RemoteEndPoint = server2.EndPoint; diff --git a/src/System.Net.Sockets/tests/FunctionalTests/DualModeSocketTest.cs b/src/System.Net.Sockets/tests/FunctionalTests/DualModeSocketTest.cs index 0cdb3fe03fed..086daba8e097 100644 --- a/src/System.Net.Sockets/tests/FunctionalTests/DualModeSocketTest.cs +++ b/src/System.Net.Sockets/tests/FunctionalTests/DualModeSocketTest.cs @@ -588,7 +588,9 @@ private void DualModeConnectAsync_IPEndPointToHost_Helper(IPAddress connectTo, I args.RemoteEndPoint = new IPEndPoint(connectTo, port); args.UserToken = waitHandle; - socket.ConnectAsync(args); + bool pending = socket.ConnectAsync(args); + if (!pending) + waitHandle.Set(); Assert.True(waitHandle.WaitOne(TestSettings.PassingTestTimeout), "Timed out while waiting for connection"); if (args.SocketError != SocketError.Success) @@ -627,7 +629,9 @@ public void DualModeConnectAsync_DnsEndPointToHost_Helper(IPAddress listenOn, bo args.RemoteEndPoint = new DnsEndPoint("localhost", port); args.UserToken = waitHandle; - socket.ConnectAsync(args); + bool pending = socket.ConnectAsync(args); + if (!pending) + waitHandle.Set(); Assert.True(waitHandle.WaitOne(TestSettings.PassingTestTimeout), "Timed out while waiting for connection"); if (args.SocketError != SocketError.Success) @@ -651,7 +655,9 @@ public void DualModeConnectAsync_Static_DnsEndPointToHost_Helper(IPAddress liste args.RemoteEndPoint = new DnsEndPoint("localhost", port); args.UserToken = waitHandle; - Socket.ConnectAsync(SocketType.Stream, ProtocolType.Tcp, args); + bool pending = Socket.ConnectAsync(SocketType.Stream, ProtocolType.Tcp, args); + if (!pending) + waitHandle.Set(); Assert.True(waitHandle.WaitOne(TestSettings.PassingTestTimeout), "Timed out while waiting for connection"); if (args.SocketError != SocketError.Success) diff --git a/src/System.Net.Sockets/tests/FunctionalTests/SocketAsyncEventArgsTest.cs b/src/System.Net.Sockets/tests/FunctionalTests/SocketAsyncEventArgsTest.cs index b2d77f1424e3..78b8b696faf2 100644 --- a/src/System.Net.Sockets/tests/FunctionalTests/SocketAsyncEventArgsTest.cs +++ b/src/System.Net.Sockets/tests/FunctionalTests/SocketAsyncEventArgsTest.cs @@ -327,7 +327,8 @@ public void CancelConnectAsync_InstanceConnect_CancelsInProgressConnect() connectSaea.Completed += (s, e) => tcs.SetResult(e.SocketError); connectSaea.RemoteEndPoint = new IPEndPoint(IPAddress.Loopback, ((IPEndPoint)listen.LocalEndPoint).Port); - Assert.True(client.ConnectAsync(connectSaea), $"ConnectAsync completed synchronously with SocketError == {connectSaea.SocketError}"); + bool pending = client.ConnectAsync(connectSaea); + if (!pending) tcs.SetResult(connectSaea.SocketError); if (tcs.Task.IsCompleted) { Assert.NotEqual(SocketError.Success, tcs.Task.Result); From 86afe942a9e98061bb52257e6c4e7ae9baa5600c Mon Sep 17 00:00:00 2001 From: Geoff Kizer Date: Tue, 22 Aug 2017 00:12:32 -0700 Subject: [PATCH 2/3] address PR feedback --- .../Net/Sockets/SocketAsyncContext.Unix.cs | 88 ++++++++++--------- 1 file changed, 45 insertions(+), 43 deletions(-) diff --git a/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index 34723bb7651c..af9a8824c326 100644 --- a/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -8,9 +8,6 @@ using System.Runtime.CompilerServices; using System.Threading; -// Disable unreachable code warning for trace code -#pragma warning disable CS0162 - namespace System.Net.Sockets { // Note on asynchronous behavior here: @@ -70,11 +67,11 @@ public AsyncOperation() public bool TryComplete(SocketAsyncContext context) { - if (TraceEnabled) TraceWithContext(context, "Enter"); + TraceWithContext(context, "Enter"); bool result = DoTryComplete(context); - if (TraceEnabled) TraceWithContext(context, $"Exit, result={result}"); + TraceWithContext(context, $"Exit, result={result}"); return result; } @@ -85,7 +82,7 @@ public bool TrySetRunning() if (oldState == State.Cancelled) { // This operation has already been cancelled, and had its completion processed. - // Simply return true to indicate no further processing is needed. + // Simply return false to indicate no further processing is needed. return false; } @@ -131,7 +128,7 @@ public void DoCallback() public bool TryCancel() { - if (TraceEnabled) Trace("Enter"); + Trace("Enter"); // Try to transition from Waiting to Cancelled var spinWait = new SpinWait(); @@ -143,13 +140,13 @@ public bool TryCancel() { case State.Running: // A completion attempt is in progress. Keep busy-waiting. - if (TraceEnabled) Trace("Busy wait"); + Trace("Busy wait"); spinWait.SpinOnce(); break; case State.Complete: // A completion attempt succeeded. Consider this operation as having completed within the timeout. - if (TraceEnabled) Trace("Exit, previously completed"); + Trace("Exit, previously completed"); return false; case State.Waiting: @@ -162,12 +159,12 @@ public bool TryCancel() // Someone else cancelled the operation. // Just return true to indicate the operation was cancelled. // The previous canceller will have fired the completion, etc. - if (TraceEnabled) Trace("Exit, previously cancelled"); + Trace("Exit, previously cancelled"); return true; } } - if (TraceEnabled) Trace("Cancelled, processing completion"); + Trace("Cancelled, processing completion"); // The operation successfully cancelled. // It's our responsibility to set the error code and queue the completion. @@ -187,7 +184,7 @@ public bool TryCancel() ThreadPool.QueueUserWorkItem(o => ((AsyncOperation)o).InvokeCallback(), this); } - if (TraceEnabled) Trace("Exit"); + Trace("Exit"); // Note, we leave the operation in the OperationQueue. // When we get around to processing it, we'll see it's cancelled and skip it. @@ -207,11 +204,13 @@ public void DoAbort() protected abstract void InvokeCallback(); + [Conditional("SOCKETASYNCCONTEXT_TRACE")] public void Trace(string message, [CallerMemberName] string memberName = null) { OutputTrace($"{IdOf(this)}.{memberName}: {message}"); } + [Conditional("SOCKETASYNCCONTEXT_TRACE")] public void TraceWithContext(SocketAsyncContext context, string message, [CallerMemberName] string memberName = null) { OutputTrace($"{IdOf(context)}, {IdOf(this)}.{memberName}: {message}"); @@ -423,7 +422,7 @@ protected override bool DoTryComplete(SocketAsyncContext context) => // (2) Deadlock, by setting a reasonably large timeout private struct LockToken : IDisposable { - private object _lockObject; + private readonly object _lockObject; public LockToken(object lockObject) { @@ -435,7 +434,7 @@ public LockToken(object lockObject) #if DEBUG bool success = Monitor.TryEnter(_lockObject, 10000); - Debug.Assert(success); + Debug.Assert(success, "Timed out waiting for queue lock"); #else Monitor.Enter(_lockObject); #endif @@ -466,7 +465,7 @@ private struct OperationQueue // If we successfully process all enqueued operations, then the state becomes Ready; // otherwise, the state becomes Waiting and we wait for another epoll notification. - private enum QueueState + private enum QueueState : byte { Ready = 0, // Indicates that data MAY be available on the socket. // Queue must be empty. @@ -519,7 +518,7 @@ public bool IsReady(SocketAsyncContext context, out int observedSequenceNumber) observedSequenceNumber = _sequenceNumber; bool isReady = (_state == QueueState.Ready); - if (TraceEnabled) Trace(context, $"{isReady}"); + Trace(context, $"{isReady}"); return isReady; } @@ -528,7 +527,7 @@ public bool IsReady(SocketAsyncContext context, out int observedSequenceNumber) // Return true for pending, false for completed synchronously (including failure and abort) public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation, int observedSequenceNumber) { - if (TraceEnabled) Trace(context, $"Enter"); + Trace(context, $"Enter"); if (!context._registered) { @@ -569,7 +568,7 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation _tail = operation; - if (TraceEnabled) Trace(context, $"Leave, enqueued {IdOf(operation)}"); + Trace(context, $"Leave, enqueued {IdOf(operation)}"); return true; case QueueState.Stopped: @@ -586,14 +585,14 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation if (doAbort) { operation.DoAbort(); - if (TraceEnabled) Trace(context, $"Leave, queue stopped"); + Trace(context, $"Leave, queue stopped"); return false; } // Retry the operation. if (operation.TryComplete(context)) { - if (TraceEnabled) Trace(context, $"Leave, retry succeeded"); + Trace(context, $"Leave, retry succeeded"); return false; } } @@ -604,14 +603,14 @@ public void HandleEvent(SocketAsyncContext context) { using (Lock()) { - if (TraceEnabled) Trace(context, $"Enter"); + Trace(context, $"Enter"); switch (_state) { case QueueState.Ready: Debug.Assert(_tail == null, "State == Ready but queue is not empty!"); _sequenceNumber++; - if (TraceEnabled) Trace(context, $"Exit (previously ready)"); + Trace(context, $"Exit (previously ready)"); return; case QueueState.Waiting: @@ -623,12 +622,12 @@ public void HandleEvent(SocketAsyncContext context) case QueueState.Processing: Debug.Assert(_tail != null, "State == Processing but queue is empty!"); _sequenceNumber++; - if (TraceEnabled) Trace(context, $"Exit (currently processing)"); + Trace(context, $"Exit (currently processing)"); return; case QueueState.Stopped: Debug.Assert(_tail == null); - if (TraceEnabled) Trace(context, $"Exit (stopped)"); + Trace(context, $"Exit (stopped)"); return; default: @@ -649,12 +648,12 @@ public void ProcessQueue(SocketAsyncContext context) AsyncOperation op; using (Lock()) { - if (TraceEnabled) Trace(context, $"Enter"); + Trace(context, $"Enter"); if (_state == QueueState.Stopped) { Debug.Assert(_tail == null); - if (TraceEnabled) Trace(context, $"Exit (stopped)"); + Trace(context, $"Exit (stopped)"); return; } else @@ -671,8 +670,12 @@ public void ProcessQueue(SocketAsyncContext context) while (true) { bool wasCompleted = false; - bool wasCancelled = !op.TrySetRunning(); - if (!wasCancelled) + + // Try to change the op state to Running. + // If this fails, it means the operation was previously cancelled, + // and we should just remove it from the queue without further processing. + bool isRunning = op.TrySetRunning(); + if (isRunning) { // Try to perform the IO wasCompleted = op.TryComplete(context); @@ -687,7 +690,7 @@ public void ProcessQueue(SocketAsyncContext context) } nextOp = null; - if (wasCompleted || wasCancelled) + if (wasCompleted || !isRunning) { // Remove the op from the queue and see if there's more to process. @@ -696,11 +699,12 @@ public void ProcessQueue(SocketAsyncContext context) if (_state == QueueState.Stopped) { Debug.Assert(_tail == null); - if (TraceEnabled) Trace(context, $"Exit (stopped)"); + Trace(context, $"Exit (stopped)"); } else { Debug.Assert(_state == QueueState.Processing, $"_state={_state} while processing queue!"); + Debug.Assert(_tail.Next == op, "Queue modified while processing queue"); if (op == _tail) { @@ -708,7 +712,7 @@ public void ProcessQueue(SocketAsyncContext context) _tail = null; _state = QueueState.Ready; _sequenceNumber++; - if (TraceEnabled) Trace(context, $"Exit (finished queue)"); + Trace(context, $"Exit (finished queue)"); } else { @@ -727,7 +731,7 @@ public void ProcessQueue(SocketAsyncContext context) if (_state == QueueState.Stopped) { Debug.Assert(_tail == null); - if (TraceEnabled) Trace(context, $"Exit (stopped)"); + Trace(context, $"Exit (stopped)"); } else { @@ -744,7 +748,7 @@ public void ProcessQueue(SocketAsyncContext context) else { _state = QueueState.Waiting; - if (TraceEnabled) Trace(context, $"Exit (received EAGAIN)"); + Trace(context, $"Exit (received EAGAIN)"); } } } @@ -784,7 +788,7 @@ public void StopAndAbort(SocketAsyncContext context) using (Lock()) { - if (TraceEnabled) Trace(context, $"Enter"); + Trace(context, $"Enter"); Debug.Assert(_state != QueueState.Stopped); @@ -802,10 +806,11 @@ public void StopAndAbort(SocketAsyncContext context) _tail = null; - if (TraceEnabled) Trace(context, $"Exit"); + Trace(context, $"Exit"); } } + [Conditional("SOCKETASYNCCONTEXT_TRACE")] public void Trace(SocketAsyncContext context, string message, [CallerMemberName] string memberName = null) { string queueType = @@ -861,7 +866,7 @@ private void Register() _asyncEngineToken = token; _registered = true; - if (TraceEnabled) Trace("Registered"); + Trace("Registered"); } } } @@ -999,7 +1004,8 @@ public SocketError Connect(byte[] socketAddress, int socketAddressLen, int timeo Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}"); // Connect is different than the usual "readiness" pattern of other operations. - // We need to initiate the connect before we try to complete it. + // We need to call TryStartConnect to initiate the connect with the OS, + // before we try to complete it via epoll notification. // Thus, always call TryStartConnect regardless of readiness. SocketError errorCode; int observedSequenceNumber; @@ -1601,17 +1607,13 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events) // (1) Add reference to System.Console in the csproj // (2) #define SOCKETASYNCCONTEXT_TRACE -#if SOCKETASYNCCONTEXT_TRACE - public const bool TraceEnabled = true; -#else - public const bool TraceEnabled = false; -#endif - + [Conditional("SOCKETASYNCCONTEXT_TRACE")] public void Trace(string message, [CallerMemberName] string memberName = null) { OutputTrace($"{IdOf(this)}.{memberName}: {message}"); } + [Conditional("SOCKETASYNCCONTEXT_TRACE")] public static void OutputTrace(string s) { // CONSIDER: Change to NetEventSource From a2e07810267e88ed62dccb90b4c195436b05c5a9 Mon Sep 17 00:00:00 2001 From: Geoff Kizer Date: Wed, 15 Nov 2017 12:03:20 -0800 Subject: [PATCH 3/3] disable assert that's not true on OSX --- .../src/System/Net/Sockets/SocketAsyncContext.Unix.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index af9a8824c326..d48f82a482d2 100644 --- a/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -841,7 +841,10 @@ public SocketAsyncContext(SafeCloseSocket socket) private void Register() { - Debug.Assert(_nonBlockingSet); + // Note, on OSX, this is not always true because in certain cases, + // the socket can already be in non-blocking mode even though we didn't set that ourselves. + // TODO: Track down exactly why this is +// Debug.Assert(_nonBlockingSet); lock (_registerLock) { if (!_registered)