Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ public void Trace(SocketAsyncContext context, string message, [CallerMemberName]
private readonly SafeSocketHandle _socket;
private OperationQueue<ReadOperation> _receiveQueue;
private OperationQueue<WriteOperation> _sendQueue;
private SocketAsyncEngine.Token _asyncEngineToken;
private SocketAsyncEngine.Token? _asyncEngineToken;
private bool _registered;
private bool _nonBlockingSet;

Expand All @@ -1184,6 +1184,7 @@ public void Trace(SocketAsyncContext context, string message, [CallerMemberName]
public SocketAsyncContext(SafeSocketHandle socket)
{
_socket = socket;
_asyncEngineToken = null;

_receiveQueue.Init();
_sendQueue.Init();
Expand All @@ -1196,13 +1197,12 @@ private void Register()
{
if (!_registered)
{
Debug.Assert(!_asyncEngineToken.WasAllocated);
var token = new SocketAsyncEngine.Token(this);
Debug.Assert(_asyncEngineToken == null);
var token = new SocketAsyncEngine.Token(this, _socket);

Interop.Error errorCode;
if (!token.TryRegister(_socket, out errorCode))
if (!token.TryRegister(out errorCode))
{
token.Free();
if (errorCode == Interop.Error.ENOMEM || errorCode == Interop.Error.ENOSPC)
{
throw new OutOfMemoryException();
Expand Down Expand Up @@ -1233,7 +1233,8 @@ public bool StopAndAbort()
{
// Freeing the token will prevent any future event delivery. This socket will be unregistered
// from the event port automatically by the OS when it's closed.
_asyncEngineToken.Free();
_asyncEngineToken?.Free();
_asyncEngineToken = null;
}

return aborted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,49 +12,68 @@ namespace System.Net.Sockets
{
internal sealed unsafe class SocketAsyncEngine : IThreadPoolWorkItem
{
//
// Encapsulates a particular SocketAsyncContext object's access to a SocketAsyncEngine.
//
public readonly struct Token
internal class Token
{
private readonly SocketAsyncEngine? _engine;
private readonly IntPtr _handle;
private readonly SocketAsyncContext _context;
private readonly SafeSocketHandle _socket;
private readonly SocketAsyncEngine _engine;

public Token(SocketAsyncContext context)
internal Token(SocketAsyncContext context, SafeSocketHandle socket)
{
AllocateToken(context, out _engine, out _handle);
_context = context;
_socket = socket;
_engine = GetSocketAsyncEngine();
}

public bool WasAllocated
internal bool TryRegister(out Interop.Error error)
{
get { return _engine != null; }
}
IntPtr socketFileDescriptor = _socket.DangerousGetHandle();

public void Free()
{
if (WasAllocated)
// add context to the map first to make sure that the very first epoll|kqueue notification
// can be handled as soon as we receive it
_engine.AddToMap(socketFileDescriptor, _context);

bool result = _engine.TryRegister(_socket, out error);

if (!result)
{
_engine!.FreeHandle(_handle);
_engine.RemoveFromMap(socketFileDescriptor);
}

return result;
}

public bool TryRegister(SafeSocketHandle socket, out Interop.Error error)
internal void Free()
{
Debug.Assert(WasAllocated, "Expected WasAllocated to be true");
return _engine!.TryRegister(socket, _handle, out error);
// remove context from the map to make sure that we stop handling notifications first
IntPtr socketFileDescriptor = _socket.DangerousGetHandle();
_engine.RemoveFromMap(socketFileDescriptor);

_engine.TryUnregister(_socket, out Interop.Error error);
Debug.Assert(error == Interop.Error.SUCCESS, "Unregister should always succeed");
}
}

private const int EventBufferCount =
#if DEBUG
32;
#else
1024;
#endif
private const int EventBufferCount = 1024;

private static readonly SocketAsyncEngine[] s_engines = CreateEngines();

private static int s_previousEngineIndex = -1;

private static readonly object s_lock = new object();
private static SocketAsyncEngine[] CreateEngines()
{
int engineCount = GetEngineCount();

var engines = new SocketAsyncEngine[engineCount];

for (int i = 0; i < engineCount; i++)
{
engines[i] = new SocketAsyncEngine();
}

private static readonly int s_maxEngineCount = GetEngineCount();
return engines;
}

private static int GetEngineCount()
{
Expand Down Expand Up @@ -83,150 +102,26 @@ private static int GetEngineCount()
return Math.Max(1, (int)Math.Round(Environment.ProcessorCount / (double)coresPerEngine));
}

//
// The current engines. We replace an engine when it runs out of "handle" values.
// Must be accessed under s_lock.
//
private static readonly SocketAsyncEngine?[] s_currentEngines = new SocketAsyncEngine?[s_maxEngineCount];
private static int s_allocateFromEngine = 0;
private static SocketAsyncEngine GetSocketAsyncEngine() => s_engines[Interlocked.Increment(ref s_previousEngineIndex) % s_engines.Length];

private readonly IntPtr _port;
private readonly Interop.Sys.SocketEvent* _buffer;

//
// The read and write ends of a native pipe, used to signal that this instance's event loop should stop
// processing events.
//
// The read and write ends of a native pipe, used to signal that this instance's event loop should stop processing events.
private readonly int _shutdownReadPipe;
private readonly int _shutdownWritePipe;

//
// Each SocketAsyncContext is associated with a particular "handle" value, used to identify that
// SocketAsyncContext when events are raised. These handle values are never reused, because we do not have
// a way to ensure that we will never see an event for a socket/handle that has been freed. Instead, we
// allocate monotonically increasing handle values up to some limit; when we would exceed that limit,
// we allocate a new SocketAsyncEngine (and thus a new event port) and start the handle values over at zero.
// Thus we can uniquely identify a given SocketAsyncContext by the *pair* {SocketAsyncEngine, handle},
// and avoid any issues with misidentifying the target of an event we read from the port.
//
#if DEBUG
//
// In debug builds, force rollover to new SocketAsyncEngine instances so that code doesn't go untested, since
// it's very unlikely that the "real" limits will ever be reached in test code.
//
private static readonly IntPtr MaxHandles = (IntPtr)(EventBufferCount * 2);
#else
//
// In release builds, we use *very* high limits. No 64-bit process running on release builds should ever
// reach the handle limit for a single event port, and even 32-bit processes should see this only very rarely.
//
private static readonly IntPtr MaxHandles = IntPtr.Size == 4 ? (IntPtr)int.MaxValue : (IntPtr)long.MaxValue;
#endif
//
// Sentinel handle value to identify events from the "shutdown pipe," used to signal an event loop to stop
// processing events.
//
private static readonly IntPtr ShutdownHandle = (IntPtr)(-1);

//
// The next handle value to be allocated for this event port.
// Must be accessed under s_lock.
//
private IntPtr _nextHandle;

//
// Maps handle values to SocketAsyncContext instances.
//
// Maps socket file descriptors to SocketAsyncContext instances.
private readonly ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> _handleToContextMap = new ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper>();

//
// Queue of events generated by EventLoop() that would be processed by the thread pool
//
private readonly ConcurrentQueue<SocketIOEvent> _eventQueue = new ConcurrentQueue<SocketIOEvent>();

//
// This field is set to 1 to indicate that a thread pool work item is scheduled to process events in _eventQueue. It is
// set to 0 when the scheduled work item starts running, to indicate that a thread pool work item to process events is
// not scheduled. Changes are protected by atomic operations as appropriate.
//
private int _eventQueueProcessingRequested;

//
// True if we've reached the handle value limit for this event port, and thus must allocate a new event port
// on the next handle allocation.
//
private bool IsFull { get { return _nextHandle == MaxHandles; } }

//
// Allocates a new {SocketAsyncEngine, handle} pair.
//
private static void AllocateToken(SocketAsyncContext context, out SocketAsyncEngine? engine, out IntPtr handle)
{
lock (s_lock)
{
engine = s_currentEngines[s_allocateFromEngine];
if (engine == null)
{
s_currentEngines[s_allocateFromEngine] = engine = new SocketAsyncEngine();
}

handle = engine.AllocateHandle(context);

if (engine.IsFull)
{
// We'll need to create a new event port for the next handle.
s_currentEngines[s_allocateFromEngine] = null;
}

// Round-robin to the next engine once we have sufficient sockets on this one.
s_allocateFromEngine = (s_allocateFromEngine + 1) % s_maxEngineCount;
}
}

private IntPtr AllocateHandle(SocketAsyncContext context)
{
Debug.Assert(Monitor.IsEntered(s_lock), "Expected s_lock to be held");
Debug.Assert(!IsFull, "Expected !IsFull");

IntPtr handle = _nextHandle;
Debug.Assert(handle != ShutdownHandle, "ShutdownHandle must not be added to the dictionary");
bool added = _handleToContextMap.TryAdd(handle, new SocketAsyncContextWrapper(context));
Debug.Assert(added, "Add should always succeed");
_nextHandle = IntPtr.Add(_nextHandle, 1);

return handle;
}

private void FreeHandle(IntPtr handle)
{
Debug.Assert(handle != ShutdownHandle, $"Expected handle != ShutdownHandle: {handle}");

bool shutdownNeeded = false;

lock (s_lock)
{
if (_handleToContextMap.TryRemove(handle, out _))
{
//
// If we've allocated all possible handles for this instance, and freed them all, then
// we don't need the event loop any more, and can reclaim resources.
//
if (IsFull && _handleToContextMap.IsEmpty)
{
shutdownNeeded = true;
}
}
}

//
// Signal shutdown outside of the lock to reduce contention.
//
if (shutdownNeeded)
{
RequestEventLoopShutdown();
}
}

private SocketAsyncEngine()
{
_port = (IntPtr)(-1);
Expand Down Expand Up @@ -261,7 +156,7 @@ private SocketAsyncEngine()
_shutdownReadPipe = pipeFds[Interop.Sys.ReadEndOfPipe];
_shutdownWritePipe = pipeFds[Interop.Sys.WriteEndOfPipe];

err = Interop.Sys.TryChangeSocketEventRegistration(_port, (IntPtr)_shutdownReadPipe, Interop.Sys.SocketEvents.None, Interop.Sys.SocketEvents.Read, ShutdownHandle);
err = Interop.Sys.TryChangeSocketEventRegistration(_port, (IntPtr)_shutdownReadPipe, Interop.Sys.SocketEvents.None, Interop.Sys.SocketEvents.Read, (IntPtr)_shutdownReadPipe);
if (err != Interop.Error.SUCCESS)
{
throw new InternalException(err);
Expand Down Expand Up @@ -289,6 +184,22 @@ private SocketAsyncEngine()
}
}

private void AddToMap(IntPtr socketFileDescriptor, SocketAsyncContext context)
{
Debug.Assert(socketFileDescriptor != (IntPtr)_shutdownReadPipe, "ShutdownHandle must not be added to the dictionary");

bool added = _handleToContextMap.TryAdd(socketFileDescriptor, new SocketAsyncContextWrapper(context));
Debug.Assert(added, "TryAdd should always succeed");
}

private void RemoveFromMap(IntPtr socketFileDescriptor)
{
Debug.Assert(socketFileDescriptor != (IntPtr)_shutdownReadPipe, "ShutdownHandle must not be added to the dictionary");

bool removed = _handleToContextMap.TryRemove(socketFileDescriptor, out _);
Debug.Assert(removed, "TryRemove should always succeed");
}

private void EventLoop()
{
try
Expand All @@ -297,7 +208,7 @@ private void EventLoop()
Interop.Sys.SocketEvent* buffer = _buffer;
ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> handleToContextMap = _handleToContextMap;
ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
IntPtr shutdownHandle = ShutdownHandle;
IntPtr shutdownHandle = (IntPtr)_shutdownReadPipe;
SocketAsyncContext? context = null;
while (!shutdown)
{
Expand All @@ -318,8 +229,6 @@ private void EventLoop()

if (handleToContextMap.TryGetValue(handle, out SocketAsyncContextWrapper contextWrapper) && (context = contextWrapper.Context) != null)
{
Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}");

Interop.Sys.SocketEvents events = context.HandleSyncEventsSpeculatively(socketEvent.Events);
if (events != Interop.Sys.SocketEvents.None)
{
Expand Down Expand Up @@ -458,10 +367,18 @@ private void FreeNativeResources()
}
}

private bool TryRegister(SafeSocketHandle socket, IntPtr handle, out Interop.Error error)
private bool TryRegister(SafeSocketHandle socket, out Interop.Error error)
{
error = Interop.Sys.TryChangeSocketEventRegistration(_port, socket, Interop.Sys.SocketEvents.None,
Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, handle);
Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, socket.DangerousGetHandle());
return error == Interop.Error.SUCCESS;
}

private bool TryUnregister(SafeSocketHandle socket, out Interop.Error error)
{
error = Interop.Sys.TryChangeSocketEventRegistration(_port, socket,
Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write,
Interop.Sys.SocketEvents.None, socket.DangerousGetHandle());
return error == Interop.Error.SUCCESS;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This happens automagically in the kernel when the socket is closed. And it doesn't solve the race issue between the epoll thread and the thread that creates/closes sockets. See #36115 (comment).

}

Expand Down