From 93708414cf469dfb73025117bf919a4fa024f744 Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Sat, 2 May 2026 13:45:30 +0900 Subject: [PATCH 1/2] add latch struct for gc --- src/cache/latch.rs | 75 ++++++++++++++++++++++++++++++++++++++++ src/cache/mod.rs | 3 ++ src/cache/tests/latch.rs | 53 ++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+) create mode 100644 src/cache/latch.rs create mode 100644 src/cache/tests/latch.rs diff --git a/src/cache/latch.rs b/src/cache/latch.rs new file mode 100644 index 0000000..0162c57 --- /dev/null +++ b/src/cache/latch.rs @@ -0,0 +1,75 @@ +use std::sync::{Condvar, Mutex}; + +use crate::utils::ShortenedMutex; + +struct State { + locked: bool, + high_waiters: usize, +} + +pub struct Latch { + state: Mutex<State>, + high_cv: Condvar, + low_cv: Condvar, +} +impl Latch { + pub const fn new() -> Self { + Self { + state: Mutex::new(State { + locked: false, + high_waiters: 0, + }), + high_cv: Condvar::new(), + low_cv: Condvar::new(), + } + } + + pub fn lock_immediately<'a, 'b>(&'a self) -> LatchGuard<'b> + where + 'a: 'b, + { + let mut state = self.state.l(); + while state.locked { + state.high_waiters += 1; + state = self.high_cv.wait(state).unwrap(); + state.high_waiters -= 1; + } + state.locked = true; + + LatchGuard(self) + } + + pub fn lock_lazily<'a, 'b>(&'a self) -> LatchGuard<'b> + where + 'a: 'b, + { + let mut state = self.state.l(); + while state.locked || state.high_waiters > 0 { + state = self.low_cv.wait(state).unwrap(); + } + state.locked = true; + + LatchGuard(self) + } + + fn unlock(&self) { + let mut state = self.state.l(); + state.locked = false; + if state.high_waiters > 0 { + self.high_cv.notify_one(); + } else { + self.low_cv.notify_one(); + } + } +} + +pub struct LatchGuard<'a>(&'a Latch); +impl<'a> Drop for LatchGuard<'a> { + fn drop(&mut self) { + self.0.unlock(); + } +} + +#[cfg(test)] +#[path = "tests/latch.rs"] +mod tests; diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 878a3ce..7738531 100644 --- 
a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -24,3 +24,6 @@ use block::*; mod dirty; use dirty::*; + +mod latch; +use latch::*; diff --git a/src/cache/tests/latch.rs b/src/cache/tests/latch.rs new file mode 100644 index 0000000..3555543 --- /dev/null +++ b/src/cache/tests/latch.rs @@ -0,0 +1,53 @@ +use std::{ + sync::atomic::{AtomicUsize, Ordering}, + thread::{scope, yield_now}, +}; + +use crossbeam::queue::SegQueue; + +use super::*; + +#[test] +fn test_priority() { + let l = Latch::new(); + let v = SegQueue::new(); + let c = AtomicUsize::new(0); + + let ic = 5; + let lc = 5; + scope(|s| { + let _g = l.lock_immediately(); + + let mut th = vec![]; + + for _ in 0..lc { + let t = s.spawn(|| { + c.fetch_add(1, Ordering::Release); + let _g = l.lock_lazily(); + v.push(2); + }); + th.push(t) + } + + for _ in 0..ic { + let t = s.spawn(|| { + c.fetch_add(1, Ordering::Release); + let _g = l.lock_immediately(); + v.push(1); + }); + + th.push(t) + } + + while c.load(Ordering::Acquire) < ic + lc { + yield_now(); + } + + drop(_g); + + th.into_iter().for_each(|t| t.join().unwrap()); + }); + + let result = vec![1, 1, 1, 1, 1, 2, 2, 2, 2, 2]; + assert_eq!(v.into_iter().collect::<Vec<_>>(), result) +} From db17eb7957c81535acc03f04a9a9b09175473a76 Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Sat, 2 May 2026 13:45:44 +0900 Subject: [PATCH 2/2] apply latch --- src/cache/block.rs | 18 +++++++++----- src/cache/slot.rs | 51 +++++++++++++++++++++++++++++++++++--- src/cache/temp.rs | 17 ++++++++----- src/cursor/gc.rs | 2 +- src/cursor/tree_manager.rs | 4 +-- src/transaction/version.rs | 18 -------------- 6 files changed, 73 insertions(+), 37 deletions(-) diff --git a/src/cache/block.rs b/src/cache/block.rs index 2ee533a..8b3579d 100644 --- a/src/cache/block.rs +++ b/src/cache/block.rs @@ -1,17 +1,19 @@ -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::Arc; + +use super::{Latch, LatchGuard}; + use crate::{ disk::{PageRef, Pointer, PAGE_SIZE}, table::TableHandle, thread::TaskHandle, - 
utils::{AtomicArc, ShortenedMutex}, + utils::AtomicArc, }; pub struct CachedBlock { page: AtomicArc>, pointer: Pointer, handle: Arc, - latch: Mutex<()>, + latch: Latch, } impl CachedBlock { #[inline] @@ -24,7 +26,7 @@ impl CachedBlock { page: AtomicArc::new(page), pointer, handle, - latch: Mutex::new(()), + latch: Latch::new(), } } @@ -42,8 +44,12 @@ impl CachedBlock { } #[inline] - pub fn latch(&self) -> MutexGuard<'_, ()> { - self.latch.l() + pub fn latch(&self) -> LatchGuard<'_> { + self.latch.lock_immediately() + } + #[inline] + pub fn lazy_latch(&self) -> LatchGuard<'_> { + self.latch.lock_lazily() } #[inline] diff --git a/src/cache/slot.rs b/src/cache/slot.rs index 27cd521..aff4144 100644 --- a/src/cache/slot.rs +++ b/src/cache/slot.rs @@ -1,9 +1,9 @@ use std::{ mem::{transmute, ManuallyDrop}, - sync::{Arc, MutexGuard}, + sync::Arc, }; -use super::{BlockId, CachedBlock, DirtyTables, TempBlockRef, TempGuard}; +use super::{BlockId, CachedBlock, DirtyTables, LatchGuard, TempBlockRef, TempGuard}; use crate::{ disk::{Page, PagePool, PageRef, Pointer, PAGE_SIZE}, table::TableHandle, @@ -73,6 +73,16 @@ impl<'a> CacheSlot<'a> { CacheSlot::Page(page) => WritableSlot::Page(page.for_write()), } } + #[inline] + pub fn for_lazy_write<'b>(self) -> WritableSlot<'b> + where + 'a: 'b, + { + match self { + CacheSlot::Temp(temp) => WritableSlot::Temp(temp.for_lazy_write()), + CacheSlot::Page(page) => WritableSlot::Page(page.for_lazy_write()), + } + } } pub enum WritableSlot<'a> { Temp(TempSlotWrite<'a>), @@ -156,6 +166,23 @@ impl<'a> PageSlot<'a> { _latch, } } + + fn for_lazy_write<'b>(self) -> PageSlotWrite<'b> + where + 'a: 'b, + { + let mut shadow = self.page_pool.acquire(); + let _latch = self.block.lazy_latch(); + shadow.copy_from(self.block.load_page().as_ref().as_ref()); + PageSlotWrite { + shadow: ManuallyDrop::new(shadow), + block: self.block, + dirty: self.dirty, + block_id: self.block_id, + _token: self.token, + _latch, + } + } } pub struct PageSlotWrite<'a> { 
shadow: ManuallyDrop>, @@ -163,7 +190,7 @@ pub struct PageSlotWrite<'a> { dirty: &'a AtomicBitmap, block_id: BlockId, _token: SharedToken<'a>, - _latch: MutexGuard<'a, ()>, + _latch: LatchGuard<'a>, } impl<'a> Drop for PageSlotWrite<'a> { @@ -213,6 +240,22 @@ impl<'a> TempSlot<'a> { let latch = unsafe { transmute(self.state.latch()) }; shadow.copy_from(self.state.load_page().as_ref().as_ref()); + TempSlotWrite { + shadow: ManuallyDrop::new(shadow), + state: ManuallyDrop::new(self.state), + pointer: self.pointer, + guard: self.guard, + latch: ManuallyDrop::new(latch), + } + } + fn for_lazy_write<'b>(self) -> TempSlotWrite<'b> + where + 'a: 'b, + { + let mut shadow = self.page_pool.acquire(); + let latch = unsafe { transmute(self.state.lazy_latch()) }; + shadow.copy_from(self.state.load_page().as_ref().as_ref()); + TempSlotWrite { shadow: ManuallyDrop::new(shadow), state: ManuallyDrop::new(self.state), @@ -237,7 +280,7 @@ pub struct TempSlotWrite<'a> { state: ManuallyDrop>>, pointer: Pointer, guard: Option<(TempGuard<'a>, Arc, &'a DirtyTables)>, - latch: ManuallyDrop>, + latch: ManuallyDrop>, } impl<'a> TempSlotWrite<'a> { diff --git a/src/cache/temp.rs b/src/cache/temp.rs index 0cb517d..b05bf0f 100644 --- a/src/cache/temp.rs +++ b/src/cache/temp.rs @@ -5,13 +5,14 @@ use std::{ ops::Deref, sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, MutexGuard, + Arc, }, }; +use super::{Latch, LatchGuard}; use crate::{ disk::{PageRef, PAGE_SIZE}, - utils::{AtomicArc, ExclusivePin, ExclusiveToken, SharedToken, ShortenedMutex}, + utils::{AtomicArc, ExclusivePin, ExclusiveToken, SharedToken}, }; /** @@ -29,7 +30,7 @@ pub struct TempBlockState { page: OnceCell>>, block_pin: ExclusivePin, dirty: AtomicBool, - latch: Mutex<()>, + latch: Latch, } impl TempBlockState { @@ -38,7 +39,7 @@ impl TempBlockState { page: OnceCell::new(), block_pin: ExclusivePin::new(), dirty: AtomicBool::new(false), - latch: Mutex::new(()), + latch: Latch::new(), } } @@ -71,8 +72,12 @@ impl TempBlockState 
{ } #[inline] - pub fn latch(&self) -> MutexGuard<'_, ()> { - self.latch.l() + pub fn latch(&self) -> LatchGuard<'_> { + self.latch.lock_immediately() + } + #[inline] + pub fn lazy_latch(&self) -> LatchGuard<'_> { + self.latch.lock_lazily() } } diff --git a/src/cursor/gc.rs b/src/cursor/gc.rs index b0ec358..31ede8e 100644 --- a/src/cursor/gc.rs +++ b/src/cursor/gc.rs @@ -222,7 +222,7 @@ const fn run_entry( continue; } - let mut slot = block_cache.peek(ptr, table.handle())?.for_write(); + let mut slot = block_cache.peek(ptr, table.handle())?.for_lazy_write(); let mut entry: DataEntry = slot.as_ref().deserialize()?; let prev_len = entry.len(); diff --git a/src/cursor/tree_manager.rs b/src/cursor/tree_manager.rs index 9065b4e..72c0cf4 100644 --- a/src/cursor/tree_manager.rs +++ b/src/cursor/tree_manager.rs @@ -334,7 +334,7 @@ fn run_merge_leaf( } } - let mut slot = block_cache.peek(i, table.handle())?.for_write(); + let mut slot = block_cache.peek(i, table.handle())?.for_lazy_write(); let mut leaf = slot.as_ref().deserialize::()?.as_leaf()?; next_ptr = leaf.get_next(); @@ -376,7 +376,7 @@ fn run_merge_leaf( // merge without propagating to internal nodes. 
debug!("trying to start merge {} with {}", slot.get_pointer(), next); - let mut next_slot = block_cache.peek(next, table.handle())?.for_write(); + let mut next_slot = block_cache.peek(next, table.handle())?.for_lazy_write(); let next_leaf = next_slot.as_ref().deserialize::()?; leaf.set_next(slot.get_pointer()); diff --git a/src/transaction/version.rs b/src/transaction/version.rs index 129cf69..648bce7 100644 --- a/src/transaction/version.rs +++ b/src/transaction/version.rs @@ -155,24 +155,6 @@ pub struct VersionVisibility { snapshot_id: AtomicU8, } impl VersionVisibility { - pub fn new( - base_path: PathBuf, - aborted: T, - last_tx_id: TxId, - last_snapshot_id: u8, - ) -> Self - where - T: IntoIterator, - { - Self { - base_path, - active: SkipMap::new(), - aborted: SkipSet::from_iter(aborted), - last_tx_id: AtomicTxId::new(last_tx_id), - snapshot_id: AtomicU8::new(last_snapshot_id), - } - } - pub fn replay( base_path: PathBuf, last_tx_id: TxId,