Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions src/cache/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,9 @@ impl<'a> CacheSlot<'a> {
}
}
#[inline]
pub fn for_read<'b>(self) -> ReadonlySlot<'b>
where
'a: 'b,
{
ReadonlySlot {
page: self.block.load_page(),
_token: self.token,
}
pub fn for_read(self) -> ReadonlySlot {
let page = self.block.load_page();
ReadonlySlot { page }
}

#[inline]
Expand Down Expand Up @@ -110,16 +105,22 @@ impl<'a> WritableSlot<'a> {
}
}

pub struct ReadonlySlot<'a> {
pub struct ReadonlySlot {
page: Arc<PageRef<PAGE_SIZE>>,
_token: SharedToken<'a>,
}
impl<'a> AsRef<Page<PAGE_SIZE>> for ReadonlySlot<'a> {
impl AsRef<Page<PAGE_SIZE>> for ReadonlySlot {
#[inline]
fn as_ref(&self) -> &Page<PAGE_SIZE> {
&self.page
}
}
impl Clone for ReadonlySlot {
fn clone(&self) -> Self {
Self {
page: self.page.clone(),
}
}
}

impl<'a> Drop for WritableSlot<'a> {
fn drop(&mut self) {
Expand Down
126 changes: 62 additions & 64 deletions src/cursor/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use crate::{
use crossbeam::epoch::{pin, Guard};

use super::{
BTreeNode, BTreeNodeView, CreatablePolicy, DataChunk, DataEntry, InternalNode, Key,
KeyRef, LeafNode, NodeFindResult, ReadonlyPolicy, RecordData, TreeHeader,
VersionRecord, WritablePolicy, CHUNK_SIZE, HEADER_POINTER, LARGE_VALUE,
BTreeNode, BTreeNodeView, CreatablePolicy, DataChunk, DataChunkView, DataEntry,
DataEntryView, InternalNode, LeafNode, NodeFindResult, ReadonlyPolicy, RecordData,
RecordDataView, StaticKey, StaticKeyRef, TreeHeader, VecRef, VersionRecord,
WritablePolicy, CHUNK_SIZE, HEADER_POINTER, LARGE_VALUE,
};

pub struct BTreeIndex<Policy>(Policy);
Expand All @@ -21,7 +22,7 @@ impl<Policy> BTreeIndex<Policy> {
impl<Policy: ReadonlyPolicy> BTreeIndex<Policy> {
fn get_entry(
&self,
key: KeyRef,
key: StaticKeyRef,
table: &Arc<TableHandle>,
) -> Result<Option<(Guard, Pointer)>> {
let mut ptr = self
Expand Down Expand Up @@ -52,26 +53,23 @@ impl<Policy: ReadonlyPolicy> BTreeIndex<Policy> {
policy: &Policy,
pointers: &[Pointer],
table: &Arc<TableHandle>,
) -> Result<Vec<u8>> {
) -> Result<VecRef> {
let mut data = Vec::new();

for &ptr in pointers {
let chunk: DataChunk = policy
.fetch_slot(ptr, table)?
.for_read()
.as_ref()
.deserialize()?;
let slot = policy.fetch_slot(ptr, table)?.for_read();
let chunk: DataChunkView = slot.as_ref().view()?;
data.extend_from_slice(chunk.get_data());
}

Ok(data)
Ok(VecRef::copied(data))
}

pub fn get(
&self,
key: KeyRef,
key: StaticKeyRef,
table: &Arc<TableHandle>,
) -> Result<Option<Option<Vec<u8>>>> {
) -> Result<Option<Option<VecRef>>> {
let (mut _guard, ptr) = match self.get_entry(key, table)? {
Some(v) => v,
None => return Ok(None),
Expand All @@ -80,22 +78,18 @@ impl<Policy: ReadonlyPolicy> BTreeIndex<Policy> {
let mut next = Some(ptr);
while let Some(ptr) = next.take() {
let new_guard = pin();
let entry: DataEntry = self
.0
.fetch_slot(ptr, table)?
.for_read()
.as_ref()
.deserialize()?;
let slot = self.0.fetch_slot(ptr, table)?.for_read();
let entry: DataEntryView = slot.as_ref().deserialize()?;

if let Some(record) =
entry.find(|&record| self.0.is_visible(record.owner, record.version))
{
return Ok(Some(match &record.data {
RecordData::Data(data) => Some(data.to_vec()),
RecordData::Chunked(pointers) => {
RecordDataView::Data(s, e) => Some(VecRef::refed(&slot, *s, *e)),
RecordDataView::Chunked(pointers) => {
Some(Self::read_chunk(&self.0, pointers, table)?)
}
RecordData::Tombstone => None,
RecordDataView::Tombstone => None,
}));
}

Expand All @@ -106,7 +100,7 @@ impl<Policy: ReadonlyPolicy> BTreeIndex<Policy> {
Ok(None)
}

pub fn contains(&self, key: KeyRef, table: &Arc<TableHandle>) -> Result<bool> {
pub fn contains(&self, key: StaticKeyRef, table: &Arc<TableHandle>) -> Result<bool> {
let (mut _guard, ptr) = match self.get_entry(key, table)? {
Some(v) => v,
None => return Ok(false),
Expand All @@ -115,7 +109,7 @@ impl<Policy: ReadonlyPolicy> BTreeIndex<Policy> {
let mut next = Some(ptr);
while let Some(ptr) = next.take() {
let new_guard = pin();
let entry: DataEntry = self
let entry: DataEntryView = self
.0
.fetch_slot(ptr, table)?
.for_read()
Expand All @@ -126,8 +120,8 @@ impl<Policy: ReadonlyPolicy> BTreeIndex<Policy> {
entry.find(|&record| self.0.is_visible(record.owner, record.version))
{
return Ok(match &record.data {
RecordData::Chunked(_) | RecordData::Data(_) => true,
RecordData::Tombstone => false,
RecordDataView::Chunked(_) | RecordDataView::Data(_, _) => true,
RecordDataView::Tombstone => false,
});
};

Expand All @@ -140,7 +134,7 @@ impl<Policy: ReadonlyPolicy> BTreeIndex<Policy> {

fn find_leaf_stack(
&self,
key: KeyRef,
key: StaticKeyRef,
table: &Arc<TableHandle>,
) -> Result<(Pointer, Vec<Pointer>)> {
let (mut ptr, height) = {
Expand Down Expand Up @@ -174,8 +168,8 @@ impl<Policy: ReadonlyPolicy> BTreeIndex<Policy> {
pub fn scan(
&self,
table: &Arc<TableHandle>,
start: &Bound<Key>,
end: &Bound<Key>,
start: &Bound<StaticKey>,
end: &Bound<StaticKey>,
) -> Result<BTreeIterator<'_, Policy>> {
BTreeIterator::open(&self.0, table, start, end)
}
Expand All @@ -197,11 +191,11 @@ impl<Policy: WritablePolicy> BTreeIndex<Policy> {

fn apply_split(
&self,
evicted_key: Key,
evicted_key: StaticKey,
evicted_ptr: Pointer,
current: Pointer,
table: &Arc<TableHandle>,
) -> Result<Option<(Key, Pointer)>> {
) -> Result<Option<(StaticKey, Pointer)>> {
let mut ptr = current;

let (mut slot, mut internal) = loop {
Expand Down Expand Up @@ -235,13 +229,13 @@ impl<Policy: WritablePolicy> BTreeIndex<Policy> {

fn create_entry<F>(
&self,
key: KeyRef,
key: StaticKeyRef,
pos: usize,
slot: &mut WritableSlot,
mut node: LeafNode,
table: &Arc<TableHandle>,
create_record: F,
) -> Result<Option<(Key, Pointer)>>
) -> Result<Option<(StaticKey, Pointer)>>
where
F: FnOnce() -> VersionRecord,
{
Expand Down Expand Up @@ -269,7 +263,7 @@ impl<Policy: WritablePolicy> BTreeIndex<Policy> {

fn propagate_split(
&self,
mut split_key: Key,
mut split_key: StaticKey,
mut split_pointer: Pointer,
mut stack: Vec<Pointer>,
table: &Arc<TableHandle>,
Expand Down Expand Up @@ -374,7 +368,7 @@ impl<Policy: WritablePolicy> BTreeIndex<Policy> {
let record = VersionRecord::new(
snapshot.owner,
snapshot.version,
self.create_record(snapshot.value, table)?,
self.create_record(snapshot.value.into_vec(), table)?,
);
let (mut ptr, stack) = self.find_leaf_stack(&key, table)?;

Expand Down Expand Up @@ -406,7 +400,7 @@ where
{
pub fn insert_record(
&self,
key: Key,
key: StaticKey,
record: RecordData,
table: &Arc<TableHandle>,
) -> Result {
Expand All @@ -433,10 +427,15 @@ where
self.propagate_split(mid_key, right_ptr, stack, table)?;
Ok(())
}
pub fn insert(&self, key: Key, data: Vec<u8>, table: &Arc<TableHandle>) -> Result {
pub fn insert(
&self,
key: StaticKey,
data: Vec<u8>,
table: &Arc<TableHandle>,
) -> Result {
self.insert_record(key, self.create_record(data, table)?, table)
}
pub fn remove(&self, key: KeyRef, table: &Arc<TableHandle>) -> Result {
pub fn remove(&self, key: StaticKeyRef, table: &Arc<TableHandle>) -> Result {
self.insert_record_if_matched(key, RecordData::Tombstone, table)
}

Expand Down Expand Up @@ -482,7 +481,7 @@ where

pub fn insert_if_matched(
&self,
key: KeyRef,
key: StaticKeyRef,
data: Vec<u8>,
table: &Arc<TableHandle>,
) -> Result {
Expand All @@ -491,7 +490,7 @@ where

fn insert_record_if_matched(
&self,
key: KeyRef,
key: StaticKeyRef,
record: RecordData,
table: &Arc<TableHandle>,
) -> Result {
Expand Down Expand Up @@ -526,23 +525,23 @@ where
}

enum Buffered {
Data(Vec<u8>),
Data(VecRef),
Chunked(Vec<Pointer>),
}

pub struct KVSnapshot {
key: Key,
value: Vec<u8>,
key: VecRef,
value: VecRef,
owner: TxId,
version: TxId,
}

pub struct BTreeIterator<'a, Policy> {
policy: &'a Policy,
table: Arc<TableHandle>,
buffered: VecDeque<(Key, Option<(Buffered, TxId, TxId)>)>,
buffered: VecDeque<(VecRef, Option<(Buffered, TxId, TxId)>)>,
next: Option<Pointer>,
end: Bound<Key>,
end: Bound<StaticKey>,
closed: bool,
}
impl<'a, Policy> BTreeIterator<'a, Policy>
Expand All @@ -552,8 +551,8 @@ where
pub fn open(
policy: &'a Policy,
table: &Arc<TableHandle>,
start: &Bound<Key>,
end: &Bound<Key>,
start: &Bound<StaticKey>,
end: &Bound<StaticKey>,
) -> Result<Self> {
let mut ptr = policy
.fetch_slot(HEADER_POINTER, table)?
Expand Down Expand Up @@ -595,11 +594,11 @@ where
};

let mut count = 0;
for (k, p) in node.get_entries_while(end).skip(pos) {
for (s, e, p) in node.get_entries_while(end).skip(pos) {
count += 1;

if let Some(found) = Self::__find(policy, p, table)? {
buffered.push_back((k.to_vec(), found));
buffered.push_back((VecRef::refed(&slot, s, e), found));
}
}

Expand Down Expand Up @@ -629,25 +628,24 @@ where
let mut next = Some(ptr);

while let Some(ptr) = next.take() {
let entry: DataEntry = policy
.fetch_slot(ptr, table)?
.for_read()
.as_ref()
.deserialize()?;
let slot = policy.fetch_slot(ptr, table)?.for_read();
let entry: DataEntryView = slot.as_ref().deserialize()?;

if let Some(record) =
entry.find(|record| policy.is_visible(record.owner, record.version))
{
return Ok(Some(match &record.data {
RecordData::Data(data) => {
Some((Buffered::Data(data.to_vec()), record.owner, record.version))
}
RecordData::Chunked(pointers) => Some((
RecordDataView::Data(s, e) => Some((
Buffered::Data(VecRef::refed(&slot, *s, *e)),
record.owner,
record.version,
)),
RecordDataView::Chunked(pointers) => Some((
Buffered::Chunked(pointers.to_vec()),
record.owner,
record.version,
)),
RecordData::Tombstone => None,
RecordDataView::Tombstone => None,
}));
}

Expand Down Expand Up @@ -677,10 +675,10 @@ where
let node = slot.as_ref().view::<BTreeNodeView>()?.as_leaf()?;

let mut count = 0;
for (k, p) in node.get_entries_while(&self.end) {
for (s, e, p) in node.get_entries_while(&self.end) {
count += 1;
if let Some(found) = self.find_value(p)? {
self.buffered.push_back((k.to_vec(), found))
self.buffered.push_back((VecRef::refed(&slot, s, e), found))
}
}

Expand All @@ -690,7 +688,7 @@ where
Ok(())
}

fn next_record(&mut self) -> Result<Option<(Key, Option<(Vec<u8>, TxId, TxId)>)>> {
fn next_record(&mut self) -> Result<Option<(VecRef, Option<(VecRef, TxId, TxId)>)>> {
loop {
if self.closed {
return Ok(None);
Expand Down Expand Up @@ -732,7 +730,7 @@ where
}
}

pub fn next_kv_skip_tombstone(&mut self) -> Result<Option<(Key, Vec<u8>)>> {
pub fn next_kv_skip_tombstone(&mut self) -> Result<Option<(VecRef, VecRef)>> {
loop {
match self.next_record()? {
Some((key, Some((value, _, _)))) => return Ok(Some((key, value))),
Expand All @@ -742,7 +740,7 @@ where
}
}

pub fn next_kv(&mut self) -> Result<Option<(Key, Option<Vec<u8>>)>> {
pub fn next_kv(&mut self) -> Result<Option<(VecRef, Option<VecRef>)>> {
match self.next_record()? {
Some((k, v)) => Ok(Some((k, v.map(|v| v.0)))),
None => Ok(None),
Expand Down
Loading
Loading