From c0bc1c56f03fd7ffcb87e4fc64a5000017c44d2f Mon Sep 17 00:00:00 2001 From: majin1102 Date: Mon, 17 Nov 2025 18:39:11 +0800 Subject: [PATCH 1/8] init --- rust/lance-table/src/io/deletion.rs | 11 + rust/lance/src/dataset.rs | 497 +++++++++++++++++++++++++++- rust/lance/src/io/commit.rs | 96 ++++-- 3 files changed, 573 insertions(+), 31 deletions(-) diff --git a/rust/lance-table/src/io/deletion.rs b/rust/lance-table/src/io/deletion.rs index ca714da4acd..39422b69e7e 100644 --- a/rust/lance-table/src/io/deletion.rs +++ b/rust/lance-table/src/io/deletion.rs @@ -46,6 +46,17 @@ pub fn deletion_file_path(base: &Path, fragment_id: u64, deletion_file: &Deletio .child(format!("{fragment_id}-{read_version}-{id}.{suffix}")) } +pub fn relative_deletion_file_path(fragment_id: u64, deletion_file: &DeletionFile) -> String { + let DeletionFile { + read_version, + id, + file_type, + .. + } = deletion_file; + let suffix = file_type.suffix(); + format!("{DELETION_DIRS}/{fragment_id}-{read_version}-{id}.{suffix}") +} + /// Write a deletion file for a fragment for a given deletion vector. /// /// Returns the deletion file if one was written. If no deletions were present, diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 89e9af18ce7..c9a35898ce9 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -37,13 +37,13 @@ use lance_io::object_store::{ use lance_io::utils::{read_last_block, read_message, read_metadata_offset, read_struct}; use lance_namespace::LanceNamespace; use lance_table::format::{ - pb, DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, + pb, DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, RowIdMeta, }; use lance_table::io::commit::{ migrate_scheme_to_v2, write_manifest_file_to_path, CommitConfig, CommitError, CommitHandler, CommitLock, ManifestLocation, ManifestNamingScheme, }; -use lance_table::io::manifest::read_manifest; +use lance_table::io::manifest::{read_manifest, read_manifest_indexes}; use object_store::path::Path; use prost::Message; use roaring::RoaringBitmap; @@ -109,6 +109,7 @@ use lance_core::box_error; pub use lance_core::ROW_ID; use lance_namespace::models::{CreateEmptyTableRequest, DescribeTableRequest}; use lance_table::feature_flags::{apply_feature_flags, can_read_dataset}; +use lance_table::io::deletion::relative_deletion_file_path; pub use schema_evolution::{ BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore, }; @@ -204,6 +205,124 @@ impl From<&Manifest> for Version { } } +impl Dataset { + async fn collect_paths(&self) -> Result> { + let mut file_paths = Vec::new(); + for fragment in self.manifest.fragments.iter() { + if let Some(RowIdMeta::External(external_file)) = &fragment.row_id_meta { + return Err(Error::Internal { + message: format!( + "External row_id_meta is not supported yet. external file path: {}", + external_file.path + ), + location: location!(), + }); + } + for data_file in fragment.files.iter() { + let base_root = if let Some(base_id) = data_file.base_id { + let base_path = + self.manifest + .base_paths + .get(&base_id) + .ok_or_else(|| Error::Internal { + message: format!("base_id {} not found", base_id), + location: location!(), + })?; + // Use the base path as provided; do not append kind-specific directory here + Path::from(base_path.path.as_str()) + } else { + self.base.clone() + }; + file_paths.push(FilePath { + relative_path: format!("{}/{}", DATA_DIR, data_file.path.clone()), + base_path: base_root, + }); + } + // Deletion file under _deletions/ + if let Some(deletion_file) = &fragment.deletion_file { + let base_root = if let Some(base_id) = deletion_file.base_id { + let base_path = + self.manifest + .base_paths + .get(&base_id) + .ok_or_else(|| Error::Internal { + message: format!("base_id {} not found", base_id), + location: location!(), + })?; + Path::from(base_path.path.as_str()) + } else { + self.base.clone() + }; + file_paths.push(FilePath { + relative_path: relative_deletion_file_path(fragment.id, deletion_file), + base_path: base_root, + }); + } + } + + // Load indices for the source dataset + let indices = read_manifest_indexes( + self.object_store.as_ref(), + &self.manifest_location, + &self.manifest, + ) + .await?; + + for index in &indices { + // Base root: dataset root or external base path + let base_root = if let Some(base_id) = index.base_id { + let base_path = + self.manifest + .base_paths + .get(&base_id) + .ok_or_else(|| Error::Internal { + message: format!("base_id {} not found", base_id), + location: location!(), + })?; + Path::from(base_path.path.as_str()) + } else { + self.base.clone() + }; + // Index directory is /_indices/{uuid} + let index_root = base_root.child(INDICES_DIR).child(index.uuid.to_string()); + let mut stream = self.object_store.read_dir_all(&index_root, None); + while let Some(meta) = stream.next().await.transpose()? { + if let Some(filename) = meta.location.filename() { + file_paths.push(FilePath { + relative_path: format!("{}/{}/{}", INDICES_DIR, index.uuid, filename), + base_path: base_root.clone(), + }); + } + } + } + Ok(file_paths) + } +} + +/// A file path wrapper that can be used to represent a file in a dataset. +/// This wrapper is used for changing the base_path like deep_clone +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FilePath { + pub relative_path: String, + pub base_path: Path, +} + +impl FilePath { + fn absolute_path_with_base(&self, base: &Path) -> Path { + let mut path = base.clone(); + for seg in self.relative_path.split('/') { + if !seg.is_empty() { + path = path.child(seg); + } + } + path + } + + fn absolute_path(&self) -> Path { + self.absolute_path_with_base(&self.base_path) + } +} + /// Customize read behavior of a dataset. #[derive(Clone, Debug)] pub struct ReadParams { @@ -2114,6 +2233,88 @@ impl Dataset { builder.execute(transaction).await } + /// Deep clone the target version into a new dataset at target_path. + /// This performs a server-side copy of all relevant dataset files (data files, + /// deletion files, and any external row-id files) into the target dataset + /// without loading data into memory. + /// + /// Parameters: + /// - `target_path`: the URI string to clone the dataset into. + /// - `version`: the version cloned from, could be a version number, branch head, or tag. + /// - `store_params`: the object store params to use for the new dataset. + pub async fn deep_clone( + &mut self, + target_path: &str, + version: impl Into, + store_params: Option, + ) -> Result { + use futures::StreamExt; + + // Resolve source dataset and its manifest using checkout_version + let src_ds = self.checkout_version(version).await?; + let path_specs = self.collect_paths().await?; + + // Prepare target object store and base path + let (target_store, target_base) = ObjectStore::from_uri_and_params( + self.session.store_registry(), + target_path, + &store_params.clone().unwrap_or_default(), + ) + .await?; + + // Prevent cloning into an existing target dataset + if self + .commit_handler + .resolve_latest_location(&target_base, &target_store) + .await + .is_ok() + { + return Err(Error::DatasetAlreadyExists { + uri: target_path.to_string(), + location: location!(), + }); + } + + let io_parallelism = self.object_store.io_parallelism(); + let copy_futures = path_specs + .iter() + .map(|file| { + let store = Arc::clone(&target_store); + let src_path = file.absolute_path(); + let target_path = file.absolute_path_with_base(&target_base); + async move { + store.copy(&src_path, &target_path).await.map(|_| ()) + } + }) + .collect::>(); + + futures::stream::iter(copy_futures) + .buffer_unordered(io_parallelism) + .collect::>() + .await + .into_iter() + .collect::>>()?; + + // Record a Clone operation and commit via CommitBuilder + let ref_name = src_ds.manifest.branch.clone(); + let ref_version = src_ds.manifest_location.version; + let clone_op = Operation::Clone { + is_shallow: false, + ref_name, + ref_version, + ref_path: src_ds.uri().to_string(), + branch_name: None, + }; + let txn = Transaction::new(ref_version, clone_op, None); + let builder = CommitBuilder::new(WriteDestination::Uri(target_path)) + .with_store_params(store_params.clone().unwrap_or_default()) + .with_object_store(target_store.clone()) + .with_commit_handler(self.commit_handler.clone()) + .with_storage_format(self.manifest.data_storage_format.lance_file_version()?); + let new_ds = builder.execute(txn).await?; + Ok(new_ds) + } + async fn resolve_reference(&self, reference: refs::Ref) -> Result<(Option, u64)> { match reference { refs::Ref::Version(branch, version_number) => { @@ -3358,6 +3559,142 @@ mod tests { ) } + #[rstest] + #[tokio::test] + async fn test_deep_clone( + #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)] + data_storage_version: LanceFileVersion, + ) { + // Setup source and target dirs + let test_dir = TempStdDir::default(); + let base_dir = test_dir.join("base_ds"); + let test_uri = base_dir.to_str().unwrap(); + let clone_dir = test_dir.join("clone_ds"); + let cloned_uri = clone_dir.to_str().unwrap(); + + // Generate test data + let data_reader = gen_batch() + .col("id", array::step::()) + .col("val", array::fill_utf8("deep".to_string())) + .into_reader_rows(RowCount::from(64), BatchCount::from(1)); + + // Create source dataset + let mut dataset = Dataset::write( + data_reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 64, + max_rows_per_group: 16, + data_storage_version: Some(data_storage_version), + ..Default::default() + }), + ) + .await + .unwrap(); + + let mut branch = dataset + .create_branch("branch", dataset.version().version, None) + .await + .unwrap(); + + // Create a scalar index to validate index copy + branch + .create_index( + &["id"], + IndexType::Scalar, + Some("id_idx".to_string()), + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + + // Create a deletion file by deleting some rows + branch.delete("id < 10").await.unwrap(); + + let original_version = branch.version().version; + branch + .tags() + .create_on_branch("tag", original_version, Some("branch")) + .await + .unwrap(); + + // Perform deep clone + let cloned_dataset = branch.deep_clone(cloned_uri, "tag", None).await.unwrap(); + + // Validate target dataset rows + let batches = cloned_dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 54); // 64 rows - 10 deletions + assert_eq!(cloned_dataset.version().version, original_version); + assert!(cloned_dataset.manifest().base_paths.is_empty()); + + // Validate internal file counts are equal between source and cloned datasets + let store = branch.object_store(); + let src_root = dataset.base.clone(); + let branch_root = branch.base.clone(); + let dst_root = cloned_dataset.base.clone(); + + let src_data = count_files(store, &src_root, "data").await; + let dst_data = count_files(store, &dst_root, "data").await; + assert_eq!(src_data, dst_data); + + let src_idx = count_files(store, &branch_root, "_indices").await; + let dst_idx = count_files(store, &dst_root, "_indices").await; + assert_eq!(src_idx, dst_idx); + + let src_del = count_files(store, &branch_root, "_deletions").await; + let dst_del = count_files(store, &dst_root, "_deletions").await; + assert_eq!(src_del, dst_del); + + // Validate index exists in cloned dataset + let cloned_indices = cloned_dataset.load_indices().await.unwrap(); + assert!(!cloned_indices.is_empty()); + assert_eq!(cloned_indices.first().unwrap().name, "id_idx"); + + // Verify base_id cleared in cloned manifest and indices + for frag in cloned_dataset.manifest().fragments.iter() { + for df in &frag.files { + assert!(df.base_id.is_none()); + } + if let Some(del) = &frag.deletion_file { + assert!(del.base_id.is_none()); + } + } + for idx in cloned_indices.iter() { + assert!(idx.base_id.is_none()); + } + + // Attempt cloning again to the same target should error + let res = dataset.deep_clone(cloned_uri, "tag", None).await; + assert!(matches!(res, Err(Error::DatasetAlreadyExists { .. }))); + + // Invalid tag should error + let res_invalid = dataset + .deep_clone(&format!("{}/clone_invalid", test_uri), "no_such_tag", None) + .await; + assert!(matches!(res_invalid, Err(Error::RefNotFound { .. }))); + } + + // Helper: recursively count files under a dataset directory (data/_indices/_deletions) + async fn count_files(store: &ObjectStore, root: &Path, prefix: &str) -> usize { + use futures::StreamExt; + let dir = root.child(prefix); + let mut stream = store.read_dir_all(&dir, None); + let mut count: usize = 0; + while stream.next().await.transpose().unwrap().is_some() { + count += 1; + } + count + } + #[rstest] #[tokio::test] async fn test_shallow_clone_with_hybrid_paths( @@ -9609,6 +9946,162 @@ mod tests { assert!(branches.is_empty()); } + // Deep clone branch: basic functionality + #[tokio::test] + async fn test_deep_clone_branch_basic() { + let tempdir = TempDir::default(); + let test_uri = tempdir.path_str(); + + // Create main dataset + let data_main = gen_batch() + .col("id", array::step::()) + .into_reader_rows(RowCount::from(50), BatchCount::from(1)); + let mut main_ds = Dataset::write( + data_main, + &test_uri, + Some(WriteParams { + mode: WriteMode::Create, + ..Default::default() + }), + ) + .await + .unwrap(); + + // Create branch and append + let mut branch_ds = main_ds + .create_branch("feature/deep", main_ds.version().version, None) + .await + .unwrap(); + let data_branch = gen_batch() + .col("id", array::step_custom::(50, 1)) + .into_reader_rows(RowCount::from(25), BatchCount::from(1)); + branch_ds = Dataset::write( + data_branch, + branch_ds.uri(), + Some(WriteParams { + mode: WriteMode::Append, + ..Default::default() + }), + ) + .await + .unwrap(); + + // Deep clone from branch head into a new dataset + let target = format!("{}/deep_clone_branch_basic", test_uri); + let cloned = main_ds + .deep_clone(&target, ("feature/deep", None), None) + .await + .unwrap(); + + // Verify: row count matches branch, base paths cleared + assert_eq!( + cloned.count_rows(None).await.unwrap(), + branch_ds.count_rows(None).await.unwrap() + ); + assert_eq!(cloned.manifest.base_paths.len(), 0); + assert!(cloned + .manifest + .fragments + .iter() + .all(|f| f.files.iter().all(|df| df.base_id.is_none()))); + } + + // Deep clone should error on non-existent branch + #[tokio::test] + async fn test_deep_clone_invalid_branch() { + let tempdir = TempStrDir::default(); + + // Create main dataset + let data_main = gen_batch() + .col("id", array::step::()) + .into_reader_rows(RowCount::from(10), BatchCount::from(1)); + let mut ds = Dataset::write( + data_main, + &tempdir, + Some(WriteParams { + mode: WriteMode::Create, + ..Default::default() + }), + ) + .await + .unwrap(); + + // Attempt deep clone from non-existent branch + let target = format!("{}/deep_clone_invalid_branch", tempdir.as_str()); + let res = ds + .deep_clone(&target, ("non-existent-branch", None), None) + .await; + assert!(res.is_err()); + } + + // Deep clone snapshot stability: changes after clone do not affect clone + #[tokio::test] + async fn test_deep_clone_branch_snapshot_stability() { + let tempdir = TempStrDir::default(); + + // Create main dataset + let data_main = gen_batch() + .col("id", array::step::()) + .into_reader_rows(RowCount::from(40), BatchCount::from(1)); + let mut main_ds = Dataset::write( + data_main, + &tempdir, + Some(WriteParams { + mode: WriteMode::Create, + ..Default::default() + }), + ) + .await + .unwrap(); + + // Create branch and append some rows + let mut branch_ds = main_ds + .create_branch("feature/deep", main_ds.version().version, None) + .await + .unwrap(); + let data_branch_a = gen_batch() + .col("id", array::step_custom::(40, 1)) + .into_reader_rows(RowCount::from(10), BatchCount::from(1)); + branch_ds = Dataset::write( + data_branch_a, + branch_ds.uri(), + Some(WriteParams { + mode: WriteMode::Append, + ..Default::default() + }), + ) + .await + .unwrap(); + + let rows_before_clone = branch_ds.count_rows(None).await.unwrap(); + + // Deep clone at current branch head + let snap_target = format!("{}/deep_clone_snap", tempdir.as_str()); + let snap = main_ds + .deep_clone(&snap_target, ("feature/deep", None), None) + .await + .unwrap(); + + // Advance branch after cloning + let data_branch_b = gen_batch() + .col("id", array::step_custom::(50, 1)) + .into_reader_rows(RowCount::from(5), BatchCount::from(1)); + let _ = Dataset::write( + data_branch_b, + branch_ds.uri(), + Some(WriteParams { + mode: WriteMode::Append, + ..Default::default() + }), + ) + .await + .unwrap(); + + // Verify cloned snapshot is stable + assert_eq!(snap.count_rows(None).await.unwrap(), rows_before_clone); + assert_eq!(snap.manifest.base_paths.len(), 0); + } + #[tokio::test] async fn test_add_bases() { use lance_table::format::BasePath; diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index c8173ab1c4e..d690a2f537c 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -118,6 +118,7 @@ async fn do_commit_new_dataset( }; let (mut manifest, indices) = if let Operation::Clone { + is_shallow, ref_name, ref_version, ref_path, @@ -138,37 +139,74 @@ async fn do_commit_new_dataset( ) .await?; - let new_base_id = source_manifest - .base_paths - .keys() - .max() - .map(|id| *id + 1) - .unwrap_or(0); - let new_manifest = source_manifest.shallow_clone( - ref_name.clone(), - ref_path.clone(), - new_base_id, - branch_name.clone(), - transaction_file, - ); + if *is_shallow { + let new_base_id = source_manifest + .base_paths + .keys() + .max() + .map(|id| *id + 1) + .unwrap_or(0); + let new_manifest = source_manifest.shallow_clone( + ref_name.clone(), + ref_path.clone(), + new_base_id, + branch_name.clone(), + transaction_file, + ); - let updated_indices = if let Some(index_section_pos) = source_manifest.index_section { - let reader = object_store.open(&source_manifest_location.path).await?; - let section: pb::IndexSection = - lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?; - section - .indices - .into_iter() - .map(|index_pb| { - let mut index = IndexMetadata::try_from(index_pb)?; - index.base_id = Some(new_base_id); - Ok(index) - }) - .collect::>>()? + let updated_indices = if let Some(index_section_pos) = source_manifest.index_section { + let reader = object_store.open(&source_manifest_location.path).await?; + let section: pb::IndexSection = + lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?; + section + .indices + .into_iter() + .map(|index_pb| { + let mut index = IndexMetadata::try_from(index_pb)?; + index.base_id = Some(new_base_id); + Ok(index) + }) + .collect::>>()? + } else { + vec![] + }; + (new_manifest, updated_indices) } else { - vec![] - }; - (new_manifest, updated_indices) + // Deep clone: build a manifest that references local files (no external bases) + let mut new_manifest = source_manifest.clone(); + new_manifest.base_paths.clear(); + new_manifest.branch = None; + new_manifest.tag = None; + new_manifest.index_section = None; // will be rewritten below + let mut new_frags = new_manifest.fragments.as_ref().clone(); + for f in &mut new_frags { + for df in &mut f.files { + df.base_id = None; + } + if let Some(d) = f.deletion_file.as_mut() { + d.base_id = None; + } + } + new_manifest.fragments = Arc::new(new_frags); + + // Indices: keep metadata but normalize base to local + let mut updated_indices = Vec::new(); + if let Some(index_section_pos) = source_manifest.index_section { + let reader = object_store.open(&source_manifest_location.path).await?; + let section: pb::IndexSection = + lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?; + updated_indices = section + .indices + .into_iter() + .map(|index_pb| { + let mut index = IndexMetadata::try_from(index_pb)?; + index.base_id = None; + Ok(index) + }) + .collect::>>()?; + } + (new_manifest, updated_indices) + } } else { let (manifest, indices) = transaction.build_manifest(None, vec![], &transaction_file, write_config)?; From 38db690581db52de59e0d7fad37a2b3571e58e4d Mon Sep 17 00:00:00 2001 From: majin1102 Date: Wed, 3 Dec 2025 18:25:01 +0800 Subject: [PATCH 2/8] fmt --- rust/lance/src/dataset.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index c9a35898ce9..c4609fd0485 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -2282,9 +2282,7 @@ impl Dataset { let store = Arc::clone(&target_store); let src_path = file.absolute_path(); let target_path = file.absolute_path_with_base(&target_base); - async move { - store.copy(&src_path, &target_path).await.map(|_| ()) - } + async move { store.copy(&src_path, &target_path).await.map(|_| ()) } }) .collect::>(); From 1a9d3b573f12f173f9d15759fb98bb99a95321ff Mon Sep 17 00:00:00 2001 From: majin1102 Date: Wed, 3 Dec 2025 20:02:00 +0800 Subject: [PATCH 3/8] Use Path::parse instead of Path::from --- rust/lance/src/dataset.rs | 337 ++++++++++---------------------------- 1 file changed, 87 insertions(+), 250 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index c4609fd0485..32048299317 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -205,100 +205,6 @@ impl From<&Manifest> for Version { } } -impl Dataset { - async fn collect_paths(&self) -> Result> { - let mut file_paths = Vec::new(); - for fragment in self.manifest.fragments.iter() { - if let Some(RowIdMeta::External(external_file)) = &fragment.row_id_meta { - return Err(Error::Internal { - message: format!( - "External row_id_meta is not supported yet. external file path: {}", - external_file.path - ), - location: location!(), - }); - } - for data_file in fragment.files.iter() { - let base_root = if let Some(base_id) = data_file.base_id { - let base_path = - self.manifest - .base_paths - .get(&base_id) - .ok_or_else(|| Error::Internal { - message: format!("base_id {} not found", base_id), - location: location!(), - })?; - // Use the base path as provided; do not append kind-specific directory here - Path::from(base_path.path.as_str()) - } else { - self.base.clone() - }; - file_paths.push(FilePath { - relative_path: format!("{}/{}", DATA_DIR, data_file.path.clone()), - base_path: base_root, - }); - } - // Deletion file under _deletions/ - if let Some(deletion_file) = &fragment.deletion_file { - let base_root = if let Some(base_id) = deletion_file.base_id { - let base_path = - self.manifest - .base_paths - .get(&base_id) - .ok_or_else(|| Error::Internal { - message: format!("base_id {} not found", base_id), - location: location!(), - })?; - Path::from(base_path.path.as_str()) - } else { - self.base.clone() - }; - file_paths.push(FilePath { - relative_path: relative_deletion_file_path(fragment.id, deletion_file), - base_path: base_root, - }); - } - } - - // Load indices for the source dataset - let indices = read_manifest_indexes( - self.object_store.as_ref(), - &self.manifest_location, - &self.manifest, - ) - .await?; - - for index in &indices { - // Base root: dataset root or external base path - let base_root = if let Some(base_id) = index.base_id { - let base_path = - self.manifest - .base_paths - .get(&base_id) - .ok_or_else(|| Error::Internal { - message: format!("base_id {} not found", base_id), - location: location!(), - })?; - Path::from(base_path.path.as_str()) - } else { - self.base.clone() - }; - // Index directory is /_indices/{uuid} - let index_root = base_root.child(INDICES_DIR).child(index.uuid.to_string()); - let mut stream = self.object_store.read_dir_all(&index_root, None); - while let Some(meta) = stream.next().await.transpose()? { - if let Some(filename) = meta.location.filename() { - file_paths.push(FilePath { - relative_path: format!("{}/{}/{}", INDICES_DIR, index.uuid, filename), - base_path: base_root.clone(), - }); - } - } - } - Ok(file_paths) - } -} - /// A file path wrapper that can be used to represent a file in a dataset. /// This wrapper is used for changing the base_path like deep_clone #[derive(Debug, Clone, PartialEq, Eq)] @@ -2334,6 +2240,93 @@ impl Dataset { } } + async fn collect_paths(&self) -> Result> { + let mut file_paths = Vec::new(); + for fragment in self.manifest.fragments.iter() { + if let Some(RowIdMeta::External(external_file)) = &fragment.row_id_meta { + return Err(Error::Internal { + message: format!( + "External row_id_meta is not supported yet. external file path: {}", + external_file.path + ), + location: location!(), + }); + } + for data_file in fragment.files.iter() { + let base_root = if let Some(base_id) = data_file.base_id { + let base_path = + self.manifest + .base_paths + .get(&base_id) + .ok_or_else(|| Error::Internal { + message: format!("base_id {} not found", base_id), + location: location!(), + })?; + Path::parse(base_path.path.as_str())? + } else { + self.base.clone() + }; + file_paths.push(FilePath { + relative_path: format!("{}/{}", DATA_DIR, data_file.path.clone()), + base_path: base_root, + }); + } + if let Some(deletion_file) = &fragment.deletion_file { + let base_root = if let Some(base_id) = deletion_file.base_id { + let base_path = + self.manifest + .base_paths + .get(&base_id) + .ok_or_else(|| Error::Internal { + message: format!("base_id {} not found", base_id), + location: location!(), + })?; + Path::parse(base_path.path.as_str())? + } else { + self.base.clone() + }; + file_paths.push(FilePath { + relative_path: relative_deletion_file_path(fragment.id, deletion_file), + base_path: base_root, + }); + } + } + + let indices = read_manifest_indexes( + self.object_store.as_ref(), + &self.manifest_location, + &self.manifest, + ) + .await?; + + for index in &indices { + let base_root = if let Some(base_id) = index.base_id { + let base_path = + self.manifest + .base_paths + .get(&base_id) + .ok_or_else(|| Error::Internal { + message: format!("base_id {} not found", base_id), + location: location!(), + })?; + Path::parse(base_path.path.as_str())? + } else { + self.base.clone() + }; + let index_root = base_root.child(INDICES_DIR).child(index.uuid.to_string()); + let mut stream = self.object_store.read_dir_all(&index_root, None); + while let Some(meta) = stream.next().await.transpose()? { + if let Some(filename) = meta.location.filename() { + file_paths.push(FilePath { + relative_path: format!("{}/{}/{}", INDICES_DIR, index.uuid, filename), + base_path: base_root.clone(), + }); + } + } + } + Ok(file_paths) + } + /// Run a SQL query against the dataset. /// The underlying SQL engine is DataFusion. /// Please refer to the DataFusion documentation for supported SQL syntax. @@ -9944,162 +9937,6 @@ mod tests { assert!(branches.is_empty()); } - // Deep clone branch: basic functionality - #[tokio::test] - async fn test_deep_clone_branch_basic() { - let tempdir = TempDir::default(); - let test_uri = tempdir.path_str(); - - // Create main dataset - let data_main = gen_batch() - .col("id", array::step::()) - .into_reader_rows(RowCount::from(50), BatchCount::from(1)); - let mut main_ds = Dataset::write( - data_main, - &test_uri, - Some(WriteParams { - mode: WriteMode::Create, - ..Default::default() - }), - ) - .await - .unwrap(); - - // Create branch and append - let mut branch_ds = main_ds - .create_branch("feature/deep", main_ds.version().version, None) - .await - .unwrap(); - let data_branch = gen_batch() - .col("id", array::step_custom::(50, 1)) - .into_reader_rows(RowCount::from(25), BatchCount::from(1)); - branch_ds = Dataset::write( - data_branch, - branch_ds.uri(), - Some(WriteParams { - mode: WriteMode::Append, - ..Default::default() - }), - ) - .await - .unwrap(); - - // Deep clone from branch head into a new dataset - let target = format!("{}/deep_clone_branch_basic", test_uri); - let cloned = main_ds - .deep_clone(&target, ("feature/deep", None), None) - .await - .unwrap(); - - // Verify: row count matches branch, base paths cleared - assert_eq!( - cloned.count_rows(None).await.unwrap(), - branch_ds.count_rows(None).await.unwrap() - ); - assert_eq!(cloned.manifest.base_paths.len(), 0); - assert!(cloned - .manifest - .fragments - .iter() - .all(|f| f.files.iter().all(|df| df.base_id.is_none()))); - } - - // Deep clone should error on non-existent branch - #[tokio::test] - async fn test_deep_clone_invalid_branch() { - let tempdir = TempStrDir::default(); - - // Create main dataset - let data_main = gen_batch() - .col("id", array::step::()) - .into_reader_rows(RowCount::from(10), BatchCount::from(1)); - let mut ds = Dataset::write( - data_main, - &tempdir, - Some(WriteParams { - mode: WriteMode::Create, - ..Default::default() - }), - ) - .await - .unwrap(); - - // Attempt deep clone from non-existent branch - let target = format!("{}/deep_clone_invalid_branch", tempdir.as_str()); - let res = ds - .deep_clone(&target, ("non-existent-branch", None), None) - .await; - assert!(res.is_err()); - } - - // Deep clone snapshot stability: changes after clone do not affect clone - #[tokio::test] - async fn test_deep_clone_branch_snapshot_stability() { - let tempdir = TempStrDir::default(); - - // Create main dataset - let data_main = gen_batch() - .col("id", array::step::()) - .into_reader_rows(RowCount::from(40), BatchCount::from(1)); - let mut main_ds = Dataset::write( - data_main, - &tempdir, - Some(WriteParams { - mode: WriteMode::Create, - ..Default::default() - }), - ) - .await - .unwrap(); - - // Create branch and append some rows - let mut branch_ds = main_ds - .create_branch("feature/deep", main_ds.version().version, None) - .await - .unwrap(); - let data_branch_a = gen_batch() - .col("id", array::step_custom::(40, 1)) - .into_reader_rows(RowCount::from(10), BatchCount::from(1)); - branch_ds = Dataset::write( - data_branch_a, - branch_ds.uri(), - Some(WriteParams { - mode: WriteMode::Append, - ..Default::default() - }), - ) - .await - .unwrap(); - - let rows_before_clone = branch_ds.count_rows(None).await.unwrap(); - - // Deep clone at current branch head - let snap_target = format!("{}/deep_clone_snap", tempdir.as_str()); - let snap = main_ds - .deep_clone(&snap_target, ("feature/deep", None), None) - .await - .unwrap(); - - // Advance branch after cloning - let data_branch_b = gen_batch() - .col("id", array::step_custom::(50, 1)) - .into_reader_rows(RowCount::from(5), BatchCount::from(1)); - let _ = Dataset::write( - data_branch_b, - branch_ds.uri(), - Some(WriteParams { - mode: WriteMode::Append, - ..Default::default() - }), - ) - .await - .unwrap(); - - // Verify cloned snapshot is stable - assert_eq!(snap.count_rows(None).await.unwrap(), rows_before_clone); - assert_eq!(snap.manifest.base_paths.len(), 0); - } - #[tokio::test] async fn test_add_bases() { use lance_table::format::BasePath; From 93b5c4192e554c399c0a0fd13a6385abe327f767 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Thu, 4 Dec 2025 16:24:50 +0800 Subject: [PATCH 4/8] fix clippy after rebasing --- rust/lance/src/dataset.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index ed32cd155c7..f25bce0e50e 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -2963,4 +2963,5 @@ impl Projectable for Dataset { } } +#[cfg(test)] mod tests; From 91c2fc2228f05aedb9668c37a0a9cbec3eda2de6 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Mon, 8 Dec 2025 22:15:02 +0800 Subject: [PATCH 5/8] address comments --- rust/lance/src/dataset.rs | 79 +++++----- rust/lance/src/dataset/tests/dataset_io.rs | 165 +++++++++++++++++++++ 2 files changed, 201 insertions(+), 43 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index f25bce0e50e..990b28bcdc7 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -208,30 +208,6 @@ impl From<&Manifest> for Version { } } -/// A file path wrapper that can be used to represent a file in a dataset. -/// This wrapper is used for changing the base_path like deep_clone -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct FilePath { - pub relative_path: String, - pub base_path: Path, -} - -impl FilePath { - fn absolute_path_with_base(&self, base: &Path) -> Path { - let mut path = base.clone(); - for seg in self.relative_path.split('/') { - if !seg.is_empty() { - path = path.child(seg); - } - } - path - } - - fn absolute_path(&self) -> Path { - self.absolute_path_with_base(&self.base_path) - } -} - /// Customize read behavior of a dataset. #[derive(Clone, Debug)] pub struct ReadParams { @@ -2185,7 +2161,7 @@ impl Dataset { // Resolve source dataset and its manifest using checkout_version let src_ds = self.checkout_version(version).await?; - let path_specs = self.collect_paths().await?; + let src_paths = src_ds.collect_paths().await?; // Prepare target object store and base path let (target_store, target_base) = ObjectStore::from_uri_and_params( @@ -2208,13 +2184,29 @@ impl Dataset { }); } + let build_absolute_path = |relative_path: &str, base: &Path| -> Path { + let mut path = base.clone(); + for seg in relative_path.split('/') { + if !seg.is_empty() { + path = path.child(seg); + } + } + path + }; + + /// TODO: Leverage object store bulk copy for efficient deep_clone + /// + /// All cloud storage providers support batch copy APIs that would provide significant + /// performance improvements. We use single file copy before we have upstream support. + /// + /// Tracked by: https://github.com/lance-format/lance/issues/5435 let io_parallelism = self.object_store.io_parallelism(); - let copy_futures = path_specs + let copy_futures = src_paths .iter() - .map(|file| { + .map(|(relative_path, base)| { let store = Arc::clone(&target_store); - let src_path = file.absolute_path(); - let target_path = file.absolute_path_with_base(&target_base); + let src_path = build_absolute_path(relative_path, base); + let target_path = build_absolute_path(relative_path, &target_base); async move { store.copy(&src_path, &target_path).await.map(|_| ()) } }) .collect::>(); @@ -2267,8 +2259,9 @@ impl Dataset { } } - async fn collect_paths(&self) -> Result> { - let mut file_paths = Vec::new(); + /// Collect all (relative_path, path) of the dataset files. + async fn collect_paths(&self) -> Result> { + let mut file_paths: Vec<(String, Path)> = Vec::new(); for fragment in self.manifest.fragments.iter() { if let Some(RowIdMeta::External(external_file)) = &fragment.row_id_meta { return Err(Error::Internal { @@ -2293,10 +2286,10 @@ impl Dataset { } else { self.base.clone() }; - file_paths.push(FilePath { - relative_path: format!("{}/{}", DATA_DIR, data_file.path.clone()), - base_path: base_root, - }); + file_paths.push(( + format!("{}/{}", DATA_DIR, data_file.path.clone()), + base_root, + )); } if let Some(deletion_file) = &fragment.deletion_file { let base_root = if let Some(base_id) = deletion_file.base_id { @@ -2312,10 +2305,10 @@ impl Dataset { } else { self.base.clone() }; - file_paths.push(FilePath { - relative_path: relative_deletion_file_path(fragment.id, deletion_file), - base_path: base_root, - }); + file_paths.push(( + relative_deletion_file_path(fragment.id, deletion_file), + base_root, + )); } } @@ -2344,10 +2337,10 @@ impl Dataset { let mut stream = self.object_store.read_dir_all(&index_root, None); while let Some(meta) = stream.next().await.transpose()? { if let Some(filename) = meta.location.filename() { - file_paths.push(FilePath { - relative_path: format!("{}/{}/{}", INDICES_DIR, index.uuid, filename), - base_path: base_root.clone(), - }); + file_paths.push(( + format!("{}/{}/{}", INDICES_DIR, index.uuid, filename), + base_root.clone(), + )); } } } diff --git a/rust/lance/src/dataset/tests/dataset_io.rs b/rust/lance/src/dataset/tests/dataset_io.rs index 0c163de3f96..c79578539ba 100644 --- a/rust/lance/src/dataset/tests/dataset_io.rs +++ b/rust/lance/src/dataset/tests/dataset_io.rs @@ -37,7 +37,11 @@ use lance_io::assert_io_eq; use lance_table::feature_flags; use futures::TryStreamExt; +use lance_index::scalar::ScalarIndexParams; +use lance_index::{DatasetIndexExt, IndexType}; +use lance_io::object_store::ObjectStore; use lance_table::io::manifest::read_manifest; +use object_store::path::Path; use rstest::rstest; #[rstest] @@ -481,6 +485,167 @@ async fn append_dataset( ) } +#[rstest] +#[tokio::test] +async fn test_deep_clone( + #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)] + data_storage_version: LanceFileVersion, +) { + // Setup source and target dirs + let test_dir = TempStdDir::default(); + let base_dir = test_dir.join("base_ds"); + let test_uri = base_dir.to_str().unwrap(); + let clone_dir = test_dir.join("clone_ds"); + let cloned_uri = clone_dir.to_str().unwrap(); + + // Generate test data + let data_reader = gen_batch() + .col("id", array::step::()) + .col("val", array::fill_utf8("deep".to_string())) + .into_reader_rows(RowCount::from(64), BatchCount::from(1)); + + // Create source dataset + let mut dataset = Dataset::write( + data_reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 64, + max_rows_per_group: 16, + data_storage_version: Some(data_storage_version), + ..Default::default() + }), + ) + .await + .unwrap(); + + let mut branch = dataset + .create_branch("branch", dataset.version().version, None) + .await + .unwrap(); + + // Create a scalar index to validate index copy + branch + .create_index( + &["id"], + IndexType::Scalar, + Some("id_idx".to_string()), + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + + // Create a deletion file by deleting some rows + branch.delete("id < 10").await.unwrap(); + + let original_version = branch.version().version; + branch + .tags() + .create_on_branch("tag", original_version, Some("branch")) + .await + .unwrap(); + + // Perform deep clone + let cloned_dataset = branch.deep_clone(cloned_uri, "tag", None).await.unwrap(); + + // Validate target dataset rows + let batches = cloned_dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 54); // 64 rows - 10 deletions + assert_eq!(cloned_dataset.version().version, original_version); + assert!(cloned_dataset.manifest().base_paths.is_empty()); + + // Validate internal file counts are equal between source and cloned datasets + let store = branch.object_store(); + let src_root = dataset.base.clone(); + let branch_root = branch.base.clone(); + let dst_root = cloned_dataset.base.clone(); + + let src_data = count_files(store, &src_root, "data").await; + let dst_data = count_files(store, &dst_root, "data").await; + assert_eq!(src_data, dst_data); + + let src_idx = count_files(store, &branch_root, "_indices").await; + let dst_idx = count_files(store, &dst_root, "_indices").await; + assert_eq!(src_idx, dst_idx); + + let src_del = count_files(store, &branch_root, "_deletions").await; + let dst_del = count_files(store, &dst_root, "_deletions").await; + assert_eq!(src_del, dst_del); + + // Validate index exists in cloned dataset + let cloned_indices = cloned_dataset.load_indices().await.unwrap(); + assert!(!cloned_indices.is_empty()); + assert_eq!(cloned_indices.first().unwrap().name, "id_idx"); + + // Verify base_id cleared in cloned manifest and indices + for frag in cloned_dataset.manifest().fragments.iter() { + for df in &frag.files { + assert!(df.base_id.is_none()); + } + if let Some(del) = &frag.deletion_file { + assert!(del.base_id.is_none()); + } + } + for idx in cloned_indices.iter() { + assert!(idx.base_id.is_none()); + } + + // Attempt cloning again to the same target should error + let res = dataset.deep_clone(cloned_uri, "tag", None).await; + assert!(matches!(res, Err(Error::DatasetAlreadyExists { .. }))); + + // Invalid tag should error + let res_invalid = dataset + .deep_clone(&format!("{}/clone_invalid", test_uri), "no_such_tag", None) + .await; + assert!(matches!(res_invalid, Err(Error::RefNotFound { .. }))); + + // deep_clone version before the deletion + let clone_dir = test_dir.join("clone_ds_old_ver"); + let cloned_ds = clone_dir.to_str().unwrap(); + let cloned_dataset = branch + .deep_clone(cloned_ds, original_version - 1, None) + .await + .unwrap(); + let store = branch.object_store(); + let dst_root = cloned_dataset.base.clone(); + + // Validate target dataset rows + let batches = cloned_dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 64); + assert_eq!(cloned_dataset.version().version, original_version - 1); + assert!(cloned_dataset.manifest().base_paths.is_empty()); + assert_eq!(count_files(store, &dst_root, "_deletions").await, 0); +} + +// Helper: count files under a dataset directory (data/_indices/_deletions) +async fn count_files(store: &ObjectStore, root: &Path, prefix: &str) -> usize { + use futures::StreamExt; + let dir = root.child(prefix); + let mut stream = store.read_dir_all(&dir, None); + let mut count: usize = 0; + while stream.next().await.transpose().unwrap().is_some() { + count += 1; + } + count +} + #[rstest] #[tokio::test] async fn test_shallow_clone_with_hybrid_paths( From d8a73c49c274db0861ba1a747be30dbba8a5a75d Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 9 Dec 2025 13:41:45 +0800 Subject: [PATCH 6/8] fix clippy --- rust/lance/src/dataset.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 990b28bcdc7..97adff3399f 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -2194,12 +2194,12 @@ impl Dataset { path }; - /// TODO: Leverage object store bulk copy for efficient deep_clone - /// - /// All cloud storage providers support batch copy APIs that would provide significant - /// performance improvements. We use single file copy before we have upstream support. - /// - /// Tracked by: https://github.com/lance-format/lance/issues/5435 + // TODO: Leverage object store bulk copy for efficient deep_clone + // + // All cloud storage providers support batch copy APIs that would provide significant + // performance improvements. We use single file copy before we have upstream support. + // + // Tracked by: https://github.com/lance-format/lance/issues/5435 let io_parallelism = self.object_store.io_parallelism(); let copy_futures = src_paths .iter() From 3ce420b53097c133b116092088c154d75e92975a Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 9 Dec 2025 17:26:34 +0800 Subject: [PATCH 7/8] fix ci --- rust/lance/src/dataset/tests/dataset_io.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/tests/dataset_io.rs b/rust/lance/src/dataset/tests/dataset_io.rs index c79578539ba..4a294c65cfe 100644 --- a/rust/lance/src/dataset/tests/dataset_io.rs +++ b/rust/lance/src/dataset/tests/dataset_io.rs @@ -612,7 +612,7 @@ async fn test_deep_clone( let clone_dir = test_dir.join("clone_ds_old_ver"); let cloned_ds = clone_dir.to_str().unwrap(); let cloned_dataset = branch - .deep_clone(cloned_ds, original_version - 1, None) + .deep_clone(cloned_ds, ("branch", original_version - 1), None) .await .unwrap(); let store = branch.object_store(); From 5216f58cebed1f84d1d15332ca80338fb778f10c Mon Sep 17 00:00:00 2001 From: majin1102 Date: Mon, 22 Dec 2025 17:24:45 +0800 Subject: [PATCH 8/8] fix rebase issue --- rust/lance-table/src/io/deletion.rs | 2 +- rust/lance/src/dataset/tests/dataset_io.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance-table/src/io/deletion.rs b/rust/lance-table/src/io/deletion.rs index 4d193bf6ff0..5dd0028bfd7 100644 --- a/rust/lance-table/src/io/deletion.rs +++ b/rust/lance-table/src/io/deletion.rs @@ -54,7 +54,7 @@ pub fn relative_deletion_file_path(fragment_id: u64, deletion_file: &DeletionFil .. } = deletion_file; let suffix = file_type.suffix(); - format!("{DELETION_DIRS}/{fragment_id}-{read_version}-{id}.{suffix}") + format!("{DELETIONS_DIR}/{fragment_id}-{read_version}-{id}.{suffix}") } /// Write a deletion file for a fragment for a given deletion vector. diff --git a/rust/lance/src/dataset/tests/dataset_io.rs b/rust/lance/src/dataset/tests/dataset_io.rs index 4a294c65cfe..ca95d4e4772 100644 --- a/rust/lance/src/dataset/tests/dataset_io.rs +++ b/rust/lance/src/dataset/tests/dataset_io.rs @@ -541,7 +541,7 @@ async fn test_deep_clone( let original_version = branch.version().version; branch .tags() - .create_on_branch("tag", original_version, Some("branch")) + .create("tag", ("branch", original_version)) .await .unwrap();