diff --git a/cs/samples/FasterLogPubSub/Program.cs b/cs/samples/FasterLogPubSub/Program.cs
index 4446c393e..4d430dc77 100644
--- a/cs/samples/FasterLogPubSub/Program.cs
+++ b/cs/samples/FasterLogPubSub/Program.cs
@@ -78,7 +78,7 @@ static async Task CommitterAsync(FasterLog log, CancellationToken cancellationTo
Console.WriteLine("Committing...");
- await log.CommitAsync(cancellationToken);
+ await log.CommitAsync(token: cancellationToken);
}
}
catch (OperationCanceledException) { }
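(Note: for context, a minimal sketch of the sample's committer loop after this change. The delay value and surrounding structure are assumptions based on the hunk above; only the CommitAsync call is taken verbatim. Requires: using System; using System.Threading; using System.Threading.Tasks; using FASTER.core;)

    static async Task CommitterAsync(FasterLog log, CancellationToken cancellationToken)
    {
        try
        {
            while (true)
            {
                await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
                Console.WriteLine("Committing...");
                // The named argument selects the CancellationToken overload of CommitAsync
                await log.CommitAsync(token: cancellationToken);
            }
        }
        catch (OperationCanceledException) { }
        Console.WriteLine("Committer complete");
    }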
diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index fb83286eb..3a089d3f8 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -1762,8 +1762,12 @@ public void AsyncFlushPages(long fromAddress, long untilAddress)
}
else
{
+ // Because we are invoking the callback outside the usual code path, we need to externally
+ // ensure that flush addresses are updated in order
+ while (FlushedUntilAddress < asyncResult.fromAddress) Thread.Yield();
// Could not add to pending flush list, treat as a failed write
- AsyncFlushPageCallback(1, 0, asyncResult);
+ // Use a special errorCode to convey that this is not from a syscall
+ AsyncFlushPageCallback(16000, 0, asyncResult);
}
}
else
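(Note: a self-contained sketch, with simplified types and hypothetical names, of the ordering rule the two added lines enforce: an out-of-band flush failure must not be reported before all earlier flushes have completed, so the flushed-until address still advances in order. 16000 is the sentinel error code chosen above; it is not an OS error code.)

    // Hypothetical, simplified illustration; not the FASTER allocator itself.
    // requires: using System; using System.Threading;
    class FlushOrderingSketch
    {
        long flushedUntilAddress;                 // monotonically advanced by completed flushes
        const uint NonSyscallErrorCode = 16000;   // sentinel marking a callback not produced by a syscall

        void FailFlushOutOfBand(long fromAddress, Action<uint> flushCallback)
        {
            // Wait until every earlier flush has been acknowledged before reporting this failure,
            // so that flushed-until updates remain in order
            while (Volatile.Read(ref flushedUntilAddress) < fromAddress) Thread.Yield();
            flushCallback(NonSyscallErrorCode);
        }
    }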
diff --git a/cs/src/core/Allocator/WorkQueueLIFO.cs b/cs/src/core/Allocator/WorkQueueLIFO.cs
index 852a7dfc7..5c7ee8355 100644
--- a/cs/src/core/Allocator/WorkQueueLIFO.cs
+++ b/cs/src/core/Allocator/WorkQueueLIFO.cs
@@ -3,6 +3,8 @@
using System;
using System.Collections.Concurrent;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@@ -12,18 +14,30 @@ namespace FASTER.core
/// Shared work queue that ensures one worker at any given time. Uses LIFO ordering of work.
///
///
- class WorkQueueLIFO<T>
+ class WorkQueueLIFO<T> : IDisposable
{
const int kMaxQueueSize = 1 << 30;
readonly ConcurrentStack<T> _queue;
readonly Action<T> _work;
- int _count;
+ private int _count;
+ private bool _disposed;
public WorkQueueLIFO(Action<T> work)
{
_queue = new ConcurrentStack<T>();
_work = work;
_count = 0;
+ _disposed = false;
+ }
+
+ public void Dispose()
+ {
+ _disposed = true;
+ // Once _disposed is set to true, future enqueue requests will no longer perform any work.
+ while (_count != 0)
+ Thread.Yield();
+ // After this point, any previously enqueued work must have completed. Even if another enqueue request manipulates the
+ // count field, it is guaranteed to observe _disposed and not enqueue any actual work.
}
///
@@ -32,16 +46,24 @@ public WorkQueueLIFO(Action work)
///
/// <param name="work">Work to enqueue</param>
/// <param name="asTask">Process work as separate task</param>
- public void EnqueueAndTryWork(T work, bool asTask)
+ /// <returns>Whether the enqueue is successful. Enqueuing into a disposed WorkQueue will fail and the work will not be performed</returns>
+ public bool EnqueueAndTryWork(T work, bool asTask)
{
Interlocked.Increment(ref _count);
+ if (_disposed)
+ {
+ // Remove self from count in case Dispose() is actively waiting for completion
+ Interlocked.Decrement(ref _count);
+ return false;
+ }
+
_queue.Push(work);
// Try to take over work queue processing if needed
while (true)
{
int count = _count;
- if (count >= kMaxQueueSize) return;
+ if (count >= kMaxQueueSize) return true;
if (Interlocked.CompareExchange(ref _count, count + kMaxQueueSize, count) == count)
break;
}
@@ -50,6 +72,7 @@ public void EnqueueAndTryWork(T work, bool asTask)
_ = Task.Run(() => ProcessQueue());
else
ProcessQueue();
+ return true;
}
private void ProcessQueue()
@@ -59,12 +82,12 @@ private void ProcessQueue()
{
while (_queue.TryPop(out var workItem))
{
- Interlocked.Decrement(ref _count);
try
{
_work(workItem);
}
catch { }
+ Interlocked.Decrement(ref _count);
}
int count = _count;
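(Note: a minimal, self-contained sketch of the dispose-safe enqueue protocol introduced above; it is not the FASTER class, and the single-worker takeover via kMaxQueueSize is deliberately omitted. The two points mirrored from the diff: the count is incremented before the disposed check, so Dispose(), which waits for count == 0, cannot miss an in-flight enqueue; and the count is decremented only after the work item has run, so Dispose() also waits for in-flight work.)

    // requires: using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks;
    class DisposableLifoQueueSketch<T> : IDisposable
    {
        readonly ConcurrentStack<T> queue = new();
        readonly Action<T> work;
        int count;
        volatile bool disposed;

        public DisposableLifoQueueSketch(Action<T> work) => this.work = work;

        public bool TryEnqueue(T item)
        {
            Interlocked.Increment(ref count);
            if (disposed)
            {
                // Back out so a concurrent Dispose() waiting on count can complete
                Interlocked.Decrement(ref count);
                return false;
            }
            queue.Push(item);
            Task.Run(ProcessOne);
            return true;
        }

        void ProcessOne()
        {
            if (queue.TryPop(out var item))
            {
                try { work(item); } catch { }
                // Decrement only after the work has run, so Dispose() waits for in-flight work
                Interlocked.Decrement(ref count);
            }
        }

        public void Dispose()
        {
            disposed = true;
            // Wait for all work enqueued before disposal to finish
            while (Volatile.Read(ref count) != 0) Thread.Yield();
        }
    }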
diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs
index 74d9f72e6..d52921706 100644
--- a/cs/src/core/Epochs/LightEpoch.cs
+++ b/cs/src/core/Epochs/LightEpoch.cs
@@ -345,8 +345,8 @@ public void BumpCurrentEpoch(Action onDrain)
i = 0;
if (++j == 500)
{
+ // Spin until there is a free entry in the drain list
j = 0;
- Debug.WriteLine("Delay finding a free entry in the drain list");
}
}
}
diff --git a/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs b/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs
index 281aafb1f..ae5e16d1e 100644
--- a/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs
+++ b/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs
@@ -29,11 +29,6 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa
private IDevice singleLogCommitDevice;
private bool _disposed;
- ///
- /// Next commit number
- ///
- private long commitNum;
-
///
/// Track historical commits for automatic purging
///
@@ -52,7 +47,6 @@ public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, IChec
this.deviceFactory = deviceFactory;
this.checkpointNamingScheme = checkpointNamingScheme;
- this.commitNum = 0;
this.semaphore = new SemaphoreSlim(0);
this.overwriteLogCommits = overwriteLogCommits;
@@ -99,9 +93,12 @@ public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, strin
#region ILogCommitManager
///
- public unsafe void Commit(long beginAddress, long untilAddress, byte[] commitMetadata)
+ public bool PreciseCommitNumRecoverySupport() => !overwriteLogCommits;
+
+ ///
+ public unsafe void Commit(long beginAddress, long untilAddress, byte[] commitMetadata, long commitNum)
{
- var device = NextCommitDevice();
+ var device = NextCommitDevice(commitNum);
if (device == null) return;
@@ -135,6 +132,21 @@ public IEnumerable ListCommits()
return deviceFactory.ListContents(checkpointNamingScheme.FasterLogCommitBasePath()).Select(e => checkpointNamingScheme.CommitNumber(e)).OrderByDescending(e => e);
}
+ ///
+ public void RemoveCommit(long commitNum)
+ {
+ if (overwriteLogCommits)
+ throw new FasterException("removing commit by commit num is not supported when overwriting log commits");
+ deviceFactory.Delete(checkpointNamingScheme.FasterLogCommitMetadata(commitNum));
+ }
+
+ ///
+ public void RemoveAllCommits()
+ {
+ foreach (var commitNum in ListCommits())
+ RemoveCommit(commitNum);
+ }
+
///
public byte[] GetCommitMetadata(long commitNum)
{
@@ -156,7 +168,6 @@ public byte[] GetCommitMetadata(long commitNum)
else
{
device = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum));
- this.commitNum = commitNum + 1;
}
if (device == null) return null;
@@ -176,14 +187,14 @@ public byte[] GetCommitMetadata(long commitNum)
return new Span<byte>(body).Slice(sizeof(int)).ToArray();
}
- private IDevice NextCommitDevice()
+ private IDevice NextCommitDevice(long commitNum)
{
if (overwriteLogCommits)
{
if (_disposed) return null;
if (singleLogCommitDevice == null)
{
- singleLogCommitDevice = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum));
+ singleLogCommitDevice = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(0));
if (_disposed)
{
singleLogCommitDevice?.Dispose();
@@ -194,11 +205,11 @@ private IDevice NextCommitDevice()
return singleLogCommitDevice;
}
- return deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum++));
+ var result = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum));
+ return result;
}
#endregion
-
-
+
#region ICheckpointManager
///
public unsafe void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata)
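(Note: a hedged usage sketch of the extended commit-manager surface added in this file. It assumes a manager constructed so that overwriteLogCommits is false, since RemoveCommit is only supported in that mode; PruneAllButLatest is a hypothetical helper, and ListCommits is ordered newest-first per the implementation above.)

    // requires: using FASTER.core;
    static void PruneAllButLatest(ILogCommitManager manager)
    {
        if (!manager.PreciseCommitNumRecoverySupport()) return;   // removal unsupported (e.g., overwrite mode)
        bool keptLatest = false;
        foreach (var commitNum in manager.ListCommits())           // newest first
        {
            if (!keptLatest) { keptLatest = true; continue; }      // keep the most recent commit
            manager.RemoveCommit(commitNum);                       // delete older commit metadata
        }
    }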
diff --git a/cs/src/core/Index/FasterLog/FasterLog.cs b/cs/src/core/Index/FasterLog/FasterLog.cs
index 99a3536db..3b3dd5eed 100644
--- a/cs/src/core/Index/FasterLog/FasterLog.cs
+++ b/cs/src/core/Index/FasterLog/FasterLog.cs
@@ -1,8 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
-#pragma warning disable 0162
-
using System;
using System.Buffers;
using System.Collections.Concurrent;
@@ -19,7 +17,7 @@ namespace FASTER.core
///
/// FASTER log
///
- public class FasterLog : IDisposable
+ public sealed class FasterLog : IDisposable
{
private readonly BlittableAllocator allocator;
private readonly LightEpoch epoch;
@@ -31,22 +29,33 @@ public class FasterLog : IDisposable
private readonly WorkQueueLIFO<CommitInfo> commitQueue;
internal readonly bool readOnlyMode;
+ internal readonly bool fastCommitMode;
+
+ private TaskCompletionSource<LinkedCommitInfo> commitTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
+ private TaskCompletionSource<Empty> refreshUncommittedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
- private TaskCompletionSource commitTcs
- = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- private TaskCompletionSource refreshUncommittedTcs
- = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ // Offsets for all currently unprocessed commit records
+ private readonly Queue<(long, FasterLogRecoveryInfo)> ongoingCommitRequests;
+ private long commitNum;
///
/// Beginning address of log
///
- public long BeginAddress => allocator.BeginAddress;
+ public long BeginAddress => beginAddress;
+
+ // A soft begin address, observed by all accesses at the FasterLog level but not applied to the
+ // allocator. This ensures that any potential physical deletes happen only after commit.
+ private long beginAddress;
///
/// Tail address of log
///
public long TailAddress => allocator.GetTailAddress();
+ // Tracks the tail address covered by the last issued commit, to stop commits from being issued
+ // when there are no new user records to cover
+ private long commitCoveredAddress;
+
///
/// Log flushed until address
///
@@ -72,6 +81,11 @@ private TaskCompletionSource refreshUncommittedTcs
///
public long CommittedBeginAddress;
+ ///
+ /// Recovered Commit Cookie
+ ///
+ public byte[] RecoveredCookie;
+
///
/// Task notifying commit completions
///
@@ -90,18 +104,12 @@ private TaskCompletionSource refreshUncommittedTcs
///
/// Table of persisted iterators
///
- internal readonly ConcurrentDictionary<string, FasterLogScanIterator> PersistedIterators
- = new ConcurrentDictionary<string, FasterLogScanIterator>();
-
- ///
- /// Version number to track changes to commit metadata (begin address and persisted iterators)
- ///
- private long commitMetadataVersion;
+ internal readonly ConcurrentDictionary<string, FasterLogScanIterator> PersistedIterators = new();
///
/// Committed view of commitMetadataVersion
///
- private long persistedCommitMetadataVersion;
+ private long persistedCommitNum;
internal Dictionary<string, long> LastPersistedIterators;
@@ -115,10 +123,21 @@ internal readonly ConcurrentDictionary PersistedI
/// Create new log instance
///
///
- public FasterLog(FasterLogSettings logSettings)
+ /// <param name="requestedCommitNum">Specific commit number to recover from (or -1 for latest)</param>
+ public FasterLog(FasterLogSettings logSettings, long requestedCommitNum = -1)
: this(logSettings, false)
{
- this.RecoveredIterators = Restore();
+ Dictionary<string, long> it;
+ if (requestedCommitNum == -1)
+ RestoreLatest(out it, out RecoveredCookie);
+ else
+ {
+ if (!logCommitManager.PreciseCommitNumRecoverySupport())
+ throw new FasterException("Recovering to a specific commit is not supported for given log setting");
+ RestoreSpecificCommit(requestedCommitNum, out it, out RecoveredCookie);
+ }
+
+ RecoveredIterators = it;
}
///
@@ -129,7 +148,9 @@ public FasterLog(FasterLogSettings logSettings)
public static async ValueTask CreateAsync(FasterLogSettings logSettings, CancellationToken cancellationToken = default)
{
var fasterLog = new FasterLog(logSettings, false);
- fasterLog.RecoveredIterators = await fasterLog.RestoreAsync(cancellationToken).ConfigureAwait(false);
+ var (it, cookie) = await fasterLog.RestoreLatestAsync(cancellationToken).ConfigureAwait(false);
+ fasterLog.RecoveredIterators = it;
+ fasterLog.RecoveredCookie = cookie;
return fasterLog;
}
@@ -159,6 +180,7 @@ private FasterLog(FasterLogSettings logSettings, bool oldCommitManager)
logSettings.GetLogSettings(), null,
null, epoch, CommitCallback);
allocator.Initialize();
+ beginAddress = allocator.BeginAddress;
// FasterLog is used as a read-only iterator
if (logSettings.ReadOnlyMode)
@@ -167,6 +189,9 @@ private FasterLog(FasterLogSettings logSettings, bool oldCommitManager)
allocator.HeadAddress = long.MaxValue;
}
+ fastCommitMode = logSettings.FastCommitMode;
+
+ ongoingCommitRequests = new Queue<(long, FasterLogRecoveryInfo)>();
}
///
@@ -180,6 +205,7 @@ public void Dispose()
internal void TrueDispose()
{
+ commitQueue.Dispose();
commitTcs.TrySetException(new ObjectDisposedException("Log has been disposed"));
allocator.Dispose();
epoch.Dispose();
@@ -251,7 +277,7 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress)
epoch.Suspend();
return false;
}
-
+
var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
fixed (byte* bp = entry)
Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), length, length);
@@ -282,7 +308,7 @@ public unsafe bool TryEnqueue(ReadOnlySpan entry, out long logicalAddress)
epoch.Suspend();
return false;
}
-
+
var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
fixed (byte* bp = &entry.GetPinnableReference())
Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), length, length);
@@ -425,7 +451,8 @@ public void WaitForCommit(long untilAddress = 0)
var tailAddress = untilAddress;
if (tailAddress == 0) tailAddress = allocator.GetTailAddress();
- while (CommittedUntilAddress < tailAddress || persistedCommitMetadataVersion < commitMetadataVersion) Thread.Yield();
+ var observedCommitNum = commitNum;
+ while (CommittedUntilAddress < tailAddress || persistedCommitNum < observedCommitNum) Thread.Yield();
}
///
@@ -443,13 +470,11 @@ public async ValueTask WaitForCommitAsync(long untilAddress = 0, CancellationTok
var tailAddress = untilAddress;
if (tailAddress == 0) tailAddress = allocator.GetTailAddress();
- while (CommittedUntilAddress < tailAddress || persistedCommitMetadataVersion < commitMetadataVersion)
+ var observedCommitNum = commitNum;
+ while (CommittedUntilAddress < tailAddress || persistedCommitNum < observedCommitNum)
{
var linkedCommitInfo = await task.WithCancellationAsync(token).ConfigureAwait(false);
- if (linkedCommitInfo.CommitInfo.UntilAddress < tailAddress || persistedCommitMetadataVersion < commitMetadataVersion)
- task = linkedCommitInfo.NextTask;
- else
- break;
+ task = linkedCommitInfo.NextTask;
}
}
#endregion
@@ -460,10 +485,34 @@ public async ValueTask WaitForCommitAsync(long untilAddress = 0, CancellationTok
/// Issue commit request for log (until tail)
///
/// If true, spin-wait until commit completes. Otherwise, issue commit and return immediately.
- ///
+ /// whether there is anything to commit.
+
public void Commit(bool spinWait = false)
{
- CommitInternal(spinWait);
+ CommitInternal(out _, out _, spinWait, true, null, -1);
+ }
+
+ ///
+ /// Issue a strong commit request for log (until tail) with the given commitNum
+ ///
+ /// The tail committed by this call
+ ///
+ /// A unique, monotonically increasing identifier for the commit that can be used to recover to exactly this commit
+ ///
+ /// If true, spin-wait until commit completes. Otherwise, issue commit and return immediately
+ ///
+ /// A custom piece of metadata to be associated with this commit. If commit is successful, any recovery from
+ /// this commit will recover the cookie in RecoveredCookie field. Note that cookies are not stored by FasterLog
+ /// itself, so the user is responsible for tracking cookie content and supplying it to every commit call if needed
+ ///
+ ///
+ /// Proposal for the identifier to use for this commit, or -1 if the system should pick one. If supplied with
+ /// a non -1 value, commit is guaranteed to have the supplied identifier if commit call is successful
+ ///
+ /// Whether commit is successful
+ public bool CommitStrongly(out long commitTail, out long actualCommitNum, bool spinWait = false, byte[] cookie = null, long proposedCommitNum = -1)
+ {
+ return CommitInternal(out commitTail, out actualCommitNum, spinWait, false, cookie, proposedCommitNum);
}
///
@@ -476,15 +525,13 @@ public async ValueTask CommitAsync(CancellationToken token = default)
{
token.ThrowIfCancellationRequested();
var task = CommitTask;
- var tailAddress = CommitInternal();
+ if (!CommitInternal(out var tailAddress, out var actualCommitNum, false, true, null, -1))
+ return;
- while (CommittedUntilAddress < tailAddress || persistedCommitMetadataVersion < commitMetadataVersion)
+ while (CommittedUntilAddress < tailAddress || persistedCommitNum < actualCommitNum)
{
var linkedCommitInfo = await task.WithCancellationAsync(token).ConfigureAwait(false);
- if (linkedCommitInfo.CommitInfo.UntilAddress < tailAddress || persistedCommitMetadataVersion < commitMetadataVersion)
- task = linkedCommitInfo.NextTask;
- else
- break;
+ task = linkedCommitInfo.NextTask;
}
}
@@ -498,12 +545,14 @@ public async ValueTask> CommitAsync(Task> CommitAsync(Task
+ /// Issue commit request for log (until tail) with the given commitNum
+ ///
+ ///
+ /// A custom piece of metadata to be associated with this commit. If commit is successful, any recovery from
+ /// this commit will recover the cookie in RecoveredCookie field. Note that cookies are not stored by FasterLog
+ /// itself, so the user is responsible for tracking cookie content and supplying it to every commit call if needed
+ ///
+ ///
+ /// Proposal for the identifier to use for this commit, or -1 if the system should pick one. If supplied with
+ /// a non -1 value, commit is guaranteed to have the supplied identifier if commit call is successful
+ ///
+ /// Cancellation token
+ /// Whether commit is successful, commit tail, and actual commit number
+ public async ValueTask<(bool success, long commitTail, long actualCommitNum)> CommitStronglyAsync(byte[] cookie = null, long proposedCommitNum = -1, CancellationToken token = default)
+ {
+ token.ThrowIfCancellationRequested();
+ var task = CommitTask;
+ if (!CommitInternal(out var commitTail, out var actualCommitNum, false, false, cookie, proposedCommitNum))
+ return (false, commitTail, actualCommitNum);
+
+ while (CommittedUntilAddress < commitTail || persistedCommitNum < actualCommitNum)
+ {
+ var linkedCommitInfo = await task.WithCancellationAsync(token).ConfigureAwait(false);
+ task = linkedCommitInfo.NextTask;
+ }
+
+ return (true, commitTail, actualCommitNum);
+ }
+
///
/// Trigger a refresh of information about uncommitted part of log (tail address) to ensure visibility
/// to uncommitted scan iterators. Will cause SafeTailAddress to reflect the current tail address.
@@ -758,7 +837,7 @@ public async ValueTask EnqueueAndWaitForCommitAsync(IReadOnlySpanBatch rea
/// Until address
public void TruncateUntil(long untilAddress)
{
- allocator.ShiftBeginAddress(untilAddress);
+ Utility.MonotonicUpdate(ref beginAddress, untilAddress, out _);
}
///
@@ -769,7 +848,7 @@ public void TruncateUntil(long untilAddress)
/// Until address
public void TruncateUntilPageStart(long untilAddress)
{
- allocator.ShiftBeginAddress(untilAddress & ~allocator.PageSizeMask);
+ Utility.MonotonicUpdate(ref beginAddress, untilAddress & ~allocator.PageSizeMask, out _);
}
///
@@ -906,7 +985,7 @@ public async ValueTask ReadRecordLengthAsync(long address, CancellationToke
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private int Align(int length)
+ private static int Align(int length)
{
return (length + 3) & ~3;
}
@@ -916,49 +995,123 @@ private int Align(int length)
///
private void CommitCallback(CommitInfo commitInfo)
{
+ // Using count is safe as a fast filtering mechanism to reduce number of invocations despite concurrency
+ if (ongoingCommitRequests.Count == 0) return;
commitQueue.EnqueueAndTryWork(commitInfo, asTask: true);
}
- private void SerialCommitCallbackWorker(CommitInfo commitInfo)
+ private unsafe bool TryEnqueueCommitRecord(ref FasterLogRecoveryInfo info)
{
- // Check if commit is already covered
- if (CommittedBeginAddress >= BeginAddress &&
- CommittedUntilAddress >= commitInfo.UntilAddress &&
- persistedCommitMetadataVersion >= commitMetadataVersion &&
- commitInfo.ErrorCode == 0)
- return;
+ var entryBodySize = info.SerializedSize();
+
+ int allocatedLength = headerSize + Align(entryBodySize);
+ ValidateAllocatedLength(allocatedLength);
+
+ epoch.Resume();
+
+ var logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
+ if (logicalAddress == 0)
+ {
+ epoch.Suspend();
+ return false;
+ }
+ // Finish filling in all fields
+ info.BeginAddress = BeginAddress;
+ info.UntilAddress = logicalAddress + allocatedLength;
- if (commitInfo.ErrorCode == 0)
+ var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
+
+ var entryBody = info.ToByteArray();
+ fixed (byte* bp = entryBody)
+ Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), entryBody.Length, entryBody.Length);
+ SetCommitRecordHeader(entryBody.Length, (byte*)physicalAddress);
+ epoch.Suspend();
+ // Commit record enqueued; its end offset (the commit tail) is available in info.UntilAddress
+ return true;
+ }
+
+ private bool ShouldCommitMetadata(ref FasterLogRecoveryInfo info)
+ {
+ return beginAddress > CommittedBeginAddress || IteratorsChanged(ref info) || info.Cookie != null;
+ }
+
+ private void CommitMetadataOnly(ref FasterLogRecoveryInfo info, bool spinWait)
+ {
+ var fromAddress = CommittedUntilAddress > info.BeginAddress ? CommittedUntilAddress : info.BeginAddress;
+ var untilAddress = CommittedUntilAddress > info.BeginAddress ? CommittedUntilAddress : info.BeginAddress;
+
+ CommitCallback(new CommitInfo
+ {
+ FromAddress = fromAddress,
+ UntilAddress = untilAddress,
+ ErrorCode = 0,
+ });
+
+ if (spinWait)
{
- // Capture CMV first, so metadata prior to CMV update is visible to commit
- long _localCMV = commitMetadataVersion;
+ while (persistedCommitNum < info.CommitNum)
+ Thread.Yield();
+ }
+ }
+
+ private void UpdateCommittedState(FasterLogRecoveryInfo recoveryInfo)
+ {
+ LastPersistedIterators = recoveryInfo.Iterators;
+ CommittedBeginAddress = recoveryInfo.BeginAddress;
+ CommittedUntilAddress = recoveryInfo.UntilAddress;
+ recoveryInfo.CommitIterators(PersistedIterators);
+ Utility.MonotonicUpdate(ref persistedCommitNum, recoveryInfo.CommitNum, out _);
+ }
- if (CommittedUntilAddress > commitInfo.FromAddress)
- commitInfo.FromAddress = CommittedUntilAddress;
- if (CommittedUntilAddress > commitInfo.UntilAddress)
- commitInfo.UntilAddress = CommittedUntilAddress;
+ private void WriteCommitMetadata(FasterLogRecoveryInfo recoveryInfo)
+ {
+ // TODO(Tianyu): If fast commit, write this in separate thread?
+ logCommitManager.Commit(recoveryInfo.BeginAddress, recoveryInfo.UntilAddress,
+ recoveryInfo.ToByteArray(), recoveryInfo.CommitNum);
+ // If not fast committing, set committed state as we commit metadata explicitly only after metadata commit
+ if (!fastCommitMode)
+ UpdateCommittedState(recoveryInfo);
+ // Issue any potential physical deletes due to shifts in begin address
+ allocator.ShiftBeginAddress(recoveryInfo.BeginAddress);
+ }
- FasterLogRecoveryInfo info = new FasterLogRecoveryInfo
+ private void SerialCommitCallbackWorker(CommitInfo commitInfo)
+ {
+ if (commitInfo.ErrorCode == 0)
+ {
+ var coveredCommits = new List();
+ // Check for the commit records included in this flush
+ lock (ongoingCommitRequests)
{
- BeginAddress = BeginAddress,
- FlushedUntilAddress = commitInfo.UntilAddress
- };
+ while (ongoingCommitRequests.Count != 0)
+ {
+ var (addr, recoveryInfo) = ongoingCommitRequests.Peek();
+ if (addr > commitInfo.UntilAddress) break;
+ coveredCommits.Add(recoveryInfo);
+ ongoingCommitRequests.Dequeue();
+ }
+ }
- // Take snapshot of persisted iterators
- info.SnapshotIterators(PersistedIterators);
+ // Nothing was committed --- this was probably an auto-flush. Return now without touching any
+ // commit task tracking.
+ if (coveredCommits.Count == 0) return;
- logCommitManager.Commit(info.BeginAddress, info.FlushedUntilAddress, info.ToByteArray());
+ var latestCommit = coveredCommits[coveredCommits.Count - 1];
+ if (fastCommitMode)
+ // In fast commit mode, can safely set committed state to the latest flushed
+ UpdateCommittedState(latestCommit);
- LastPersistedIterators = info.Iterators;
- CommittedBeginAddress = info.BeginAddress;
- CommittedUntilAddress = info.FlushedUntilAddress;
- if (_localCMV > persistedCommitMetadataVersion)
- persistedCommitMetadataVersion = _localCMV;
+ foreach (var recoveryInfo in coveredCommits)
+ {
+ // Only write out commit metadata if user cares about this as a distinct recoverable point
+ if (!recoveryInfo.FastForwardAllowed) WriteCommitMetadata(recoveryInfo);
+ }
- // Update completed address for persisted iterators
- info.CommitIterators(PersistedIterators);
+ // We fast-forwarded commits earlier, so write it out if not covered by another commit
+ if (latestCommit.FastForwardAllowed) WriteCommitMetadata(latestCommit);
}
+ // TODO(Tianyu): Can invoke earlier in the case of fast commit
var _commitTcs = commitTcs;
commitTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var lci = new LinkedCommitInfo
@@ -973,22 +1126,20 @@ private void SerialCommitCallbackWorker(CommitInfo commitInfo)
_commitTcs.TrySetException(new CommitFailureException(lci, $"Commit of address range [{commitInfo.FromAddress}-{commitInfo.UntilAddress}] failed with error code {commitInfo.ErrorCode}"));
}
- private bool IteratorsChanged()
+ private bool IteratorsChanged(ref FasterLogRecoveryInfo info)
{
var _lastPersistedIterators = LastPersistedIterators;
if (_lastPersistedIterators == null)
{
- if (PersistedIterators.Count == 0)
- return false;
- return true;
+ return info.Iterators != null && info.Iterators.Count != 0;
}
- if (_lastPersistedIterators.Count != PersistedIterators.Count)
+ if (_lastPersistedIterators.Count != info.Iterators.Count)
return true;
foreach (var item in _lastPersistedIterators)
{
- if (PersistedIterators.TryGetValue(item.Key, out var other))
+ if (info.Iterators.TryGetValue(item.Key, out var other))
{
- if (item.Value != other.requestedCompletedUntilAddress) return true;
+ if (item.Value != other) return true;
}
else
return true;
@@ -1014,6 +1165,8 @@ private void UpdateTailCallback(long tailAddress)
}
}
+
+ // TODO(Tianyu): Will we ever need to recover to a specific commit for read-only cases?
///
/// Synchronously recover instance to FasterLog's latest commit, when being used as a readonly log iterator
///
@@ -1022,7 +1175,7 @@ public void RecoverReadOnly()
if (!readOnlyMode)
throw new FasterException("This method can only be used with a read-only FasterLog instance used for iteration. Set FasterLogSettings.ReadOnlyMode to true during creation to indicate this.");
- this.Restore();
+ this.RestoreLatest(out _, out _);
SignalWaitingROIterators();
}
@@ -1034,7 +1187,7 @@ public async ValueTask RecoverReadOnlyAsync(CancellationToken cancellationToken
if (!readOnlyMode)
throw new FasterException("This method can only be used with a read-only FasterLog instance used for iteration. Set FasterLogSettings.ReadOnlyMode to true during creation to indicate this.");
- await this.RestoreAsync(cancellationToken).ConfigureAwait(false);
+ await this.RestoreLatestAsync(cancellationToken).ConfigureAwait(false);
SignalWaitingROIterators();
}
@@ -1058,86 +1211,216 @@ private void SignalWaitingROIterators()
_commitTcs?.TrySetResult(lci);
}
- ///
- /// Restore log synchronously
- ///
- private Dictionary<string, long> Restore()
+ private bool LoadCommitMetadata(long commitNum, out FasterLogRecoveryInfo info)
{
- foreach (var commitNum in logCommitManager.ListCommits())
+ var commitInfo = logCommitManager.GetCommitMetadata(commitNum);
+ if (commitInfo is null)
+ {
+ info = default;
+ return false;
+ }
+
+ info = new FasterLogRecoveryInfo();
+ using (BinaryReader r = new(new MemoryStream(commitInfo)))
+ {
+ info.Initialize(r);
+ }
+
+ if (info.CommitNum == -1)
+ info.CommitNum = commitNum;
+
+ return true;
+ }
+
+ private void RestoreLatest(out Dictionary<string, long> iterators, out byte[] cookie)
+ {
+ iterators = null;
+ cookie = null;
+ FasterLogRecoveryInfo info = new();
+
+ foreach (var metadataCommit in logCommitManager.ListCommits())
{
try
{
- if (!PrepareToRestoreFromCommit(commitNum, out FasterLogRecoveryInfo info, out long headAddress))
- return default;
+ if (LoadCommitMetadata(metadataCommit, out info))
+ break;
+ }
+ catch { }
+ }
+
+ // Only in fast commit mode will we potentially need to recover from an entry in the log
+ if (fastCommitMode)
+ {
+ // Temporarily lift address safeguards so the scan below can read past the recovered commit point
+ CommittedUntilAddress = long.MaxValue;
+ beginAddress = info.BeginAddress;
+ allocator.HeadAddress = long.MaxValue;
+ using var scanIterator = Scan(info.UntilAddress, long.MaxValue, recover: false);
+ scanIterator.ScanForwardForCommit(ref info);
+ }
+
+ // if until address is 0, that means info is still its default value and we haven't been able to recover
+ // from any commit. Set the log to its start position and return
+ if (info.UntilAddress == 0)
+ {
+ Debug.WriteLine("Unable to recover using any available commit");
+ // Reset the log to a clean initial state
+ allocator.Initialize();
+ CommittedUntilAddress = Constants.kFirstValidAddress;
+ beginAddress = allocator.BeginAddress;
+ if (readOnlyMode)
+ allocator.HeadAddress = long.MaxValue;
+ return;
+ }
+
+ if (!readOnlyMode)
+ {
+ var headAddress = info.UntilAddress - allocator.GetOffsetInPage(info.UntilAddress);
+ if (info.BeginAddress > headAddress)
+ headAddress = info.BeginAddress;
+
+ if (headAddress == 0)
+ headAddress = Constants.kFirstValidAddress;
+
+ allocator.RestoreHybridLog(info.BeginAddress, headAddress, info.UntilAddress, info.UntilAddress);
+ }
- if (headAddress > 0)
- allocator.RestoreHybridLog(info.BeginAddress, headAddress, info.FlushedUntilAddress, info.FlushedUntilAddress);
+ iterators = CompleteRestoreFromCommit(info);
+ cookie = info.Cookie;
+ commitNum = info.CommitNum;
+ beginAddress = allocator.BeginAddress;
+ if (readOnlyMode)
+ allocator.HeadAddress = long.MaxValue;
+ }
- return CompleteRestoreFromCommit(info);
+ private void RestoreSpecificCommit(long requestedCommitNum, out Dictionary<string, long> iterators, out byte[] cookie)
+ {
+ iterators = null;
+ cookie = null;
+ FasterLogRecoveryInfo info = new();
+
+ // Find the closest commit metadata with commit num smaller than requested
+ long scanStart = 0;
+ foreach (var metadataCommit in logCommitManager.ListCommits())
+ {
+ if (metadataCommit > requestedCommitNum) continue;
+ try
+ {
+ if (LoadCommitMetadata(metadataCommit, out info))
+ {
+ scanStart = metadataCommit;
+ break;
+ }
}
catch { }
}
- Debug.WriteLine("Unable to recover using any available commit");
- return null;
+
+ // Need to potentially scan log for the entry
+ if (scanStart < requestedCommitNum)
+ {
+ // If not in fast commit mode, do not scan log
+ if (!fastCommitMode)
+ // The precisely requested commit num is not available, so throw an exception
+ throw new FasterException("requested commit num is not available");
+
+ // If no exact metadata is found, scan forward to see if we are able to find a commit entry
+ // Temporarily lift address safeguards so the scan below can read past the recovered commit point
+ CommittedUntilAddress = long.MaxValue;
+ beginAddress = info.BeginAddress;
+ allocator.HeadAddress = long.MaxValue;
+ using var scanIterator = Scan(info.UntilAddress, long.MaxValue, recover: false);
+ if (!scanIterator.ScanForwardForCommit(ref info, requestedCommitNum))
+ throw new FasterException("requested commit num is not available");
+ }
+
+ // At this point, we should have found the exact commit num requested
+ Debug.Assert(info.CommitNum == requestedCommitNum);
+ if (!readOnlyMode)
+ {
+ var headAddress = info.UntilAddress - allocator.GetOffsetInPage(info.UntilAddress);
+ if (info.BeginAddress > headAddress)
+ headAddress = info.BeginAddress;
+
+ if (headAddress == 0)
+ headAddress = Constants.kFirstValidAddress;
+ allocator.RestoreHybridLog(info.BeginAddress, headAddress, info.UntilAddress, info.UntilAddress);
+ }
+
+ iterators = CompleteRestoreFromCommit(info);
+ cookie = info.Cookie;
+ commitNum = info.CommitNum;
+ beginAddress = allocator.BeginAddress;
+ if (readOnlyMode)
+ allocator.HeadAddress = long.MaxValue;
}
///
/// Restore log asynchronously
///
- private async ValueTask<Dictionary<string, long>> RestoreAsync(CancellationToken cancellationToken)
+ private async ValueTask<(Dictionary<string, long>, byte[])> RestoreLatestAsync(CancellationToken cancellationToken)
{
- foreach (var commitNum in logCommitManager.ListCommits())
+ FasterLogRecoveryInfo info = new();
+
+ foreach (var metadataCommit in logCommitManager.ListCommits())
{
try
{
- if (!PrepareToRestoreFromCommit(commitNum, out FasterLogRecoveryInfo info, out long headAddress))
- return default;
-
- if (headAddress > 0)
- await allocator.RestoreHybridLogAsync(info.BeginAddress, headAddress, info.FlushedUntilAddress, info.FlushedUntilAddress, cancellationToken: cancellationToken).ConfigureAwait(false);
-
- return CompleteRestoreFromCommit(info);
+ if (LoadCommitMetadata(metadataCommit, out info))
+ break;
}
catch { }
}
- Debug.WriteLine("Unable to recover using any available commit");
- return null;
- }
- private bool PrepareToRestoreFromCommit(long commitNum, out FasterLogRecoveryInfo info, out long headAddress)
- {
- headAddress = 0;
- var commitInfo = logCommitManager.GetCommitMetadata(commitNum);
- if (commitInfo is null)
+ // Only in fast commit mode will we potentially need to recover from an entry in the log
+ if (fastCommitMode)
{
- info = default;
- return false;
+ // Temporarily lift address safeguards so the scan below can read past the recovered commit point
+ CommittedUntilAddress = long.MaxValue;
+ beginAddress = info.BeginAddress;
+ allocator.HeadAddress = long.MaxValue;
+ using var scanIterator = Scan(info.UntilAddress, long.MaxValue, recover: false);
+ scanIterator.ScanForwardForCommit(ref info);
}
- info = new FasterLogRecoveryInfo();
- using (BinaryReader r = new(new MemoryStream(commitInfo)))
+ // if until address is 0, that means info is still its default value and we haven't been able to recover
+ // from any commit. Set the log to its start position and return
+ if (info.UntilAddress == 0)
{
- info.Initialize(r);
+ Debug.WriteLine("Unable to recover using any available commit");
+ // Reset the log to a clean initial state
+ allocator.Initialize();
+ CommittedUntilAddress = Constants.kFirstValidAddress;
+ beginAddress = allocator.BeginAddress;
+ if (readOnlyMode)
+ allocator.HeadAddress = long.MaxValue;
+ return ValueTuple.Create<Dictionary<string, long>, byte[]>(new Dictionary<string, long>(), null);
}
-
+
if (!readOnlyMode)
{
- headAddress = info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress);
+ var headAddress = info.UntilAddress - allocator.GetOffsetInPage(info.UntilAddress);
if (info.BeginAddress > headAddress)
headAddress = info.BeginAddress;
if (headAddress == 0)
headAddress = Constants.kFirstValidAddress;
+ await allocator.RestoreHybridLogAsync(info.BeginAddress, headAddress, info.UntilAddress, info.UntilAddress, cancellationToken: cancellationToken).ConfigureAwait(false);
}
- return true;
+ var iterators = CompleteRestoreFromCommit(info);
+ var cookie = info.Cookie;
+ commitNum = info.CommitNum;
+ beginAddress = allocator.BeginAddress;
+ if (readOnlyMode)
+ allocator.HeadAddress = long.MaxValue;
+ return ValueTuple.Create(iterators, cookie);
}
private Dictionary<string, long> CompleteRestoreFromCommit(FasterLogRecoveryInfo info)
{
- CommittedUntilAddress = info.FlushedUntilAddress;
+ CommittedUntilAddress = info.UntilAddress;
CommittedBeginAddress = info.BeginAddress;
- SafeTailAddress = info.FlushedUntilAddress;
+ SafeTailAddress = info.UntilAddress;
// Fix uncommitted addresses in iterators
var recoveredIterators = info.Iterators;
@@ -1174,6 +1457,7 @@ private unsafe bool TryAppend(IReadOnlySpanBatch readOnlySpanBatch, out long log
epoch.Resume();
logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
+
if (logicalAddress == 0)
{
epoch.Suspend();
@@ -1330,42 +1614,96 @@ private int GetRecordLengthAndFree(SectorAlignedMemory record)
return length;
}
- private long CommitInternal(bool spinWait = false)
+ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool spinWait, bool allowFastForward, byte[] cookie, long proposedCommitNum)
{
+ commitTail = actualCommitNum = 0;
+
if (readOnlyMode)
throw new FasterException("Cannot commit in read-only mode");
+ if (allowFastForward && (cookie != null || proposedCommitNum != -1))
+ throw new FasterException(
+ "Fast forwarding a commit is only allowed when no cookie and not commit num is specified");
+
+ var info = new FasterLogRecoveryInfo
+ {
+ FastForwardAllowed = allowFastForward
+ };
+
+ // This critical section serializes commit record creation / commit content generation and ensures that
+ // the enqueued addresses in ongoingCommitRequests are sorted. This is ok because we do not expect heavy
+ // contention on the commit code path
+ lock (ongoingCommitRequests)
+ {
+ // Compute regular information about the commit
+ info.Cookie = cookie;
+ info.SnapshotIterators(PersistedIterators);
+
+ if (commitCoveredAddress == TailAddress && !ShouldCommitMetadata(ref info))
+ // Nothing to commit if no metadata update and no new entries
+ return false;
+
+
+ if (proposedCommitNum == -1)
+ info.CommitNum = actualCommitNum = ++commitNum;
+ else if (proposedCommitNum > commitNum)
+ info.CommitNum = actualCommitNum = commitNum = proposedCommitNum;
+ else
+ // Invalid commit num
+ return false;
+
+ if (fastCommitMode)
+ {
+ // Ok to retry in critical section, any concurrently invoked commit would block, but cannot progress
+ // anyways if no record can be enqueued
+ while (!TryEnqueueCommitRecord(ref info)) Thread.Yield();
+ commitTail = info.UntilAddress;
+ }
+ else
+ {
+ // If not using fastCommitMode, do not need to allocate a commit record. Instead, set the content
+ // of this commit to the current tail and base all commit metadata on this address, even though
+ // perhaps more entries will be flushed as part of this commit
+ info.BeginAddress = BeginAddress;
+ info.UntilAddress = commitTail = TailAddress;
+ }
+ Utility.MonotonicUpdate(ref commitCoveredAddress, commitTail, out _);
+
+ // Enqueue the commit record's content and offset into the queue so it can be picked up by the next flush
+ // At this point, we expect the commit record to be flushed out as a distinct recovery point
+ ongoingCommitRequests.Enqueue(ValueTuple.Create(commitTail, info));
+ }
+
+
+ // Need to check, however, that a concurrent flush hasn't already advanced flushed address past this
+ // commit. If so, need to manually trigger another commit callback in case the one triggered by the flush
+ // already finished execution and missed our commit record
+ if (commitTail < FlushedUntilAddress)
+ {
+ CommitMetadataOnly(ref info, spinWait);
+ return true;
+ }
+
+ // Otherwise, move to set read-only tail and flush
epoch.Resume();
- if (allocator.ShiftReadOnlyToTail(out long tailAddress, out _))
+
+ if (allocator.ShiftReadOnlyToTail(out _, out _))
{
if (spinWait)
{
- while (CommittedUntilAddress < tailAddress)
+ while (CommittedUntilAddress < commitTail)
{
epoch.ProtectAndDrain();
Thread.Yield();
}
}
- epoch.Suspend();
}
else
{
- // May need to commit begin address and/or iterators
- epoch.Suspend();
- var beginAddress = allocator.BeginAddress;
- if (beginAddress > CommittedBeginAddress || IteratorsChanged())
- {
- Interlocked.Increment(ref commitMetadataVersion);
- CommitCallback(new CommitInfo
- {
- FromAddress = CommittedUntilAddress > beginAddress ? CommittedUntilAddress : beginAddress,
- UntilAddress = CommittedUntilAddress > beginAddress ? CommittedUntilAddress : beginAddress,
- ErrorCode = 0
- });
- }
+ CommitMetadataOnly(ref info, spinWait);
}
-
- return tailAddress;
+ epoch.Suspend();
+ return true;
}
private long RefreshUncommittedInternal(bool spinWait = false)
@@ -1434,6 +1772,22 @@ private unsafe void SetHeader(int length, byte* dest)
*(ulong*)dest = Utility.XorBytes(dest + 8, length + 4);
}
}
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private unsafe void SetCommitRecordHeader(int length, byte* dest)
+ {
+ // commit record has negative length field to differentiate from normal records
+ if (logChecksum == LogChecksumType.None)
+ {
+ *(int*)dest = -length;
+ return;
+ }
+ else if (logChecksum == LogChecksumType.PerEntry)
+ {
+ *(int*)(dest + 8) = -length;
+ *(ulong*)dest = Utility.XorBytes(dest + 8, length + 4);
+ }
+ }
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ValidateAllocatedLength(int numSlots)
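(Note: a hedged end-to-end sketch of the new strong-commit API added in this file. Error handling is omitted, and it assumes the log's commit manager supports precise commit-num recovery, i.e. PreciseCommitNumRecoverySupport() returns true; otherwise the FasterLog(settings, requestedCommitNum) constructor throws, as coded above.)

    // requires: using System; using System.Threading.Tasks; using FASTER.core;
    static async Task StrongCommitExampleAsync(FasterLogSettings settings)
    {
        using (var log = new FasterLog(settings))
        {
            log.Enqueue(new byte[] { 1, 2, 3 });

            // Synchronous strong commit: propose commit number 1 and attach a cookie
            bool ok = log.CommitStrongly(out long tail, out long num, spinWait: true,
                                         cookie: new byte[] { 42 }, proposedCommitNum: 1);

            log.Enqueue(new byte[] { 4, 5, 6 });

            // Asynchronous variant; with proposedCommitNum = -1 the system picks the next commit number
            var (success, commitTail, actualCommitNum) = await log.CommitStronglyAsync(cookie: new byte[] { 43 });
        }

        // Recover to exactly commit 1; the cookie supplied at commit time is surfaced again
        using var recovered = new FasterLog(settings, requestedCommitNum: 1);
        byte[] recoveredCookie = recovered.RecoveredCookie;   // expected to be { 42 }
    }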
diff --git a/cs/src/core/Index/FasterLog/FasterLogIterator.cs b/cs/src/core/Index/FasterLog/FasterLogIterator.cs
index 77395d25e..a5124c64e 100644
--- a/cs/src/core/Index/FasterLog/FasterLogIterator.cs
+++ b/cs/src/core/Index/FasterLog/FasterLogIterator.cs
@@ -5,6 +5,7 @@
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
+using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -201,16 +202,42 @@ public unsafe bool GetNext(out byte[] entry, out int entryLength, out long curre
nextAddress = default;
return false;
}
-
epoch.Resume();
- if (GetNextInternal(out long physicalAddress, out entryLength, out currentAddress, out nextAddress))
+ // Continue looping until we find a record that is not a commit record
+ while (true)
{
+ long physicalAddress;
+ bool isCommitRecord;
+ try
+ {
+ var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress,
+ out nextAddress,
+ out isCommitRecord);
+ if (!hasNext)
+ {
+ entry = default;
+ epoch.Suspend();
+ return false;
+ }
+ }
+ catch (Exception)
+ {
+ // Throw upwards, but first, suspend the epoch we are in
+ epoch.Suspend();
+ throw;
+ }
+
+ if (isCommitRecord) continue;
+
if (getMemory != null)
{
// Use user delegate to allocate memory
entry = getMemory(entryLength);
if (entry.Length < entryLength)
+ {
+ epoch.Suspend();
throw new FasterException("Byte array provided has invalid length");
+ }
}
else
{
@@ -219,15 +246,11 @@ public unsafe bool GetNext(out byte[] entry, out int entryLength, out long curre
}
fixed (byte* bp = entry)
- Buffer.MemoryCopy((void*)(headerSize + physicalAddress), bp, entryLength, entryLength);
+ Buffer.MemoryCopy((void*) (headerSize + physicalAddress), bp, entryLength, entryLength);
epoch.Suspend();
return true;
}
-
- entry = default;
- epoch.Suspend();
- return false;
}
///
@@ -264,8 +287,33 @@ public unsafe bool GetNext(MemoryPool pool, out IMemoryOwner entry,
}
epoch.Resume();
- if (GetNextInternal(out long physicalAddress, out entryLength, out currentAddress, out nextAddress))
+ // Continue looping until we find a record that is not a commit record
+ while (true)
{
+ long physicalAddress;
+ bool isCommitRecord;
+ try
+ {
+ var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress,
+ out nextAddress,
+ out isCommitRecord);
+ if (!hasNext)
+ {
+ entry = default;
+ entryLength = default;
+ epoch.Suspend();
+ return false;
+ }
+ }
+ catch (Exception)
+ {
+ // Throw upwards, but first, suspend the epoch we are in
+ epoch.Suspend();
+ throw;
+ }
+
+ if (isCommitRecord) continue;
+
entry = pool.Rent(entryLength);
fixed (byte* bp = &entry.Memory.Span.GetPinnableReference())
@@ -274,11 +322,6 @@ public unsafe bool GetNext(MemoryPool pool, out IMemoryOwner entry,
epoch.Suspend();
return true;
}
-
- entry = default;
- entryLength = default;
- epoch.Suspend();
- return false;
}
///
@@ -368,6 +411,51 @@ private int Align(int length)
return (length + 3) & ~3;
}
+ internal unsafe bool ScanForwardForCommit(ref FasterLogRecoveryInfo info, long commitNum = -1)
+ {
+ epoch.Resume();
+ var foundCommit = false;
+ try
+ {
+ // Continue looping until we find a record that is a commit record
+ while (GetNextInternal(out long physicalAddress, out var entryLength, out currentAddress,
+ out nextAddress,
+ out var isCommitRecord))
+ {
+ if (!isCommitRecord) continue;
+
+ foundCommit = true;
+ byte[] entry;
+ // Allocate a byte array on the heap to hold the commit record body
+ entry = new byte[entryLength];
+ fixed (byte* bp = entry)
+ Buffer.MemoryCopy((void*) (headerSize + physicalAddress), bp, entryLength, entryLength);
+ info.Initialize(new BinaryReader(new MemoryStream(entry)));
+
+ Debug.Assert(info.CommitNum != -1);
+
+ // If we have already found the commit number we are looking for, can stop early
+ if (info.CommitNum == commitNum) break;
+ }
+ }
+ catch (FasterException)
+ {
+ // If we are here --- simply stop scanning because we ran into an incomplete entry
+ }
+ finally
+ {
+ epoch.Suspend();
+ }
+
+ if (info.CommitNum == commitNum)
+ return true;
+ // The caller accepts any commit
+ if (commitNum == -1)
+ return foundCommit;
+ // requested commit not found
+ return false;
+ }
+
///
/// Retrieve physical address of next iterator value
/// (under epoch protection if it is from main page buffer)
@@ -377,7 +465,7 @@ private int Align(int length)
///
///
///
- private unsafe bool GetNextInternal(out long physicalAddress, out int entryLength, out long currentAddress, out long outNextAddress)
+ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLength, out long currentAddress, out long outNextAddress, out bool commitRecord)
{
while (true)
{
@@ -385,6 +473,7 @@ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLengt
entryLength = 0;
currentAddress = nextAddress;
outNextAddress = nextAddress;
+ commitRecord = false;
// Check for boundary conditions
if (currentAddress < allocator.BeginAddress)
@@ -428,34 +517,42 @@ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLengt
// Get and check entry length
entryLength = fasterLog.GetLength((byte*)physicalAddress);
+ // We may encounter zeroed out bits at the end of page in a normal log, therefore, we need to check
+ // whether that is the case
if (entryLength == 0)
{
- // We are likely at end of page, skip to next
- currentAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits;
-
- Utility.MonotonicUpdate(ref nextAddress, currentAddress, out _);
-
- if (0 != fasterLog.GetChecksum((byte*)physicalAddress))
+ // If zeroed out field is at page start, we encountered an uninitialized page and should signal up
+ var pageOffset = currentAddress & ((1 << allocator.LogPageSizeBits) - 1);
+ if (pageOffset == 0)
{
- epoch.Suspend();
var curPage = currentAddress >> allocator.LogPageSizeBits;
- throw new FasterException("Invalid checksum found during scan, skipping page " + curPage);
+ throw new FasterException("Uninitialized page found during scan at page " + curPage);
}
- else
- continue;
+
+ // Otherwise, we must assume that zeroed out bits are due to page end and skip forward to the next
+ // page. If that's not the case, next iteration of the loop will either hit EOF exception or a
+ // blank page, and propagate failure upwards appropriately
+ currentAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits;
+ Utility.MonotonicUpdate(ref nextAddress, currentAddress, out _);
+ continue;
+ }
+
+ // commit records have negative length fields
+ if (entryLength < 0)
+ {
+ commitRecord = true;
+ entryLength = -entryLength;
}
int recordSize = headerSize + Align(entryLength);
- if (entryLength < 0 || (_currentOffset + recordSize > allocator.PageSize))
+ if (_currentOffset + recordSize > allocator.PageSize)
{
currentAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits;
if (Utility.MonotonicUpdate(ref nextAddress, currentAddress, out _))
{
- epoch.Suspend();
throw new FasterException("Invalid length of record found: " + entryLength + " at address " + currentAddress + ", skipping page");
}
- else
- continue;
+ continue;
}
// Verify checksum if needed
@@ -467,11 +564,9 @@ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLengt
currentAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits;
if (Utility.MonotonicUpdate(ref nextAddress, currentAddress, out _))
{
- epoch.Suspend();
throw new FasterException("Invalid checksum found during scan, skipping page " + curPage);
}
- else
- continue;
+ continue;
}
}
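(Note: a self-contained sketch of the header convention the iterator now relies on: commit records carry a negative length so they can be distinguished from user entries. The layout is simplified; the real header also carries a checksum when LogChecksumType.PerEntry is enabled, as in SetCommitRecordHeader above.)

    // requires: using System;
    static void WriteLength(Span<byte> dest, int length, bool isCommitRecord)
    {
        int stored = isCommitRecord ? -length : length;
        BitConverter.TryWriteBytes(dest, stored);
    }

    static (int length, bool isCommitRecord) ReadLength(ReadOnlySpan<byte> src)
    {
        int stored = BitConverter.ToInt32(src);
        // A negative length marks a commit record; flip the sign to recover the body length
        return stored < 0 ? (-stored, true) : (stored, false);
    }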
diff --git a/cs/src/core/Index/FasterLog/FasterLogRecoveryInfo.cs b/cs/src/core/Index/FasterLog/FasterLogRecoveryInfo.cs
index a6b2e80fc..9b14cbd3f 100644
--- a/cs/src/core/Index/FasterLog/FasterLogRecoveryInfo.cs
+++ b/cs/src/core/Index/FasterLog/FasterLogRecoveryInfo.cs
@@ -24,21 +24,33 @@ internal struct FasterLogRecoveryInfo
///
/// Flushed logical address
///
- public long FlushedUntilAddress;
+ public long UntilAddress;
///
/// Persisted iterators
///
public Dictionary<string, long> Iterators;
+ ///
+ /// User-specified commit cookie
+ ///
+ public byte[] Cookie;
+
+ public long CommitNum;
+
+ public bool FastForwardAllowed;
+
///
/// Initialize
///
public void Initialize()
{
BeginAddress = 0;
- FlushedUntilAddress = 0;
+ UntilAddress = 0;
+ Iterators = null;
+ Cookie = null;
}
+
///
/// Initialize from stream
@@ -53,16 +65,20 @@ public void Initialize(BinaryReader reader)
version = reader.ReadInt32();
checkSum = reader.ReadInt64();
BeginAddress = reader.ReadInt64();
- FlushedUntilAddress = reader.ReadInt64();
+ UntilAddress = reader.ReadInt64();
+ if (version == 1)
+ CommitNum = reader.ReadInt64();
+ else
+ CommitNum = -1;
}
catch (Exception e)
{
throw new FasterException("Unable to recover from previous commit. Inner exception: " + e.ToString());
}
- if (version != 0)
+ if (version != 0 && version != 1)
throw new FasterException("Invalid version found during commit recovery");
- if (checkSum != (BeginAddress ^ FlushedUntilAddress))
+ if (checkSum != (BeginAddress ^ UntilAddress))
throw new FasterException("Invalid checksum found during commit recovery");
var count = 0;
@@ -80,6 +96,18 @@ public void Initialize(BinaryReader reader)
Iterators.Add(reader.ReadString(), reader.ReadInt64());
}
}
+
+ if (version == 1)
+ {
+ try
+ {
+ count = reader.ReadInt32();
+ }
+ catch { }
+
+ if (count > 0)
+ Cookie = reader.ReadBytes(count);
+ }
}
///
@@ -98,10 +126,11 @@ public readonly byte[] ToByteArray()
using MemoryStream ms = new();
using (BinaryWriter writer = new(ms))
{
- writer.Write(0); // version
- writer.Write(BeginAddress ^ FlushedUntilAddress); // checksum
+ writer.Write(1); // version
+ writer.Write(BeginAddress ^ UntilAddress); // checksum
writer.Write(BeginAddress);
- writer.Write(FlushedUntilAddress);
+ writer.Write(UntilAddress);
+ writer.Write(CommitNum);
if (Iterators?.Count > 0)
{
writer.Write(Iterators.Count);
@@ -115,10 +144,32 @@ public readonly byte[] ToByteArray()
{
writer.Write(0);
}
+
+ if (Cookie != null)
+ {
+ writer.Write(Cookie.Length);
+ writer.Write(Cookie);
+ }
+ else
+ {
+ writer.Write(0);
+ }
}
return ms.ToArray();
}
+ public int SerializedSize()
+ {
+ var iteratorSize = sizeof(int);
+ if (Iterators != null)
+ {
+ foreach (var kvp in Iterators)
+ iteratorSize += kvp.Key.Length + sizeof(long);
+ }
+
+ return sizeof(int) + 4 * sizeof(long) + iteratorSize + sizeof(int) + (Cookie?.Length ?? 0);
+ }
+
///
/// Take snapshot of persisted iterators
///
@@ -159,7 +210,7 @@ public void DebugPrint()
Debug.WriteLine("******** Log Commit Info ********");
Debug.WriteLine("BeginAddress: {0}", BeginAddress);
- Debug.WriteLine("FlushedUntilAddress: {0}", FlushedUntilAddress);
+ Debug.WriteLine("FlushedUntilAddress: {0}", UntilAddress);
}
}
}
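(Note: a decoding sketch derived directly from Initialize(BinaryReader) and ToByteArray() above, spelling out the version-1 commit metadata layout written by this change. DumpCommitMetadata is a hypothetical helper for illustration only.)

    // requires: using System.IO;
    static void DumpCommitMetadata(byte[] metadata)
    {
        using var r = new BinaryReader(new MemoryStream(metadata));
        int version        = r.ReadInt32();                         // 1 for commits written by this change
        long checksum      = r.ReadInt64();                         // BeginAddress ^ UntilAddress
        long beginAddress  = r.ReadInt64();
        long untilAddress  = r.ReadInt64();
        long commitNum     = version == 1 ? r.ReadInt64() : -1;     // absent in version 0
        int iteratorCount  = r.ReadInt32();
        for (int i = 0; i < iteratorCount; i++)
        {
            string name          = r.ReadString();                  // persisted iterator name
            long completedUntil  = r.ReadInt64();                   // iterator completed-until address
        }
        int cookieLength = version == 1 ? r.ReadInt32() : 0;
        byte[] cookie    = cookieLength > 0 ? r.ReadBytes(cookieLength) : null;
    }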
diff --git a/cs/src/core/Index/FasterLog/FasterLogSettings.cs b/cs/src/core/Index/FasterLog/FasterLogSettings.cs
index 9fe7cc8e5..ce1e290eb 100644
--- a/cs/src/core/Index/FasterLog/FasterLogSettings.cs
+++ b/cs/src/core/Index/FasterLog/FasterLogSettings.cs
@@ -89,6 +89,13 @@ public class FasterLogSettings
///
public bool ReadOnlyMode = false;
+ ///
+ /// When FastCommitMode is enabled, FasterLog will reduce commit critical path latency, but may result in slower
+ /// recovery to a commit on restart. Additionally, FastCommitMode is only possible when log checksum is turned
+ /// on.
+ ///
+ public bool FastCommitMode = false;
+
internal LogSettings GetLogSettings()
{
return new LogSettings
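(Note: a minimal configuration sketch for the new setting. Per the remark above, FastCommitMode requires a log checksum, so LogChecksumType.PerEntry is set; the device path is a placeholder.)

    // requires: using FASTER.core;
    var device = Devices.CreateLogDevice("/path/to/fastcommit.log");
    var settings = new FasterLogSettings
    {
        LogDevice = device,
        LogChecksum = LogChecksumType.PerEntry,   // FastCommitMode relies on per-entry checksums
        FastCommitMode = true
    };
    using var log = new FasterLog(settings);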
diff --git a/cs/src/core/Index/FasterLog/ILogCommitManager.cs b/cs/src/core/Index/FasterLog/ILogCommitManager.cs
index d36066ad0..27a7b858c 100644
--- a/cs/src/core/Index/FasterLog/ILogCommitManager.cs
+++ b/cs/src/core/Index/FasterLog/ILogCommitManager.cs
@@ -12,13 +12,18 @@ namespace FASTER.core
///
public interface ILogCommitManager : IDisposable
{
+ ///
+ /// Whether this log commit manager supports recovering to a specific commit num
+ bool PreciseCommitNumRecoverySupport();
+
///
/// Perform (synchronous) commit with specified metadata
///
/// Committed begin address (for information only, not necessary to persist)
/// Address committed until (for information only, not necessary to persist)
/// Commit metadata - should be persisted
- void Commit(long beginAddress, long untilAddress, byte[] commitMetadata);
+ /// commit num
+ void Commit(long beginAddress, long untilAddress, byte[] commitMetadata, long commitNum);
///
/// Return commit metadata
@@ -32,5 +37,16 @@ public interface ILogCommitManager : IDisposable
///
///
public IEnumerable<long> ListCommits();
+
+ ///
+ /// Remove the given commit, if present. Should only be invoked if PreciseCommitNumRecoverySupport returns true
+ ///
+ /// commit num to remove
+ public void RemoveCommit(long commitNum);
+
+ ///
+ /// Remove all log commits from this manager
+ ///
+ public void RemoveAllCommits();
}
}
\ No newline at end of file
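(Note: a minimal in-memory sketch of the extended ILogCommitManager contract, suitable only for illustration or tests. It assumes the interface contains exactly the members shown or referenced in this diff; because every commit is retained, it can report PreciseCommitNumRecoverySupport() == true.)

    // requires: using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using FASTER.core;
    class InMemoryLogCommitManager : ILogCommitManager
    {
        readonly ConcurrentDictionary<long, byte[]> commits = new();

        public bool PreciseCommitNumRecoverySupport() => true;

        public void Commit(long beginAddress, long untilAddress, byte[] commitMetadata, long commitNum)
            => commits[commitNum] = commitMetadata;

        public byte[] GetCommitMetadata(long commitNum)
            => commits.TryGetValue(commitNum, out var metadata) ? metadata : null;

        public IEnumerable<long> ListCommits() => commits.Keys.OrderByDescending(k => k);

        public void RemoveCommit(long commitNum) => commits.TryRemove(commitNum, out _);

        public void RemoveAllCommits() => commits.Clear();

        public void Dispose() { }
    }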
diff --git a/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs b/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs
index f47bd16bc..9626ca0f3 100644
--- a/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs
+++ b/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs
@@ -22,14 +22,20 @@ public LocalLogCommitManager(string commitFile)
this.commitFile = commitFile;
}
+ ///
+ public bool PreciseCommitNumRecoverySupport() => false;
+
+
///
/// Perform (synchronous) commit with specified metadata
///
/// Committed begin address (for information only, not necessary to persist)
/// Address committed until (for information only, not necessary to persist)
/// Commit metadata
- public void Commit(long beginAddress, long untilAddress, byte[] commitMetadata)
+ /// Ignored param
+ public void Commit(long beginAddress, long untilAddress, byte[] commitMetadata, long commitNum)
{
+
// Two phase to ensure we write metadata in single Write operation
using MemoryStream ms = new();
using (BinaryWriter writer = new(ms))
@@ -74,5 +80,19 @@ public IEnumerable ListCommits()
// we only use a single commit file in this implementation
yield return 0;
}
+
+ ///
+ public void RemoveCommit(long commitNum)
+ {
+ throw new FasterException("removing commit by commit num is not supported when overwriting log commits");
+ }
+
+ ///
+ public void RemoveAllCommits()
+ {
+ // we only use a single commit file in this implementation
+ RemoveCommit(0);
+ }
+
}
}
\ No newline at end of file
diff --git a/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs b/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
index 9954c3338..cb69d2053 100644
--- a/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
+++ b/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
@@ -182,7 +182,7 @@ public void PurgeAll()
{
foreach (var entry in blobs)
{
- entry.Value.PageBlob.Delete();
+ entry.Value.PageBlob?.Delete();
}
}
@@ -361,8 +361,8 @@ public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr
{
var nonLoadedBlob = this.blobDirectory.GetPageBlobReference(GetSegmentBlobName(segmentId));
var exception = new InvalidOperationException("Attempt to read a non-loaded segment");
- this.BlobManager?.HandleBlobError(nameof(ReadAsync), exception.Message, nonLoadedBlob?.Name, exception, true);
- throw exception;
+ this.BlobManager?.HandleBlobError(nameof(ReadAsync), exception.Message, nonLoadedBlob?.Name, exception, false);
+ throw new FasterException(exception.Message, exception);
}
var t = this.ReadFromBlobUnsafeAsync(blobEntry.PageBlob, (long)sourceAddress, (long)destinationAddress, readLength);
diff --git a/cs/test/DeviceFasterLogTests.cs b/cs/test/DeviceFasterLogTests.cs
index e1fc8f55b..0b36c4974 100644
--- a/cs/test/DeviceFasterLogTests.cs
+++ b/cs/test/DeviceFasterLogTests.cs
@@ -111,7 +111,10 @@ private async ValueTask FasterLogTest1(LogChecksumType logChecksum, IDevice devi
}
log.Commit(true);
- using (var iter = log.Scan(0, long.MaxValue))
+ // Scan only up to TailAddress: MoveNextAsync() would hang at TailAddress, waiting for more entries (that we don't add).
+ // Note: If this happens and the test has to be canceled, there may be a leftover blob from the log.Commit(), because
+ // the log device isn't Dispose()d; the symptom is currently a numeric string format error in DefaultCheckpointNamingScheme.
+ using (var iter = log.Scan(0, log.TailAddress))
{
var counter = new FasterLogTestBase.Counter(log);
@@ -122,12 +125,6 @@ private async ValueTask FasterLogTest1(LogChecksumType logChecksum, IDevice devi
{
Assert.IsTrue(result.SequenceEqual(entry));
counter.IncrementAndMaybeTruncateUntil(nextAddress);
-
- // MoveNextAsync() would hang at TailAddress, waiting for more entries (that we don't add).
- // Note: If this happens and the test has to be canceled, there may be a leftover blob from the log.Commit(), because
- // the log device isn't Dispose()d; the symptom is currently a numeric string format error in DefaultCheckpointNamingScheme.
- if (nextAddress == log.TailAddress)
- break;
}
break;
case FasterLogTestBase.IteratorType.AsyncMemoryOwner:
@@ -136,12 +133,6 @@ private async ValueTask FasterLogTest1(LogChecksumType logChecksum, IDevice devi
Assert.IsTrue(result.Memory.Span.ToArray().Take(entry.Length).SequenceEqual(entry));
result.Dispose();
counter.IncrementAndMaybeTruncateUntil(nextAddress);
-
- // MoveNextAsync() would hang at TailAddress, waiting for more entries (that we don't add).
- // Note: If this happens and the test has to be canceled, there may be a leftover blob from the log.Commit(), because
- // the log device isn't Dispose()d; the symptom is currently a numeric string format error in DefaultCheckpointNamingScheme.
- if (nextAddress == log.TailAddress)
- break;
}
break;
case FasterLogTestBase.IteratorType.Sync:
diff --git a/cs/test/FasterLogFastCommitTests.cs b/cs/test/FasterLogFastCommitTests.cs
new file mode 100644
index 000000000..62a830d34
--- /dev/null
+++ b/cs/test/FasterLogFastCommitTests.cs
@@ -0,0 +1,148 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Threading;
+using FASTER.core;
+using FASTER.test.recovery;
+using NUnit.Framework;
+
+namespace FASTER.test
+{
+ [TestFixture]
+ internal class FasterLogFastCommitTests : FasterLogTestBase
+ {
+ [SetUp]
+ public void Setup() => base.BaseSetup(false, false);
+
+ [TearDown]
+ public void TearDown() => base.BaseTearDown();
+
+ [Test]
+ [Category("FasterLog")]
+ [Category("Smoke")]
+ public void FasterLogSimpleFastCommitTest([Values] TestUtils.DeviceType deviceType)
+ {
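+ // With FastCommitMode, commit records are written into the log itself; this test commits under explicit
+ // commit numbers, deletes all separate commit metadata, and verifies recovery directly from the log.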
+ var cookie = new byte[100];
+ new Random().NextBytes(cookie);
+
+ var filename = path + "fastCommit" + deviceType.ToString() + ".log";
+ device = TestUtils.CreateTestDevice(deviceType, filename, deleteOnClose: true);
+ var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.PerEntry, LogCommitManager = manager, FastCommitMode = true};
+ log = new FasterLog(logSettings);
+
+ byte[] entry = new byte[entryLength];
+ for (int i = 0; i < entryLength; i++)
+ entry[i] = (byte)i;
+
+ for (int i = 0; i < numEntries; i++)
+ {
+ log.Enqueue(entry);
+ }
+
+ var cookie1 = new byte[100];
+ new Random().NextBytes(cookie1);
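+ // CommitStrongly with a caller-chosen commit number (1) and a cookie that should be persisted with the commit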
+ var commitSuccessful = log.CommitStrongly(out var commit1Addr, out _, true, cookie1, 1);
+ Assert.IsTrue(commitSuccessful);
+
+ for (int i = 0; i < numEntries; i++)
+ {
+ log.Enqueue(entry);
+ }
+
+ var cookie2 = new byte[100];
+ new Random().NextBytes(cookie2);
+ commitSuccessful = log.CommitStrongly(out var commit2Addr, out _, true, cookie2, 2);
+ Assert.IsTrue(commitSuccessful);
+
+ for (int i = 0; i < numEntries; i++)
+ {
+ log.Enqueue(entry);
+ }
+
+ var cookie6 = new byte[100];
+ new Random().NextBytes(cookie6);
+ commitSuccessful = log.CommitStrongly(out var commit6Addr, out _, true, cookie6, 6);
+ Assert.IsTrue(commitSuccessful);
+
+ // Wait for all metadata writes to be complete to avoid a concurrent access exception
+ log.Dispose();
+ log = null;
+
+ // be a deviant and remove commit metadata files
+ manager.RemoveAllCommits();
+
+ // Recovery should still work: FastCommitMode writes commit records into the log itself, so they survive removal of the metadata files
+ var recoveredLog = new FasterLog(logSettings, 1);
+ Assert.AreEqual(cookie1, recoveredLog.RecoveredCookie);
+ Assert.AreEqual(commit1Addr, recoveredLog.TailAddress);
+ recoveredLog.Dispose();
+
+ recoveredLog = new FasterLog(logSettings, 2);
+ Assert.AreEqual(cookie2, recoveredLog.RecoveredCookie);
+ Assert.AreEqual(commit2Addr, recoveredLog.TailAddress);
+ recoveredLog.Dispose();
+
+ // Default argument should recover to most recent
+ recoveredLog = new FasterLog(logSettings);
+ Assert.AreEqual(cookie6, recoveredLog.RecoveredCookie);
+ Assert.AreEqual(commit6Addr, recoveredLog.TailAddress);
+ recoveredLog.Dispose();
+ }
+
+ [Test]
+ [Category("FasterLog")]
+ [Category("Smoke")]
+ public void CommitRecordBoundedGrowthTest([Values] TestUtils.DeviceType deviceType)
+ {
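+ // Verifies that concurrent Commit() calls do not grow the log by more than one commit record per enqueued entry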
+ var cookie = new byte[100];
+ new Random().NextBytes(cookie);
+
+ var filename = path + "boundedGrowth" + deviceType.ToString() + ".log";
+ device = TestUtils.CreateTestDevice(deviceType, filename, deleteOnClose: true);
+ var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.PerEntry, LogCommitManager = manager, FastCommitMode = true};
+ log = new FasterLog(logSettings);
+
+ byte[] entry = new byte[entryLength];
+ for (int i = 0; i < entryLength; i++)
+ entry[i] = (byte)i;
+
+ for (int i = 0; i < 5 * numEntries; i++)
+ log.Enqueue(entry);
+
+ // for comparison, insert some entries without any commit records
+ var referenceTailLength = log.TailAddress;
+
+ var enqueueDone = new ManualResetEventSlim();
+ var commitThreads = new List<Thread>();
+ // Make sure to not spin up too many commit threads, otherwise we might clog epochs and halt progress
+ for (var i = 0; i < Math.Max(1, Environment.ProcessorCount - 1); i++)
+ {
+ commitThreads.Add(new Thread(() =>
+ {
+ // Commit as often as possible to clog the commit pipeline
+ while (!enqueueDone.IsSet)
+ log.Commit();
+ }));
+ }
+
+ foreach (var t in commitThreads)
+ t.Start();
+ for (int i = 0; i < 5 * numEntries; i++)
+ {
+ log.Enqueue(entry);
+ }
+ enqueueDone.Set();
+
+ foreach (var t in commitThreads)
+ t.Join();
+
+ // TODO: Hardcoded constant --- if this number changes in FasterLogRecoverInfo, it needs to be updated here too
+ var commitRecordSize = 44;
+ var logTailGrowth = log.TailAddress - referenceTailLength;
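+ // logTailGrowth covers the second batch of entries (roughly referenceTailLength bytes) plus any commit records,
+ // so subtracting referenceTailLength isolates the space taken by commit records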
+ // Check that we are not growing the log more than one commit record per user entry
+ Assert.IsTrue(logTailGrowth - referenceTailLength <= commitRecordSize * 5 * numEntries);
+ }
+ }
+}
\ No newline at end of file
diff --git a/cs/test/FasterLogRecoverReadOnlyTests.cs b/cs/test/FasterLogRecoverReadOnlyTests.cs
index 734e6bc2c..7d96461e7 100644
--- a/cs/test/FasterLogRecoverReadOnlyTests.cs
+++ b/cs/test/FasterLogRecoverReadOnlyTests.cs
@@ -80,7 +80,7 @@ private async Task CommitterAsync(FasterLog log, CancellationToken cancellationT
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromMilliseconds(CommitPeriodMs), cancellationToken);
- await log.CommitAsync(cancellationToken);
+ await log.CommitAsync(token: cancellationToken);
}
} catch (OperationCanceledException) { }
}
diff --git a/cs/test/FasterLogTests.cs b/cs/test/FasterLogTests.cs
index b3918bc86..7a8712f24 100644
--- a/cs/test/FasterLogTests.cs
+++ b/cs/test/FasterLogTests.cs
@@ -9,7 +9,9 @@
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;
+using NUnit.Compatibility;
using NUnit.Framework;
+using NUnit.Framework.Internal;
namespace FASTER.test
{
@@ -19,7 +21,6 @@ internal class FasterLogStandAloneTests
[Test]
[Category("FasterLog")]
[Category("Smoke")]
-
public void TestDisposeReleasesFileLocksWithInprogressCommit([Values] TestUtils.DeviceType deviceType)
{
string path = TestUtils.MethodTestDir + "/";
@@ -60,6 +61,8 @@ internal class FasterLogTestBase
protected static readonly byte[] entry = new byte[100];
protected static readonly ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(10000);
+ private bool deleteOnClose;
+
protected struct ReadOnlySpanBatch : IReadOnlySpanBatch
{
private readonly int batchSize;
@@ -68,20 +71,23 @@ protected struct ReadOnlySpanBatch : IReadOnlySpanBatch
public int TotalEntries() => batchSize;
}
- protected void BaseSetup()
+ protected void BaseSetup(bool overwriteCommit = true, bool deleteOnClose = true)
{
path = TestUtils.MethodTestDir + "/";
// Clean up log files from previous test runs in case they weren't cleaned up
TestUtils.DeleteDirectory(path, wait: true);
- manager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(deleteOnClose: true), new DefaultCheckpointNamingScheme(path));
+ manager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(deleteOnClose: deleteOnClose), new DefaultCheckpointNamingScheme(path), overwriteLogCommits: overwriteCommit);
+ this.deleteOnClose = deleteOnClose;
}
protected void BaseTearDown()
{
log?.Dispose();
log = null;
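+ // When commit files are not deleted on close, clean them up explicitly so commits do not leak across tests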
+ if (!deleteOnClose)
+ manager.RemoveAllCommits();
manager?.Dispose();
manager = null;
device?.Dispose();
@@ -164,9 +170,9 @@ protected static async Task LogWriterAsync(FasterLog log, byte[] entry)
// Enter in some entries then wait on this separate thread
await log.EnqueueAsync(entry);
await log.EnqueueAsync(entry);
- var commitTask = await log.CommitAsync(null, token);
+ var commitTask = await log.CommitAsync(null, token: token);
await log.EnqueueAsync(entry);
- await log.CommitAsync(commitTask, token);
+ await log.CommitAsync(commitTask, token: token);
}
}
@@ -280,7 +286,7 @@ public async ValueTask TryEnqueue1([Values] LogChecksumType logChecksum, [Values
}
Assert.IsFalse(waitingReader.IsCompleted);
- await log.CommitAsync(token);
+ await log.CommitAsync(token: token);
while (!waitingReader.IsCompleted) ;
Assert.IsTrue(waitingReader.IsCompleted);
@@ -783,6 +789,110 @@ public async ValueTask RefreshUncommittedAsyncTest([Values] IteratorType iterato
}
log.Dispose();
}
+ }
+
+ [TestFixture]
+ internal class FasterLogCustomCommitTests : FasterLogTestBase
+ {
+ [SetUp]
+ public void Setup() => base.BaseSetup(false, false);
+
+ [TearDown]
+ public void TearDown() => base.BaseTearDown();
+
+ [Test]
+ [Category("FasterLog")]
+ [Category("Smoke")]
+ public void FasterLogSimpleCommitCookieTest([Values] bool fastCommit)
+ {
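+ // A cookie supplied to CommitStrongly should come back as RecoveredCookie after recovery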
+ var cookie = new byte[100];
+ new Random().NextBytes(cookie);
+
+ device = Devices.CreateLogDevice(path + "SimpleCommitCookie" + fastCommit + ".log", deleteOnClose: true);
+ var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.PerEntry, LogCommitManager = manager, FastCommitMode = fastCommit};
+ log = new FasterLog(logSettings);
+
+ byte[] entry = new byte[entryLength];
+ for (int i = 0; i < entryLength; i++)
+ entry[i] = (byte)i;
+
+ for (int i = 0; i < numEntries; i++)
+ {
+ log.Enqueue(entry);
+ }
+
+ log.CommitStrongly(out _, out _, true, cookie);
+
+ var recoveredLog = new FasterLog(logSettings);
+ Assert.AreEqual(cookie, recoveredLog.RecoveredCookie);
+ recoveredLog.Dispose();
+ }
+
+ [Test]
+ [Category("FasterLog")]
+ public void FasterLogManualCommitTest()
+ {
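+ // Commits under explicit commit numbers, then verifies recovery to each specific commit,
+ // to a non-existent commit number (expected to throw), and to the most recent commit by default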
+ device = Devices.CreateLogDevice(path + "logManualCommitTest.log", deleteOnClose: true);
+ var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.None, LogCommitManager = manager};
+ log = new FasterLog(logSettings);
+
+ byte[] entry = new byte[entryLength];
+ for (int i = 0; i < entryLength; i++)
+ entry[i] = (byte)i;
+
+ for (int i = 0; i < numEntries; i++)
+ {
+ log.Enqueue(entry);
+ }
+
+ var cookie1 = new byte[100];
+ new Random().NextBytes(cookie1);
+ var commitSuccessful = log.CommitStrongly(out var commit1Addr, out _, true, cookie1, 1);
+ Assert.IsTrue(commitSuccessful);
+
+ for (int i = 0; i < numEntries; i++)
+ {
+ log.Enqueue(entry);
+ }
+
+ var cookie2 = new byte[100];
+ new Random().NextBytes(cookie2);
+ commitSuccessful = log.CommitStrongly(out var commit2Addr, out _, true, cookie2, 2);
+ Assert.IsTrue(commitSuccessful);
+
+ for (int i = 0; i < numEntries; i++)
+ {
+ log.Enqueue(entry);
+ }
+
+ var cookie6 = new byte[100];
+ new Random().NextBytes(cookie6);
+ commitSuccessful = log.CommitStrongly(out var commit6Addr, out _, true, cookie6, 6);
+ Assert.IsTrue(commitSuccessful);
+
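+ // The second FasterLog constructor argument requests recovery to a specific commit number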
+ var recoveredLog = new FasterLog(logSettings, 1);
+ Assert.AreEqual(cookie1, recoveredLog.RecoveredCookie);
+ Assert.AreEqual(commit1Addr, recoveredLog.TailAddress);
+ recoveredLog.Dispose();
+
+ recoveredLog = new FasterLog(logSettings, 2);
+ Assert.AreEqual(cookie2, recoveredLog.RecoveredCookie);
+ Assert.AreEqual(commit2Addr, recoveredLog.TailAddress);
+ recoveredLog.Dispose();
+
+ // recovering to a non-existent commit should throw FasterException
+ try
+ {
+ recoveredLog = new FasterLog(logSettings, 4);
+ Assert.Fail();
+ }
+ catch (FasterException) {}
+
+ // Default argument should recover to most recent
+ recoveredLog = new FasterLog(logSettings);
+ Assert.AreEqual(cookie6, recoveredLog.RecoveredCookie);
+ Assert.AreEqual(commit6Addr, recoveredLog.TailAddress);
+ recoveredLog.Dispose();
+ }
}
}
diff --git a/cs/test/LogShiftTailStressTest.cs b/cs/test/LogShiftTailStressTest.cs
new file mode 100644
index 000000000..d5009600a
--- /dev/null
+++ b/cs/test/LogShiftTailStressTest.cs
@@ -0,0 +1,64 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using FASTER.core;
+using NUnit.Framework;
+
+namespace FASTER.test
+{
+ [TestFixture]
+ internal class LogShiftTailStressTest : FasterLogTestBase
+ {
+ [SetUp]
+ public void Setup() => base.BaseSetup();
+
+ [TearDown]
+ public void TearDown() => base.BaseTearDown();
+
+ [Test]
+ [Category("FasterLog")]
+ public void FasterLogShiftTailStressTest()
+ {
+ // Get an excruciatingly slow storage device to maximize chance of clogging the flush pipeline
+ device = new LocalMemoryDevice(1L << 32, 1 << 30, 2, sector_size: 512, latencyMs: 50, fileName: "stress.log");
+ var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.None, LogCommitManager = manager};
+ log = new FasterLog(logSettings);
+
+ byte[] entry = new byte[entryLength];
+ for (int i = 0; i < entryLength; i++)
+ entry[i] = (byte)i;
+
+ for (int i = 0; i < 5 * numEntries; i++)
+ log.Enqueue(entry);
+
+ // The entries above prime the log before the commit threads start; this test only checks for termination
+
+ var enqueueDone = new ManualResetEventSlim();
+ var commitThreads = new List<Thread>();
+ // Make sure to spin up many commit threads to expose lots of interleavings
+ for (var i = 0; i < 2 * Math.Max(1, Environment.ProcessorCount - 1); i++)
+ {
+ commitThreads.Add(new Thread(() =>
+ {
+ // Commit as often as possible to clog the commit pipeline
+ while (!enqueueDone.IsSet)
+ log.Commit();
+ }));
+ }
+
+ foreach (var t in commitThreads)
+ t.Start();
+ for (int i = 0; i < 5 * numEntries; i++)
+ {
+ log.Enqueue(entry);
+ }
+ enqueueDone.Set();
+
+ foreach (var t in commitThreads)
+ t.Join();
+
+ // We expect the test to finish and not get stuck somewhere
+ }
+ }
+}
\ No newline at end of file
diff --git a/cs/test/RecoverReadOnlyTest.cs b/cs/test/RecoverReadOnlyTest.cs
index ed81c90b4..5ecac989c 100644
--- a/cs/test/RecoverReadOnlyTest.cs
+++ b/cs/test/RecoverReadOnlyTest.cs
@@ -89,7 +89,7 @@ static async Task CommitterAsync(FasterLog log, CancellationToken cancellationTo
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromMilliseconds(commitPeriodMs), cancellationToken);
- await log.CommitAsync(cancellationToken);
+ await log.CommitAsync(token: cancellationToken);
}
}
diff --git a/cs/test/TestUtils.cs b/cs/test/TestUtils.cs
index 98a2a8472..28d6515b6 100644
--- a/cs/test/TestUtils.cs
+++ b/cs/test/TestUtils.cs
@@ -94,7 +94,7 @@ public enum DeviceType
LocalMemory
}
- internal static IDevice CreateTestDevice(DeviceType testDeviceType, string filename, int latencyMs = 20) // latencyMs works only for DeviceType = LocalMemory
+ internal static IDevice CreateTestDevice(DeviceType testDeviceType, string filename, int latencyMs = 20, bool deleteOnClose = false) // latencyMs works only for DeviceType = LocalMemory
{
IDevice device = null;
bool preallocateFile = false;
@@ -102,9 +102,7 @@ public enum DeviceType
bool recoverDevice = false;
bool useIoCompletionPort = false;
bool disableFileBuffering = true;
-
- bool deleteOnClose = false;
-
+
switch (testDeviceType)
{
#if WINDOWS
@@ -116,7 +114,7 @@ public enum DeviceType
break;
case DeviceType.EmulatedAzure:
IgnoreIfNotRunningAzureTests();
- device = new AzureStorageDevice(AzureEmulatedStorageString, AzureTestContainer, AzureTestDirectory, Path.GetFileName(filename), deleteOnClose: false);
+ device = new AzureStorageDevice(AzureEmulatedStorageString, AzureTestContainer, AzureTestDirectory, Path.GetFileName(filename), deleteOnClose: deleteOnClose);
break;
#endif
case DeviceType.MLSD:
@@ -124,7 +122,7 @@ public enum DeviceType
break;
// Emulated higher latency storage device - takes a disk latency arg (latencyMs) and emulates an IDevice using main memory, serving data at specified latency
case DeviceType.LocalMemory:
- device = new LocalMemoryDevice(1L << 26, 1L << 22, 2, sector_size: 512, latencyMs: latencyMs, fileName: filename); // 64 MB (1L << 26) is enough for our test cases
+ device = new LocalMemoryDevice(1L << 30, 1L << 30, 2, sector_size: 512, latencyMs: latencyMs, fileName: filename); // 1 GB (1L << 30) to accommodate the larger log tests
break;
}