diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index beb4cc088839d8..a268bfd6557ea4 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -274,11 +274,21 @@ public void Dispatch() } else { - // Async operation. Process the IO on the threadpool. - ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + // Async operation. + Schedule(); } } + public void Schedule() + { + Debug.Assert(Event == null); + + // Async operation. Process the IO on the threadpool. + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + } + + public void Process() => ((IThreadPoolWorkItem)this).Execute(); + void IThreadPoolWorkItem.Execute() { // ReadOperation and WriteOperation, the only two types derived from @@ -706,6 +716,7 @@ private enum QueueState : byte // These fields define the queue state. private QueueState _state; // See above + private bool _isNextOperationSynchronous; private int _sequenceNumber; // This sequence number is updated when we receive an epoll notification. // It allows us to detect when a new epoll notification has arrived // since the last time we checked the state of the queue. @@ -720,6 +731,8 @@ private enum QueueState : byte private LockToken Lock() => new LockToken(_queueLock); + public bool IsNextOperationSynchronous_Speculative => _isNextOperationSynchronous; + public void Init() { Debug.Assert(_queueLock == null); @@ -779,7 +792,12 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation // Enqueue the operation. 
Debug.Assert(operation.Next == operation, "Expected operation.Next == operation"); - if (_tail != null) + if (_tail == null) + { + Debug.Assert(!_isNextOperationSynchronous); + _isNextOperationSynchronous = operation.Event != null; + } + else { operation.Next = _tail.Next; _tail.Next = operation; @@ -825,8 +843,7 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation } } - // Called on the epoll thread whenever we receive an epoll notification. - public void HandleEvent(SocketAsyncContext context) + public AsyncOperation? ProcessSyncEventOrGetAsyncEvent(SocketAsyncContext context) { AsyncOperation op; using (Lock()) @@ -839,7 +856,7 @@ public void HandleEvent(SocketAsyncContext context) Debug.Assert(_tail == null, "State == Ready but queue is not empty!"); _sequenceNumber++; Trace(context, $"Exit (previously ready)"); - return; + return null; case QueueState.Waiting: Debug.Assert(_tail != null, "State == Waiting but queue is empty!"); @@ -852,21 +869,31 @@ public void HandleEvent(SocketAsyncContext context) Debug.Assert(_tail != null, "State == Processing but queue is empty!"); _sequenceNumber++; Trace(context, $"Exit (currently processing)"); - return; + return null; case QueueState.Stopped: Debug.Assert(_tail == null); Trace(context, $"Exit (stopped)"); - return; + return null; default: Environment.FailFast("unexpected queue state"); - return; + return null; } } - // Dispatch the op so we can try to process it. - op.Dispatch(); + ManualResetEventSlim? e = op.Event; + if (e != null) + { + // Sync operation. Signal waiting thread to continue processing. + e.Set(); + return null; + } + else + { + // Async operation. The caller will figure out how to process the IO. 
+ return op; + } } internal void ProcessAsyncOperation(TOperation op) @@ -991,6 +1018,7 @@ public OperationResult ProcessQueuedOperation(TOperation op) { // No more operations to process _tail = null; + _isNextOperationSynchronous = false; _state = QueueState.Ready; _sequenceNumber++; Trace(context, $"Exit (finished queue)"); @@ -999,6 +1027,7 @@ public OperationResult ProcessQueuedOperation(TOperation op) { // Pop current operation and advance to next nextOp = _tail.Next = op.Next; + _isNextOperationSynchronous = nextOp.Event != null; } } } @@ -1033,11 +1062,13 @@ public void CancelAndContinueProcessing(TOperation op) { // No more operations _tail = null; + _isNextOperationSynchronous = false; } else { // Pop current operation and advance to next _tail.Next = op.Next; + _isNextOperationSynchronous = op.Next.Event != null; } // We're the first op in the queue. @@ -1112,6 +1143,7 @@ public bool StopAndAbort(SocketAsyncContext context) } _tail = null; + _isNextOperationSynchronous = false; Trace(context, $"Exit"); } @@ -1946,6 +1978,41 @@ public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long co return SocketError.IOPending; } + // Called on the epoll thread. Speculatively tries to process synchronous events and errors for synchronous events, and + // returns any events that remain to be processed. Taking a lock for each operation queue to deterministically + // handle synchronous events on the epoll thread seems to significantly reduce throughput in benchmarks. On the other + // hand, the speculative checks make it nondeterministic, where it would be possible for the epoll thread to think that + // the next operation in a queue is not synchronous when it is (due to a race, old caches, etc.) and cause the event to + // be scheduled instead. 
It's not functionally incorrect to schedule the release of a synchronous operation, but it may + // lead to thread pool starvation issues if the synchronous operations are blocking thread pool threads (typically not + // advised) and more threads are not immediately available to run work items that would release those operations. + public unsafe Interop.Sys.SocketEvents HandleSyncEventsSpeculatively(Interop.Sys.SocketEvents events) + { + if ((events & Interop.Sys.SocketEvents.Error) != 0) + { + // Set the Read and Write flags; the processing for these events + // will pick up the error. + events ^= Interop.Sys.SocketEvents.Error; + events |= Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write; + } + + if ((events & Interop.Sys.SocketEvents.Read) != 0 && + _receiveQueue.IsNextOperationSynchronous_Speculative && + _receiveQueue.ProcessSyncEventOrGetAsyncEvent(this) == null) + { + events ^= Interop.Sys.SocketEvents.Read; + } + + if ((events & Interop.Sys.SocketEvents.Write) != 0 && + _sendQueue.IsNextOperationSynchronous_Speculative && + _sendQueue.ProcessSyncEventOrGetAsyncEvent(this) == null) + { + events ^= Interop.Sys.SocketEvents.Write; + } + + return events; + } + public unsafe void HandleEvents(Interop.Sys.SocketEvents events) { if ((events & Interop.Sys.SocketEvents.Error) != 0) @@ -1955,14 +2022,23 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events) events |= Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write; } - if ((events & Interop.Sys.SocketEvents.Read) != 0) + AsyncOperation? receiveOperation = + (events & Interop.Sys.SocketEvents.Read) != 0 ? _receiveQueue.ProcessSyncEventOrGetAsyncEvent(this) : null; + AsyncOperation? sendOperation = + (events & Interop.Sys.SocketEvents.Write) != 0 ? _sendQueue.ProcessSyncEventOrGetAsyncEvent(this) : null; + + // This method is called from a thread pool thread. When we have only one operation to process, process it + // synchronously to avoid an extra thread pool work item. 
When we have two operations to process, processing both + // synchronously may delay the second operation, so schedule one onto the thread pool and process the other + // synchronously. There might be better ways of doing this. + if (sendOperation == null) { - _receiveQueue.HandleEvent(this); + receiveOperation?.Process(); } - - if ((events & Interop.Sys.SocketEvents.Write) != 0) + else { - _sendQueue.HandleEvent(this); + receiveOperation?.Schedule(); + sendOperation.Process(); } } diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 9d5f48b7045e37..7c47c42c5caaaa 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -5,13 +5,14 @@ using System; using System.Collections.Concurrent; using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; namespace System.Net.Sockets { - internal sealed unsafe class SocketAsyncEngine + internal sealed unsafe class SocketAsyncEngine : IThreadPoolWorkItem { // // Encapsulates a particular SocketAsyncContext object's access to a SocketAsyncEngine. @@ -130,6 +131,18 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error) // private readonly ConcurrentDictionary _handleToContextMap = new ConcurrentDictionary(); + // + // Queue of events generated by EventLoop() that would be processed by the thread pool + // + private readonly ConcurrentQueue _eventQueue = new ConcurrentQueue(); + + // + // This field is set to 1 to indicate that a thread pool work item is scheduled to process events in _eventQueue. It is + // set to 0 when the scheduled work item starts running, to indicate that a thread pool work item to process events is + // not scheduled. 
Changes are protected by atomic operations as appropriate. + // + private int _eventQueueProcessingRequested; + // // True if we've reached the handle value limit for this event port, and thus must allocate a new event port // on the next handle allocation. @@ -308,10 +321,13 @@ private void EventLoop() try { bool shutdown = false; + Interop.Sys.SocketEvent* buffer = _buffer; + ConcurrentDictionary handleToContextMap = _handleToContextMap; + ConcurrentQueue eventQueue = _eventQueue; while (!shutdown) { int numEvents = EventBufferCount; - Interop.Error err = Interop.Sys.WaitForSocketEvents(_port, _buffer, &numEvents); + Interop.Error err = Interop.Sys.WaitForSocketEvents(_port, buffer, &numEvents); if (err != Interop.Error.SUCCESS) { throw new InternalException(err); @@ -320,9 +336,10 @@ private void EventLoop() // The native shim is responsible for ensuring this condition. Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}"); + bool enqueuedEvent = false; for (int i = 0; i < numEvents; i++) { - IntPtr handle = _buffer[i].Data; + IntPtr handle = buffer[i].Data; if (handle == ShutdownHandle) { shutdown = true; @@ -330,14 +347,35 @@ private void EventLoop() else { Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}"); - _handleToContextMap.TryGetValue(handle, out SocketAsyncContext? context); + handleToContextMap.TryGetValue(handle, out SocketAsyncContext? context); if (context != null) { - context.HandleEvents(_buffer[i].Events); + Interop.Sys.SocketEvents events = buffer[i].Events; + events = context.HandleSyncEventsSpeculatively(events); + if (events != Interop.Sys.SocketEvents.None) + { + var ev = new SocketIOEvent(context, events); + eventQueue.Enqueue(ev); + enqueuedEvent = true; + + // This is necessary when the JIT generates unoptimized code (debug builds, live debugging, + // quick JIT, etc.) 
to ensure that the context does not remain referenced by this method, as + // such code may keep the stack location live for longer than necessary + ev = default; + } + + // This is necessary when the JIT generates unoptimized code (debug builds, live debugging, + // quick JIT, etc.) to ensure that the context does not remain referenced by this method, as + // such code may keep the stack location live for longer than necessary context = null; } } } + + if (enqueuedEvent) + { + ScheduleToProcessEvents(); + } } FreeNativeResources(); @@ -348,6 +386,72 @@ private void EventLoop() } } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void ScheduleToProcessEvents() + { + // Schedule a thread pool work item to process events. Only one work item is scheduled at any given time to avoid + // over-parallelization. When the work item begins running, this field is reset to 0, allowing for another work item + // to be scheduled for parallelizing processing of events. + if (Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0) + { + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + } + } + + void IThreadPoolWorkItem.Execute() + { + // Indicate that a work item is no longer scheduled to process events. The change needs to be visible to enqueuer + // threads (only for EventLoop() currently) before an event is attempted to be dequeued. In particular, if an + // enqueuer queues an event and does not schedule a work item because it is already scheduled, and this thread is + // the last thread processing events, it must see the event queued by the enqueuer. + Interlocked.Exchange(ref _eventQueueProcessingRequested, 0); + + ConcurrentQueue eventQueue = _eventQueue; + if (!eventQueue.TryDequeue(out SocketIOEvent ev)) + { + return; + } + + int startTimeMs = Environment.TickCount; + + // An event was successfully dequeued, and there may be more events to process. 
Schedule a work item to parallelize + // processing of events, before processing more events. Following this, it is the responsibility of the new work + // item and the epoll thread to schedule more work items as necessary. The parallelization may be necessary here if + // a user callback invoked while handling the event blocks for some reason that may depend on other queued + // socket events. + ScheduleToProcessEvents(); + + while (true) + { + ev.Context.HandleEvents(ev.Events); + + // If there is a constant stream of new events, and/or if user callbacks take long to process an event, this + // work item may run for a long time. If work items of this type are using up all of the thread pool threads, + // collectively they may starve other types of work items from running. Before dequeuing and processing another + // event, check the elapsed time since the start of the work item and yield the thread after some time has + // elapsed to allow the thread pool to run other work items. + // + // The threshold chosen below was based on trying various thresholds and on trying to keep the latency of + // running another work item low when these work items are using up all of the thread pool worker threads. In + // such cases, the latency would be something like threshold / proc count. Smaller thresholds (like 1 ms, 5 ms, etc.) + // were also tried, using Stopwatch instead; from quick tests they appeared to have a slightly greater + // impact on throughput compared to the threshold chosen below, though it is slight enough that it may not + // matter much. Higher thresholds didn't seem to have any noticeable effect. 
+ if (Environment.TickCount - startTimeMs >= 15) + { + break; + } + + if (!eventQueue.TryDequeue(out ev)) + { + return; + } + } + + // The queue was not observed to be empty, schedule another work item before yielding the thread + ScheduleToProcessEvents(); + } + private void RequestEventLoopShutdown() { // @@ -387,5 +491,17 @@ private bool TryRegister(SafeSocketHandle socket, IntPtr handle, out Interop.Err Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, handle); return error == Interop.Error.SUCCESS; } + + private readonly struct SocketIOEvent + { + public SocketAsyncContext Context { get; } + public Interop.Sys.SocketEvents Events { get; } + + public SocketIOEvent(SocketAsyncContext context, Interop.Sys.SocketEvents events) + { + Context = context; + Events = events; + } + } } }