Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 59 additions & 93 deletions src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -19,11 +19,6 @@ namespace Microsoft.AspNet.Server.Kestrel
/// </summary>
public class KestrelThread
{
// maximum times the work queues swapped and are processed in a single pass
// as completing a task may immediately have write data to put on the network
// otherwise it needs to wait till the next pass of the libuv loop
private const int _maxLoops = 8;

private static readonly Action<object, object> _threadCallbackAdapter = (callback, state) => ((Action<KestrelThread>)callback).Invoke((KestrelThread)state);
private static readonly Action<object, object> _socketCallbackAdapter = (callback, state) => ((Action<SocketOutput>)callback).Invoke((SocketOutput)state);
private static readonly Action<object, object> _tcsCallbackAdapter = (callback, state) => ((Action<TaskCompletionSource<int>>)callback).Invoke((TaskCompletionSource<int>)state);
Expand All @@ -35,19 +30,19 @@ public class KestrelThread
private readonly Thread _thread;
private readonly UvLoopHandle _loop;
private readonly UvAsyncHandle _post;
private Queue<Work> _workAdding = new Queue<Work>(1024);
private Queue<Work> _workRunning = new Queue<Work>(1024);
private Queue<CloseHandle> _closeHandleAdding = new Queue<CloseHandle>(256);
private Queue<CloseHandle> _closeHandleRunning = new Queue<CloseHandle>(256);
private readonly object _workSync = new Object();
private readonly ConcurrentQueue<Work> _workQueue = new ConcurrentQueue<Work>();
private readonly ConcurrentQueue<CloseHandle> _closeHandleQueue = new ConcurrentQueue<CloseHandle>();
private bool _stopImmediate = false;
private bool _initCompleted = false;
private ExceptionDispatchInfo _closeError;
private readonly IKestrelTrace _log;
private readonly IThreadPool _threadPool;

private volatile bool _loopIdle;

public KestrelThread(KestrelEngine engine)
{
_loopIdle = true;
_engine = engine;
_appLifetime = engine.AppLifetime;
_log = engine.Log;
Expand Down Expand Up @@ -143,72 +138,57 @@ private void OnStopImmediate()

private void Post(Action<KestrelThread> callback)
{
lock (_workSync)
{
_workAdding.Enqueue(new Work { CallbackAdapter = _threadCallbackAdapter, Callback = callback, State = this });
}
_post.Send();
_workQueue.Enqueue(new Work { CallbackAdapter = _threadCallbackAdapter, Callback = callback, State = this });
WakeUpLoop();
}

public void Post(Action<SocketOutput> callback, SocketOutput state)
{
lock (_workSync)
_workQueue.Enqueue(new Work
{
_workAdding.Enqueue(new Work
{
CallbackAdapter = _socketCallbackAdapter,
Callback = callback,
State = state
});
}
_post.Send();
CallbackAdapter = _socketCallbackAdapter,
Callback = callback,
State = state
});
WakeUpLoop();
}

public void Post(Action<TaskCompletionSource<int>> callback, TaskCompletionSource<int> state)
{
lock (_workSync)
_workQueue.Enqueue(new Work
{
_workAdding.Enqueue(new Work
{
CallbackAdapter = _tcsCallbackAdapter,
Callback = callback,
State = state
});
}
_post.Send();
CallbackAdapter = _tcsCallbackAdapter,
Callback = callback,
State = state
});
WakeUpLoop();
}

public Task PostAsync(Action<ListenerPrimary> callback, ListenerPrimary state)
{
var tcs = new TaskCompletionSource<object>();
lock (_workSync)
_workQueue.Enqueue(new Work
{
_workAdding.Enqueue(new Work
{
CallbackAdapter = _listenerPrimaryCallbackAdapter,
Callback = callback,
State = state,
Completion = tcs
});
}
_post.Send();
CallbackAdapter = _listenerPrimaryCallbackAdapter,
Callback = callback,
State = state,
Completion = tcs
});
WakeUpLoop();
return tcs.Task;
}

public Task PostAsync(Action<ListenerSecondary> callback, ListenerSecondary state)
{
var tcs = new TaskCompletionSource<object>();
lock (_workSync)
_workQueue.Enqueue(new Work
{
_workAdding.Enqueue(new Work
{
CallbackAdapter = _listenerSecondaryCallbackAdapter,
Callback = callback,
State = state,
Completion = tcs
});
}
_post.Send();
CallbackAdapter = _listenerSecondaryCallbackAdapter,
Callback = callback,
State = state,
Completion = tcs
});
WakeUpLoop();
return tcs.Task;
}

Expand All @@ -226,11 +206,17 @@ public void Send(Action<ListenerSecondary> callback, ListenerSecondary state)

private void PostCloseHandle(Action<IntPtr> callback, IntPtr handle)
{
lock (_workSync)
_closeHandleQueue.Enqueue(new CloseHandle { Callback = callback, Handle = handle });
WakeUpLoop();
}

private void WakeUpLoop()
{
if (_loopIdle)
{
_closeHandleAdding.Enqueue(new CloseHandle { Callback = callback, Handle = handle });
_loopIdle = false;
_post.Send();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure there is a race here.

What if we don't call uv_async_send because the loop is apparently not idle when in fact the loop is merely finishing up? It's possible that _workQueue.IsEmpty returns true, but before _loopIdle is actually set to true, new work is enqueued and WakeUpLoop is called.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought about this for a while; only performant way I can think of is to hit it with a sledgehammer. Changed how OnPost works.

}
_post.Send();
}

private void ThreadStart(object parameter)
Expand Down Expand Up @@ -291,31 +277,24 @@ private void ThreadStart(object parameter)

private void OnPost()
{
var loopsRemaining = _maxLoops;
bool wasWork;
do
{
wasWork = DoPostWork();
wasWork = DoPostCloseHandle() || wasWork;
loopsRemaining--;
} while (wasWork && loopsRemaining > 0);
}
DoPostWork();
DoPostCloseHandle();

private bool DoPostWork()
{
Queue<Work> queue;
lock (_workSync)
if (_loopIdle == false)
{
queue = _workAdding;
_workAdding = _workRunning;
_workRunning = queue;
_loopIdle = true;
// Run the loops once more to pick up any remaining event that
// might not have triggered uv_async_send due to loop not being idle
DoPostWork();
DoPostCloseHandle();
}
}

bool wasWork = queue.Count > 0;

while (queue.Count != 0)
private void DoPostWork()
{
Work work;
while (_workQueue.TryDequeue(out work))
{
var work = queue.Dequeue();
try
{
work.CallbackAdapter(work.Callback, work.State);
Expand All @@ -338,23 +317,12 @@ private bool DoPostWork()
}
}

return wasWork;
}
private bool DoPostCloseHandle()
private void DoPostCloseHandle()
{
Queue<CloseHandle> queue;
lock (_workSync)
{
queue = _closeHandleAdding;
_closeHandleAdding = _closeHandleRunning;
_closeHandleRunning = queue;
}

bool wasWork = queue.Count > 0;

while (queue.Count != 0)
CloseHandle closeHandle;
while (_closeHandleQueue.TryDequeue(out closeHandle))
{
var closeHandle = queue.Dequeue();
try
{
closeHandle.Callback(closeHandle.Handle);
Expand All @@ -365,8 +333,6 @@ private bool DoPostCloseHandle()
throw;
}
}

return wasWork;
}

private struct Work
Expand Down