diff --git a/rust/lance-table/src/io/deletion.rs b/rust/lance-table/src/io/deletion.rs index c7c58d50af0..5dd0028bfd7 100644 --- a/rust/lance-table/src/io/deletion.rs +++ b/rust/lance-table/src/io/deletion.rs @@ -46,6 +46,17 @@ pub fn deletion_file_path(base: &Path, fragment_id: u64, deletion_file: &Deletio .child(format!("{fragment_id}-{read_version}-{id}.{suffix}")) } +pub fn relative_deletion_file_path(fragment_id: u64, deletion_file: &DeletionFile) -> String { + let DeletionFile { + read_version, + id, + file_type, + .. + } = deletion_file; + let suffix = file_type.suffix(); + format!("{DELETIONS_DIR}/{fragment_id}-{read_version}-{id}.{suffix}") +} + /// Write a deletion file for a fragment for a given deletion vector. /// /// Returns the deletion file if one was written. If no deletions were present, diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 19372959a4e..1030faafe14 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -40,13 +40,13 @@ use lance_io::object_store::{ use lance_io::utils::{read_last_block, read_message, read_metadata_offset, read_struct}; use lance_namespace::LanceNamespace; use lance_table::format::{ - pb, DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, + pb, DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, RowIdMeta, }; use lance_table::io::commit::{ migrate_scheme_to_v2, write_manifest_file_to_path, CommitConfig, CommitError, CommitHandler, CommitLock, ManifestLocation, ManifestNamingScheme, VERSIONS_DIR, }; -use lance_table::io::manifest::read_manifest; +use lance_table::io::manifest::{read_manifest, read_manifest_indexes}; use object_store::path::Path; use prost::Message; use roaring::RoaringBitmap; @@ -112,7 +112,7 @@ use lance_core::box_error; pub use lance_core::ROW_ID; use lance_namespace::models::{CreateEmptyTableRequest, DescribeTableRequest}; use lance_table::feature_flags::{apply_feature_flags, can_read_dataset}; -use 
lance_table::io::deletion::DELETIONS_DIR; +use lance_table::io::deletion::{relative_deletion_file_path, DELETIONS_DIR}; pub use schema_evolution::{ BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore, }; @@ -2150,6 +2150,102 @@ impl Dataset { builder.execute(transaction).await } + /// Deep clone the target version into a new dataset at target_path. + /// This performs a server-side copy of all relevant dataset files (data files, + /// deletion files, and index files) into the target dataset + /// without loading data into memory. Datasets with external row-id files are rejected with an error. + /// + /// Parameters: + /// - `target_path`: the URI string to clone the dataset into. + /// - `version`: the version to clone from; can be a version number, a branch head, or a tag. + /// - `store_params`: the object store params to use for the new dataset. + pub async fn deep_clone( + &mut self, + target_path: &str, + version: impl Into, + store_params: Option, + ) -> Result { + use futures::StreamExt; + + // Resolve source dataset and its manifest using checkout_version + let src_ds = self.checkout_version(version).await?; + let src_paths = src_ds.collect_paths().await?; + + // Prepare target object store and base path + let (target_store, target_base) = ObjectStore::from_uri_and_params( + self.session.store_registry(), + target_path, + &store_params.clone().unwrap_or_default(), + ) + .await?; + + // Prevent cloning into an existing target dataset + if self + .commit_handler + .resolve_latest_location(&target_base, &target_store) + .await + .is_ok() + { + return Err(Error::DatasetAlreadyExists { + uri: target_path.to_string(), + location: location!(), + }); + } + + let build_absolute_path = |relative_path: &str, base: &Path| -> Path { + let mut path = base.clone(); + for seg in relative_path.split('/') { + if !seg.is_empty() { + path = path.child(seg); + } + } + path + }; + + // TODO: Leverage object store bulk copy for efficient deep_clone + // + // All cloud storage providers support batch copy
APIs that would provide significant + // performance improvements. Until upstream support lands, files are copied one at a time. + // + // Tracked by: https://github.com/lance-format/lance/issues/5435 + let io_parallelism = self.object_store.io_parallelism(); + let copy_futures = src_paths + .iter() + .map(|(relative_path, base)| { + let store = Arc::clone(&target_store); + let src_path = build_absolute_path(relative_path, base); + let target_path = build_absolute_path(relative_path, &target_base); + async move { store.copy(&src_path, &target_path).await.map(|_| ()) } + }) + .collect::>(); + + futures::stream::iter(copy_futures) + .buffer_unordered(io_parallelism) + .collect::>() + .await + .into_iter() + .collect::>>()?; + + // Record a Clone operation and commit via CommitBuilder + let ref_name = src_ds.manifest.branch.clone(); + let ref_version = src_ds.manifest_location.version; + let clone_op = Operation::Clone { + is_shallow: false, + ref_name, + ref_version, + ref_path: src_ds.uri().to_string(), + branch_name: None, + }; + let txn = Transaction::new(ref_version, clone_op, None); + let builder = CommitBuilder::new(WriteDestination::Uri(target_path)) + .with_store_params(store_params.clone().unwrap_or_default()) + .with_object_store(target_store.clone()) + .with_commit_handler(self.commit_handler.clone()) + .with_storage_format(self.manifest.data_storage_format.lance_file_version()?); + let new_ds = builder.execute(txn).await?; + Ok(new_ds) + } + async fn resolve_reference(&self, reference: refs::Ref) -> Result<(Option, u64)> { match reference { refs::Ref::Version(branch, version_number) => { @@ -2175,6 +2271,94 @@ impl Dataset { } } + /// Collect a (relative_path, base_root) pair for each data, deletion, and index file.
+ async fn collect_paths(&self) -> Result> { + let mut file_paths: Vec<(String, Path)> = Vec::new(); + for fragment in self.manifest.fragments.iter() { + if let Some(RowIdMeta::External(external_file)) = &fragment.row_id_meta { + return Err(Error::Internal { + message: format!( + "External row_id_meta is not supported yet. external file path: {}", + external_file.path + ), + location: location!(), + }); + } + for data_file in fragment.files.iter() { + let base_root = if let Some(base_id) = data_file.base_id { + let base_path = + self.manifest + .base_paths + .get(&base_id) + .ok_or_else(|| Error::Internal { + message: format!("base_id {} not found", base_id), + location: location!(), + })?; + Path::parse(base_path.path.as_str())? + } else { + self.base.clone() + }; + file_paths.push(( + format!("{}/{}", DATA_DIR, data_file.path.clone()), + base_root, + )); + } + if let Some(deletion_file) = &fragment.deletion_file { + let base_root = if let Some(base_id) = deletion_file.base_id { + let base_path = + self.manifest + .base_paths + .get(&base_id) + .ok_or_else(|| Error::Internal { + message: format!("base_id {} not found", base_id), + location: location!(), + })?; + Path::parse(base_path.path.as_str())? + } else { + self.base.clone() + }; + file_paths.push(( + relative_deletion_file_path(fragment.id, deletion_file), + base_root, + )); + } + } + + let indices = read_manifest_indexes( + self.object_store.as_ref(), + &self.manifest_location, + &self.manifest, + ) + .await?; + + for index in &indices { + let base_root = if let Some(base_id) = index.base_id { + let base_path = + self.manifest + .base_paths + .get(&base_id) + .ok_or_else(|| Error::Internal { + message: format!("base_id {} not found", base_id), + location: location!(), + })?; + Path::parse(base_path.path.as_str())? 
+ } else { + self.base.clone() + }; + let index_root = base_root.child(INDICES_DIR).child(index.uuid.to_string()); + let mut stream = self.object_store.read_dir_all(&index_root, None); + while let Some(meta) = stream.next().await.transpose()? { + if let Some(filename) = meta.location.filename() { + file_paths.push(( + format!("{}/{}/{}", INDICES_DIR, index.uuid, filename), + base_root.clone(), + )); + } + } + } + Ok(file_paths) + } + /// Run a SQL query against the dataset. /// The underlying SQL engine is DataFusion. /// Please refer to the DataFusion documentation for supported SQL syntax. diff --git a/rust/lance/src/dataset/tests/dataset_io.rs b/rust/lance/src/dataset/tests/dataset_io.rs index 0c163de3f96..ca95d4e4772 100644 --- a/rust/lance/src/dataset/tests/dataset_io.rs +++ b/rust/lance/src/dataset/tests/dataset_io.rs @@ -37,7 +37,11 @@ use lance_io::assert_io_eq; use lance_table::feature_flags; use futures::TryStreamExt; +use lance_index::scalar::ScalarIndexParams; +use lance_index::{DatasetIndexExt, IndexType}; +use lance_io::object_store::ObjectStore; use lance_table::io::manifest::read_manifest; +use object_store::path::Path; use rstest::rstest; #[rstest] @@ -481,6 +485,167 @@ async fn append_dataset( ) } +#[rstest] +#[tokio::test] +async fn test_deep_clone( + #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)] + data_storage_version: LanceFileVersion, +) { + // Setup source and target dirs + let test_dir = TempStdDir::default(); + let base_dir = test_dir.join("base_ds"); + let test_uri = base_dir.to_str().unwrap(); + let clone_dir = test_dir.join("clone_ds"); + let cloned_uri = clone_dir.to_str().unwrap(); + + // Generate test data + let data_reader = gen_batch() + .col("id", array::step::()) + .col("val", array::fill_utf8("deep".to_string())) + .into_reader_rows(RowCount::from(64), BatchCount::from(1)); + + // Create source dataset + let mut dataset = Dataset::write( + data_reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 64, + 
max_rows_per_group: 16, + data_storage_version: Some(data_storage_version), + ..Default::default() + }), + ) + .await + .unwrap(); + + let mut branch = dataset + .create_branch("branch", dataset.version().version, None) + .await + .unwrap(); + + // Create a scalar index to validate index copy + branch + .create_index( + &["id"], + IndexType::Scalar, + Some("id_idx".to_string()), + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + + // Create a deletion file by deleting some rows + branch.delete("id < 10").await.unwrap(); + + let original_version = branch.version().version; + branch + .tags() + .create("tag", ("branch", original_version)) + .await + .unwrap(); + + // Perform deep clone + let cloned_dataset = branch.deep_clone(cloned_uri, "tag", None).await.unwrap(); + + // Validate target dataset rows + let batches = cloned_dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 54); // 64 rows - 10 deletions + assert_eq!(cloned_dataset.version().version, original_version); + assert!(cloned_dataset.manifest().base_paths.is_empty()); + + // Validate internal file counts are equal between source and cloned datasets + let store = branch.object_store(); + let src_root = dataset.base.clone(); + let branch_root = branch.base.clone(); + let dst_root = cloned_dataset.base.clone(); + + let src_data = count_files(store, &src_root, "data").await; + let dst_data = count_files(store, &dst_root, "data").await; + assert_eq!(src_data, dst_data); + + let src_idx = count_files(store, &branch_root, "_indices").await; + let dst_idx = count_files(store, &dst_root, "_indices").await; + assert_eq!(src_idx, dst_idx); + + let src_del = count_files(store, &branch_root, "_deletions").await; + let dst_del = count_files(store, &dst_root, "_deletions").await; + assert_eq!(src_del, dst_del); + + // Validate index exists in cloned dataset + 
let cloned_indices = cloned_dataset.load_indices().await.unwrap(); + assert!(!cloned_indices.is_empty()); + assert_eq!(cloned_indices.first().unwrap().name, "id_idx"); + + // Verify base_id cleared in cloned manifest and indices + for frag in cloned_dataset.manifest().fragments.iter() { + for df in &frag.files { + assert!(df.base_id.is_none()); + } + if let Some(del) = &frag.deletion_file { + assert!(del.base_id.is_none()); + } + } + for idx in cloned_indices.iter() { + assert!(idx.base_id.is_none()); + } + + // Attempt cloning again to the same target should error + let res = dataset.deep_clone(cloned_uri, "tag", None).await; + assert!(matches!(res, Err(Error::DatasetAlreadyExists { .. }))); + + // Invalid tag should error + let res_invalid = dataset + .deep_clone(&format!("{}/clone_invalid", test_uri), "no_such_tag", None) + .await; + assert!(matches!(res_invalid, Err(Error::RefNotFound { .. }))); + + // deep_clone version before the deletion + let clone_dir = test_dir.join("clone_ds_old_ver"); + let cloned_ds = clone_dir.to_str().unwrap(); + let cloned_dataset = branch + .deep_clone(cloned_ds, ("branch", original_version - 1), None) + .await + .unwrap(); + let store = branch.object_store(); + let dst_root = cloned_dataset.base.clone(); + + // Validate target dataset rows + let batches = cloned_dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 64); + assert_eq!(cloned_dataset.version().version, original_version - 1); + assert!(cloned_dataset.manifest().base_paths.is_empty()); + assert_eq!(count_files(store, &dst_root, "_deletions").await, 0); +} + +// Helper: count files under a dataset directory (data/_indices/_deletions) +async fn count_files(store: &ObjectStore, root: &Path, prefix: &str) -> usize { + use futures::StreamExt; + let dir = root.child(prefix); + let mut stream = store.read_dir_all(&dir, None); + let mut 
count: usize = 0; + while stream.next().await.transpose().unwrap().is_some() { + count += 1; + } + count +} + #[rstest] #[tokio::test] async fn test_shallow_clone_with_hybrid_paths( diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index e3933a9d45c..c19f0967780 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -119,6 +119,7 @@ async fn do_commit_new_dataset( }; let (mut manifest, indices) = if let Operation::Clone { + is_shallow, ref_name, ref_version, ref_path, @@ -139,37 +140,74 @@ async fn do_commit_new_dataset( ) .await?; - let new_base_id = source_manifest - .base_paths - .keys() - .max() - .map(|id| *id + 1) - .unwrap_or(0); - let new_manifest = source_manifest.shallow_clone( - ref_name.clone(), - ref_path.clone(), - new_base_id, - branch_name.clone(), - transaction_file, - ); + if *is_shallow { + let new_base_id = source_manifest + .base_paths + .keys() + .max() + .map(|id| *id + 1) + .unwrap_or(0); + let new_manifest = source_manifest.shallow_clone( + ref_name.clone(), + ref_path.clone(), + new_base_id, + branch_name.clone(), + transaction_file, + ); - let updated_indices = if let Some(index_section_pos) = source_manifest.index_section { - let reader = object_store.open(&source_manifest_location.path).await?; - let section: pb::IndexSection = - lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?; - section - .indices - .into_iter() - .map(|index_pb| { - let mut index = IndexMetadata::try_from(index_pb)?; - index.base_id = Some(new_base_id); - Ok(index) - }) - .collect::>>()? 
+ let updated_indices = if let Some(index_section_pos) = source_manifest.index_section { + let reader = object_store.open(&source_manifest_location.path).await?; + let section: pb::IndexSection = + lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?; + section + .indices + .into_iter() + .map(|index_pb| { + let mut index = IndexMetadata::try_from(index_pb)?; + index.base_id = Some(new_base_id); + Ok(index) + }) + .collect::>>()? + } else { + vec![] + }; + (new_manifest, updated_indices) } else { - vec![] - }; - (new_manifest, updated_indices) + // Deep clone: build a manifest that references local files (no external bases) + let mut new_manifest = source_manifest.clone(); + new_manifest.base_paths.clear(); + new_manifest.branch = None; + new_manifest.tag = None; + new_manifest.index_section = None; // will be rewritten below + let mut new_frags = new_manifest.fragments.as_ref().clone(); + for f in &mut new_frags { + for df in &mut f.files { + df.base_id = None; + } + if let Some(d) = f.deletion_file.as_mut() { + d.base_id = None; + } + } + new_manifest.fragments = Arc::new(new_frags); + + // Indices: keep metadata but normalize base to local + let mut updated_indices = Vec::new(); + if let Some(index_section_pos) = source_manifest.index_section { + let reader = object_store.open(&source_manifest_location.path).await?; + let section: pb::IndexSection = + lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?; + updated_indices = section + .indices + .into_iter() + .map(|index_pb| { + let mut index = IndexMetadata::try_from(index_pb)?; + index.base_id = None; + Ok(index) + }) + .collect::>>()?; + } + (new_manifest, updated_indices) + } } else { let (manifest, indices) = transaction.build_manifest(None, vec![], &transaction_file, write_config)?;