From b078322ff3c6308a6dfd60c581f05801696364f3 Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Thu, 30 Apr 2026 13:09:33 +0900 Subject: [PATCH 01/13] remove unused asref --- src/wal/segment.rs | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/src/wal/segment.rs b/src/wal/segment.rs index 1da9d2d..276ae98 100644 --- a/src/wal/segment.rs +++ b/src/wal/segment.rs @@ -76,28 +76,20 @@ impl WALSegment { Ok(Self::new(file, path.to_path_buf(), flush_count)) } - pub fn read>>( - &self, - pointer: Pointer, - page: &mut P, - ) -> Result { + pub fn read(&self, pointer: Pointer, page: &mut Page) -> Result { self .file - .pread(page.as_mut().as_mut(), pointer * SIZE) + .pread(page.as_mut(), pointer * SIZE) .map(|_| ()) .map_err(Error::IO) } - pub fn write>>( - &self, - pointer: Pointer, - page: &P, - ) -> Result { + pub fn write(&self, pointer: Pointer, page: &Page) -> Result { // transmute extends the slice lifetime to 'static to satisfy the background thread's // type bound. Safe because wait and flatten blocks until the write completes, ensuring // the page buffer outlives the background thread's use of the pointer. self .io - .execute((pointer, unsafe { transmute(page.as_ref().as_ref()) })) + .execute((pointer, unsafe { transmute(page.as_ref()) })) .wait() .flatten() } From 3881bdd75899f1c9bbb55399f723843f11e655ea Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Thu, 30 Apr 2026 13:11:25 +0900 Subject: [PATCH 02/13] add wal buffer size configuration to configure wal page pool size --- README.md | 1 + src/builder.rs | 9 +++++++++ src/engine.rs | 2 ++ src/wal/wal.rs | 3 ++- 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 5b78cc6..b04cb11 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,7 @@ println!("get p99: {}µs", m.operation_get_latency_micros_p99); | Option | Description | |--------|-------------| | `wal_file_size` | Size limit of a single WAL segment file. When exceeded, a new segment is created. 
Larger segments improve write throughput by reducing rotation/checkpoint frequency, but extend recovery time on crash since more records must be replayed before the engine becomes available. | +| `wal_buffer_size` | Soft limit, in bytes, of the WAL buffer; determines the size of the WAL page pool (default: 8 MiB). | | `io_thread_count` | Number of background IO worker threads shared across tables for write batching. Each table holds at most one worker at a time. | | `wal_segment_flush_delay` | Maximum time to wait before triggering a checkpoint for WAL segment reuse. | | `wal_segment_flush_count` | Maximum number of commits to buffer before triggering a checkpoint for WAL segment reuse. | diff --git a/src/builder.rs b/src/builder.rs index 4973634..5225016 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -15,6 +15,7 @@ where base_path, io_thread_count: DEFAULT_IO_THREAD_COUNT, wal_file_size: DEFAULT_WAL_FILE_SIZE, + wal_buffer_size: DEFAULT_WAL_BUFFER_SIZE, wal_segment_flush_count: DEFAULT_WAL_SEGMENT_FLUSH_COUNT, wal_segment_flush_delay: DEFAULT_WAL_SEGMENT_FLUSH_DELAY, checkpoint_interval: DEFAULT_CHECKPOINT_INTERVAL, @@ -47,6 +48,13 @@ where self.0.wal_file_size = size; self } + /** + * Soft limit, in bytes, of the WAL buffer; determines the size of the WAL page pool. + */ + pub const fn wal_buffer_size(mut self, size: usize) -> Self { + self.0.wal_buffer_size = size; + self + } /** * WAL segment reuse requires a checkpoint to confirm durability. 
* A checkpoint fires when either the commit count or the delay is reached, @@ -148,6 +156,7 @@ where } const DEFAULT_WAL_FILE_SIZE: usize = 512 << 20; // 512 mb +const DEFAULT_WAL_BUFFER_SIZE: usize = 8 << 20; const DEFAULT_WAL_SEGMENT_FLUSH_DELAY: Duration = Duration::from_secs(10); const DEFAULT_WAL_SEGMENT_FLUSH_COUNT: usize = 32; const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(60); diff --git a/src/engine.rs b/src/engine.rs index e2f0587..9bf7ebc 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -33,6 +33,7 @@ where pub base_path: T, pub io_thread_count: usize, pub wal_file_size: usize, + pub wal_buffer_size: usize, pub wal_segment_flush_delay: Duration, pub wal_segment_flush_count: usize, pub checkpoint_interval: Duration, @@ -71,6 +72,7 @@ impl Engine { let wal_config = WALConfig { group_commit_count: config.group_commit_count, max_file_size: config.wal_file_size, + max_buffer_size: config.wal_buffer_size, base_dir: base_path.clone(), segment_flush_count: config.wal_segment_flush_count, segment_flush_delay: config.wal_segment_flush_delay, diff --git a/src/wal/wal.rs b/src/wal/wal.rs index 68feb7b..40d6e27 100644 --- a/src/wal/wal.rs +++ b/src/wal/wal.rs @@ -33,6 +33,7 @@ pub struct WALConfig { pub segment_flush_count: usize, pub group_commit_count: usize, pub max_file_size: usize, + pub max_buffer_size: usize, } /** @@ -102,7 +103,7 @@ pub struct WAL { impl WAL { pub fn replay(config: &WALConfig) -> Result<(Self, ReplayResult)> { let max_len = config.max_file_size / WAL_BLOCK_SIZE; - let page_pool = PagePool::new(max_len); + let page_pool = PagePool::new(config.max_buffer_size / WAL_BLOCK_SIZE); info!("start to replay wal segments"); let replay_result = replay(&config.base_dir, config.group_commit_count, &page_pool)?; From 64c0d925c094cf226d971dd31ad4f7cb3d32b147 Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Thu, 30 Apr 2026 13:12:12 +0900 Subject: [PATCH 03/13] replace atomic to rwlock + arc to avoid memory leak --- src/cache/block.rs | 39 
+++++++++----------------------------- src/cache/temp.rs | 47 ++++++++++++++++++++++++++-------------------- 2 files changed, 36 insertions(+), 50 deletions(-) diff --git a/src/cache/block.rs b/src/cache/block.rs index 4e5e963..c11abd3 100644 --- a/src/cache/block.rs +++ b/src/cache/block.rs @@ -1,16 +1,14 @@ -use std::sync::{atomic::Ordering, Arc, Mutex, MutexGuard}; - -use crossbeam::epoch::{pin, Atomic, Guard, Owned, Shared}; +use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use crate::{ disk::{PageRef, Pointer, PAGE_SIZE}, table::TableHandle, thread::TaskHandle, - utils::{ShortenedMutex, UnsafeBorrow}, + utils::{ShortenedMutex, ShortenedRwLock, ToArc}, }; pub struct CachedBlock { - page: Atomic>, + page: RwLock>>, pointer: Pointer, handle: Arc, latch: Mutex<()>, @@ -23,7 +21,7 @@ impl CachedBlock { handle: Arc, ) -> Self { Self { - page: Atomic::new(page), + page: RwLock::new(page.to_arc()), pointer, handle, latch: Mutex::new(()), @@ -36,16 +34,11 @@ impl CachedBlock { } #[inline] - pub fn load_page<'a>(&self, guard: &'a Guard) -> *const PageRef { - self.page.load(Ordering::Acquire, guard).as_raw() + pub fn load_page(&self) -> Arc> { + self.page.rl().clone() } pub fn store(&self, page: PageRef) { - let g = pin(); - let old = self.page.swap(Owned::new(page), Ordering::Release, &g); - if old.is_null() { - return; - } - unsafe { g.defer_destroy(old) }; + *self.page.wl() = page.to_arc(); } #[inline] @@ -53,15 +46,12 @@ impl CachedBlock { self.latch.l() } - /** - * Guard must be live until async write is done. 
- */ #[inline] - pub fn flush_async<'a>(&self, guard: &'a Guard) -> TaskHandle<()> { + pub fn flush_async(&self) -> TaskHandle<()> { self .handle .disk() - .write_async(self.pointer, self.load_page(guard).borrow_unsafe()) + .write_async(self.pointer, self.load_page()) } #[inline] @@ -69,14 +59,3 @@ impl CachedBlock { &self.handle } } - -impl Drop for CachedBlock { - fn drop(&mut self) { - let g = pin(); - let old = self.page.swap(Shared::null(), Ordering::Release, &g); - if old.is_null() { - return; - } - unsafe { g.defer_destroy(old) }; - } -} diff --git a/src/cache/temp.rs b/src/cache/temp.rs index eddf416..1bf3540 100644 --- a/src/cache/temp.rs +++ b/src/cache/temp.rs @@ -1,18 +1,20 @@ use std::{ + cell::Cell, marker::PhantomData, - mem::transmute, + mem::{transmute, MaybeUninit}, ops::Deref, + ptr::drop_in_place, sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, MutexGuard, + Arc, Mutex, MutexGuard, RwLock, }, }; -use crossbeam::epoch::{pin, Atomic, Guard, Owned, Shared}; - use crate::{ disk::{PageRef, PAGE_SIZE}, - utils::{ExclusivePin, ExclusiveToken, SharedToken, ShortenedMutex}, + utils::{ + ExclusivePin, ExclusiveToken, SharedToken, ShortenedMutex, ShortenedRwLock, ToArc, + }, }; /** @@ -28,32 +30,39 @@ use crate::{ */ pub struct TempBlockState { pin: ExclusivePin, - page: Atomic>, + page: MaybeUninit>>>, dirty: AtomicBool, latch: Mutex<()>, + initialized: Cell, } impl TempBlockState { - pub const fn new() -> Self { + pub fn new() -> Self { Self { pin: ExclusivePin::new(), - page: Atomic::null(), + page: MaybeUninit::uninit(), + initialized: Cell::new(false), dirty: AtomicBool::new(false), latch: Mutex::new(()), } } - pub fn load_page<'a>(&self, guard: &'a Guard) -> *const PageRef { - self.page.load(Ordering::Acquire, guard).as_raw() + pub fn load_page<'a>(&self) -> Arc> { + unsafe { self.page.assume_init_ref() }.rl().clone() } pub fn store(&self, page: PageRef) { - let g = pin(); - let old = self.page.swap(Owned::new(page), Ordering::Release, &g); - 
if old.is_null() { - return; - } - unsafe { g.defer_destroy(old) }; + *unsafe { self.page.assume_init_ref() }.wl() = page.to_arc(); + } + + pub fn init(&self, page: PageRef) { + self.initialized.set(true); + unsafe { self.cast().write(RwLock::new(page.to_arc())) } + } + + #[inline] + const fn cast(&self) -> *mut RwLock>> { + self.page.as_ptr() as *mut RwLock>> } #[inline] @@ -77,12 +86,10 @@ impl TempBlockState { } impl Drop for TempBlockState { fn drop(&mut self) { - let g = pin(); - let old = self.page.swap(Shared::null(), Ordering::Release, &g); - if old.is_null() { + if !self.initialized.get() { return; } - unsafe { g.defer_destroy(old) }; + unsafe { drop_in_place(self.cast()) }; } } From a8f838b5fc7b29dc665ba78083b2f7fb576ae25d Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Thu, 30 Apr 2026 13:13:03 +0900 Subject: [PATCH 04/13] add limit to block cache flush to avoid the depletion of the io pool --- src/cache/cache.rs | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/src/cache/cache.rs b/src/cache/cache.rs index b8fe94c..60e024a 100644 --- a/src/cache/cache.rs +++ b/src/cache/cache.rs @@ -1,7 +1,5 @@ use std::{mem::MaybeUninit, ptr::drop_in_place, sync::Arc, time::Duration}; -use crossbeam::epoch::{self, Guard}; - use super::{Acquired, CacheSlot, CachedBlock, DirtyTables, LRUTable, Peeked}; use crate::{ debug, @@ -13,9 +11,6 @@ use crate::{ utils::{AtomicBitmap, ExclusivePin, SharedToken, ToArc, ToBox}, }; -const PRE_FLUSH_THRESHOLD: usize = 100; -const PRE_FLUSH_INTERVAL: Duration = Duration::from_millis(500); - pub struct BlockCacheConfig { pub shard_count: usize, pub capacity: usize, @@ -118,7 +113,7 @@ impl BlockCache { let mut page = self.page_pool.acquire(); handle.disk().read(pointer, &mut page)?; - state_ref.store(page); + state_ref.init(page); Ok(CacheSlot::temp( state_ref.downgrade(), pointer, @@ -154,7 +149,7 @@ impl BlockCache { let old = unsafe { ptr.replace(new_block) }; if self.dirty_frames.contains(block_id) 
{ - old.flush_async(&epoch::pin()).wait()?; + old.flush_async().wait()?; self.dirty_frames.remove(block_id); self.dirty_tables.mark(old.handle()); } @@ -199,10 +194,13 @@ impl Drop for BlockCache { } } -fn __flush(waiting: &mut Vec<(TaskHandle<()>, Guard)>) -> Result { - waiting.drain(..).map(|(w, _guard)| w.wait()).collect() +fn __flush(waiting: &mut Vec>) -> Result { + waiting.drain(..).map(|w| w.wait()).collect() } -const MAX_BATCHING: usize = PRE_FLUSH_THRESHOLD; + +const PRE_FLUSH_INTERVAL: Duration = Duration::from_millis(500); +const PRE_FLUSH_THRESHOLD: usize = 100; +const MAX_BATCHING: usize = 16; const fn handle_flush( blocks: Arc>>, @@ -237,8 +235,7 @@ const fn handle_flush( // buffer and sort them, then batch into a single pwritev call. // Writing synchronously one by one would bypass this optimization // and issue a separate syscall per page. - let guard = epoch::pin(); - waits.push((block.flush_async(&guard), guard)); + waits.push(block.flush_async()); dirty_tables.mark(block.handle()); } From 3780c248db3ad1528d0d8a9dd7eedfb7eafffab5 Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Thu, 30 Apr 2026 13:13:33 +0900 Subject: [PATCH 05/13] remove pin and apply changes in cached blocks --- src/cache/slot.rs | 70 ++++++++++++++++++++--------------------------- 1 file changed, 29 insertions(+), 41 deletions(-) diff --git a/src/cache/slot.rs b/src/cache/slot.rs index 0211a4d..27cd521 100644 --- a/src/cache/slot.rs +++ b/src/cache/slot.rs @@ -3,13 +3,11 @@ use std::{ sync::{Arc, MutexGuard}, }; -use crossbeam::epoch::{pin, Guard}; - use super::{BlockId, CachedBlock, DirtyTables, TempBlockRef, TempGuard}; use crate::{ disk::{Page, PagePool, PageRef, Pointer, PAGE_SIZE}, table::TableHandle, - utils::{AtomicBitmap, SharedToken, UnsafeBorrow}, + utils::{AtomicBitmap, SharedToken}, }; /** @@ -55,13 +53,13 @@ impl<'a> CacheSlot<'a> { }) } #[inline] - pub fn for_read<'b>(self, guard: &'b Guard) -> ReadonlySlot<'b> + pub fn for_read<'b>(self) -> ReadonlySlot<'b> where 'a: 
'b, { match self { - CacheSlot::Temp(temp) => ReadonlySlot::Temp(temp.for_read(guard)), - CacheSlot::Page(page) => ReadonlySlot::Page(page.for_read(guard)), + CacheSlot::Temp(temp) => ReadonlySlot::Temp(temp.for_read()), + CacheSlot::Page(page) => ReadonlySlot::Page(page.for_read()), } } @@ -70,10 +68,9 @@ impl<'a> CacheSlot<'a> { where 'a: 'b, { - let guard = pin(); match self { - CacheSlot::Temp(temp) => WritableSlot::Temp(temp.for_write(&guard)), - CacheSlot::Page(page) => WritableSlot::Page(page.for_write(&guard)), + CacheSlot::Temp(temp) => WritableSlot::Temp(temp.for_write()), + CacheSlot::Page(page) => WritableSlot::Page(page.for_write()), } } } @@ -85,8 +82,8 @@ impl<'a> AsMut> for WritableSlot<'a> { #[inline] fn as_mut(&mut self) -> &mut Page { match self { - WritableSlot::Temp(temp) => temp.shadow.as_mut(), - WritableSlot::Page(page) => page.shadow.as_mut(), + WritableSlot::Temp(temp) => &mut temp.shadow, + WritableSlot::Page(page) => &mut page.shadow, } } } @@ -94,8 +91,8 @@ impl<'a> AsRef> for WritableSlot<'a> { #[inline] fn as_ref(&self) -> &Page { match self { - WritableSlot::Temp(temp) => temp.shadow.as_ref(), - WritableSlot::Page(page) => page.shadow.as_ref(), + WritableSlot::Temp(temp) => &temp.shadow, + WritableSlot::Page(page) => &page.shadow, } } } @@ -117,8 +114,8 @@ impl<'a> AsRef> for ReadonlySlot<'a> { #[inline] fn as_ref(&self) -> &Page { match self { - ReadonlySlot::Temp(temp) => temp.page.borrow_unsafe().as_ref(), - ReadonlySlot::Page(page) => page.page.borrow_unsafe().as_ref(), + ReadonlySlot::Temp(temp) => &temp.page, + ReadonlySlot::Page(page) => &page.page, } } } @@ -132,27 +129,24 @@ pub struct PageSlot<'a> { } impl<'a> PageSlot<'a> { #[inline] - fn for_read<'b>(self, guard: &'b Guard) -> PageSlotRead<'b> + fn for_read<'b>(self) -> PageSlotRead<'b> where 'a: 'b, { - let page = self.block.load_page(&guard); + let page = self.block.load_page(); PageSlotRead { page, - _guard: guard, _token: self.token, } } - fn for_write<'b>(self, 
guard: &Guard) -> PageSlotWrite<'b> + fn for_write<'b>(self) -> PageSlotWrite<'b> where 'a: 'b, { let mut shadow = self.page_pool.acquire(); let _latch = self.block.latch(); - shadow - .as_mut() - .copy_from(self.block.load_page(guard).borrow_unsafe().as_ref()); + shadow.copy_from(self.block.load_page().as_ref().as_ref()); PageSlotWrite { shadow: ManuallyDrop::new(shadow), block: self.block, @@ -186,8 +180,7 @@ impl<'a> Drop for PageSlotWrite<'a> { } pub struct PageSlotRead<'a> { - page: *const PageRef, - _guard: &'a Guard, + page: Arc>, _token: SharedToken<'a>, } @@ -199,28 +192,26 @@ pub struct TempSlot<'a> { } impl<'a> TempSlot<'a> { #[inline] - fn for_read<'b>(self, guard: &'b Guard) -> TempSlotRead<'b> + fn for_read<'b>(self) -> TempSlotRead<'b> where 'a: 'b, { - let page = self.state.load_page(&guard); + let page = self.state.load_page(); TempSlotRead { state: ManuallyDrop::new(self.state), - page, + page: ManuallyDrop::new(page), pointer: self.pointer, - guard: self.guard, + temp_guard: self.guard, } } - fn for_write<'b>(self, guard: &Guard) -> TempSlotWrite<'b> + fn for_write<'b>(self) -> TempSlotWrite<'b> where 'a: 'b, { let mut shadow = self.page_pool.acquire(); let latch = unsafe { transmute(self.state.latch()) }; - shadow - .as_mut() - .copy_from(self.state.load_page(&guard).borrow_unsafe().as_ref()); + shadow.copy_from(self.state.load_page().as_ref().as_ref()); TempSlotWrite { shadow: ManuallyDrop::new(shadow), @@ -259,9 +250,7 @@ impl<'a> TempSlotWrite<'a> { unsafe { ManuallyDrop::drop(&mut self.latch) }; let state = unsafe { ManuallyDrop::take(&mut self.state) }.upgrade(); - let guard = pin(); - let page = state.load_page(&guard); - let _ = handle.disk().write(self.pointer, page.borrow_unsafe()); + let _ = handle.disk().write(self.pointer, state.load_page()); dirty.mark(&handle); } } @@ -292,9 +281,9 @@ impl<'a> Drop for TempSlotWrite<'a> { */ pub struct TempSlotRead<'a> { state: ManuallyDrop>>, - page: *const PageRef, + page: ManuallyDrop>>, 
pointer: Pointer, - guard: Option<(TempGuard<'a>, Arc, &'a DirtyTables)>, + temp_guard: Option<(TempGuard<'a>, Arc, &'a DirtyTables)>, } impl<'a> TempSlotRead<'a> { fn release( @@ -308,18 +297,17 @@ impl<'a> TempSlotRead<'a> { return; } - let guard = pin(); - let page = state.load_page(&guard); - let _ = handle.disk().write(self.pointer, page.borrow_unsafe()); + let _ = handle.disk().write(self.pointer, state.load_page()); dirty.mark(&handle); } } impl<'a> Drop for TempSlotRead<'a> { #[inline] fn drop(&mut self) { + unsafe { ManuallyDrop::drop(&mut self.page) }; // block_guard identifies the creator of this temp page, who is responsible // for cleanup (disk write + remove_temp). Non-creators simply unpin and exit. - if let Some((guard, handle, dirty)) = self.guard.take() { + if let Some((guard, handle, dirty)) = self.temp_guard.take() { return self.release(handle, guard, dirty); } From b6a542c5931d13270ea893458fe8266ac98c6e42 Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Thu, 30 Apr 2026 13:17:47 +0900 Subject: [PATCH 06/13] add find method to data entry --- src/cursor/entry.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/cursor/entry.rs b/src/cursor/entry.rs index 2284bec..fc6a69b 100644 --- a/src/cursor/entry.rs +++ b/src/cursor/entry.rs @@ -102,11 +102,17 @@ impl DataEntry { self.versions = new_versions; } - #[allow(unused)] pub fn get_versions(&self) -> impl Iterator { self.versions.iter() } + pub fn find

(&self, predicate: P) -> Option<&VersionRecord> + where + P: FnMut(&&VersionRecord) -> bool, + { + self.versions.iter().find(predicate) + } + pub fn get_last_owner(&self) -> Option { self.versions.front().map(|v| v.owner) } From 7639363ed0983bdf407e254067a8e9f81b4fd811 Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Thu, 30 Apr 2026 13:18:13 +0900 Subject: [PATCH 07/13] remove overall pins which used in read cached blocks --- src/cursor/btree.rs | 173 +++++++++++++++++++++----------------------- 1 file changed, 83 insertions(+), 90 deletions(-) diff --git a/src/cursor/btree.rs b/src/cursor/btree.rs index 0017179..7dfc442 100644 --- a/src/cursor/btree.rs +++ b/src/cursor/btree.rs @@ -27,23 +27,22 @@ impl BTreeIndex { let mut ptr = self .0 .fetch_slot(HEADER_POINTER, table)? - .for_read(&pin()) + .for_read() .as_ref() .deserialize::()? .get_root(); loop { + // This guard protects the next node or data entry from the GC or tree manager. + // By declaring a guard before reading the current page, it is guaranteed that the pointers written to the current page have not been reclaimed and are not reused. let guard = pin(); - let slot = self.0.fetch_slot(ptr, table)?.for_read(&guard); + let slot = self.0.fetch_slot(ptr, table)?.for_read(); match slot.as_ref().view::()? { BTreeNodeView::Internal(node) => ptr = node.find(key).unwrap_or_else(|i| i), BTreeNodeView::Leaf(node) => match node.find(key) { - NodeFindResult::Found(_, i) => { - drop(slot); - return Ok(Some((guard, i))); - } - NodeFindResult::Move(i) => ptr = i, + NodeFindResult::Found(_, i) => return Ok(Some((guard, i))), NodeFindResult::NotFound(_) => return Ok(None), + NodeFindResult::Move(i) => ptr = i, }, } } @@ -59,7 +58,7 @@ impl BTreeIndex { for &ptr in pointers { let chunk: DataChunk = policy .fetch_slot(ptr, table)? 
- .for_read(&pin()) + .for_read() .as_ref() .deserialize()?; data.extend_from_slice(chunk.get_data()); @@ -73,7 +72,7 @@ impl BTreeIndex { key: KeyRef, table: &Arc, ) -> Result>>> { - let (mut guard, ptr) = match self.get_entry(key, table)? { + let (mut _guard, ptr) = match self.get_entry(key, table)? { Some(v) => v, None => return Ok(None), }; @@ -84,31 +83,31 @@ impl BTreeIndex { let entry: DataEntry = self .0 .fetch_slot(ptr, table)? - .for_read(&guard) + .for_read() .as_ref() .deserialize()?; - for record in entry.get_versions() { - if self.0.is_visible(record.owner, record.version) { - return Ok(Some(match &record.data { - RecordData::Data(data) => Some(data.to_vec()), - RecordData::Chunked(pointers) => { - Some(Self::read_chunk(&self.0, pointers, table)?) - } - RecordData::Tombstone => None, - })); - } + if let Some(record) = + entry.find(|&record| self.0.is_visible(record.owner, record.version)) + { + return Ok(Some(match &record.data { + RecordData::Data(data) => Some(data.to_vec()), + RecordData::Chunked(pointers) => { + Some(Self::read_chunk(&self.0, pointers, table)?) + } + RecordData::Tombstone => None, + })); } next = entry.get_next(); - guard = new_guard; + _guard = new_guard; } Ok(None) } pub fn contains(&self, key: KeyRef, table: &Arc) -> Result { - let (mut guard, ptr) = match self.get_entry(key, table)? { + let (mut _guard, ptr) = match self.get_entry(key, table)? { Some(v) => v, None => return Ok(false), }; @@ -119,21 +118,21 @@ impl BTreeIndex { let entry: DataEntry = self .0 .fetch_slot(ptr, table)? 
- .for_read(&guard) + .for_read() .as_ref() .deserialize()?; - for record in entry.get_versions() { - if self.0.is_visible(record.owner, record.version) { - return Ok(match &record.data { - RecordData::Chunked(_) | RecordData::Data(_) => true, - RecordData::Tombstone => false, - }); - } - } + if let Some(record) = + entry.find(|&record| self.0.is_visible(record.owner, record.version)) + { + return Ok(match &record.data { + RecordData::Chunked(_) | RecordData::Data(_) => true, + RecordData::Tombstone => false, + }); + }; next = entry.get_next(); - guard = new_guard; + _guard = new_guard; } Ok(false) @@ -148,7 +147,7 @@ impl BTreeIndex { let header = self .0 .fetch_slot(HEADER_POINTER, table)? - .for_read(&pin()) + .for_read() .as_ref() .deserialize::()?; (header.get_root(), header.get_height()) @@ -158,7 +157,7 @@ impl BTreeIndex { while let BTreeNodeView::Internal(node) = self .0 .fetch_slot(ptr, table)? - .for_read(&pin()) + .for_read() .as_ref() .view::()? { @@ -187,9 +186,10 @@ impl BTreeIndex { let root = self.0.alloc_and_log(&BTreeNode::initial_state(), table)?; { - let header = TreeHeader::new(root); let mut slot = self.0.fetch_slot(HEADER_POINTER, table)?.for_write(); - self.0.serialize_and_log(&mut slot, &header, table)?; + self + .0 + .serialize_and_log(&mut slot, &TreeHeader::new(root), table)?; } Ok(()) @@ -304,8 +304,7 @@ impl BTreeIndex { drop(header_slot); while stack.len() < diff { - let guard = pin(); - let slot = self.0.fetch_slot(ptr, table)?.for_read(&guard); + let slot = self.0.fetch_slot(ptr, table)?.for_read(); let node = slot.as_ref().view::()?.as_internal()?; match node.find(&split_key) { Ok(i) => stack.push(replace(&mut ptr, i)), @@ -499,7 +498,7 @@ where let mut ptr = self .0 .fetch_slot(HEADER_POINTER, table)? - .for_read(&pin()) + .for_read() .as_ref() .deserialize::()? .get_root(); @@ -507,7 +506,7 @@ where while let BTreeNodeView::Internal(node) = self .0 .fetch_slot(ptr, table)? - .for_read(&pin()) + .for_read() .as_ref() .view::()? 
{ @@ -558,16 +557,16 @@ where ) -> Result { let mut ptr = policy .fetch_slot(HEADER_POINTER, table)? - .for_read(&pin()) + .for_read() .as_ref() .deserialize::()? .get_root(); let mut buffered = VecDeque::new(); - 'outer: loop { - let guard = pin(); - let slot = policy.fetch_slot(ptr, table)?.for_read(&guard); + loop { + let _guard = pin(); + let slot = policy.fetch_slot(ptr, table)?.for_read(); match slot.as_ref().view::()? { BTreeNodeView::Internal(node) => match &start { Bound::Included(k) => ptr = node.find(k).unwrap_or_else(|i| i), @@ -575,32 +574,31 @@ where Bound::Unbounded => ptr = node.first_child(), }, BTreeNodeView::Leaf(node) => { - let pos = 'inner: loop { - match &start { - Bound::Included(k) => match node.find(k) { - NodeFindResult::Found(i, _) => break 'inner i, - NodeFindResult::NotFound(i) => break 'inner i, - NodeFindResult::Move(i) => { - ptr = i; - continue 'outer; - } - }, - Bound::Excluded(k) => match node.find(k) { - NodeFindResult::Found(i, _) => break 'inner i + 1, - NodeFindResult::NotFound(i) => break 'inner i, - NodeFindResult::Move(i) => { - ptr = i; - continue 'outer; - } - }, - Bound::Unbounded => break 0, - } + let pos = match &start { + Bound::Included(k) => match node.find(k) { + NodeFindResult::Found(i, _) => i, + NodeFindResult::NotFound(i) => i, + NodeFindResult::Move(i) => { + ptr = i; + continue; + } + }, + Bound::Excluded(k) => match node.find(k) { + NodeFindResult::Found(i, _) => i + 1, + NodeFindResult::NotFound(i) => i, + NodeFindResult::Move(i) => { + ptr = i; + continue; + } + }, + Bound::Unbounded => 0, }; + let mut count = 0; for (k, p) in node.get_entries_while(end).skip(pos) { count += 1; - if let Some(found) = Self::__find(policy, p, table, &guard)? { + if let Some(found) = Self::__find(policy, p, table)? 
{ buffered.push_back((k.to_vec(), found)); } } @@ -627,31 +625,30 @@ where policy: &'a Policy, ptr: Pointer, table: &Arc, - guard: &Guard, ) -> Result>> { let mut next = Some(ptr); while let Some(ptr) = next.take() { let entry: DataEntry = policy .fetch_slot(ptr, table)? - .for_read(guard) + .for_read() .as_ref() .deserialize()?; - for record in entry.get_versions() { - if policy.is_visible(record.owner, record.version) { - return Ok(Some(match &record.data { - RecordData::Data(data) => { - Some((Buffered::Data(data.to_vec()), record.owner, record.version)) - } - RecordData::Chunked(pointers) => Some(( - Buffered::Chunked(pointers.to_vec()), - record.owner, - record.version, - )), - RecordData::Tombstone => None, - })); - } + if let Some(record) = + entry.find(|record| policy.is_visible(record.owner, record.version)) + { + return Ok(Some(match &record.data { + RecordData::Data(data) => { + Some((Buffered::Data(data.to_vec()), record.owner, record.version)) + } + RecordData::Chunked(pointers) => Some(( + Buffered::Chunked(pointers.to_vec()), + record.owner, + record.version, + )), + RecordData::Tombstone => None, + })); } next = entry.get_next(); @@ -660,12 +657,8 @@ where Ok(None) } - fn find_value( - &self, - ptr: Pointer, - guard: &Guard, - ) -> Result>> { - Self::__find(self.policy, ptr, &self.table, guard) + fn find_value(&self, ptr: Pointer) -> Result>> { + Self::__find(self.policy, ptr, &self.table) } fn fill_up(&mut self) -> Result { @@ -679,14 +672,14 @@ where } }; - let guard = pin(); - let slot = self.policy.fetch_slot(ptr, &self.table)?.for_read(&guard); + let _guard = pin(); + let slot = self.policy.fetch_slot(ptr, &self.table)?.for_read(); let node = slot.as_ref().view::()?.as_leaf()?; let mut count = 0; for (k, p) in node.get_entries_while(&self.end) { count += 1; - if let Some(found) = self.find_value(p, &guard)? { + if let Some(found) = self.find_value(p)? 
{ self.buffered.push_back((k.to_vec(), found)) } } From a8151dabe624b6f5767cdc600c5130d07cef1d37 Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Thu, 30 Apr 2026 13:18:43 +0900 Subject: [PATCH 08/13] replace asref/asmut to deref/derefmut --- src/disk/page_pool.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/disk/page_pool.rs b/src/disk/page_pool.rs index 4a2a01e..0aa7ebc 100644 --- a/src/disk/page_pool.rs +++ b/src/disk/page_pool.rs @@ -1,4 +1,8 @@ -use std::{mem::ManuallyDrop, sync::Arc}; +use std::{ + mem::ManuallyDrop, + ops::{Deref, DerefMut}, + sync::Arc, +}; use crossbeam::{queue::ArrayQueue, utils::Backoff}; @@ -25,15 +29,17 @@ impl PageRef { Self::from_exists(store, Page::new()) } } -impl AsRef> for PageRef { +impl Deref for PageRef { + type Target = Page; + #[inline] - fn as_ref(&self) -> &Page { + fn deref(&self) -> &Self::Target { &self.page } } -impl AsMut> for PageRef { +impl DerefMut for PageRef { #[inline] - fn as_mut(&mut self) -> &mut Page { + fn deref_mut(&mut self) -> &mut Self::Target { &mut self.page } } From a929c59bc398960a0d2bdd48bb0d92eda0249866 Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Thu, 30 Apr 2026 13:18:50 +0900 Subject: [PATCH 09/13] fix typo --- src/wal/replay.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/wal/replay.rs b/src/wal/replay.rs index 60523e9..87de2c9 100644 --- a/src/wal/replay.rs +++ b/src/wal/replay.rs @@ -8,7 +8,7 @@ use super::{ LogId, Operation, SegmentGeneration, TxId, WALSegment, FILE_EXT, WAL_BLOCK_SIZE, }; use crate::{ - disk::{PagePool, Pointer}, + disk::{Page, PagePool, Pointer}, error::{Error, Result}, table::TableId, }; @@ -95,7 +95,7 @@ pub fn replay( let mut page = page_pool.acquire(); wal.read(i, &mut page)?; - let (r, complete) = page.as_ref().into(); + let (r, complete) = (&page as &Page<_>).into(); records.extend(r.into_iter()); if complete { break; From 0beba2d5f0d57ad7bc42b5c3df6e88f1e9311bd8 Mon Sep 17 00:00:00 2001 From: 
qwp0905 Date: Thu, 30 Apr 2026 13:19:13 +0900 Subject: [PATCH 10/13] separating the responsibility for pinning --- src/disk/free.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/disk/free.rs b/src/disk/free.rs index 2b0f72b..a4af6b1 100644 --- a/src/disk/free.rs +++ b/src/disk/free.rs @@ -1,11 +1,6 @@ -use std::sync::{ - atomic::{AtomicU64, Ordering}, - Arc, -}; +use std::sync::atomic::{AtomicU64, Ordering}; -use crossbeam::{epoch, queue::SegQueue}; - -use crate::utils::ToArc; +use crossbeam::queue::SegQueue; use super::Pointer; @@ -14,13 +9,13 @@ use super::Pointer; */ pub struct FreeList { file_end: AtomicU64, - released: Arc>, + released: SegQueue, } impl FreeList { pub fn new() -> Self { Self { file_end: AtomicU64::new(1), - released: SegQueue::new().to_arc(), + released: SegQueue::new(), } } @@ -32,8 +27,7 @@ impl FreeList { } pub fn dealloc(&self, pointer: Pointer) { - let released = self.released.clone(); - epoch::pin().defer(move || released.push(pointer)); + self.released.push(pointer); } pub fn replay(&self, file_end: Pointer) { self.file_end.store(file_end, Ordering::Release); From d339ffa70c16d5634c37ac7e083c14ca904acf0e Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Thu, 30 Apr 2026 13:19:30 +0900 Subject: [PATCH 11/13] apply arc to diskcontroller --- src/disk/controller.rs | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/src/disk/controller.rs b/src/disk/controller.rs index 757b9b7..f31913f 100644 --- a/src/disk/controller.rs +++ b/src/disk/controller.rs @@ -27,7 +27,7 @@ type ThreadArg = ( Arc, ); type WriteThread = dyn BackgroundThread, ()>; -type WriteTask = (Pointer, &'static PageRef); +type WriteTask = (Pointer, Arc>); type WriteQueue = SegQueue<(WriteTask, OneshotFulfill)>; struct WriteHandle { @@ -58,7 +58,7 @@ impl WriteHandle { &self, file: &Arc, pointer: Pointer, - page: &'static PageRef, + page: Arc>, ) -> TaskHandle<()> { let (o, f) = oneshot(); let 
handle = TaskHandle::new(o); @@ -100,7 +100,7 @@ impl IOPool { } #[inline] - pub fn open_controller>(&self, path: P) -> Result> { + pub fn open_controller(&self, path: &Path) -> Result> { DiskController::open( path, WriteHandle::new(self.thread.clone()), @@ -212,8 +212,8 @@ pub struct DiskController { impl DiskController { const SIZE: Pointer = N as Pointer; - fn open>( - path: P, + fn open( + path: &Path, write_handle: WriteHandle, metrics: Arc, ) -> Result { @@ -225,7 +225,7 @@ impl DiskController { .read(true) .write(true) .create(true) - .direct_io(path.as_ref()) + .direct_io(path) .map_err(Error::IO)? .to_arc(); @@ -240,25 +240,17 @@ impl DiskController { self .metrics .disk_read - .measure(|| { - self - .file - .pread(page.as_mut().as_mut(), pointer * Self::SIZE) - }) + .measure(|| self.file.pread(page.as_mut(), pointer * Self::SIZE)) .map(|_| ()) .map_err(Error::IO) } #[inline] - pub fn write_async( - &self, - pointer: Pointer, - page: &'static PageRef, - ) -> TaskHandle<()> { + pub fn write_async(&self, pointer: Pointer, page: Arc>) -> TaskHandle<()> { self.write_handle.execute(&self.file, pointer, page) } #[inline] - pub fn write(&self, pointer: Pointer, page: &'static PageRef) -> Result { + pub fn write(&self, pointer: Pointer, page: Arc>) -> Result { self.write_async(pointer, page).wait() } @@ -278,7 +270,7 @@ impl DiskController { Ok(meta.len() / Self::SIZE) } - pub fn truncate>(&self, path: P) -> Result { + pub fn truncate(&self, path: &Path) -> Result { let backoff = Backoff::new(); loop { match self.write_handle.pin.try_exclusive() { From 03bcb09de5d36b8f6fbadccc5d418396ebabb1a5 Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Thu, 30 Apr 2026 13:19:47 +0900 Subject: [PATCH 12/13] remove guard for cache block read --- src/cursor/gc.rs | 22 +++++++++++++++------- src/cursor/tree_manager.rs | 15 +++++++-------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/src/cursor/gc.rs b/src/cursor/gc.rs index 3ececb4..9f387ae 100644 --- 
a/src/cursor/gc.rs +++ b/src/cursor/gc.rs @@ -5,6 +5,8 @@ use std::{ time::Duration, }; +use crossbeam::epoch; + use super::{DataEntry, RecordData, VersionRecord}; use crate::{ cache::BlockCache, @@ -17,7 +19,6 @@ use crate::{ utils::{DoubleBuffer, ToArc, ToBox}, wal::{TxId, RESERVED_TX}, }; -use crossbeam::epoch::pin; pub struct GarbageCollectionConfig { pub thread_count: usize, @@ -85,12 +86,15 @@ impl GarbageCollector { self.version_visibility.remove_aborted(&min_version); - // must release after triming because of trim type can contain release type. + // must release after trimming because of trim type can contain release type. // it could occur dangling pointer reference. + release .into_iter() .filter(|(_, table)| !table.is_closed()) - .for_each(|((_, ptr), table)| table.free().dealloc(ptr)); + .for_each(|((_, ptr), table)| { + epoch::pin().defer(move || table.free().dealloc(ptr)) + }); Ok(()) } @@ -192,7 +196,10 @@ const fn run_entry( let release = |record: VersionRecord| { if let RecordData::Chunked(pointers) = record.data { - pointers.into_iter().for_each(|p| table.free().dealloc(p)); + for ptr in pointers { + let handle = table.handle(); + epoch::pin().defer(move || handle.free().dealloc(ptr)); + } } }; @@ -256,13 +263,14 @@ const fn run_entry( let next_entry: DataEntry = block_cache .peek(next, table.handle())? - .for_read(&pin()) + .for_read() .as_ref() .deserialize()?; recorder.serialize_and_log(RESERVED_TX, table_id, &mut slot, &next_entry)?; ptr = Some(i); - table.free().dealloc(next); + let handle = table.handle(); + epoch::pin().defer(move || handle.free().dealloc(next)); } Ok(()) } @@ -275,7 +283,7 @@ const fn run_check( Ok( block_cache .peek(pointer, table)? - .for_read(&pin()) + .for_read() .as_ref() .deserialize::()? 
.is_empty(), diff --git a/src/cursor/tree_manager.rs b/src/cursor/tree_manager.rs index 9c9d88f..9065b4e 100644 --- a/src/cursor/tree_manager.rs +++ b/src/cursor/tree_manager.rs @@ -1,6 +1,6 @@ use std::{collections::HashSet, ops::Bound, sync::Arc, time::Duration}; -use crossbeam::{epoch::pin, queue::SegQueue}; +use crossbeam::queue::SegQueue; use super::{ after_compaction, handle_compaction, wait_compaction, BTreeIndex, BTreeNode, @@ -299,14 +299,14 @@ fn run_merge_leaf( let mut ptr = block_cache .peek(HEADER_POINTER, table.handle())? - .for_read(&pin()) + .for_read() .as_ref() .deserialize::()? .get_root(); while let BTreeNodeView::Internal(node) = block_cache .peek(ptr, table.handle())? - .for_read(&pin()) + .for_read() .as_ref() .view::()? { @@ -316,8 +316,7 @@ fn run_merge_leaf( let mut next_ptr = Some(ptr); while let Some(i) = next_ptr.take() { { - let guard = pin(); - let slot = block_cache.peek(i, table.handle())?.for_read(&guard); + let slot = block_cache.peek(i, table.handle())?.for_read(); let leaf = slot.as_ref().view::()?.as_leaf()?; if !gc .batch_check_empty( @@ -424,7 +423,7 @@ fn release_orphaned( let mut visited = HashSet::::from_iter([HEADER_POINTER]); let root = block_cache .read(HEADER_POINTER, table.clone())? - .for_read(&pin()) + .for_read() .as_ref() .deserialize::()? .get_root(); @@ -435,7 +434,7 @@ fn release_orphaned( visited.insert(ptr); match block_cache .read(ptr, table.clone())? - .for_read(&pin()) + .for_read() .as_ref() .view::()? { @@ -460,7 +459,7 @@ fn release_orphaned( visited.insert(ptr); let entry: DataEntry = block_cache .read(ptr, table.clone())? 
- .for_read(&pin()) + .for_read() .as_ref() .deserialize()?; for record in entry.get_versions() { From f127783d4607ce6f90f3fe0229ac9a754a1ed729 Mon Sep 17 00:00:00 2001 From: qwp0905 Date: Thu, 30 Apr 2026 15:12:33 +0900 Subject: [PATCH 13/13] remove rwlock in blocks and add exclusive pin --- src/cache/block.rs | 37 +++++++++++++++++++++++++++++++------ src/cache/temp.rs | 44 ++++++++++++++++++++++++++++++-------------- 2 files changed, 61 insertions(+), 20 deletions(-) diff --git a/src/cache/block.rs b/src/cache/block.rs index c11abd3..9bfc256 100644 --- a/src/cache/block.rs +++ b/src/cache/block.rs @@ -1,14 +1,21 @@ -use std::sync::{Arc, Mutex, MutexGuard, RwLock}; +use std::{ + cell::UnsafeCell, + panic::RefUnwindSafe, + sync::{Arc, Mutex, MutexGuard}, +}; + +use crossbeam::utils::Backoff; use crate::{ disk::{PageRef, Pointer, PAGE_SIZE}, table::TableHandle, thread::TaskHandle, - utils::{ShortenedMutex, ShortenedRwLock, ToArc}, + utils::{ExclusivePin, ShortenedMutex, ToArc, UnsafeBorrow}, }; pub struct CachedBlock { - page: RwLock>>, + page: UnsafeCell>>, + page_pin: ExclusivePin, pointer: Pointer, handle: Arc, latch: Mutex<()>, @@ -21,7 +28,8 @@ impl CachedBlock { handle: Arc, ) -> Self { Self { - page: RwLock::new(page.to_arc()), + page: UnsafeCell::new(page.to_arc()), + page_pin: ExclusivePin::new(), pointer, handle, latch: Mutex::new(()), @@ -35,10 +43,24 @@ impl CachedBlock { #[inline] pub fn load_page(&self) -> Arc> { - self.page.rl().clone() + let backoff = Backoff::new(); + loop { + if let Some(_token) = self.page_pin.try_shared() { + return self.page.get().borrow_unsafe().clone(); + } + backoff.snooze(); + } } pub fn store(&self, page: PageRef) { - *self.page.wl() = page.to_arc(); + let page = page.to_arc(); + let backoff = Backoff::new(); + loop { + if let Some(_token) = self.page_pin.try_exclusive() { + let _ = unsafe { self.page.get().replace(page) }; + return; + } + backoff.snooze(); + } } #[inline] @@ -59,3 +81,6 @@ impl CachedBlock { &self.handle 
} } +unsafe impl Send for CachedBlock {} +unsafe impl Sync for CachedBlock {} +impl RefUnwindSafe for CachedBlock {} diff --git a/src/cache/temp.rs b/src/cache/temp.rs index 1bf3540..aa4a20e 100644 --- a/src/cache/temp.rs +++ b/src/cache/temp.rs @@ -6,15 +6,15 @@ use std::{ ptr::drop_in_place, sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, MutexGuard, RwLock, + Arc, Mutex, MutexGuard, }, }; +use crossbeam::utils::Backoff; + use crate::{ disk::{PageRef, PAGE_SIZE}, - utils::{ - ExclusivePin, ExclusiveToken, SharedToken, ShortenedMutex, ShortenedRwLock, ToArc, - }, + utils::{ExclusivePin, ExclusiveToken, SharedToken, ShortenedMutex, ToArc}, }; /** @@ -29,8 +29,9 @@ use crate::{ * the LRU eviction path. */ pub struct TempBlockState { - pin: ExclusivePin, - page: MaybeUninit>>>, + block_pin: ExclusivePin, + page: MaybeUninit>>, + page_pin: ExclusivePin, dirty: AtomicBool, latch: Mutex<()>, initialized: Cell, @@ -39,7 +40,8 @@ pub struct TempBlockState { impl TempBlockState { pub fn new() -> Self { Self { - pin: ExclusivePin::new(), + block_pin: ExclusivePin::new(), + page_pin: ExclusivePin::new(), page: MaybeUninit::uninit(), initialized: Cell::new(false), dirty: AtomicBool::new(false), @@ -48,21 +50,35 @@ impl TempBlockState { } pub fn load_page<'a>(&self) -> Arc> { - unsafe { self.page.assume_init_ref() }.rl().clone() + let backoff = Backoff::new(); + loop { + if let Some(_token) = self.page_pin.try_shared() { + return unsafe { self.page.assume_init_ref() }.clone(); + } + backoff.snooze(); + } } pub fn store(&self, page: PageRef) { - *unsafe { self.page.assume_init_ref() }.wl() = page.to_arc(); + let page = page.to_arc(); + let backoff = Backoff::new(); + loop { + if let Some(_token) = self.page_pin.try_exclusive() { + let _ = unsafe { self.cast().replace(page) }; + return; + } + backoff.snooze(); + } } pub fn init(&self, page: PageRef) { self.initialized.set(true); - unsafe { self.cast().write(RwLock::new(page.to_arc())) } + unsafe { 
self.cast().write(page.to_arc()) } } #[inline] - const fn cast(&self) -> *mut RwLock>> { - self.page.as_ptr() as *mut RwLock>> + const fn cast(&self) -> *mut Arc> { + self.page.as_ptr() as *mut Arc> } #[inline] @@ -76,7 +92,7 @@ impl TempBlockState { #[inline] pub fn try_pin(&self) -> Option> { - self.pin.try_shared() + self.block_pin.try_shared() } #[inline] @@ -134,7 +150,7 @@ impl<'a> TempBlockRef<'a, SharedToken<'a>> { impl<'a> TempBlockRef<'a, ExclusiveToken<'a>> { #[inline] pub fn exclusive(state: &Arc) -> Self { - let token = state.pin.try_exclusive().unwrap(); + let token = state.block_pin.try_exclusive().unwrap(); Self { state: state.clone(), token: unsafe { transmute(token) },