Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl FromJObjectWithEnv<IndexMetadata> for JObject<'_> {
created_at,
base_id,
files: None,
invalidated_fragment_bitmap: None,
})
}
}
Expand Down
30 changes: 30 additions & 0 deletions protos/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,36 @@ message IndexMetadata {
// of index sizes without extra IO.
// If this is empty, the index files sizes are unknown.
repeated IndexFile files = 10;

// A bitmap of fragment IDs that were removed from fragment_bitmap because
// the indexed column values changed in those fragments but the index data
// was not rewritten to reflect the change. The index data still contains
// stale entries for these fragments and readers must mask out any index
// results from these fragments when querying the index.
//
// Several operations can introduce invalidated fragments:
//
// - A partial-schema merge_insert modifies indexed column values in a
// fragment without rewriting the index.
// - A data replacement operation changes column values without
// rewriting the index.
// - Some index types do not rewrite the full index during incremental
// updates (e.g. the FTS inverted index), so stale entries for
// replaced fragments may persist after an optimize.
//
// Future operations that modify indexed data without a full index
// rebuild may also add entries here.
//
// When absent, there are no invalidated fragments. Always cleared when
// the index is fully rebuilt. An optimize operation may or may not clear
// this depending on the index type — index types that rewrite all data
// during optimize (e.g. btree) will clear it, while index types that
// perform incremental merges (e.g. FTS) may retain invalidated
// fragments that were not reprocessed.
//
// The bitmap is stored as a 32-bit Roaring bitmap (same encoding as
// fragment_bitmap).
bytes invalidated_fragment_bitmap = 11;
}

// Metadata about a single file within an index segment.
Expand Down
1 change: 1 addition & 0 deletions python/src/indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ async fn do_load_shuffled_vectors(
created_at: Some(Utc::now()),
base_id: None,
files: Some(files),
invalidated_fragment_bitmap: None,
};
ds.commit_existing_index_segments(index_name, column, vec![metadata])
.await
Expand Down
1 change: 1 addition & 0 deletions python/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl FromPyObject<'_> for PyLance<IndexMetadata> {
created_at,
base_id,
files,
invalidated_fragment_bitmap: None,
}))
}
}
Expand Down
56 changes: 44 additions & 12 deletions rust/lance-table/src/format/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ pub struct IndexMetadata {
/// This is None if the file sizes are unknown. This happens for indices created
/// before this field was added.
pub files: Option<Vec<IndexFile>>,

/// Fragment IDs that were removed from `fragment_bitmap` because indexed
/// column values changed but the index data was not rewritten. The index
/// still contains stale entries for these fragments; readers must treat
/// all rows from them as deleted when querying the index.
///
/// `None` means no fragments have been invalidated. Always cleared on a
/// full index rebuild. An optimize may or may not clear this depending
/// on the index type (see proto comment for details).
pub invalidated_fragment_bitmap: Option<RoaringBitmap>,
}

impl IndexMetadata {
Expand Down Expand Up @@ -132,6 +142,11 @@ impl DeepSizeOf for IndexMetadata {
.map(|fragment_bitmap| fragment_bitmap.serialized_size())
.unwrap_or(0)
+ self.files.deep_size_of_children(context)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is line 144 sort of a no-op at this point? The last line is all that will be returned right and it doesn't make use of line 144?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines are all added together (there is a + at the start of line 145)

+ self
.invalidated_fragment_bitmap
.as_ref()
.map(|bitmap| bitmap.serialized_size())
.unwrap_or(0)
}
}

Expand Down Expand Up @@ -162,6 +177,14 @@ impl TryFrom<pb::IndexMetadata> for IndexMetadata {
)
};

let invalidated_fragment_bitmap = if proto.invalidated_fragment_bitmap.is_empty() {
None
} else {
Some(RoaringBitmap::deserialize_from(
&mut proto.invalidated_fragment_bitmap.as_slice(),
)?)
};

Ok(Self {
uuid: proto.uuid.as_ref().map(Uuid::try_from).ok_or_else(|| {
Error::invalid_input("uuid field does not exist in Index metadata".to_string())
Expand All @@ -178,20 +201,18 @@ impl TryFrom<pb::IndexMetadata> for IndexMetadata {
}),
base_id: proto.base_id,
files,
invalidated_fragment_bitmap,
})
}
}

impl From<&IndexMetadata> for pb::IndexMetadata {
fn from(idx: &IndexMetadata) -> Self {
impl TryFrom<&IndexMetadata> for pb::IndexMetadata {
type Error = Error;

fn try_from(idx: &IndexMetadata) -> Result<Self> {
let mut fragment_bitmap = Vec::new();
if let Some(bitmap) = &idx.fragment_bitmap
&& let Err(e) = bitmap.serialize_into(&mut fragment_bitmap)
{
// In theory, this should never error. But if we do, just
// recover gracefully.
log::error!("Failed to serialize fragment bitmap: {}", e);
fragment_bitmap.clear();
if let Some(bitmap) = &idx.fragment_bitmap {
bitmap.serialize_into(&mut fragment_bitmap)?;
}

let files = idx
Expand All @@ -208,7 +229,12 @@ impl From<&IndexMetadata> for pb::IndexMetadata {
})
.unwrap_or_default();

Self {
let mut invalidated_fragment_bitmap = Vec::new();
if let Some(bitmap) = &idx.invalidated_fragment_bitmap {
bitmap.serialize_into(&mut invalidated_fragment_bitmap)?;
}

Ok(Self {
uuid: Some((&idx.uuid).into()),
name: idx.name.clone(),
fields: idx.fields.clone(),
Expand All @@ -222,7 +248,8 @@ impl From<&IndexMetadata> for pb::IndexMetadata {
created_at: idx.created_at.map(|dt| dt.timestamp_millis() as u64),
base_id: idx.base_id,
files,
}
invalidated_fragment_bitmap,
})
}
}

Expand All @@ -244,7 +271,10 @@ fn serialize_index_metadata(
.downcast_ref::<Vec<IndexMetadata>>()
.expect("index_metadata_codec: wrong type (this is a bug in the cache layer)");
let section = pb::IndexSection {
indices: vec.iter().map(pb::IndexMetadata::from).collect(),
indices: vec
.iter()
.map(pb::IndexMetadata::try_from)
.collect::<lance_core::Result<_>>()?,
};
writer.write_all(&section.encode_to_vec())?;
Ok(())
Expand Down Expand Up @@ -319,6 +349,7 @@ mod tests {
path: "index.idx".to_string(),
size_bytes: 1024,
}]),
invalidated_fragment_bitmap: Some(RoaringBitmap::from_iter([4, 5])),
},
IndexMetadata {
uuid: Uuid::new_v4(),
Expand All @@ -331,6 +362,7 @@ mod tests {
created_at: None,
base_id: Some(7),
files: None,
invalidated_fragment_bitmap: None,
},
];

Expand Down
5 changes: 4 additions & 1 deletion rust/lance-table/src/io/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ async fn do_write_manifest(
// Write indices if presented.
if let Some(indices) = indices.as_ref() {
let section = pb::IndexSection {
indices: indices.iter().map(|i| i.into()).collect(),
indices: indices
.iter()
.map(pb::IndexMetadata::try_from)
.collect::<Result<_>>()?,
};
let pos = writer.write_protobuf(&section).await?;
manifest.index_section = Some(pos);
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3196,7 +3196,7 @@ pub(crate) async fn write_manifest_file(
object_store,
write_manifest_file_to_path,
naming_scheme,
transaction.take().map(|tx| tx.into()),
transaction.take().map(|tx| tx.try_into()).transpose()?,
)
.await
}
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,7 @@ mod tests {
created_at: None,
base_id: None,
files: None,
invalidated_fragment_bitmap: None,
}
}

Expand Down
2 changes: 2 additions & 0 deletions rust/lance/src/dataset/mem_wal/memtable/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ impl MemTableFlusher {
created_at: None,
base_id: None,
files: None,
invalidated_fragment_bitmap: None,
};

// Commit the index to the dataset
Expand Down Expand Up @@ -721,6 +722,7 @@ impl MemTableFlusher {
created_at: Some(chrono::Utc::now()),
index_version: 1,
files: None,
invalidated_fragment_bitmap: None,
};

Ok(index_meta)
Expand Down
6 changes: 6 additions & 0 deletions rust/lance/src/dataset/optimize/remapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ async fn remap_index(dataset: &mut Dataset, index_id: &Uuid) -> Result<()> {
created_at: curr_index_meta.created_at,
base_id: None,
files: curr_index_meta.files.clone(),
invalidated_fragment_bitmap: curr_index_meta
.invalidated_fragment_bitmap
.clone(),
},
RemapResult::Remapped(remapped_index) => IndexMetadata {
uuid: remapped_index.new_id,
Expand All @@ -298,6 +301,9 @@ async fn remap_index(dataset: &mut Dataset, index_id: &Uuid) -> Result<()> {
created_at: curr_index_meta.created_at,
base_id: None,
files: remapped_index.files,
invalidated_fragment_bitmap: curr_index_meta
.invalidated_fragment_bitmap
.clone(),
},
};

Expand Down
Loading
Loading