Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"editor.tabSize": 2
},
"files.trimTrailingWhitespace": true,
"files.insertFinalNewline": true,
"files.associations": {
"*.props": "xml",
"*.targets": "xml"
Expand Down
67 changes: 67 additions & 0 deletions src/Kestrel.Transport.Sockets/Internal/IOQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.IO.Pipelines;
using System.Threading;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
public class IOQueue : PipeScheduler
{
private static readonly WaitCallback _doWorkCallback = s => ((IOQueue)s).DoWork();

private readonly object _workSync = new object();
private readonly ConcurrentQueue<Work> _workItems = new ConcurrentQueue<Work>();
private bool _doingWork;

public override void Schedule<T>(Action<T> action, T state)
{
var work = new Work
{
CallbackAdapter = (c, s) => ((Action<T>)c)((T)s),
Callback = action,
State = state
};

_workItems.Enqueue(work);

lock (_workSync)
{
if (!_doingWork)
{
System.Threading.ThreadPool.QueueUserWorkItem(_doWorkCallback, this);
_doingWork = true;
}
}
}

private void DoWork()
{
while (true)
{
while (_workItems.TryDequeue(out Work item))
{
item.CallbackAdapter(item.Callback, item.State);
}

lock (_workSync)
{
if (_workItems.IsEmpty)
{
_doingWork = false;
return;
}
}
}
}

private struct Work
{
public Action<object, object> CallbackAdapter;
public object Callback;
public object State;
}
}
}
23 changes: 18 additions & 5 deletions src/Kestrel.Transport.Sockets/Internal/SocketAwaitable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
Expand All @@ -13,10 +14,17 @@ public class SocketAwaitable : ICriticalNotifyCompletion
{
private static readonly Action _callbackCompleted = () => { };

private readonly PipeScheduler _ioScheduler;

private Action _callback;
private int _bytesTransfered;
private int _bytesTransferred;
private SocketError _error;

public SocketAwaitable(PipeScheduler ioScheduler)
{
_ioScheduler = ioScheduler;
}

public SocketAwaitable GetAwaiter() => this;
public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);

Expand All @@ -31,7 +39,7 @@ public int GetResult()
throw new SocketException((int)_error);
}

return _bytesTransfered;
return _bytesTransferred;
}

public void OnCompleted(Action continuation)
Expand All @@ -51,8 +59,13 @@ public void UnsafeOnCompleted(Action continuation)
public void Complete(int bytesTransferred, SocketError socketError)
{
_error = socketError;
_bytesTransfered = bytesTransferred;
Interlocked.Exchange(ref _callback, _callbackCompleted)?.Invoke();
_bytesTransferred = bytesTransferred;
var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);

if (continuation != null)
{
_ioScheduler.Schedule(c => c(), continuation);
}
}
}
}
}
18 changes: 12 additions & 6 deletions src/Kestrel.Transport.Sockets/Internal/SocketConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Protocols;
using System.Threading;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
using Microsoft.Extensions.Logging;

Expand All @@ -19,22 +19,25 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
internal sealed class SocketConnection : TransportConnection
{
private const int MinAllocBufferSize = 2048;
public static bool IsWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);

private readonly Socket _socket;
private readonly PipeScheduler _scheduler;
private readonly ISocketsTrace _trace;
private readonly SocketReceiver _receiver;
private readonly SocketSender _sender;

private volatile bool _aborted;

internal SocketConnection(Socket socket, MemoryPool<byte> memoryPool, ISocketsTrace trace)
internal SocketConnection(Socket socket, MemoryPool<byte> memoryPool, PipeScheduler scheduler, ISocketsTrace trace)
{
Debug.Assert(socket != null);
Debug.Assert(memoryPool != null);
Debug.Assert(trace != null);

_socket = socket;
MemoryPool = memoryPool;
_scheduler = scheduler;
_trace = trace;

var localEndPoint = (IPEndPoint)_socket.LocalEndPoint;
Expand All @@ -46,13 +49,16 @@ internal SocketConnection(Socket socket, MemoryPool<byte> memoryPool, ISocketsTr
RemoteAddress = remoteEndPoint.Address;
RemotePort = remoteEndPoint.Port;

_receiver = new SocketReceiver(_socket);
_sender = new SocketSender(_socket);
// On *nix platforms, Sockets already dispatches to the ThreadPool.
var awaiterScheduler = IsWindows ? _scheduler : PipeScheduler.Inline;

_receiver = new SocketReceiver(_socket, awaiterScheduler);
_sender = new SocketSender(_socket, awaiterScheduler);
}

public override MemoryPool<byte> MemoryPool { get; }
public override PipeScheduler InputWriterScheduler => PipeScheduler.Inline;
public override PipeScheduler OutputReaderScheduler => PipeScheduler.ThreadPool;
public override PipeScheduler InputWriterScheduler => _scheduler;
public override PipeScheduler OutputReaderScheduler => _scheduler;

public async Task StartAsync(IConnectionHandler connectionHandler)
{
Expand Down
6 changes: 4 additions & 2 deletions src/Kestrel.Transport.Sockets/Internal/SocketReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.IO.Pipelines;
using System.Net.Sockets;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
Expand All @@ -10,11 +11,12 @@ public class SocketReceiver
{
private readonly Socket _socket;
private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs();
private readonly SocketAwaitable _awaitable = new SocketAwaitable();
private readonly SocketAwaitable _awaitable;

public SocketReceiver(Socket socket)
public SocketReceiver(Socket socket, PipeScheduler scheduler)
{
_socket = socket;
_awaitable = new SocketAwaitable(scheduler);
_eventArgs.UserToken = _awaitable;
_eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError);
}
Expand Down
6 changes: 4 additions & 2 deletions src/Kestrel.Transport.Sockets/Internal/SocketSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Runtime.InteropServices;

Expand All @@ -14,13 +15,14 @@ public class SocketSender
{
private readonly Socket _socket;
private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs();
private readonly SocketAwaitable _awaitable = new SocketAwaitable();
private readonly SocketAwaitable _awaitable;

private List<ArraySegment<byte>> _bufferList;

public SocketSender(Socket socket)
public SocketSender(Socket socket, PipeScheduler scheduler)
{
_socket = socket;
_awaitable = new SocketAwaitable(scheduler);
_eventArgs.UserToken = _awaitable;
_eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError);
}
Expand Down
55 changes: 40 additions & 15 deletions src/Kestrel.Transport.Sockets/SocketTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Runtime.ExceptionServices;
Expand All @@ -19,10 +20,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
{
internal sealed class SocketTransport : ITransport
{
private static readonly PipeScheduler[] ThreadPoolSchedulerArray = new PipeScheduler[] { PipeScheduler.ThreadPool };

private readonly MemoryPool<byte> _memoryPool = KestrelMemoryPool.Create();
private readonly IEndPointInformation _endPointInformation;
private readonly IConnectionHandler _handler;
private readonly IApplicationLifetime _appLifetime;
private readonly int _numSchedulers;
private readonly PipeScheduler[] _schedulers;
private readonly ISocketsTrace _trace;
private Socket _listenSocket;
private Task _listenTask;
Expand All @@ -33,6 +38,7 @@ internal SocketTransport(
IEndPointInformation endPointInformation,
IConnectionHandler handler,
IApplicationLifetime applicationLifetime,
int ioQueueCount,
ISocketsTrace trace)
{
Debug.Assert(endPointInformation != null);
Expand All @@ -45,6 +51,22 @@ internal SocketTransport(
_handler = handler;
_appLifetime = applicationLifetime;
_trace = trace;

if (ioQueueCount > 0)
{
_numSchedulers = ioQueueCount;
_schedulers = new IOQueue[_numSchedulers];

for (var i = 0; i < _numSchedulers; i++)
{
_schedulers[i] = new IOQueue();
}
}
else
{
_numSchedulers = ThreadPoolSchedulerArray.Length;
_schedulers = ThreadPoolSchedulerArray;
}
}

public Task BindAsync()
Expand Down Expand Up @@ -125,22 +147,25 @@ private async Task RunAcceptLoopAsync()
{
while (true)
{
try
{
var acceptSocket = await _listenSocket.AcceptAsync();
acceptSocket.NoDelay = _endPointInformation.NoDelay;

var connection = new SocketConnection(acceptSocket, _memoryPool, _trace);
_ = connection.StartAsync(_handler);
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
{
// REVIEW: Should there be a seperate log message for a connection reset this early?
_trace.ConnectionReset(connectionId: "(null)");
}
catch (SocketException ex) when (!_unbinding)
for (var schedulerIndex = 0; schedulerIndex < _numSchedulers; schedulerIndex++)
{
_trace.ConnectionError(connectionId: "(null)", ex);
try
{
var acceptSocket = await _listenSocket.AcceptAsync();
acceptSocket.NoDelay = _endPointInformation.NoDelay;

var connection = new SocketConnection(acceptSocket, _memoryPool, _schedulers[schedulerIndex], _trace);
_ = connection.StartAsync(_handler);
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
{
// REVIEW: Should there be a seperate log message for a connection reset this early?
_trace.ConnectionReset(connectionId: "(null)");
}
catch (SocketException ex) when (!_unbinding)
{
_trace.ConnectionError(connectionId: "(null)", ex);
}
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/Kestrel.Transport.Sockets/SocketTransportFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
{
public sealed class SocketTransportFactory : ITransportFactory
{
private readonly SocketsTrace _trace;
private readonly SocketTransportOptions _options;
private readonly IApplicationLifetime _appLifetime;
private readonly SocketsTrace _trace;

public SocketTransportFactory(
IOptions<SocketTransportOptions> options,
Expand All @@ -33,6 +34,7 @@ public SocketTransportFactory(
throw new ArgumentNullException(nameof(loggerFactory));
}

_options = options.Value;
_appLifetime = applicationLifetime;
var logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets");
_trace = new SocketsTrace(logger);
Expand All @@ -55,7 +57,7 @@ public ITransport Create(IEndPointInformation endPointInformation, IConnectionHa
throw new ArgumentNullException(nameof(handler));
}

return new SocketTransport(endPointInformation, handler, _appLifetime, _trace);
return new SocketTransport(endPointInformation, handler, _appLifetime, _options.IOQueueCount, _trace);
}
}
}
10 changes: 8 additions & 2 deletions src/Kestrel.Transport.Sockets/SocketTransportOptions.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
{
// TODO: Come up with some options
public class SocketTransportOptions
{

/// <summary>
/// The number of I/O queues used to process requests. Set to 0 to directly schedule I/O to the ThreadPool.
/// </summary>
/// <remarks>
/// Defaults to <see cref="Environment.ProcessorCount" /> rounded down and clamped between 1 and 16.
/// </remarks>
public int IOQueueCount { get; set; } = Math.Min(Environment.ProcessorCount, 16);
}
}