From 0a8efc9fce0855cd54857beb4ea99eba46f16eb6 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Sun, 24 Aug 2025 23:20:09 +0800 Subject: [PATCH 01/13] init --- protos/table.proto | 4 + rust/lance-table/src/format/index.rs | 52 +- rust/lance-table/src/format/manifest.rs | 17 +- rust/lance/src/dataset.rs | 26 + rust/lance/src/dataset/optimize/remapping.rs | 1 + rust/lance/src/index.rs | 496 +++++++++++++++++- rust/lance/src/index/create.rs | 1 + rust/lance/src/index/frag_reuse.rs | 1 + rust/lance/src/index/mem_wal.rs | 1 + rust/lance/src/index/scalar.rs | 6 +- rust/lance/src/index/vector.rs | 14 +- rust/lance/src/index/vector/ivf.rs | 1 + rust/lance/src/io/commit.rs | 34 +- rust/lance/src/io/commit/conflict_resolver.rs | 1 + 14 files changed, 601 insertions(+), 54 deletions(-) diff --git a/protos/table.proto b/protos/table.proto index 70a6459cbd8..5e9c3998613 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -224,6 +224,10 @@ message IndexMetadata { // This field is optional for backward compatibility. For existing indices created before // this field was added, this will be None/null. optional uint64 created_at = 8; + + // The base path index of the index files. Used when the index is imported or referred from another dataset. + // Lance uses it as key of the base_paths field in Manifest to determine the actual base path of the index files. + optional uint32 base_id = 9; } // Index Section, containing a list of index metadata for one dataset version. 
diff --git a/rust/lance-table/src/format/index.rs b/rust/lance-table/src/format/index.rs index 021f1d5ba29..e0d28dfd7ac 100644 --- a/rust/lance-table/src/format/index.rs +++ b/rust/lance-table/src/format/index.rs @@ -11,6 +11,30 @@ use uuid::Uuid; use super::pb; use lance_core::{Error, Result}; +impl Index { + pub fn effective_fragment_bitmap( + &self, + existing_fragments: &RoaringBitmap, + ) -> Option { + let fragment_bitmap = self.fragment_bitmap.as_ref()?; + Some(fragment_bitmap & existing_fragments) + } +} + +impl DeepSizeOf for Index { + fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { + self.uuid.as_bytes().deep_size_of_children(context) + + self.fields.deep_size_of_children(context) + + self.name.deep_size_of_children(context) + + self.dataset_version.deep_size_of_children(context) + + self + .fragment_bitmap + .as_ref() + .map(|fragment_bitmap| fragment_bitmap.serialized_size()) + .unwrap_or(0) + } +} + /// Index metadata #[derive(Debug, Clone, PartialEq)] pub struct Index { @@ -47,30 +71,10 @@ pub struct Index { /// This field is optional for backward compatibility. For existing indices created before /// this field was added, this will be None. pub created_at: Option>, -} - -impl Index { - pub fn effective_fragment_bitmap( - &self, - existing_fragments: &RoaringBitmap, - ) -> Option { - let fragment_bitmap = self.fragment_bitmap.as_ref()?; - Some(fragment_bitmap & existing_fragments) - } -} -impl DeepSizeOf for Index { - fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { - self.uuid.as_bytes().deep_size_of_children(context) - + self.fields.deep_size_of_children(context) - + self.name.deep_size_of_children(context) - + self.dataset_version.deep_size_of_children(context) - + self - .fragment_bitmap - .as_ref() - .map(|fragment_bitmap| fragment_bitmap.serialized_size()) - .unwrap_or(0) - } + /// The base path index of the index files. Used when the index is imported or referred from another dataset. 
+ /// Lance uses it as key of the base_paths field in Manifest to determine the actual base path of the index files. + pub base_id: Option, } impl TryFrom for Index { @@ -102,6 +106,7 @@ impl TryFrom for Index { DateTime::from_timestamp_millis(ts as i64) .expect("Invalid timestamp in index metadata") }), + base_id: proto.base_id, }) } } @@ -127,6 +132,7 @@ impl From<&Index> for pb::IndexMetadata { index_details: idx.index_details.clone(), index_version: Some(idx.index_version), created_at: idx.created_at.map(|dt| dt.timestamp_millis() as u64), + base_id: idx.base_id, } } } diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index a20d4fae1b9..37b5b73328a 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -189,9 +189,9 @@ impl Manifest { &self, ref_name: Option, ref_path: String, + ref_base_id: u32, transaction_file: String, ) -> Self { - let new_base_id = self.base_paths.keys().max().map(|id| *id + 1).unwrap_or(0); let cloned_fragments = self .fragments .as_ref() @@ -202,13 +202,13 @@ impl Manifest { .files .into_iter() .map(|mut file| { - file.base_id = Some(new_base_id); + file.base_id = Some(ref_base_id); file }) .collect(); if let Some(mut deletion) = cloned_fragment.deletion_file.take() { - deletion.base_id = Some(new_base_id); + deletion.base_id = Some(ref_base_id); cloned_fragment.deletion_file = Some(deletion); } @@ -223,12 +223,11 @@ impl Manifest { writer_version: self.writer_version.clone(), fragments: Arc::new(cloned_fragments), version_aux_data: self.version_aux_data, - // TODO: apply shallow clone to indexes - index_section: None, + index_section: None, // These will be set on commit timestamp_nanos: self.timestamp_nanos, - reader_feature_flags: self.reader_feature_flags, tag: None, - writer_feature_flags: self.writer_feature_flags, + reader_feature_flags: 0, // These will be set on commit + writer_feature_flags: 0, // These will be set on commit 
max_fragment_id: self.max_fragment_id, transaction_file: Some(transaction_file), fragment_offsets: self.fragment_offsets.clone(), @@ -239,12 +238,12 @@ impl Manifest { base_paths: { let mut base_paths = self.base_paths.clone(); let base_path = BasePath { - id: new_base_id, + id: ref_base_id, name: ref_name, is_dataset_root: true, path: ref_path, }; - base_paths.insert(new_base_id, base_path); + base_paths.insert(ref_base_id, base_path); base_paths }, } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 6653a4e4d27..1b2c1e8bcbe 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1254,6 +1254,32 @@ impl Dataset { } } + /// Get the indices directory for a specific index, considering its base_id + pub(crate) fn indice_files_dir(&self, index: &Index) -> Result { + match index.base_id.as_ref() { + Some(base_id) => { + let base_paths = &self.manifest.base_paths; + let base_path = base_paths.get(base_id).ok_or_else(|| { + Error::invalid_input( + format!( + "base_path id {} not found for index {}", + base_id, index.uuid + ), + location!(), + ) + })?; + let path = Path::parse(base_path.path.as_str())?; + if base_path.is_dataset_root { + Ok(path.child(INDICES_DIR)) + } else { + // For non-dataset-root base paths, we assume the path already points to the indices directory + Ok(path) + } + } + None => Ok(self.base.child(INDICES_DIR)), + } + } + pub fn session(&self) -> Arc { self.session.clone() } diff --git a/rust/lance/src/dataset/optimize/remapping.rs b/rust/lance/src/dataset/optimize/remapping.rs index d38adc8dc81..06796a27ab7 100644 --- a/rust/lance/src/dataset/optimize/remapping.rs +++ b/rust/lance/src/dataset/optimize/remapping.rs @@ -272,6 +272,7 @@ async fn remap_index(dataset: &mut Dataset, index_id: &Uuid) -> Result<()> { index_details: curr_index_meta.index_details.clone(), index_version: curr_index_meta.index_version, created_at: curr_index_meta.created_at, + base_id: None, }; let transaction = Transaction::new( diff 
--git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 06a8f0b5f60..5b69cbef6a9 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -529,6 +529,7 @@ impl DatasetIndexExt for Dataset { index_details: None, index_version: 0, created_at: Some(chrono::Utc::now()), + base_id: None, // New indices don't have base_id (they're not from shallow clone) }; let transaction = Transaction::new( @@ -643,6 +644,7 @@ impl DatasetIndexExt for Dataset { index_details: last_idx.index_details.clone(), index_version: res.new_index_version, created_at: Some(chrono::Utc::now()), + base_id: last_idx.base_id, // Preserve base_id from the last delta }; removed_indices.extend(res.removed_indices.iter().map(|&idx| idx.clone())); if deltas.len() > removed_indices.len() { @@ -978,8 +980,12 @@ impl DatasetIndexInternalExt for Dataset { // scalar indices, we may start having this file with scalar indices too. Once that happens // we can just read this file and look at the `implementation` or `index_type` fields to // determine what kind of index it is. - let index_dir = self.indices_dir().child(uuid); - let index_file = index_dir.child(INDEX_FILE_NAME); + let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index { + message: format!("Index with id {} does not exist", uuid), + location: location!(), + })?; + let index_dir = self.indice_files_dir(&index_meta)?; + let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME); if self.object_store.exists(&index_file).await? 
{ let index = self.open_vector_index(column, uuid, metrics).await?; Ok(index.as_index()) @@ -1032,8 +1038,12 @@ impl DatasetIndexInternalExt for Dataset { } let frag_reuse_index = self.open_frag_reuse_index(metrics).await?; - let index_dir = self.indices_dir().child(uuid); - let index_file = index_dir.child(INDEX_FILE_NAME); + let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index { + message: format!("Index with id {} does not exist", uuid), + location: location!(), + })?; + let index_dir = self.indice_files_dir(&index_meta)?; + let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME); let reader: Arc = self.object_store.open(&index_file).await?.into(); let tailing_bytes = read_last_block(reader.as_ref()).await?; @@ -1124,7 +1134,7 @@ impl DatasetIndexInternalExt for Dataset { DataType::Float16 | DataType::Float32 | DataType::Float64 => { let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, self.metadata_cache.as_ref(), @@ -1136,7 +1146,7 @@ impl DatasetIndexInternalExt for Dataset { DataType::UInt8 => { let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, self.metadata_cache.as_ref(), @@ -1181,12 +1191,12 @@ impl DatasetIndexInternalExt for Dataset { } "IVF_HNSW_FLAT" => { - let uri = self.indices_dir().child(uuid).child("index.pb"); + let uri = index_dir.child(uuid).child("index.pb"); let file_metadata_cache = self.session.metadata_cache.file_metadata_cache(&uri); let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, &file_metadata_cache, @@ -1199,7 +1209,7 @@ impl DatasetIndexInternalExt for Dataset { "IVF_HNSW_SQ" => { let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, self.metadata_cache.as_ref(), @@ -1212,7 +1222,7 @@ impl 
DatasetIndexInternalExt for Dataset { "IVF_HNSW_PQ" => { let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, self.metadata_cache.as_ref(), @@ -1467,7 +1477,7 @@ fn is_vector_field(data_type: DataType) -> bool { mod tests { use crate::dataset::builder::DatasetBuilder; use crate::dataset::optimize::{compact_files, CompactionOptions}; - use crate::dataset::{ReadParams, WriteParams}; + use crate::dataset::{ReadParams, WriteMode, WriteParams}; use crate::index::vector::VectorIndexParams; use crate::session::Session; use crate::utils::test::{ @@ -3127,4 +3137,468 @@ mod tests { "Index should have zero unindexed rows after optimization" ); } + + #[tokio::test] + async fn test_comprehensive_shallow_clone_with_indices() { + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + let clone_dir = test_dir.path().join("clone"); + let cloned_uri = clone_dir.to_str().unwrap(); + + // Create a schema with both vector and scalar columns + let dimensions = 16; + let schema = Arc::new(Schema::new(vec![ + Field::new( + "id", + DataType::Int32, + false, + ), + Field::new( + "category", + DataType::Utf8, + false, + ), + Field::new( + "vector", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Float32, true)), + dimensions, + ), + false, + ), + ])); + + // Generate test data + let float_arr = generate_random_array(300 * dimensions as usize); + let vectors = Arc::new( + FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(), + ); + let ids = Arc::new(Int32Array::from_iter_values(0..300)); + let categories = Arc::new(StringArray::from_iter_values( + (0..300).map(|i| format!("category_{}", i % 5)) + )); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ids, categories, vectors], + ).unwrap(); + + let reader = RecordBatchIterator::new( + vec![batch].into_iter().map(Ok), + schema.clone(), + ); + + // Create initial dataset + let mut 
dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + println!("✅ Created initial dataset with 300 rows"); + + // Create vector index + let vector_params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10); + dataset + .create_index( + &["vector"], + IndexType::Vector, + Some("vector_idx".to_string()), + &vector_params, + true, + ) + .await + .unwrap(); + println!("✅ Created vector index"); + + // Create scalar index + dataset + .create_index( + &["category"], + IndexType::BTree, + Some("category_idx".to_string()), + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + println!("✅ Created scalar index"); + + // Verify indices were created + let indices = dataset.load_indices().await.unwrap(); + assert_eq!(indices.len(), 2, "Should have 2 indices"); + let index_names: HashSet = indices.iter().map(|idx| idx.name.clone()).collect(); + assert!(index_names.contains("vector_idx")); + assert!(index_names.contains("category_idx")); + println!("✅ Verified indices creation"); + + // Create tag for shallow cloning + dataset.tags.create("test_tag", dataset.version().version).await.unwrap(); + + // Perform shallow clone + let cloned_dataset = dataset + .shallow_clone(cloned_uri, "test_tag", ObjectStoreParams::default()) + .await + .unwrap(); + println!("✅ Performed shallow clone"); + + // Verify cloned dataset has indices + let cloned_indices = cloned_dataset.load_indices().await.unwrap(); + assert_eq!(cloned_indices.len(), 2, "Cloned dataset should have 2 indices"); + let cloned_index_names: HashSet = cloned_indices.iter().map(|idx| idx.name.clone()).collect(); + assert!(cloned_index_names.contains("vector_idx")); + assert!(cloned_index_names.contains("category_idx")); + + // Debug: Check base_id of cloned indices + for index in cloned_indices.iter() { + println!("🔍 Index '{}' has base_id: {:?}", index.name, index.base_id); + } + + // Debug: Check base_paths in cloned dataset + println!("🔍 Cloned dataset base_paths: {:?}", 
cloned_dataset.manifest.base_paths); + println!("🔍 Cloned dataset base path: {:?}", cloned_dataset.base); + println!("🔍 Cloned dataset indices_dir(): {:?}", cloned_dataset.indices_dir()); + + println!("✅ Verified cloned dataset has indices"); + + // Test vector search on cloned dataset + let query_vector = generate_random_array(dimensions as usize); + + let search_results = cloned_dataset + .scan() + .nearest("vector", &query_vector, 5).unwrap() + .limit(Some(5), None).unwrap() + .try_into_batch() + .await + .unwrap(); + + assert!(search_results.num_rows() > 0, "Vector search should return results"); + println!("✅ Vector search works on cloned dataset"); + + // Test scalar query on cloned dataset + let scalar_results = cloned_dataset + .scan() + .filter("category = 'category_0'").unwrap() + .try_into_batch() + .await + .unwrap(); + + assert!(scalar_results.num_rows() > 0, "Scalar query should return results"); + println!("✅ Scalar query works on cloned dataset"); + + // Append new data to cloned dataset + let new_float_arr = generate_random_array(50 * dimensions as usize); + let new_vectors = Arc::new( + FixedSizeListArray::try_new_from_values(new_float_arr, dimensions).unwrap(), + ); + let new_ids = Arc::new(Int32Array::from_iter_values(300..350)); + let new_categories = Arc::new(StringArray::from_iter_values( + (300..350).map(|i| format!("new_category_{}", i % 3)) + )); + + let new_batch = RecordBatch::try_new( + schema.clone(), + vec![new_ids, new_categories, new_vectors], + ).unwrap(); + + let new_reader = RecordBatchIterator::new( + vec![new_batch].into_iter().map(Ok), + schema.clone(), + ); + + let mut updated_cloned_dataset = Dataset::write( + new_reader, + cloned_uri, + Some(WriteParams { + mode: WriteMode::Append, + ..Default::default() + }), + ).await.unwrap(); + println!("✅ Appended new data to cloned dataset"); + + // Verify row count increased + let total_rows = updated_cloned_dataset.count_rows(None).await.unwrap(); + assert_eq!(total_rows, 350, 
"Should have 350 rows after append"); + println!("✅ Verified row count after append"); + + // Call optimize_indices + updated_cloned_dataset + .optimize_indices(&OptimizeOptions::default()) + .await + .unwrap(); + println!("✅ Optimized indices"); + + // Test vector search after optimization (should find both old and new data) + let query_vector = generate_random_array(dimensions as usize); + + let optimized_search_results = updated_cloned_dataset + .scan() + .nearest("vector", &query_vector, 10).unwrap() + .limit(Some(10), None).unwrap() + .try_into_batch() + .await + .unwrap(); + + assert!(optimized_search_results.num_rows() > 0, "Vector search should work after optimization"); + println!("✅ Vector search works after optimization"); + + // Test scalar query after optimization (should find both old and new data) + let old_category_results = updated_cloned_dataset + .scan() + .filter("category = 'category_0'").unwrap() + .try_into_batch() + .await + .unwrap(); + + let new_category_results = updated_cloned_dataset + .scan() + .filter("category = 'new_category_0'").unwrap() + .try_into_batch() + .await + .unwrap(); + + assert!(old_category_results.num_rows() > 0, "Should find old category data"); + assert!(new_category_results.num_rows() > 0, "Should find new category data"); + println!("✅ Can query both old and new data after optimization"); + + // Verify index statistics + let vector_stats: serde_json::Value = serde_json::from_str( + &updated_cloned_dataset.index_statistics("vector_idx").await.unwrap() + ).unwrap(); + let category_stats: serde_json::Value = serde_json::from_str( + &updated_cloned_dataset.index_statistics("category_idx").await.unwrap() + ).unwrap(); + + assert_eq!(vector_stats["num_indexed_rows"].as_u64().unwrap(), 350); + assert_eq!(category_stats["num_indexed_rows"].as_u64().unwrap(), 350); + println!("✅ Index statistics show all rows are indexed"); + + println!("🎉 Comprehensive shallow clone with indices test passed!"); + } + + #[tokio::test] + 
async fn test_shallow_clone_with_index() { + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + let clone_dir = test_dir.path().join("clone"); + let cloned_uri = clone_dir.to_str().unwrap(); + + // Create a schema with both vector and scalar columns + let dimensions = 16; + let schema = Arc::new(Schema::new(vec![ + Field::new( + "id", + DataType::Int32, + false, + ), + Field::new( + "category", + DataType::Utf8, + false, + ), + Field::new( + "vector", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Float32, true)), + dimensions, + ), + false, + ), + ])); + + // Generate test data (300 rows to satisfy PQ training requirements) + let float_arr = generate_random_array(300 * dimensions as usize); + let vectors = Arc::new( + FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(), + ); + let ids = Arc::new(Int32Array::from_iter_values(0..300)); + let categories = Arc::new(StringArray::from_iter_values( + (0..300).map(|i| format!("category_{}", i % 5)) + )); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ids, categories, vectors], + ).unwrap(); + + let reader = RecordBatchIterator::new( + vec![batch].into_iter().map(Ok), + schema.clone(), + ); + + // Create initial dataset + let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + println!("✅ Created initial dataset with 300 rows"); + + // Create vector index (IVF_PQ) + let vector_params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10); + dataset + .create_index( + &["vector"], + IndexType::Vector, + Some("vector_idx".to_string()), + &vector_params, + true, + ) + .await + .unwrap(); + println!("✅ Created vector index"); + + // Create scalar index (BTree) + dataset + .create_index( + &["category"], + IndexType::BTree, + Some("category_idx".to_string()), + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + println!("✅ Created scalar index"); + + // Verify indices were created + let indices = 
dataset.load_indices().await.unwrap(); + assert_eq!(indices.len(), 2, "Should have 2 indices"); + let index_names: HashSet = indices.iter().map(|idx| idx.name.clone()).collect(); + assert!(index_names.contains("vector_idx")); + assert!(index_names.contains("category_idx")); + println!("✅ Verified indices creation"); + + // Create tag for shallow cloning + dataset.tags.create("test_tag", dataset.version().version).await.unwrap(); + + // Perform shallow clone + let cloned_dataset = dataset + .shallow_clone(cloned_uri, "test_tag", ObjectStoreParams::default()) + .await + .unwrap(); + println!("✅ Performed shallow clone"); + + // Verify cloned dataset has indices + let cloned_indices = cloned_dataset.load_indices().await.unwrap(); + assert_eq!(cloned_indices.len(), 2, "Cloned dataset should have 2 indices"); + let cloned_index_names: HashSet = cloned_indices.iter().map(|idx| idx.name.clone()).collect(); + assert!(cloned_index_names.contains("vector_idx")); + assert!(cloned_index_names.contains("category_idx")); + + // Debug: Check base_id of cloned indices + for index in cloned_indices.iter() { + println!("🔍 Index '{}' has base_id: {:?}", index.name, index.base_id); + } + + // Debug: Check base_paths in cloned dataset + println!("🔍 Cloned dataset base_paths: {:?}", cloned_dataset.manifest.base_paths); + + println!("✅ Verified cloned dataset has indices"); + + // Test vector search on cloned dataset + let query_vector = generate_random_array(dimensions as usize); + + let search_results = cloned_dataset + .scan() + .nearest("vector", &query_vector, 5).unwrap() + .limit(Some(5), None).unwrap() + .try_into_batch() + .await + .unwrap(); + + assert!(search_results.num_rows() > 0, "Vector search should return results"); + println!("✅ Vector search works on cloned dataset"); + + // Test scalar query on cloned dataset + let scalar_results = cloned_dataset + .scan() + .filter("category = 'category_0'").unwrap() + .try_into_batch() + .await + .unwrap(); + + 
assert!(scalar_results.num_rows() > 0, "Scalar query should return results"); + println!("✅ Scalar query works on cloned dataset"); + + // Append new data to cloned dataset + let new_float_arr = generate_random_array(50 * dimensions as usize); + let new_vectors = Arc::new( + FixedSizeListArray::try_new_from_values(new_float_arr, dimensions).unwrap(), + ); + let new_ids = Arc::new(Int32Array::from_iter_values(300..350)); + let new_categories = Arc::new(StringArray::from_iter_values( + (300..350).map(|i| format!("new_category_{}", i % 3)) + )); + + let new_batch = RecordBatch::try_new( + schema.clone(), + vec![new_ids, new_categories, new_vectors], + ).unwrap(); + + let new_reader = RecordBatchIterator::new( + vec![new_batch].into_iter().map(Ok), + schema.clone(), + ); + + let mut updated_cloned_dataset = Dataset::write( + new_reader, + cloned_uri, + Some(WriteParams { + mode: WriteMode::Append, + ..Default::default() + }), + ).await.unwrap(); + println!("✅ Appended new data to cloned dataset"); + + // Verify row count increased + let total_rows = updated_cloned_dataset.count_rows(None).await.unwrap(); + assert_eq!(total_rows, 350, "Should have 350 rows after append"); + println!("✅ Verified row count after append"); + + // Call optimize_indices + updated_cloned_dataset + .optimize_indices(&OptimizeOptions::default()) + .await + .unwrap(); + println!("✅ Optimized indices"); + + // Test vector search after optimization (should find both old and new data) + let query_vector = generate_random_array(dimensions as usize); + + let optimized_search_results = updated_cloned_dataset + .scan() + .nearest("vector", &query_vector, 10).unwrap() + .limit(Some(10), None).unwrap() + .try_into_batch() + .await + .unwrap(); + + assert!(optimized_search_results.num_rows() > 0, "Vector search should work after optimization"); + println!("✅ Vector search works after optimization"); + + // Test scalar query after optimization (should find both old and new data) + let old_category_results 
= updated_cloned_dataset + .scan() + .filter("category = 'category_0'").unwrap() + .try_into_batch() + .await + .unwrap(); + + let new_category_results = updated_cloned_dataset + .scan() + .filter("category = 'new_category_0'").unwrap() + .try_into_batch() + .await + .unwrap(); + + assert!(old_category_results.num_rows() > 0, "Should find old category data"); + assert!(new_category_results.num_rows() > 0, "Should find new category data"); + println!("✅ Can query both old and new data after optimization"); + + // Verify index statistics + let vector_stats: serde_json::Value = serde_json::from_str( + &updated_cloned_dataset.index_statistics("vector_idx").await.unwrap() + ).unwrap(); + let category_stats: serde_json::Value = serde_json::from_str( + &updated_cloned_dataset.index_statistics("category_idx").await.unwrap() + ).unwrap(); + + assert_eq!(vector_stats["num_indexed_rows"].as_u64().unwrap(), 350); + assert_eq!(category_stats["num_indexed_rows"].as_u64().unwrap(), 350); + println!("✅ Index statistics show all rows are indexed"); + + println!("🎉 Comprehensive shallow clone index functionality test passed!"); + } } diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 70c13d9d603..15d10c1fe73 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -276,6 +276,7 @@ impl<'a> CreateIndexBuilder<'a> { index_details: Some(index_details), index_version: self.index_type.version(), created_at: Some(chrono::Utc::now()), + base_id: None, }; let transaction = Transaction::new( self.dataset.manifest.version, diff --git a/rust/lance/src/index/frag_reuse.rs b/rust/lance/src/index/frag_reuse.rs index 6b827ce8613..346664792e6 100644 --- a/rust/lance/src/index/frag_reuse.rs +++ b/rust/lance/src/index/frag_reuse.rs @@ -176,5 +176,6 @@ pub(crate) async fn build_frag_reuse_index_metadata( index_details: Some(prost_types::Any::from_msg(&proto)?), index_version: index_meta.map_or(0, |index_meta| index_meta.index_version), 
created_at: Some(chrono::Utc::now()), + base_id: None, }) } diff --git a/rust/lance/src/index/mem_wal.rs b/rust/lance/src/index/mem_wal.rs index b9e9f0451ea..a25fa3a340e 100644 --- a/rust/lance/src/index/mem_wal.rs +++ b/rust/lance/src/index/mem_wal.rs @@ -542,6 +542,7 @@ pub(crate) fn new_mem_wal_index_meta( ))?), index_version: 0, created_at: Some(chrono::Utc::now()), + base_id: None, }) } diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 101e177d504..779278b6b4e 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -486,10 +486,10 @@ pub async fn open_scalar_index( async fn infer_scalar_index_type( dataset: &Dataset, - index_uuid: &str, + index: &Index, column: &str, ) -> Result { - let index_dir = dataset.indices_dir().child(index_uuid.to_string()); + let index_dir = dataset.indice_files_dir(index)?.child(index.uuid.to_string()); let col = dataset.schema().field(column).ok_or(Error::Internal { message: format!( "Index refers to column {} which does not exist in dataset schema", @@ -550,7 +550,7 @@ pub async fn detect_scalar_index_type( if let Some(index_type) = dataset.index_cache.get_with_key(&type_key).await { return Ok(*index_type.as_ref()); } - let index_type = infer_scalar_index_type(dataset, &index.uuid.to_string(), column).await?; + let index_type = infer_scalar_index_type(dataset, index, column).await?; dataset .index_cache .insert_with_key(&type_key, Arc::new(index_type)) diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index feba2e5cfda..905ffce7579 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -36,7 +36,7 @@ use lance_index::vector::{ sq::{builder::SQBuildParams, ScalarQuantizer}, VectorIndex, }; -use lance_index::{IndexType, INDEX_AUXILIARY_FILE_NAME, INDEX_METADATA_SCHEMA_KEY}; +use lance_index::{DatasetIndexExt, IndexType, INDEX_AUXILIARY_FILE_NAME, INDEX_METADATA_SCHEMA_KEY}; use lance_io::traits::Reader; use 
lance_linalg::distance::*; use lance_table::format::Index as IndexMetadata; @@ -652,11 +652,16 @@ pub(crate) async fn open_vector_index_v2( let distance_type = DistanceType::try_from(index_metadata.distance_type.as_str())?; let frag_reuse_uuid = dataset.frag_reuse_index_uuid(); + // Load the index metadata to get the correct index directory + let index_meta = dataset.load_index(uuid).await?.ok_or_else(|| Error::Index { + message: format!("Index with id {} does not exist", uuid), + location: location!(), + })?; + let index_dir = dataset.indice_files_dir(&index_meta)?; let index: Arc = match index_metadata.index_type.as_str() { "IVF_HNSW_PQ" => { - let aux_path = dataset - .indices_dir() + let aux_path = index_dir .child(uuid) .child(INDEX_AUXILIARY_FILE_NAME); let aux_reader = dataset.object_store().open(&aux_path).await?; @@ -685,8 +690,7 @@ pub(crate) async fn open_vector_index_v2( } "IVF_HNSW_SQ" => { - let aux_path = dataset - .indices_dir() + let aux_path = index_dir .child(uuid) .child(INDEX_AUXILIARY_FILE_NAME); let aux_reader = dataset.object_store().open(&aux_path).await?; diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 849deeafc2e..420784fee70 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -2282,6 +2282,7 @@ mod tests { index_details: Some(vector_index_details()), index_version: index.index_type().version(), created_at: None, // Test index, not setting timestamp + base_id: None, }; let prefilter = Arc::new(DatasetPreFilter::new(dataset.clone(), &[index_meta], None)); diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index dfabd409174..2f79c2c19b1 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -133,9 +133,37 @@ async fn do_commit_new_dataset( &Session::default(), ) .await?; - let new_manifest = - source_manifest.shallow_clone(ref_name.clone(), ref_path.clone(), transaction_file); - (new_manifest, Vec::new()) + + let 
new_base_id = source_manifest + .base_paths + .keys() + .max() + .map(|id| *id + 1) + .unwrap_or(0); + let new_manifest = source_manifest.shallow_clone( + ref_name.clone(), + ref_path.clone(), + new_base_id, + transaction_file, + ); + + let updated_indices = if let Some(index_section_pos) = source_manifest.index_section { + let reader = object_store.open(&source_manifest_location.path).await?; + let section: pb::IndexSection = + lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?; + section + .indices + .into_iter() + .map(|index_pb| { + let mut index = lance_table::format::Index::try_from(index_pb)?; + index.base_id = Some(new_base_id); + Ok(index) + }) + .collect::>>()? + } else { + vec![] + }; + (new_manifest, updated_indices) } else { let (manifest, indices) = transaction.build_manifest( None, diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 4b031da9575..5d50011ab06 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -1959,6 +1959,7 @@ mod tests { index_details: None, index_version: 0, created_at: None, // Test index, not setting timestamp + base_id: None, }; let fragment0 = Fragment::new(0); let fragment1 = Fragment::new(1); From ec5c62a4c1e5fd8e7c4cb7b9a6c01eda5d8df9d7 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Sun, 24 Aug 2025 23:22:21 +0800 Subject: [PATCH 02/13] fmt --all --- rust/lance/src/index.rs | 275 ++++++++++++++++++++------------- rust/lance/src/index/scalar.rs | 4 +- rust/lance/src/index/vector.rs | 23 +-- 3 files changed, 179 insertions(+), 123 deletions(-) diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 5b69cbef6a9..dcbe0cb2c64 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -3148,16 +3148,8 @@ mod tests { // Create a schema with both vector and scalar columns let dimensions = 16; let schema = Arc::new(Schema::new(vec![ - Field::new( - "id", - 
DataType::Int32, - false, - ), - Field::new( - "category", - DataType::Utf8, - false, - ), + Field::new("id", DataType::Int32, false), + Field::new("category", DataType::Utf8, false), Field::new( "vector", DataType::FixedSizeList( @@ -3170,23 +3162,16 @@ mod tests { // Generate test data let float_arr = generate_random_array(300 * dimensions as usize); - let vectors = Arc::new( - FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(), - ); + let vectors = + Arc::new(FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap()); let ids = Arc::new(Int32Array::from_iter_values(0..300)); let categories = Arc::new(StringArray::from_iter_values( - (0..300).map(|i| format!("category_{}", i % 5)) + (0..300).map(|i| format!("category_{}", i % 5)), )); - let batch = RecordBatch::try_new( - schema.clone(), - vec![ids, categories, vectors], - ).unwrap(); + let batch = RecordBatch::try_new(schema.clone(), vec![ids, categories, vectors]).unwrap(); - let reader = RecordBatchIterator::new( - vec![batch].into_iter().map(Ok), - schema.clone(), - ); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); // Create initial dataset let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); @@ -3228,7 +3213,11 @@ mod tests { println!("✅ Verified indices creation"); // Create tag for shallow cloning - dataset.tags.create("test_tag", dataset.version().version).await.unwrap(); + dataset + .tags + .create("test_tag", dataset.version().version) + .await + .unwrap(); // Perform shallow clone let cloned_dataset = dataset @@ -3239,8 +3228,13 @@ mod tests { // Verify cloned dataset has indices let cloned_indices = cloned_dataset.load_indices().await.unwrap(); - assert_eq!(cloned_indices.len(), 2, "Cloned dataset should have 2 indices"); - let cloned_index_names: HashSet = cloned_indices.iter().map(|idx| idx.name.clone()).collect(); + assert_eq!( + cloned_indices.len(), + 2, + "Cloned dataset should have 2 indices" + ); 
+ let cloned_index_names: HashSet = + cloned_indices.iter().map(|idx| idx.name.clone()).collect(); assert!(cloned_index_names.contains("vector_idx")); assert!(cloned_index_names.contains("category_idx")); @@ -3250,9 +3244,15 @@ mod tests { } // Debug: Check base_paths in cloned dataset - println!("🔍 Cloned dataset base_paths: {:?}", cloned_dataset.manifest.base_paths); + println!( + "🔍 Cloned dataset base_paths: {:?}", + cloned_dataset.manifest.base_paths + ); println!("🔍 Cloned dataset base path: {:?}", cloned_dataset.base); - println!("🔍 Cloned dataset indices_dir(): {:?}", cloned_dataset.indices_dir()); + println!( + "🔍 Cloned dataset indices_dir(): {:?}", + cloned_dataset.indices_dir() + ); println!("✅ Verified cloned dataset has indices"); @@ -3261,45 +3261,50 @@ mod tests { let search_results = cloned_dataset .scan() - .nearest("vector", &query_vector, 5).unwrap() - .limit(Some(5), None).unwrap() + .nearest("vector", &query_vector, 5) + .unwrap() + .limit(Some(5), None) + .unwrap() .try_into_batch() .await .unwrap(); - assert!(search_results.num_rows() > 0, "Vector search should return results"); + assert!( + search_results.num_rows() > 0, + "Vector search should return results" + ); println!("✅ Vector search works on cloned dataset"); // Test scalar query on cloned dataset let scalar_results = cloned_dataset .scan() - .filter("category = 'category_0'").unwrap() + .filter("category = 'category_0'") + .unwrap() .try_into_batch() .await .unwrap(); - assert!(scalar_results.num_rows() > 0, "Scalar query should return results"); + assert!( + scalar_results.num_rows() > 0, + "Scalar query should return results" + ); println!("✅ Scalar query works on cloned dataset"); // Append new data to cloned dataset let new_float_arr = generate_random_array(50 * dimensions as usize); - let new_vectors = Arc::new( - FixedSizeListArray::try_new_from_values(new_float_arr, dimensions).unwrap(), - ); + let new_vectors = + 
Arc::new(FixedSizeListArray::try_new_from_values(new_float_arr, dimensions).unwrap()); let new_ids = Arc::new(Int32Array::from_iter_values(300..350)); let new_categories = Arc::new(StringArray::from_iter_values( - (300..350).map(|i| format!("new_category_{}", i % 3)) + (300..350).map(|i| format!("new_category_{}", i % 3)), )); - let new_batch = RecordBatch::try_new( - schema.clone(), - vec![new_ids, new_categories, new_vectors], - ).unwrap(); + let new_batch = + RecordBatch::try_new(schema.clone(), vec![new_ids, new_categories, new_vectors]) + .unwrap(); - let new_reader = RecordBatchIterator::new( - vec![new_batch].into_iter().map(Ok), - schema.clone(), - ); + let new_reader = + RecordBatchIterator::new(vec![new_batch].into_iter().map(Ok), schema.clone()); let mut updated_cloned_dataset = Dataset::write( new_reader, @@ -3308,7 +3313,9 @@ mod tests { mode: WriteMode::Append, ..Default::default() }), - ).await.unwrap(); + ) + .await + .unwrap(); println!("✅ Appended new data to cloned dataset"); // Verify row count increased @@ -3328,41 +3335,62 @@ mod tests { let optimized_search_results = updated_cloned_dataset .scan() - .nearest("vector", &query_vector, 10).unwrap() - .limit(Some(10), None).unwrap() + .nearest("vector", &query_vector, 10) + .unwrap() + .limit(Some(10), None) + .unwrap() .try_into_batch() .await .unwrap(); - assert!(optimized_search_results.num_rows() > 0, "Vector search should work after optimization"); + assert!( + optimized_search_results.num_rows() > 0, + "Vector search should work after optimization" + ); println!("✅ Vector search works after optimization"); // Test scalar query after optimization (should find both old and new data) let old_category_results = updated_cloned_dataset .scan() - .filter("category = 'category_0'").unwrap() + .filter("category = 'category_0'") + .unwrap() .try_into_batch() .await .unwrap(); let new_category_results = updated_cloned_dataset .scan() - .filter("category = 'new_category_0'").unwrap() + 
.filter("category = 'new_category_0'") + .unwrap() .try_into_batch() .await .unwrap(); - assert!(old_category_results.num_rows() > 0, "Should find old category data"); - assert!(new_category_results.num_rows() > 0, "Should find new category data"); + assert!( + old_category_results.num_rows() > 0, + "Should find old category data" + ); + assert!( + new_category_results.num_rows() > 0, + "Should find new category data" + ); println!("✅ Can query both old and new data after optimization"); // Verify index statistics let vector_stats: serde_json::Value = serde_json::from_str( - &updated_cloned_dataset.index_statistics("vector_idx").await.unwrap() - ).unwrap(); + &updated_cloned_dataset + .index_statistics("vector_idx") + .await + .unwrap(), + ) + .unwrap(); let category_stats: serde_json::Value = serde_json::from_str( - &updated_cloned_dataset.index_statistics("category_idx").await.unwrap() - ).unwrap(); + &updated_cloned_dataset + .index_statistics("category_idx") + .await + .unwrap(), + ) + .unwrap(); assert_eq!(vector_stats["num_indexed_rows"].as_u64().unwrap(), 350); assert_eq!(category_stats["num_indexed_rows"].as_u64().unwrap(), 350); @@ -3381,16 +3409,8 @@ mod tests { // Create a schema with both vector and scalar columns let dimensions = 16; let schema = Arc::new(Schema::new(vec![ - Field::new( - "id", - DataType::Int32, - false, - ), - Field::new( - "category", - DataType::Utf8, - false, - ), + Field::new("id", DataType::Int32, false), + Field::new("category", DataType::Utf8, false), Field::new( "vector", DataType::FixedSizeList( @@ -3403,23 +3423,16 @@ mod tests { // Generate test data (300 rows to satisfy PQ training requirements) let float_arr = generate_random_array(300 * dimensions as usize); - let vectors = Arc::new( - FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(), - ); + let vectors = + Arc::new(FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap()); let ids = Arc::new(Int32Array::from_iter_values(0..300)); 
let categories = Arc::new(StringArray::from_iter_values( - (0..300).map(|i| format!("category_{}", i % 5)) + (0..300).map(|i| format!("category_{}", i % 5)), )); - let batch = RecordBatch::try_new( - schema.clone(), - vec![ids, categories, vectors], - ).unwrap(); + let batch = RecordBatch::try_new(schema.clone(), vec![ids, categories, vectors]).unwrap(); - let reader = RecordBatchIterator::new( - vec![batch].into_iter().map(Ok), - schema.clone(), - ); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); // Create initial dataset let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); @@ -3461,7 +3474,11 @@ mod tests { println!("✅ Verified indices creation"); // Create tag for shallow cloning - dataset.tags.create("test_tag", dataset.version().version).await.unwrap(); + dataset + .tags + .create("test_tag", dataset.version().version) + .await + .unwrap(); // Perform shallow clone let cloned_dataset = dataset @@ -3472,8 +3489,13 @@ mod tests { // Verify cloned dataset has indices let cloned_indices = cloned_dataset.load_indices().await.unwrap(); - assert_eq!(cloned_indices.len(), 2, "Cloned dataset should have 2 indices"); - let cloned_index_names: HashSet = cloned_indices.iter().map(|idx| idx.name.clone()).collect(); + assert_eq!( + cloned_indices.len(), + 2, + "Cloned dataset should have 2 indices" + ); + let cloned_index_names: HashSet = + cloned_indices.iter().map(|idx| idx.name.clone()).collect(); assert!(cloned_index_names.contains("vector_idx")); assert!(cloned_index_names.contains("category_idx")); @@ -3483,7 +3505,10 @@ mod tests { } // Debug: Check base_paths in cloned dataset - println!("🔍 Cloned dataset base_paths: {:?}", cloned_dataset.manifest.base_paths); + println!( + "🔍 Cloned dataset base_paths: {:?}", + cloned_dataset.manifest.base_paths + ); println!("✅ Verified cloned dataset has indices"); @@ -3492,45 +3517,50 @@ mod tests { let search_results = cloned_dataset .scan() - .nearest("vector", 
&query_vector, 5).unwrap() - .limit(Some(5), None).unwrap() + .nearest("vector", &query_vector, 5) + .unwrap() + .limit(Some(5), None) + .unwrap() .try_into_batch() .await .unwrap(); - assert!(search_results.num_rows() > 0, "Vector search should return results"); + assert!( + search_results.num_rows() > 0, + "Vector search should return results" + ); println!("✅ Vector search works on cloned dataset"); // Test scalar query on cloned dataset let scalar_results = cloned_dataset .scan() - .filter("category = 'category_0'").unwrap() + .filter("category = 'category_0'") + .unwrap() .try_into_batch() .await .unwrap(); - assert!(scalar_results.num_rows() > 0, "Scalar query should return results"); + assert!( + scalar_results.num_rows() > 0, + "Scalar query should return results" + ); println!("✅ Scalar query works on cloned dataset"); // Append new data to cloned dataset let new_float_arr = generate_random_array(50 * dimensions as usize); - let new_vectors = Arc::new( - FixedSizeListArray::try_new_from_values(new_float_arr, dimensions).unwrap(), - ); + let new_vectors = + Arc::new(FixedSizeListArray::try_new_from_values(new_float_arr, dimensions).unwrap()); let new_ids = Arc::new(Int32Array::from_iter_values(300..350)); let new_categories = Arc::new(StringArray::from_iter_values( - (300..350).map(|i| format!("new_category_{}", i % 3)) + (300..350).map(|i| format!("new_category_{}", i % 3)), )); - let new_batch = RecordBatch::try_new( - schema.clone(), - vec![new_ids, new_categories, new_vectors], - ).unwrap(); + let new_batch = + RecordBatch::try_new(schema.clone(), vec![new_ids, new_categories, new_vectors]) + .unwrap(); - let new_reader = RecordBatchIterator::new( - vec![new_batch].into_iter().map(Ok), - schema.clone(), - ); + let new_reader = + RecordBatchIterator::new(vec![new_batch].into_iter().map(Ok), schema.clone()); let mut updated_cloned_dataset = Dataset::write( new_reader, @@ -3539,7 +3569,9 @@ mod tests { mode: WriteMode::Append, ..Default::default() }), - 
).await.unwrap(); + ) + .await + .unwrap(); println!("✅ Appended new data to cloned dataset"); // Verify row count increased @@ -3559,41 +3591,62 @@ mod tests { let optimized_search_results = updated_cloned_dataset .scan() - .nearest("vector", &query_vector, 10).unwrap() - .limit(Some(10), None).unwrap() + .nearest("vector", &query_vector, 10) + .unwrap() + .limit(Some(10), None) + .unwrap() .try_into_batch() .await .unwrap(); - assert!(optimized_search_results.num_rows() > 0, "Vector search should work after optimization"); + assert!( + optimized_search_results.num_rows() > 0, + "Vector search should work after optimization" + ); println!("✅ Vector search works after optimization"); // Test scalar query after optimization (should find both old and new data) let old_category_results = updated_cloned_dataset .scan() - .filter("category = 'category_0'").unwrap() + .filter("category = 'category_0'") + .unwrap() .try_into_batch() .await .unwrap(); let new_category_results = updated_cloned_dataset .scan() - .filter("category = 'new_category_0'").unwrap() + .filter("category = 'new_category_0'") + .unwrap() .try_into_batch() .await .unwrap(); - assert!(old_category_results.num_rows() > 0, "Should find old category data"); - assert!(new_category_results.num_rows() > 0, "Should find new category data"); + assert!( + old_category_results.num_rows() > 0, + "Should find old category data" + ); + assert!( + new_category_results.num_rows() > 0, + "Should find new category data" + ); println!("✅ Can query both old and new data after optimization"); // Verify index statistics let vector_stats: serde_json::Value = serde_json::from_str( - &updated_cloned_dataset.index_statistics("vector_idx").await.unwrap() - ).unwrap(); + &updated_cloned_dataset + .index_statistics("vector_idx") + .await + .unwrap(), + ) + .unwrap(); let category_stats: serde_json::Value = serde_json::from_str( - &updated_cloned_dataset.index_statistics("category_idx").await.unwrap() - ).unwrap(); + 
&updated_cloned_dataset + .index_statistics("category_idx") + .await + .unwrap(), + ) + .unwrap(); assert_eq!(vector_stats["num_indexed_rows"].as_u64().unwrap(), 350); assert_eq!(category_stats["num_indexed_rows"].as_u64().unwrap(), 350); diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 779278b6b4e..a910e6ff53e 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -489,7 +489,9 @@ async fn infer_scalar_index_type( index: &Index, column: &str, ) -> Result { - let index_dir = dataset.indice_files_dir(index)?.child(index.uuid.to_string()); + let index_dir = dataset + .indice_files_dir(index)? + .child(index.uuid.to_string()); let col = dataset.schema().field(column).ok_or(Error::Internal { message: format!( "Index refers to column {} which does not exist in dataset schema", diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index 905ffce7579..8b4663632fb 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -36,7 +36,9 @@ use lance_index::vector::{ sq::{builder::SQBuildParams, ScalarQuantizer}, VectorIndex, }; -use lance_index::{DatasetIndexExt, IndexType, INDEX_AUXILIARY_FILE_NAME, INDEX_METADATA_SCHEMA_KEY}; +use lance_index::{ + DatasetIndexExt, IndexType, INDEX_AUXILIARY_FILE_NAME, INDEX_METADATA_SCHEMA_KEY, +}; use lance_io::traits::Reader; use lance_linalg::distance::*; use lance_table::format::Index as IndexMetadata; @@ -653,17 +655,18 @@ pub(crate) async fn open_vector_index_v2( let frag_reuse_uuid = dataset.frag_reuse_index_uuid(); // Load the index metadata to get the correct index directory - let index_meta = dataset.load_index(uuid).await?.ok_or_else(|| Error::Index { - message: format!("Index with id {} does not exist", uuid), - location: location!(), - })?; + let index_meta = dataset + .load_index(uuid) + .await? 
+ .ok_or_else(|| Error::Index { + message: format!("Index with id {} does not exist", uuid), + location: location!(), + })?; let index_dir = dataset.indice_files_dir(&index_meta)?; let index: Arc = match index_metadata.index_type.as_str() { "IVF_HNSW_PQ" => { - let aux_path = index_dir - .child(uuid) - .child(INDEX_AUXILIARY_FILE_NAME); + let aux_path = index_dir.child(uuid).child(INDEX_AUXILIARY_FILE_NAME); let aux_reader = dataset.object_store().open(&aux_path).await?; let ivf_data = IvfModel::load(&reader).await?; @@ -690,9 +693,7 @@ pub(crate) async fn open_vector_index_v2( } "IVF_HNSW_SQ" => { - let aux_path = index_dir - .child(uuid) - .child(INDEX_AUXILIARY_FILE_NAME); + let aux_path = index_dir.child(uuid).child(INDEX_AUXILIARY_FILE_NAME); let aux_reader = dataset.object_store().open(&aux_path).await?; let ivf_data = IvfModel::load(&reader).await?; From 1e5983f10430967fefb07a9018bff1013796f98c Mon Sep 17 00:00:00 2001 From: majin1102 Date: Mon, 25 Aug 2025 14:03:56 +0800 Subject: [PATCH 03/13] fix ci issues --- java/core/lance-jni/src/transaction.rs | 18 +- .../java/com/lancedb/lance/index/Index.java | 25 +- python/python/lance/dataset.py | 1 + python/src/dataset.rs | 2 + rust/lance/src/index.rs | 355 ++++-------------- rust/lance/src/index/scalar.rs | 10 +- 6 files changed, 117 insertions(+), 294 deletions(-) diff --git a/java/core/lance-jni/src/transaction.rs b/java/core/lance-jni/src/transaction.rs index 29dcdf7d651..02aca88cf49 100644 --- a/java/core/lance-jni/src/transaction.rs +++ b/java/core/lance-jni/src/transaction.rs @@ -119,10 +119,17 @@ impl IntoJava for Index { JObject::null() }; + // Convert base_id from Option to Integer for Java + let base_id = if let Some(id) = self.base_id { + env.new_object("java/lang/Integer", "(I)V", &[JValue::Int(id as i32)])? 
+ } else { + JObject::null() + }; + // Create Index object Ok(env.new_object( "com/lancedb/lance/index/Index", - "(Ljava/util/UUID;Ljava/util/List;Ljava/lang/String;J[B[BILjava/time/Instant;)V", + "(Ljava/util/UUID;Ljava/util/List;Ljava/lang/String;J[B[BILjava/time/Instant;Ljava/lang/Integer;)V", &[ JValue::Object(&uuid), JValue::Object(&fields), @@ -132,6 +139,7 @@ impl IntoJava for Index { JValue::Object(&index_details), JValue::Int(self.index_version), JValue::Object(&created_at), + JValue::Object(&base_id), ], )?) } @@ -232,6 +240,13 @@ impl FromJObjectWithEnv for JObject<'_> { .i()? as u32; Some(DateTime::from_timestamp(seconds, nanos).unwrap()) }; + let base_id_obj = env.get_field(self, "baseId", "Ljava/lang/Integer;")?.l()?; + let base_id = if base_id_obj.is_null() { + None + } else { + let id_value = env.call_method(&base_id_obj, "intValue", "()I", &[])?.i()? as u32; + Some(id_value) + }; Ok(Index { uuid, @@ -242,6 +257,7 @@ impl FromJObjectWithEnv for JObject<'_> { index_details, index_version, created_at, + base_id, }) } } diff --git a/java/core/src/main/java/com/lancedb/lance/index/Index.java b/java/core/src/main/java/com/lancedb/lance/index/Index.java index 0cce9c14eb6..9859fbd1613 100644 --- a/java/core/src/main/java/com/lancedb/lance/index/Index.java +++ b/java/core/src/main/java/com/lancedb/lance/index/Index.java @@ -13,6 +13,7 @@ */ package com.lancedb.lance.index; +import org.apache.arrow.flatbuf.Int; import org.apache.commons.lang3.builder.ToStringBuilder; import java.time.Instant; @@ -35,6 +36,7 @@ public class Index { private final byte[] indexDetails; private final int indexVersion; private final Instant createdAt; + private final Integer baseId; private Index( UUID uuid, @@ -44,7 +46,8 @@ private Index( byte[] fragmentBitmap, byte[] indexDetails, int indexVersion, - Instant createdAt) { + Instant createdAt, + Integer baseId) { this.uuid = uuid; this.fields = fields; this.name = name; @@ -53,6 +56,7 @@ private Index( this.indexDetails = 
indexDetails; this.indexVersion = indexVersion; this.createdAt = createdAt; + this.baseId = baseId; } public UUID uuid() { @@ -94,6 +98,10 @@ public Optional indexDetails() { return Optional.ofNullable(indexDetails); } + public Optional baseId() { + return Optional.ofNullable(baseId); + } + /** * Get the index version. * @@ -124,12 +132,13 @@ public boolean equals(Object o) { && Objects.equals(name, index.name) && Arrays.equals(fragmentBitmap, index.fragmentBitmap) && Arrays.equals(indexDetails, index.indexDetails) - && Objects.equals(createdAt, index.createdAt); + && Objects.equals(createdAt, index.createdAt) + && Objects.equals(baseId, index.baseId); } @Override public int hashCode() { - int result = Objects.hash(uuid, fields, name, datasetVersion, indexVersion, createdAt); + int result = Objects.hash(uuid, fields, name, datasetVersion, indexVersion, createdAt, baseId); result = 31 * result + Arrays.hashCode(fragmentBitmap); result = 31 * result + Arrays.hashCode(indexDetails); return result; @@ -144,6 +153,7 @@ public String toString() { .append("datasetVersion", datasetVersion) .append("indexVersion", indexVersion) .append("createdAt", createdAt) + .append("baseId", baseId) .toString(); } @@ -166,6 +176,7 @@ public static class Builder { private byte[] indexDetails; private int indexVersion; private Instant createdAt; + private Integer baseId; private Builder() {} @@ -209,6 +220,11 @@ public Builder createdAt(Instant createdAt) { return this; } + public Builder baseId(Integer baseId) { + this.baseId = baseId; + return this; + } + public Index build() { return new Index( uuid, @@ -218,7 +234,8 @@ public Index build() { fragmentBitmap, indexDetails, indexVersion, - createdAt); + createdAt, + baseId); } } } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 022db057df5..0f9f59d633f 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3339,6 +3339,7 @@ class Index: fragment_ids: Set[int] index_version: 
int created_at: Optional[datetime] = None + base_id: Optional[int] = None class AutoCleanupConfig(TypedDict): diff --git a/python/src/dataset.rs b/python/src/dataset.rs index fec421aca10..d47da7cd270 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -622,6 +622,8 @@ impl Dataset { dict.set_item("fields", field_names).unwrap(); dict.set_item("version", idx.dataset_version).unwrap(); dict.set_item("fragment_ids", fragment_set).unwrap(); + // Convert base_id from Option to Optional[int] for Python + dict.set_item("base_id", idx.base_id.map(|id| id as i64)).unwrap(); dict.into_py_any(py) }) .collect::>>() diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index dcbe0cb2c64..68eb6d2bf65 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -644,7 +644,7 @@ impl DatasetIndexExt for Dataset { index_details: last_idx.index_details.clone(), index_version: res.new_index_version, created_at: Some(chrono::Utc::now()), - base_id: last_idx.base_id, // Preserve base_id from the last delta + base_id: None, // New merged index file is located in the cloned dataset.
}; removed_indices.extend(res.removed_indices.iter().map(|&idx| idx.clone())); if deltas.len() > removed_indices.len() { @@ -1167,7 +1167,7 @@ impl DatasetIndexInternalExt for Dataset { "IVF_PQ" => { let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, self.metadata_cache.as_ref(), @@ -1180,7 +1180,7 @@ impl DatasetIndexInternalExt for Dataset { "IVF_SQ" => { let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, self.metadata_cache.as_ref(), @@ -3139,14 +3139,14 @@ mod tests { } #[tokio::test] - async fn test_comprehensive_shallow_clone_with_indices() { + async fn test_shallow_clone_with_index() { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); let clone_dir = test_dir.path().join("clone"); let cloned_uri = clone_dir.to_str().unwrap(); // Create a schema with both vector and scalar columns - let dimensions = 16; + let dimensions = 16u32; let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), Field::new("category", DataType::Utf8, false), @@ -3154,30 +3154,27 @@ mod tests { "vector", DataType::FixedSizeList( Arc::new(Field::new("item", DataType::Float32, true)), - dimensions, + dimensions as i32, ), false, ), ])); - // Generate test data - let float_arr = generate_random_array(300 * dimensions as usize); - let vectors = - Arc::new(FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap()); - let ids = Arc::new(Int32Array::from_iter_values(0..300)); - let categories = Arc::new(StringArray::from_iter_values( - (0..300).map(|i| format!("category_{}", i % 5)), - )); - - let batch = RecordBatch::try_new(schema.clone(), vec![ids, categories, vectors]).unwrap(); - - let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + // Generate test data using lance_datagen (300 rows to satisfy PQ training requirements) + let 
data = gen_batch() + .col("id", array::step::()) + .col("category", array::fill_utf8("category_0".to_string())) + .col( + "vector", + array::rand_vec::(Dimension::from(dimensions)), + ) + .into_reader_rows(RowCount::from(300), BatchCount::from(1)); // Create initial dataset - let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + let mut dataset = Dataset::write(data, test_uri, None).await.unwrap(); println!("✅ Created initial dataset with 300 rows"); - // Create vector index + // Create vector index (IVF_PQ) let vector_params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10); dataset .create_index( @@ -3191,7 +3188,7 @@ mod tests { .unwrap(); println!("✅ Created vector index"); - // Create scalar index + // Create scalar index (BTree) dataset .create_index( &["category"], @@ -3212,6 +3209,22 @@ mod tests { assert!(index_names.contains("category_idx")); println!("✅ Verified indices creation"); + // Test scalar query on source dataset + let scalar_results = dataset + .scan() + .filter("category = 'category_0'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let source_scalar_query_rows = scalar_results.num_rows(); + assert!( + scalar_results.num_rows() > 0, + "Scalar query should return results" + ); + println!("✅ Scalar query works on source dataset"); + // Create tag for shallow cloning dataset .tags @@ -3248,11 +3261,6 @@ mod tests { "🔍 Cloned dataset base_paths: {:?}", cloned_dataset.manifest.base_paths ); - println!("🔍 Cloned dataset base path: {:?}", cloned_dataset.base); - println!( - "🔍 Cloned dataset indices_dir(): {:?}", - cloned_dataset.indices_dir() - ); println!("✅ Verified cloned dataset has indices"); @@ -3284,30 +3292,25 @@ mod tests { .await .unwrap(); - assert!( - scalar_results.num_rows() > 0, + assert_eq!( + source_scalar_query_rows, + scalar_results.num_rows(), "Scalar query should return results" ); println!("✅ Scalar query works on cloned dataset"); - // Append new data to cloned dataset - let 
new_float_arr = generate_random_array(50 * dimensions as usize); - let new_vectors = - Arc::new(FixedSizeListArray::try_new_from_values(new_float_arr, dimensions).unwrap()); - let new_ids = Arc::new(Int32Array::from_iter_values(300..350)); - let new_categories = Arc::new(StringArray::from_iter_values( - (300..350).map(|i| format!("new_category_{}", i % 3)), - )); - - let new_batch = - RecordBatch::try_new(schema.clone(), vec![new_ids, new_categories, new_vectors]) - .unwrap(); - - let new_reader = - RecordBatchIterator::new(vec![new_batch].into_iter().map(Ok), schema.clone()); + // Append new data to cloned dataset using lance_datagen + let new_data = gen_batch() + .col("id", array::step_custom::(300, 1)) + .col("category", array::fill_utf8("category_1".to_string())) + .col( + "vector", + array::rand_vec::(Dimension::from(dimensions)), + ) + .into_reader_rows(RowCount::from(50), BatchCount::from(1)); let mut updated_cloned_dataset = Dataset::write( - new_reader, + new_data, cloned_uri, Some(WriteParams { mode: WriteMode::Append, @@ -3318,220 +3321,21 @@ mod tests { .unwrap(); println!("✅ Appended new data to cloned dataset"); - // Verify row count increased - let total_rows = updated_cloned_dataset.count_rows(None).await.unwrap(); - assert_eq!(total_rows, 350, "Should have 350 rows after append"); - println!("✅ Verified row count after append"); - - // Call optimize_indices - updated_cloned_dataset - .optimize_indices(&OptimizeOptions::default()) - .await - .unwrap(); - println!("✅ Optimized indices"); - - // Test vector search after optimization (should find both old and new data) - let query_vector = generate_random_array(dimensions as usize); - - let optimized_search_results = updated_cloned_dataset - .scan() - .nearest("vector", &query_vector, 10) - .unwrap() - .limit(Some(10), None) - .unwrap() - .try_into_batch() - .await - .unwrap(); - - assert!( - optimized_search_results.num_rows() > 0, - "Vector search should work after optimization" - ); - println!("✅ 
Vector search works after optimization"); - - // Test scalar query after optimization (should find both old and new data) - let old_category_results = updated_cloned_dataset - .scan() - .filter("category = 'category_0'") - .unwrap() - .try_into_batch() - .await - .unwrap(); - - let new_category_results = updated_cloned_dataset + // Test scalar query on cloned dataset after appending + let scalar_results = cloned_dataset .scan() - .filter("category = 'new_category_0'") + .filter("category = 'category_1'") .unwrap() .try_into_batch() .await .unwrap(); - - assert!( - old_category_results.num_rows() > 0, - "Should find old category data" - ); - assert!( - new_category_results.num_rows() > 0, - "Should find new category data" - ); - println!("✅ Can query both old and new data after optimization"); - - // Verify index statistics - let vector_stats: serde_json::Value = serde_json::from_str( - &updated_cloned_dataset - .index_statistics("vector_idx") - .await - .unwrap(), - ) - .unwrap(); - let category_stats: serde_json::Value = serde_json::from_str( - &updated_cloned_dataset - .index_statistics("category_idx") - .await - .unwrap(), - ) - .unwrap(); - - assert_eq!(vector_stats["num_indexed_rows"].as_u64().unwrap(), 350); - assert_eq!(category_stats["num_indexed_rows"].as_u64().unwrap(), 350); - println!("✅ Index statistics show all rows are indexed"); - - println!("🎉 Comprehensive shallow clone with indices test passed!"); - } - - #[tokio::test] - async fn test_shallow_clone_with_index() { - let test_dir = tempdir().unwrap(); - let test_uri = test_dir.path().to_str().unwrap(); - let clone_dir = test_dir.path().join("clone"); - let cloned_uri = clone_dir.to_str().unwrap(); - - // Create a schema with both vector and scalar columns - let dimensions = 16; - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("category", DataType::Utf8, false), - Field::new( - "vector", - DataType::FixedSizeList( - Arc::new(Field::new("item", 
DataType::Float32, true)), - dimensions, - ), - false, - ), - ])); - - // Generate test data (300 rows to satisfy PQ training requirements) - let float_arr = generate_random_array(300 * dimensions as usize); - let vectors = - Arc::new(FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap()); - let ids = Arc::new(Int32Array::from_iter_values(0..300)); - let categories = Arc::new(StringArray::from_iter_values( - (0..300).map(|i| format!("category_{}", i % 5)), - )); - - let batch = RecordBatch::try_new(schema.clone(), vec![ids, categories, vectors]).unwrap(); - - let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); - - // Create initial dataset - let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); - println!("✅ Created initial dataset with 300 rows"); - - // Create vector index (IVF_PQ) - let vector_params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10); - dataset - .create_index( - &["vector"], - IndexType::Vector, - Some("vector_idx".to_string()), - &vector_params, - true, - ) - .await - .unwrap(); - println!("✅ Created vector index"); - - // Create scalar index (BTree) - dataset - .create_index( - &["category"], - IndexType::BTree, - Some("category_idx".to_string()), - &ScalarIndexParams::default(), - true, - ) - .await - .unwrap(); - println!("✅ Created scalar index"); - - // Verify indices were created - let indices = dataset.load_indices().await.unwrap(); - assert_eq!(indices.len(), 2, "Should have 2 indices"); - let index_names: HashSet = indices.iter().map(|idx| idx.name.clone()).collect(); - assert!(index_names.contains("vector_idx")); - assert!(index_names.contains("category_idx")); - println!("✅ Verified indices creation"); - - // Create tag for shallow cloning - dataset - .tags - .create("test_tag", dataset.version().version) - .await - .unwrap(); - - // Perform shallow clone - let cloned_dataset = dataset - .shallow_clone(cloned_uri, "test_tag", ObjectStoreParams::default()) 
- .await - .unwrap(); - println!("✅ Performed shallow clone"); - - // Verify cloned dataset has indices - let cloned_indices = cloned_dataset.load_indices().await.unwrap(); assert_eq!( - cloned_indices.len(), - 2, - "Cloned dataset should have 2 indices" - ); - let cloned_index_names: HashSet = - cloned_indices.iter().map(|idx| idx.name.clone()).collect(); - assert!(cloned_index_names.contains("vector_idx")); - assert!(cloned_index_names.contains("category_idx")); - - // Debug: Check base_id of cloned indices - for index in cloned_indices.iter() { - println!("🔍 Index '{}' has base_id: {:?}", index.name, index.base_id); - } - - // Debug: Check base_paths in cloned dataset - println!( - "🔍 Cloned dataset base_paths: {:?}", - cloned_dataset.manifest.base_paths + 0, + scalar_results.num_rows(), + "Scalar query should return results 0 before optimizing index" ); - println!("✅ Verified cloned dataset has indices"); - - // Test vector search on cloned dataset - let query_vector = generate_random_array(dimensions as usize); - - let search_results = cloned_dataset - .scan() - .nearest("vector", &query_vector, 5) - .unwrap() - .limit(Some(5), None) - .unwrap() - .try_into_batch() - .await - .unwrap(); - - assert!( - search_results.num_rows() > 0, - "Vector search should return results" - ); - println!("✅ Vector search works on cloned dataset"); - - // Test scalar query on cloned dataset + // Test scalar query on cloned dataset after appending let scalar_results = cloned_dataset .scan() .filter("category = 'category_0'") @@ -3539,41 +3343,14 @@ mod tests { .try_into_batch() .await .unwrap(); - - assert!( - scalar_results.num_rows() > 0, - "Scalar query should return results" + assert_eq!( + source_scalar_query_rows, + scalar_results.num_rows(), + "Scalar query should return {} results after cloning", + source_scalar_query_rows ); - println!("✅ Scalar query works on cloned dataset"); - - // Append new data to cloned dataset - let new_float_arr = generate_random_array(50 * 
dimensions as usize); - let new_vectors = - Arc::new(FixedSizeListArray::try_new_from_values(new_float_arr, dimensions).unwrap()); - let new_ids = Arc::new(Int32Array::from_iter_values(300..350)); - let new_categories = Arc::new(StringArray::from_iter_values( - (300..350).map(|i| format!("new_category_{}", i % 3)), - )); - - let new_batch = - RecordBatch::try_new(schema.clone(), vec![new_ids, new_categories, new_vectors]) - .unwrap(); - - let new_reader = - RecordBatchIterator::new(vec![new_batch].into_iter().map(Ok), schema.clone()); - - let mut updated_cloned_dataset = Dataset::write( - new_reader, - cloned_uri, - Some(WriteParams { - mode: WriteMode::Append, - ..Default::default() - }), - ) - .await - .unwrap(); - println!("✅ Appended new data to cloned dataset"); + println!("✅ Scalar query works on cloned dataset after append"); // Verify row count increased let total_rows = updated_cloned_dataset.count_rows(None).await.unwrap(); assert_eq!(total_rows, 350, "Should have 350 rows after append"); @@ -3616,15 +3393,17 @@ mod tests { let new_category_results = updated_cloned_dataset .scan() - .filter("category = 'new_category_0'") + .filter("category = 'category_1'") .unwrap() .try_into_batch() .await .unwrap(); - assert!( - old_category_results.num_rows() > 0, - "Should find old category data" + assert_eq!( + source_scalar_query_rows, + old_category_results.num_rows(), + "Should find old category data with {} rows", + source_scalar_query_rows ); assert!( new_category_results.num_rows() > 0, @@ -3652,6 +3431,6 @@ mod tests { assert_eq!(category_stats["num_indexed_rows"].as_u64().unwrap(), 350); println!("✅ Index statistics show all rows are indexed"); - println!("🎉 Comprehensive shallow clone index functionality test passed!"); + println!("🎉 Both vector and scalar indexes for shallow cloning testcases passed!"); } } diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index a910e6ff53e..996da799419 100644 --- a/rust/lance/src/index/scalar.rs 
+++ b/rust/lance/src/index/scalar.rs @@ -445,7 +445,14 @@ pub async fn open_scalar_index( metrics: &dyn MetricsCollector, ) -> Result> { let uuid_str = index.uuid.to_string(); - let index_store = Arc::new(LanceIndexStore::from_dataset(dataset, &uuid_str)); + let index_dir = dataset.indice_files_dir(index)?.child(uuid_str.as_str()); + let cache = dataset.metadata_cache.file_metadata_cache(&index_dir); + let index_store = Arc::new(LanceIndexStore::new( + dataset.object_store.clone(), + index_dir, + Arc::new(cache), + )); + let index_type = detect_scalar_index_type(dataset, index, column).await?; let frag_reuse_index = dataset.open_frag_reuse_index(metrics).await?; @@ -695,6 +702,7 @@ mod tests { index_details, index_version: 0, created_at: None, + base_id: None, } } From 46428d9e71addc94ed99b4b26d9425c97bf41934 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Mon, 25 Aug 2025 14:53:23 +0800 Subject: [PATCH 04/13] fix ci issues --- java/core/src/main/java/com/lancedb/lance/index/Index.java | 1 - python/python/lance/dataset.py | 2 +- python/src/transaction.rs | 5 +++++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/java/core/src/main/java/com/lancedb/lance/index/Index.java b/java/core/src/main/java/com/lancedb/lance/index/Index.java index 9859fbd1613..0daad1f1359 100644 --- a/java/core/src/main/java/com/lancedb/lance/index/Index.java +++ b/java/core/src/main/java/com/lancedb/lance/index/Index.java @@ -13,7 +13,6 @@ */ package com.lancedb.lance.index; -import org.apache.arrow.flatbuf.Int; import org.apache.commons.lang3.builder.ToStringBuilder; import java.time.Instant; diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 0f9f59d633f..c80e917a5a6 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3676,7 +3676,7 @@ class Rewrite(BaseOperation): """ groups: Iterable[LanceOperation.RewriteGroup] - rewritten_indices: Iterable[LanceOperation.RewrittenIndex] + rewritten_indices: 
Iterable[LanceOperation.RewrfittenIndex] def __post_init__(self): all_frags = [old for group in self.groups for old in group.old_fragments] diff --git a/python/src/transaction.rs b/python/src/transaction.rs index da6e248070d..96caa29ef06 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -38,6 +38,8 @@ impl FromPyObject<'_> for PyLance { .map(|id| id.extract::()) .collect::>()?, ); + let base_id: Option = ob.getattr("base_id")?.extract::>()? + .map(|id| id as u32); Ok(Self(Index { uuid: Uuid::parse_str(&uuid).map_err(|e| PyValueError::new_err(e.to_string()))?, @@ -48,6 +50,7 @@ impl FromPyObject<'_> for PyLance { index_details: None, index_version, created_at, + base_id, })) } } @@ -78,6 +81,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Index> { }, ); let created_at = self.0.created_at; + let base_id = self.0.base_id.map(|id| id as i64); let cls = namespace .getattr("Index") @@ -90,6 +94,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Index> { fragment_ids, index_version, created_at, + base_id, )) } } From 4978dbc429edc664f1578460427bdaaf89d364dc Mon Sep 17 00:00:00 2001 From: majin1102 Date: Mon, 25 Aug 2025 15:29:04 +0800 Subject: [PATCH 05/13] fix ci issues --- rust/lance/src/index.rs | 45 ----------------------------------------- 1 file changed, 45 deletions(-) diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 68eb6d2bf65..be6caf07801 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -3147,19 +3147,6 @@ mod tests { // Create a schema with both vector and scalar columns let dimensions = 16u32; - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("category", DataType::Utf8, false), - Field::new( - "vector", - DataType::FixedSizeList( - Arc::new(Field::new("item", DataType::Float32, true)), - dimensions as i32, - ), - false, - ), - ])); - // Generate test data using lance_datagen (300 rows to satisfy PQ training requirements) let data = gen_batch() .col("id", 
array::step::()) @@ -3172,8 +3159,6 @@ mod tests { // Create initial dataset let mut dataset = Dataset::write(data, test_uri, None).await.unwrap(); - println!("✅ Created initial dataset with 300 rows"); - // Create vector index (IVF_PQ) let vector_params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10); dataset @@ -3186,7 +3171,6 @@ mod tests { ) .await .unwrap(); - println!("✅ Created vector index"); // Create scalar index (BTree) dataset @@ -3199,7 +3183,6 @@ mod tests { ) .await .unwrap(); - println!("✅ Created scalar index"); // Verify indices were created let indices = dataset.load_indices().await.unwrap(); @@ -3207,7 +3190,6 @@ mod tests { let index_names: HashSet = indices.iter().map(|idx| idx.name.clone()).collect(); assert!(index_names.contains("vector_idx")); assert!(index_names.contains("category_idx")); - println!("✅ Verified indices creation"); // Test scalar query on source dataset let scalar_results = dataset @@ -3223,7 +3205,6 @@ mod tests { scalar_results.num_rows() > 0, "Scalar query should return results" ); - println!("✅ Scalar query works on source dataset"); // Create tag for shallow cloning dataset @@ -3237,7 +3218,6 @@ mod tests { .shallow_clone(cloned_uri, "test_tag", ObjectStoreParams::default()) .await .unwrap(); - println!("✅ Performed shallow clone"); // Verify cloned dataset has indices let cloned_indices = cloned_dataset.load_indices().await.unwrap(); @@ -3251,22 +3231,8 @@ mod tests { assert!(cloned_index_names.contains("vector_idx")); assert!(cloned_index_names.contains("category_idx")); - // Debug: Check base_id of cloned indices - for index in cloned_indices.iter() { - println!("🔍 Index '{}' has base_id: {:?}", index.name, index.base_id); - } - - // Debug: Check base_paths in cloned dataset - println!( - "🔍 Cloned dataset base_paths: {:?}", - cloned_dataset.manifest.base_paths - ); - - println!("✅ Verified cloned dataset has indices"); - // Test vector search on cloned dataset let query_vector = 
generate_random_array(dimensions as usize); - let search_results = cloned_dataset .scan() .nearest("vector", &query_vector, 5) @@ -3281,7 +3247,6 @@ mod tests { search_results.num_rows() > 0, "Vector search should return results" ); - println!("✅ Vector search works on cloned dataset"); // Test scalar query on cloned dataset let scalar_results = cloned_dataset @@ -3297,7 +3262,6 @@ mod tests { scalar_results.num_rows(), "Scalar query should return results" ); - println!("✅ Scalar query works on cloned dataset"); // Append new data to cloned dataset using lance_datagen let new_data = gen_batch() @@ -3319,7 +3283,6 @@ mod tests { ) .await .unwrap(); - println!("✅ Appended new data to cloned dataset"); // Test scalar query on cloned dataset after appending let scalar_results = cloned_dataset @@ -3350,18 +3313,15 @@ mod tests { source_scalar_query_rows ); - println!("✅ Scalar query works on cloned dataset after append"); // Verify row count increased let total_rows = updated_cloned_dataset.count_rows(None).await.unwrap(); assert_eq!(total_rows, 350, "Should have 350 rows after append"); - println!("✅ Verified row count after append"); // Call optimize_indices updated_cloned_dataset .optimize_indices(&OptimizeOptions::default()) .await .unwrap(); - println!("✅ Optimized indices"); // Test vector search after optimization (should find both old and new data) let query_vector = generate_random_array(dimensions as usize); @@ -3380,7 +3340,6 @@ mod tests { optimized_search_results.num_rows() > 0, "Vector search should work after optimization" ); - println!("✅ Vector search works after optimization"); // Test scalar query after optimization (should find both old and new data) let old_category_results = updated_cloned_dataset @@ -3409,7 +3368,6 @@ mod tests { new_category_results.num_rows() > 0, "Should find new category data" ); - println!("✅ Can query both old and new data after optimization"); // Verify index statistics let vector_stats: serde_json::Value = 
serde_json::from_str( @@ -3429,8 +3387,5 @@ mod tests { assert_eq!(vector_stats["num_indexed_rows"].as_u64().unwrap(), 350); assert_eq!(category_stats["num_indexed_rows"].as_u64().unwrap(), 350); - println!("✅ Index statistics show all rows are indexed"); - - println!("🎉 Both vector and scalar indexes for shallow cloning testcases passed!"); } } From 92c42071be1c2fad0eaa4926605e90d63e10fa98 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 26 Aug 2025 16:52:10 +0800 Subject: [PATCH 06/13] fix ci issues --- rust/lance/src/index/vector/ivf.rs | 84 ++++++++++++++++++++++++------ 1 file changed, 69 insertions(+), 15 deletions(-) diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 420784fee70..daae672ba79 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -2267,25 +2267,47 @@ mod tests { .await .unwrap(); - let index = dataset - .open_vector_index(WellKnownIvfPqData::COLUMN, &uuid_str, &NoOpMetricsCollector) - .await - .unwrap(); - let ivf_index = index.as_any().downcast_ref::().unwrap(); - + // After building the index file, we need to register the index metadata + // so that the dataset can find it when we try to open it + let field = dataset.schema().field(WellKnownIvfPqData::COLUMN).unwrap(); let index_meta = lance_table::format::Index { uuid, - dataset_version: 0, - fields: Vec::new(), + dataset_version: dataset.version().version, + fields: vec![field.id], name: INDEX_NAME.to_string(), - fragment_bitmap: None, + fragment_bitmap: Some(dataset.get_fragments().iter().map(|f| f.id() as u32).collect()), index_details: Some(vector_index_details()), - index_version: index.index_type().version(), - created_at: None, // Test index, not setting timestamp + index_version: 1, // Use version 1 for IVF PQ index + created_at: Some(chrono::Utc::now()), base_id: None, }; - let prefilter = Arc::new(DatasetPreFilter::new(dataset.clone(), &[index_meta], None)); + // We need to commit this index to the 
dataset so it can be found + use crate::dataset::transaction::{Transaction, Operation}; + let transaction = Transaction::new( + dataset.version().version, + Operation::CreateIndex { + new_indices: vec![index_meta.clone()], + removed_indices: vec![], + }, + None, + None, + ); + + // Apply the transaction to register the index + // Since dataset is Arc, we need to create a mutable reference + let mut dataset_mut = (*dataset).clone(); + dataset_mut.apply_commit(transaction, &Default::default(), &Default::default()) + .await + .unwrap(); + + let index = dataset_mut + .open_vector_index(WellKnownIvfPqData::COLUMN, &uuid_str, &NoOpMetricsCollector) + .await + .unwrap(); + + let ivf_index = index.as_any().downcast_ref::().unwrap(); + let prefilter = Arc::new(DatasetPreFilter::new(Arc::new(dataset_mut.clone()), &[index_meta.clone()], None)); let is_not_remapped = Some; let is_remapped = |row_id| Some(row_id + BIG_OFFSET); @@ -2313,10 +2335,10 @@ mod tests { let new_uuid_str = new_uuid.to_string(); remap_index_file( - &dataset, + &dataset_mut, &uuid_str, &new_uuid_str, - dataset.version().version, + dataset_mut.version().version, ivf_index, &mapping, INDEX_NAME.to_string(), @@ -2326,7 +2348,39 @@ mod tests { .await .unwrap(); - let remapped = dataset + + // After remapping the index file, we need to register the new index metadata + // so that the dataset can find it when we try to open it + let field = dataset_mut.schema().field(WellKnownIvfPqData::COLUMN).unwrap(); + let new_index_meta = lance_table::format::Index { + uuid: new_uuid, + dataset_version: dataset_mut.version().version, + fields: vec![field.id], + name: format!("{}_remapped", INDEX_NAME), + fragment_bitmap: Some(dataset_mut.get_fragments().iter().map(|f| f.id() as u32).collect()), + index_details: Some(vector_index_details()), + index_version: index.index_type().version(), + created_at: Some(chrono::Utc::now()), + base_id: None, + }; + + // We need to commit this new index to the dataset so it can be found + 
let transaction = Transaction::new( + dataset_mut.version().version, + Operation::CreateIndex { + new_indices: vec![new_index_meta], + removed_indices: vec![], + }, + None, + None, + ); + + // Apply the transaction to register the new index + dataset_mut.apply_commit(transaction, &Default::default(), &Default::default()) + .await + .unwrap(); + + let remapped = dataset_mut .open_vector_index( WellKnownIvfPqData::COLUMN, &new_uuid.to_string(), From 663ae51d1c30e7cf73d3af5d0e713ca5e1891da1 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 26 Aug 2025 16:58:12 +0800 Subject: [PATCH 07/13] cargo fmt --- rust/lance/src/index/vector/ivf.rs | 36 +++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index daae672ba79..b34ebd264b2 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -2275,7 +2275,13 @@ mod tests { dataset_version: dataset.version().version, fields: vec![field.id], name: INDEX_NAME.to_string(), - fragment_bitmap: Some(dataset.get_fragments().iter().map(|f| f.id() as u32).collect()), + fragment_bitmap: Some( + dataset + .get_fragments() + .iter() + .map(|f| f.id() as u32) + .collect(), + ), index_details: Some(vector_index_details()), index_version: 1, // Use version 1 for IVF PQ index created_at: Some(chrono::Utc::now()), @@ -2283,7 +2289,7 @@ mod tests { }; // We need to commit this index to the dataset so it can be found - use crate::dataset::transaction::{Transaction, Operation}; + use crate::dataset::transaction::{Operation, Transaction}; let transaction = Transaction::new( dataset.version().version, Operation::CreateIndex { @@ -2297,7 +2303,8 @@ mod tests { // Apply the transaction to register the index // Since dataset is Arc, we need to create a mutable reference let mut dataset_mut = (*dataset).clone(); - dataset_mut.apply_commit(transaction, &Default::default(), &Default::default()) + dataset_mut + 
.apply_commit(transaction, &Default::default(), &Default::default()) .await .unwrap(); @@ -2307,7 +2314,11 @@ mod tests { .unwrap(); let ivf_index = index.as_any().downcast_ref::().unwrap(); - let prefilter = Arc::new(DatasetPreFilter::new(Arc::new(dataset_mut.clone()), &[index_meta.clone()], None)); + let prefilter = Arc::new(DatasetPreFilter::new( + Arc::new(dataset_mut.clone()), + &[index_meta.clone()], + None, + )); let is_not_remapped = Some; let is_remapped = |row_id| Some(row_id + BIG_OFFSET); @@ -2348,16 +2359,24 @@ mod tests { .await .unwrap(); - // After remapping the index file, we need to register the new index metadata // so that the dataset can find it when we try to open it - let field = dataset_mut.schema().field(WellKnownIvfPqData::COLUMN).unwrap(); + let field = dataset_mut + .schema() + .field(WellKnownIvfPqData::COLUMN) + .unwrap(); let new_index_meta = lance_table::format::Index { uuid: new_uuid, dataset_version: dataset_mut.version().version, fields: vec![field.id], name: format!("{}_remapped", INDEX_NAME), - fragment_bitmap: Some(dataset_mut.get_fragments().iter().map(|f| f.id() as u32).collect()), + fragment_bitmap: Some( + dataset_mut + .get_fragments() + .iter() + .map(|f| f.id() as u32) + .collect(), + ), index_details: Some(vector_index_details()), index_version: index.index_type().version(), created_at: Some(chrono::Utc::now()), @@ -2376,7 +2395,8 @@ mod tests { ); // Apply the transaction to register the new index - dataset_mut.apply_commit(transaction, &Default::default(), &Default::default()) + dataset_mut + .apply_commit(transaction, &Default::default(), &Default::default()) .await .unwrap(); From 36543091259b0457748fd1eda3b3b300b26c8b8a Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 26 Aug 2025 17:14:34 +0800 Subject: [PATCH 08/13] cargo fmt in python --- python/src/dataset.rs | 4 ++-- python/src/transaction.rs | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs 
index d47da7cd270..f25a097391b 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -622,8 +622,8 @@ impl Dataset { dict.set_item("fields", field_names).unwrap(); dict.set_item("version", idx.dataset_version).unwrap(); dict.set_item("fragment_ids", fragment_set).unwrap(); - // Convert base_id from Option to Optional[int] for Python - dict.set_item("base_id", idx.base_id.map(|id| id as i64)).unwrap(); + dict.set_item("base_id", idx.base_id.map(|id| id as i64)) + .unwrap(); dict.into_py_any(py) }) .collect::>>() diff --git a/python/src/transaction.rs b/python/src/transaction.rs index 96caa29ef06..2baf07569b0 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -38,7 +38,9 @@ impl FromPyObject<'_> for PyLance { .map(|id| id.extract::()) .collect::>()?, ); - let base_id: Option = ob.getattr("base_id")?.extract::>()? + let base_id: Option = ob + .getattr("base_id")? + .extract::>()? .map(|id| id as u32); Ok(Self(Index { From 5aa69be61e9ea81cb2919bbbee990c01607e49e7 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 26 Aug 2025 18:15:23 +0800 Subject: [PATCH 09/13] fix clippy --- rust/lance/src/index/vector/ivf.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index b34ebd264b2..5ac84ba6680 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -2316,7 +2316,7 @@ mod tests { let ivf_index = index.as_any().downcast_ref::().unwrap(); let prefilter = Arc::new(DatasetPreFilter::new( Arc::new(dataset_mut.clone()), - &[index_meta.clone()], + std::slice::from_ref(&index_meta), None, )); From 2e61bf4d80b197f24b1da68a11c2ff38add09fc0 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 26 Aug 2025 19:41:46 +0800 Subject: [PATCH 10/13] optimize code --- rust/lance-table/src/format/index.rs | 47 ++++++++++++++-------------- rust/lance/src/index/vector/ivf.rs | 2 +- 2 files changed, 25 insertions(+), 24 deletions(-) diff 
--git a/rust/lance-table/src/format/index.rs b/rust/lance-table/src/format/index.rs index e0d28dfd7ac..d2dcb077c08 100644 --- a/rust/lance-table/src/format/index.rs +++ b/rust/lance-table/src/format/index.rs @@ -11,29 +11,6 @@ use uuid::Uuid; use super::pb; use lance_core::{Error, Result}; -impl Index { - pub fn effective_fragment_bitmap( - &self, - existing_fragments: &RoaringBitmap, - ) -> Option { - let fragment_bitmap = self.fragment_bitmap.as_ref()?; - Some(fragment_bitmap & existing_fragments) - } -} - -impl DeepSizeOf for Index { - fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { - self.uuid.as_bytes().deep_size_of_children(context) - + self.fields.deep_size_of_children(context) - + self.name.deep_size_of_children(context) - + self.dataset_version.deep_size_of_children(context) - + self - .fragment_bitmap - .as_ref() - .map(|fragment_bitmap| fragment_bitmap.serialized_size()) - .unwrap_or(0) - } -} /// Index metadata #[derive(Debug, Clone, PartialEq)] @@ -77,6 +54,30 @@ pub struct Index { pub base_id: Option, } +impl Index { + pub fn effective_fragment_bitmap( + &self, + existing_fragments: &RoaringBitmap, + ) -> Option { + let fragment_bitmap = self.fragment_bitmap.as_ref()?; + Some(fragment_bitmap & existing_fragments) + } +} + +impl DeepSizeOf for Index { + fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { + self.uuid.as_bytes().deep_size_of_children(context) + + self.fields.deep_size_of_children(context) + + self.name.deep_size_of_children(context) + + self.dataset_version.deep_size_of_children(context) + + self + .fragment_bitmap + .as_ref() + .map(|fragment_bitmap| fragment_bitmap.serialized_size()) + .unwrap_or(0) + } +} + impl TryFrom for Index { type Error = Error; diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 5ac84ba6680..21716f4c872 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -2288,7 +2288,7 @@ mod tests 
{ base_id: None, }; - // We need to commit this index to the dataset so it can be found + // We need to commit this index to the dataset so that it can be found use crate::dataset::transaction::{Operation, Transaction}; let transaction = Transaction::new( dataset.version().version, From 71cc7946a3d75e4684907b64405b78a60fcddd30 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 26 Aug 2025 19:46:01 +0800 Subject: [PATCH 11/13] cargo fmt --all --- rust/lance-table/src/format/index.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/lance-table/src/format/index.rs b/rust/lance-table/src/format/index.rs index d2dcb077c08..053e1974f43 100644 --- a/rust/lance-table/src/format/index.rs +++ b/rust/lance-table/src/format/index.rs @@ -71,10 +71,10 @@ impl DeepSizeOf for Index { + self.name.deep_size_of_children(context) + self.dataset_version.deep_size_of_children(context) + self - .fragment_bitmap - .as_ref() - .map(|fragment_bitmap| fragment_bitmap.serialized_size()) - .unwrap_or(0) + .fragment_bitmap + .as_ref() + .map(|fragment_bitmap| fragment_bitmap.serialized_size()) + .unwrap_or(0) } } From e2d9686dc926396d53d6d58530e6db7c2617fe61 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 27 Aug 2025 09:53:45 -0700 Subject: [PATCH 12/13] update test to verify new index location --- rust/lance/src/index.rs | 79 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index be6caf07801..2668ff4ff7c 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -3317,12 +3317,91 @@ mod tests { let total_rows = updated_cloned_dataset.count_rows(None).await.unwrap(); assert_eq!(total_rows, 350, "Should have 350 rows after append"); + // Store indices before optimization for comparison + let indices_before_optimize = updated_cloned_dataset.load_indices().await.unwrap(); + let vector_idx_before = indices_before_optimize + .iter() + .find(|idx| idx.name == 
"vector_idx") + .unwrap(); + let category_idx_before = indices_before_optimize + .iter() + .find(|idx| idx.name == "category_idx") + .unwrap(); + // Call optimize_indices updated_cloned_dataset .optimize_indices(&OptimizeOptions::default()) .await .unwrap(); + // Critical test: Verify new indices are created in the cloned dataset location + let optimized_indices = updated_cloned_dataset.load_indices().await.unwrap(); + + // Find the new index metadata after optimization + let new_vector_idx = optimized_indices + .iter() + .find(|idx| idx.name == "vector_idx") + .unwrap(); + let new_category_idx = optimized_indices + .iter() + .find(|idx| idx.name == "category_idx") + .unwrap(); + + // The UUIDs should be different after optimization (new indices were created) + assert_ne!( + new_vector_idx.uuid, vector_idx_before.uuid, + "Vector index should have a new UUID after optimization" + ); + assert_ne!( + new_category_idx.uuid, category_idx_before.uuid, + "Category index should have a new UUID after optimization" + ); + + // Verify the new index files are in the cloned dataset's directory + use std::path::PathBuf; + let clone_indices_dir = PathBuf::from(cloned_uri).join("_indices"); + let vector_index_dir = clone_indices_dir.join(new_vector_idx.uuid.to_string()); + let category_index_dir = clone_indices_dir.join(new_category_idx.uuid.to_string()); + + assert!( + vector_index_dir.exists(), + "New vector index directory should exist in cloned dataset location: {:?}", + vector_index_dir + ); + assert!( + category_index_dir.exists(), + "New category index directory should exist in cloned dataset location: {:?}", + category_index_dir + ); + + // Verify that the new indices do NOT have base_id set (they're local to the cloned dataset) + assert!( + new_vector_idx.base_id.is_none(), + "New vector index should not have base_id after optimization in cloned dataset" + ); + assert!( + new_category_idx.base_id.is_none(), + "New category index should not have base_id after optimization 
in cloned dataset" + ); + + // Also verify the original dataset's index directories are NOT modified + let original_indices_dir = PathBuf::from(test_uri).join("_indices"); + + // The new index UUIDs should NOT exist in the original dataset's directory + let wrong_vector_dir = original_indices_dir.join(new_vector_idx.uuid.to_string()); + let wrong_category_dir = original_indices_dir.join(new_category_idx.uuid.to_string()); + + assert!( + !wrong_vector_dir.exists(), + "New vector index should NOT be in original dataset location: {:?}", + wrong_vector_dir + ); + assert!( + !wrong_category_dir.exists(), + "New category index should NOT be in original dataset location: {:?}", + wrong_category_dir + ); + // Test vector search after optimization (should find both old and new data) let query_vector = generate_random_array(dimensions as usize); From b2fa7fb6d938000711d48c78ce674fe1a68fd9e9 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Thu, 28 Aug 2025 15:34:43 +0800 Subject: [PATCH 13/13] add docs for Manifest.shallow_clone --- rust/lance-table/src/format/manifest.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index 37b5b73328a..b4728068dca 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -185,6 +185,9 @@ impl Manifest { } } + /// Performs a shallow_clone of the manifest entirely in memory without: + /// - Any persistent storage operations + /// - Modifications to the original data pub fn shallow_clone( &self, ref_name: Option,