diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 3913c5b255f..11c4937189e 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1574,6 +1574,24 @@ impl Dataset { &self.object_store } + /// Clone this dataset with a different object store binding. + /// + /// The returned dataset shares metadata, session state, and caches with the + /// original dataset, but all subsequent operations on the returned dataset + /// use the supplied object store. + pub fn with_object_store( + &self, + object_store: Arc, + store_params: Option, + ) -> Self { + let mut cloned = self.clone(); + cloned.object_store = object_store; + if let Some(store_params) = store_params { + cloned.store_params = Some(Box::new(store_params)); + } + cloned + } + /// Returns the initial storage options used when opening this dataset, if any. /// /// This returns the static initial options without triggering any refresh. diff --git a/rust/lance/src/dataset/tests/dataset_io.rs b/rust/lance/src/dataset/tests/dataset_io.rs index 5aade47d9e1..5a8613c1577 100644 --- a/rust/lance/src/dataset/tests/dataset_io.rs +++ b/rust/lance/src/dataset/tests/dataset_io.rs @@ -39,7 +39,8 @@ use lance_table::feature_flags; use futures::TryStreamExt; use lance_index::scalar::ScalarIndexParams; use lance_index::{DatasetIndexExt, IndexType}; -use lance_io::object_store::ObjectStore; +use lance_io::object_store::{ObjectStore, ObjectStoreParams}; +use lance_io::utils::tracking_store::IOTracker; use lance_table::io::manifest::read_manifest; use object_store::path::Path; use rstest::rstest; @@ -73,6 +74,98 @@ async fn test_truncate_table() { assert_eq!(&actual_schema, expected_schema.as_ref()); } +async fn drain_scan(dataset: &Dataset) { + dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); +} + +#[tokio::test] +async fn test_with_object_store_clone_preserves_shared_state_and_overrides_store_binding() { + let test_dir = TempStdDir::default(); + create_file(&test_dir, WriteMode::Create, LanceFileVersion::Stable).await; + let uri = test_dir.to_str().unwrap(); + let dataset = Dataset::open(uri).await.unwrap(); + + let io_tracker = Arc::new(IOTracker::default()); + let store_params = ObjectStoreParams { + object_store_wrapper: Some(io_tracker), + ..Default::default() + }; + let (wrapped_store, _) = ObjectStore::from_uri_and_params( + dataset.session().store_registry(), + dataset.uri(), + &store_params, + ) + .await + .unwrap(); + let wrapped_dataset = dataset.with_object_store(wrapped_store, Some(store_params)); + assert!(Arc::ptr_eq(&dataset.session(), &wrapped_dataset.session())); + assert!(!Arc::ptr_eq( + &dataset.object_store().inner, + &wrapped_dataset.object_store().inner + )); +} + +#[tokio::test] +async fn test_with_object_store_enables_isolated_per_request_io_tracking() { + let test_dir = TempStdDir::default(); + create_file(&test_dir, WriteMode::Create, LanceFileVersion::Stable).await; + let uri = test_dir.to_str().unwrap(); + let dataset = Dataset::open(uri).await.unwrap(); + + let tracker_a = Arc::new(IOTracker::default()); + let store_params_a = ObjectStoreParams { + object_store_wrapper: Some(tracker_a.clone()), + ..Default::default() + }; + let (wrapped_store_a, _) = ObjectStore::from_uri_and_params( + dataset.session().store_registry(), + dataset.uri(), + &store_params_a, + ) + .await + .unwrap(); + let wrapped_a = dataset.with_object_store(wrapped_store_a, Some(store_params_a)); + + let tracker_b = Arc::new(IOTracker::default()); + let store_params_b = ObjectStoreParams { + object_store_wrapper: Some(tracker_b.clone()), + ..Default::default() + }; + let (wrapped_store_b, _) = ObjectStore::from_uri_and_params( + dataset.session().store_registry(), + dataset.uri(), + &store_params_b, + ) + .await + .unwrap(); + let wrapped_b = dataset.with_object_store(wrapped_store_b, Some(store_params_b)); + + let _ = tracker_a.incremental_stats(); // reset + let _ = tracker_b.incremental_stats(); // reset + + // Request A uses only wrapper A. + drain_scan(&wrapped_a).await; + assert!(tracker_a.incremental_stats().read_iops > 0); + assert_eq!(tracker_b.incremental_stats().read_iops, 0); + + // Request B uses only wrapper B. + drain_scan(&wrapped_b).await; + assert_eq!(tracker_a.incremental_stats().read_iops, 0); + assert!(tracker_b.incremental_stats().read_iops > 0); + + // Base dataset does not use request-specific wrappers. + drain_scan(&dataset).await; + assert_eq!(tracker_a.incremental_stats().read_iops, 0); + assert_eq!(tracker_b.incremental_stats().read_iops, 0); +} + #[rstest] #[lance_test_macros::test(tokio::test)] async fn test_create_dataset(