Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ internal struct SocketEvent
[DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_CloseSocketEventPort")]
internal static extern Error CloseSocketEventPort(IntPtr port);

[DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_CreateSocketEventBuffer")]
internal static extern unsafe Error CreateSocketEventBuffer(int count, out SocketEvent* buffer);
[DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_ResizeSocketEventBuffer")]
internal static extern unsafe Error ResizeSocketEventBuffer(int count, ref SocketEvent* buffer);

[DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_FreeSocketEventBuffer")]
internal static extern unsafe Error FreeSocketEventBuffer(SocketEvent* buffer);
Expand Down
6 changes: 4 additions & 2 deletions src/Native/Unix/System.Native/pal_networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2373,19 +2373,21 @@ int32_t SystemNative_CloseSocketEventPort(intptr_t port)
return CloseSocketEventPortInner(ToFileDescriptor(port));
}

int32_t SystemNative_CreateSocketEventBuffer(int32_t count, SocketEvent** buffer)
int32_t SystemNative_ResizeSocketEventBuffer(int32_t count, SocketEvent** buffer)
{
if (buffer == NULL || count < 0)
{
return Error_EFAULT;
}

size_t bufferSize;
SocketEvent* newBuffer;
if (!multiply_s(SocketEventBufferElementSize, (size_t)count, &bufferSize) ||
(*buffer = (SocketEvent*)malloc(bufferSize)) == NULL)
(newBuffer = (SocketEvent*)realloc(*buffer, bufferSize)) == NULL)
{
return Error_ENOMEM;
}
*buffer = newBuffer;

return Error_SUCCESS;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Native/Unix/System.Native/pal_networking.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ DLLEXPORT int32_t SystemNative_CreateSocketEventPort(intptr_t* port);

DLLEXPORT int32_t SystemNative_CloseSocketEventPort(intptr_t port);

DLLEXPORT int32_t SystemNative_CreateSocketEventBuffer(int32_t count, SocketEvent** buffer);
DLLEXPORT int32_t SystemNative_ResizeSocketEventBuffer(int32_t count, SocketEvent** buffer);

DLLEXPORT int32_t SystemNative_FreeSocketEventBuffer(SocketEvent* buffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,15 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error)
}
}

private const int EventBufferCount =
#if DEBUG
32;
#else
1024;
#endif

private static readonly object s_lock = new object();

// In debug builds, force there to be 2 engines. In release builds, use half the number of processors when
// there are at least 6. The lower bound is to avoid using multiple engines on systems which aren't servers.
private static readonly int EngineCount =
#if DEBUG
2;
#else
Environment.ProcessorCount >= 6 ? Environment.ProcessorCount / 2 : 1;
#endif
//
// The current engines. We replace an engine when it runs out of "handle" values.
// Must be accessed under s_lock.
//
private static readonly SocketAsyncEngine[] s_currentEngines = new SocketAsyncEngine[EngineCount];
private static int s_allocateFromEngine = 0;
private static SocketAsyncEngine Current { get; set; } = new SocketAsyncEngine();

private readonly IntPtr _port;
private readonly Interop.Sys.SocketEvent* _buffer;

//
// The read and write ends of a native pipe, used to signal that this instance's event loop should stop
Expand All @@ -94,15 +77,17 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error)
// In debug builds, force rollover to new SocketAsyncEngine instances so that code doesn't go untested, since
// it's very unlikely that the "real" limits will ever be reached in test code.
//
private static readonly IntPtr MaxHandles = (IntPtr)(EventBufferCount * 2);
private static readonly IntPtr MaxHandles = (IntPtr)64;
#else
//
// In release builds, we use *very* high limits. No 64-bit process running on release builds should ever
// reach the handle limit for a single event port, and even 32-bit processes should see this only very rarely.
//
private static readonly IntPtr MaxHandles = IntPtr.Size == 4 ? (IntPtr)int.MaxValue : (IntPtr)long.MaxValue;
#endif
private static readonly IntPtr MinHandlesForAdditionalEngine = EngineCount == 1 ? MaxHandles : (IntPtr)32;

// Determine max event buffer count based on nr of processors.
private static readonly int MaxEventBufferCount = 512 * Environment.ProcessorCount;

//
// Sentinel handle value to identify events from the "shutdown pipe," used to signal an event loop to stop
Expand Down Expand Up @@ -133,56 +118,20 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error)
//
private bool IsFull { get { return _nextHandle == MaxHandles; } }

// True if we've don't have sufficient active sockets to allow allocating a new engine.
private bool HasLowNumberOfSockets
{
get
{
return IntPtr.Size == 4 ? _outstandingHandles.ToInt32() < MinHandlesForAdditionalEngine.ToInt32() :
_outstandingHandles.ToInt64() < MinHandlesForAdditionalEngine.ToInt64();
}
}

//
// Allocates a new {SocketAsyncEngine, handle} pair.
//
private static void AllocateToken(SocketAsyncContext context, out SocketAsyncEngine engine, out IntPtr handle)
{
lock (s_lock)
{
engine = s_currentEngines[s_allocateFromEngine];
if (engine == null)
{
// We minimize the number of engines on applications that have a low number of concurrent sockets.
for (int i = 0; i < s_allocateFromEngine; i++)
{
var previousEngine = s_currentEngines[i];
if (previousEngine == null || previousEngine.HasLowNumberOfSockets)
{
s_allocateFromEngine = i;
engine = previousEngine;
break;
}
}
if (engine == null)
{
s_currentEngines[s_allocateFromEngine] = engine = new SocketAsyncEngine();
}
}

handle = engine.AllocateHandle(context);

engine = Current;
if (engine.IsFull)
{
// We'll need to create a new event port for the next handle.
s_currentEngines[s_allocateFromEngine] = null;
Current = engine = new SocketAsyncEngine();
}

// Round-robin to the next engine once we have sufficient sockets on this one.
if (!engine.HasLowNumberOfSockets)
{
s_allocateFromEngine = (s_allocateFromEngine + 1) % EngineCount;
}
handle = engine.AllocateHandle(context);
}
}

Expand Down Expand Up @@ -249,11 +198,6 @@ private SocketAsyncEngine()
{
throw new InternalException(err);
}
err = Interop.Sys.CreateSocketEventBuffer(EventBufferCount, out _buffer);
if (err != Interop.Error.SUCCESS)
{
throw new InternalException(err);
}

//
// Create the pipe for signaling shutdown, and register for "read" events for the pipe. Now writing
Expand Down Expand Up @@ -304,11 +248,27 @@ private void EventLoop()
{
try
{
Interop.Sys.SocketEvent* buffer = null;
#if DEBUG
//
// In debug builds, force reallocation of event buffers.
//
int eventBufferCount = 4;
#else
int eventBufferCount = 1024;
#endif

Interop.Error err = Interop.Sys.ResizeSocketEventBuffer(eventBufferCount, ref buffer);
if (err != Interop.Error.SUCCESS)
{
throw new InternalException(err);
}

bool shutdown = false;
while (!shutdown)
{
int numEvents = EventBufferCount;
Interop.Error err = Interop.Sys.WaitForSocketEvents(_port, _buffer, &numEvents);
int numEvents = eventBufferCount;
err = Interop.Sys.WaitForSocketEvents(_port, buffer, &numEvents);
if (err != Interop.Error.SUCCESS)
{
throw new InternalException(err);
Expand All @@ -319,7 +279,7 @@ private void EventLoop()

for (int i = 0; i < numEvents; i++)
{
IntPtr handle = _buffer[i].Data;
IntPtr handle = buffer[i].Data;
if (handle == ShutdownHandle)
{
shutdown = true;
Expand All @@ -330,10 +290,27 @@ private void EventLoop()
_handleToContextMap.TryGetValue(handle, out SocketAsyncContext context);
if (context != null)
{
context.HandleEvents(_buffer[i].Events);
context.HandleEvents(buffer[i].Events);
}
}
}

// Resize the event buffer if it was filled completely.
if (numEvents == eventBufferCount &&
eventBufferCount < MaxEventBufferCount)
{
int desiredEventBufferCount = eventBufferCount * 2;
err = Interop.Sys.ResizeSocketEventBuffer(desiredEventBufferCount, ref buffer);
if (err == Interop.Error.SUCCESS)
{
eventBufferCount = desiredEventBufferCount;
}
}
}

if (buffer != null)
{
Interop.Sys.FreeSocketEventBuffer(buffer);
}

FreeNativeResources();
Expand Down Expand Up @@ -367,10 +344,6 @@ private void FreeNativeResources()
{
Interop.Sys.Close((IntPtr)_shutdownWritePipe);
}
if (_buffer != null)
{
Interop.Sys.FreeSocketEventBuffer(_buffer);
}
if (_port != (IntPtr)(-1))
{
Interop.Sys.CloseSocketEventPort(_port);
Expand Down