diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py
index 70e88e0731e..8ca3e01d95c 100644
--- a/python/python/tests/test_scalar_index.py
+++ b/python/python/tests/test_scalar_index.py
@@ -1554,6 +1554,35 @@ def test_bitmap_index(tmp_path: Path):
     assert indices[0]["type"] == "Bitmap"
 
 
+def test_btree_remap_big_deletions(tmp_path: Path):
+    # Write 15K rows in 3 fragments
+    ds = lance.write_dataset(pa.table({"a": range(5000)}), tmp_path)
+    ds = lance.write_dataset(
+        pa.table({"a": range(5000, 10000)}), tmp_path, mode="append"
+    )
+    ds = lance.write_dataset(
+        pa.table({"a": range(10000, 15000)}), tmp_path, mode="append"
+    )
+
+    # Create index (will have 4 pages)
+    ds.create_scalar_index("a", index_type="BTREE")
+
+    # Delete a lot of data (now there will only be two pages' worth)
+    ds.delete("a > 1000 AND a < 10000")
+
+    # Run compaction (deletions will be materialized)
+    ds.optimize.compact_files()
+
+    # Reload dataset and ensure index still works
+    ds = lance.dataset(tmp_path)
+
+    for idx in [0, 500, 1000, 10000, 13000, 14000, 14999]:
+        assert ds.to_table(filter=f"a = {idx}").num_rows == 1
+
+    for idx in [1001, 5000, 8000, 9999]:
+        assert ds.to_table(filter=f"a = {idx}").num_rows == 0
+
+
 def test_bitmap_remap(tmp_path: Path):
     # Make one full fragment
     tbl = pa.Table.from_arrays(
diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs
index dbf3b99c088..469092f962a 100644
--- a/rust/lance-index/src/scalar/btree.rs
+++ b/rust/lance-index/src/scalar/btree.rs
@@ -1271,13 +1271,6 @@ impl BTreeIndex {
         Schema::new(vec![value_field, row_id_field])
     }
 
-    // For legacy reasons a btree index expects the serialized schema to be values/ids
-    fn flat_schema(&self) -> Schema {
-        let value_field = Field::new(BTREE_VALUES_COLUMN, self.data_type.clone(), true);
-        let row_id_field = Field::new(BTREE_IDS_COLUMN, DataType::UInt64, false);
-        Schema::new(vec![value_field, row_id_field])
-    }
-
     /// Create a stream of all the data in the index, in the same format used to train the index
    async fn into_data_stream(self) -> Result<SendableRecordBatchStream> {
         let lazy_reader = LazyIndexReader::new(self.store.clone(), self.ranges_to_files.clone());
@@ -1526,38 +1519,69 @@ impl ScalarIndex for BTreeIndex {
         mapping: &HashMap<u64, Option<u64>>,
         dest_store: &dyn IndexStore,
     ) -> Result<CreatedIndex> {
-        let part_page_files: Vec<&str> = if let Some(ranges_to_files) = &self.ranges_to_files {
-            // Range-based Index: Directly collect references to the file paths.
-            ranges_to_files
-                .iter()
-                .map(|(_, (path, _))| path.as_str())
-                .collect()
-        } else {
-            // Basic Index: There is only one source page file.
-            vec![BTREE_PAGES_NAME]
-        };
+        // (part_id, path)
+        // The part_id is None for a basic index
+        // For a range-based index we use Some(0), Some(1), ...
+        // even if those weren't the original part ids
+        let part_page_files: Vec<(Option<u32>, &str)> =
+            if let Some(ranges_to_files) = &self.ranges_to_files {
+                // Range-based Index: Directly collect references to the file paths.
+                ranges_to_files
+                    .iter()
+                    .enumerate()
+                    .map(|(part_id, (_, (path, _)))| (Some(part_id as u32), path.as_str()))
+                    .collect()
+            } else {
+                // Basic Index: There is only one source page file.
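+                // A basic index keeps all of its pages in this single well-known
+                // file, so there is no partition id to thread through the retrain.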
+                vec![(None, BTREE_PAGES_NAME)]
+            };
 
-        for page_file in part_page_files {
-            // Remap and write the pages
-            let schema = Arc::new(self.flat_schema());
-            let mut sub_index_file = dest_store.new_index_file(page_file, schema).await?;
+        let mapping = Arc::new(mapping.clone());
+        let train_schema = Arc::new(self.train_schema());
+        // TODO: Could potentially parallelize this across parts, unclear whether it would be worth it
+        for (part_id, page_file) in part_page_files {
+            // Retrain on the remapped pages
             let sub_index_reader = self.store.open_index_file(page_file).await?;
-            let mut reader_stream = IndexReaderStream::new(sub_index_reader, self.batch_size)
+            let mapping = mapping.clone();
+
+            let train_schema_clone = train_schema.clone();
+            let train_schema = train_schema.clone();
+
+            let remapped_stream = IndexReaderStream::new(sub_index_reader, self.batch_size)
                 .await
-                .buffered(self.store.io_parallelism());
-            while let Some(serialized) = reader_stream.try_next().await? {
-                let remapped = FlatIndex::remap_batch(serialized, mapping)?;
-                sub_index_file.write_record_batch(remapped).await?;
-            }
+                .buffered(self.store.io_parallelism())
+                .map_err(DataFusionError::from)
+                .and_then(move |batch| {
+                    // Remap the batch and then convert from the serialized schema to the training input schema
+                    let remapped =
+                        FlatIndex::remap_batch(batch, &mapping).map_err(DataFusionError::from);
+                    let with_train_schema = remapped.and_then(|batch| {
+                        RecordBatch::try_new(train_schema.clone(), batch.columns().to_vec())
+                            .map_err(DataFusionError::from)
+                    });
+                    std::future::ready(with_train_schema)
+                });
+
+            let remapped_stream = Box::pin(RecordBatchStreamAdapter::new(
+                train_schema_clone,
+                remapped_stream,
+            ));
 
-            sub_index_file.finish().await?;
+            train_btree_index(remapped_stream, dest_store, self.batch_size, None, part_id).await?;
         }
 
-        // Copy the lookup file as-is
-        self.store
-            .copy_index_file(BTREE_LOOKUP_NAME, dest_store)
-            .await?;
+        if let Some(ranges_to_files) = &self.ranges_to_files {
+            let num_parts = ranges_to_files.len();
+            // Merge the lookups if we are a range-based index
+            let page_files = (0..num_parts)
+                .map(|part_id| part_page_data_file_path((part_id as u64) << 32))
+                .collect::<Vec<_>>();
+            let lookup_files = (0..num_parts)
+                .map(|part_id| part_lookup_file_path((part_id as u64) << 32))
+                .collect::<Vec<_>>();
+            merge_metadata_files(dest_store, &page_files, &lookup_files, None).await?;
+        }
 
         Ok(CreatedIndex {
             index_details: prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default())
@@ -1719,10 +1743,6 @@ fn btree_stats_as_batch(stats: Vec<BatchStats>, value_type: &DataType) -> Resu
 }
 
 /// Train a btree index from a stream of sorted page-size batches of values and row ids
-///
-/// Note: This is likely to change. It is unreasonable to expect the caller to do the sorting
-/// and re-chunking into page-size batches. This is left for simplicity as this feature is still
-/// a work in progress
 pub async fn train_btree_index(
     batches_source: SendableRecordBatchStream,
     index_store: &dyn IndexStore,
@@ -1833,7 +1853,13 @@ pub async fn merge_index_files(
     // List all partition page / lookup files in the index directory
     let (part_page_files, part_lookup_files) =
         list_page_lookup_files(object_store, index_dir).await?;
-    merge_metadata_files(store, &part_page_files, &part_lookup_files, batch_readhead).await
+    merge_metadata_files(
+        store.as_ref(),
+        &part_page_files,
+        &part_lookup_files,
+        batch_readhead,
+    )
+    .await
 }
 
 /// List and filter files from the index directory
@@ -1884,7 +1910,7 @@ async fn list_page_lookup_files(
 /// - For fragment-based indices, it performs a full K-way sort-merge of page files to create new global page and lookup files.
 /// - For range-based indices, it concatenates lookup files, as data is already globally sorted.
 async fn merge_metadata_files(
-    store: Arc<dyn IndexStore>,
+    store: &dyn IndexStore,
     part_page_files: &[String],
     part_lookup_files: &[String],
     batch_readhead: Option<usize>,
@@ -1962,7 +1988,7 @@ async fn merge_metadata_files(
     // Step 4: Merge pages and lookups and generate new index files
     if range_partitioned {
         merge_range_partitioned_lookups(
-            &store,
+            store,
             part_lookup_files,
             lookup_schema,
             metadata,
@@ -1972,7 +1998,7 @@ async fn merge_metadata_files(
         )
         .await
     } else {
         merge_pages_and_lookups(
-            &store,
+            store,
             part_page_files,
             part_lookup_files,
             &page_files_map,
@@ -2015,7 +2041,7 @@ async fn merge_metadata_files(
 /// The final, merged `_page_lookup.lance` will have a single `page_idx` column containing
 /// `[0, 1, 2, 3, 4, 5, 6]`.
 async fn merge_range_partitioned_lookups(
-    store: &Arc<dyn IndexStore>,
+    store: &dyn IndexStore,
     part_lookup_files: &[String],
     lookup_schema: Arc<Schema>,
     mut metadata: HashMap<String, String>,
@@ -2064,7 +2090,7 @@ async fn merge_range_partitioned_lookups(
 /// writes a new global page file, and generates a corresponding global lookup file.
 #[allow(clippy::too_many_arguments)]
 async fn merge_pages_and_lookups(
-    store: &Arc<dyn IndexStore>,
+    store: &dyn IndexStore,
     part_page_files: &[String],
     part_lookup_files: &[String],
     page_files_map: &HashMap<u64, String>,
@@ -2155,7 +2181,7 @@ fn add_offset_to_page_idx(batch: &RecordBatch, offset: u32) -> Result<RecordBatch> {
-    store: &Arc<dyn IndexStore>,
+    store: &dyn IndexStore,
     batch_size: u64,
     page_file: &mut Box<dyn IndexWriter>,
     arrow_schema: Arc<Schema>,
@@ -2295,7 +2321,7 @@ fn extract_partition_id(filename: &str) -> Result<u64> {
 /// This function safely deletes partition lookup and page files after a successful merge operation.
 /// File deletion failures are logged but do not affect the overall success of the merge operation.
 async fn cleanup_partition_files(
-    store: &Arc<dyn IndexStore>,
+    store: &dyn IndexStore,
     part_lookup_files: &[String],
     part_page_files: &[String],
 ) {
@@ -2328,7 +2354,7 @@ async fn cleanup_partition_files(
 ///
 /// Performs safety checks on the filename pattern before attempting deletion.
 async fn cleanup_single_file(
-    store: &Arc<dyn IndexStore>,
+    store: &dyn IndexStore,
     file_name: &str,
     expected_prefix: &str,
     expected_suffix: &str,
@@ -2858,7 +2884,7 @@ mod tests {
         ];
 
         super::merge_metadata_files(
-            fragment_store.clone(),
+            fragment_store.as_ref(),
             &part_page_files,
             &part_lookup_files,
             Option::from(1usize),
@@ -3067,7 +3093,7 @@ mod tests {
         ];
 
         super::merge_metadata_files(
-            fragment_store.clone(),
+            fragment_store.as_ref(),
             &part_page_files,
             &part_lookup_files,
             Option::from(1usize),
@@ -3417,7 +3443,7 @@ mod tests {
 
         // The cleanup function should handle both valid and invalid file patterns gracefully
         // This test mainly verifies that the function doesn't panic and handles edge cases
-        super::cleanup_partition_files(&test_store, &lookup_files, &page_files).await;
+        super::cleanup_partition_files(test_store.as_ref(), &lookup_files, &page_files).await;
     }
 
     #[tokio::test]
@@ -3590,7 +3616,7 @@ mod tests {
             range_store.as_ref(),
             DEFAULT_BTREE_BATCH_SIZE,
             None,
-            Option::from(1u32),
+            Option::from(0u32),
         )
         .await
         .unwrap();
@@ -3617,24 +3643,24 @@ mod tests {
             range_store.as_ref(),
             DEFAULT_BTREE_BATCH_SIZE,
             None,
-            Option::from(2u32),
+            Option::from(1u32),
         )
         .await
         .unwrap();
 
         // Merge the fragment files
         let part_page_files = vec![
+            part_page_data_file_path(0 << 32),
             part_page_data_file_path(1 << 32),
-            part_page_data_file_path(2 << 32),
         ];
 
         let part_lookup_files = vec![
+            part_lookup_file_path(0 << 32),
             part_lookup_file_path(1 << 32),
-            part_lookup_file_path(2 << 32),
         ];
 
         super::merge_metadata_files(
-            range_store.clone(),
+            range_store.as_ref(),
             &part_page_files,
             &part_lookup_files,
             Option::from(1usize),
@@ -3951,7 +3977,7 @@ mod tests {
         ];
 
         super::merge_metadata_files(
-            old_store.clone(),
+            old_store.as_ref(),
             &part_page_files,
             &part_lookup_files,
             Option::from(1usize),
@@ -4020,4 +4046,122 @@ mod tests {
             }
         }
     }
+
+    /// Rust equivalent of Python test `test_btree_remap_big_deletions`
+    ///
+    /// This test verifies that btree index remapping works correctly when a large
+    /// portion of the data is deleted. The Python test:
+    /// 1. Writes 15K rows in 3 fragments (values 0-14999)
+    /// 2. Creates a btree index (will have multiple pages)
+    /// 3. Deletes rows where a > 1000 AND a < 10000 (deletes values 1001-9999)
+    /// 4. Runs compaction (materializes deletions via remap)
+    /// 5. Verifies the index still works for remaining values
+    #[tokio::test]
+    async fn test_btree_remap_big_deletions() {
+        let tmpdir = TempObjDir::default();
+        let test_store = Arc::new(LanceIndexStore::new(
+            Arc::new(ObjectStore::local()),
+            tmpdir.clone(),
+            Arc::new(LanceCache::no_cache()),
+        ));
+
+        // Generate 15000 rows with values 0-14999 and row_ids 0-14999
+        // Using a smaller batch size to ensure we get multiple pages
+        let batch_size = 4096;
+        let total_rows = 15000;
+
+        let stream = gen_batch()
+            .col("value", array::step::<Int32Type>())
+            .col("_rowid", array::step::<UInt64Type>())
+            .into_df_stream(RowCount::from(total_rows), BatchCount::from(1));
+
+        train_btree_index(stream, test_store.as_ref(), batch_size, None, None)
+            .await
+            .unwrap();
+
+        let index = BTreeIndex::load(test_store.clone(), None, &LanceCache::no_cache())
+            .await
+            .unwrap();
+
+        // Create a mapping that simulates deleting rows where value > 1000 AND value < 10000
+        // Since values match row_ids in our test data:
+        // - Rows 1001-9999 (values 1001-9999) are deleted (mapped to None)
+        // - All other rows (values 0-1000 and 10000-14999) are remapped to fresh
+        //   row ids starting at 100_000
+        let mut mapping: HashMap<u64, Option<u64>> = HashMap::new();
+
+        // Mark deleted rows (values 1001-9999)
+        for old_id in 1001..10000 {
+            mapping.insert(old_id, None);
+        }
+
+        let mut new_id_counter = 100_000;
+
+        // Remap all other rows
+        for old_id in (0..=1000).chain(10000..15000) {
+            let new_id = new_id_counter;
+            new_id_counter += 1;
+            mapping.insert(old_id, Some(new_id));
+        }
+
+        let remap_dir = TempObjDir::default();
+        let remap_store = Arc::new(LanceIndexStore::new(
+            Arc::new(ObjectStore::local()),
+            remap_dir.clone(),
+            Arc::new(LanceCache::no_cache()),
+        ));
+
+        // Remap the index with our deletion mapping
+        index.remap(&mapping, remap_store.as_ref()).await.unwrap();
+
+        let remapped_index = BTreeIndex::load(remap_store.clone(), None, &LanceCache::no_cache())
+            .await
+            .unwrap();
+
+        // Verify values that should exist (values 0-1000 and 10000-14999)
+        // After the remap these all live at fresh row ids starting at 100_000
+        let should_exist = vec![0, 500, 1000, 10000, 13000, 14000, 14999];
+        for value in should_exist {
+            let query = SargableQuery::Equals(ScalarValue::Int32(Some(value)));
+            let result = remapped_index
+                .search(&query, &NoOpMetricsCollector)
+                .await
+                .unwrap();
+            match result {
+                SearchResult::Exact(row_id_map) => {
+                    assert!(
+                        !row_id_map.is_empty(),
+                        "Value {} should exist in remapped index but was not found",
+                        value
+                    );
+                }
+                _ => {
+                    panic!("Btree search result should always be Exact.");
+                }
+            }
+        }
+
+        // Verify values that should NOT exist (values 1001-9999 were deleted)
+        let should_not_exist = vec![1001, 5000, 8000, 9999];
+        for value in should_not_exist {
+            let query = SargableQuery::Equals(ScalarValue::Int32(Some(value)));
+            let result = remapped_index
+                .search(&query, &NoOpMetricsCollector)
+                .await
+                .unwrap();
+            match result {
+                SearchResult::Exact(row_id_map) => {
+                    assert!(
+                        row_id_map.is_empty(),
+                        "Value {} should NOT exist in remapped index but was found",
+                        value
+                    );
+                }
+                _ => {
+                    panic!("Btree search result should always be Exact.");
+                }
+            }
+        }
+    }
 }
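Note on the remap contract exercised by these tests, as a minimal standalone sketch rather than code from this patch (`main` and the concrete id values are illustrative assumptions): `BTreeIndex::remap` consumes a `HashMap<u64, Option<u64>>` keyed by old row id, where `None` marks a deleted row and `Some(new_id)` marks a row that moved during compaction.

    use std::collections::HashMap;

    fn main() {
        // Old row id -> new row id (None = the row was deleted).
        let mut mapping: HashMap<u64, Option<u64>> = HashMap::new();

        // Rows 1001-9999 were deleted by the test's filter, so they map to None.
        for old_id in 1001u64..10000 {
            mapping.insert(old_id, None);
        }

        // Surviving rows receive fresh row ids, mirroring what compaction produces
        // (the 100_000 base is an arbitrary choice, as in the Rust test above).
        let mut next_id = 100_000u64;
        for old_id in (0u64..=1000).chain(10000..15000) {
            mapping.insert(old_id, Some(next_id));
            next_id += 1;
        }

        assert_eq!(mapping[&500], Some(100_500));
        assert_eq!(mapping[&5000], None);
    }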