Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions src/cache/block.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::Arc;

use super::{Latch, LatchGuard};

use crate::{
disk::{PageRef, Pointer, PAGE_SIZE},
table::TableHandle,
thread::TaskHandle,
utils::{AtomicArc, ShortenedMutex},
utils::AtomicArc,
};

pub struct CachedBlock {
page: AtomicArc<PageRef<PAGE_SIZE>>,
pointer: Pointer,
handle: Arc<TableHandle>,
latch: Mutex<()>,
latch: Latch,
}
impl CachedBlock {
#[inline]
Expand All @@ -24,7 +26,7 @@ impl CachedBlock {
page: AtomicArc::new(page),
pointer,
handle,
latch: Mutex::new(()),
latch: Latch::new(),
}
}

Expand All @@ -42,8 +44,12 @@ impl CachedBlock {
}

#[inline]
pub fn latch(&self) -> MutexGuard<'_, ()> {
self.latch.l()
pub fn latch(&self) -> LatchGuard<'_> {
self.latch.lock_immediately()
}
#[inline]
pub fn lazy_latch(&self) -> LatchGuard<'_> {
self.latch.lock_lazily()
}

#[inline]
Expand Down
75 changes: 75 additions & 0 deletions src/cache/latch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::sync::{Condvar, Mutex};

use crate::utils::ShortenedMutex;

/// Internal latch state; always accessed under `Latch::state`'s mutex.
struct State {
    locked: bool,        // true while a `LatchGuard` is outstanding
    high_waiters: usize, // threads currently blocked inside `lock_immediately`
}

/// A mutex-like latch with two priority classes of waiters.
///
/// `lock_immediately` callers are high priority: `unlock` wakes one of them
/// first whenever any are registered, and `lock_lazily` callers keep sleeping
/// while `high_waiters > 0`, so lazy callers only run when no urgent caller
/// is queued.
pub struct Latch {
    state: Mutex<State>, // locked flag + high-priority waiter count
    high_cv: Condvar,    // wakes one blocked `lock_immediately` caller
    low_cv: Condvar,     // wakes one blocked `lock_lazily` caller
}
impl Latch {
    /// Creates a new, unlocked latch with no registered waiters.
    pub const fn new() -> Self {
        Self {
            state: Mutex::new(State {
                locked: false,
                high_waiters: 0,
            }),
            high_cv: Condvar::new(),
            low_cv: Condvar::new(),
        }
    }

    /// Acquires the latch with high priority, blocking until it is free.
    ///
    /// While blocked, the caller is counted in `high_waiters`, which both
    /// routes the next `unlock` wakeup to `high_cv` and keeps `lock_lazily`
    /// callers asleep.
    pub fn lock_immediately(&self) -> LatchGuard<'_> {
        let mut state = self.state.l();
        while state.locked {
            // Register as a high-priority waiter only for the duration of the
            // wait; the counter stays balanced across spurious wakeups.
            state.high_waiters += 1;
            state = self.high_cv.wait(state).unwrap();
            state.high_waiters -= 1;
        }
        state.locked = true;

        LatchGuard(self)
    }

    /// Acquires the latch with low priority.
    ///
    /// Blocks while the latch is held *or* while any high-priority waiter is
    /// queued; a steady stream of `lock_immediately` callers can therefore
    /// starve lazy callers indefinitely (by design).
    pub fn lock_lazily(&self) -> LatchGuard<'_> {
        let mut state = self.state.l();
        while state.locked || state.high_waiters > 0 {
            state = self.low_cv.wait(state).unwrap();
        }
        state.locked = true;

        LatchGuard(self)
    }

    /// Releases the latch, preferring to wake a high-priority waiter; only
    /// when none are registered is a low-priority waiter woken. Called from
    /// `LatchGuard::drop`.
    fn unlock(&self) {
        let mut state = self.state.l();
        state.locked = false;
        if state.high_waiters > 0 {
            self.high_cv.notify_one();
        } else {
            self.low_cv.notify_one();
        }
    }
}

/// RAII guard returned by `Latch::lock_immediately` / `Latch::lock_lazily`;
/// the latch is held exactly as long as the guard is alive.
#[must_use = "dropping the guard immediately releases the latch"]
pub struct LatchGuard<'a>(&'a Latch);

impl Drop for LatchGuard<'_> {
    fn drop(&mut self) {
        // Release and wake the next waiter according to priority.
        self.0.unlock();
    }
}

#[cfg(test)]
#[path = "tests/latch.rs"]
mod tests;
3 changes: 3 additions & 0 deletions src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ use block::*;

mod dirty;
use dirty::*;

mod latch;
use latch::*;
51 changes: 47 additions & 4 deletions src/cache/slot.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{
mem::{transmute, ManuallyDrop},
sync::{Arc, MutexGuard},
sync::Arc,
};

use super::{BlockId, CachedBlock, DirtyTables, TempBlockRef, TempGuard};
use super::{BlockId, CachedBlock, DirtyTables, LatchGuard, TempBlockRef, TempGuard};
use crate::{
disk::{Page, PagePool, PageRef, Pointer, PAGE_SIZE},
table::TableHandle,
Expand Down Expand Up @@ -73,6 +73,16 @@ impl<'a> CacheSlot<'a> {
CacheSlot::Page(page) => WritableSlot::Page(page.for_write()),
}
}
#[inline]
pub fn for_lazy_write<'b>(self) -> WritableSlot<'b>
where
'a: 'b,
{
match self {
CacheSlot::Temp(temp) => WritableSlot::Temp(temp.for_lazy_write()),
CacheSlot::Page(page) => WritableSlot::Page(page.for_lazy_write()),
}
}
}
pub enum WritableSlot<'a> {
Temp(TempSlotWrite<'a>),
Expand Down Expand Up @@ -156,14 +166,31 @@ impl<'a> PageSlot<'a> {
_latch,
}
}

fn for_lazy_write<'b>(self) -> PageSlotWrite<'b>
where
'a: 'b,
{
let mut shadow = self.page_pool.acquire();
let _latch = self.block.lazy_latch();
shadow.copy_from(self.block.load_page().as_ref().as_ref());
PageSlotWrite {
shadow: ManuallyDrop::new(shadow),
block: self.block,
dirty: self.dirty,
block_id: self.block_id,
_token: self.token,
_latch,
}
}
}
pub struct PageSlotWrite<'a> {
shadow: ManuallyDrop<PageRef<PAGE_SIZE>>,
block: &'a CachedBlock,
dirty: &'a AtomicBitmap,
block_id: BlockId,
_token: SharedToken<'a>,
_latch: MutexGuard<'a, ()>,
_latch: LatchGuard<'a>,
}

impl<'a> Drop for PageSlotWrite<'a> {
Expand Down Expand Up @@ -213,6 +240,22 @@ impl<'a> TempSlot<'a> {
let latch = unsafe { transmute(self.state.latch()) };
shadow.copy_from(self.state.load_page().as_ref().as_ref());

TempSlotWrite {
shadow: ManuallyDrop::new(shadow),
state: ManuallyDrop::new(self.state),
pointer: self.pointer,
guard: self.guard,
latch: ManuallyDrop::new(latch),
}
}
fn for_lazy_write<'b>(self) -> TempSlotWrite<'b>
where
'a: 'b,
{
let mut shadow = self.page_pool.acquire();
let latch = unsafe { transmute(self.state.lazy_latch()) };
shadow.copy_from(self.state.load_page().as_ref().as_ref());

TempSlotWrite {
shadow: ManuallyDrop::new(shadow),
state: ManuallyDrop::new(self.state),
Expand All @@ -237,7 +280,7 @@ pub struct TempSlotWrite<'a> {
state: ManuallyDrop<TempBlockRef<'a, SharedToken<'a>>>,
pointer: Pointer,
guard: Option<(TempGuard<'a>, Arc<TableHandle>, &'a DirtyTables)>,
latch: ManuallyDrop<MutexGuard<'a, ()>>,
latch: ManuallyDrop<LatchGuard<'a>>,
}

impl<'a> TempSlotWrite<'a> {
Expand Down
17 changes: 11 additions & 6 deletions src/cache/temp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use std::{
ops::Deref,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, MutexGuard,
Arc,
},
};

use super::{Latch, LatchGuard};
use crate::{
disk::{PageRef, PAGE_SIZE},
utils::{AtomicArc, ExclusivePin, ExclusiveToken, SharedToken, ShortenedMutex},
utils::{AtomicArc, ExclusivePin, ExclusiveToken, SharedToken},
};

/**
Expand All @@ -29,7 +30,7 @@ pub struct TempBlockState {
page: OnceCell<AtomicArc<PageRef<PAGE_SIZE>>>,
block_pin: ExclusivePin,
dirty: AtomicBool,
latch: Mutex<()>,
latch: Latch,
}

impl TempBlockState {
Expand All @@ -38,7 +39,7 @@ impl TempBlockState {
page: OnceCell::new(),
block_pin: ExclusivePin::new(),
dirty: AtomicBool::new(false),
latch: Mutex::new(()),
latch: Latch::new(),
}
}

Expand Down Expand Up @@ -71,8 +72,12 @@ impl TempBlockState {
}

#[inline]
pub fn latch(&self) -> MutexGuard<'_, ()> {
self.latch.l()
pub fn latch(&self) -> LatchGuard<'_> {
self.latch.lock_immediately()
}
#[inline]
pub fn lazy_latch(&self) -> LatchGuard<'_> {
self.latch.lock_lazily()
}
}

Expand Down
53 changes: 53 additions & 0 deletions src/cache/tests/latch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::{
sync::atomic::{AtomicUsize, Ordering},
thread::{scope, yield_now},
};

use crossbeam::queue::SegQueue;

use super::*;

/// Checks that every `lock_immediately` (high-priority) waiter acquires the
/// latch before any `lock_lazily` (low-priority) waiter once the initially
/// held guard is released.
#[test]
fn test_priority() {
    let l = Latch::new();
    // Acquisition order: each thread pushes 1 (immediate) or 2 (lazy) right
    // after it obtains the latch.
    let v = SegQueue::new();
    // Counts threads that have *started* — see NOTE below on what this does
    // not guarantee.
    let c = AtomicUsize::new(0);

    let ic = 5; // number of high-priority (immediate) threads
    let lc = 5; // number of low-priority (lazy) threads
    scope(|s| {
        // Hold the latch so all spawned threads must queue behind it.
        let _g = l.lock_immediately();

        let mut th = vec![];

        for _ in 0..lc {
            let t = s.spawn(|| {
                c.fetch_add(1, Ordering::Release);
                let _g = l.lock_lazily();
                v.push(2);
            });
            th.push(t)
        }

        for _ in 0..ic {
            let t = s.spawn(|| {
                c.fetch_add(1, Ordering::Release);
                let _g = l.lock_immediately();
                v.push(1);
            });

            th.push(t)
        }

        // NOTE(review): this loop only waits until every thread has executed
        // its fetch_add, not until it is actually blocked inside the latch.
        // There is a window between the increment and the condvar wait, so
        // dropping the guard here can in principle let a lazy thread acquire
        // the latch before a not-yet-queued immediate thread registers as a
        // high waiter, making the strict ordering assertion below flaky —
        // worth confirming under stress / loom-style testing.
        while c.load(Ordering::Acquire) < ic + lc {
            yield_now();
        }

        drop(_g);

        th.into_iter().for_each(|t| t.join().unwrap());
    });

    // Expect all five immediate acquisitions strictly before all five lazy.
    let result = vec![1, 1, 1, 1, 1, 2, 2, 2, 2, 2];
    assert_eq!(v.into_iter().collect::<Vec<_>>(), result)
}
2 changes: 1 addition & 1 deletion src/cursor/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ const fn run_entry(
continue;
}

let mut slot = block_cache.peek(ptr, table.handle())?.for_write();
let mut slot = block_cache.peek(ptr, table.handle())?.for_lazy_write();
let mut entry: DataEntry = slot.as_ref().deserialize()?;

let prev_len = entry.len();
Expand Down
4 changes: 2 additions & 2 deletions src/cursor/tree_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ fn run_merge_leaf(
}
}

let mut slot = block_cache.peek(i, table.handle())?.for_write();
let mut slot = block_cache.peek(i, table.handle())?.for_lazy_write();
let mut leaf = slot.as_ref().deserialize::<BTreeNode>()?.as_leaf()?;
next_ptr = leaf.get_next();

Expand Down Expand Up @@ -376,7 +376,7 @@ fn run_merge_leaf(
// merge without propagating to internal nodes.
debug!("trying to start merge {} with {}", slot.get_pointer(), next);

let mut next_slot = block_cache.peek(next, table.handle())?.for_write();
let mut next_slot = block_cache.peek(next, table.handle())?.for_lazy_write();
let next_leaf = next_slot.as_ref().deserialize::<BTreeNode>()?;
leaf.set_next(slot.get_pointer());

Expand Down
18 changes: 0 additions & 18 deletions src/transaction/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,24 +155,6 @@ pub struct VersionVisibility {
snapshot_id: AtomicU8,
}
impl VersionVisibility {
pub fn new<T>(
base_path: PathBuf,
aborted: T,
last_tx_id: TxId,
last_snapshot_id: u8,
) -> Self
where
T: IntoIterator<Item = TxId>,
{
Self {
base_path,
active: SkipMap::new(),
aborted: SkipSet::from_iter(aborted),
last_tx_id: AtomicTxId::new(last_tx_id),
snapshot_id: AtomicU8::new(last_snapshot_id),
}
}

pub fn replay(
base_path: PathBuf,
last_tx_id: TxId,
Expand Down
Loading