From 7d8c2ce40694e3737ac08ef99feb17db8186576d Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Sat, 16 Aug 2025 11:57:03 -0700 Subject: [PATCH 1/7] rebase --- rust/lance-table/src/io/commit.rs | 751 +++++++++++++++-- .../src/io/commit/external_manifest.rs | 3 +- rust/lance/Cargo.toml | 4 + rust/lance/benches/manifest_scheme.rs | 272 ++++++ rust/lance/src/dataset.rs | 244 +++++- rust/lance/src/dataset/builder.rs | 4 +- rust/lance/src/dataset/refs.rs | 4 +- rust/lance/src/dataset/transaction.rs | 2 +- rust/lance/src/dataset/write.rs | 791 ++++++++++++++++++ rust/lance/src/dataset/write/commit.rs | 30 +- rust/lance/src/dataset/write/insert.rs | 5 +- rust/lance/src/dataset/write/merge_insert.rs | 241 +++++- rust/lance/src/io/commit.rs | 31 +- 13 files changed, 2290 insertions(+), 92 deletions(-) create mode 100644 rust/lance/benches/manifest_scheme.rs diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index ce78e5e90a6..5805bb44665 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -22,12 +22,6 @@ //! terms of a lock. The trait [CommitLock] can be implemented as a simpler //! alternative to [CommitHandler]. 
-use std::io; -use std::pin::Pin; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; -use std::{fmt::Debug, fs::DirEntry}; - use futures::future::Either; use futures::Stream; use futures::{ @@ -40,6 +34,11 @@ use log::warn; use object_store::PutOptions; use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore}; use snafu::location; +use std::io; +use std::pin::Pin; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::{fmt::Debug, fs::DirEntry}; use url::Url; #[cfg(feature = "dynamodb")] @@ -66,63 +65,100 @@ use crate::format::{is_detached_version, Index, Manifest}; const VERSIONS_DIR: &str = "_versions"; const MANIFEST_EXTENSION: &str = "manifest"; const DETACHED_VERSION_PREFIX: &str = "d"; +const LATEST_MANIFEST_HINT_FILE: &str = "_latest_manifest_hint.json"; /// How manifest files should be named. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum ManifestNamingScheme { /// `_versions/{version}.manifest` V1, - /// `_manifests/{u64::MAX - version}.manifest` + /// `_versions/{u64::MAX - version}.manifest` /// /// Zero-padded and reversed for O(1) lookup of latest version on object stores. V2, + /// `_versions/{reversed_binary_version}.manifest` with `_latest_manifest_hint.json` + /// + /// Binary representation reversed for S3 throughput optimization. + /// E.g., version 5 becomes "1010000000000000000000000000000000000000000000000000000000000000.manifest" + V3, } impl ManifestNamingScheme { + pub fn latest_manifest_hint_path(&self, base: &Path) -> Option { + match self { + Self::V3 => Some(base.child(VERSIONS_DIR).child(LATEST_MANIFEST_HINT_FILE)), + _ => None, + } + } + pub fn manifest_path(&self, base: &Path, version: u64) -> Path { let directory = base.child(VERSIONS_DIR); - if is_detached_version(version) { - // Detached versions should never show up first in a list operation which - // means it needs to come lexicographically after all attached manifest - // files and so we add the prefix `d`. 
There is no need to invert the - // version number since detached versions are not part of the version - let directory = base.child(VERSIONS_DIR); - directory.child(format!( - "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}" - )) - } else { - match self { - Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")), - Self::V2 => { + match self { + Self::V1 => { + if is_detached_version(version) { + // V1 doesn't support detached versions - use V2 prefix format for compatibility + directory.child(format!( + "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}" + )) + } else { + directory.child(format!("{version}.{MANIFEST_EXTENSION}")) + } + } + Self::V2 => { + if is_detached_version(version) { + // Detached versions for V2 use prefix to ensure lexicographic ordering + directory.child(format!( + "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}" + )) + } else { let inverted_version = u64::MAX - version; directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}")) } } + Self::V3 => { + // V3 uses normal binary format for both regular and detached versions + // Detached versions naturally end with "1" due to DETACHED_VERSION_MASK + let binary_str = format!("{:064b}", version); + let reversed_binary: String = binary_str.chars().rev().collect(); + directory.child(format!("{reversed_binary}.{MANIFEST_EXTENSION}")) + } } } pub fn parse_version(&self, filename: &str) -> Option { - let file_number = filename - .split_once('.') - // Detached versions will fail the `parse` step, which is ok. 
- .and_then(|(version_str, _)| version_str.parse::().ok()); match self { - Self::V1 => file_number, - Self::V2 => file_number.map(|v| u64::MAX - v), + Self::V1 => filename + .split_once('.') + .and_then(|(version_str, _)| version_str.parse::().ok()), + Self::V2 => filename + .split_once('.') + .and_then(|(version_str, _)| version_str.parse::().ok()) + .map(|v| u64::MAX - v), + Self::V3 => filename.split_once('.').and_then(|(binary_str, _)| { + if binary_str.len() == 64 && binary_str.chars().all(|c| c == '0' || c == '1') { + // Reverse the binary string and parse it + let reversed: String = binary_str.chars().rev().collect(); + u64::from_str_radix(&reversed, 2).ok() + } else { + None + } + }), } } pub fn detect_scheme(filename: &str) -> Option { if filename.starts_with(DETACHED_VERSION_PREFIX) { - // Currently, detached versions must imply V2 + // Detached versions with "d" prefix are V1 or V2 format return Some(Self::V2); } if filename.ends_with(MANIFEST_EXTENSION) { const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len(); - if filename.len() == V2_LEN { - Some(Self::V2) - } else { - Some(Self::V1) + const V3_LEN: usize = 64 + 1 + MANIFEST_EXTENSION.len(); + + match filename.len() { + V3_LEN => Some(Self::V3), + V2_LEN => Some(Self::V2), + _ => Some(Self::V1), } } else { None @@ -132,7 +168,9 @@ impl ManifestNamingScheme { pub fn detect_scheme_staging(filename: &str) -> Self { // We shouldn't have to worry about detached versions here since there is no // such thing as "detached" and "staged" at the same time. - if filename.chars().nth(20) == Some('.') { + if filename.chars().nth(64) == Some('.') { + Self::V3 + } else if filename.chars().nth(20) == Some('.') { Self::V2 } else { Self::V1 @@ -140,6 +178,139 @@ impl ManifestNamingScheme { } } +/// Write the latest manifest hint JSON file for V3 naming scheme. +/// This is a best-effort operation. +/// If it fails, it's not critical and the commit is still considered succeeded. 
+pub async fn write_latest_manifest_hint_best_effort( + object_store: &ObjectStore, + base: &Path, + manifest_location: &ManifestLocation, +) -> Result<()> { + let json_content = serde_json::to_string(&LatestManifestHint { + version: manifest_location.version, + size: manifest_location.size, + e_tag: manifest_location.e_tag.clone(), + })?; + let path = base.child(VERSIONS_DIR).child(LATEST_MANIFEST_HINT_FILE); + if let Err(e) = object_store.put(&path, json_content.as_bytes()).await { + warn!("Failed to write latest manifest hint: {:?}", e); + } + + Ok(()) +} + +/// Read the latest manifest hint JSON file for V3 naming scheme. +/// This is a best-effort operation. +/// If it fails, the caller needs to determine the current latest version through listing. +pub async fn read_latest_manifest_hint_best_effort( + object_store: &ObjectStore, + base: &Path, +) -> Result> { + let path = base.child(VERSIONS_DIR).child(LATEST_MANIFEST_HINT_FILE); + + // Optimize for local file system + if object_store.is_local() { + let local_path = lance_io::local::to_local_path(&path); + match std::fs::read(&local_path) { + Ok(bytes) => match serde_json::from_slice::(&bytes) { + Ok(hint) => Ok(Some(hint)), + Err(e) => { + warn!("Failed to parse latest manifest hint: {:?}", e); + Ok(None) + } + }, + Err(e) => { + warn!("Failed to read latest manifest hint: {:?}", e); + Ok(None) + } + } + } else { + // Use object store for cloud storage + match object_store.read_one_all(&path).await { + Ok(data) => { + match serde_json::from_slice::(data.iter().as_slice()) { + Ok(hint) => Ok(Some(hint)), + Err(e) => { + warn!("Failed to parse latest manifest hint: {:?}", e); + Ok(None) + } + } + } + Err(e) => { + warn!("Failed to read latest manifest hint: {:?}", e); + Ok(None) + } + } + } +} + +/// Perform batched existence check for manifest files, starting with `known_latest_version` + 1. +/// Each batch checks `batch_size` number of manifests concurrently. 
+/// Returns a stream of all found manifest locations. +/// If there is no new manifest after the `known_latest_version`, it returns an empty stream. +pub fn batch_head_manifests<'a>( + object_store: &'a ObjectStore, + base: &'a Path, + known_latest_version: u64, + batch_size: usize, +) -> BoxStream<'a, Result> { + futures::stream::try_unfold( + (known_latest_version + 1, false), + move |(current_version, finished)| async move { + if finished { + return Ok::>, (u64, bool))>, Error>( + None, + ); + } + + // Check existence for all paths in parallel using buffered concurrency + let results = futures::stream::iter(0..batch_size) + .map(|i| async move { + let version = current_version + i as u64; + let path = ManifestNamingScheme::V3.manifest_path(base, version); + match object_store.inner.head(&path).await { + Ok(meta) => Ok(Some(ManifestLocation { + version, + path, + e_tag: meta.e_tag, + size: Some(meta.size), + naming_scheme: ManifestNamingScheme::V3, + })), + Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(None), + Err(e) => Err(Error::from(e)), + } + }) + .buffered(object_store.io_parallelism()) + .try_collect::>() + .await?; + + let mut any_version_not_exist = false; + let mut found_locations = Vec::new(); + + for result in results { + match result { + Some(location) => found_locations.push(location), + None => any_version_not_exist = true, + } + } + + if any_version_not_exist || found_locations.is_empty() { + Ok(Some(( + futures::stream::iter(found_locations.into_iter().map(Ok)).boxed(), + (0, true), + ))) + } else { + Ok(Some(( + futures::stream::iter(found_locations.into_iter().map(Ok)).boxed(), + (current_version + batch_size as u64, false), + ))) + } + }, + ) + .try_flatten() + .boxed() +} + /// Migrate all V1 manifests to V2 naming scheme. /// /// This function will rename all V1 manifests to V2 naming scheme. 
@@ -199,6 +370,13 @@ pub struct ManifestLocation { pub e_tag: Option, } +#[derive(serde::Serialize, serde::Deserialize)] +pub struct LatestManifestHint { + pub version: u64, + pub size: Option, + pub e_tag: Option, +} + impl TryFrom for ManifestLocation { type Error = Error; @@ -233,6 +411,44 @@ async fn current_manifest_path( object_store: &ObjectStore, base: &Path, ) -> Result { + // Try to use V3 hint file if available + if let Some(latest_hint) = read_latest_manifest_hint_best_effort(object_store, base).await? { + // Use batch head manifests to check for newer versions + let known_latest_version = latest_hint.version; + // TODO: make batch size configurable. + // For performance sensitive use case, can even skip checking for newer version. + let mut manifest_stream = batch_head_manifests(object_store, base, known_latest_version, 2); + let mut latest_found: Option = None; + + while let Some(location_result) = manifest_stream.next().await { + match location_result { + Ok(location) => { + if latest_found.is_none() + || location.version > latest_found.as_ref().unwrap().version + { + latest_found = Some(location); + } + } + Err(e) => return Err(e), + } + } + + if let Some(location) = latest_found { + return Ok(location); + } else { + // If no newer manifests found, use the hint version + let naming_scheme = ManifestNamingScheme::V3; + let path = naming_scheme.manifest_path(base, known_latest_version); + return Ok(ManifestLocation { + version: known_latest_version, + path, + size: latest_hint.size, + naming_scheme, + e_tag: latest_hint.e_tag, + }); + } + } + if object_store.is_local() { if let Ok(Some(location)) = current_manifest_local(base) { return Ok(location); @@ -300,14 +516,17 @@ async fn current_manifest_path( .unwrap(); let mut current_meta = meta; - while let Some((scheme, meta)) = valid_manifests.next().await.transpose()? 
{ - if matches!(scheme, ManifestNamingScheme::V2) { + while let Some((manifest_scheme, meta)) = valid_manifests.next().await.transpose()? { + if manifest_scheme != scheme { return Err(Error::Internal { - message: "Found V2 manifest in a V1 manifest directory".to_string(), + message: format!( + "Found {:?} manifest in a {:?} manifest directory: {:?}", + manifest_scheme, scheme, meta + ), location: location!(), }); } - let version = scheme + let version = manifest_scheme .parse_version(meta.location.filename().unwrap()) .unwrap(); if version > current_version { @@ -475,8 +694,9 @@ pub trait CommitHandler: Debug + Send + Sync { base_path: &Path, version: u64, object_store: &dyn OSObjectStore, + hint_scheme: Option, ) -> Result { - default_resolve_version(base_path, version, object_store).await + default_resolve_version(base_path, version, object_store, hint_scheme).await } /// If `sorted_descending` is `true`, the stream will yield manifests in descending @@ -560,49 +780,83 @@ pub trait CommitHandler: Debug + Send + Sync { } } -async fn default_resolve_version( +async fn try_resolve_version_with_scheme( + scheme: ManifestNamingScheme, base_path: &Path, version: u64, object_store: &dyn OSObjectStore, -) -> Result { - if is_detached_version(version) { - return Ok(ManifestLocation { - version, - // Detached versions are not supported with V1 naming scheme. If we need - // to support in the future we could use a different prefix (e.g. 'x' or something) - naming_scheme: ManifestNamingScheme::V2, - // Both V1 and V2 should give the same path for detached versions - path: ManifestNamingScheme::V2.manifest_path(base_path, version), - size: None, - e_tag: None, - }); - } - - // try V2, fallback to V1. 
- let scheme = ManifestNamingScheme::V2; +) -> Result> { let path = scheme.manifest_path(base_path, version); match object_store.head(&path).await { - Ok(meta) => Ok(ManifestLocation { + Ok(meta) => Ok(Some(ManifestLocation { version, path, size: Some(meta.size), naming_scheme: scheme, e_tag: meta.e_tag, - }), - Err(ObjectStoreError::NotFound { .. }) => { - // fallback to V1 - let scheme = ManifestNamingScheme::V1; - Ok(ManifestLocation { - version, - path: scheme.manifest_path(base_path, version), - size: None, - naming_scheme: scheme, - e_tag: None, - }) - } + })), + Err(ObjectStoreError::NotFound { .. }) => Ok(None), Err(e) => Err(e.into()), } } + +async fn default_resolve_version( + base_path: &Path, + version: u64, + object_store: &dyn OSObjectStore, + hint_scheme: Option, +) -> Result { + // Create the list of schemes to try, with hint scheme first if provided + let schemes_to_try = if let Some(hint) = hint_scheme { + // Try hint first, then remaining schemes in fallback order + match hint { + ManifestNamingScheme::V3 => vec![ + ManifestNamingScheme::V3, + ManifestNamingScheme::V2, + ManifestNamingScheme::V1, + ], + ManifestNamingScheme::V2 => vec![ + ManifestNamingScheme::V2, + ManifestNamingScheme::V3, + ManifestNamingScheme::V1, + ], + ManifestNamingScheme::V1 => vec![ + ManifestNamingScheme::V1, + ManifestNamingScheme::V2, + ManifestNamingScheme::V3, + ], + } + } else { + // Default order: V3 -> V2 -> V1 + vec![ + ManifestNamingScheme::V3, + ManifestNamingScheme::V2, + ManifestNamingScheme::V1, + ] + }; + + // Try each scheme in order + for scheme in schemes_to_try { + if let Some(mut result) = + try_resolve_version_with_scheme(scheme, base_path, version, object_store).await? 
+ { + if is_detached_version(version) && scheme == ManifestNamingScheme::V1 { + // use V2 as the detached version scheme + result.naming_scheme = ManifestNamingScheme::V2; + return Ok(result); + } else { + return Ok(result); + } + } + } + + // If we reach here, no scheme worked + Err(Error::Internal { + message: format!("Version {version} not found in any supported naming scheme"), + location: location!(), + }) +} + /// Adapt an object_store credentials into AWS SDK creds #[cfg(feature = "dynamodb")] #[derive(Debug)] @@ -1063,6 +1317,7 @@ impl Debug for ConditionalPutCommitHandler { pub struct CommitConfig { pub num_retries: u32, pub skip_auto_cleanup: bool, + pub head_manifests_batch_size: usize, // TODO: add isolation_level } @@ -1071,6 +1326,7 @@ impl Default for CommitConfig { Self { num_retries: 20, skip_auto_cleanup: false, + head_manifests_batch_size: 8, } } } @@ -1078,6 +1334,7 @@ impl Default for CommitConfig { #[cfg(test)] mod tests { use super::*; + use object_store::{MultipartUpload, ObjectMeta, PutMultipartOpts}; #[test] fn test_manifest_naming_scheme() { @@ -1124,6 +1381,75 @@ mod tests { assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None); } + #[test] + fn test_v3_manifest_naming_scheme_file_names() { + let v3 = ManifestNamingScheme::V3; + + // Test manifest path generation for V3 + assert_eq!( + v3.manifest_path(&Path::from("base"), 0), + Path::from("base/_versions/0000000000000000000000000000000000000000000000000000000000000000.manifest") + ); + assert_eq!( + v3.manifest_path(&Path::from("base"), 5), + Path::from("base/_versions/1010000000000000000000000000000000000000000000000000000000000000.manifest") + ); + assert_eq!( + v3.manifest_path(&Path::from("base"), 1), + Path::from("base/_versions/1000000000000000000000000000000000000000000000000000000000000000.manifest") + ); + + // Test parse_version for V3 + assert_eq!( + v3.parse_version( + "0000000000000000000000000000000000000000000000000000000000000000.manifest" + ), + 
Some(0) + ); + assert_eq!( + v3.parse_version( + "1010000000000000000000000000000000000000000000000000000000000000.manifest" + ), + Some(5) + ); + assert_eq!( + v3.parse_version( + "1000000000000000000000000000000000000000000000000000000000000000.manifest" + ), + Some(1) + ); + + // Test detect_scheme for V3 + assert_eq!( + ManifestNamingScheme::detect_scheme( + "0000000000000000000000000000000000000000000000000000000000000000.manifest" + ), + Some(v3) + ); + assert_eq!( + ManifestNamingScheme::detect_scheme( + "1010000000000000000000000000000000000000000000000000000000000000.manifest" + ), + Some(v3) + ); + + // Test latest manifest hint path + assert_eq!( + v3.latest_manifest_hint_path(&Path::from("base")), + Some(Path::from("base/_versions/_latest_manifest_hint.json")) + ); + + // Test that non-V3 schemes return None for latest manifest path + assert_eq!( + ManifestNamingScheme::V1.latest_manifest_hint_path(&Path::from("base")), + None + ); + assert_eq!( + ManifestNamingScheme::V2.latest_manifest_hint_path(&Path::from("base")), + None + ); + } + #[tokio::test] async fn test_manifest_naming_migration() { let object_store = ObjectStore::memory(); @@ -1193,4 +1519,295 @@ mod tests { assert_eq!(actual_versions, expected_paths); } + + #[tokio::test] + async fn test_write_and_read_latest_manifest_hint() { + // Test with memory store + let object_store = ObjectStore::memory(); + let base = Path::from("test_base"); + + // Create a manifest location + let manifest_location = ManifestLocation { + version: 42, + path: ManifestNamingScheme::V3.manifest_path(&base, 42), + size: Some(1024), + naming_scheme: ManifestNamingScheme::V3, + e_tag: Some("test-etag".to_string()), + }; + + // Write the hint + write_latest_manifest_hint_best_effort(&object_store, &base, &manifest_location) + .await + .unwrap(); + + // Read the hint back + let hint = read_latest_manifest_hint_best_effort(&object_store, &base) + .await + .unwrap() + .expect("Should read hint successfully"); + + 
assert_eq!(hint.version, 42); + assert_eq!(hint.size, Some(1024)); + assert_eq!(hint.e_tag, Some("test-etag".to_string())); + } + + #[tokio::test] + async fn test_read_latest_manifest_hint_not_found() { + let object_store = ObjectStore::memory(); + let base = Path::from("test_base"); + + // Try to read non-existent hint + let hint = read_latest_manifest_hint_best_effort(&object_store, &base) + .await + .unwrap(); + + assert!(hint.is_none()); + } + + #[tokio::test] + async fn test_read_latest_manifest_hint_invalid_json() { + let object_store = ObjectStore::memory(); + let base = Path::from("test_base"); + let path = base.child(VERSIONS_DIR).child(LATEST_MANIFEST_HINT_FILE); + + // Write invalid JSON + object_store + .put(&path, b"invalid json".as_slice()) + .await + .unwrap(); + + // Should return None for invalid JSON + let hint = read_latest_manifest_hint_best_effort(&object_store, &base) + .await + .unwrap(); + + assert!(hint.is_none()); + } + + #[tokio::test] + async fn test_write_and_read_latest_manifest_hint_local() { + let tempdir = tempfile::tempdir().unwrap(); + let base = Path::from_absolute_path(tempdir.path().to_str().unwrap()).unwrap(); + let object_store = ObjectStore::local(); + + // Create versions directory + let versions_dir = tempdir.path().join("_versions"); + std::fs::create_dir(&versions_dir).unwrap(); + + // Create a manifest location + let manifest_location = ManifestLocation { + version: 99, + path: ManifestNamingScheme::V3.manifest_path(&base, 99), + size: Some(2048), + naming_scheme: ManifestNamingScheme::V3, + e_tag: Some("local-etag".to_string()), + }; + + // Write the hint + write_latest_manifest_hint_best_effort(&object_store, &base, &manifest_location) + .await + .unwrap(); + + // Read the hint back + let hint = read_latest_manifest_hint_best_effort(&object_store, &base) + .await + .unwrap() + .expect("Should read hint successfully"); + + assert_eq!(hint.version, 99); + assert_eq!(hint.size, Some(2048)); + assert_eq!(hint.e_tag, 
Some("local-etag".to_string())); + } + + #[tokio::test] + async fn test_latest_manifest_hint_with_null_fields() { + let object_store = ObjectStore::memory(); + let base = Path::from("test_base"); + + // Create a manifest location with None size and e_tag + let manifest_location = ManifestLocation { + version: 7, + path: ManifestNamingScheme::V3.manifest_path(&base, 7), + size: None, + naming_scheme: ManifestNamingScheme::V3, + e_tag: None, + }; + + // Write the hint + write_latest_manifest_hint_best_effort(&object_store, &base, &manifest_location) + .await + .unwrap(); + + // Read the hint back + let hint = read_latest_manifest_hint_best_effort(&object_store, &base) + .await + .unwrap() + .expect("Should read hint successfully"); + + assert_eq!(hint.version, 7); + assert_eq!(hint.size, None); + assert_eq!(hint.e_tag, None); + } + + #[tokio::test] + async fn test_write_latest_manifest_hint_failure() { + use lance_io::object_store::WrappingObjectStore; + use std::sync::Arc; + + // Create a wrapper that fails on put operations + #[derive(Debug)] + struct FailingWrapper; + + impl WrappingObjectStore for FailingWrapper { + fn wrap(&self, original: Arc) -> Arc { + Arc::new(FailingStore { inner: original }) + } + } + + #[derive(Debug)] + struct FailingStore { + inner: Arc, + } + + impl std::fmt::Display for FailingStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "FailingStore") + } + } + + #[async_trait::async_trait] + impl OSObjectStore for FailingStore { + async fn put( + &self, + _location: &Path, + _bytes: object_store::PutPayload, + ) -> object_store::Result { + Err(object_store::Error::Generic { + store: "failing", + source: Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + "Simulated failure", + )), + }) + } + + async fn put_opts( + &self, + _location: &Path, + _bytes: object_store::PutPayload, + _opts: PutOptions, + ) -> object_store::Result { + Err(object_store::Error::Generic { + store: "failing", + source: 
Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + "Simulated failure", + )), + }) + } + + // Delegate other methods to inner store + async fn get(&self, location: &Path) -> object_store::Result { + self.inner.get(location).await + } + + async fn get_opts( + &self, + location: &Path, + options: object_store::GetOptions, + ) -> object_store::Result { + self.inner.get_opts(location, options).await + } + + async fn head( + &self, + location: &Path, + ) -> object_store::Result { + self.inner.head(location).await + } + + async fn delete(&self, location: &Path) -> object_store::Result<()> { + self.inner.delete(location).await + } + + async fn list_with_delimiter( + &self, + prefix: Option<&Path>, + ) -> object_store::Result { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.copy_if_not_exists(from, to).await + } + + async fn rename_if_not_exists( + &self, + from: &Path, + to: &Path, + ) -> object_store::Result<()> { + self.inner.rename_if_not_exists(from, to).await + } + + async fn put_multipart( + &self, + location: &Path, + ) -> object_store::Result> { + self.inner.put_multipart(location).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOpts, + ) -> object_store::Result> { + self.inner.put_multipart_opts(location, opts).await + } + + fn list( + &self, + prefix: Option<&Path>, + ) -> BoxStream<'static, object_store::Result> { + self.inner.list(prefix) + } + } + + let base_store = ObjectStore::memory(); + let wrapper = Arc::new(FailingWrapper); + let wrapped_inner = wrapper.wrap(base_store.inner.clone()); + let object_store = ObjectStore::new( + wrapped_inner, + Url::parse("memory://test").unwrap(), + None, + None, + false, + true, + 16, + 3, + ); + + let base = Path::from("test_base"); + let 
manifest_location = ManifestLocation { + version: 100, + path: ManifestNamingScheme::V3.manifest_path(&base, 100), + size: Some(4096), + naming_scheme: ManifestNamingScheme::V3, + e_tag: Some("fail-test".to_string()), + }; + + // Write should succeed (best-effort, doesn't propagate errors) + let result = + write_latest_manifest_hint_best_effort(&object_store, &base, &manifest_location).await; + assert!(result.is_ok()); // Should not propagate the error + + // Verify the hint was not written + let hint = read_latest_manifest_hint_best_effort(&base_store, &base) + .await + .unwrap(); + assert!(hint.is_none()); + } } diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index f7e277e2a32..cd1b2c1d210 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -276,6 +276,7 @@ impl CommitHandler for ExternalManifestCommitHandler { base_path: &Path, version: u64, object_store: &dyn OSObjectStore, + hint_scheme: Option, ) -> std::result::Result { let location_res = self .external_manifest_store @@ -286,7 +287,7 @@ impl CommitHandler for ExternalManifestCommitHandler { Ok(p) => p, // not board external manifest yet, direct to object store Err(Error::NotFound { .. 
}) => { - let path = default_resolve_version(base_path, version, object_store) + let path = default_resolve_version(base_path, version, object_store, hint_scheme) .await .map_err(|_| Error::NotFound { uri: format!("{}@{}", base_path, version), diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 50c12ecb585..96d63aaa212 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -151,5 +151,9 @@ harness = false name = "take" harness = false +[[bench]] +name = "manifest_scheme" +harness = false + [lints] workspace = true diff --git a/rust/lance/benches/manifest_scheme.rs b/rust/lance/benches/manifest_scheme.rs new file mode 100644 index 00000000000..df29e0943a9 --- /dev/null +++ b/rust/lance/benches/manifest_scheme.rs @@ -0,0 +1,272 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Benchmark comparing V2 vs V3 manifest naming scheme performance +//! +//! This benchmark compares performance between V2 and V3 manifest schemes for: +//! 1. checkout_latest operations +//! 2. concurrent commits with conflict resolution +//! +//! Storage backend can be controlled via LANCE_BENCH_STORAGE_PREFIX environment variable: +//! - If not set: uses a temporary directory (default) +//! - If set to a prefix: uses that prefix for the dataset path +//! - Example: LANCE_BENCH_STORAGE_PREFIX=s3://my-bucket/path/to/ +//! - Example: LANCE_BENCH_STORAGE_PREFIX=/tmp/lance/ +//! - Example: LANCE_BENCH_STORAGE_PREFIX=memory:// +//! +//! Usage: +//! ``` +//! # Test with temporary directory (default) +//! cargo bench --bench manifest_scheme +//! +//! # Test with S3 storage +//! LANCE_BENCH_STORAGE_PREFIX=s3://my-bucket/benchmarks/ cargo bench --bench manifest_scheme +//! +//! # Test with memory storage +//! LANCE_BENCH_STORAGE_PREFIX=memory:// cargo bench --bench manifest_scheme +//! 
``` + +#![allow(clippy::print_stdout)] + +use std::sync::Arc; + +use arrow_array::types::{Float32Type, Int32Type}; +use arrow_array::{Float32Array, Int32Array, RecordBatch, RecordBatchIterator}; +use arrow_schema::{DataType, Field, Schema as ArrowSchema}; +use criterion::{criterion_group, criterion_main, Criterion}; +use lance::dataset::{Dataset, InsertBuilder, WriteDestination, WriteMode, WriteParams}; +use lance_datafusion::datagen::DatafusionDatagenExt; +use lance_datagen::{BatchCount, RowCount}; +use lance_table::io::commit::ManifestNamingScheme; +#[cfg(target_os = "linux")] +use pprof::criterion::{Output, PProfProfiler}; + +/// Get storage URI prefix based on environment variable +fn get_storage_prefix() -> String { + std::env::var("LANCE_BENCH_STORAGE_PREFIX").unwrap_or_else(|_| { + // Use a temporary directory if no prefix is specified + let temp_dir = std::env::temp_dir(); + let bench_dir = temp_dir.join("lance_bench"); + format!("{}/", bench_dir.display()) + }) +} + +/// Get storage type name for display +fn get_storage_type() -> String { + let prefix = get_storage_prefix(); + if prefix.starts_with("s3://") { + "s3".to_string() + } else if prefix.starts_with("memory://") { + "memory".to_string() + } else if prefix.starts_with("gs://") { + "gcs".to_string() + } else if prefix.starts_with("az://") { + "azure".to_string() + } else { + "file".to_string() + } +} + +/// Create a test dataset with specified number of versions and manifest scheme +async fn create_test_dataset( + base_uri: &str, + num_versions: u64, + manifest_scheme: ManifestNamingScheme, +) -> Dataset { + // Clean up existing dataset if using file storage + if !base_uri.starts_with("memory://") + && !base_uri.starts_with("s3://") + && !base_uri.starts_with("gs://") + && !base_uri.starts_with("az://") + { + let _ = std::fs::remove_dir_all(base_uri); + } + + let write_params = WriteParams { + enable_v2_manifest_paths: matches!(manifest_scheme, ManifestNamingScheme::V2), + enable_v3_manifest_paths: 
matches!(manifest_scheme, ManifestNamingScheme::V3), + max_rows_per_file: 100, + max_rows_per_group: 50, + ..Default::default() + }; + + // Create initial dataset using into_reader_rows and Dataset::write pattern + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Float32, false), + ])); + + let batch_size = 50; + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from_iter_values(0..batch_size)), + Arc::new(Float32Array::from_iter_values( + (0..batch_size).map(|x| x as f32 * 0.1), + )), + ], + ) + .unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write(reader, base_uri, Some(write_params)) + .await + .unwrap(); + + // Add additional versions using lightweight update_config operations + for i in 1..num_versions { + dataset + .update_config([(format!("version_{}", i), i.to_string())]) + .await + .unwrap(); + } + + dataset +} + +/// Benchmark checkout_latest performance: V2 vs V3 +fn bench_checkout_latest_v2_vs_v3(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let storage_prefix = get_storage_prefix(); + let storage_type = get_storage_type(); + + for num_versions in [10, 50, 100] { + for scheme in [ManifestNamingScheme::V2, ManifestNamingScheme::V3] { + let scheme_name = match scheme { + ManifestNamingScheme::V2 => "V2", + ManifestNamingScheme::V3 => "V3", + _ => unreachable!(), + }; + + let dataset_uri = format!( + "{}bench_dataset_{}_{}.lance", + storage_prefix, + scheme_name.to_lowercase(), + num_versions + ); + let dataset = rt.block_on(create_test_dataset(&dataset_uri, num_versions, scheme)); + let latest_version = dataset.version().version; + // Start from an older version to test checkout_latest + let start_version = if num_versions > 5 { + latest_version - 5 + } else { + 0 + }; + + c.bench_function( + &format!( + "Checkout Latest {} ({} versions, {})", + 
scheme_name, num_versions, storage_type + ), + |b| { + b.to_async(&rt).iter(|| async { + // Open dataset at older version then checkout latest + let mut ds = Dataset::open(&dataset_uri).await.unwrap(); + ds = ds.checkout_version(start_version).await.unwrap(); + ds.checkout_latest().await.unwrap(); + assert_eq!(ds.version().version, latest_version); + }) + }, + ); + } + } +} + +/// Benchmark concurrent commit performance: V2 vs V3 +fn bench_concurrent_commits_v2_vs_v3(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let storage_prefix = get_storage_prefix(); + let storage_type = get_storage_type(); + + for num_versions in [10, 50, 100] { + for commits_behind in [5, 10, 20] { + for scheme in [ManifestNamingScheme::V2, ManifestNamingScheme::V3] { + let scheme_name = match scheme { + ManifestNamingScheme::V2 => "V2", + ManifestNamingScheme::V3 => "V3", + _ => unreachable!(), + }; + + // Skip if we don't have enough versions + if commits_behind >= num_versions { + continue; + } + + let dataset_uri = format!( + "{}bench_concurrent_{}_{}.lance", + storage_prefix, + scheme_name.to_lowercase(), + num_versions + ); + let dataset = rt.block_on(create_test_dataset(&dataset_uri, num_versions, scheme)); + let latest_version = dataset.version().version; + let target_version = latest_version - commits_behind; + + c.bench_function( + &format!( + "Concurrent Commit {} ({} versions, {} behind, {})", + scheme_name, num_versions, commits_behind, storage_type + ), + |b| { + b.to_async(&rt).iter(|| async { + let old_dataset = + dataset.checkout_version(target_version).await.unwrap(); + + // Create small append data for concurrent write + let append_data = lance_datagen::gen_batch() + .col( + "id", + lance_datagen::array::step_custom::(1000, 1), + ) + .col("value", lance_datagen::array::rand::()) + .into_df_stream(RowCount::from(10), BatchCount::from(1)); + + // Perform concurrent commit with conflict resolution + let write_params = WriteParams { + mode: 
WriteMode::Append, + head_manifests_batch_size: if matches!( + scheme, + ManifestNamingScheme::V3 + ) { + 8 + } else { + 1 + }, + ..Default::default() + }; + + InsertBuilder::new(WriteDestination::Dataset(Arc::new(old_dataset))) + .with_params(&write_params) + .execute_stream(append_data) + .await + .unwrap(); + }) + }, + ); + } + } + } +} + +#[cfg(target_os = "linux")] +criterion_group!( + name = benches; + config = Criterion::default() + .significance_level(0.01) + .sample_size(50) + .warm_up_time(std::time::Duration::from_secs(5)) + .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = bench_checkout_latest_v2_vs_v3, bench_concurrent_commits_v2_vs_v3 +); + +#[cfg(not(target_os = "linux"))] +criterion_group!( + name = benches; + config = Criterion::default() + .significance_level(0.01) + .sample_size(50) + .warm_up_time(std::time::Duration::from_secs(5)); + targets = bench_checkout_latest_v2_vs_v3, bench_concurrent_commits_v2_vs_v3 +); + +criterion_main!(benches); diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index b3706cd516b..e7fca1dc581 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -36,8 +36,8 @@ use lance_table::format::{ DataStorageFormat, Fragment, Index, Manifest, MAGIC, MAJOR_VERSION, MINOR_VERSION, }; use lance_table::io::commit::{ - migrate_scheme_to_v2, CommitConfig, CommitError, CommitHandler, CommitLock, ManifestLocation, - ManifestNamingScheme, + batch_head_manifests, migrate_scheme_to_v2, CommitConfig, CommitError, CommitHandler, + CommitLock, ManifestLocation, ManifestNamingScheme, }; use lance_table::io::manifest::{read_manifest, write_manifest}; use object_store::path::Path; @@ -417,7 +417,12 @@ impl Dataset { let base_path = self.base.clone(); let manifest_location = self .commit_handler - .resolve_version_location(&base_path, version, &self.object_store.inner) + .resolve_version_location( + &base_path, + version, + &self.object_store.inner, + 
Some(self.manifest_location.naming_scheme), + ) .await?; if self.already_checked_out(&manifest_location) { @@ -685,7 +690,12 @@ impl Dataset { let blobs_path = self.base.child(BLOB_DIR); let blob_manifest_location = self .commit_handler - .resolve_version_location(&blobs_path, blobs_version, &self.object_store.inner) + .resolve_version_location( + &blobs_path, + blobs_version, + &self.object_store.inner, + Some(self.manifest_location.naming_scheme), + ) .await?; let manifest = read_manifest( &self.object_store, @@ -1750,6 +1760,117 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<' } } +pub(crate) async fn load_and_sort_new_transactions_v3( + dataset: &Dataset, + head_manifests_batch_size: usize, +) -> Result<(Dataset, Vec<(u64, Arc)>)> { + let io_parallelism = dataset.object_store().io_parallelism(); + let latest_version = dataset.manifest.version; + let locations = batch_head_manifests( + &dataset.object_store, + &dataset.base, + latest_version, + head_manifests_batch_size, + ); + + let manifests = locations + .map_ok(move |location| async move { + let manifest_key = ManifestKey { + version: location.version, + e_tag: location.e_tag.as_deref(), + }; + let manifest = + if let Some(cached) = dataset.metadata_cache.get_with_key(&manifest_key).await { + cached + } else { + let loaded = Arc::new( + Dataset::load_manifest( + dataset.object_store(), + &location, + &dataset.uri, + dataset.session.as_ref(), + ) + .await?, + ); + dataset + .metadata_cache + .insert_with_key(&manifest_key, loaded.clone()) + .await; + loaded + }; + Ok((manifest, location)) + }) + .try_buffer_unordered(io_parallelism / 2); + let transactions = manifests + .map_ok(move |(manifest, location)| async move { + let manifest_copy = manifest.clone(); + let tx_key = TransactionKey { + version: manifest.version, + }; + let transaction = + if let Some(cached) = dataset.metadata_cache.get_with_key(&tx_key).await { + cached + } else { + let dataset_version = 
Dataset::checkout_manifest( + dataset.object_store.clone(), + dataset.base.clone(), + dataset.uri.clone(), + manifest_copy.clone(), + location.clone(), + dataset.session(), + dataset.commit_handler.clone(), + dataset.file_reader_options.clone(), + )?; + let object_store = dataset_version.object_store(); + let path = dataset_version + .manifest + .transaction_file + .as_ref() + .ok_or_else(|| Error::Internal { + message: format!( + "Dataset version {} does not have a transaction file", + manifest_copy.version + ), + location: location!(), + })?; + let loaded = + Arc::new(read_transaction_file(object_store, &dataset.base, path).await?); + dataset + .metadata_cache + .insert_with_key(&tx_key, loaded.clone()) + .await; + loaded + }; + Ok((manifest, location, transaction)) + }) + .try_buffer_unordered(io_parallelism / 2); + + let mut new_transactions = transactions.boxed().try_collect::>().await?; + new_transactions.sort_by_key(|(manifest, _, _)| manifest.version); + let latest_dataset = if let Some(last_transaction) = new_transactions.last() { + Dataset::checkout_manifest( + dataset.object_store.clone(), + dataset.base.clone(), + dataset.uri.clone(), + last_transaction.0.clone(), + last_transaction.1.clone(), + dataset.session(), + dataset.commit_handler.clone(), + dataset.file_reader_options.clone(), + )? + } else { + dataset.clone() + }; + + Ok(( + latest_dataset, + new_transactions + .into_iter() + .map(|(manifest, _, transaction)| (manifest.version, transaction)) + .collect(), + )) +} + /// # Schema Evolution /// /// Lance datasets support evolving the schema. 
Several operations are @@ -2067,7 +2188,7 @@ mod tests { use crate::dataset::transaction::DataReplacementGroup; use crate::dataset::WriteMode::Overwrite; use crate::index::vector::VectorIndexParams; - use crate::utils::test::copy_test_data_to_tmp; + use crate::utils::test::{copy_test_data_to_tmp, DatagenExt, FragmentCount, FragmentRowCount}; use arrow::array::{as_struct_array, AsArray, GenericListBuilder, GenericStringBuilder}; use arrow::compute::concat_batches; @@ -7274,4 +7395,117 @@ mod tests { dataset.validate().await.unwrap(); assert_eq!(dataset.count_rows(None).await.unwrap(), 3); } + + #[tokio::test] + async fn test_v3_naming_scheme_load_new_transactions() { + // Helper function to verify transaction versions and content + fn verify_transactions( + transactions: &[(u64, Arc)], + base_version: u64, + expected_count: usize, + ) { + assert_eq!( + transactions.len(), + expected_count, + "Should have {} transactions", + expected_count + ); + + if expected_count == 0 { + return; + } + + let mut versions: Vec = transactions.iter().map(|(v, _)| *v).collect(); + versions.sort(); + let expected_versions: Vec = (1..=expected_count) + .map(|i| base_version + i as u64) + .collect(); + assert_eq!(versions, expected_versions); + + // Verify transaction content + for (i, (version, transaction)) in transactions.iter().enumerate() { + let expected_version = base_version + (i as u64) + 1; + assert_eq!(*version, expected_version); + + // Check that transaction contains UpdateConfig operation + match &transaction.operation { + crate::dataset::transaction::Operation::UpdateConfig { + upsert_values, .. 
+ } => { + if let Some(upsert_values) = upsert_values { + let expected_key = format!("test_key_{}", i + 1); + let expected_value = format!("test_value_{}", i + 1); + assert_eq!(upsert_values.get(&expected_key), Some(&expected_value)); + } else { + panic!("Expected upsert_values in UpdateConfig operation"); + } + } + _ => panic!( + "Expected UpdateConfig operation, got {:?}", + transaction.operation + ), + } + } + } + + // Create initial dataset with V3 naming scheme using memory store + let write_params = WriteParams { + enable_v3_manifest_paths: true, + ..Default::default() + }; + + let mut dataset = lance_datagen::gen_batch() + .col( + "id", + lance_datagen::array::step::(), + ) + .into_ram_dataset_with_params( + FragmentCount::from(1), + FragmentRowCount::from(10), + Some(write_params), + ) + .await + .unwrap(); + + let base_version = dataset.version().version; + + // Add 4 new versions using simple UpdateConfig transactions + for i in 1..=4 { + dataset + .update_config([(format!("test_key_{}", i), format!("test_value_{}", i))]) + .await + .unwrap(); + } + + // Create an older dataset version to load new transactions from + let old_dataset = dataset.checkout_version(base_version).await.unwrap(); + + // Test Case 1: No new transactions when starting from latest version + let (new_dataset, transactions) = load_and_sort_new_transactions_v3(&dataset, 2) + .await + .unwrap(); + assert_eq!(new_dataset.version().version, base_version + 4); + verify_transactions(&transactions, base_version + 4, 0); + + // Test Case 2: New transactions within batch size (batch_size=10, actual=4) + let (new_dataset, transactions) = load_and_sort_new_transactions_v3(&old_dataset, 10) + .await + .unwrap(); + assert_eq!(new_dataset.version().version, base_version + 4); + verify_transactions(&transactions, base_version, 4); + + // Test Case 3: Exactly single batch size (batch_size=4, actual=4) + let (new_dataset, transactions) = load_and_sort_new_transactions_v3(&old_dataset, 4) + .await + 
.unwrap(); + assert_eq!(new_dataset.version().version, base_version + 4); + verify_transactions(&transactions, base_version, 4); + + // Test Case 4: Beyond single batch size (batch_size=3, actual=4 requires 2 batches: 3+1) + let (new_dataset, transactions) = load_and_sort_new_transactions_v3(&old_dataset, 3) + .await + .unwrap(); + assert_eq!(new_dataset.version().version, base_version + 4); + verify_transactions(&transactions, base_version, 4); + } } diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index e210265ed98..ce28499462f 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -326,7 +326,7 @@ impl DatasetBuilder { let (manifest, location) = if let Some(mut manifest) = manifest { let location = commit_handler - .resolve_version_location(&base_path, manifest.version, &object_store.inner) + .resolve_version_location(&base_path, manifest.version, &object_store.inner, None) .await?; if manifest.schema.has_dictionary_types() && manifest.should_use_legacy_format() { let reader = object_store.open(&location.path).await?; @@ -337,7 +337,7 @@ impl DatasetBuilder { let manifest_location = match version { Some(version) => { commit_handler - .resolve_version_location(&base_path, version, &object_store.inner) + .resolve_version_location(&base_path, version, &object_store.inner, None) .await? } None => commit_handler diff --git a/rust/lance/src/dataset/refs.rs b/rust/lance/src/dataset/refs.rs index f672a0949ca..c2acf632f3b 100644 --- a/rust/lance/src/dataset/refs.rs +++ b/rust/lance/src/dataset/refs.rs @@ -132,7 +132,7 @@ impl Tags { let manifest_file = self .commit_handler - .resolve_version_location(&self.base, version, &self.object_store.inner) + .resolve_version_location(&self.base, version, &self.object_store.inner, None) .await?; if !self.object_store().exists(&manifest_file.path).await? 
{ @@ -188,7 +188,7 @@ impl Tags { let manifest_file = self .commit_handler - .resolve_version_location(&self.base, version, &self.object_store.inner) + .resolve_version_location(&self.base, version, &self.object_store.inner, None) .await?; if !self.object_store().exists(&manifest_file.path).await? { diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 3ffbb21b588..d557c0fcbd0 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -1149,7 +1149,7 @@ impl Transaction { tx_path: &str, ) -> Result<(Manifest, Vec)> { let location = commit_handler - .resolve_version_location(base_path, version, &object_store.inner) + .resolve_version_location(base_path, version, &object_store.inner, None) .await?; let mut manifest = read_manifest(object_store, &location.path, location.size).await?; manifest.set_timestamp(timestamp_to_nanos(config.timestamp)); diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 4dbdc5f26b7..ca1a9bef6a8 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -202,6 +202,13 @@ pub struct WriteParams { /// Default is False. pub enable_v2_manifest_paths: bool, + /// If set to true, and this is a new dataset, uses the new v3 manifest paths. + /// These use reversed binary representation for S3 throughput optimization + /// and include a _latest_manifest.json file for best-effort latest version tracking. + /// This parameter has no effect on existing datasets. + /// Default is False. + pub enable_v3_manifest_paths: bool, + pub session: Option>, /// If Some and this is a new dataset, old dataset versions will be @@ -212,6 +219,12 @@ pub struct WriteParams { /// Both parameters must be set to invoke autocleaning. pub auto_cleanup: Option, + /// Batch size for loading head manifests when checking for concurrent writes + /// in V3 manifest naming scheme. 
Smaller values can help reduce memory usage + /// but may increase the number of object store requests. + /// Default is 8. + pub head_manifests_batch_size: usize, + /// If true, skip auto cleanup during commits. This should be set to true /// for high frequency writes to improve performance. This is also useful /// if the writer does not have delete permissions and the clean up would @@ -222,6 +235,12 @@ pub struct WriteParams { /// This can include commit messages, engine information, etc. /// this properties map will be persisted as part of Transaction object. pub transaction_properties: Option>>, + + /// If true, create a detached commit that is not part of the mainline history. + /// Detached commits will never show up in the dataset's history. + /// This can be used to stage changes or to handle "secondary" datasets + /// whose lineage is tracked elsewhere. Default is false. + pub detached: bool, } impl Default for WriteParams { @@ -239,10 +258,13 @@ impl Default for WriteParams { data_storage_version: None, enable_move_stable_row_ids: false, enable_v2_manifest_paths: false, + enable_v3_manifest_paths: false, session: None, auto_cleanup: Some(AutoCleanupParams::default()), + head_manifests_batch_size: 8, skip_auto_cleanup: false, transaction_properties: None, + detached: false, } } } @@ -796,13 +818,18 @@ impl Iterator for SpillStreamIter { mod tests { use super::*; + use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount}; + use arrow_array::types::{Float32Type, Int32Type}; use arrow_array::{Int32Array, RecordBatchReader, StructArray}; use arrow_schema::{DataType, Field as ArrowField, Fields, Schema as ArrowSchema}; use datafusion::{error::DataFusionError, physical_plan::stream::RecordBatchStreamAdapter}; use futures::TryStreamExt; + use lance_datafusion::datagen::DatafusionDatagenExt; use lance_datagen::{array, gen_batch, BatchCount, RowCount}; use lance_file::reader::FileReader; use lance_io::traits::Reader; + use 
lance_table::format::is_detached_version; + use lance_table::io::commit::{read_latest_manifest_hint_best_effort, ManifestNamingScheme}; #[tokio::test] async fn test_chunking_large_batches() { @@ -1095,4 +1122,768 @@ mod tests { let batch = reader.read_batch(0, .., &schema).await.unwrap(); assert_eq!(batch, data); } + + #[tokio::test] + async fn test_v3_manifest_naming_scheme_write_and_read() { + let write_params = WriteParams { + enable_v3_manifest_paths: true, + max_rows_per_file: 100, + ..Default::default() + }; + + let mut dataset = lance_datagen::gen_batch() + .col("id", lance_datagen::array::step::()) + .col("value", lance_datagen::array::rand::()) + .into_ram_dataset_with_params( + FragmentCount::from(1), + FragmentRowCount::from(50), + Some(write_params.clone()), + ) + .await + .unwrap(); + + // Verify V3 naming scheme is used + assert_eq!( + dataset.manifest_location.naming_scheme, + ManifestNamingScheme::V3 + ); + assert_eq!(dataset.manifest.version, 1); + + // Verify manifest file exists with correct naming + let expected_manifest_path = dataset + .base + .child("_versions") + .child("1000000000000000000000000000000000000000000000000000000000000000.manifest"); + let manifest_exists = dataset + .object_store + .exists(&expected_manifest_path) + .await + .unwrap(); + assert!( + manifest_exists, + "V3 manifest file should exist at {:?}", + expected_manifest_path + ); + + // Verify _latest_manifest_hint.json exists and contains correct version + let latest_version = + read_latest_manifest_hint_best_effort(&dataset.object_store, &dataset.base) + .await + .unwrap(); + assert_eq!(latest_version.unwrap().version, 1); + + // Create version 2 + let append_data1 = lance_datagen::gen_batch() + .col("id", lance_datagen::array::step_custom::(100, 1)) + .col("value", lance_datagen::array::rand::()) + .into_reader_rows(RowCount::from(25), BatchCount::from(1)); + + dataset.append(append_data1, None).await.unwrap(); + assert_eq!(dataset.manifest.version, 2); + + // Verify 
the second manifest file exists with correct naming + let expected_manifest_path_v2 = dataset + .base + .child("_versions") + .child("0100000000000000000000000000000000000000000000000000000000000000.manifest"); + let manifest_v2_exists = dataset + .object_store + .exists(&expected_manifest_path_v2) + .await + .unwrap(); + assert!( + manifest_v2_exists, + "V3 manifest file for version 2 should exist at {:?}", + expected_manifest_path_v2 + ); + + // Verify _latest_manifest_hint.json is updated to version 2 + let latest_version = + read_latest_manifest_hint_best_effort(&dataset.object_store, &dataset.base) + .await + .unwrap(); + assert_eq!(latest_version.map(|v| v.version), Some(2)); + + // Create version 3 + let append_data2 = lance_datagen::gen_batch() + .col("id", lance_datagen::array::step_custom::(125, 1)) + .col("value", lance_datagen::array::rand::()) + .into_reader_rows(RowCount::from(25), BatchCount::from(1)); + + dataset.append(append_data2, None).await.unwrap(); + assert_eq!(dataset.manifest.version, 3); + + // Verify third manifest file exists correct naming + let expected_manifest_path_v3 = dataset + .base + .child("_versions") + .child("1100000000000000000000000000000000000000000000000000000000000000.manifest"); + let manifest_v3_exists = dataset + .object_store + .exists(&expected_manifest_path_v3) + .await + .unwrap(); + assert!( + manifest_v3_exists, + "V3 manifest file for version 3 should exist at {:?}", + expected_manifest_path_v3 + ); + + // Verify _latest_manifest.json is updated to version 3 + let latest_version = + read_latest_manifest_hint_best_effort(&dataset.object_store, &dataset.base) + .await + .unwrap(); + assert_eq!(latest_version.map(|v| v.version), Some(3)); + + // Verify final dataset state and data integrity + let scan = dataset.scan(); + let batches: Vec = scan + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + // The initial data 
creates 50 rows + // Then we append 25 rows twice + assert_eq!(total_rows, 100); // After two append operations (50 + 25 + 25) + } + + #[tokio::test] + async fn test_v3_manifest_naming_scheme_concurrent_write() { + let write_params = WriteParams { + enable_v3_manifest_paths: true, + max_rows_per_file: 100, + ..Default::default() + }; + + // Create initial dataset with V3 naming scheme using in-memory storage + let mut dataset = lance_datagen::gen_batch() + .col("id", lance_datagen::array::step::()) + .col("value", lance_datagen::array::rand::()) + .into_ram_dataset_with_params( + FragmentCount::from(1), + FragmentRowCount::from(50), + Some(write_params.clone()), + ) + .await + .unwrap(); + + // Verify V3 naming scheme is used + assert_eq!( + dataset.manifest_location.naming_scheme, + ManifestNamingScheme::V3 + ); + assert_eq!(dataset.manifest.version, 1); + + // Check that _latest_manifest_hint.json exists + let latest_manifest_path = dataset + .base + .child("_versions") + .child("_latest_manifest_hint.json"); + assert!(dataset + .object_store + .exists(&latest_manifest_path) + .await + .unwrap()); + + // Add 4 new versions using lightweight UpdateConfig transactions (simulating intermediate commits) + for i in 1..=4 { + dataset + .update_config([(format!("test_key_{}", i), format!("test_value_{}", i))]) + .await + .unwrap(); + } + + // Verify _latest_manifest_hint.json is updated to version 5 + let latest_version = + read_latest_manifest_hint_best_effort(&dataset.object_store, &dataset.base) + .await + .unwrap(); + assert_eq!(latest_version.map(|v| v.version), Some(5)); + + // Test Case 1: Concurrent write with default batch size + let concurrent_dataset_1 = dataset.checkout_version(1).await.unwrap(); + assert_eq!(concurrent_dataset_1.manifest.version, 1); + + let concurrent_data_1 = lance_datagen::gen_batch() + .col("id", lance_datagen::array::step_custom::(200, 1)) + .col("value", lance_datagen::array::rand::()) + .into_df_stream(RowCount::from(25), 
BatchCount::from(1)); + + let rebased_dataset_1 = + InsertBuilder::new(WriteDestination::Dataset(Arc::new(concurrent_dataset_1))) + .with_params(&WriteParams { + mode: WriteMode::Append, + ..write_params.clone() + }) + .execute_stream(concurrent_data_1) + .await + .unwrap(); + + // This should succeed and create version 6 (after rebasing over versions 2-5) + assert_eq!(rebased_dataset_1.manifest.version, 6); + + // Verify read after write for test case 1 + let scan_1 = rebased_dataset_1.scan(); + let batches_1: Vec = scan_1 + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows_1: usize = batches_1.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows_1, 75); // 50 (initial) + 25 (concurrent append 1) + + // Test Case 2: Concurrent write with small batch size (1) + let concurrent_dataset_2 = dataset.checkout_version(2).await.unwrap(); + assert_eq!(concurrent_dataset_2.manifest.version, 2); + + let concurrent_data_2 = lance_datagen::gen_batch() + .col("id", lance_datagen::array::step_custom::(300, 1)) + .col("value", lance_datagen::array::rand::()) + .into_df_stream(RowCount::from(30), BatchCount::from(1)); + + let rebased_dataset_2 = + InsertBuilder::new(WriteDestination::Dataset(Arc::new(concurrent_dataset_2))) + .with_params(&WriteParams { + mode: WriteMode::Append, + head_manifests_batch_size: 1, // Small batch size - forces multiple batches + ..write_params.clone() + }) + .execute_stream(concurrent_data_2) + .await + .unwrap(); + + // This should succeed and create version 7 (after rebasing over versions 3-6) + assert_eq!(rebased_dataset_2.manifest.version, 7); + + // Verify read after write for test case 2 + let scan_2 = rebased_dataset_2.scan(); + let batches_2: Vec = scan_2 + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows_2: usize = batches_2.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows_2, 105); // 50 (initial) + 25 (concurrent append 1) + 30 
(concurrent append 2) + + // Test Case 3: Concurrent write with large batch size (20) + let concurrent_dataset_3 = dataset.checkout_version(3).await.unwrap(); + assert_eq!(concurrent_dataset_3.manifest.version, 3); + + let concurrent_data_3 = lance_datagen::gen_batch() + .col("id", lance_datagen::array::step_custom::(400, 1)) + .col("value", lance_datagen::array::rand::()) + .into_df_stream(RowCount::from(20), BatchCount::from(1)); + + let rebased_dataset_3 = + InsertBuilder::new(WriteDestination::Dataset(Arc::new(concurrent_dataset_3))) + .with_params(&WriteParams { + mode: WriteMode::Append, + head_manifests_batch_size: 20, // Large batch size - single batch + ..write_params.clone() + }) + .execute_stream(concurrent_data_3) + .await + .unwrap(); + + // This should succeed and create version 8 (after rebasing over versions 4-7) + assert_eq!(rebased_dataset_3.manifest.version, 8); + + // Verify read after write for test case 3 + let scan_3 = rebased_dataset_3.scan(); + let batches_3: Vec = scan_3 + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows_3: usize = batches_3.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows_3, 125); // 50 (initial) + 25 (concurrent append 1) + 30 (concurrent append 2) + 20 (concurrent append 3) + + // Verify _latest_manifest.json is updated to version 8 + let latest_version = read_latest_manifest_hint_best_effort( + &rebased_dataset_3.object_store, + &rebased_dataset_3.base, + ) + .await + .unwrap(); + assert_eq!(latest_version.map(|v| v.version), Some(8)); + + // Use the latest dataset to verify final state + let final_dataset = &rebased_dataset_3; + assert_eq!(final_dataset.version().version, 8); + + // Verify all versions exist + for version in 1..=8 { + let version_dataset = final_dataset.checkout_version(version).await.unwrap(); + assert_eq!(version_dataset.manifest.version, version); + } + + // Verify final dataset has all the data (read after write) + let scan = 
final_dataset.scan(); + let batches: Vec = scan + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + // Initial: 50, Concurrent append 1: 25, Concurrent append 2: 30, Concurrent append 3: 20 + assert_eq!(total_rows, 125); // 50 + 25 + 30 + 20 + + // Test that the V3 batched existence checking works correctly + // by verifying the latest version + let latest_version_id = final_dataset.latest_version_id().await.unwrap(); + let fresh_dataset = final_dataset + .checkout_version(latest_version_id) + .await + .unwrap(); + assert_eq!(fresh_dataset.manifest.version, 8); + + // Verify read after write works correctly + let fresh_scan = fresh_dataset.scan(); + let fresh_batches: Vec = fresh_scan + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let fresh_total_rows: usize = fresh_batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(fresh_total_rows, 125); + } + + #[tokio::test] + async fn test_v3_manifest_naming_scheme_checkout_latest() { + let write_params = WriteParams { + enable_v3_manifest_paths: true, + max_rows_per_file: 100, + ..Default::default() + }; + + // Create initial dataset with V3 naming scheme using in-memory storage + let mut dataset = lance_datagen::gen_batch() + .col("id", lance_datagen::array::step::()) + .col("value", lance_datagen::array::rand::()) + .into_ram_dataset_with_params( + FragmentCount::from(1), + FragmentRowCount::from(50), + Some(write_params.clone()), + ) + .await + .unwrap(); + + // Verify V3 naming scheme is used + assert_eq!( + dataset.manifest_location.naming_scheme, + ManifestNamingScheme::V3 + ); + assert_eq!(dataset.manifest.version, 1); + + // Add several more versions to test checkout_latest + for i in 2..=5 { + dataset + .update_config([(format!("version_{}", i), i.to_string())]) + .await + .unwrap(); + assert_eq!(dataset.manifest.version, i); + } + + // Test Case 1: Latest version hint is 
correctly pointing to the latest version + { + let latest_version = + read_latest_manifest_hint_best_effort(&dataset.object_store, &dataset.base) + .await + .unwrap(); + assert_eq!(latest_version.map(|v| v.version), Some(5)); + + // Checkout an older version and then checkout_latest + let mut old_dataset = dataset.checkout_version(3).await.unwrap(); + assert_eq!(old_dataset.manifest.version, 3); + + // checkout_latest should bring us to version 5 + old_dataset.checkout_latest().await.unwrap(); + assert_eq!(old_dataset.manifest.version, 5); + } + + // Test Case 2: Latest version hint is pointing to an old version + { + // Manually corrupt the latest version hint to point to an older version + use lance_table::io::commit::LatestManifestHint; + + let corrupted_hint = LatestManifestHint { + version: 3, // Point to older version instead of 5 + size: None, + e_tag: None, + }; + + let hint_path = dataset + .base + .child("_versions") + .child("_latest_manifest_hint.json"); + let hint_json = serde_json::to_vec(&corrupted_hint).unwrap(); + dataset + .object_store + .put(&hint_path, hint_json.as_slice()) + .await + .unwrap(); + + // Verify the hint now points to version 3 + let latest_version = + read_latest_manifest_hint_best_effort(&dataset.object_store, &dataset.base) + .await + .unwrap(); + assert_eq!(latest_version.map(|v| v.version), Some(3)); + + // Checkout an older version and then checkout_latest + let mut old_dataset = dataset.checkout_version(2).await.unwrap(); + assert_eq!(old_dataset.manifest.version, 2); + + // checkout_latest should still find the actual latest version (5) + // even though the hint points to version 3 + old_dataset.checkout_latest().await.unwrap(); + assert_eq!(old_dataset.manifest.version, 5); + } + + // Test Case 3: Latest version hint does not exist + { + // Delete the latest version hint file + let hint_path = dataset + .base + .child("_versions") + .child("_latest_manifest_hint.json"); + 
dataset.object_store.delete(&hint_path).await.unwrap(); + + // Verify the hint no longer exists + let latest_version = + read_latest_manifest_hint_best_effort(&dataset.object_store, &dataset.base) + .await + .unwrap(); + assert!(latest_version.is_none()); + + // Checkout an older version and then checkout_latest + let mut old_dataset = dataset.checkout_version(1).await.unwrap(); + assert_eq!(old_dataset.manifest.version, 1); + + // checkout_latest should still find the actual latest version (5) + // by scanning all manifest files + old_dataset.checkout_latest().await.unwrap(); + assert_eq!(old_dataset.manifest.version, 5); + } + } + + #[tokio::test] + async fn test_v3_manifest_naming_scheme_checkout_version() { + let write_params = WriteParams { + enable_v3_manifest_paths: true, + max_rows_per_file: 100, + ..Default::default() + }; + + // Create initial dataset with V3 naming scheme using in-memory storage + let mut dataset = lance_datagen::gen_batch() + .col("id", lance_datagen::array::step::()) + .col("value", lance_datagen::array::rand::()) + .into_ram_dataset_with_params( + FragmentCount::from(1), + FragmentRowCount::from(50), + Some(write_params.clone()), + ) + .await + .unwrap(); + + // Verify V3 naming scheme is used + assert_eq!( + dataset.manifest_location.naming_scheme, + ManifestNamingScheme::V3 + ); + assert_eq!(dataset.manifest.version, 1); + + // Test Case 1: Checkout the first version after the dataset is created + { + let version_1_dataset = dataset.checkout_version(1).await.unwrap(); + assert_eq!(version_1_dataset.manifest.version, 1); + + // Verify the dataset content is accessible + let scan = version_1_dataset.scan(); + let batches: Vec = scan + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 50); // Initial dataset has 50 rows + } + + // Create a few more versions using append operations + for i in 2..=5 { + let 
append_data = lance_datagen::gen_batch() + .col( + "id", + lance_datagen::array::step_custom::(i * 100, 1), + ) + .col("value", lance_datagen::array::rand::()) + .into_reader_rows(RowCount::from(10), BatchCount::from(1)); + + dataset + .append(append_data, Some(write_params.clone())) + .await + .unwrap(); + assert_eq!(dataset.manifest.version, i as u64); + } + + // Now we have versions 1, 2, 3, 4, 5 + assert_eq!(dataset.manifest.version, 5); + + // Test Case 2: Checkout the first version + { + let version_1_dataset = dataset.checkout_version(1).await.unwrap(); + assert_eq!(version_1_dataset.manifest.version, 1); + + // Verify the content is only the original data + let scan = version_1_dataset.scan(); + let batches: Vec = scan + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 50); // Only original 50 rows + } + + // Test Case 3: Checkout a middle version + { + let version_3_dataset = dataset.checkout_version(3).await.unwrap(); + assert_eq!(version_3_dataset.manifest.version, 3); + + // Verify the content includes original + 2 appends + let scan = version_3_dataset.scan(); + let batches: Vec = scan + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 70); // 50 (original) + 10 (v2) + 10 (v3) + } + + // Test Case 4: Explicitly checkout the latest version + { + let version_5_dataset = dataset.checkout_version(5).await.unwrap(); + assert_eq!(version_5_dataset.manifest.version, 5); + + // Verify the content includes all data + let scan = version_5_dataset.scan(); + let batches: Vec = scan + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 90); // 50 (original) + 10 (v2) + 10 (v3) + 10 (v4) + 10 (v5) + } 
+ + // Test Case 5: Verify that all versions can be checked out sequentially + for version in 1..=5 { + let version_dataset = dataset.checkout_version(version).await.unwrap(); + assert_eq!(version_dataset.manifest.version, version); + + // Each version should have the expected number of rows + let scan = version_dataset.scan(); + let batches: Vec = scan + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + let expected_rows = 50 + (version - 1) * 10; // 50 initial + 10 per additional version + assert_eq!(total_rows, expected_rows as usize); + } + + // Test Case 6: Verify checkout works from any version to any other version + { + let version_2_dataset = dataset.checkout_version(2).await.unwrap(); + assert_eq!(version_2_dataset.manifest.version, 2); + + // From version 2, checkout version 4 + let version_4_dataset = version_2_dataset.checkout_version(4).await.unwrap(); + assert_eq!(version_4_dataset.manifest.version, 4); + + // Verify content is correct for version 4 + let scan = version_4_dataset.scan(); + let batches: Vec = scan + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 80); // 50 + 10 + 10 + 10 + } + } + + #[tokio::test] + async fn test_v3_manifest_naming_scheme_commit_checkout_detached() { + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + // Create initial dataset with V3 manifest scheme + let write_params = WriteParams { + enable_v3_manifest_paths: true, + max_rows_per_file: 50, + max_rows_per_group: 25, + ..Default::default() + }; + + let initial_data = lance_datagen::gen_batch() + .col("id", lance_datagen::array::step::()) + .col("value", lance_datagen::array::rand::()) + .into_reader_rows(RowCount::from(100), BatchCount::from(1)); + + let mut dataset = 
Dataset::write(initial_data, test_uri, Some(write_params.clone())) + .await + .unwrap(); + + // Add a few normal versions + for i in 1..=3 { + let append_data = lance_datagen::gen_batch() + .col( + "id", + lance_datagen::array::step_custom::(100 * i, 1), + ) + .col("value", lance_datagen::array::rand::()) + .into_reader_rows(RowCount::from(20), BatchCount::from(1)); + + dataset + .append( + append_data, + Some(WriteParams { + mode: WriteMode::Append, + ..write_params.clone() + }), + ) + .await + .unwrap(); + } + + let current_version = dataset.version().version; + assert_eq!(current_version, 4); + assert_eq!( + dataset.manifest_location.naming_scheme, + ManifestNamingScheme::V3 + ); + + // Test committing a detached version + let detached_data = lance_datagen::gen_batch() + .col( + "id", + lance_datagen::array::step_custom::(1000, 1), + ) + .col("value", lance_datagen::array::rand::()) + .into_df_stream(RowCount::from(10), BatchCount::from(1)); + + let detached_dataset = InsertBuilder::new(WriteDestination::Dataset(Arc::new(dataset))) + .with_params(&WriteParams { + mode: WriteMode::Append, + detached: true, + ..write_params.clone() + }) + .execute_stream(detached_data) + .await + .unwrap(); + + // Verify detached version properties + let detached_version = detached_dataset.version().version; + assert!(is_detached_version(detached_version)); + assert_eq!( + detached_dataset.manifest_location.naming_scheme, + ManifestNamingScheme::V3 + ); + + // Verify V3 detached manifest file uses normal binary format (ends with "1" due to highest bit set) + let manifest_path = + ManifestNamingScheme::V3.manifest_path(&detached_dataset.base, detached_version); + let filename = manifest_path.filename().unwrap(); + let stem = filename + .strip_suffix(".manifest") + .expect("filename should end with .manifest"); + assert!( + stem.ends_with("1"), + "V3 detached version binary should end with '1' due to DETACHED_VERSION_MASK, got: {}", + stem + ); + // Verify it's a 64-bit binary 
string + assert_eq!( + stem.len(), + 64, + "V3 manifest filename should be 64-bit binary string, got length: {}", + stem.len() + ); + // Verify it only contains '0' and '1' + assert!( + stem.chars().all(|c| c == '0' || c == '1'), + "V3 manifest filename should be binary, got: {}", + stem + ); + + // Test checkout detached version by version number + let checkout_detached = Dataset::open(test_uri) + .await + .unwrap() + .checkout_version(detached_version) + .await + .unwrap(); + + assert_eq!(checkout_detached.version().version, detached_version); + assert_eq!( + checkout_detached.manifest_location.naming_scheme, + ManifestNamingScheme::V3 + ); + assert!(is_detached_version(checkout_detached.version().version)); + + // Verify data integrity in detached version + let detached_count = checkout_detached.count_rows(None).await.unwrap(); + let expected_count = 100 + 20 * 3 + 10; // initial + 3 appends + detached append + assert_eq!(detached_count, expected_count); + + // Test that we can still checkout the main branch + let main_dataset = Dataset::open(test_uri).await.unwrap(); + assert_eq!(main_dataset.version().version, current_version); + assert!(!is_detached_version(main_dataset.version().version)); + + let main_count = main_dataset.count_rows(None).await.unwrap(); + let expected_main_count = 100 + 20 * 3; // initial + 3 appends (no detached) + assert_eq!(main_count, expected_main_count); + + // Test that checkout by version works for both detached and regular versions + let checkout_v1 = Dataset::open(test_uri) + .await + .unwrap() + .checkout_version(1) + .await + .unwrap(); + assert_eq!(checkout_v1.version().version, 1); + let v1_count = checkout_v1.count_rows(None).await.unwrap(); + assert_eq!(v1_count, 100); + } } diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 30f3f377c95..8d2df5a0fb2 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -37,6 +37,7 @@ pub struct 
CommitBuilder<'a> { dest: WriteDestination<'a>, use_move_stable_row_ids: Option, enable_v2_manifest_paths: bool, + enable_v3_manifest_paths: bool, storage_format: Option, commit_handler: Option>, store_params: Option, @@ -54,6 +55,7 @@ impl<'a> CommitBuilder<'a> { dest: dest.into(), use_move_stable_row_ids: None, enable_v2_manifest_paths: false, + enable_v3_manifest_paths: false, storage_format: None, commit_handler: None, store_params: None, @@ -128,15 +130,26 @@ impl<'a> CommitBuilder<'a> { /// This parameter has no effect on existing datasets. To migrate an existing /// dataset, use the [`Dataset::migrate_manifest_paths_v2`] method. **Default is False.** /// - ///
- /// WARNING: turning this on will make the dataset unreadable for older - /// versions of Lance (prior to 0.17.0). - ///
+ /// WARNING: turning this on will make the dataset unreadable for older
+ /// versions of Lance (prior to 0.17.0).
 pub fn enable_v2_manifest_paths(mut self, enable: bool) -> Self {
 self.enable_v2_manifest_paths = enable;
 self
 }

+ /// If set to true, and this is a new dataset, uses the new v3 manifest paths.
+ /// These use reversed binary representation for S3 throughput optimization
+ /// and include a _latest_manifest_hint.json file for best-effort latest version tracking.
+ /// This parameter has no effect on existing datasets.
+ /// **Default is False.**
+ ///
+ /// WARNING: turning this on will make the dataset unreadable for older
+ /// versions of Lance.
+ pub fn enable_v3_manifest_paths(mut self, enable: bool) -> Self {
+ self.enable_v3_manifest_paths = enable;
+ self
+ }
+
 /// Commit a version that is not part of the mainline history.
 ///
 /// This commit will never show up in the dataset's history.
@@ -161,6 +174,13 @@ impl<'a> CommitBuilder<'a> {
 self
 }

+ /// Set the batch size for loading head manifests when checking for concurrent writes
+ /// in V3 manifest naming scheme.
+ pub fn with_head_manifests_batch_size(mut self, batch_size: usize) -> Self {
+ self.commit_config.head_manifests_batch_size = batch_size;
+ self
+ }
+
 /// Provide the set of row addresses that were deleted or updated. This is
 /// used to perform fast conflict resolution.
pub fn with_affected_rows(mut self, affected_rows: RowIdTreeMap) -> Self { @@ -261,6 +281,8 @@ impl<'a> CommitBuilder<'a> { let manifest_naming_scheme = if let Some(ds) = dest.dataset() { ds.manifest_location.naming_scheme + } else if self.enable_v3_manifest_paths { + ManifestNamingScheme::V3 } else if self.enable_v2_manifest_paths { ManifestNamingScheme::V2 } else { diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index 1b2c7ac021a..2d0aa9890e5 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -126,9 +126,12 @@ impl<'a> InsertBuilder<'a> { .use_move_stable_row_ids(context.params.enable_move_stable_row_ids) .with_storage_format(context.storage_version) .enable_v2_manifest_paths(context.params.enable_v2_manifest_paths) + .enable_v3_manifest_paths(context.params.enable_v3_manifest_paths) .with_commit_handler(context.commit_handler.clone()) .with_object_store(context.object_store.clone()) - .with_skip_auto_cleanup(context.params.skip_auto_cleanup); + .with_skip_auto_cleanup(context.params.skip_auto_cleanup) + .with_detached(context.params.detached) + .with_head_manifests_batch_size(context.params.head_manifests_batch_size); if let Some(params) = context.params.store_params.as_ref() { commit_builder = commit_builder.with_store_params(params.clone()); diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 5d1fd013fd8..4ff41d3edd2 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -236,6 +236,11 @@ struct MergeInsertParams { // if the writer does not have delete permissions and the clean up would // just try and log a failure anyway. skip_auto_cleanup: bool, + /// Batch size for loading head manifests when checking for concurrent writes + /// in V3 manifest naming scheme. 
Smaller values can help reduce memory usage + /// but may increase the number of object store requests. + /// Default is 8. + head_manifests_batch_size: usize, } /// A MergeInsertJob inserts new rows, deletes old rows, and updates existing rows all as @@ -319,6 +324,7 @@ impl MergeInsertBuilder { retry_timeout: Duration::from_secs(30), mem_wal_to_flush: None, skip_auto_cleanup: false, + head_manifests_batch_size: 8, }, }) } @@ -379,6 +385,13 @@ impl MergeInsertBuilder { self } + /// Set the batch size for loading head manifests when checking for concurrent writes + /// in V3 manifest naming scheme. + pub fn head_manifests_batch_size(&mut self, batch_size: usize) -> &mut Self { + self.params.head_manifests_batch_size = batch_size; + self + } + /// Indicate that this merge-insert uses data in a sealed MemTable. /// Once write is completed, the corresponding MemTable should also be marked as flushed. pub async fn mark_mem_wal_as_flushed( @@ -1888,8 +1901,9 @@ impl Merger { #[cfg(test)] mod tests { + use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount}; use arrow_array::{ - types::{Int32Type, UInt32Type}, + types::{Float32Type, Int32Type, UInt32Type}, Int32Array, Int64Array, RecordBatchIterator, RecordBatchReader, StringArray, UInt32Array, }; use arrow_select::concat::concat_batches; @@ -1900,6 +1914,7 @@ mod tests { use lance_datagen::{array, BatchCount, RowCount, Seed}; use lance_index::{scalar::ScalarIndexParams, IndexType}; use lance_io::object_store::ObjectStoreParams; + use lance_table::io::commit::{read_latest_manifest_hint_best_effort, ManifestNamingScheme}; use object_store::throttle::ThrottleConfig; use roaring::RoaringBitmap; use std::collections::HashMap; @@ -1909,10 +1924,7 @@ mod tests { use crate::{ dataset::{builder::DatasetBuilder, InsertBuilder, ReadParams, WriteMode, WriteParams}, session::Session, - utils::test::{ - assert_plan_node_equals, assert_string_matches, DatagenExt, FragmentCount, - FragmentRowCount, ThrottledStoreWrapper, 
- }, + utils::test::{assert_plan_node_equals, assert_string_matches, ThrottledStoreWrapper}, }; use super::*; @@ -3505,4 +3517,223 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n assert!(analysis.contains("num_files_written")); assert!(analysis.contains("elapsed_compute")); } + + #[tokio::test] + async fn test_v3_manifest_naming_scheme_concurrent_merge_insert() { + let write_params = crate::dataset::WriteParams { + enable_v3_manifest_paths: true, + max_rows_per_file: 100, + ..Default::default() + }; + + // Create initial dataset with V3 naming scheme using in-memory storage + let mut dataset = lance_datagen::gen_batch() + .col("id", lance_datagen::array::step::()) + .col("value", lance_datagen::array::rand::()) + .into_ram_dataset_with_params( + FragmentCount::from(1), + FragmentRowCount::from(10), + Some(write_params.clone()), + ) + .await + .unwrap(); + + // Verify V3 naming scheme is used + assert_eq!( + dataset.manifest_location.naming_scheme, + ManifestNamingScheme::V3 + ); + assert_eq!(dataset.manifest.version, 1); + + // Check that _latest_manifest_hint.json exists + let latest_manifest_path = dataset + .base + .child("_versions") + .child("_latest_manifest_hint.json"); + assert!(dataset + .object_store + .exists(&latest_manifest_path) + .await + .unwrap()); + + // Add 4 new versions using lightweight UpdateConfig transactions (simulating intermediate commits) + for i in 1..=4 { + dataset + .update_config([(format!("test_key_{}", i), format!("test_value_{}", i))]) + .await + .unwrap(); + } + + // Verify _latest_manifest_hint.json is updated to version 5 + let latest_version = + read_latest_manifest_hint_best_effort(&dataset.object_store, &dataset.base) + .await + .unwrap(); + assert_eq!(latest_version.map(|v| v.version), Some(5)); + + // Test Case 1: Concurrent merge_insert with default batch size + let concurrent_dataset_1 = dataset.checkout_version(1).await.unwrap(); + assert_eq!(concurrent_dataset_1.manifest.version, 
1); + + let merge_data_1 = lance_datagen::gen_batch() + .col("id", lance_datagen::array::cycle::(vec![3, 15])) // Update row 3, insert row 15 + .col( + "value", + lance_datagen::array::cycle::(vec![30.0, 150.0]), + ) + .into_df_stream( + lance_datagen::RowCount::from(2), + lance_datagen::BatchCount::from(1), + ); + + let (merged_dataset_1, _stats_1) = + MergeInsertBuilder::try_new(Arc::new(concurrent_dataset_1), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap() + .execute(merge_data_1) + .await + .unwrap(); + + // This should succeed and create version 6 (after rebasing over versions 2-5) + assert_eq!(merged_dataset_1.manifest.version, 6); + + // Verify read after write for test case 1 + let scan_1 = merged_dataset_1.scan(); + let batches_1: Vec = scan_1 + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows_1: usize = batches_1.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows_1, 11); // 10 original rows + 1 new row (id=15), row 3 updated + + // Test Case 2: Concurrent merge_insert with small batch size (1) + let concurrent_dataset_2 = dataset.checkout_version(2).await.unwrap(); + assert_eq!(concurrent_dataset_2.manifest.version, 2); + + let merge_data_2 = lance_datagen::gen_batch() + .col("id", lance_datagen::array::cycle::(vec![2, 16])) // Update row 2, insert row 16 + .col( + "value", + lance_datagen::array::cycle::(vec![20.0, 70.0]), + ) + .into_df_stream( + lance_datagen::RowCount::from(2), + lance_datagen::BatchCount::from(1), + ); + + let (merged_dataset_2, _stats_2) = + MergeInsertBuilder::try_new(Arc::new(concurrent_dataset_2), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .head_manifests_batch_size(1) // Small batch size - forces multiple batches + .try_build() + .unwrap() + .execute(merge_data_2) + .await + .unwrap(); + + // 
This should succeed and create version 7 (after rebasing over versions 3-6) + assert_eq!(merged_dataset_2.manifest.version, 7); + + // Verify read after write for test case 2 + let scan_2 = merged_dataset_2.scan(); + let batches_2: Vec = scan_2 + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows_2: usize = batches_2.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows_2, 12); // Previous 11 rows + 1 new row (id=16), row 2 updated + + // Test Case 3: Concurrent merge_insert with large batch size (20) + let concurrent_dataset_3 = dataset.checkout_version(3).await.unwrap(); + assert_eq!(concurrent_dataset_3.manifest.version, 3); + + let merge_data_3 = lance_datagen::gen_batch() + .col("id", lance_datagen::array::cycle::(vec![1, 17])) // Update row 1, insert row 17 + .col( + "value", + lance_datagen::array::cycle::(vec![10.0, 80.0]), + ) + .into_df_stream( + lance_datagen::RowCount::from(2), + lance_datagen::BatchCount::from(1), + ); + + let (merged_dataset_3, _stats_3) = + MergeInsertBuilder::try_new(Arc::new(concurrent_dataset_3), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .head_manifests_batch_size(20) // Large batch size - single batch + .try_build() + .unwrap() + .execute(merge_data_3) + .await + .unwrap(); + + // This should succeed and create version 8 (after rebasing over versions 4-7) + assert_eq!(merged_dataset_3.manifest.version, 8); + + // Verify read after write for test case 3 + let scan_3 = merged_dataset_3.scan(); + let batches_3: Vec = scan_3 + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let total_rows_3: usize = batches_3.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows_3, 13); // Previous 12 rows + 1 new row (id=17), row 1 updated + + // Verify _latest_manifest_hint.json is updated to version 8 + let latest_version = read_latest_manifest_hint_best_effort( + 
&merged_dataset_3.object_store, + &merged_dataset_3.base, + ) + .await + .unwrap(); + assert_eq!(latest_version.map(|v| v.version), Some(8)); + + // Use the latest dataset to verify final state + let final_dataset = &merged_dataset_3; + assert_eq!(final_dataset.version().version, 8); + + // Verify all versions exist + for version in 1..=8 { + let version_dataset = final_dataset.checkout_version(version).await.unwrap(); + assert_eq!(version_dataset.manifest.version, version); + } + + // Test that the V3 batched existence checking works correctly + // by verifying the latest version + let latest_version_id = final_dataset.latest_version_id().await.unwrap(); + let fresh_dataset = final_dataset + .checkout_version(latest_version_id) + .await + .unwrap(); + assert_eq!(fresh_dataset.manifest.version, 8); + + // Verify read after write works correctly + let fresh_scan = fresh_dataset.scan(); + let fresh_batches: Vec = fresh_scan + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let fresh_total_rows: usize = fresh_batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(fresh_total_rows, 13); + } } diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index fd810e54b46..7402fcbebcc 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -35,7 +35,8 @@ use lance_table::format::{ WriterVersion, DETACHED_VERSION_MASK, }; use lance_table::io::commit::{ - CommitConfig, CommitError, CommitHandler, ManifestLocation, ManifestNamingScheme, + write_latest_manifest_hint_best_effort, CommitConfig, CommitError, CommitHandler, + ManifestLocation, ManifestNamingScheme, }; use rand::{rng, Rng}; use snafu::location; @@ -53,7 +54,8 @@ use crate::dataset::cleanup::auto_cleanup_hook; use crate::dataset::fragment::FileFragment; use crate::dataset::transaction::{Operation, Transaction}; use crate::dataset::{ - load_new_transactions, write_manifest_file, ManifestWriteConfig, NewTransactionResult, BLOB_DIR, + 
load_and_sort_new_transactions_v3, load_new_transactions, write_manifest_file, + ManifestWriteConfig, NewTransactionResult, BLOB_DIR, }; use crate::index::DatasetIndexInternalExt; use crate::io::deletion::read_dataset_deletion_file; @@ -149,6 +151,12 @@ async fn do_commit_new_dataset( metadata_cache .insert_with_key(&manifest_key, Arc::new(manifest.clone())) .await; + + if matches!(manifest_naming_scheme, ManifestNamingScheme::V3) { + write_latest_manifest_hint_best_effort(object_store, base_path, &manifest_location) + .await?; + } + Ok((manifest, manifest_location)) } Err(CommitError::CommitConflict) => Err(crate::Error::DatasetAlreadyExists { @@ -606,7 +614,7 @@ pub(crate) async fn do_commit_detached_transaction( Some(indices.clone()) }, write_config, - ManifestNamingScheme::V2, + dataset.manifest_location.naming_scheme, ) .await; @@ -760,7 +768,13 @@ pub(crate) async fn commit_transaction( // slower performance for concurrent writes. But that makes the fast path // faster and the slow path slower, which makes performance less predictable // for users. So we always check for other transactions. - (dataset, other_transactions) = load_and_sort_new_transactions(&dataset).await?; + (dataset, other_transactions) = + if dataset.manifest_location.naming_scheme == ManifestNamingScheme::V3 { + load_and_sort_new_transactions_v3(&dataset, commit_config.head_manifests_batch_size) + .await? + } else { + load_and_sort_new_transactions(&dataset).await? + }; // See if we can retry the commit. Try to account for all // transactions that have been committed since the read_version. @@ -867,6 +881,15 @@ pub(crate) async fn commit_transaction( .await; } + if matches!(manifest_location.naming_scheme, ManifestNamingScheme::V3) { + write_latest_manifest_hint_best_effort( + object_store, + &dataset.base, + &manifest_location, + ) + .await?; + } + if !commit_config.skip_auto_cleanup { // Note: We're using the old dataset here (before the new manifest is committed). 
// This means cleanup runs based on the previous version's state, which may affect From 4db09efd1069d1b6c79d838da74b1c81ae224064 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Sat, 16 Aug 2025 15:36:56 -0700 Subject: [PATCH 2/7] improve bench code --- rust/lance/benches/manifest_scheme.rs | 116 +++++++++++++------------- rust/lance/src/dataset.rs | 2 +- rust/lance/src/io/commit.rs | 2 +- 3 files changed, 60 insertions(+), 60 deletions(-) diff --git a/rust/lance/benches/manifest_scheme.rs b/rust/lance/benches/manifest_scheme.rs index df29e0943a9..0fa29b6c46e 100644 --- a/rust/lance/benches/manifest_scheme.rs +++ b/rust/lance/benches/manifest_scheme.rs @@ -30,13 +30,11 @@ use std::sync::Arc; -use arrow_array::types::{Float32Type, Int32Type}; use arrow_array::{Float32Array, Int32Array, RecordBatch, RecordBatchIterator}; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use criterion::{criterion_group, criterion_main, Criterion}; -use lance::dataset::{Dataset, InsertBuilder, WriteDestination, WriteMode, WriteParams}; -use lance_datafusion::datagen::DatafusionDatagenExt; -use lance_datagen::{BatchCount, RowCount}; +use lance::dataset::{load_and_sort_new_transactions_v3, Dataset, WriteParams}; +use lance::io::commit::load_and_sort_new_transactions; use lance_table::io::commit::ManifestNamingScheme; #[cfg(target_os = "linux")] use pprof::criterion::{Output, PProfProfiler}; @@ -63,7 +61,7 @@ fn get_storage_type() -> String { } else if prefix.starts_with("az://") { "azure".to_string() } else { - "file".to_string() + "local".to_string() } } @@ -124,8 +122,7 @@ async fn create_test_dataset( dataset } -/// Benchmark checkout_latest performance: V2 vs V3 -fn bench_checkout_latest_v2_vs_v3(c: &mut Criterion) { +fn bench_checkout_latest(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let storage_prefix = get_storage_prefix(); let storage_type = get_storage_type(); @@ -155,7 +152,7 @@ fn bench_checkout_latest_v2_vs_v3(c: &mut Criterion) { 
c.bench_function( &format!( - "Checkout Latest {} ({} versions, {})", + "checkout_latest_{} ({} versions, {})", scheme_name, num_versions, storage_type ), |b| { @@ -172,74 +169,77 @@ fn bench_checkout_latest_v2_vs_v3(c: &mut Criterion) { } } -/// Benchmark concurrent commit performance: V2 vs V3 -fn bench_concurrent_commits_v2_vs_v3(c: &mut Criterion) { +fn bench_load_new_transactions_during_commit(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let storage_prefix = get_storage_prefix(); let storage_type = get_storage_type(); - for num_versions in [10, 50, 100] { - for commits_behind in [5, 10, 20] { - for scheme in [ManifestNamingScheme::V2, ManifestNamingScheme::V3] { - let scheme_name = match scheme { - ManifestNamingScheme::V2 => "V2", - ManifestNamingScheme::V3 => "V3", - _ => unreachable!(), - }; + // Test different combinations of commits_behind and head_manifests_batch_size + let test_configs = [(3, 4), (5, 8)]; + + for num_versions in [50, 100, 200] { + for (commits_behind, head_manifests_batch_size) in test_configs { + // Skip if we don't have enough versions + if commits_behind >= num_versions { + continue; + } + + // Test V2 scheme + { + let scheme = ManifestNamingScheme::V2; + let dataset_uri = format!("{}bench_load_v2_{}.lance", storage_prefix, num_versions); + let dataset = rt.block_on(create_test_dataset(&dataset_uri, num_versions, scheme)); + let latest_version = dataset.version().version; + let target_version = latest_version - commits_behind; + + c.bench_function( + &format!( + "load_new_transactions_during_commit_V2 ({} versions, {} behind, {})", + num_versions, commits_behind, storage_type + ), + |b| { + b.to_async(&rt).iter(|| async { + // Checkout an older version to simulate being behind + let old_dataset = dataset.checkout_version(target_version).await.unwrap(); - // Skip if we don't have enough versions - if commits_behind >= num_versions { - continue; - } + // Measure only the load_and_sort_new_transactions function 
+ let (_new_dataset, transactions) = + load_and_sort_new_transactions(&old_dataset).await.unwrap(); - let dataset_uri = format!( - "{}bench_concurrent_{}_{}.lance", - storage_prefix, - scheme_name.to_lowercase(), - num_versions + // Verify we loaded the expected number of transactions + assert_eq!(transactions.len(), commits_behind as usize); + }) + }, ); + } + + // Test V3 scheme + { + let scheme = ManifestNamingScheme::V3; + let dataset_uri = format!("{}bench_load_v3_{}.lance", storage_prefix, num_versions); let dataset = rt.block_on(create_test_dataset(&dataset_uri, num_versions, scheme)); let latest_version = dataset.version().version; let target_version = latest_version - commits_behind; c.bench_function( &format!( - "Concurrent Commit {} ({} versions, {} behind, {})", - scheme_name, num_versions, commits_behind, storage_type + "load_new_transactions_during_commit_V3 ({} versions, {} behind, head batch size {}, {})", + num_versions, commits_behind, head_manifests_batch_size, storage_type ), |b| { b.to_async(&rt).iter(|| async { + // Checkout an older version to simulate being behind let old_dataset = dataset.checkout_version(target_version).await.unwrap(); - // Create small append data for concurrent write - let append_data = lance_datagen::gen_batch() - .col( - "id", - lance_datagen::array::step_custom::(1000, 1), - ) - .col("value", lance_datagen::array::rand::()) - .into_df_stream(RowCount::from(10), BatchCount::from(1)); - - // Perform concurrent commit with conflict resolution - let write_params = WriteParams { - mode: WriteMode::Append, - head_manifests_batch_size: if matches!( - scheme, - ManifestNamingScheme::V3 - ) { - 8 - } else { - 1 - }, - ..Default::default() - }; + // Measure only the load_and_sort_new_transactions_v3 function + let (_new_dataset, transactions) = + load_and_sort_new_transactions_v3(&old_dataset, head_manifests_batch_size) + .await + .unwrap(); - InsertBuilder::new(WriteDestination::Dataset(Arc::new(old_dataset))) - 
.with_params(&write_params) - .execute_stream(append_data) - .await - .unwrap(); + // Verify we loaded the expected number of transactions + assert_eq!(transactions.len(), commits_behind as usize); }) }, ); @@ -256,7 +256,7 @@ criterion_group!( .sample_size(50) .warm_up_time(std::time::Duration::from_secs(5)) .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); - targets = bench_checkout_latest_v2_vs_v3, bench_concurrent_commits_v2_vs_v3 + targets = bench_checkout_latest, bench_load_new_transactions_during_commit ); #[cfg(not(target_os = "linux"))] @@ -266,7 +266,7 @@ criterion_group!( .significance_level(0.01) .sample_size(50) .warm_up_time(std::time::Duration::from_secs(5)); - targets = bench_checkout_latest_v2_vs_v3, bench_concurrent_commits_v2_vs_v3 + targets = bench_checkout_latest, bench_load_new_transactions_during_commit ); criterion_main!(benches); diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index e7fca1dc581..e1d40ea57cb 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1760,7 +1760,7 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<' } } -pub(crate) async fn load_and_sort_new_transactions_v3( +pub async fn load_and_sort_new_transactions_v3( dataset: &Dataset, head_manifests_batch_size: usize, ) -> Result<(Dataset, Vec<(u64, Arc)>)> { diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 7402fcbebcc..b1d0dc986f0 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -687,7 +687,7 @@ pub(crate) async fn commit_detached_transaction( } /// Load new transactions and sort them by version in ascending order (oldest to newest) -async fn load_and_sort_new_transactions( +pub async fn load_and_sort_new_transactions( dataset: &Dataset, ) -> Result<(Dataset, Vec<(u64, Arc)>)> { let NewTransactionResult { From 0968554a7257e3de5dd5b93f91e74c1cf70b0b63 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Sat, 16 Aug 2025 20:55:31 -0700 
Subject: [PATCH 3/7] reuse bench dataset --- rust/lance/benches/manifest_scheme.rs | 28 +++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/rust/lance/benches/manifest_scheme.rs b/rust/lance/benches/manifest_scheme.rs index 0fa29b6c46e..1ca43330e3c 100644 --- a/rust/lance/benches/manifest_scheme.rs +++ b/rust/lance/benches/manifest_scheme.rs @@ -66,20 +66,31 @@ fn get_storage_type() -> String { } /// Create a test dataset with specified number of versions and manifest scheme +/// Only creates the dataset if it doesn't exist, otherwise returns the existing one async fn create_test_dataset( base_uri: &str, num_versions: u64, manifest_scheme: ManifestNamingScheme, ) -> Dataset { - // Clean up existing dataset if using file storage - if !base_uri.starts_with("memory://") - && !base_uri.starts_with("s3://") - && !base_uri.starts_with("gs://") - && !base_uri.starts_with("az://") - { - let _ = std::fs::remove_dir_all(base_uri); + // Try to open existing dataset first + if let Ok(mut existing_dataset) = Dataset::open(base_uri).await { + // Check if it has the expected number of versions + let current_version = existing_dataset.version().version; + if current_version >= num_versions - 1 { + // Dataset exists with enough versions, return it + return existing_dataset; + } + // Add more versions if needed + for i in (current_version + 1)..num_versions { + existing_dataset + .update_config([(format!("version_{}", i), i.to_string())]) + .await + .unwrap(); + } + return existing_dataset; } + // Dataset doesn't exist, create it let write_params = WriteParams { enable_v2_manifest_paths: matches!(manifest_scheme, ManifestNamingScheme::V2), enable_v3_manifest_paths: matches!(manifest_scheme, ManifestNamingScheme::V3), @@ -200,7 +211,8 @@ fn bench_load_new_transactions_during_commit(c: &mut Criterion) { |b| { b.to_async(&rt).iter(|| async { // Checkout an older version to simulate being behind - let old_dataset = 
dataset.checkout_version(target_version).await.unwrap(); + let old_dataset = + dataset.checkout_version(target_version).await.unwrap(); // Measure only the load_and_sort_new_transactions function let (_new_dataset, transactions) = From a9c535e61781ebdf2327180e28ece4950a102f21 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Sat, 16 Aug 2025 22:03:22 -0700 Subject: [PATCH 4/7] fix error --- rust/lance-table/src/io/commit.rs | 5 ++--- rust/lance/src/dataset.rs | 7 ++++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 5805bb44665..54da05daf2c 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -851,9 +851,8 @@ async fn default_resolve_version( } // If we reach here, no scheme worked - Err(Error::Internal { - message: format!("Version {version} not found in any supported naming scheme"), - location: location!(), + Err(Error::VersionNotFound { + message: format!("Can not find version {version}"), }) } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index e1d40ea57cb..c2da443763d 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -423,7 +423,12 @@ impl Dataset { &self.object_store.inner, Some(self.manifest_location.naming_scheme), ) - .await?; + .await + .map_err(|e| Error::DatasetNotFound { + path: self.uri.clone(), + source: box_error(e), + location: location!(), + })?; if self.already_checked_out(&manifest_location) { return Ok(self.clone()); From ea01d5a86dd86db19999ad0866a29127d6345330 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Sun, 17 Aug 2025 21:11:33 -0700 Subject: [PATCH 5/7] match error message --- rust/lance-table/src/io/commit.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 54da05daf2c..db93e41006e 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -852,7 
+852,7 @@ async fn default_resolve_version( // If we reach here, no scheme worked Err(Error::VersionNotFound { - message: format!("Can not find version {version}"), + message: format!("version {version} does not exist"), }) } From 71bc21782337f6bdb93d9fbafaa815a6b65d9edb Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Sun, 17 Aug 2025 22:09:51 -0700 Subject: [PATCH 6/7] fix python error handling --- python/src/dataset.rs | 1 + rust/lance/benches/manifest_scheme.rs | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 66c67c02931..7d0e22fd177 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -2165,6 +2165,7 @@ impl Dataset { .block_on(None, self.ds.checkout_version(version))? .map_err(|err| match err { lance::Error::NotFound { .. } => PyValueError::new_err(err.to_string()), + lance::Error::DatasetNotFound { .. } => PyValueError::new_err(err.to_string()), _ => PyIOError::new_err(err.to_string()), })?; diff --git a/rust/lance/benches/manifest_scheme.rs b/rust/lance/benches/manifest_scheme.rs index 1ca43330e3c..37686a8eb22 100644 --- a/rust/lance/benches/manifest_scheme.rs +++ b/rust/lance/benches/manifest_scheme.rs @@ -186,7 +186,9 @@ fn bench_load_new_transactions_during_commit(c: &mut Criterion) { let storage_type = get_storage_type(); // Test different combinations of commits_behind and head_manifests_batch_size - let test_configs = [(3, 4), (5, 8)]; + // to better measure the effectiveness, use large commits_behind size + // otherwise the time is dominated by the file download time and variance is high + let test_configs = [(20, 21)]; for num_versions in [50, 100, 200] { for (commits_behind, head_manifests_batch_size) in test_configs { From 0bf961057db3a863b66375a589f6a87522b61448 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Sun, 17 Aug 2025 22:22:45 -0700 Subject: [PATCH 7/7] update bench --- rust/lance/benches/manifest_scheme.rs | 89 +-------------------------- 1 file changed, 3 
insertions(+), 86 deletions(-) diff --git a/rust/lance/benches/manifest_scheme.rs b/rust/lance/benches/manifest_scheme.rs index 37686a8eb22..392cdc7fcaa 100644 --- a/rust/lance/benches/manifest_scheme.rs +++ b/rust/lance/benches/manifest_scheme.rs @@ -33,8 +33,7 @@ use std::sync::Arc; use arrow_array::{Float32Array, Int32Array, RecordBatch, RecordBatchIterator}; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use criterion::{criterion_group, criterion_main, Criterion}; -use lance::dataset::{load_and_sort_new_transactions_v3, Dataset, WriteParams}; -use lance::io::commit::load_and_sort_new_transactions; +use lance::dataset::{Dataset, WriteParams}; use lance_table::io::commit::ManifestNamingScheme; #[cfg(target_os = "linux")] use pprof::criterion::{Output, PProfProfiler}; @@ -180,88 +179,6 @@ fn bench_checkout_latest(c: &mut Criterion) { } } -fn bench_load_new_transactions_during_commit(c: &mut Criterion) { - let rt = tokio::runtime::Runtime::new().unwrap(); - let storage_prefix = get_storage_prefix(); - let storage_type = get_storage_type(); - - // Test different combinations of commits_behind and head_manifests_batch_size - // to better measure the effectiveness, use large commits_behind size - // otherwise the time is dominated by the file download time and variance is high - let test_configs = [(20, 21)]; - - for num_versions in [50, 100, 200] { - for (commits_behind, head_manifests_batch_size) in test_configs { - // Skip if we don't have enough versions - if commits_behind >= num_versions { - continue; - } - - // Test V2 scheme - { - let scheme = ManifestNamingScheme::V2; - let dataset_uri = format!("{}bench_load_v2_{}.lance", storage_prefix, num_versions); - let dataset = rt.block_on(create_test_dataset(&dataset_uri, num_versions, scheme)); - let latest_version = dataset.version().version; - let target_version = latest_version - commits_behind; - - c.bench_function( - &format!( - "load_new_transactions_during_commit_V2 ({} versions, {} behind, {})", 
- num_versions, commits_behind, storage_type - ), - |b| { - b.to_async(&rt).iter(|| async { - // Checkout an older version to simulate being behind - let old_dataset = - dataset.checkout_version(target_version).await.unwrap(); - - // Measure only the load_and_sort_new_transactions function - let (_new_dataset, transactions) = - load_and_sort_new_transactions(&old_dataset).await.unwrap(); - - // Verify we loaded the expected number of transactions - assert_eq!(transactions.len(), commits_behind as usize); - }) - }, - ); - } - - // Test V3 scheme - { - let scheme = ManifestNamingScheme::V3; - let dataset_uri = format!("{}bench_load_v3_{}.lance", storage_prefix, num_versions); - let dataset = rt.block_on(create_test_dataset(&dataset_uri, num_versions, scheme)); - let latest_version = dataset.version().version; - let target_version = latest_version - commits_behind; - - c.bench_function( - &format!( - "load_new_transactions_during_commit_V3 ({} versions, {} behind, head batch size {}, {})", - num_versions, commits_behind, head_manifests_batch_size, storage_type - ), - |b| { - b.to_async(&rt).iter(|| async { - // Checkout an older version to simulate being behind - let old_dataset = - dataset.checkout_version(target_version).await.unwrap(); - - // Measure only the load_and_sort_new_transactions_v3 function - let (_new_dataset, transactions) = - load_and_sort_new_transactions_v3(&old_dataset, head_manifests_batch_size) - .await - .unwrap(); - - // Verify we loaded the expected number of transactions - assert_eq!(transactions.len(), commits_behind as usize); - }) - }, - ); - } - } - } -} - #[cfg(target_os = "linux")] criterion_group!( name = benches; @@ -270,7 +187,7 @@ criterion_group!( .sample_size(50) .warm_up_time(std::time::Duration::from_secs(5)) .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); - targets = bench_checkout_latest, bench_load_new_transactions_during_commit + targets = bench_checkout_latest ); #[cfg(not(target_os = "linux"))] @@ -280,7 
+197,7 @@ criterion_group!( .significance_level(0.01) .sample_size(50) .warm_up_time(std::time::Duration::from_secs(5)); - targets = bench_checkout_latest, bench_load_new_transactions_during_commit + targets = bench_checkout_latest ); criterion_main!(benches);