diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index f7369e038..d2c512847 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -146,6 +146,12 @@ public abstract partial class AllocatorBase : IDisposable /// public long SafeHeadAddress; + /// + /// Tentative head address. Threads do not take any record locks earlier than this point. + /// Records earlier than this address undergo eviction, before HeadAddress is moved. + /// + public long TentativeHeadAddress; + /// /// Flushed until address /// @@ -1235,7 +1241,7 @@ internal virtual bool TryComplete() /// Flush: send page to secondary store /// /// - public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress) + private void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress) { if (Utility.MonotonicUpdate(ref SafeReadOnlyAddress, newSafeReadOnlyAddress, out long oldSafeReadOnlyAddress)) { @@ -1249,17 +1255,31 @@ public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress) } } + /// + /// Action to be performed when all threads agree that + /// a page range is ready to close. + /// + private void OnPagesReadyToClose(long oldHeadAddress, long newHeadAddress) + { + if (ReadCache && (newHeadAddress > HeadAddress)) + EvictCallback(HeadAddress, newHeadAddress); + + if (Utility.MonotonicUpdate(ref HeadAddress, newHeadAddress, out oldHeadAddress)) + { + Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress); + epoch.BumpCurrentEpoch(() => OnPagesClosed(newHeadAddress)); + } + } + /// /// Action to be performed for when all threads have /// agreed that a page range is closed. /// /// - public void OnPagesClosed(long newSafeHeadAddress) + private void OnPagesClosed(long newSafeHeadAddress) { if (Utility.MonotonicUpdate(ref SafeHeadAddress, newSafeHeadAddress, out long oldSafeHeadAddress)) { - Debug.WriteLine("SafeHeadOffset shifted from {0:X} to {1:X}", oldSafeHeadAddress, newSafeHeadAddress); - // Also shift begin address if we are using a null storage device if (IsNullDevice) Utility.MonotonicUpdate(ref BeginAddress, newSafeHeadAddress, out _); @@ -1298,12 +1318,14 @@ internal void DebugPrintAddresses(long closePageAddress) var _readonly = ReadOnlyAddress; var _safereadonly = SafeReadOnlyAddress; var _tail = GetTailAddress(); + var _thead = TentativeHeadAddress; var _head = HeadAddress; var _safehead = SafeHeadAddress; Console.WriteLine("ClosePageAddress: {0}.{1}", GetPage(closePageAddress), GetOffsetInPage(closePageAddress)); Console.WriteLine("FlushedUntil: {0}.{1}", GetPage(_flush), GetOffsetInPage(_flush)); Console.WriteLine("Tail: {0}.{1}", GetPage(_tail), GetOffsetInPage(_tail)); + Console.WriteLine("TentativeHead: {0}.{1}", GetPage(_thead), GetOffsetInPage(_thead)); Console.WriteLine("Head: {0}.{1}", GetPage(_head), GetOffsetInPage(_head)); Console.WriteLine("SafeHead: {0}.{1}", GetPage(_safehead), GetOffsetInPage(_safehead)); Console.WriteLine("ReadOnly: {0}.{1}", GetPage(_readonly), GetOffsetInPage(_readonly)); @@ -1334,29 +1356,7 @@ private void PageAlignedShiftReadOnlyAddress(long currentTailAddress) /// /// private void PageAlignedShiftHeadAddress(long currentTailAddress) - { - //obtain local values of variables that can change - long currentHeadAddress = HeadAddress; - long currentFlushedUntilAddress = FlushedUntilAddress; - long pageAlignedTailAddress = currentTailAddress & ~PageSizeMask; - long desiredHeadAddress = (pageAlignedTailAddress - HeadOffsetLagAddress); - - long newHeadAddress = desiredHeadAddress; - if (currentFlushedUntilAddress < newHeadAddress) - { - newHeadAddress = currentFlushedUntilAddress; - } - newHeadAddress &= ~PageSizeMask; - - if (ReadCache && (newHeadAddress > HeadAddress)) - EvictCallback(HeadAddress, newHeadAddress); - - if (Utility.MonotonicUpdate(ref HeadAddress, newHeadAddress, out long oldHeadAddress)) - { - Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress); - epoch.BumpCurrentEpoch(() => OnPagesClosed(newHeadAddress)); - } - } + => ShiftHeadAddress((currentTailAddress & ~PageSizeMask) - HeadOffsetLagAddress); /// /// Tries to shift head address to specified value @@ -1373,14 +1373,11 @@ public long ShiftHeadAddress(long desiredHeadAddress) newHeadAddress = currentFlushedUntilAddress; } - if (ReadCache && (newHeadAddress > HeadAddress)) - EvictCallback(HeadAddress, newHeadAddress); - - if (Utility.MonotonicUpdate(ref HeadAddress, newHeadAddress, out long oldHeadAddress)) + if (Utility.MonotonicUpdate(ref TentativeHeadAddress, newHeadAddress, out long oldTentativeHeadAddress)) { - Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress); - epoch.BumpCurrentEpoch(() => OnPagesClosed(newHeadAddress)); + epoch.BumpCurrentEpoch(() => OnPagesReadyToClose(oldTentativeHeadAddress, newHeadAddress)); } + return newHeadAddress; }