diff --git a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs index 2a101b7b3..cab049e11 100644 --- a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs +++ b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs @@ -1,8 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. -#pragma warning disable CS0162 // Unreachable code detected -- when switching on YcsbConstants - // Define below to enable continuous performance report for dashboard // #define DASHBOARD @@ -297,7 +295,7 @@ internal unsafe (double, double) Run(TestLoader testLoader) long start = swatch.ElapsedTicks; if (store.TakeHybridLogCheckpoint(out _, testLoader.Options.PeriodicCheckpointType, testLoader.Options.PeriodicCheckpointTryIncremental)) { - store.CompleteCheckpointAsync().GetAwaiter().GetResult(); + store.CompleteCheckpointAsync().AsTask().GetAwaiter().GetResult(); var timeTaken = (swatch.ElapsedTicks - start) / TimeSpan.TicksPerMillisecond; Console.WriteLine("Checkpoint time: {0}ms", timeTaken); checkpointTaken++; diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs index 9e8380079..9852f0ef3 100644 --- a/cs/benchmark/FasterYcsbBenchmark.cs +++ b/cs/benchmark/FasterYcsbBenchmark.cs @@ -1,8 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. -#pragma warning disable CS0162 // Unreachable code detected -- when switching on YcsbConstants - // Define below to enable continuous performance report for dashboard // #define DASHBOARD @@ -292,7 +290,7 @@ internal unsafe (double, double) Run(TestLoader testLoader) long start = swatch.ElapsedTicks; if (store.TakeHybridLogCheckpoint(out _, testLoader.Options.PeriodicCheckpointType, testLoader.Options.PeriodicCheckpointTryIncremental)) { - store.CompleteCheckpointAsync().GetAwaiter().GetResult(); + store.CompleteCheckpointAsync().AsTask().GetAwaiter().GetResult(); var timeTaken = (swatch.ElapsedTicks - start) / TimeSpan.TicksPerMillisecond; Console.WriteLine("Checkpoint time: {0}ms", timeTaken); checkpointTaken++; diff --git a/cs/benchmark/Functions.cs b/cs/benchmark/Functions.cs index 1cbcdca77..ac3701706 100644 --- a/cs/benchmark/Functions.cs +++ b/cs/benchmark/Functions.cs @@ -98,12 +98,18 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) { - recordInfo.SpinLock(); + if (lockType == LockType.Exclusive) + recordInfo.LockExclusive(); + else + recordInfo.LockShared(); } public bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext) { - recordInfo.Unlock(); + if (lockType == LockType.Exclusive) + recordInfo.UnlockExclusive(); + else + recordInfo.UnlockShared(); return true; } } diff --git a/cs/src/core/Index/Common/RecordInfo.cs b/cs/src/core/Index/Common/RecordInfo.cs index 65ba383b2..c8a3173e7 100644 --- a/cs/src/core/Index/Common/RecordInfo.cs +++ b/cs/src/core/Index/Common/RecordInfo.cs @@ -3,104 +3,62 @@ #pragma warning disable 1591 -//#define RECORD_INFO_WITH_PIN_COUNT - -using System; -using System.Diagnostics; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; namespace FASTER.core { -#if RECORD_INFO_WITH_PIN_COUNT - [StructLayout(LayoutKind.Explicit, Size = 12)] -#else + // RecordInfo layout (64 bits total): + // [VVVVV][Dirty][Stub][Sealed] [Valid][Tombstone][X][SSSSS] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] + // where V = version, X = exclusive lock, S = shared lock, A = address [StructLayout(LayoutKind.Explicit, Size = 8)] -#endif - public unsafe struct RecordInfo + public struct RecordInfo { - public const int kLatchBitOffset = 48; - - public const int kTombstoneBitOffset = 49; - - public const int kInvalidBitOffset = 50; - - public const int kVersionBits = 13; - - public const int kVersionShiftInWord = 51; - - public const long kVersionMaskInWord = ((1L << kVersionBits) - 1) << kVersionShiftInWord; + const int kTotalSizeInBytes = 8; + const int kTotalBits = kTotalSizeInBytes * 8; - public const long kVersionMaskInInteger = (1L << kVersionBits) - 1; + // Previous address + const int kPreviousAddressBits = 48; + const long kPreviousAddressMaskInWord = (1L << kPreviousAddressBits) - 1; - public const long kPreviousAddressMask = (1L << 48) - 1; + // Shift position of lock in word + const int kLockShiftInWord = kPreviousAddressBits; - public const long kLatchBitMask = (1L << kLatchBitOffset); + // We use 6 lock bits: 5 shared lock bits + 1 exclusive lock bit + const int kSharedLockBits = 5; + const int kExlcusiveLockBits = 1; - public const long kTombstoneMask = (1L << kTombstoneBitOffset); + // Shared lock constants + const long kSharedLockMaskInWord = ((1L << kSharedLockBits) - 1) << kLockShiftInWord; + const long kSharedLockIncrement = 1L << kLockShiftInWord; - public const long kInvalidBitMask = (1L << kInvalidBitOffset); + // Exclusive lock constants + const int kExclusiveLockBitOffset = kLockShiftInWord + kSharedLockBits; + const long kExclusiveLockBitMask = 1L << kExclusiveLockBitOffset; -#if RECORD_INFO_WITH_PIN_COUNT - public const int kTotalSizeInBytes = sizeof(long) + sizeof(int); + // Other marker bits + const int kTombstoneBitOffset = kExclusiveLockBitOffset + 1; + const int kValidBitOffset = kTombstoneBitOffset + 1; + const int kStubBitOffset = kValidBitOffset + 1; + const int kSealedBitOffset = kStubBitOffset + 1; + const int kDirtyBitOffset = kSealedBitOffset + 1; - public const int kTotalBits = kTotalSizeInBytes * 8; - - [FieldOffset(0)] - private long word; - - [FieldOffset(sizeof(long))] - private int access_data; - - public static void WriteInfo(RecordInfo* info, int checkpointVersion, bool tombstone, bool invalidBit, long previousAddress) - { - info->word = default(long); - info->Tombstone = tombstone; - info->Invalid = invalidBit; - info->PreviousAddress = previousAddress; - info->Version = checkpointVersion; - info->access_data = 0; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool TryPin() - { - return Interlocked.Increment(ref access_data) > 0; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool TryMarkReadOnly() - { - return Interlocked.CompareExchange(ref access_data, int.MinValue, 0) == 0; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void MarkReadOnly() - { - var found_value = Interlocked.CompareExchange(ref access_data, int.MinValue, 0); - if (found_value != 0) - { - int num_iterations = 1000; - Thread.SpinWait(num_iterations); - while (Interlocked.CompareExchange(ref access_data, int.MinValue, 0) != 0) - { - Thread.SpinWait(num_iterations); - num_iterations <<= 1; - } - } - } + const long kTombstoneBitMask = 1L << kTombstoneBitOffset; + const long kValidBitMask = 1L << kValidBitOffset; + const long kStubBitMask = 1L << kStubBitOffset; + const long kSealedBitMask = 1L << kSealedBitOffset; + const long kDirtyBitMask = 1L << kDirtyBitOffset; - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Unpin() - { - Interlocked.Decrement(ref access_data); - } + // Shift position of version in word + const int kVersionShiftInWord = kDirtyBitOffset + 1; -#else - public const int kTotalSizeInBytes = sizeof(long); + // We use the remaining bits (64 - 59 = 5) as a short version for record + const int kVersionBits = kTotalBits - kVersionShiftInWord; - public const int kTotalBits = kTotalSizeInBytes * 8; + // Version constants + const long kVersionMaskInWord = ((1L << kVersionBits) - 1) << kVersionShiftInWord; + internal const long kVersionMaskInInteger = (1L << kVersionBits) - 1; [FieldOffset(0)] private long word; @@ -113,129 +71,190 @@ public static void WriteInfo(ref RecordInfo info, int checkpointVersion, bool to info.PreviousAddress = previousAddress; info.Version = checkpointVersion; } - - public static string ToString(RecordInfo* info) + /// + /// Take exclusive (write) lock on RecordInfo + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void LockExclusive() { - return "RecordHeader Word = " + info->word; + // Acquire exclusive lock (readers may still be present) + while (true) + { + long expected_word = word; + if ((expected_word & kExclusiveLockBitMask) == 0) + { + if (expected_word == Interlocked.CompareExchange(ref word, expected_word | kExclusiveLockBitMask, expected_word)) + break; + } + Thread.Yield(); + } + + // Wait for readers to drain + while ((word & kSharedLockMaskInWord) != 0) Thread.Yield(); } + /// + /// Unlock RecordInfo that was previously locked for exclusive access, via + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool TryPin() - { - throw new InvalidOperationException(); - } + public void UnlockExclusive() => word &= ~kExclusiveLockBitMask; // Safe because there should be no other threads (e.g., readers) updating the word at this point + /// + /// Try to take an exclusive (write) lock on RecordInfo + /// + /// Number of attempts before giving up + /// Whether lock was acquired successfully [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool TryMarkReadOnly() + public bool TryLockExclusive(int spinCount = 1) { - throw new InvalidOperationException(); + // Acquire exclusive lock (readers may still be present) + while (true) + { + long expected_word = word; + if ((expected_word & kExclusiveLockBitMask) == 0) + { + if (expected_word == Interlocked.CompareExchange(ref word, expected_word | kExclusiveLockBitMask, expected_word)) + break; + } + if (--spinCount <= 0) return false; + Thread.Yield(); + } + + // Wait for readers to drain + while ((word & kSharedLockMaskInWord) != 0) Thread.Yield(); + return true; } + /// + /// Take shared (read) lock on RecordInfo + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void MarkReadOnly() + public void LockShared() { - throw new InvalidOperationException(); + // Acquire shared lock + while (true) + { + long expected_word = word; + if (((expected_word & kExclusiveLockBitMask) == 0) // not exclusively locked + && (expected_word & kSharedLockMaskInWord) != kSharedLockMaskInWord) // shared lock is not full + { + if (expected_word == Interlocked.CompareExchange(ref word, expected_word + kSharedLockIncrement, expected_word)) + break; + } + Thread.Yield(); + } } + /// + /// Unlock RecordInfo that was previously locked for shared access, via + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Unpin() - { - throw new InvalidOperationException(); - } -#endif + public void UnlockShared() => Interlocked.Add(ref word, -kSharedLockIncrement); + /// + /// Take shared (read) lock on RecordInfo + /// + /// Number of attempts before giving up + /// Whether lock was acquired successfully [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void SpinLock() + public bool TryLockShared(int spinCount = 1) { - // Note: Any improvements here should be done in IntExclusiveLocker.SpinLock() as well. + // Acquire shared lock while (true) { long expected_word = word; - if ((expected_word & kLatchBitMask) == 0) + if (((expected_word & kExclusiveLockBitMask) == 0) // not exclusively locked + && (expected_word & kSharedLockMaskInWord) != kSharedLockMaskInWord) // shared lock is not full { - var found_word = Interlocked.CompareExchange(ref word, expected_word | kLatchBitMask, expected_word); - if (found_word == expected_word) - return; + if (expected_word == Interlocked.CompareExchange(ref word, expected_word + kSharedLockIncrement, expected_word)) + break; } + if (--spinCount <= 0) return false; Thread.Yield(); } + return true; } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Unlock() + public bool IsNull() => word == 0; + + public bool Tombstone { - word &= ~kLatchBitMask; + get => (word & kTombstoneBitMask) > 0; + set + { + if (value) word |= kTombstoneBitMask; + else word &= ~kTombstoneBitMask; + } } - public bool IsNull() + public bool Valid { - return word == 0; + get => (word & kValidBitMask) > 0; + set + { + if (value) word |= kValidBitMask; + else word &= ~kValidBitMask; + } } - public bool Tombstone + public bool Stub { - get + get => (word & kTombstoneBitMask) > 0; + set { - return (word & kTombstoneMask) > 0; + if (value) word |= kStubBitMask; + else word &= ~kStubBitMask; } + } + public bool Sealed + { + get => (word & kSealedBitMask) > 0; set { - if (value) - { - word |= kTombstoneMask; - } - else - { - word &= ~kTombstoneMask; - } + if (value) word |= kSealedBitMask; + else word &= ~kSealedBitMask; } } - public bool Invalid + public bool Dirty { - get + get => (word & kDirtyBitMask) > 0; + set { - return !((word & kInvalidBitMask) > 0); + if (value) word |= kDirtyBitMask; + else word &= ~kDirtyBitMask; } + } + + public bool Invalid + { + get => !((word & kValidBitMask) > 0); set { - if (value) - { - word &= ~kInvalidBitMask; - } - else - { - word |= kInvalidBitMask; - } + if (value) word &= ~kValidBitMask; + else word |= kValidBitMask; } } public int Version { - get - { - return (int)(((word & kVersionMaskInWord) >> kVersionShiftInWord) & kVersionMaskInInteger); - } - [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => (int)((word & kVersionMaskInWord) >> kVersionShiftInWord); set { word &= ~kVersionMaskInWord; - word |= ((value & kVersionMaskInInteger) << kVersionShiftInWord); + word |= (value & kVersionMaskInInteger) << kVersionShiftInWord; } } public long PreviousAddress { - get - { - return (word & kPreviousAddressMask); - } + get => word & kPreviousAddressMaskInWord; set { - word &= ~kPreviousAddressMask; - word |= (value & kPreviousAddressMask); + word &= ~kPreviousAddressMaskInWord; + word |= value & kPreviousAddressMaskInWord; } } diff --git a/cs/src/core/Index/Interfaces/FunctionsBase.cs b/cs/src/core/Index/Interfaces/FunctionsBase.cs index 857f0bbae..b0b86f462 100644 --- a/cs/src/core/Index/Interfaces/FunctionsBase.cs +++ b/cs/src/core/Index/Interfaces/FunctionsBase.cs @@ -77,13 +77,19 @@ public virtual void CheckpointCompletionCallback(string sessionId, CommitPoint c /// public virtual void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) { - recordInfo.SpinLock(); + if (lockType == LockType.Exclusive) + recordInfo.LockExclusive(); + else + recordInfo.LockShared(); } /// public virtual bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext) { - recordInfo.Unlock(); + if (lockType == LockType.Exclusive) + recordInfo.UnlockExclusive(); + else + recordInfo.UnlockShared(); return true; } } diff --git a/cs/src/core/Index/Interfaces/IFunctions.cs b/cs/src/core/Index/Interfaces/IFunctions.cs index 20e3a44b6..7b581f4ed 100644 --- a/cs/src/core/Index/Interfaces/IFunctions.cs +++ b/cs/src/core/Index/Interfaces/IFunctions.cs @@ -276,8 +276,7 @@ void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, long address) #region Locking /// - /// User-provided lock call, defaulting to no-op. A default exclusive implementation is available via . - /// See also to use two bits of an existing int value. + /// User-provided lock call, defaulting to no-op. A default implementation is available via and . /// /// The header for the current record /// The key for the current record @@ -290,8 +289,7 @@ void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, long address) void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext); /// - /// User-provided unlock call, defaulting to no-op. A default exclusive implementation is available via . - /// See also to use two bits of an existing int value. + /// User-provided unlock call, defaulting to no-op. A default exclusive implementation is available via and . /// /// The header for the current record /// The key for the current record diff --git a/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs b/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs index 9706d76a6..f986684b7 100644 --- a/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs +++ b/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs @@ -173,11 +173,10 @@ public override SystemState NextState(SystemState start) break; case Phase.PREPARE: nextState.Phase = Phase.IN_PROGRESS; - // 13 bits of 1s --- FASTER records only store 13 bits of version number, and we need to ensure that - // the next version is distinguishable from the last in those 13 bits. - var bitMask = (1L << 13) - 1; + // FASTER records only store a few bits of version number, and we need to ensure that + // the next version is distinguishable from the last in those bits. // If they are not distinguishable, simply increment target version to resolve this - if (((targetVersion - start.Version) & bitMask) == 0) + if (((targetVersion - start.Version) & RecordInfo.kVersionMaskInInteger) == 0) targetVersion++; // TODO: Move to long for system state as well. diff --git a/cs/src/core/Utilities/IntExclusiveLocker.cs b/cs/src/core/Utilities/IntExclusiveLocker.cs deleted file mode 100644 index 38a74e858..000000000 --- a/cs/src/core/Utilities/IntExclusiveLocker.cs +++ /dev/null @@ -1,52 +0,0 @@ -using System.Runtime.CompilerServices; -using System.Threading; - -namespace FASTER.core -{ - /// - /// Exclusive lock + marking using 2 MSB bits of int - /// - internal struct IntExclusiveLocker - { - const int kLatchBitMask = 1 << 31; - const int kMarkBitMask = 1 << 30; - public const int kHeaderMask = 3 << 30; - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void SpinLock(ref int value) - { - // Note: Any improvements here should be done in RecordInfo.SpinLock() as well. - while (true) - { - int expected_word = value; - if ((expected_word & kLatchBitMask) == 0) - { - var found_word = Interlocked.CompareExchange(ref value, expected_word | kLatchBitMask, expected_word); - if (found_word == expected_word) return; - } - Thread.Yield(); - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void Unlock(ref int value) - { - value &= ~kLatchBitMask; - } - - public static void Mark(ref int value) - { - value |= kMarkBitMask; - } - - public static void Unmark(ref int value) - { - value &= ~kMarkBitMask; - } - - public static bool IsMarked(ref int value) - { - return (value & kMarkBitMask) != 0; - } - } -} \ No newline at end of file diff --git a/cs/src/core/VarLen/MemoryFunctions.cs b/cs/src/core/VarLen/MemoryFunctions.cs index 53bc2ad84..0c16134bc 100644 --- a/cs/src/core/VarLen/MemoryFunctions.cs +++ b/cs/src/core/VarLen/MemoryFunctions.cs @@ -33,9 +33,12 @@ public override bool ConcurrentWriter(ref Key key, ref Memory input, ref Memo { // We can write the source (src) data to the existing destination (dst) in-place, // only if there is sufficient space - if (dst.Length < src.Length || dst.IsMarkedReadOnly()) + if (recordInfo.Sealed) + return false; + + if (dst.Length < src.Length) { - dst.MarkReadOnly(); + recordInfo.Sealed = true; return false; } @@ -85,21 +88,5 @@ public override bool InPlaceUpdater(ref Key key, ref Memory input, ref Memory // The default implementation of IPU simply writes input to destination, if there is space return ConcurrentWriter(ref key, ref input, ref input, ref value, ref recordInfo, address); } - - /// - public override bool SupportsLocking => locking; - - /// - public override void Lock(ref RecordInfo recordInfo, ref Key key, ref Memory value, LockType lockType, ref long lockContext) - { - value.SpinLock(); - } - - /// - public override bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Memory value, LockType lockType, long lockContext) - { - value.Unlock(); - return true; - } } } \ No newline at end of file diff --git a/cs/src/core/VarLen/MemoryVarLenStruct.cs b/cs/src/core/VarLen/MemoryVarLenStruct.cs index 9f27e35b8..2b411bc3b 100644 --- a/cs/src/core/VarLen/MemoryVarLenStruct.cs +++ b/cs/src/core/VarLen/MemoryVarLenStruct.cs @@ -39,7 +39,7 @@ public unsafe ref Memory AsRef(void* source) } count = (count + 1) % 4; ref var cache = ref refCache[count]; - var len = (*(int*)source) & ~IntExclusiveLocker.kHeaderMask; + var len = *(int*)source; cache.Item1.SetDestination((T*)((byte*)source + sizeof(int)), len / sizeof(T)); cache.Item2 = cache.Item1.Memory; return ref cache.Item2; diff --git a/cs/src/core/VarLen/SpanByte.cs b/cs/src/core/VarLen/SpanByte.cs index d75150753..b4db17ba1 100644 --- a/cs/src/core/VarLen/SpanByte.cs +++ b/cs/src/core/VarLen/SpanByte.cs @@ -12,18 +12,17 @@ namespace FASTER.core /// /// Represents a pinned variable length byte array that is viewable as a fixed (pinned) Span<byte> /// Format: [4-byte (int) length of payload][[optional 8-byte metadata] payload bytes...] - /// First 4 bits of length are used as a mask for various properties, so max length is 256MB + /// First 2 bits of length are used as a mask for properties, so max payload length is 1GB /// [StructLayout(LayoutKind.Explicit)] public unsafe struct SpanByte { - // Byte #30 and #31 are used for read-only and lock respectively - // Byte #29 is used to denote unserialized (1) or serialized (0) data - const int kUnserializedBitMask = 1 << 29; - // Byte #28 is used to denote extra metadata present (1) or absent (0) in payload - const int kExtraMetadataBitMask = 1 << 28; + // Byte #31 is used to denote unserialized (1) or serialized (0) data + const int kUnserializedBitMask = 1 << 31; + // Byte #30 is used to denote extra metadata present (1) or absent (0) in payload + const int kExtraMetadataBitMask = 1 << 30; // Mask for header - const int kHeaderMask = 0xf << 28; + const int kHeaderMask = 0x3 << 30; /// /// Length of the payload @@ -77,7 +76,7 @@ public int Length /// /// Size of metadata header, if any (returns 0 or 8) /// - public int MetadataSize => (length & kExtraMetadataBitMask) >> (28 - 3); + public int MetadataSize => (length & kExtraMetadataBitMask) >> (30 - 3); /// /// Constructor @@ -445,27 +444,5 @@ public void CopyTo(byte* destination) Buffer.MemoryCopy((void*)payload, destination + sizeof(int), Length, Length); } } - - /// - /// Lock SpanByte, using 2 most significant bits from the length header - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void SpinLock() => IntExclusiveLocker.SpinLock(ref length); - - /// - /// Unlock SpanByte, using 2 most significant bits from the length header - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Unlock() => IntExclusiveLocker.Unlock(ref length); - - /// - /// Mark SpanByte as read-only, using 2 most significant bits from the length header - /// - public void MarkReadOnly() => IntExclusiveLocker.Mark(ref length); - - /// - /// Check if SpanByte is marked as read-only, using 2 most significant bits from the length header - /// - public bool IsMarkedReadOnly() => IntExclusiveLocker.IsMarked(ref length); } } diff --git a/cs/src/core/VarLen/SpanByteFunctions.cs b/cs/src/core/VarLen/SpanByteFunctions.cs index b00d8c2b6..a48101ed3 100644 --- a/cs/src/core/VarLen/SpanByteFunctions.cs +++ b/cs/src/core/VarLen/SpanByteFunctions.cs @@ -25,14 +25,14 @@ public override void SingleWriter(ref Key key, ref SpanByte input, ref SpanByte /// public override bool ConcurrentWriter(ref Key key, ref SpanByte input, ref SpanByte src, ref SpanByte dst, ref RecordInfo recordInfo, long address) { - if (locking) dst.SpinLock(); - // We can write the source (src) data to the existing destination (dst) in-place, // only if there is sufficient space - if (dst.Length < src.Length || dst.IsMarkedReadOnly()) + if (recordInfo.Sealed) + return false; + + if (dst.Length < src.Length) { - dst.MarkReadOnly(); - if (locking) dst.Unlock(); + recordInfo.Sealed = true; return false; } @@ -44,7 +44,6 @@ public override bool ConcurrentWriter(ref Key key, ref SpanByte input, ref SpanB // This method will also zero out the extra space to retain log scan correctness. dst.ShrinkSerializedLength(src.Length); - if (locking) dst.Unlock(); return true; } @@ -105,13 +104,19 @@ public unsafe override bool ConcurrentReader(ref SpanByte key, ref SpanByte inpu /// public override void Lock(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, LockType lockType, ref long lockContext) { - value.SpinLock(); + if (lockType == LockType.Exclusive) + recordInfo.LockExclusive(); + else + recordInfo.LockShared(); } /// public override bool Unlock(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, LockType lockType, long lockContext) { - value.Unlock(); + if (lockType == LockType.Exclusive) + recordInfo.UnlockExclusive(); + else + recordInfo.UnlockShared(); return true; } } @@ -147,13 +152,19 @@ public override bool ConcurrentReader(ref SpanByte key, ref SpanByte input, ref /// public override void Lock(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, LockType lockType, ref long lockContext) { - value.SpinLock(); + if (lockType == LockType.Exclusive) + recordInfo.LockExclusive(); + else + recordInfo.LockShared(); } /// public override bool Unlock(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, LockType lockType, long lockContext) { - value.Unlock(); + if (lockType == LockType.Exclusive) + recordInfo.UnlockExclusive(); + else + recordInfo.UnlockShared(); return true; } } diff --git a/cs/src/core/VarLen/UnsafeLogMemoryExtensions.cs b/cs/src/core/VarLen/UnsafeLogMemoryExtensions.cs index a614b872b..e82ed7ff2 100644 --- a/cs/src/core/VarLen/UnsafeLogMemoryExtensions.cs +++ b/cs/src/core/VarLen/UnsafeLogMemoryExtensions.cs @@ -32,51 +32,5 @@ public static unsafe bool ShrinkSerializedLength(this Memory memory, int n } return true; } - - /// - /// Lock Memory serialized on log, using 2 most significant bits from the length header - /// - /// - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static unsafe void SpinLock(this Memory memory) where T : unmanaged - { - var ptr = Unsafe.AsPointer(ref memory.Span[0]); - IntExclusiveLocker.SpinLock(ref Unsafe.AsRef((byte*)ptr - sizeof(int))); - } - - /// - /// Unlock Memory serialized on log, using 2 most significant bits from the length header - /// - /// - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static unsafe void Unlock(this Memory memory) where T : unmanaged - { - var ptr = Unsafe.AsPointer(ref memory.Span[0]); - IntExclusiveLocker.Unlock(ref Unsafe.AsRef((byte*)ptr - sizeof(int))); - } - - /// - /// Mark Memory serialized on log as read-only, using 2 most significant bits from the length header - /// - /// - /// - public static unsafe void MarkReadOnly(this Memory memory) where T : unmanaged - { - var ptr = Unsafe.AsPointer(ref memory.Span[0]); - IntExclusiveLocker.Mark(ref Unsafe.AsRef((byte*)ptr - sizeof(int))); - } - - /// - /// Check is Memory serialized on log is marked as read-only, using 2 most significant bits from the length header - /// - /// - /// - public static unsafe bool IsMarkedReadOnly(this Memory memory) where T : unmanaged - { - var ptr = Unsafe.AsPointer(ref memory.Span[0]); - return IntExclusiveLocker.IsMarked(ref Unsafe.AsRef((byte*)ptr - sizeof(int))); - } } } \ No newline at end of file diff --git a/cs/test/LockTests.cs b/cs/test/LockTests.cs index 3b62c6c25..87cff461b 100644 --- a/cs/test/LockTests.cs +++ b/cs/test/LockTests.cs @@ -15,6 +15,10 @@ internal class LockTests { internal class Functions : SimpleFunctions { + public Functions() : base(true) + { + } + static bool Increment(ref int dst) { ++dst; @@ -24,14 +28,6 @@ static bool Increment(ref int dst) public override bool ConcurrentWriter(ref int key, ref int input, ref int src, ref int dst, ref RecordInfo recordInfo, long address) => Increment(ref dst); public override bool InPlaceUpdater(ref int key, ref int input, ref int value, ref int output, ref RecordInfo recordInfo, long address) => Increment(ref value); - - public override bool SupportsLocking => true; - public override void Lock(ref RecordInfo recordInfo, ref int key, ref int value, LockType lockType, ref long lockContext) => recordInfo.SpinLock(); - public override bool Unlock(ref RecordInfo recordInfo, ref int key, ref int value, LockType lockType, long lockContext) - { - recordInfo.Unlock(); - return true; - } } private FasterKV fkv; @@ -66,10 +62,12 @@ public unsafe void RecordInfoLockTest() { for (var ii = 0; ii < 5; ++ii) { - RecordInfo recordInfo = new RecordInfo(); + RecordInfo recordInfo = new(); RecordInfo* ri = &recordInfo; - XLockTest(() => ri->SpinLock(), () => ri->Unlock()); + XLockTest(() => ri->LockExclusive(), () => ri->UnlockExclusive()); + SLockTest(() => ri->LockShared(), () => ri->UnlockShared()); + XSLockTest(() => ri->LockExclusive(), () => ri->UnlockExclusive(), () => ri->LockShared(), () => ri->UnlockShared()); } } @@ -97,12 +95,68 @@ void XLockTestFunc() } } - [Test] - [Category("FasterKV")] - public void IntExclusiveLockerTest() + private void SLockTest(Action locker, Action unlocker) + { + long lockTestValue = 1; + long lockTestValueResult = 0; + + const int numThreads = 50; + const int numIters = 5000; + + var tasks = Enumerable.Range(0, numThreads).Select(ii => Task.Factory.StartNew(SLockTestFunc)).ToArray(); + Task.WaitAll(tasks); + + Assert.AreEqual(numThreads * numIters, lockTestValueResult); + + void SLockTestFunc() + { + for (int ii = 0; ii < numIters; ++ii) + { + locker(); + Interlocked.Add(ref lockTestValueResult, Interlocked.Read(ref lockTestValue)); + Thread.Yield(); + unlocker(); + } + } + } + + private void XSLockTest(Action xlocker, Action xunlocker, Action slocker, Action sunlocker) { - int lockTestValue = 0; - XLockTest(() => IntExclusiveLocker.SpinLock(ref lockTestValue), () => IntExclusiveLocker.Unlock(ref lockTestValue)); + long lockTestValue = 0; + long lockTestValueResult = 0; + + const int numThreads = 50; + const int numIters = 5000; + + var tasks = Enumerable.Range(0, numThreads).Select(ii => Task.Factory.StartNew(XLockTestFunc)) + .Concat(Enumerable.Range(0, numThreads).Select(ii => Task.Factory.StartNew(SLockTestFunc))).ToArray(); + Task.WaitAll(tasks); + + Assert.AreEqual(numThreads * numIters, lockTestValue); + Assert.AreEqual(numThreads * numIters, lockTestValueResult); + + void XLockTestFunc() + { + for (int ii = 0; ii < numIters; ++ii) + { + xlocker(); + var temp = lockTestValue; + Thread.Yield(); + lockTestValue = temp + 1; + xunlocker(); + } + } + + void SLockTestFunc() + { + for (int ii = 0; ii < numIters; ++ii) + { + slocker(); + Interlocked.Add(ref lockTestValueResult, 1); + Thread.Yield(); + sunlocker(); + } + } } [Test]