diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index 5286c0595..adbd97deb 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -2,7 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Collections.Generic; +using System.Collections.Concurrent; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; @@ -19,11 +19,6 @@ namespace Microsoft.AspNet.Server.Kestrel /// public class KestrelThread { - // maximum times the work queues swapped and are processed in a single pass - // as completing a task may immediately have write data to put on the network - // otherwise it needs to wait till the next pass of the libuv loop - private const int _maxLoops = 8; - private static readonly Action _threadCallbackAdapter = (callback, state) => ((Action)callback).Invoke((KestrelThread)state); private static readonly Action _socketCallbackAdapter = (callback, state) => ((Action)callback).Invoke((SocketOutput)state); private static readonly Action _tcsCallbackAdapter = (callback, state) => ((Action>)callback).Invoke((TaskCompletionSource)state); @@ -35,19 +30,19 @@ public class KestrelThread private readonly Thread _thread; private readonly UvLoopHandle _loop; private readonly UvAsyncHandle _post; - private Queue _workAdding = new Queue(1024); - private Queue _workRunning = new Queue(1024); - private Queue _closeHandleAdding = new Queue(256); - private Queue _closeHandleRunning = new Queue(256); - private readonly object _workSync = new Object(); + private readonly ConcurrentQueue _workQueue = new ConcurrentQueue(); + private readonly ConcurrentQueue _closeHandleQueue = new ConcurrentQueue(); private bool _stopImmediate = false; private bool _initCompleted = false; private ExceptionDispatchInfo _closeError; private readonly IKestrelTrace _log; private readonly IThreadPool _threadPool; + private volatile bool _loopIdle; + public KestrelThread(KestrelEngine engine) { + _loopIdle = true; _engine = engine; _appLifetime = engine.AppLifetime; _log = engine.Log; @@ -143,72 +138,57 @@ private void OnStopImmediate() private void Post(Action callback) { - lock (_workSync) - { - _workAdding.Enqueue(new Work { CallbackAdapter = _threadCallbackAdapter, Callback = callback, State = this }); - } - _post.Send(); + _workQueue.Enqueue(new Work { CallbackAdapter = _threadCallbackAdapter, Callback = callback, State = this }); + WakeUpLoop(); } public void Post(Action callback, SocketOutput state) { - lock (_workSync) + _workQueue.Enqueue(new Work { - _workAdding.Enqueue(new Work - { - CallbackAdapter = _socketCallbackAdapter, - Callback = callback, - State = state - }); - } - _post.Send(); + CallbackAdapter = _socketCallbackAdapter, + Callback = callback, + State = state + }); + WakeUpLoop(); } public void Post(Action> callback, TaskCompletionSource state) { - lock (_workSync) + _workQueue.Enqueue(new Work { - _workAdding.Enqueue(new Work - { - CallbackAdapter = _tcsCallbackAdapter, - Callback = callback, - State = state - }); - } - _post.Send(); + CallbackAdapter = _tcsCallbackAdapter, + Callback = callback, + State = state + }); + WakeUpLoop(); } public Task PostAsync(Action callback, ListenerPrimary state) { var tcs = new TaskCompletionSource(); - lock (_workSync) + _workQueue.Enqueue(new Work { - _workAdding.Enqueue(new Work - { - CallbackAdapter = _listenerPrimaryCallbackAdapter, - Callback = callback, - State = state, - Completion = tcs - }); - } - _post.Send(); + CallbackAdapter = _listenerPrimaryCallbackAdapter, + Callback = callback, + State = state, + Completion = tcs + }); + WakeUpLoop(); return tcs.Task; } public Task PostAsync(Action callback, ListenerSecondary state) { var tcs = new TaskCompletionSource(); - lock (_workSync) + _workQueue.Enqueue(new Work { - _workAdding.Enqueue(new Work - { - CallbackAdapter = _listenerSecondaryCallbackAdapter, - Callback = callback, - State = state, - Completion = tcs - }); - } - _post.Send(); + CallbackAdapter = _listenerSecondaryCallbackAdapter, + Callback = callback, + State = state, + Completion = tcs + }); + WakeUpLoop(); return tcs.Task; } @@ -226,11 +206,17 @@ public void Send(Action callback, ListenerSecondary state) private void PostCloseHandle(Action callback, IntPtr handle) { - lock (_workSync) + _closeHandleQueue.Enqueue(new CloseHandle { Callback = callback, Handle = handle }); + WakeUpLoop(); + } + + private void WakeUpLoop() + { + if (_loopIdle) { - _closeHandleAdding.Enqueue(new CloseHandle { Callback = callback, Handle = handle }); + _loopIdle = false; + _post.Send(); } - _post.Send(); } private void ThreadStart(object parameter) @@ -291,31 +277,24 @@ private void ThreadStart(object parameter) private void OnPost() { - var loopsRemaining = _maxLoops; - bool wasWork; - do - { - wasWork = DoPostWork(); - wasWork = DoPostCloseHandle() || wasWork; - loopsRemaining--; - } while (wasWork && loopsRemaining > 0); - } + DoPostWork(); + DoPostCloseHandle(); - private bool DoPostWork() - { - Queue queue; - lock (_workSync) + if (_loopIdle == false) { - queue = _workAdding; - _workAdding = _workRunning; - _workRunning = queue; + _loopIdle = true; + // Run the loops once more to pick up any remaining event that + // might not have triggered uv_async_send due to loop not being idle + DoPostWork(); + DoPostCloseHandle(); } + } - bool wasWork = queue.Count > 0; - - while (queue.Count != 0) + private void DoPostWork() + { + Work work; + while (_workQueue.TryDequeue(out work)) { - var work = queue.Dequeue(); try { work.CallbackAdapter(work.Callback, work.State); @@ -338,23 +317,12 @@ private bool DoPostWork() } } - return wasWork; } - private bool DoPostCloseHandle() + private void DoPostCloseHandle() { - Queue queue; - lock (_workSync) - { - queue = _closeHandleAdding; - _closeHandleAdding = _closeHandleRunning; - _closeHandleRunning = queue; - } - - bool wasWork = queue.Count > 0; - - while (queue.Count != 0) + CloseHandle closeHandle; + while (_closeHandleQueue.TryDequeue(out closeHandle)) { - var closeHandle = queue.Dequeue(); try { closeHandle.Callback(closeHandle.Handle); @@ -365,8 +333,6 @@ private bool DoPostCloseHandle() throw; } } - - return wasWork; } private struct Work