Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 65 additions & 10 deletions src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ internal sealed class WorkStealingQueue

private volatile int m_headIndex = START_INDEX;
private volatile int m_tailIndex = START_INDEX;
private volatile bool m_isActive;

private SpinLock m_foreignLock = new SpinLock(enableThreadOwnerTracking: false);

Expand Down Expand Up @@ -214,11 +215,23 @@ public void LocalPush(object obj)
m_foreignLock.Exit(useMemoryBarrier: false);
}
}

if (!m_isActive)
{
// New local item queued, and local queue not active.
// Activate it and add it to active local queues so it can be stolen from.
Activate();
}
}

[SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")]
public bool LocalFindAndPop(object obj)
{
if (!m_isActive)
{
return false;
}

// Fast path: check the tail. If equal, we can skip the lock.
if (m_array[(m_tailIndex - 1) & m_mask] == obj)
{
Expand Down Expand Up @@ -386,6 +399,25 @@ public object TrySteal(ref bool missedSteal)
return null;
}
}

internal void Deactivate()
{
if (m_isActive)
{
m_isActive = false;
// Remove queue from active local queues so its not checked if it can be stolen from.
WorkStealingQueueList.Remove(this);
}
}

private void Activate()
{
Debug.Assert(!m_isActive);

m_isActive = true;
// Add queue to active local queues so it can be stolen from.
WorkStealingQueueList.Add(this);
}
}

internal bool loggingEnabled;
Expand Down Expand Up @@ -488,13 +520,29 @@ public object Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
// finally try to steal from another thread's local queue
WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
int c = queues.Length;
Debug.Assert(c > 0, "There must at least be a queue for this thread.");
int maxIndex = c - 1;
if (c == 0)
{
// No local queues to check
return null;
}

int i = tl.random.Next(c);
while (c > 0)
do
{
i = (i < maxIndex) ? i + 1 : 0;
WorkStealingQueue otherQueue = queues[i];
WorkStealingQueue otherQueue;
if ((uint)i < (uint)queues.Length)
{
// Elides bounds check for common case
otherQueue = queues[i];
}
else
{
// Moved passed last element, move to first element
i = 0;
// Bounds check occurs here, as Jit checks if 0 is <= .Length
otherQueue = queues[i];
}

if (otherQueue != localWsq && otherQueue.CanSteal)
{
callback = otherQueue.TrySteal(ref missedSteal);
Expand All @@ -504,7 +552,8 @@ public object Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
}
}
c--;
}
i++;
} while (c > 0);
}

return callback;
Expand Down Expand Up @@ -573,6 +622,12 @@ internal static bool Dispatch()
//
needAnotherThread = missedSteal;

// No global queued items and no local items for this thread.
// Use the rest of this threads quantum to remove its local queue from
// the active list (if it is active), so other threads do not waste time
// trying to steal work from it.
tl.workStealingQueue.Deactivate();

// Tell the VM we're returning normally, not because Hill Climbing asked us to return.
return true;
}
Expand Down Expand Up @@ -717,25 +772,25 @@ public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
{
workQueue = tpq;
workStealingQueue = new ThreadPoolWorkQueue.WorkStealingQueue();
ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue);
currentThread = Thread.CurrentThread;
}

private void CleanUp()
{
if (null != workStealingQueue)
ThreadPoolWorkQueue.WorkStealingQueue localQueue = workStealingQueue;
if (null != localQueue)
{
if (null != workQueue)
{
object cb;
while ((cb = workStealingQueue.LocalPop()) != null)
while ((cb = localQueue.LocalPop()) != null)
{
Debug.Assert(null != cb);
workQueue.Enqueue(cb, forceGlobal: true);
}
}

ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue);
localQueue.Deactivate();
}
}

Expand Down