Merged
20 changes: 5 additions & 15 deletions cs/src/core/Allocator/AllocatorBase.cs
@@ -1752,22 +1752,12 @@ public void AsyncFlushPages(long fromAddress, long untilAddress)
             }
 
             // Enqueue work in shared queue
-            if (PendingFlush[index].Add(asyncResult))
-            {
-                // Perform work from shared queue if possible
-                if (PendingFlush[index].RemoveNextAdjacent(FlushedUntilAddress, out PageAsyncFlushResult<Empty> request))
-                {
-                    WriteAsync(request.fromAddress >> LogPageSizeBits, AsyncFlushPageCallback, request);
-                }
-            }
-            else
+            PendingFlush[index].Add(asyncResult);
+
+            // Perform work from shared queue if possible
+            if (PendingFlush[index].RemoveNextAdjacent(FlushedUntilAddress, out PageAsyncFlushResult<Empty> request))
             {
-                // Because we are invoking the callback away from the usual codepath, need to externally
-                // ensure that flush address are updated in order
-                while (FlushedUntilAddress < asyncResult.fromAddress) Thread.Yield();
-                // Could not add to pending flush list, treat as a failed write
-                // Use a special errorCode to convey that this is not from a syscall
-                AsyncFlushPageCallback(16000, 0, asyncResult);
+                WriteAsync(request.fromAddress >> LogPageSizeBits, AsyncFlushPageCallback, request);
             }
         }
         else
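The change above makes Add unconditional: every flush request is enqueued, and the caller then tries to issue whichever pending request starts exactly at FlushedUntilAddress. Since Add can no longer fail, the old failure path (spin on FlushedUntilAddress, then invoke the callback with the special errorCode 16000) disappears. A minimal sketch of this enqueue-then-drain pattern, written in Java for illustration (the PR itself is C#; the names FlushRequest, add, and removeNextAdjacent are illustrative stand-ins for FASTER's PageAsyncFlushResult, PendingFlushList.Add, and RemoveNextAdjacent):

```java
import java.util.LinkedList;

// Sketch of the new control flow: unconditionally enqueue, then try to
// dequeue the request whose start lines up with the flushed-until address.
public class PendingFlushSketch {
    public static class FlushRequest {
        public final long fromAddress, untilAddress;
        public FlushRequest(long from, long until) { fromAddress = from; untilAddress = until; }
    }

    private final LinkedList<FlushRequest> list = new LinkedList<>();

    // Add never fails now: no retry limit, no "failed write" callback path.
    public synchronized void add(FlushRequest r) { list.addFirst(r); }

    // Remove the request that starts exactly where the log has been flushed to.
    public synchronized FlushRequest removeNextAdjacent(long flushedUntil) {
        for (var it = list.iterator(); it.hasNext(); ) {
            FlushRequest r = it.next();
            if (r.fromAddress == flushedUntil) { it.remove(); return r; }
        }
        return null;
    }
}
```

Requests can arrive out of order; removeNextAdjacent only hands back the one that is contiguous with what has already been flushed.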
49 changes: 20 additions & 29 deletions cs/src/core/Allocator/PendingFlushList.cs
@@ -1,54 +1,43 @@
 // Copyright (c) Microsoft Corporation. All rights reserved.
 // Licensed under the MIT license.
 
-using System.Threading;
+using System.Collections.Generic;
 
 namespace FASTER.core
 {
-    class PendingFlushList
+    sealed class PendingFlushList
     {
-        const int maxSize = 8;
-        const int maxRetries = 10;
-        public PageAsyncFlushResult<Empty>[] list;
+        public readonly LinkedList<PageAsyncFlushResult<Empty>> list;
 
         public PendingFlushList()
         {
-            list = new PageAsyncFlushResult<Empty>[maxSize];
+            list = new();
         }
 
-        public bool Add(PageAsyncFlushResult<Empty> t)
+        public void Add(PageAsyncFlushResult<Empty> t)
         {
-            int retries = 0;
-            do
+            lock (list)
[Review thread on the "lock (list)" line]

Contributor: Perf idea: let's reuse the original performant code path with a fixed-size array --- if the array is full, grow it under a critical section. For concurrency, we can use something similar to SimpleVersionScheme in libDPR to ensure minimal overhead on the normal code path.

Author (Collaborator): Growing is fine, but then we would have to shrink at some point. Also, growability implies we still have to take the lock every time.

Author (Collaborator): I was also thinking of a LinkedList, where each element is the performant array from earlier. Kind of like the ElasticCircularBuffer ...

Contributor: We don't really need to shrink here? Even a thousand pending flush entries is nothing in terms of memory size. For the lock --- we can use an epoch plus one atomic variable to avoid a shared lock on the common path. SimpleVersionScheme does this.
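The "grow under critical section" idea from this thread could look roughly like the sketch below (Java for illustration; the PR is C#, and the class and method names here are made up). Note that it deliberately omits the epoch/SimpleVersionScheme machinery the reviewer mentions, so the array swap is not actually safe against adds that race with the copy --- that machinery is precisely what would make the publish step correct:

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Predicate;

// Common-path adds CAS into a free slot of a fixed array; only when the
// array is full do we take a lock and replace it with a larger one.
public class GrowableSlotList<T> {
    private volatile AtomicReferenceArray<T> slots = new AtomicReferenceArray<>(8);

    public void add(T item) {
        while (true) {
            AtomicReferenceArray<T> cur = slots;
            for (int i = 0; i < cur.length(); i++)
                if (cur.get(i) == null && cur.compareAndSet(i, null, item))
                    return;                       // fast path: claimed a free slot
            synchronized (this) {                 // slow path: grow under lock
                if (slots != cur) continue;       // someone else grew; rescan
                AtomicReferenceArray<T> bigger = new AtomicReferenceArray<>(cur.length() * 2);
                // UNSAFE without epoch protection: a concurrent fast-path add
                // into cur during this copy would be lost after the swap.
                for (int i = 0; i < cur.length(); i++) bigger.set(i, cur.get(i));
                slots = bigger;                   // publish, then retry the add
            }
        }
    }

    public synchronized T removeIf(Predicate<T> p) {
        AtomicReferenceArray<T> cur = slots;
        for (int i = 0; i < cur.length(); i++) {
            T v = cur.get(i);
            if (v != null && p.test(v) && cur.compareAndSet(i, v, null)) return v;
        }
        return null;
    }
}
```

This also illustrates the author's objection: because the array reference can change, every reader has to synchronize (or be epoch-protected) to observe the current array.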

             {
-                for (int i = 0; i < maxSize; i++)
-                {
-                    if (list[i] == default)
-                    {
-                        if (Interlocked.CompareExchange(ref list[i], t, default) == default)
-                        {
-                            return true;
-                        }
-                    }
-                }
-            } while (retries++ < maxRetries);
-            return false;
+                list.AddFirst(t);
[Review thread on the "list.AddFirst(t)" line]

Contributor: Just checking: we basically don't have any ordering guarantees within the pending flush list, right?

Is there maybe some value in ensuring order? Normally, we don't flush a random range to disk, so only truly concurrent flushes will result in out-of-order entries in the pending flush list. In that case, whatever saving we have on the add path will arguably be negated on the remove path. What about just biting the bullet and ensuring ordering from the start?

Author (Collaborator): Ensuring order would be useful. The entries are of the form [0, 23], [24, 39], [40, 100] and so on. But keeping these ordered would make this an O(log N) structure (interval tree).

Contributor: I was thinking about a much simpler approach: if you have [0, 23] and you see [40, 100] --- [24, 39] must have been issued from a different thread, so we can just spin-wait until that one is in before enqueuing [40, 100].
             }
         }
 
         /// <summary>
         /// Remove item from flush list with from-address equal to the specified address
         /// </summary>
         public bool RemoveNextAdjacent(long address, out PageAsyncFlushResult<Empty> request)
         {
-            for (int i=0; i<maxSize; i++)
+            lock (list)
             {
-                request = list[i];
-                if (request?.fromAddress == address)
+                for (var it = list.First; it != null;)
                 {
-                    if (Interlocked.CompareExchange(ref list[i], null, request) == request)
+                    request = it.Value;
+                    if (request.fromAddress == address)
                     {
+                        list.Remove(it);
                         return true;
                     }
+                    it = it.Next;
                 }
             }
             request = null;
@@ -60,15 +49,17 @@ public bool RemoveNextAdjacent(long address, out PageAsyncFlushResult<Empty> req
         /// </summary>
         public bool RemovePreviousAdjacent(long address, out PageAsyncFlushResult<Empty> request)
         {
-            for (int i = 0; i < maxSize; i++)
+            lock (list)
             {
-                request = list[i];
-                if (request?.untilAddress == address)
+                for (var it = list.First; it != null;)
                 {
-                    if (Interlocked.CompareExchange(ref list[i], null, request) == request)
+                    request = it.Value;
+                    if (request.untilAddress == address)
                     {
+                        list.Remove(it);
                         return true;
                     }
+                    it = it.Next;
                 }
             }
             request = null;