From bc829b72831c9de3bc24d43a870bba1cb1226061 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Sun, 19 Apr 2020 02:17:49 -0700 Subject: [PATCH 01/13] Parallelize epoll events on thread pool and process events in the same thread --- .../Net/Sockets/SocketAsyncContext.Unix.cs | 12 ++-- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 56 ++++++++++++++++--- 2 files changed, 57 insertions(+), 11 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index beb4cc088839d8..5a45ca86200d26 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -264,7 +264,7 @@ public bool TryCancel() return true; } - public void Dispatch() + public void Dispatch(bool inlineAsync) { ManualResetEventSlim? e = Event; if (e != null) @@ -272,6 +272,10 @@ public void Dispatch() // Sync operation. Signal waiting thread to continue processing. e.Set(); } + else if (inlineAsync) + { + ((IThreadPoolWorkItem)this).Execute(); + } else { // Async operation. Process the IO on the threadpool. @@ -866,7 +870,7 @@ public void HandleEvent(SocketAsyncContext context) } // Dispatch the op so we can try to process it. - op.Dispatch(); + op.Dispatch(inlineAsync: true); } internal void ProcessAsyncOperation(TOperation op) @@ -1003,7 +1007,7 @@ public OperationResult ProcessQueuedOperation(TOperation op) } } - nextOp?.Dispatch(); + nextOp?.Dispatch(inlineAsync: false); return (wasCompleted ? OperationResult.Completed : OperationResult.Cancelled); } @@ -1082,7 +1086,7 @@ public void CancelAndContinueProcessing(TOperation op) } } - nextOp?.Dispatch(); + nextOp?.Dispatch(inlineAsync: false); } // Called when the socket is closed. diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 9d5f48b7045e37..243c89945a3394 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -11,7 +11,7 @@ namespace System.Net.Sockets { - internal sealed unsafe class SocketAsyncEngine + internal sealed unsafe class SocketAsyncEngine : IThreadPoolWorkItem { // // Encapsulates a particular SocketAsyncContext object's access to a SocketAsyncEngine. @@ -130,6 +130,9 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error) // private readonly ConcurrentDictionary _handleToContextMap = new ConcurrentDictionary(); + private readonly ConcurrentQueue _eventQueue = new ConcurrentQueue(); + private int _eventQueueProcessingRequested; + // // True if we've reached the handle value limit for this event port, and thus must allocate a new event port // on the next handle allocation. @@ -308,6 +311,7 @@ private void EventLoop() try { bool shutdown = false; + ConcurrentQueue eventQueue = _eventQueue; while (!shutdown) { int numEvents = EventBufferCount; @@ -320,6 +324,7 @@ private void EventLoop() // The native shim is responsible for ensuring this condition. Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}"); + bool scheduleProcessing = false; for (int i = 0; i < numEvents; i++) { IntPtr handle = _buffer[i].Data; @@ -330,14 +335,15 @@ private void EventLoop() else { Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}"); - _handleToContextMap.TryGetValue(handle, out SocketAsyncContext? context); - if (context != null) - { - context.HandleEvents(_buffer[i].Events); - context = null; - } + eventQueue.Enqueue(new Event(handle, _buffer[i].Events)); + scheduleProcessing = true; } } + + if (scheduleProcessing && Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0) + { + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + } } FreeNativeResources(); @@ -348,6 +354,30 @@ private void EventLoop() } } + void IThreadPoolWorkItem.Execute() + { + Volatile.Write(ref _eventQueueProcessingRequested, 0); + + ConcurrentDictionary handleToContextMap = _handleToContextMap; + ConcurrentQueue eventQueue = _eventQueue; + while (eventQueue.TryDequeue(out Event ev)) + { + handleToContextMap.TryGetValue(ev.Handle, out SocketAsyncContext? context); + if (context == null) + { + continue; + } + + if (_eventQueueProcessingRequested == 0 && + Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0) + { + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + } + + context.HandleEvents(ev.Events); + } + } + private void RequestEventLoopShutdown() { // @@ -387,5 +417,17 @@ private bool TryRegister(SafeSocketHandle socket, IntPtr handle, out Interop.Err Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, handle); return error == Interop.Error.SUCCESS; } + + private struct Event + { + public IntPtr Handle { get; private set; } + public Interop.Sys.SocketEvents Events { get; private set; } + + public Event(IntPtr handle, Interop.Sys.SocketEvents events) + { + Handle = handle; + Events = events; + } + } } } From 27992a16c7b7007945d1e1325af1bcbd1d2c20c0 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Tue, 21 Apr 2020 11:12:53 -0700 Subject: [PATCH 02/13] Use interlocked write instead of volatile write --- .../src/System/Net/Sockets/SocketAsyncEngine.Unix.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 243c89945a3394..04dc86812bd578 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -356,7 +356,7 @@ private void EventLoop() void IThreadPoolWorkItem.Execute() { - Volatile.Write(ref _eventQueueProcessingRequested, 0); + Interlocked.Exchange(ref _eventQueueProcessingRequested, 0); ConcurrentDictionary handleToContextMap = _handleToContextMap; ConcurrentQueue eventQueue = _eventQueue; From 46dc10d699f77c9e4e36ab123d84cb743cbe822f Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Wed, 22 Apr 2020 12:49:56 -0700 Subject: [PATCH 03/13] Upon epoll notification for reads and writes to a socket, queue read work and process write work in same thread --- .../Net/Sockets/SocketAsyncContext.Unix.cs | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index 5a45ca86200d26..87f1c691c8930c 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -829,10 +829,9 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation } } - // Called on the epoll thread whenever we receive an epoll notification. - public void HandleEvent(SocketAsyncContext context) + /// An operation that should be processed, or null if no further processing is required + public AsyncOperation? HandleEvent(SocketAsyncContext context) { - AsyncOperation op; using (Lock()) { Trace(context, $"Enter"); @@ -843,34 +842,29 @@ public void HandleEvent(SocketAsyncContext context) Debug.Assert(_tail == null, "State == Ready but queue is not empty!"); _sequenceNumber++; Trace(context, $"Exit (previously ready)"); - return; + return null; case QueueState.Waiting: Debug.Assert(_tail != null, "State == Waiting but queue is empty!"); _state = QueueState.Processing; - op = _tail.Next; - // Break out and release lock - break; + return _tail.Next; case QueueState.Processing: Debug.Assert(_tail != null, "State == Processing but queue is empty!"); _sequenceNumber++; Trace(context, $"Exit (currently processing)"); - return; + return null; case QueueState.Stopped: Debug.Assert(_tail == null); Trace(context, $"Exit (stopped)"); - return; + return null; default: Environment.FailFast("unexpected queue state"); - return; + return null; } } - - // Dispatch the op so we can try to process it. - op.Dispatch(inlineAsync: true); } internal void ProcessAsyncOperation(TOperation op) @@ -1959,14 +1953,18 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events) events |= Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write; } - if ((events & Interop.Sys.SocketEvents.Read) != 0) + AsyncOperation? receiveOperation = + (events & Interop.Sys.SocketEvents.Read) != 0 ? _receiveQueue.HandleEvent(this) : null; + AsyncOperation? sendOperation = + (events & Interop.Sys.SocketEvents.Write) != 0 ? _sendQueue.HandleEvent(this) : null; + if (sendOperation == null) { - _receiveQueue.HandleEvent(this); + receiveOperation?.Dispatch(inlineAsync: true); } - - if ((events & Interop.Sys.SocketEvents.Write) != 0) + else { - _sendQueue.HandleEvent(this); + receiveOperation?.Dispatch(inlineAsync: false); + sendOperation.Dispatch(inlineAsync: true); } } From df0caf0197c6d948878f8cc02bb943805c8861c1 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Thu, 23 Apr 2020 01:05:56 -0700 Subject: [PATCH 04/13] Clean up change --- .../Net/Sockets/SocketAsyncContext.Unix.cs | 22 +++++---- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 46 +++++++++++++++---- 2 files changed, 51 insertions(+), 17 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index 87f1c691c8930c..008d2a1ae594e9 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -264,7 +264,7 @@ public bool TryCancel() return true; } - public void Dispatch(bool inlineAsync) + public void Dispatch(bool processAsyncOperationSynchronously) { ManualResetEventSlim? e = Event; if (e != null) @@ -272,13 +272,14 @@ public void Dispatch(bool inlineAsync) // Sync operation. Signal waiting thread to continue processing. e.Set(); } - else if (inlineAsync) + else if (processAsyncOperationSynchronously) { + // Async operation. Process the IO and callback on the current thread synchronously as requested. ((IThreadPoolWorkItem)this).Execute(); } else { - // Async operation. Process the IO on the threadpool. + // Async operation. Process the IO and callback on the threadpool. ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); } } @@ -1001,7 +1002,7 @@ public OperationResult ProcessQueuedOperation(TOperation op) } } - nextOp?.Dispatch(inlineAsync: false); + nextOp?.Dispatch(processAsyncOperationSynchronously: false); return (wasCompleted ? OperationResult.Completed : OperationResult.Cancelled); } @@ -1080,7 +1081,7 @@ public void CancelAndContinueProcessing(TOperation op) } } - nextOp?.Dispatch(inlineAsync: false); + nextOp?.Dispatch(processAsyncOperationSynchronously: false); } // Called when the socket is closed. @@ -1957,14 +1958,19 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events) (events & Interop.Sys.SocketEvents.Read) != 0 ? _receiveQueue.HandleEvent(this) : null; AsyncOperation? sendOperation = (events & Interop.Sys.SocketEvents.Write) != 0 ? _sendQueue.HandleEvent(this) : null; + + // This method is called from a thread pool thread. When we have only one operation to process, process it + // synchronously to avoid an extra thread pool work item. When we have two operations to process, processing both + // synchronously may delay the second operation, so schedule one onto the thread pool and process the other + // synchronously. There might be better ways of doing this. if (sendOperation == null) { - receiveOperation?.Dispatch(inlineAsync: true); + receiveOperation?.Dispatch(processAsyncOperationSynchronously: true); } else { - receiveOperation?.Dispatch(inlineAsync: false); - sendOperation.Dispatch(inlineAsync: true); + receiveOperation?.Dispatch(processAsyncOperationSynchronously: false); + sendOperation.Dispatch(processAsyncOperationSynchronously: true); } } diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 04dc86812bd578..5258c4f67260c1 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Concurrent; using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -130,7 +131,16 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error) // private readonly ConcurrentDictionary _handleToContextMap = new ConcurrentDictionary(); + // + // Queue of events generated by EventLoop() that would be processed by the thread pool + // private readonly ConcurrentQueue _eventQueue = new ConcurrentQueue(); + + // + // This field is set to 1 to indicate that a thread pool work item is scheduled to process events in _eventQueue. It is + // set to 0 when the scheduled work item starts running, to indicate that a thread pool work item to process events is + // not scheduled. Changes are protected by atomic operations as appropriate. + // private int _eventQueueProcessingRequested; // @@ -324,7 +334,7 @@ private void EventLoop() // The native shim is responsible for ensuring this condition. Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}"); - bool scheduleProcessing = false; + bool enqueuedEvent = false; for (int i = 0; i < numEvents; i++) { IntPtr handle = _buffer[i].Data; @@ -336,13 +346,13 @@ private void EventLoop() { Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}"); eventQueue.Enqueue(new Event(handle, _buffer[i].Events)); - scheduleProcessing = true; + enqueuedEvent = true; } } - if (scheduleProcessing && Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0) + if (enqueuedEvent) { - ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + ScheduleToProcessEvents(); } } @@ -354,8 +364,24 @@ private void EventLoop() } } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void ScheduleToProcessEvents() + { + // Schedule a thread pool work item to process events. Only one work item is scheduled at any given time to avoid + // over-parallelization. When the work item begins running, this field is reset to 0, allowing for another work item + // to be scheduled for parallelizing processing of events. + if (Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0) + { + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + } + } + void IThreadPoolWorkItem.Execute() { + // Indicate that a work item is no longer scheduled to process events. The change needs to be visible to enqueuer + // threads (only for EventLoop() currently) before an event is attempted to be dequeued. In particular, if an + // enqueuer queues an event and does not schedule a work item because it is already scheduled, and this thread is + // the last thread processing events, it must see the event queued by the enqueuer. Interlocked.Exchange(ref _eventQueueProcessingRequested, 0); ConcurrentDictionary handleToContextMap = _handleToContextMap; @@ -368,10 +394,12 @@ void IThreadPoolWorkItem.Execute() continue; } - if (_eventQueueProcessingRequested == 0 && - Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0) + // An event was successfully dequeued, and as there may be more events to process, speculatively schedule a work + // item to parallelize processing of events. Since this is only for additional parallelization, doing so + // speculatively is ok. + if (_eventQueueProcessingRequested == 0) { - ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + ScheduleToProcessEvents(); } context.HandleEvents(ev.Events); @@ -420,8 +448,8 @@ private bool TryRegister(SafeSocketHandle socket, IntPtr handle, out Interop.Err private struct Event { - public IntPtr Handle { get; private set; } - public Interop.Sys.SocketEvents Events { get; private set; } + public IntPtr Handle { get; } + public Interop.Sys.SocketEvents Events { get; } public Event(IntPtr handle, Interop.Sys.SocketEvents events) { From 723e7f25fd651506dc18792c53cef3ea622b10f9 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Thu, 23 Apr 2020 07:14:52 -0400 Subject: [PATCH 05/13] Update comment as suggested Co-Authored-By: Stephen Toub --- .../src/System/Net/Sockets/SocketAsyncContext.Unix.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index 008d2a1ae594e9..af5897097f0572 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -279,7 +279,7 @@ public void Dispatch(bool processAsyncOperationSynchronously) } else { - // Async operation. Process the IO and callback on the threadpool. + // Async operation. Process the IO and callback asynchronously on the threadpool (even if we're already on a pool thread). ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); } } From f12febc756c5e691dc309f20a3b66a9dd0a0d4ec Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Thu, 23 Apr 2020 15:43:50 -0700 Subject: [PATCH 06/13] Readonly for Event struct and rename struct to SocketIOEvent --- .../System/Net/Sockets/SocketAsyncEngine.Unix.cs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 5258c4f67260c1..51e0b8674f9aa8 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -134,7 +134,7 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error) // // Queue of events generated by EventLoop() that would be processed by the thread pool // - private readonly ConcurrentQueue _eventQueue = new ConcurrentQueue(); + private readonly ConcurrentQueue _eventQueue = new ConcurrentQueue(); // // This field is set to 1 to indicate that a thread pool work item is scheduled to process events in _eventQueue. It is @@ -321,7 +321,7 @@ private void EventLoop() try { bool shutdown = false; - ConcurrentQueue eventQueue = _eventQueue; + ConcurrentQueue eventQueue = _eventQueue; while (!shutdown) { int numEvents = EventBufferCount; @@ -345,7 +345,7 @@ private void EventLoop() else { Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}"); - eventQueue.Enqueue(new Event(handle, _buffer[i].Events)); + eventQueue.Enqueue(new SocketIOEvent(handle, _buffer[i].Events)); enqueuedEvent = true; } } @@ -385,8 +385,8 @@ void IThreadPoolWorkItem.Execute() Interlocked.Exchange(ref _eventQueueProcessingRequested, 0); ConcurrentDictionary handleToContextMap = _handleToContextMap; - ConcurrentQueue eventQueue = _eventQueue; - while (eventQueue.TryDequeue(out Event ev)) + ConcurrentQueue eventQueue = _eventQueue; + while (eventQueue.TryDequeue(out SocketIOEvent ev)) { handleToContextMap.TryGetValue(ev.Handle, out SocketAsyncContext? context); if (context == null) @@ -446,12 +446,12 @@ private bool TryRegister(SafeSocketHandle socket, IntPtr handle, out Interop.Err return error == Interop.Error.SUCCESS; } - private struct Event + private readonly struct SocketIOEvent { public IntPtr Handle { get; } public Interop.Sys.SocketEvents Events { get; } - public Event(IntPtr handle, Interop.Sys.SocketEvents events) + public SocketIOEvent(IntPtr handle, Interop.Sys.SocketEvents events) { Handle = handle; Events = events; From 9c0e0f15a8e115a22dd7fe49c20019ab87ac307c Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Fri, 24 Apr 2020 10:56:49 -0700 Subject: [PATCH 07/13] Track and speculatively handle epoll events for synchronous operations on the epoll thread --- .../Net/Sockets/SocketAsyncContext.Unix.cs | 101 ++++++++++++++---- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 34 +++--- 2 files changed, 101 insertions(+), 34 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index af5897097f0572..32d9a983c7fffc 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -264,7 +264,7 @@ public bool TryCancel() return true; } - public void Dispatch(bool processAsyncOperationSynchronously) + public void Dispatch() { ManualResetEventSlim? e = Event; if (e != null) @@ -272,18 +272,23 @@ public void Dispatch(bool processAsyncOperationSynchronously) // Sync operation. Signal waiting thread to continue processing. e.Set(); } - else if (processAsyncOperationSynchronously) - { - // Async operation. Process the IO and callback on the current thread synchronously as requested. - ((IThreadPoolWorkItem)this).Execute(); - } else { - // Async operation. Process the IO and callback asynchronously on the threadpool (even if we're already on a pool thread). - ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + // Async operation. + Schedule(); } } + public void Schedule() + { + Debug.Assert(Event == null); + + // Async operation. Process the IO on the threadpool. + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + } + + public void Process() => ((IThreadPoolWorkItem)this).Execute(); + void IThreadPoolWorkItem.Execute() { // ReadOperation and WriteOperation, the only two types derived from @@ -711,6 +716,7 @@ private enum QueueState : byte // These fields define the queue state. private QueueState _state; // See above + private bool _isNextOperationSynchronous; private int _sequenceNumber; // This sequence number is updated when we receive an epoll notification. // It allows us to detect when a new epoll notification has arrived // since the last time we checked the state of the queue. @@ -725,6 +731,8 @@ private enum QueueState : byte private LockToken Lock() => new LockToken(_queueLock); + public bool IsNextOperationSynchronous_Speculative => _isNextOperationSynchronous; + public void Init() { Debug.Assert(_queueLock == null); @@ -784,7 +792,12 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation // Enqueue the operation. Debug.Assert(operation.Next == operation, "Expected operation.Next == operation"); - if (_tail != null) + if (_tail == null) + { + Debug.Assert(!_isNextOperationSynchronous); + _isNextOperationSynchronous = operation.Event != null; + } + else { operation.Next = _tail.Next; _tail.Next = operation; @@ -830,9 +843,9 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation } } - /// An operation that should be processed, or null if no further processing is required - public AsyncOperation? HandleEvent(SocketAsyncContext context) + public AsyncOperation? ProcessSyncEventOrGetAsyncEvent(SocketAsyncContext context) { + AsyncOperation op; using (Lock()) { Trace(context, $"Enter"); @@ -848,7 +861,9 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation case QueueState.Waiting: Debug.Assert(_tail != null, "State == Waiting but queue is empty!"); _state = QueueState.Processing; - return _tail.Next; + op = _tail.Next; + // Break out and release lock + break; case QueueState.Processing: Debug.Assert(_tail != null, "State == Processing but queue is empty!"); @@ -866,6 +881,19 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation return null; } } + + ManualResetEventSlim? e = op.Event; + if (e != null) + { + // Sync operation. Signal waiting thread to continue processing. + e.Set(); + return null; + } + else + { + // Async operation. The caller will figure out how to process the IO. + return op; + } } internal void ProcessAsyncOperation(TOperation op) @@ -990,6 +1018,7 @@ public OperationResult ProcessQueuedOperation(TOperation op) { // No more operations to process _tail = null; + _isNextOperationSynchronous = false; _state = QueueState.Ready; _sequenceNumber++; Trace(context, $"Exit (finished queue)"); @@ -998,11 +1027,12 @@ public OperationResult ProcessQueuedOperation(TOperation op) { // Pop current operation and advance to next nextOp = _tail.Next = op.Next; + _isNextOperationSynchronous = nextOp.Event != null; } } } - nextOp?.Dispatch(processAsyncOperationSynchronously: false); + nextOp?.Dispatch(); return (wasCompleted ? OperationResult.Completed : OperationResult.Cancelled); } @@ -1032,11 +1062,13 @@ public void CancelAndContinueProcessing(TOperation op) { // No more operations _tail = null; + _isNextOperationSynchronous = false; } else { // Pop current operation and advance to next _tail.Next = op.Next; + _isNextOperationSynchronous = op.Next.Event != null; } // We're the first op in the queue. @@ -1081,7 +1113,7 @@ public void CancelAndContinueProcessing(TOperation op) } } - nextOp?.Dispatch(processAsyncOperationSynchronously: false); + nextOp?.Dispatch(); } // Called when the socket is closed. @@ -1111,6 +1143,7 @@ public bool StopAndAbort(SocketAsyncContext context) } _tail = null; + _isNextOperationSynchronous = false; Trace(context, $"Exit"); } @@ -1945,6 +1978,36 @@ public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long co return SocketError.IOPending; } + // Called on the epoll thread, speculatively tries to process synchronous events and errors for synchronous events, and + // returns any remaining events that remain to be processed. Taking a lock for each operation queue to deterministically + // handle synchronous events on the epoll thread seems to significantly reduce throughput in benchmarks. + public unsafe Interop.Sys.SocketEvents HandleSyncEventsSpeculatively(Interop.Sys.SocketEvents events) + { + if ((events & Interop.Sys.SocketEvents.Error) != 0) + { + // Set the Read and Write flags; the processing for these events + // will pick up the error. + events ^= Interop.Sys.SocketEvents.Error; + events |= Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write; + } + + if ((events & Interop.Sys.SocketEvents.Read) != 0 && + _receiveQueue.IsNextOperationSynchronous_Speculative && + _receiveQueue.ProcessSyncEventOrGetAsyncEvent(this) == null) + { + events ^= Interop.Sys.SocketEvents.Read; + } + + if ((events & Interop.Sys.SocketEvents.Write) != 0 && + _sendQueue.IsNextOperationSynchronous_Speculative && + _sendQueue.ProcessSyncEventOrGetAsyncEvent(this) == null) + { + events ^= Interop.Sys.SocketEvents.Write; + } + + return events; + } + public unsafe void HandleEvents(Interop.Sys.SocketEvents events) { if ((events & Interop.Sys.SocketEvents.Error) != 0) @@ -1955,9 +2018,9 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events) } AsyncOperation? receiveOperation = - (events & Interop.Sys.SocketEvents.Read) != 0 ? _receiveQueue.HandleEvent(this) : null; + (events & Interop.Sys.SocketEvents.Read) != 0 ? _receiveQueue.ProcessSyncEventOrGetAsyncEvent(this) : null; AsyncOperation? sendOperation = - (events & Interop.Sys.SocketEvents.Write) != 0 ? _sendQueue.HandleEvent(this) : null; + (events & Interop.Sys.SocketEvents.Write) != 0 ? _sendQueue.ProcessSyncEventOrGetAsyncEvent(this) : null; // This method is called from a thread pool thread. When we have only one operation to process, process it // synchronously to avoid an extra thread pool work item. When we have two operations to process, processing both @@ -1965,12 +2028,12 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events) // synchronously. There might be better ways of doing this. if (sendOperation == null) { - receiveOperation?.Dispatch(processAsyncOperationSynchronously: true); + receiveOperation?.Process(); } else { - receiveOperation?.Dispatch(processAsyncOperationSynchronously: false); - sendOperation.Dispatch(processAsyncOperationSynchronously: true); + receiveOperation?.Schedule(); + sendOperation.Process(); } } diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 51e0b8674f9aa8..781f101f7d4c1b 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -321,11 +321,13 @@ private void EventLoop() try { bool shutdown = false; + Interop.Sys.SocketEvent* buffer = _buffer; + ConcurrentDictionary handleToContextMap = _handleToContextMap; ConcurrentQueue eventQueue = _eventQueue; while (!shutdown) { int numEvents = EventBufferCount; - Interop.Error err = Interop.Sys.WaitForSocketEvents(_port, _buffer, &numEvents); + Interop.Error err = Interop.Sys.WaitForSocketEvents(_port, buffer, &numEvents); if (err != Interop.Error.SUCCESS) { throw new InternalException(err); @@ -337,7 +339,7 @@ private void EventLoop() bool enqueuedEvent = false; for (int i = 0; i < numEvents; i++) { - IntPtr handle = _buffer[i].Data; + IntPtr handle = buffer[i].Data; if (handle == ShutdownHandle) { shutdown = true; @@ -345,8 +347,17 @@ private void EventLoop() else { Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}"); - eventQueue.Enqueue(new SocketIOEvent(handle, _buffer[i].Events)); - enqueuedEvent = true; + handleToContextMap.TryGetValue(handle, out SocketAsyncContext? context); + if (context != null) + { + Interop.Sys.SocketEvents events = buffer[i].Events; + events = context.HandleSyncEventsSpeculatively(events); + if (events != Interop.Sys.SocketEvents.None) + { + eventQueue.Enqueue(new SocketIOEvent(context, events)); + enqueuedEvent = true; + } + } } } @@ -384,16 +395,9 @@ void IThreadPoolWorkItem.Execute() // the last thread processing events, it must see the event queued by the enqueuer. Interlocked.Exchange(ref _eventQueueProcessingRequested, 0); - ConcurrentDictionary handleToContextMap = _handleToContextMap; ConcurrentQueue eventQueue = _eventQueue; while (eventQueue.TryDequeue(out SocketIOEvent ev)) { - handleToContextMap.TryGetValue(ev.Handle, out SocketAsyncContext? context); - if (context == null) - { - continue; - } - // An event was successfully dequeued, and as there may be more events to process, speculatively schedule a work // item to parallelize processing of events. Since this is only for additional parallelization, doing so // speculatively is ok. @@ -402,7 +406,7 @@ void IThreadPoolWorkItem.Execute() ScheduleToProcessEvents(); } - context.HandleEvents(ev.Events); + ev.Context.HandleEvents(ev.Events); } } @@ -448,12 +452,12 @@ private bool TryRegister(SafeSocketHandle socket, IntPtr handle, out Interop.Err private readonly struct SocketIOEvent { - public IntPtr Handle { get; } + public SocketAsyncContext Context { get; } public Interop.Sys.SocketEvents Events { get; } - public SocketIOEvent(IntPtr handle, Interop.Sys.SocketEvents events) + public SocketIOEvent(SocketAsyncContext context, Interop.Sys.SocketEvents events) { - Handle = handle; + Context = context; Events = events; } } From 6f414e86d7cb7fcb94fc61a62b26b7c87af2dbbe Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Fri, 24 Apr 2020 20:05:41 -0700 Subject: [PATCH 08/13] Prevent event scheduling threads from becoming long-running --- .../src/System/Net/Sockets/SocketAsyncEngine.Unix.cs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 781f101f7d4c1b..31900a10115ffd 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -395,6 +395,7 @@ void IThreadPoolWorkItem.Execute() // the last thread processing events, it must see the event queued by the enqueuer. Interlocked.Exchange(ref _eventQueueProcessingRequested, 0); + int startTimeMs = Environment.TickCount; ConcurrentQueue eventQueue = _eventQueue; while (eventQueue.TryDequeue(out SocketIOEvent ev)) { @@ -407,6 +408,13 @@ void IThreadPoolWorkItem.Execute() } ev.Context.HandleEvents(ev.Events); + + if (Environment.TickCount - startTimeMs >= 15) + { + // Yield the thread to allow the thread pool to run other work items + ScheduleToProcessEvents(); + return; + } } } From 7ac0395186d2f6438aecceb9a396e78a35ad1e65 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Sat, 25 Apr 2020 10:25:07 -0700 Subject: [PATCH 09/13] Non-speculatively schedule a work item to process epoll events upon first dequeue, delegating scheduling of more work items to other threads --- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 31900a10115ffd..2a4780f1463fb1 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -395,27 +395,35 @@ void IThreadPoolWorkItem.Execute() // the last thread processing events, it must see the event queued by the enqueuer. Interlocked.Exchange(ref _eventQueueProcessingRequested, 0); - int startTimeMs = Environment.TickCount; ConcurrentQueue eventQueue = _eventQueue; - while (eventQueue.TryDequeue(out SocketIOEvent ev)) + int startTimeMs = Environment.TickCount; + if (!eventQueue.TryDequeue(out SocketIOEvent ev)) { - // An event was successfully dequeued, and as there may be more events to process, speculatively schedule a work - // item to parallelize processing of events. Since this is only for additional parallelization, doing so - // speculatively is ok. - if (_eventQueueProcessingRequested == 0) - { - ScheduleToProcessEvents(); - } + return; + } + + // An event was successfully dequeued, and as there may be more events to process, speculatively schedule a work + // item to parallelize processing of events. Since this is only for additional parallelization, doing so + // speculatively is ok. + ScheduleToProcessEvents(); + while (true) + { ev.Context.HandleEvents(ev.Events); if (Environment.TickCount - startTimeMs >= 15) { - // Yield the thread to allow the thread pool to run other work items - ScheduleToProcessEvents(); + break; + } + + if (!eventQueue.TryDequeue(out ev)) + { return; } } + + // Yield the thread to allow the thread pool to run other work items + ScheduleToProcessEvents(); } private void RequestEventLoopShutdown() From acd9c4ddf88923a176b2f64b6eb59afd8ccd315b Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Sat, 25 Apr 2020 10:44:30 -0700 Subject: [PATCH 10/13] Update comment --- .../src/System/Net/Sockets/SocketAsyncEngine.Unix.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 2a4780f1463fb1..e1ee9a540392fc 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -402,9 +402,11 @@ void IThreadPoolWorkItem.Execute() return; } - // An event was successfully dequeued, and as there may be more events to process, speculatively schedule a work - // item to parallelize processing of events. Since this is only for additional parallelization, doing so - // speculatively is ok. + // An event was successfully dequeued, and there may be more events to process. Schedule a work item to parallelize + // processing of events, before processing more events. Following this, it is the responsibility of the new work + // item and the epoll thread to schedule more work items as necessary. The parallelization may be necessary here if + // the user callback as part of handling the event blocks for some reason that may have a dependency on other queued + // socket events. ScheduleToProcessEvents(); while (true) From aea8e266ae8950c9b205438aa76d7af9d55f4f92 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Sat, 25 Apr 2020 14:01:15 -0700 Subject: [PATCH 11/13] Fix test --- .../System/Net/Sockets/SocketAsyncEngine.Unix.cs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index e1ee9a540392fc..2bef3d2a13e8a0 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -354,9 +354,20 @@ private void EventLoop() events = context.HandleSyncEventsSpeculatively(events); if (events != Interop.Sys.SocketEvents.None) { - eventQueue.Enqueue(new SocketIOEvent(context, events)); + var ev = new SocketIOEvent(context, events); + eventQueue.Enqueue(ev); enqueuedEvent = true; + + // This is necessary when the JIT generates unoptimized code (debug builds, live debugging, + // quick JIT, etc.) to ensure that the context does not remain referenced by this method, as + // such code may keep the stack location live for longer than necessary + ev = default; } + + // This is necessary when the JIT generates unoptimized code (debug builds, live debugging, + // quick JIT, etc.) to ensure that the context does not remain referenced by this method, as + // such code may keep the stack location live for longer than necessary + context = null; } } } From 079e9626eb3c2bdb22994ac81341546653e3f241 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Thu, 30 Apr 2020 08:15:24 -0700 Subject: [PATCH 12/13] Address feedback --- .../src/System/Net/Sockets/SocketAsyncContext.Unix.cs | 7 ++++++- .../src/System/Net/Sockets/SocketAsyncEngine.Unix.cs | 10 ++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index 32d9a983c7fffc..a268bfd6557ea4 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -1980,7 +1980,12 @@ public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long co // Called on the epoll thread, speculatively tries to process synchronous events and errors for synchronous events, and // returns any remaining events that remain to be processed. Taking a lock for each operation queue to deterministically - // handle synchronous events on the epoll thread seems to significantly reduce throughput in benchmarks. + // handle synchronous events on the epoll thread seems to significantly reduce throughput in benchmarks. On the other + // hand, the speculative checks make it nondeterministic, where it would be possible for the epoll thread to think that + // the next operation in a queue is not synchronous when it is (due to a race, old caches, etc.) and cause the event to + // be scheduled instead. It's not functionally incorrect to schedule the release of a synchronous operation, just it may + // lead to thread pool starvation issues if the synchronous operations are blocking thread pool threads (typically not + // advised) and more threads are not immediately available to run work items that would release those operations. public unsafe Interop.Sys.SocketEvents HandleSyncEventsSpeculatively(Interop.Sys.SocketEvents events) { if ((events & Interop.Sys.SocketEvents.Error) != 0) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 2bef3d2a13e8a0..03a0e570b6d198 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -407,12 +407,13 @@ void IThreadPoolWorkItem.Execute() Interlocked.Exchange(ref _eventQueueProcessingRequested, 0); ConcurrentQueue eventQueue = _eventQueue; - int startTimeMs = Environment.TickCount; if (!eventQueue.TryDequeue(out SocketIOEvent ev)) { return; } + int startTimeMs = Environment.TickCount; + // An event was successfully dequeued, and there may be more events to process. Schedule a work item to parallelize // processing of events, before processing more events. Following this, it is the responsibility of the new work // item and the epoll thread to schedule more work items as necessary. The parallelization may be necessary here if @@ -424,6 +425,11 @@ void IThreadPoolWorkItem.Execute() { ev.Context.HandleEvents(ev.Events); + // If there is a constant stream of new events, and/or if user callbacks take long to process an event, this + // work item may run for a long time. If work items of this type are using up all of the thread pool threads, + // collectively they may starve other types of work items from running. Before dequeuing and processing another + // event, check the elapsed time since the start of the work item and yield the thread after some time has + // elapsed to allow the thread pool to run other work items. if (Environment.TickCount - startTimeMs >= 15) { break; @@ -435,7 +441,7 @@ void IThreadPoolWorkItem.Execute() } } - // Yield the thread to allow the thread pool to run other work items + // The queue was not observed to be empty, schedule another work item before yielding the thread ScheduleToProcessEvents(); } From 99fc37bed8c0daed596f83d026e661d76097ebab Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Sat, 2 May 2020 15:17:45 -0700 Subject: [PATCH 13/13] Update comment --- .../src/System/Net/Sockets/SocketAsyncEngine.Unix.cs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 03a0e570b6d198..7c47c42c5caaaa 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -430,6 +430,13 @@ void IThreadPoolWorkItem.Execute() // collectively they may starve other types of work items from running. Before dequeuing and processing another // event, check the elapsed time since the start of the work item and yield the thread after some time has // elapsed to allow the thread pool to run other work items. + // + // The threshold chosen below was based on trying various thresholds and in trying to keep the latency of + // running another work item low when these work items are using up all of the thread pool worker threads. In + // such cases, the latency would be something like threshold / proc count. Smaller thresholds were tried and + // using Stopwatch instead (like 1 ms, 5 ms, etc.), from quick tests they appeared to have a slightly greater + // impact on throughput compared to the threshold chosen below, though it is slight enough that it may not + // matter much. Higher thresholds didn't seem to have any noticeable effect. if (Environment.TickCount - startTimeMs >= 15) { break;