diff --git a/java/core/lance-jni/src/transaction.rs b/java/core/lance-jni/src/transaction.rs index 29dcdf7d651..02aca88cf49 100644 --- a/java/core/lance-jni/src/transaction.rs +++ b/java/core/lance-jni/src/transaction.rs @@ -119,10 +119,17 @@ impl IntoJava for Index { JObject::null() }; + // Convert base_id from Option to Integer for Java + let base_id = if let Some(id) = self.base_id { + env.new_object("java/lang/Integer", "(I)V", &[JValue::Int(id as i32)])? + } else { + JObject::null() + }; + // Create Index object Ok(env.new_object( "com/lancedb/lance/index/Index", - "(Ljava/util/UUID;Ljava/util/List;Ljava/lang/String;J[B[BILjava/time/Instant;)V", + "(Ljava/util/UUID;Ljava/util/List;Ljava/lang/String;J[B[BILjava/time/Instant;Ljava/lang/Integer;)V", &[ JValue::Object(&uuid), JValue::Object(&fields), @@ -132,6 +139,7 @@ impl IntoJava for Index { JValue::Object(&index_details), JValue::Int(self.index_version), JValue::Object(&created_at), + JValue::Object(&base_id), ], )?) } @@ -232,6 +240,13 @@ impl FromJObjectWithEnv for JObject<'_> { .i()? as u32; Some(DateTime::from_timestamp(seconds, nanos).unwrap()) }; + let base_id_obj = env.get_field(self, "baseId", "Ljava/lang/Integer;")?.l()?; + let base_id = if base_id_obj.is_null() { + None + } else { + let id_value = env.call_method(&base_id_obj, "intValue", "()I", &[])?.i()? 
as u32; + Some(id_value) + }; Ok(Index { uuid, @@ -242,6 +257,7 @@ impl FromJObjectWithEnv for JObject<'_> { index_details, index_version, created_at, + base_id, }) } } diff --git a/java/core/src/main/java/com/lancedb/lance/index/Index.java b/java/core/src/main/java/com/lancedb/lance/index/Index.java index 0cce9c14eb6..0daad1f1359 100644 --- a/java/core/src/main/java/com/lancedb/lance/index/Index.java +++ b/java/core/src/main/java/com/lancedb/lance/index/Index.java @@ -35,6 +35,7 @@ public class Index { private final byte[] indexDetails; private final int indexVersion; private final Instant createdAt; + private final Integer baseId; private Index( UUID uuid, @@ -44,7 +45,8 @@ private Index( byte[] fragmentBitmap, byte[] indexDetails, int indexVersion, - Instant createdAt) { + Instant createdAt, + Integer baseId) { this.uuid = uuid; this.fields = fields; this.name = name; @@ -53,6 +55,7 @@ private Index( this.indexDetails = indexDetails; this.indexVersion = indexVersion; this.createdAt = createdAt; + this.baseId = baseId; } public UUID uuid() { @@ -94,6 +97,10 @@ public Optional indexDetails() { return Optional.ofNullable(indexDetails); } + public Optional baseId() { + return Optional.ofNullable(baseId); + } + /** * Get the index version. 
* @@ -124,12 +131,13 @@ public boolean equals(Object o) { && Objects.equals(name, index.name) && Arrays.equals(fragmentBitmap, index.fragmentBitmap) && Arrays.equals(indexDetails, index.indexDetails) - && Objects.equals(createdAt, index.createdAt); + && Objects.equals(createdAt, index.createdAt) + && Objects.equals(baseId, index.baseId); } @Override public int hashCode() { - int result = Objects.hash(uuid, fields, name, datasetVersion, indexVersion, createdAt); + int result = Objects.hash(uuid, fields, name, datasetVersion, indexVersion, createdAt, baseId); result = 31 * result + Arrays.hashCode(fragmentBitmap); result = 31 * result + Arrays.hashCode(indexDetails); return result; @@ -144,6 +152,7 @@ public String toString() { .append("datasetVersion", datasetVersion) .append("indexVersion", indexVersion) .append("createdAt", createdAt) + .append("baseId", baseId) .toString(); } @@ -166,6 +175,7 @@ public static class Builder { private byte[] indexDetails; private int indexVersion; private Instant createdAt; + private Integer baseId; private Builder() {} @@ -209,6 +219,11 @@ public Builder createdAt(Instant createdAt) { return this; } + public Builder baseId(Integer baseId) { + this.baseId = baseId; + return this; + } + public Index build() { return new Index( uuid, @@ -218,7 +233,8 @@ public Index build() { fragmentBitmap, indexDetails, indexVersion, - createdAt); + createdAt, + baseId); } } } diff --git a/protos/table.proto b/protos/table.proto index 70a6459cbd8..5e9c3998613 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -224,6 +224,10 @@ message IndexMetadata { // This field is optional for backward compatibility. For existing indices created before // this field was added, this will be None/null. optional uint64 created_at = 8; + + // The base path index of the data file. Used when the file is imported or referred from another dataset. + // Lance use it as key of the base_paths field in Manifest to determine the actual base path of the data file. 
+ optional uint32 base_id = 9; } // Index Section, containing a list of index metadata for one dataset version. diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index b18604bc62e..163eb724c35 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3340,6 +3340,7 @@ class Index: fragment_ids: Set[int] index_version: int created_at: Optional[datetime] = None + base_id: Optional[int] = None class AutoCleanupConfig(TypedDict): @@ -3676,7 +3677,7 @@ class Rewrite(BaseOperation): """ groups: Iterable[LanceOperation.RewriteGroup] - rewritten_indices: Iterable[LanceOperation.RewrittenIndex] + rewritten_indices: Iterable[LanceOperation.RewrittenIndex] def __post_init__(self): all_frags = [old for group in self.groups for old in group.old_fragments] diff --git a/python/src/dataset.rs b/python/src/dataset.rs index fec421aca10..f25a097391b 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -622,6 +622,8 @@ impl Dataset { dict.set_item("fields", field_names).unwrap(); dict.set_item("version", idx.dataset_version).unwrap(); dict.set_item("fragment_ids", fragment_set).unwrap(); + dict.set_item("base_id", idx.base_id.map(|id| id as i64)) + .unwrap(); dict.into_py_any(py) }) .collect::>>() diff --git a/python/src/transaction.rs b/python/src/transaction.rs index da6e248070d..2baf07569b0 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -38,6 +38,10 @@ impl FromPyObject<'_> for PyLance { .map(|id| id.extract::()) .collect::>()?, ); + let base_id: Option = ob + .getattr("base_id")? + .extract::>()? 
+ .map(|id| id as u32); Ok(Self(Index { uuid: Uuid::parse_str(&uuid).map_err(|e| PyValueError::new_err(e.to_string()))?, @@ -48,6 +52,7 @@ impl FromPyObject<'_> for PyLance { index_details: None, index_version, created_at, + base_id, })) } } @@ -78,6 +83,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Index> { }, ); let created_at = self.0.created_at; + let base_id = self.0.base_id.map(|id| id as i64); let cls = namespace .getattr("Index") @@ -90,6 +96,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Index> { fragment_ids, index_version, created_at, + base_id, )) } } diff --git a/rust/lance-table/src/format/index.rs b/rust/lance-table/src/format/index.rs index 021f1d5ba29..053e1974f43 100644 --- a/rust/lance-table/src/format/index.rs +++ b/rust/lance-table/src/format/index.rs @@ -11,6 +11,7 @@ use uuid::Uuid; use super::pb; use lance_core::{Error, Result}; + /// Index metadata #[derive(Debug, Clone, PartialEq)] pub struct Index { @@ -47,6 +48,10 @@ pub struct Index { /// This field is optional for backward compatibility. For existing indices created before /// this field was added, this will be None. pub created_at: Option>, + + /// The base path index of the index files. Used when the index is imported or referred from another dataset. + /// Lance uses it as key of the base_paths field in Manifest to determine the actual base path of the index files. 
+ pub base_id: Option, } impl Index { @@ -102,6 +107,7 @@ impl TryFrom for Index { DateTime::from_timestamp_millis(ts as i64) .expect("Invalid timestamp in index metadata") }), + base_id: proto.base_id, }) } } @@ -127,6 +133,7 @@ impl From<&Index> for pb::IndexMetadata { index_details: idx.index_details.clone(), index_version: Some(idx.index_version), created_at: idx.created_at.map(|dt| dt.timestamp_millis() as u64), + base_id: idx.base_id, } } } diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index a20d4fae1b9..b4728068dca 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -185,13 +185,16 @@ impl Manifest { } } + /// Performs a shallow_clone of the manifest entirely in memory without: + /// - Any persistent storage operations + /// - Modifications to the original data pub fn shallow_clone( &self, ref_name: Option, ref_path: String, + ref_base_id: u32, transaction_file: String, ) -> Self { - let new_base_id = self.base_paths.keys().max().map(|id| *id + 1).unwrap_or(0); let cloned_fragments = self .fragments .as_ref() @@ -202,13 +205,13 @@ impl Manifest { .files .into_iter() .map(|mut file| { - file.base_id = Some(new_base_id); + file.base_id = Some(ref_base_id); file }) .collect(); if let Some(mut deletion) = cloned_fragment.deletion_file.take() { - deletion.base_id = Some(new_base_id); + deletion.base_id = Some(ref_base_id); cloned_fragment.deletion_file = Some(deletion); } @@ -223,12 +226,11 @@ impl Manifest { writer_version: self.writer_version.clone(), fragments: Arc::new(cloned_fragments), version_aux_data: self.version_aux_data, - // TODO: apply shallow clone to indexes - index_section: None, + index_section: None, // These will be set on commit timestamp_nanos: self.timestamp_nanos, - reader_feature_flags: self.reader_feature_flags, tag: None, - writer_feature_flags: self.writer_feature_flags, + reader_feature_flags: 0, // These will be set on commit + 
writer_feature_flags: 0, // These will be set on commit max_fragment_id: self.max_fragment_id, transaction_file: Some(transaction_file), fragment_offsets: self.fragment_offsets.clone(), @@ -239,12 +241,12 @@ impl Manifest { base_paths: { let mut base_paths = self.base_paths.clone(); let base_path = BasePath { - id: new_base_id, + id: ref_base_id, name: ref_name, is_dataset_root: true, path: ref_path, }; - base_paths.insert(new_base_id, base_path); + base_paths.insert(ref_base_id, base_path); base_paths }, } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index b36bc16dde9..8eb396988fd 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1275,6 +1275,32 @@ impl Dataset { } } + /// Get the indices directory for a specific index, considering its base_id + pub(crate) fn indice_files_dir(&self, index: &Index) -> Result { + match index.base_id.as_ref() { + Some(base_id) => { + let base_paths = &self.manifest.base_paths; + let base_path = base_paths.get(base_id).ok_or_else(|| { + Error::invalid_input( + format!( + "base_path id {} not found for index {}", + base_id, index.uuid + ), + location!(), + ) + })?; + let path = Path::parse(base_path.path.as_str())?; + if base_path.is_dataset_root { + Ok(path.child(INDICES_DIR)) + } else { + // For non-dataset-root base paths, we assume the path already points to the indices directory + Ok(path) + } + } + None => Ok(self.base.child(INDICES_DIR)), + } + } + pub fn session(&self) -> Arc { self.session.clone() } diff --git a/rust/lance/src/dataset/optimize/remapping.rs b/rust/lance/src/dataset/optimize/remapping.rs index d38adc8dc81..06796a27ab7 100644 --- a/rust/lance/src/dataset/optimize/remapping.rs +++ b/rust/lance/src/dataset/optimize/remapping.rs @@ -272,6 +272,7 @@ async fn remap_index(dataset: &mut Dataset, index_id: &Uuid) -> Result<()> { index_details: curr_index_meta.index_details.clone(), index_version: curr_index_meta.index_version, created_at: curr_index_meta.created_at, + 
base_id: None, }; let transaction = Transaction::new( diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 06a8f0b5f60..2668ff4ff7c 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -529,6 +529,7 @@ impl DatasetIndexExt for Dataset { index_details: None, index_version: 0, created_at: Some(chrono::Utc::now()), + base_id: None, // New indices don't have base_id (they're not from shallow clone) }; let transaction = Transaction::new( @@ -643,6 +644,7 @@ impl DatasetIndexExt for Dataset { index_details: last_idx.index_details.clone(), index_version: res.new_index_version, created_at: Some(chrono::Utc::now()), + base_id: None, // New merged index file is located in the cloned dataset. }; removed_indices.extend(res.removed_indices.iter().map(|&idx| idx.clone())); if deltas.len() > removed_indices.len() { @@ -978,8 +980,12 @@ impl DatasetIndexInternalExt for Dataset { // scalar indices, we may start having this file with scalar indices too. Once that happens // we can just read this file and look at the `implementation` or `index_type` fields to // determine what kind of index it is. - let index_dir = self.indices_dir().child(uuid); - let index_file = index_dir.child(INDEX_FILE_NAME); + let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index { + message: format!("Index with id {} does not exist", uuid), + location: location!(), + })?; + let index_dir = self.indice_files_dir(&index_meta)?; + let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME); if self.object_store.exists(&index_file).await? 
{ let index = self.open_vector_index(column, uuid, metrics).await?; Ok(index.as_index()) @@ -1032,8 +1038,12 @@ impl DatasetIndexInternalExt for Dataset { } let frag_reuse_index = self.open_frag_reuse_index(metrics).await?; - let index_dir = self.indices_dir().child(uuid); - let index_file = index_dir.child(INDEX_FILE_NAME); + let index_meta = self.load_index(uuid).await?.ok_or_else(|| Error::Index { + message: format!("Index with id {} does not exist", uuid), + location: location!(), + })?; + let index_dir = self.indice_files_dir(&index_meta)?; + let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME); let reader: Arc = self.object_store.open(&index_file).await?.into(); let tailing_bytes = read_last_block(reader.as_ref()).await?; @@ -1124,7 +1134,7 @@ impl DatasetIndexInternalExt for Dataset { DataType::Float16 | DataType::Float32 | DataType::Float64 => { let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, self.metadata_cache.as_ref(), @@ -1136,7 +1146,7 @@ impl DatasetIndexInternalExt for Dataset { DataType::UInt8 => { let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, self.metadata_cache.as_ref(), @@ -1157,7 +1167,7 @@ impl DatasetIndexInternalExt for Dataset { "IVF_PQ" => { let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, self.metadata_cache.as_ref(), @@ -1170,7 +1180,7 @@ impl DatasetIndexInternalExt for Dataset { "IVF_SQ" => { let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, self.metadata_cache.as_ref(), @@ -1181,12 +1191,12 @@ impl DatasetIndexInternalExt for Dataset { } "IVF_HNSW_FLAT" => { - let uri = self.indices_dir().child(uuid).child("index.pb"); + let uri = index_dir.child(uuid).child("index.pb"); let file_metadata_cache = 
self.session.metadata_cache.file_metadata_cache(&uri); let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, &file_metadata_cache, @@ -1199,7 +1209,7 @@ impl DatasetIndexInternalExt for Dataset { "IVF_HNSW_SQ" => { let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, self.metadata_cache.as_ref(), @@ -1212,7 +1222,7 @@ impl DatasetIndexInternalExt for Dataset { "IVF_HNSW_PQ" => { let ivf = IVFIndex::::try_new( self.object_store.clone(), - self.indices_dir(), + index_dir, uuid.to_owned(), frag_reuse_index, self.metadata_cache.as_ref(), @@ -1467,7 +1477,7 @@ fn is_vector_field(data_type: DataType) -> bool { mod tests { use crate::dataset::builder::DatasetBuilder; use crate::dataset::optimize::{compact_files, CompactionOptions}; - use crate::dataset::{ReadParams, WriteParams}; + use crate::dataset::{ReadParams, WriteMode, WriteParams}; use crate::index::vector::VectorIndexParams; use crate::session::Session; use crate::utils::test::{ @@ -3127,4 +3137,334 @@ mod tests { "Index should have zero unindexed rows after optimization" ); } + + #[tokio::test] + async fn test_shallow_clone_with_index() { + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + let clone_dir = test_dir.path().join("clone"); + let cloned_uri = clone_dir.to_str().unwrap(); + + // Create a schema with both vector and scalar columns + let dimensions = 16u32; + // Generate test data using lance_datagen (300 rows to satisfy PQ training requirements) + let data = gen_batch() + .col("id", array::step::()) + .col("category", array::fill_utf8("category_0".to_string())) + .col( + "vector", + array::rand_vec::(Dimension::from(dimensions)), + ) + .into_reader_rows(RowCount::from(300), BatchCount::from(1)); + + // Create initial dataset + let mut dataset = Dataset::write(data, test_uri, None).await.unwrap(); + // Create vector 
index (IVF_PQ) + let vector_params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10); + dataset + .create_index( + &["vector"], + IndexType::Vector, + Some("vector_idx".to_string()), + &vector_params, + true, + ) + .await + .unwrap(); + + // Create scalar index (BTree) + dataset + .create_index( + &["category"], + IndexType::BTree, + Some("category_idx".to_string()), + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + + // Verify indices were created + let indices = dataset.load_indices().await.unwrap(); + assert_eq!(indices.len(), 2, "Should have 2 indices"); + let index_names: HashSet = indices.iter().map(|idx| idx.name.clone()).collect(); + assert!(index_names.contains("vector_idx")); + assert!(index_names.contains("category_idx")); + + // Test scalar query on source dataset + let scalar_results = dataset + .scan() + .filter("category = 'category_0'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let source_scalar_query_rows = scalar_results.num_rows(); + assert!( + scalar_results.num_rows() > 0, + "Scalar query should return results" + ); + + // Create tag for shallow cloning + dataset + .tags + .create("test_tag", dataset.version().version) + .await + .unwrap(); + + // Perform shallow clone + let cloned_dataset = dataset + .shallow_clone(cloned_uri, "test_tag", ObjectStoreParams::default()) + .await + .unwrap(); + + // Verify cloned dataset has indices + let cloned_indices = cloned_dataset.load_indices().await.unwrap(); + assert_eq!( + cloned_indices.len(), + 2, + "Cloned dataset should have 2 indices" + ); + let cloned_index_names: HashSet = + cloned_indices.iter().map(|idx| idx.name.clone()).collect(); + assert!(cloned_index_names.contains("vector_idx")); + assert!(cloned_index_names.contains("category_idx")); + + // Test vector search on cloned dataset + let query_vector = generate_random_array(dimensions as usize); + let search_results = cloned_dataset + .scan() + .nearest("vector", &query_vector, 5) + .unwrap() + 
.limit(Some(5), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + assert!( + search_results.num_rows() > 0, + "Vector search should return results" + ); + + // Test scalar query on cloned dataset + let scalar_results = cloned_dataset + .scan() + .filter("category = 'category_0'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + assert_eq!( + source_scalar_query_rows, + scalar_results.num_rows(), + "Scalar query should return results" + ); + + // Append new data to cloned dataset using lance_datagen + let new_data = gen_batch() + .col("id", array::step_custom::(300, 1)) + .col("category", array::fill_utf8("category_1".to_string())) + .col( + "vector", + array::rand_vec::(Dimension::from(dimensions)), + ) + .into_reader_rows(RowCount::from(50), BatchCount::from(1)); + + let mut updated_cloned_dataset = Dataset::write( + new_data, + cloned_uri, + Some(WriteParams { + mode: WriteMode::Append, + ..Default::default() + }), + ) + .await + .unwrap(); + + // Test scalar query on cloned dataset after appending + let scalar_results = cloned_dataset + .scan() + .filter("category = 'category_1'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!( + 0, + scalar_results.num_rows(), + "Scalar query should return results 0 before optimizing index" + ); + + // Test scalar query on cloned dataset after appending + let scalar_results = cloned_dataset + .scan() + .filter("category = 'category_0'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!( + source_scalar_query_rows, + scalar_results.num_rows(), + "Scalar query should return {} results after cloning", + source_scalar_query_rows + ); + + // Verify row count increased + let total_rows = updated_cloned_dataset.count_rows(None).await.unwrap(); + assert_eq!(total_rows, 350, "Should have 350 rows after append"); + + // Store indices before optimization for comparison + let indices_before_optimize = updated_cloned_dataset.load_indices().await.unwrap(); + let vector_idx_before = 
indices_before_optimize + .iter() + .find(|idx| idx.name == "vector_idx") + .unwrap(); + let category_idx_before = indices_before_optimize + .iter() + .find(|idx| idx.name == "category_idx") + .unwrap(); + + // Call optimize_indices + updated_cloned_dataset + .optimize_indices(&OptimizeOptions::default()) + .await + .unwrap(); + + // Critical test: Verify new indices are created in the cloned dataset location + let optimized_indices = updated_cloned_dataset.load_indices().await.unwrap(); + + // Find the new index metadata after optimization + let new_vector_idx = optimized_indices + .iter() + .find(|idx| idx.name == "vector_idx") + .unwrap(); + let new_category_idx = optimized_indices + .iter() + .find(|idx| idx.name == "category_idx") + .unwrap(); + + // The UUIDs should be different after optimization (new indices were created) + assert_ne!( + new_vector_idx.uuid, vector_idx_before.uuid, + "Vector index should have a new UUID after optimization" + ); + assert_ne!( + new_category_idx.uuid, category_idx_before.uuid, + "Category index should have a new UUID after optimization" + ); + + // Verify the new index files are in the cloned dataset's directory + use std::path::PathBuf; + let clone_indices_dir = PathBuf::from(cloned_uri).join("_indices"); + let vector_index_dir = clone_indices_dir.join(new_vector_idx.uuid.to_string()); + let category_index_dir = clone_indices_dir.join(new_category_idx.uuid.to_string()); + + assert!( + vector_index_dir.exists(), + "New vector index directory should exist in cloned dataset location: {:?}", + vector_index_dir + ); + assert!( + category_index_dir.exists(), + "New category index directory should exist in cloned dataset location: {:?}", + category_index_dir + ); + + // Verify that the new indices do NOT have base_id set (they're local to the cloned dataset) + assert!( + new_vector_idx.base_id.is_none(), + "New vector index should not have base_id after optimization in cloned dataset" + ); + assert!( + 
new_category_idx.base_id.is_none(), + "New category index should not have base_id after optimization in cloned dataset" + ); + + // Also verify the original dataset's index directories are NOT modified + let original_indices_dir = PathBuf::from(test_uri).join("_indices"); + + // The new index UUIDs should NOT exist in the original dataset's directory + let wrong_vector_dir = original_indices_dir.join(new_vector_idx.uuid.to_string()); + let wrong_category_dir = original_indices_dir.join(new_category_idx.uuid.to_string()); + + assert!( + !wrong_vector_dir.exists(), + "New vector index should NOT be in original dataset location: {:?}", + wrong_vector_dir + ); + assert!( + !wrong_category_dir.exists(), + "New category index should NOT be in original dataset location: {:?}", + wrong_category_dir + ); + + // Test vector search after optimization (should find both old and new data) + let query_vector = generate_random_array(dimensions as usize); + + let optimized_search_results = updated_cloned_dataset + .scan() + .nearest("vector", &query_vector, 10) + .unwrap() + .limit(Some(10), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + assert!( + optimized_search_results.num_rows() > 0, + "Vector search should work after optimization" + ); + + // Test scalar query after optimization (should find both old and new data) + let old_category_results = updated_cloned_dataset + .scan() + .filter("category = 'category_0'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let new_category_results = updated_cloned_dataset + .scan() + .filter("category = 'category_1'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + assert_eq!( + source_scalar_query_rows, + old_category_results.num_rows(), + "Should find old category data with {} rows", + source_scalar_query_rows + ); + assert!( + new_category_results.num_rows() > 0, + "Should find new category data" + ); + + // Verify index statistics + let vector_stats: serde_json::Value = serde_json::from_str( + 
&updated_cloned_dataset + .index_statistics("vector_idx") + .await + .unwrap(), + ) + .unwrap(); + let category_stats: serde_json::Value = serde_json::from_str( + &updated_cloned_dataset + .index_statistics("category_idx") + .await + .unwrap(), + ) + .unwrap(); + + assert_eq!(vector_stats["num_indexed_rows"].as_u64().unwrap(), 350); + assert_eq!(category_stats["num_indexed_rows"].as_u64().unwrap(), 350); + } } diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 70c13d9d603..15d10c1fe73 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -276,6 +276,7 @@ impl<'a> CreateIndexBuilder<'a> { index_details: Some(index_details), index_version: self.index_type.version(), created_at: Some(chrono::Utc::now()), + base_id: None, }; let transaction = Transaction::new( self.dataset.manifest.version, diff --git a/rust/lance/src/index/frag_reuse.rs b/rust/lance/src/index/frag_reuse.rs index 6b827ce8613..346664792e6 100644 --- a/rust/lance/src/index/frag_reuse.rs +++ b/rust/lance/src/index/frag_reuse.rs @@ -176,5 +176,6 @@ pub(crate) async fn build_frag_reuse_index_metadata( index_details: Some(prost_types::Any::from_msg(&proto)?), index_version: index_meta.map_or(0, |index_meta| index_meta.index_version), created_at: Some(chrono::Utc::now()), + base_id: None, }) } diff --git a/rust/lance/src/index/mem_wal.rs b/rust/lance/src/index/mem_wal.rs index b9e9f0451ea..a25fa3a340e 100644 --- a/rust/lance/src/index/mem_wal.rs +++ b/rust/lance/src/index/mem_wal.rs @@ -542,6 +542,7 @@ pub(crate) fn new_mem_wal_index_meta( ))?), index_version: 0, created_at: Some(chrono::Utc::now()), + base_id: None, }) } diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 101e177d504..996da799419 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -445,7 +445,14 @@ pub async fn open_scalar_index( metrics: &dyn MetricsCollector, ) -> Result> { let uuid_str = index.uuid.to_string(); - 
let index_store = Arc::new(LanceIndexStore::from_dataset(dataset, &uuid_str)); + let index_dir = dataset.indice_files_dir(index)?.child(uuid_str.as_str()); + let cache = dataset.metadata_cache.file_metadata_cache(&index_dir); + let index_store = Arc::new(LanceIndexStore::new( + dataset.object_store.clone(), + index_dir, + Arc::new(cache), + )); + let index_type = detect_scalar_index_type(dataset, index, column).await?; let frag_reuse_index = dataset.open_frag_reuse_index(metrics).await?; @@ -486,10 +493,12 @@ pub async fn open_scalar_index( async fn infer_scalar_index_type( dataset: &Dataset, - index_uuid: &str, + index: &Index, column: &str, ) -> Result { - let index_dir = dataset.indices_dir().child(index_uuid.to_string()); + let index_dir = dataset + .indice_files_dir(index)? + .child(index.uuid.to_string()); let col = dataset.schema().field(column).ok_or(Error::Internal { message: format!( "Index refers to column {} which does not exist in dataset schema", @@ -550,7 +559,7 @@ pub async fn detect_scalar_index_type( if let Some(index_type) = dataset.index_cache.get_with_key(&type_key).await { return Ok(*index_type.as_ref()); } - let index_type = infer_scalar_index_type(dataset, &index.uuid.to_string(), column).await?; + let index_type = infer_scalar_index_type(dataset, index, column).await?; dataset .index_cache .insert_with_key(&type_key, Arc::new(index_type)) @@ -693,6 +702,7 @@ mod tests { index_details, index_version: 0, created_at: None, + base_id: None, } } diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index feba2e5cfda..8b4663632fb 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -36,7 +36,9 @@ use lance_index::vector::{ sq::{builder::SQBuildParams, ScalarQuantizer}, VectorIndex, }; -use lance_index::{IndexType, INDEX_AUXILIARY_FILE_NAME, INDEX_METADATA_SCHEMA_KEY}; +use lance_index::{ + DatasetIndexExt, IndexType, INDEX_AUXILIARY_FILE_NAME, INDEX_METADATA_SCHEMA_KEY, +}; use 
lance_io::traits::Reader; use lance_linalg::distance::*; use lance_table::format::Index as IndexMetadata; @@ -652,13 +654,19 @@ pub(crate) async fn open_vector_index_v2( let distance_type = DistanceType::try_from(index_metadata.distance_type.as_str())?; let frag_reuse_uuid = dataset.frag_reuse_index_uuid(); + // Load the index metadata to get the correct index directory + let index_meta = dataset + .load_index(uuid) + .await? + .ok_or_else(|| Error::Index { + message: format!("Index with id {} does not exist", uuid), + location: location!(), + })?; + let index_dir = dataset.indice_files_dir(&index_meta)?; let index: Arc = match index_metadata.index_type.as_str() { "IVF_HNSW_PQ" => { - let aux_path = dataset - .indices_dir() - .child(uuid) - .child(INDEX_AUXILIARY_FILE_NAME); + let aux_path = index_dir.child(uuid).child(INDEX_AUXILIARY_FILE_NAME); let aux_reader = dataset.object_store().open(&aux_path).await?; let ivf_data = IvfModel::load(&reader).await?; @@ -685,10 +693,7 @@ pub(crate) async fn open_vector_index_v2( } "IVF_HNSW_SQ" => { - let aux_path = dataset - .indices_dir() - .child(uuid) - .child(INDEX_AUXILIARY_FILE_NAME); + let aux_path = index_dir.child(uuid).child(INDEX_AUXILIARY_FILE_NAME); let aux_reader = dataset.object_store().open(&aux_path).await?; let ivf_data = IvfModel::load(&reader).await?; diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 849deeafc2e..21716f4c872 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -2267,24 +2267,58 @@ mod tests { .await .unwrap(); - let index = dataset - .open_vector_index(WellKnownIvfPqData::COLUMN, &uuid_str, &NoOpMetricsCollector) - .await - .unwrap(); - let ivf_index = index.as_any().downcast_ref::().unwrap(); - + // After building the index file, we need to register the index metadata + // so that the dataset can find it when we try to open it + let field = dataset.schema().field(WellKnownIvfPqData::COLUMN).unwrap(); let 
index_meta = lance_table::format::Index { uuid, - dataset_version: 0, - fields: Vec::new(), + dataset_version: dataset.version().version, + fields: vec![field.id], name: INDEX_NAME.to_string(), - fragment_bitmap: None, + fragment_bitmap: Some( + dataset + .get_fragments() + .iter() + .map(|f| f.id() as u32) + .collect(), + ), index_details: Some(vector_index_details()), - index_version: index.index_type().version(), - created_at: None, // Test index, not setting timestamp + index_version: 1, // Use version 1 for IVF PQ index + created_at: Some(chrono::Utc::now()), + base_id: None, }; - let prefilter = Arc::new(DatasetPreFilter::new(dataset.clone(), &[index_meta], None)); + // We need to commit this index to the dataset so that it can be found + use crate::dataset::transaction::{Operation, Transaction}; + let transaction = Transaction::new( + dataset.version().version, + Operation::CreateIndex { + new_indices: vec![index_meta.clone()], + removed_indices: vec![], + }, + None, + None, + ); + + // Apply the transaction to register the index + // Since dataset is Arc, we need to create a mutable reference + let mut dataset_mut = (*dataset).clone(); + dataset_mut + .apply_commit(transaction, &Default::default(), &Default::default()) + .await + .unwrap(); + + let index = dataset_mut + .open_vector_index(WellKnownIvfPqData::COLUMN, &uuid_str, &NoOpMetricsCollector) + .await + .unwrap(); + + let ivf_index = index.as_any().downcast_ref::().unwrap(); + let prefilter = Arc::new(DatasetPreFilter::new( + Arc::new(dataset_mut.clone()), + std::slice::from_ref(&index_meta), + None, + )); let is_not_remapped = Some; let is_remapped = |row_id| Some(row_id + BIG_OFFSET); @@ -2312,10 +2346,10 @@ mod tests { let new_uuid_str = new_uuid.to_string(); remap_index_file( - &dataset, + &dataset_mut, &uuid_str, &new_uuid_str, - dataset.version().version, + dataset_mut.version().version, ivf_index, &mapping, INDEX_NAME.to_string(), @@ -2325,7 +2359,48 @@ mod tests { .await .unwrap(); - let 
remapped = dataset + // After remapping the index file, we need to register the new index metadata + // so that the dataset can find it when we try to open it + let field = dataset_mut + .schema() + .field(WellKnownIvfPqData::COLUMN) + .unwrap(); + let new_index_meta = lance_table::format::Index { + uuid: new_uuid, + dataset_version: dataset_mut.version().version, + fields: vec![field.id], + name: format!("{}_remapped", INDEX_NAME), + fragment_bitmap: Some( + dataset_mut + .get_fragments() + .iter() + .map(|f| f.id() as u32) + .collect(), + ), + index_details: Some(vector_index_details()), + index_version: index.index_type().version(), + created_at: Some(chrono::Utc::now()), + base_id: None, + }; + + // We need to commit this new index to the dataset so it can be found + let transaction = Transaction::new( + dataset_mut.version().version, + Operation::CreateIndex { + new_indices: vec![new_index_meta], + removed_indices: vec![], + }, + None, + None, + ); + + // Apply the transaction to register the new index + dataset_mut + .apply_commit(transaction, &Default::default(), &Default::default()) + .await + .unwrap(); + + let remapped = dataset_mut .open_vector_index( WellKnownIvfPqData::COLUMN, &new_uuid.to_string(), diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index dfabd409174..2f79c2c19b1 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -133,9 +133,37 @@ async fn do_commit_new_dataset( &Session::default(), ) .await?; - let new_manifest = - source_manifest.shallow_clone(ref_name.clone(), ref_path.clone(), transaction_file); - (new_manifest, Vec::new()) + + let new_base_id = source_manifest + .base_paths + .keys() + .max() + .map(|id| *id + 1) + .unwrap_or(0); + let new_manifest = source_manifest.shallow_clone( + ref_name.clone(), + ref_path.clone(), + new_base_id, + transaction_file, + ); + + let updated_indices = if let Some(index_section_pos) = source_manifest.index_section { + let reader = 
object_store.open(&source_manifest_location.path).await?; + let section: pb::IndexSection = + lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?; + section + .indices + .into_iter() + .map(|index_pb| { + let mut index = lance_table::format::Index::try_from(index_pb)?; + index.base_id = Some(new_base_id); + Ok(index) + }) + .collect::>>()? + } else { + vec![] + }; + (new_manifest, updated_indices) } else { let (manifest, indices) = transaction.build_manifest( None, diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 4b031da9575..5d50011ab06 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -1959,6 +1959,7 @@ mod tests { index_details: None, index_version: 0, created_at: None, // Test index, not setting timestamp + base_id: None, }; let fragment0 = Fragment::new(0); let fragment1 = Fragment::new(1);