Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1574,6 +1574,24 @@ impl Dataset {
&self.object_store
}

/// Create a copy of this dataset that is bound to a different object store.
///
/// The copy starts out as a clone of `self`, so it keeps sharing metadata,
/// session state, and caches with the original dataset. Only the object store
/// binding differs: every later operation on the returned dataset goes through
/// `object_store`, while the original dataset is left untouched.
///
/// # Arguments
///
/// * `object_store` - the store the returned dataset should use.
/// * `store_params` - when `Some`, replaces the object store parameters kept on
///   the copy; when `None`, the copy retains the parameters of the original.
pub fn with_object_store(
    &self,
    object_store: Arc<ObjectStore>,
    store_params: Option<ObjectStoreParams>,
) -> Self {
    let mut rebound = self.clone();
    rebound.object_store = object_store;
    // Only overwrite the stored parameters when the caller supplied new ones.
    if let Some(boxed_params) = store_params.map(Box::new) {
        rebound.store_params = Some(boxed_params);
    }
    rebound
}

/// Returns the initial storage options used when opening this dataset, if any.
///
/// This returns the static initial options without triggering any refresh.
Expand Down
95 changes: 94 additions & 1 deletion rust/lance/src/dataset/tests/dataset_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ use lance_table::feature_flags;
use futures::TryStreamExt;
use lance_index::scalar::ScalarIndexParams;
use lance_index::{DatasetIndexExt, IndexType};
use lance_io::object_store::ObjectStore;
use lance_io::object_store::{ObjectStore, ObjectStoreParams};
use lance_io::utils::tracking_store::IOTracker;
use lance_table::io::manifest::read_manifest;
use object_store::path::Path;
use rstest::rstest;
Expand Down Expand Up @@ -73,6 +74,98 @@ async fn test_truncate_table() {
assert_eq!(&actual_schema, expected_schema.as_ref());
}

/// Run a scan over `dataset` and read the resulting stream to completion.
///
/// The collected batches are discarded; the helper exists only to make the
/// scan's reads happen. Any error while opening or draining the stream panics
/// via `unwrap`, which is the desired failure mode inside tests.
async fn drain_scan(dataset: &Dataset) {
    let scanner = dataset.scan();
    let stream = scanner.try_into_stream().await.unwrap();
    let _batches: Vec<_> = stream.try_collect().await.unwrap();
}

/// Checks that `with_object_store` keeps the session shared with the source
/// dataset while the returned dataset points at a different inner store.
#[tokio::test]
async fn test_with_object_store_clone_preserves_shared_state_and_overrides_store_binding() {
    // Write a small dataset to a temporary directory and open it normally.
    let tmp_dir = TempStdDir::default();
    create_file(&tmp_dir, WriteMode::Create, LanceFileVersion::Stable).await;
    let base = Dataset::open(tmp_dir.to_str().unwrap()).await.unwrap();

    // Build a second store for the same URI, this time wrapped by an I/O tracker.
    let tracker = Arc::new(IOTracker::default());
    let tracked_params = ObjectStoreParams {
        object_store_wrapper: Some(tracker),
        ..Default::default()
    };
    let tracked_store = ObjectStore::from_uri_and_params(
        base.session().store_registry(),
        base.uri(),
        &tracked_params,
    )
    .await
    .unwrap()
    .0;

    let rebound = base.with_object_store(tracked_store, Some(tracked_params));

    // The session must be the very same allocation ...
    let shares_session = Arc::ptr_eq(&base.session(), &rebound.session());
    assert!(shares_session);

    // ... whereas the inner object store must not be.
    let shares_inner_store = Arc::ptr_eq(
        &base.object_store().inner,
        &rebound.object_store().inner,
    );
    assert!(!shares_inner_store);
}

/// Checks that datasets derived through `with_object_store` report reads only
/// to the tracker attached to their own store, and that scanning the base
/// dataset reaches neither tracker.
#[tokio::test]
async fn test_with_object_store_enables_isolated_per_request_io_tracking() {
    // Write a small dataset to a temporary directory and open it normally.
    let tmp_dir = TempStdDir::default();
    create_file(&tmp_dir, WriteMode::Create, LanceFileVersion::Stable).await;
    let base = Dataset::open(tmp_dir.to_str().unwrap()).await.unwrap();

    // View A: same URI, store wrapped by tracker A.
    let tracker_a = Arc::new(IOTracker::default());
    let params_a = ObjectStoreParams {
        object_store_wrapper: Some(tracker_a.clone()),
        ..Default::default()
    };
    let store_a = ObjectStore::from_uri_and_params(
        base.session().store_registry(),
        base.uri(),
        &params_a,
    )
    .await
    .unwrap()
    .0;
    let view_a = base.with_object_store(store_a, Some(params_a));

    // View B: same URI, store wrapped by tracker B.
    let tracker_b = Arc::new(IOTracker::default());
    let params_b = ObjectStoreParams {
        object_store_wrapper: Some(tracker_b.clone()),
        ..Default::default()
    };
    let store_b = ObjectStore::from_uri_and_params(
        base.session().store_registry(),
        base.uri(),
        &params_b,
    )
    .await
    .unwrap()
    .0;
    let view_b = base.with_object_store(store_b, Some(params_b));

    // Throw away anything recorded during setup; reading the incremental
    // stats resets them, so the assertions below start from a clean slate.
    drop(tracker_a.incremental_stats());
    drop(tracker_b.incremental_stats());

    // Request A uses only wrapper A.
    drain_scan(&view_a).await;
    let reads_on_a = tracker_a.incremental_stats().read_iops;
    let reads_on_b = tracker_b.incremental_stats().read_iops;
    assert!(reads_on_a > 0);
    assert_eq!(reads_on_b, 0);

    // Request B uses only wrapper B.
    drain_scan(&view_b).await;
    let reads_on_a = tracker_a.incremental_stats().read_iops;
    let reads_on_b = tracker_b.incremental_stats().read_iops;
    assert_eq!(reads_on_a, 0);
    assert!(reads_on_b > 0);

    // Base dataset does not use request-specific wrappers.
    drain_scan(&base).await;
    let reads_on_a = tracker_a.incremental_stats().read_iops;
    let reads_on_b = tracker_b.incremental_stats().read_iops;
    assert_eq!(reads_on_a, 0);
    assert_eq!(reads_on_b, 0);
}

#[rstest]
#[lance_test_macros::test(tokio::test)]
async fn test_create_dataset(
Expand Down
Loading