Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions java/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ fn convert_to_java_operation_inner<'local>(
mem_wal_to_merge: _,
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows_filter: _,
} => {
let removed_ids: Vec<JLance<i64>> = removed_fragment_ids
.iter()
Expand Down Expand Up @@ -1043,6 +1044,7 @@ fn convert_to_rust_operation(
mem_wal_to_merge: None,
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows_filter: None,
}
}
"DataReplacement" => {
Expand Down
43 changes: 43 additions & 0 deletions protos/transaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,46 @@ message Transaction {
optional string branch_name = 5;
}

// Exact set of key hashes for conflict detection.
// Used when the number of inserted rows is small.
message ExactKeySetFilter {
// 64-bit hashes of the inserted row keys, one entry per inserted row key.
// NOTE(review): the hash function/seed is not specified in this schema —
// presumably the same xxHash(seed=0) used by the SBBF bloom filter so the
// two representations are interchangeable; confirm against the writer.
repeated uint64 key_hashes = 1;
}

// Bloom filter for key existence tests.
// Used when the number of rows is large.
message BloomFilter {
// Bitset backing the bloom filter (SBBF format).
bytes bitmap = 1;
// Number of bits in the bitmap.
// NOTE(review): presumably always equals 8 * len(bitmap) — confirm whether
// readers validate this or trust the bitmap length instead.
uint32 num_bits = 2;
// Number of items the filter was sized for.
// Used for intersection validation (filters with different sizes cannot be compared).
// Default: 8192
uint64 number_of_items = 3;
// False positive probability the filter was sized for.
// Used for intersection validation (filters with different parameters cannot be compared).
// Default: 0.00057
double probability = 4;
}

// A filter for checking key existence in set of rows inserted by a merge insert operation.
// Only created when the merge insert's ON columns match the schema's unenforced primary key.
// The presence of this filter indicates strict primary key conflict detection should be used.
// Can use either an exact set (for small row counts) or a Bloom filter (for large row counts).
message KeyExistenceFilter {
// Field IDs of columns participating in the key (must match unenforced primary key).
repeated int32 field_ids = 1;
// The underlying data structure storing the key hashes.
// NOTE(review): oneof allows neither variant to be set; the intended meaning
// of an unset `data` (empty key set vs. invalid message) is not specified
// here — confirm how readers treat it.
oneof data {
// Exact set of key hashes (used for small number of rows).
ExactKeySetFilter exact = 2;
// Bloom filter (used for large number of rows).
BloomFilter bloom = 3;
}
}

// An operation that updates rows but does not add or remove rows.
message Update {
// The fragments that have been removed. These are fragments where all rows
Expand All @@ -202,6 +242,9 @@ message Transaction {
repeated uint32 fields_for_preserving_frag_bitmap = 6;
// The mode of update
UpdateMode update_mode = 7;
// Filter for checking existence of keys in newly inserted rows, used for conflict detection.
// Only tracks keys from INSERT operations during merge insert, not updates.
optional KeyExistenceFilter inserted_rows = 8;
}

// The mode of update operation
Expand Down
1 change: 1 addition & 0 deletions python/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ impl FromPyObject<'_> for PyLance<Operation> {
mem_wal_to_merge: None,
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows_filter: None,
};
Ok(Self(op))
}
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/bloomfilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::scalar::{
use crate::{pb, Any};
use arrow_array::{Array, UInt64Array};
mod as_bytes;
mod sbbf;
pub mod sbbf;
use arrow_schema::{DataType, Field};
use serde::{Deserialize, Serialize};

Expand Down
64 changes: 64 additions & 0 deletions rust/lance-index/src/scalar/bloomfilter/sbbf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,70 @@ impl Sbbf {
pub fn estimated_memory_size(&self) -> usize {
self.blocks.capacity() * std::mem::size_of::<Block>()
}

/// Check if this filter might intersect with another filter.
/// Returns true if there's at least one bit position where both filters have a 1.
/// This is a fast check that may return false positives but never false negatives.
///
/// Returns an error if the filters have different sizes, as bloom filters with
/// different configurations cannot be reliably compared.
pub fn might_intersect(&self, other: &Self) -> Result<bool> {
    if self.blocks.len() != other.blocks.len() {
        return Err(SbbfError::InvalidData {
            message: format!(
                "Cannot compare bloom filters with different sizes: {} blocks vs {} blocks. \
                 Both filters must use the same configuration.",
                self.blocks.len(),
                other.blocks.len()
            ),
        });
    }
    // Walk the two block arrays in lockstep; zipping lets the compiler elide
    // per-access bounds checks, and `any` short-circuits on the first word
    // pair that shares a set bit — same semantics as the nested index loops.
    Ok(self
        .blocks
        .iter()
        .zip(other.blocks.iter())
        .any(|(a, b)| (0..8).any(|j| a[j] & b[j] != 0)))
}

/// Check if this filter might intersect with a raw bitmap.
/// The bitmap should be in the same format as produced by to_bytes().
///
/// Returns an error if the bitmaps have different sizes, as bloom filters with
/// different configurations cannot be reliably compared.
pub fn might_intersect_bytes(&self, other_bytes: &[u8]) -> Result<bool> {
    // Serialize this filter once, then delegate to the byte-level check,
    // which also performs the size validation.
    let own_bytes = self.to_bytes();
    Self::bytes_might_intersect(&own_bytes, other_bytes)
}

/// Check if two raw bloom filter bitmaps might intersect.
/// Returns true if there's at least one bit position where both filters have a 1.
///
/// This is a fast probabilistic check: if it returns false, the filters definitely
/// have no common elements. If it returns true, they might have common elements
/// (with possible false positives).
///
/// Returns an error if the bitmaps have different sizes, as bloom filters with
/// different configurations cannot be reliably compared.
pub fn bytes_might_intersect(a: &[u8], b: &[u8]) -> Result<bool> {
    if a.len() != b.len() {
        return Err(SbbfError::InvalidData {
            message: format!(
                "Cannot compare bloom filters with different sizes: {} bytes vs {} bytes. \
                 Both filters must use the same configuration.",
                a.len(),
                b.len()
            ),
        });
    }
    // Zip the byte slices and short-circuit on the first byte pair sharing a
    // set bit — equivalent to the indexed loop, but bounds-check-free and
    // eligible for auto-vectorization.
    Ok(a.iter().zip(b.iter()).any(|(&x, &y)| x & y != 0))
}
}

// Per spec we use xxHash with seed=0
Expand Down
2 changes: 2 additions & 0 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2858,6 +2858,7 @@ mod tests {
mem_wal_to_merge: None,
fields_for_preserving_frag_bitmap: vec![],
update_mode: Some(UpdateMode::RewriteColumns),
inserted_rows_filter: None,
};
let mut dataset1 = Dataset::commit(
test_uri,
Expand Down Expand Up @@ -2930,6 +2931,7 @@ mod tests {
mem_wal_to_merge: None,
fields_for_preserving_frag_bitmap: vec![],
update_mode: Some(UpdateMode::RewriteColumns),
inserted_rows_filter: None,
};
let dataset2 = Dataset::commit(
test_uri,
Expand Down
14 changes: 14 additions & 0 deletions rust/lance/src/dataset/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
//! the operation does not modify the region of the column being replaced.
//!

use super::write::merge_insert::inserted_rows::KeyExistenceFilter;
use super::{blob::BLOB_VERSION_CONFIG_KEY, ManifestWriteConfig};
use crate::dataset::transaction::UpdateMode::RewriteRows;
use crate::index::mem_wal::update_mem_wal_index_in_indices_list;
Expand Down Expand Up @@ -251,6 +252,9 @@ pub enum Operation {
fields_for_preserving_frag_bitmap: Vec<u32>,
/// The mode of update
update_mode: Option<UpdateMode>,
/// Optional filter for detecting conflicts on inserted row keys.
/// Only tracks keys from INSERT operations during merge insert, not updates.
inserted_rows_filter: Option<KeyExistenceFilter>,
},

/// Project to a new schema. This only changes the schema, not the data.
Expand Down Expand Up @@ -449,6 +453,7 @@ impl PartialEq for Operation {
mem_wal_to_merge: a_mem_wal_to_merge,
fields_for_preserving_frag_bitmap: a_fields_for_preserving_frag_bitmap,
update_mode: a_update_mode,
inserted_rows_filter: a_inserted_rows_filter,
},
Self::Update {
removed_fragment_ids: b_removed,
Expand All @@ -458,6 +463,7 @@ impl PartialEq for Operation {
mem_wal_to_merge: b_mem_wal_to_merge,
fields_for_preserving_frag_bitmap: b_fields_for_preserving_frag_bitmap,
update_mode: b_update_mode,
inserted_rows_filter: b_inserted_rows_filter,
},
) => {
compare_vec(a_removed, b_removed)
Expand All @@ -470,6 +476,7 @@ impl PartialEq for Operation {
b_fields_for_preserving_frag_bitmap,
)
&& a_update_mode == b_update_mode
&& a_inserted_rows_filter == b_inserted_rows_filter
}
(Self::Project { schema: a }, Self::Project { schema: b }) => a == b,
(
Expand Down Expand Up @@ -1707,6 +1714,7 @@ impl Transaction {
mem_wal_to_merge,
fields_for_preserving_frag_bitmap,
update_mode,
..
} => {
// Extract existing fragments once for reuse
let existing_fragments = maybe_existing_fragments?;
Expand Down Expand Up @@ -2888,6 +2896,7 @@ impl TryFrom<pb::Transaction> for Transaction {
mem_wal_to_merge,
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows,
})) => Operation::Update {
removed_fragment_ids,
updated_fragments: updated_fragments
Expand All @@ -2906,6 +2915,9 @@ impl TryFrom<pb::Transaction> for Transaction {
1 => Some(UpdateMode::RewriteColumns),
_ => Some(UpdateMode::RewriteRows),
},
inserted_rows_filter: inserted_rows
.map(|ik| KeyExistenceFilter::try_from(&ik))
.transpose()?,
},
Some(pb::transaction::Operation::Project(pb::transaction::Project { schema })) => {
Operation::Project {
Expand Down Expand Up @@ -3216,6 +3228,7 @@ impl From<&Transaction> for pb::Transaction {
mem_wal_to_merge,
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows_filter,
} => pb::transaction::Operation::Update(pb::transaction::Update {
removed_fragment_ids: removed_fragment_ids.clone(),
updated_fragments: updated_fragments
Expand All @@ -3233,6 +3246,7 @@ impl From<&Transaction> for pb::Transaction {
UpdateMode::RewriteColumns => 1,
})
.unwrap_or(0),
inserted_rows: inserted_rows_filter.as_ref().map(|ik| ik.into()),
}),
Operation::Project { schema } => {
pb::transaction::Operation::Project(pb::transaction::Project {
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/write/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ mod tests {
mem_wal_to_merge: None,
fields_for_preserving_frag_bitmap: vec![],
update_mode: None,
inserted_rows_filter: None,
},
read_version: 1,
tag: None,
Expand Down
Loading
Loading