Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions rust/lance-table/src/io/deletion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ pub fn deletion_file_path(base: &Path, fragment_id: u64, deletion_file: &Deletio
.child(format!("{fragment_id}-{read_version}-{id}.{suffix}"))
}

pub fn relative_deletion_file_path(fragment_id: u64, deletion_file: &DeletionFile) -> String {
let DeletionFile {
read_version,
id,
file_type,
..
} = deletion_file;
let suffix = file_type.suffix();
format!("{DELETIONS_DIR}/{fragment_id}-{read_version}-{id}.{suffix}")
}

/// Write a deletion file for a fragment for a given deletion vector.
///
/// Returns the deletion file if one was written. If no deletions were present,
Expand Down
190 changes: 187 additions & 3 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ use lance_io::object_store::{
use lance_io::utils::{read_last_block, read_message, read_metadata_offset, read_struct};
use lance_namespace::LanceNamespace;
use lance_table::format::{
pb, DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest,
pb, DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, RowIdMeta,
};
use lance_table::io::commit::{
migrate_scheme_to_v2, write_manifest_file_to_path, CommitConfig, CommitError, CommitHandler,
CommitLock, ManifestLocation, ManifestNamingScheme, VERSIONS_DIR,
};
use lance_table::io::manifest::read_manifest;
use lance_table::io::manifest::{read_manifest, read_manifest_indexes};
use object_store::path::Path;
use prost::Message;
use roaring::RoaringBitmap;
Expand Down Expand Up @@ -112,7 +112,7 @@ use lance_core::box_error;
pub use lance_core::ROW_ID;
use lance_namespace::models::{CreateEmptyTableRequest, DescribeTableRequest};
use lance_table::feature_flags::{apply_feature_flags, can_read_dataset};
use lance_table::io::deletion::DELETIONS_DIR;
use lance_table::io::deletion::{relative_deletion_file_path, DELETIONS_DIR};
pub use schema_evolution::{
BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore,
};
Expand Down Expand Up @@ -2150,6 +2150,102 @@ impl Dataset {
builder.execute(transaction).await
}

/// Deep clone the target version into a new dataset at target_path.
/// This performs a server-side copy of all relevant dataset files (data files,
/// deletion files, and any external row-id files) into the target dataset
/// without loading data into memory.
///
/// Parameters:
/// - `target_path`: the URI string to clone the dataset into.
/// - `version`: the version cloned from, could be a version number, branch head, or tag.
/// - `store_params`: the object store params to use for the new dataset.
pub async fn deep_clone(
&mut self,
target_path: &str,
version: impl Into<refs::Ref>,
store_params: Option<ObjectStoreParams>,
) -> Result<Self> {
use futures::StreamExt;

// Resolve source dataset and its manifest using checkout_version
let src_ds = self.checkout_version(version).await?;
let src_paths = src_ds.collect_paths().await?;

// Prepare target object store and base path
let (target_store, target_base) = ObjectStore::from_uri_and_params(
self.session.store_registry(),
target_path,
&store_params.clone().unwrap_or_default(),
)
.await?;

// Prevent cloning into an existing target dataset
if self
.commit_handler
.resolve_latest_location(&target_base, &target_store)
.await
.is_ok()
{
return Err(Error::DatasetAlreadyExists {
uri: target_path.to_string(),
location: location!(),
});
}

let build_absolute_path = |relative_path: &str, base: &Path| -> Path {
let mut path = base.clone();
for seg in relative_path.split('/') {
if !seg.is_empty() {
path = path.child(seg);
}
}
path
};

// TODO: Leverage object store bulk copy for efficient deep_clone
//
// All cloud storage providers support batch copy APIs that would provide significant
// performance improvements. We use single file copy before we have upstream support.
//
// Tracked by: https://github.com/lance-format/lance/issues/5435
let io_parallelism = self.object_store.io_parallelism();
let copy_futures = src_paths
.iter()
.map(|(relative_path, base)| {
let store = Arc::clone(&target_store);
let src_path = build_absolute_path(relative_path, base);
let target_path = build_absolute_path(relative_path, &target_base);
async move { store.copy(&src_path, &target_path).await.map(|_| ()) }
})
.collect::<Vec<_>>();

futures::stream::iter(copy_futures)
.buffer_unordered(io_parallelism)
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

// Record a Clone operation and commit via CommitBuilder
let ref_name = src_ds.manifest.branch.clone();
let ref_version = src_ds.manifest_location.version;
let clone_op = Operation::Clone {
is_shallow: false,
ref_name,
ref_version,
ref_path: src_ds.uri().to_string(),
branch_name: None,
};
let txn = Transaction::new(ref_version, clone_op, None);
let builder = CommitBuilder::new(WriteDestination::Uri(target_path))
.with_store_params(store_params.clone().unwrap_or_default())
.with_object_store(target_store.clone())
.with_commit_handler(self.commit_handler.clone())
.with_storage_format(self.manifest.data_storage_format.lance_file_version()?);
let new_ds = builder.execute(txn).await?;
Ok(new_ds)
}

async fn resolve_reference(&self, reference: refs::Ref) -> Result<(Option<String>, u64)> {
match reference {
refs::Ref::Version(branch, version_number) => {
Expand All @@ -2175,6 +2271,94 @@ impl Dataset {
}
}

/// Collect all (relative_path, path) of the dataset files.
async fn collect_paths(&self) -> Result<Vec<(String, Path)>> {
let mut file_paths: Vec<(String, Path)> = Vec::new();
for fragment in self.manifest.fragments.iter() {
if let Some(RowIdMeta::External(external_file)) = &fragment.row_id_meta {
return Err(Error::Internal {
message: format!(
"External row_id_meta is not supported yet. external file path: {}",
external_file.path
),
location: location!(),
});
}
for data_file in fragment.files.iter() {
let base_root = if let Some(base_id) = data_file.base_id {
let base_path =
self.manifest
.base_paths
.get(&base_id)
.ok_or_else(|| Error::Internal {
message: format!("base_id {} not found", base_id),
location: location!(),
})?;
Path::parse(base_path.path.as_str())?
} else {
self.base.clone()
};
file_paths.push((
format!("{}/{}", DATA_DIR, data_file.path.clone()),
base_root,
));
}
if let Some(deletion_file) = &fragment.deletion_file {
let base_root = if let Some(base_id) = deletion_file.base_id {
let base_path =
self.manifest
.base_paths
.get(&base_id)
.ok_or_else(|| Error::Internal {
message: format!("base_id {} not found", base_id),
location: location!(),
})?;
Path::parse(base_path.path.as_str())?
} else {
self.base.clone()
};
file_paths.push((
relative_deletion_file_path(fragment.id, deletion_file),
base_root,
));
}
}

let indices = read_manifest_indexes(
self.object_store.as_ref(),
&self.manifest_location,
&self.manifest,
)
.await?;

for index in &indices {
let base_root = if let Some(base_id) = index.base_id {
let base_path =
self.manifest
.base_paths
.get(&base_id)
.ok_or_else(|| Error::Internal {
message: format!("base_id {} not found", base_id),
location: location!(),
})?;
Path::parse(base_path.path.as_str())?
} else {
self.base.clone()
};
let index_root = base_root.child(INDICES_DIR).child(index.uuid.to_string());
let mut stream = self.object_store.read_dir_all(&index_root, None);
while let Some(meta) = stream.next().await.transpose()? {
if let Some(filename) = meta.location.filename() {
file_paths.push((
format!("{}/{}/{}", INDICES_DIR, index.uuid, filename),
base_root.clone(),
));
}
}
}
Ok(file_paths)
}

/// Run a SQL query against the dataset.
/// The underlying SQL engine is DataFusion.
/// Please refer to the DataFusion documentation for supported SQL syntax.
Expand Down
Loading