diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index a0479bfc336298..f39efe9115e6dd 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -1175,7 +1175,7 @@ public void Trace(SocketAsyncContext context, string message, [CallerMemberName] private readonly SafeSocketHandle _socket; private OperationQueue _receiveQueue; private OperationQueue _sendQueue; - private SocketAsyncEngine.Token _asyncEngineToken; + private SocketAsyncEngine.Token? _asyncEngineToken; private bool _registered; private bool _nonBlockingSet; @@ -1184,6 +1184,7 @@ public void Trace(SocketAsyncContext context, string message, [CallerMemberName] public SocketAsyncContext(SafeSocketHandle socket) { _socket = socket; + _asyncEngineToken = null; _receiveQueue.Init(); _sendQueue.Init(); @@ -1196,13 +1197,12 @@ private void Register() { if (!_registered) { - Debug.Assert(!_asyncEngineToken.WasAllocated); - var token = new SocketAsyncEngine.Token(this); + Debug.Assert(_asyncEngineToken == null); + var token = new SocketAsyncEngine.Token(this, _socket); Interop.Error errorCode; - if (!token.TryRegister(_socket, out errorCode)) + if (!token.TryRegister(out errorCode)) { - token.Free(); if (errorCode == Interop.Error.ENOMEM || errorCode == Interop.Error.ENOSPC) { throw new OutOfMemoryException(); @@ -1233,7 +1233,8 @@ public bool StopAndAbort() { // Freeing the token will prevent any future event delivery. This socket will be unregistered // from the event port automatically by the OS when it's closed. - _asyncEngineToken.Free(); + _asyncEngineToken?.Free(); + _asyncEngineToken = null; } return aborted; diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 053cc4fcb6f77c..30ebc432012655 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -12,49 +12,68 @@ namespace System.Net.Sockets { internal sealed unsafe class SocketAsyncEngine : IThreadPoolWorkItem { - // // Encapsulates a particular SocketAsyncContext object's access to a SocketAsyncEngine. - // - public readonly struct Token + internal class Token { - private readonly SocketAsyncEngine? _engine; - private readonly IntPtr _handle; + private readonly SocketAsyncContext _context; + private readonly SafeSocketHandle _socket; + private readonly SocketAsyncEngine _engine; - public Token(SocketAsyncContext context) + internal Token(SocketAsyncContext context, SafeSocketHandle socket) { - AllocateToken(context, out _engine, out _handle); + _context = context; + _socket = socket; + _engine = GetSocketAsyncEngine(); } - public bool WasAllocated + internal bool TryRegister(out Interop.Error error) { - get { return _engine != null; } - } + IntPtr socketFileDescriptor = _socket.DangerousGetHandle(); - public void Free() - { - if (WasAllocated) + // add context to the map first to make sure that the very first epoll|kqueue notification + // can be handled as soon as we receive it + _engine.AddToMap(socketFileDescriptor, _context); + + bool result = _engine.TryRegister(_socket, out error); + + if (!result) { - _engine!.FreeHandle(_handle); + _engine.RemoveFromMap(socketFileDescriptor); } + + return result; } - public bool TryRegister(SafeSocketHandle socket, out Interop.Error error) + internal void Free() { - Debug.Assert(WasAllocated, "Expected WasAllocated to be true"); - return _engine!.TryRegister(socket, _handle, out error); + // remove context from the map to make sure that we stop handling notifications first + IntPtr socketFileDescriptor = _socket.DangerousGetHandle(); + _engine.RemoveFromMap(socketFileDescriptor); + + _engine.TryUnregister(_socket, out Interop.Error error); + Debug.Assert(error == Interop.Error.SUCCESS, "Unregister should always succeed"); } } - private const int EventBufferCount = -#if DEBUG - 32; -#else - 1024; -#endif + private const int EventBufferCount = 1024; + + private static readonly SocketAsyncEngine[] s_engines = CreateEngines(); + + private static int s_previousEngineIndex = -1; - private static readonly object s_lock = new object(); + private static SocketAsyncEngine[] CreateEngines() + { + int engineCount = GetEngineCount(); + + var engines = new SocketAsyncEngine[engineCount]; + + for (int i = 0; i < engineCount; i++) + { + engines[i] = new SocketAsyncEngine(); + } - private static readonly int s_maxEngineCount = GetEngineCount(); + return engines; + } private static int GetEngineCount() { @@ -83,150 +102,26 @@ private static int GetEngineCount() return Math.Max(1, (int)Math.Round(Environment.ProcessorCount / (double)coresPerEngine)); } - // - // The current engines. We replace an engine when it runs out of "handle" values. - // Must be accessed under s_lock. - // - private static readonly SocketAsyncEngine?[] s_currentEngines = new SocketAsyncEngine?[s_maxEngineCount]; - private static int s_allocateFromEngine = 0; + private static SocketAsyncEngine GetSocketAsyncEngine() => s_engines[Interlocked.Increment(ref s_previousEngineIndex) % s_engines.Length]; private readonly IntPtr _port; private readonly Interop.Sys.SocketEvent* _buffer; - // - // The read and write ends of a native pipe, used to signal that this instance's event loop should stop - // processing events. - // + // The read and write ends of a native pipe, used to signal that this instance's event loop should stop processing events. private readonly int _shutdownReadPipe; private readonly int _shutdownWritePipe; - // - // Each SocketAsyncContext is associated with a particular "handle" value, used to identify that - // SocketAsyncContext when events are raised. These handle values are never reused, because we do not have - // a way to ensure that we will never see an event for a socket/handle that has been freed. Instead, we - // allocate monotonically increasing handle values up to some limit; when we would exceed that limit, - // we allocate a new SocketAsyncEngine (and thus a new event port) and start the handle values over at zero. - // Thus we can uniquely identify a given SocketAsyncContext by the *pair* {SocketAsyncEngine, handle}, - // and avoid any issues with misidentifying the target of an event we read from the port. - // -#if DEBUG - // - // In debug builds, force rollover to new SocketAsyncEngine instances so that code doesn't go untested, since - // it's very unlikely that the "real" limits will ever be reached in test code. - // - private static readonly IntPtr MaxHandles = (IntPtr)(EventBufferCount * 2); -#else - // - // In release builds, we use *very* high limits. No 64-bit process running on release builds should ever - // reach the handle limit for a single event port, and even 32-bit processes should see this only very rarely. - // - private static readonly IntPtr MaxHandles = IntPtr.Size == 4 ? (IntPtr)int.MaxValue : (IntPtr)long.MaxValue; -#endif - // - // Sentinel handle value to identify events from the "shutdown pipe," used to signal an event loop to stop - // processing events. - // - private static readonly IntPtr ShutdownHandle = (IntPtr)(-1); - - // - // The next handle value to be allocated for this event port. - // Must be accessed under s_lock. - // - private IntPtr _nextHandle; - - // - // Maps handle values to SocketAsyncContext instances. - // + // Maps socket file descriptors to SocketAsyncContext instances. private readonly ConcurrentDictionary _handleToContextMap = new ConcurrentDictionary(); - // // Queue of events generated by EventLoop() that would be processed by the thread pool - // private readonly ConcurrentQueue _eventQueue = new ConcurrentQueue(); - // // This field is set to 1 to indicate that a thread pool work item is scheduled to process events in _eventQueue. It is // set to 0 when the scheduled work item starts running, to indicate that a thread pool work item to process events is // not scheduled. Changes are protected by atomic operations as appropriate. - // private int _eventQueueProcessingRequested; - // - // True if we've reached the handle value limit for this event port, and thus must allocate a new event port - // on the next handle allocation. - // - private bool IsFull { get { return _nextHandle == MaxHandles; } } - - // - // Allocates a new {SocketAsyncEngine, handle} pair. - // - private static void AllocateToken(SocketAsyncContext context, out SocketAsyncEngine? engine, out IntPtr handle) - { - lock (s_lock) - { - engine = s_currentEngines[s_allocateFromEngine]; - if (engine == null) - { - s_currentEngines[s_allocateFromEngine] = engine = new SocketAsyncEngine(); - } - - handle = engine.AllocateHandle(context); - - if (engine.IsFull) - { - // We'll need to create a new event port for the next handle. - s_currentEngines[s_allocateFromEngine] = null; - } - - // Round-robin to the next engine once we have sufficient sockets on this one. - s_allocateFromEngine = (s_allocateFromEngine + 1) % s_maxEngineCount; - } - } - - private IntPtr AllocateHandle(SocketAsyncContext context) - { - Debug.Assert(Monitor.IsEntered(s_lock), "Expected s_lock to be held"); - Debug.Assert(!IsFull, "Expected !IsFull"); - - IntPtr handle = _nextHandle; - Debug.Assert(handle != ShutdownHandle, "ShutdownHandle must not be added to the dictionary"); - bool added = _handleToContextMap.TryAdd(handle, new SocketAsyncContextWrapper(context)); - Debug.Assert(added, "Add should always succeed"); - _nextHandle = IntPtr.Add(_nextHandle, 1); - - return handle; - } - - private void FreeHandle(IntPtr handle) - { - Debug.Assert(handle != ShutdownHandle, $"Expected handle != ShutdownHandle: {handle}"); - - bool shutdownNeeded = false; - - lock (s_lock) - { - if (_handleToContextMap.TryRemove(handle, out _)) - { - // - // If we've allocated all possible handles for this instance, and freed them all, then - // we don't need the event loop any more, and can reclaim resources. - // - if (IsFull && _handleToContextMap.IsEmpty) - { - shutdownNeeded = true; - } - } - } - - // - // Signal shutdown outside of the lock to reduce contention. - // - if (shutdownNeeded) - { - RequestEventLoopShutdown(); - } - } - private SocketAsyncEngine() { _port = (IntPtr)(-1); @@ -261,7 +156,7 @@ private SocketAsyncEngine() _shutdownReadPipe = pipeFds[Interop.Sys.ReadEndOfPipe]; _shutdownWritePipe = pipeFds[Interop.Sys.WriteEndOfPipe]; - err = Interop.Sys.TryChangeSocketEventRegistration(_port, (IntPtr)_shutdownReadPipe, Interop.Sys.SocketEvents.None, Interop.Sys.SocketEvents.Read, ShutdownHandle); + err = Interop.Sys.TryChangeSocketEventRegistration(_port, (IntPtr)_shutdownReadPipe, Interop.Sys.SocketEvents.None, Interop.Sys.SocketEvents.Read, (IntPtr)_shutdownReadPipe); if (err != Interop.Error.SUCCESS) { throw new InternalException(err); @@ -289,6 +184,22 @@ private SocketAsyncEngine() } } + private void AddToMap(IntPtr socketFileDescriptor, SocketAsyncContext context) + { + Debug.Assert(socketFileDescriptor != (IntPtr)_shutdownReadPipe, "ShutdownHandle must not be added to the dictionary"); + + bool added = _handleToContextMap.TryAdd(socketFileDescriptor, new SocketAsyncContextWrapper(context)); + Debug.Assert(added, "TryAdd should always succeed"); + } + + private void RemoveFromMap(IntPtr socketFileDescriptor) + { + Debug.Assert(socketFileDescriptor != (IntPtr)_shutdownReadPipe, "ShutdownHandle must not be added to the dictionary"); + + bool removed = _handleToContextMap.TryRemove(socketFileDescriptor, out _); + Debug.Assert(removed, "TryRemove should always succeed"); + } + private void EventLoop() { try @@ -297,7 +208,7 @@ private void EventLoop() Interop.Sys.SocketEvent* buffer = _buffer; ConcurrentDictionary handleToContextMap = _handleToContextMap; ConcurrentQueue eventQueue = _eventQueue; - IntPtr shutdownHandle = ShutdownHandle; + IntPtr shutdownHandle = (IntPtr)_shutdownReadPipe; SocketAsyncContext? context = null; while (!shutdown) { @@ -318,8 +229,6 @@ private void EventLoop() if (handleToContextMap.TryGetValue(handle, out SocketAsyncContextWrapper contextWrapper) && (context = contextWrapper.Context) != null) { - Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}"); - Interop.Sys.SocketEvents events = context.HandleSyncEventsSpeculatively(socketEvent.Events); if (events != Interop.Sys.SocketEvents.None) { @@ -458,10 +367,18 @@ private void FreeNativeResources() } } - private bool TryRegister(SafeSocketHandle socket, IntPtr handle, out Interop.Error error) + private bool TryRegister(SafeSocketHandle socket, out Interop.Error error) { error = Interop.Sys.TryChangeSocketEventRegistration(_port, socket, Interop.Sys.SocketEvents.None, - Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, handle); + Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, socket.DangerousGetHandle()); + return error == Interop.Error.SUCCESS; + } + + private bool TryUnregister(SafeSocketHandle socket, out Interop.Error error) + { + error = Interop.Sys.TryChangeSocketEventRegistration(_port, socket, + Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, + Interop.Sys.SocketEvents.None, socket.DangerousGetHandle()); return error == Interop.Error.SUCCESS; }