1 change: 1 addition & 0 deletions README.md
@@ -107,6 +107,7 @@ println!("get p99: {}µs", m.operation_get_latency_micros_p99);
| Option | Description |
|--------|-------------|
| `wal_file_size` | Size limit of a single WAL segment file. When exceeded, a new segment is created. Larger segments improve write throughput by reducing rotation/checkpoint frequency, but extend recovery time on crash since more records must be replayed before the engine becomes available. |
| `wal_buffer_size` | Soft limit on the WAL buffer size. |
| `io_thread_count` | Number of background IO worker threads shared across tables for write batching. Each table holds at most one worker at a time. |
| `wal_segment_flush_delay` | Maximum time to wait before triggering a checkpoint for WAL segment reuse. |
| `wal_segment_flush_count` | Maximum number of commits to buffer before triggering a checkpoint for WAL segment reuse. |
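
As a quick usage sketch, each option in the table corresponds to a builder setter of the same name (see `src/builder.rs` below). The `Db::builder` entry point, the path, and the `build()` call are assumptions for illustration; only the setter names are taken from this PR:

```rust
use std::time::Duration;

// Hypothetical wiring: the setter names match this PR's config fields,
// but the builder entry point and build() call are assumptions.
let db = Db::builder("/var/lib/mydb")
    .wal_file_size(512 << 20)                          // 512 MiB per WAL segment
    .wal_buffer_size(8 << 20)                          // 8 MiB soft limit
    .io_thread_count(4)
    .wal_segment_flush_delay(Duration::from_secs(10))
    .wal_segment_flush_count(32)
    .build()?;
```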
9 changes: 9 additions & 0 deletions src/builder.rs
@@ -15,6 +15,7 @@ where
base_path,
io_thread_count: DEFAULT_IO_THREAD_COUNT,
wal_file_size: DEFAULT_WAL_FILE_SIZE,
wal_buffer_size: DEFAULT_WAL_BUFFER_SIZE,
wal_segment_flush_count: DEFAULT_WAL_SEGMENT_FLUSH_COUNT,
wal_segment_flush_delay: DEFAULT_WAL_SEGMENT_FLUSH_DELAY,
checkpoint_interval: DEFAULT_CHECKPOINT_INTERVAL,
@@ -47,6 +48,13 @@ where
self.0.wal_file_size = size;
self
}
/**
 * Soft limit on the WAL buffer size.
 */
pub const fn wal_buffer_size(mut self, size: usize) -> Self {
self.0.wal_buffer_size = size;
self
}
/**
* WAL segment reuse requires a checkpoint to confirm durability.
* A checkpoint fires when either the commit count or the delay is reached,
@@ -148,6 +156,7 @@ where
}

const DEFAULT_WAL_FILE_SIZE: usize = 512 << 20; // 512 MiB
const DEFAULT_WAL_BUFFER_SIZE: usize = 8 << 20; // 8 MiB
const DEFAULT_WAL_SEGMENT_FLUSH_DELAY: Duration = Duration::from_secs(10);
const DEFAULT_WAL_SEGMENT_FLUSH_COUNT: usize = 32;
const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(60);
60 changes: 32 additions & 28 deletions src/cache/block.rs
@@ -1,16 +1,21 @@
use std::sync::{atomic::Ordering, Arc, Mutex, MutexGuard};
use std::{
cell::UnsafeCell,
panic::RefUnwindSafe,
sync::{Arc, Mutex, MutexGuard},
};

use crossbeam::epoch::{pin, Atomic, Guard, Owned, Shared};
use crossbeam::utils::Backoff;

use crate::{
disk::{PageRef, Pointer, PAGE_SIZE},
table::TableHandle,
thread::TaskHandle,
utils::{ShortenedMutex, UnsafeBorrow},
utils::{ExclusivePin, ShortenedMutex, ToArc, UnsafeBorrow},
};

pub struct CachedBlock {
page: Atomic<PageRef<PAGE_SIZE>>,
page: UnsafeCell<Arc<PageRef<PAGE_SIZE>>>,
page_pin: ExclusivePin,
pointer: Pointer,
handle: Arc<TableHandle>,
latch: Mutex<()>,
@@ -23,7 +28,8 @@ impl CachedBlock {
handle: Arc<TableHandle>,
) -> Self {
Self {
page: Atomic::new(page),
page: UnsafeCell::new(page.to_arc()),
page_pin: ExclusivePin::new(),
pointer,
handle,
latch: Mutex::new(()),
@@ -36,47 +42,45 @@ impl CachedBlock {
}

#[inline]
pub fn load_page<'a>(&self, guard: &'a Guard) -> *const PageRef<PAGE_SIZE> {
self.page.load(Ordering::Acquire, guard).as_raw()
pub fn load_page(&self) -> Arc<PageRef<PAGE_SIZE>> {
let backoff = Backoff::new();
loop {
if let Some(_token) = self.page_pin.try_shared() {
return self.page.get().borrow_unsafe().clone();
}
backoff.snooze();
}
}
pub fn store(&self, page: PageRef<PAGE_SIZE>) {
let g = pin();
let old = self.page.swap(Owned::new(page), Ordering::Release, &g);
if old.is_null() {
return;
let page = page.to_arc();
let backoff = Backoff::new();
loop {
if let Some(_token) = self.page_pin.try_exclusive() {
let _ = unsafe { self.page.get().replace(page) };
return;
}
backoff.snooze();
}
unsafe { g.defer_destroy(old) };
}

#[inline]
pub fn latch(&self) -> MutexGuard<'_, ()> {
self.latch.l()
}

/**
* Guard must be live until async write is done.
*/
#[inline]
pub fn flush_async<'a>(&self, guard: &'a Guard) -> TaskHandle<()> {
pub fn flush_async(&self) -> TaskHandle<()> {
self
.handle
.disk()
.write_async(self.pointer, self.load_page(guard).borrow_unsafe())
.write_async(self.pointer, self.load_page())
}

#[inline]
pub const fn handle(&self) -> &Arc<TableHandle> {
&self.handle
}
}

impl Drop for CachedBlock {
fn drop(&mut self) {
let g = pin();
let old = self.page.swap(Shared::null(), Ordering::Release, &g);
if old.is_null() {
return;
}
unsafe { g.defer_destroy(old) };
}
}
unsafe impl Send for CachedBlock {}
unsafe impl Sync for CachedBlock {}
impl RefUnwindSafe for CachedBlock {}
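
The `ExclusivePin` that replaces the epoch-based pointer above is not shown in this diff (the crate's `utils` module also exports a `SharedToken`). A minimal sketch of the shared/exclusive pin the new `load_page`/`store` loops appear to rely on, assuming an atomic reader count with a sentinel value for the exclusive holder; all details here are assumptions, not the crate's actual implementation:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Sentinel marking the exclusive holder; any other value is a reader count.
const EXCLUSIVE: usize = usize::MAX;

pub struct ExclusivePin(AtomicUsize);

// Tokens release the pin on drop, matching the `if let Some(_token)` usage.
pub struct SharedToken<'a>(&'a AtomicUsize);
pub struct ExclusiveToken<'a>(&'a AtomicUsize);

impl ExclusivePin {
    pub const fn new() -> Self {
        Self(AtomicUsize::new(0))
    }

    /// Try to take a shared pin; fails while an exclusive holder is active
    /// or the CAS races, in which case the caller backs off and retries.
    pub fn try_shared(&self) -> Option<SharedToken<'_>> {
        let cur = self.0.load(Ordering::Relaxed);
        if cur == EXCLUSIVE {
            return None;
        }
        self.0
            .compare_exchange(cur, cur + 1, Ordering::Acquire, Ordering::Relaxed)
            .ok()?;
        Some(SharedToken(&self.0))
    }

    /// Try to take the exclusive pin; succeeds only with no shared holders.
    pub fn try_exclusive(&self) -> Option<ExclusiveToken<'_>> {
        self.0
            .compare_exchange(0, EXCLUSIVE, Ordering::Acquire, Ordering::Relaxed)
            .ok()?;
        Some(ExclusiveToken(&self.0))
    }
}

impl Drop for SharedToken<'_> {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::Release);
    }
}

impl Drop for ExclusiveToken<'_> {
    fn drop(&mut self) {
        self.0.store(0, Ordering::Release);
    }
}
```

Under this scheme, `load_page` clones the `Arc` while holding a shared token and `store` swaps it under the exclusive token, so reclamation falls out of `Arc` reference counting instead of the epoch-based `defer_destroy` the old code needed, which is also why the explicit `Drop for CachedBlock` could be deleted.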
21 changes: 9 additions & 12 deletions src/cache/cache.rs
@@ -1,7 +1,5 @@
use std::{mem::MaybeUninit, ptr::drop_in_place, sync::Arc, time::Duration};

use crossbeam::epoch::{self, Guard};

use super::{Acquired, CacheSlot, CachedBlock, DirtyTables, LRUTable, Peeked};
use crate::{
debug,
@@ -13,9 +11,6 @@ use crate::{
utils::{AtomicBitmap, ExclusivePin, SharedToken, ToArc, ToBox},
};

const PRE_FLUSH_THRESHOLD: usize = 100;
const PRE_FLUSH_INTERVAL: Duration = Duration::from_millis(500);

pub struct BlockCacheConfig {
pub shard_count: usize,
pub capacity: usize,
@@ -118,7 +113,7 @@ impl BlockCache {

let mut page = self.page_pool.acquire();
handle.disk().read(pointer, &mut page)?;
state_ref.store(page);
state_ref.init(page);
Ok(CacheSlot::temp(
state_ref.downgrade(),
pointer,
@@ -154,7 +149,7 @@ impl BlockCache {

let old = unsafe { ptr.replace(new_block) };
if self.dirty_frames.contains(block_id) {
old.flush_async(&epoch::pin()).wait()?;
old.flush_async().wait()?;
self.dirty_frames.remove(block_id);
self.dirty_tables.mark(old.handle());
}
@@ -199,10 +194,13 @@ impl Drop for BlockCache {
}
}

fn __flush(waiting: &mut Vec<(TaskHandle<()>, Guard)>) -> Result {
waiting.drain(..).map(|(w, _guard)| w.wait()).collect()
fn __flush(waiting: &mut Vec<TaskHandle<()>>) -> Result {
waiting.drain(..).map(|w| w.wait()).collect()
}
const MAX_BATCHING: usize = PRE_FLUSH_THRESHOLD;

const PRE_FLUSH_INTERVAL: Duration = Duration::from_millis(500);
const PRE_FLUSH_THRESHOLD: usize = 100;
const MAX_BATCHING: usize = 16;

const fn handle_flush(
blocks: Arc<Vec<MaybeUninit<CachedBlock>>>,
@@ -237,8 +235,7 @@ const fn handle_flush(
// buffer and sort them, then batch into a single pwritev call.
// Writing synchronously one by one would bypass this optimization
// and issue a separate syscall per page.
let guard = epoch::pin();
waits.push((block.flush_async(&guard), guard));
waits.push(block.flush_async());
dirty_tables.mark(block.handle());
}

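
The comment in the hunk above motivates routing flushes through the async IO workers so they can be sorted and batched. As background, a minimal sketch of the offset-sorted `pwritev(2)` batching it alludes to, using the `libc` crate; the engine's real IO worker is not in this diff, and `flush_batch`, `PAGE_SIZE`, and the `(offset, buffer)` layout are assumptions:

```rust
use std::os::unix::io::RawFd;

const PAGE_SIZE: usize = 4096; // assumption for the sketch

fn flush_batch(fd: RawFd, mut pages: Vec<(u64, Vec<u8>)>) -> std::io::Result<()> {
    // Sort dirty pages by file offset so adjacent pages form contiguous runs.
    pages.sort_by_key(|p| p.0);
    let mut i = 0;
    while i < pages.len() {
        let start = i;
        // Grow the run while the next page is byte-contiguous with this one.
        while i + 1 < pages.len() && pages[i + 1].0 == pages[i].0 + PAGE_SIZE as u64 {
            i += 1;
        }
        // One iovec per page in the run, one pwritev(2) syscall per run,
        // instead of a separate pwrite per page.
        let iovs: Vec<libc::iovec> = pages[start..=i]
            .iter()
            .map(|(_, buf)| libc::iovec {
                iov_base: buf.as_ptr() as *mut libc::c_void,
                iov_len: buf.len(),
            })
            .collect();
        let ret = unsafe {
            libc::pwritev(
                fd,
                iovs.as_ptr(),
                iovs.len() as libc::c_int,
                pages[start].0 as libc::off_t,
            )
        };
        if ret < 0 {
            return Err(std::io::Error::last_os_error());
        }
        // A production version would also handle short writes here.
        i += 1;
    }
    Ok(())
}
```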