Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions cs/src/core/Index/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace FASTER.core
/// <summary>
/// FASTER log
/// </summary>
public sealed partial class FasterLog : IDisposable
public sealed partial class FasterLog
{
private readonly BlittableAllocator<Empty, byte> allocator;
private readonly LightEpoch epoch;
Expand Down Expand Up @@ -519,6 +519,7 @@ public void Commit(bool spinWait = false)
/// Proposal for the identifier to use for this commit, or -1 if the system should pick one. If supplied with
/// a non -1 value, commit is guaranteed to have the supplied identifier if commit call is successful
/// </param>
/// <param name="callback"> callback function that will be invoked when strong commit is persistent </param>
/// <returns>Whether commit is successful </returns>
public bool CommitStrongly(out long commitTail, out long actualCommitNum, bool spinWait = false, byte[] cookie = null, long proposedCommitNum = -1, Action callback = null)
{
Expand Down Expand Up @@ -1072,7 +1073,7 @@ private void UpdateCommittedState(FasterLogRecoveryInfo recoveryInfo)

private void WriteCommitMetadata(FasterLogRecoveryInfo recoveryInfo)
{
// TODO(Tianyu): If fast commit, write this in separate thread?
// TODO: can change to write this in separate thread for fast commit
logCommitManager.Commit(recoveryInfo.BeginAddress, recoveryInfo.UntilAddress,
recoveryInfo.ToByteArray(), recoveryInfo.CommitNum);
// If not fast committing, set committed state as we commit metadata explicitly only after metadata commit
Expand Down Expand Up @@ -1138,7 +1139,7 @@ private void SerialCommitCallbackWorker(CommitInfo commitInfo)
if (latestCommit.FastForwardAllowed) WriteCommitMetadata(latestCommit);
}

// TODO(Tianyu): Can invoke earlier in the case of fast commit
// TODO: Can invoke earlier in the case of fast commit
var _commitTcs = commitTcs;
commitTcs = new TaskCompletionSource<LinkedCommitInfo>(TaskCreationOptions.RunContinuationsAsynchronously);
var lci = new LinkedCommitInfo
Expand Down Expand Up @@ -1679,12 +1680,12 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
info.SnapshotIterators(PersistedIterators);
var metadataChanged = ShouldCommmitMetadata(ref info);
// Only apply commit policy if not a strong commit
if (!fastForwardAllowed && !commitPolicy.AdmitCommit(TailAddress, metadataChanged))
if (fastForwardAllowed && !commitPolicy.AdmitCommit(TailAddress, metadataChanged))
return false;

lock (ongoingCommitRequests)
{
if (commitCoveredAddress == TailAddress && metadataChanged)
if (commitCoveredAddress == TailAddress && !metadataChanged)
// Nothing to commit if no metadata update and no new entries
return false;
// Make sure we will not be allowed to back out of a commit of AdmitCommit returns true, as the strategy
Expand Down Expand Up @@ -1717,6 +1718,8 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
info.UntilAddress = commitTail = TailAddress;
}

Utility.MonotonicUpdate(ref commitCoveredAddress, commitTail, out _);

commitPolicy.OnCommitCreated(info);
// Enqueue the commit record's content and offset into the queue so it can be picked up by the next flush
// At this point, we expect the commit record to be flushed out as a distinct recovery point
Expand Down
5 changes: 0 additions & 5 deletions cs/src/core/Index/FasterLog/FasterLogCommitPolicies.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,6 @@ public sealed partial class FasterLog : IDisposable
/// automatically retried.
/// </summary>
/// <param name="k"> maximum number of commits that can be outstanding at a time </param>
/// <param name="autoRetry">
/// whether to automatically retry rejected commit requests later. If set to true, even when
/// a commit() returns false due to being limited, the tail as of that commit will eventually be committed
/// without the need to invoke commit() again.
/// </param>
/// <returns> policy object </returns>
public static IFasterLogCommitPolicy MaxParallelCommitStrategy(int k) => new MaxParallelCommitPolicy(k);

Expand Down
3 changes: 3 additions & 0 deletions cs/src/core/Index/FasterLog/FasterLogRecoveryInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ public readonly byte[] ToByteArray()
return ms.ToArray();
}

/// <summary>
/// </summary>
/// <returns> size of this recovery info serialized </returns>
public int SerializedSize()
{
var iteratorSize = sizeof(int);
Expand Down
3 changes: 3 additions & 0 deletions cs/src/core/Index/FasterLog/FasterLogSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ public class FasterLogSettings
/// </summary>
public bool RemoveOutdatedCommitFiles = true;

/// <summary>
/// CommitPolicy that influences the behavior of Commit() calls, or null if default.
/// </summary>
public IFasterLogCommitPolicy CommitPolicy = null;

internal LogSettings GetLogSettings()
Expand Down
2 changes: 1 addition & 1 deletion cs/test/LogShiftTailStressTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void FasterLogShiftTailStressTest()
{
// Get an excruciatingly slow storage device to maximize chance of clogging the flush pipeline
device = new LocalMemoryDevice(1L << 28, 1 << 28, 2, sector_size: 512, latencyMs: 50, fileName: "stress.log");
var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.None, LogCommitManager = manager};
var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.None, LogCommitManager = manager, SegmentSizeBits = 28};
log = new FasterLog(logSettings);

byte[] entry = new byte[entryLength];
Expand Down