diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 8ef2c79a67e..824304c3f8d 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -177,6 +177,8 @@ pub struct Dataset { /// Object store parameters used when opening this dataset. /// These are used when creating object stores for additional base paths. pub(crate) store_params: Option>, + /// Optional runtime-only object store parameters keyed by base path URI. + pub(crate) base_store_params: Option>>, } impl std::fmt::Debug for Dataset { @@ -186,6 +188,7 @@ impl std::fmt::Debug for Dataset { .field("base", &self.base) .field("version", &self.manifest.version) .field("cache_num_items", &self.session.approx_num_items()) + .field("base_store_params", &self.base_store_params.is_some()) .finish() } } @@ -577,6 +580,7 @@ impl Dataset { self.commit_handler.clone(), self.file_reader_options.clone(), self.store_params.as_deref().cloned(), + self.base_store_params.clone(), ) } @@ -695,6 +699,7 @@ impl Dataset { commit_handler: Arc, file_reader_options: Option, store_params: Option, + base_store_params: Option>>, ) -> Result { let refs = Refs::new( object_store.clone(), @@ -722,6 +727,7 @@ impl Dataset { index_cache, file_reader_options, store_params: store_params.map(Box::new), + base_store_params, }) } @@ -1631,6 +1637,23 @@ impl Dataset { cloned } + fn store_params_for_base( + &self, + base_path: Option<&lance_table::format::BasePath>, + ) -> ObjectStoreParams { + // Base-specific bindings are exact ObjectStoreParams keyed by + // `BasePath.path`. If a base has no explicit binding then reads fall back + // to the dataset-level default store params. + base_path + .and_then(|base_path| { + self.base_store_params + .as_ref() + .and_then(|params| params.get(&base_path.path)) + }) + .cloned() + .unwrap_or_else(|| self.store_params.as_deref().cloned().unwrap_or_default()) + } + /// Returns the initial storage options used when opening this dataset, if any. /// /// This returns the static initial options without triggering any refresh. @@ -1739,11 +1762,12 @@ impl Dataset { let base_path = self.manifest.base_paths.get(&base_id).ok_or_else(|| { Error::invalid_input(format!("Dataset base path with ID {} not found", base_id)) })?; + let store_params = self.store_params_for_base(Some(base_path)); let (store, _) = ObjectStore::from_uri_and_params( self.session.store_registry(), &base_path.path, - &self.store_params.as_deref().cloned().unwrap_or_default(), + &store_params, ) .await?; @@ -2564,6 +2588,7 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<' dataset.commit_handler.clone(), dataset.file_reader_options.clone(), dataset.store_params.as_deref().cloned(), + dataset.base_store_params.clone(), )?; let loaded = Arc::new(dataset_version.read_transaction().await?.ok_or_else(|| { @@ -2595,6 +2620,7 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<' dataset.commit_handler.clone(), dataset.file_reader_options.clone(), dataset.store_params.as_deref().cloned(), + dataset.base_store_params.clone(), ) } else { // If we didn't get the latest manifest, we can still return the dataset diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 33c365ee534..8b6cbe3160c 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -45,25 +45,23 @@ pub(super) struct ExternalBaseCandidate { pub base_id: u32, pub store_prefix: String, pub base_path: Path, + pub store_params: ObjectStoreParams, } #[derive(Debug)] pub(super) struct ExternalBaseResolver { candidates: Vec, store_registry: Arc, - store_params: ObjectStoreParams, } impl ExternalBaseResolver { pub(super) fn new( candidates: Vec, store_registry: Arc, - store_params: ObjectStoreParams, ) -> Self { Self { candidates, store_registry, - store_params, } } @@ -71,13 +69,13 @@ impl ExternalBaseResolver { &self, uri: &str, ) -> Result> { - let uri_store_prefix = self - .store_registry - .calculate_object_store_prefix(uri, self.store_params.storage_options())?; let uri_path = ObjectStore::extract_path_from_uri(self.store_registry.clone(), uri)?; let mut best_match: Option<(usize, ResolvedExternalBase)> = None; for candidate in &self.candidates { + let uri_store_prefix = self + .store_registry + .calculate_object_store_prefix(uri, candidate.store_params.storage_options())?; if candidate.store_prefix != uri_store_prefix { continue; } @@ -1321,6 +1319,7 @@ fn data_file_key_from_path(path: &str) -> &str { #[cfg(test)] mod tests { + use std::collections::HashMap; use std::sync::Arc; use arrow::{ @@ -1338,7 +1337,9 @@ mod tests { ARROW_EXT_NAME_KEY, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY, BLOB_V2_EXT_NAME, DataTypeExt, }; use lance_core::datatypes::BlobKind; - use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; + use lance_io::object_store::{ + ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor, + }; use lance_io::stream::RecordBatchStream; use lance_table::format::BasePath; use object_store::{ @@ -1354,7 +1355,7 @@ mod tests { use lance_datagen::{BatchCount, RowCount, array}; use lance_file::version::LanceFileVersion; - use super::{BlobFile, data_file_key_from_path}; + use super::{BlobFile, ExternalBaseCandidate, ExternalBaseResolver, data_file_key_from_path}; use crate::{ Dataset, blob::{BlobArrayBuilder, blob_field}, @@ -1374,9 +1375,86 @@ mod tests { expected: Vec, } + #[cfg(feature = "azure")] + fn azure_store_params(account_name: &str) -> ObjectStoreParams { + ObjectStoreParams { + storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( + HashMap::from([ + ("account_name".to_string(), account_name.to_string()), + ("account_key".to_string(), "dGVzdA==".to_string()), + ]), + ))), + ..Default::default() + } + } + #[derive(Debug)] struct RejectEmptyRangeObjectStore; + #[cfg(feature = "azure")] + #[tokio::test] + async fn test_external_base_resolver_uses_candidate_store_params() { + let store_registry = Arc::new(ObjectStoreRegistry::default()); + let base_a = BasePath::new( + 1, + "az://container/path-a".to_string(), + Some("base-a".to_string()), + false, + ); + let base_b = BasePath::new( + 2, + "az://container/path-b".to_string(), + Some("base-b".to_string()), + false, + ); + + let base_a_params = azure_store_params("account-a"); + let base_b_params = azure_store_params("account-b"); + + let (store_a, extracted_a) = + ObjectStore::from_uri_and_params(store_registry.clone(), &base_a.path, &base_a_params) + .await + .unwrap(); + let (store_b, extracted_b) = + ObjectStore::from_uri_and_params(store_registry.clone(), &base_b.path, &base_b_params) + .await + .unwrap(); + + let resolver = ExternalBaseResolver::new( + vec![ + ExternalBaseCandidate { + base_id: base_a.id, + store_prefix: store_a.store_prefix.clone(), + base_path: extracted_a, + store_params: base_a_params, + }, + ExternalBaseCandidate { + base_id: base_b.id, + store_prefix: store_b.store_prefix.clone(), + base_path: extracted_b, + store_params: base_b_params, + }, + ], + store_registry, + ); + + let resolved_a = resolver + .resolve_external_uri("az://container/path-a/file.bin") + .await + .unwrap() + .unwrap(); + let resolved_b = resolver + .resolve_external_uri("az://container/path-b/file.bin") + .await + .unwrap() + .unwrap(); + + assert_eq!(resolved_a.base_id, 1); + assert_eq!(resolved_a.relative_path, "file.bin"); + assert_eq!(resolved_b.base_id, 2); + assert_eq!(resolved_b.relative_path, "file.bin"); + } + impl std::fmt::Display for RejectEmptyRangeObjectStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "RejectEmptyRangeObjectStore") diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index 2d995d3b78f..393ff45c4ea 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -51,6 +51,8 @@ pub struct DatasetBuilder { file_reader_options: Option, /// Storage options that override user-provided options (e.g., from namespace client) storage_options_override: Option>, + /// Runtime-only exact object store bindings keyed by base path URI. + base_store_params: HashMap, } impl std::fmt::Debug for DatasetBuilder { @@ -68,6 +70,7 @@ impl std::fmt::Debug for DatasetBuilder { "storage_options_override", &self.storage_options_override.is_some(), ) + .field("base_store_params", &!self.base_store_params.is_empty()) .finish() } } @@ -86,6 +89,7 @@ impl DatasetBuilder { manifest: None, file_reader_options: None, storage_options_override: None, + base_store_params: HashMap::new(), } } @@ -448,6 +452,22 @@ impl DatasetBuilder { self } + /// Set runtime-only object store params for a specific registered base path. + /// + /// These params are not persisted in the manifest. They are used as-is + /// whenever the dataset resolves an object store for the given + /// `BasePath.path`. Dataset-level store params remain the fallback for bases + /// without an explicit binding. + pub fn with_base_store_params( + mut self, + base_path: impl AsRef, + store_params: ObjectStoreParams, + ) -> Self { + self.base_store_params + .insert(base_path.as_ref().to_string(), store_params); + self + } + /// Set options based on [ReadParams]. pub fn with_read_params(mut self, read_params: ReadParams) -> Self { self = self @@ -496,6 +516,12 @@ impl DatasetBuilder { self } + /// Set exact object store params used as the dataset-level default binding. + pub fn with_store_params(mut self, store_params: ObjectStoreParams) -> Self { + self.options = store_params; + self + } + /// Build a lance object store for the given config pub async fn build_object_store( self, @@ -566,30 +592,41 @@ impl DatasetBuilder { } } + // Runtime per-base overrides are supplied as storage options, but the dataset + // ultimately resolves object stores from ObjectStoreParams. Normalize once in + // the builder so reads only need to look up the prepared params by base path. + fn merge_store_params_with_storage_options( + params: &ObjectStoreParams, + override_options: &HashMap, + ) -> ObjectStoreParams { + if override_options.is_empty() { + return params.clone(); + } + + let mut merged_params = params.clone(); + let mut merged_options = merged_params.storage_options().cloned().unwrap_or_default(); + merged_options.extend(override_options.clone()); + + let storage_options_accessor = match merged_params + .storage_options_accessor + .as_ref() + .and_then(|accessor| accessor.provider().cloned()) + { + Some(provider) => Arc::new(StorageOptionsAccessor::with_initial_and_provider( + merged_options, + provider, + )), + None => Arc::new(StorageOptionsAccessor::with_static_options(merged_options)), + }; + merged_params.storage_options_accessor = Some(storage_options_accessor); + merged_params + } + async fn load_impl(mut self) -> Result { // Apply storage_options_override to merge namespace client options with any existing accessor if let Some(override_opts) = self.storage_options_override.take() { - // Get existing options and merge - let mut merged_opts = self.options.storage_options().cloned().unwrap_or_default(); - // Override with namespace client storage options - they take precedence - merged_opts.extend(override_opts); - - // Update accessor with merged options - if let Some(accessor) = &self.options.storage_options_accessor { - if let Some(provider) = accessor.provider().cloned() { - self.options.storage_options_accessor = Some(Arc::new( - StorageOptionsAccessor::with_initial_and_provider(merged_opts, provider), - )); - } else { - self.options.storage_options_accessor = Some(Arc::new( - StorageOptionsAccessor::with_static_options(merged_opts), - )); - } - } else { - self.options.storage_options_accessor = Some(Arc::new( - StorageOptionsAccessor::with_static_options(merged_opts), - )); - } + self.options = + Self::merge_store_params_with_storage_options(&self.options, &override_opts); } let index_cache_backend = self.index_cache_backend.take(); @@ -617,6 +654,8 @@ impl DatasetBuilder { let file_reader_options = self.file_reader_options.clone(); let store_params = self.options.clone(); + let base_store_params = (!self.base_store_params.is_empty()) + .then(|| Arc::new(std::mem::take(&mut self.base_store_params))); let (object_store, base_path, commit_handler) = self.build_object_store().await?; // Two cases that need to check out after loading the manifest: @@ -669,6 +708,7 @@ impl DatasetBuilder { base_path, commit_handler, Some(store_params), + base_store_params, ) .await?; @@ -706,6 +746,7 @@ impl DatasetBuilder { base_path: Path, commit_handler: Arc, store_params: Option, + base_store_params: Option>>, ) -> Result { let (manifest, location) = if let Some(mut manifest) = manifest { let location = commit_handler @@ -768,6 +809,7 @@ impl DatasetBuilder { commit_handler, file_reader_options, store_params, + base_store_params, ) } } diff --git a/rust/lance/src/dataset/tests/dataset_io.rs b/rust/lance/src/dataset/tests/dataset_io.rs index dbe03ed5122..306580f4aaa 100644 --- a/rust/lance/src/dataset/tests/dataset_io.rs +++ b/rust/lance/src/dataset/tests/dataset_io.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::collections::HashMap; use std::sync::Arc; use std::vec; @@ -35,12 +36,13 @@ use lance_datagen::{BatchCount, RowCount, array, gen_batch}; use lance_file::version::LanceFileVersion; use lance_io::assert_io_eq; use lance_table::feature_flags; +use lance_table::format::BasePath; use crate::index::DatasetIndexExt; use futures::TryStreamExt; use lance_index::IndexType; use lance_index::scalar::ScalarIndexParams; -use lance_io::object_store::{ObjectStore, ObjectStoreParams}; +use lance_io::object_store::{ObjectStore, ObjectStoreParams, StorageOptionsAccessor}; use lance_io::utils::tracking_store::IOTracker; use lance_table::io::manifest::read_manifest; use object_store::path::Path; @@ -167,6 +169,67 @@ async fn test_with_object_store_enables_isolated_per_request_io_tracking() { assert_eq!(tracker_b.incremental_stats().read_iops, 0); } +#[cfg(feature = "azure")] +#[tokio::test] +async fn test_object_store_for_base_uses_runtime_base_store_params() { + let test_dir = TempStdDir::default(); + create_file(&test_dir, WriteMode::Create, LanceFileVersion::Stable).await; + let uri = test_dir.to_str().unwrap(); + let dataset = Arc::new(Dataset::open(uri).await.unwrap()); + + let base_a = BasePath::new( + 1, + "az://container/path-a".to_string(), + Some("base-a".to_string()), + true, + ); + let base_b = BasePath::new( + 2, + "az://container/path-b".to_string(), + Some("base-b".to_string()), + true, + ); + dataset + .add_bases(vec![base_a.clone(), base_b.clone()], None) + .await + .unwrap(); + + let base_a_store_params = ObjectStoreParams { + storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( + HashMap::from([ + ("account_name".to_string(), "account-a".to_string()), + ("account_key".to_string(), "dGVzdA==".to_string()), + ]), + ))), + ..Default::default() + }; + let default_store_params = ObjectStoreParams { + storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( + HashMap::from([ + ("account_name".to_string(), "account-b".to_string()), + ("account_key".to_string(), "dGVzdA==".to_string()), + ]), + ))), + ..Default::default() + }; + + let dataset = DatasetBuilder::from_uri(uri) + .with_store_params(default_store_params) + .with_base_store_params(&base_a.path, base_a_store_params) + .load() + .await + .unwrap(); + + let store_a = dataset.object_store_for_base(1).await.unwrap(); + let store_a_again = dataset.object_store_for_base(1).await.unwrap(); + let store_b = dataset.object_store_for_base(2).await.unwrap(); + + assert!(Arc::ptr_eq(&store_a, &store_a_again)); + assert!(!Arc::ptr_eq(&store_a, &store_b)); + assert_eq!(store_a.store_prefix, "az$container@account-a"); + assert_eq!(store_b.store_prefix, "az$container@account-b"); +} + #[rstest] #[lance_test_macros::test(tokio::test)] async fn test_create_dataset( diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 46c4e47c81d..6364bbf60a3 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -213,6 +213,8 @@ pub struct WriteParams { pub store_params: Option, + pub base_store_params: Option>, + pub progress: Arc, /// Optional callback invoked after each batch is written. @@ -310,6 +312,7 @@ impl Default for WriteParams { max_bytes_per_file: 90 * 1024 * 1024 * 1024, // 90 GB mode: WriteMode::Create, store_params: None, + base_store_params: None, progress: Arc::new(NoopFragmentWriteProgress::new()), write_progress: None, commit_handler: None, @@ -350,6 +353,21 @@ impl WriteParams { .unwrap_or_default() } + /// Set exact runtime object store params for a registered base path. + /// + /// These params are used as-is for that base. The write-level default + /// `store_params` remain the fallback for bases without an explicit binding. + pub fn with_base_store_params( + mut self, + base_path: impl AsRef, + store_params: ObjectStoreParams, + ) -> Self { + self.base_store_params + .get_or_insert_with(HashMap::new) + .insert(base_path.as_ref().to_string(), store_params); + self + } + /// Set the properties for this WriteParams. pub fn with_transaction_properties(self, properties: HashMap) -> Self { Self { @@ -627,7 +645,6 @@ pub async fn validate_and_resolve_target_bases( .unwrap_or_default(); if let Some(target_bases) = &target_base_ids { - let store_params = params.store_params.clone().unwrap_or_default(); let mut bases_info = Vec::new(); for &target_base_id in target_bases { @@ -638,6 +655,7 @@ pub async fn validate_and_resolve_target_bases( )) })?; + let store_params = write_store_params_for_base(params, &base_path.path); let (target_object_store, extracted_path) = ObjectStore::from_uri_and_params( store_registry.clone(), &base_path.path, @@ -663,6 +681,7 @@ fn append_external_base_candidate( base_path: &BasePath, store_prefix: String, extracted_path: Path, + store_params: ObjectStoreParams, candidates: &mut Vec, seen_base_ids: &mut HashSet, ) { @@ -674,29 +693,50 @@ fn append_external_base_candidate( base_id: base_path.id, store_prefix, base_path: extracted_path, + store_params, }); } } +fn write_store_params_for_base(params: &WriteParams, base_path: &str) -> ObjectStoreParams { + params + .base_store_params + .as_ref() + .and_then(|base_store_params| base_store_params.get(base_path)) + .cloned() + .unwrap_or_else(|| params.store_params.clone().unwrap_or_default()) +} + +fn dataset_store_params_for_base(dataset: &Dataset, base_path: &str) -> ObjectStoreParams { + dataset + .base_store_params + .as_ref() + .and_then(|base_store_params| base_store_params.get(base_path)) + .cloned() + .unwrap_or_else(|| dataset.store_params.as_deref().cloned().unwrap_or_default()) +} + async fn append_external_initial_bases( initial_bases: Option<&Vec>, store_registry: Arc, - store_params: &ObjectStoreParams, + params: &WriteParams, candidates: &mut Vec, seen_base_ids: &mut HashSet, ) -> Result<()> { if let Some(initial_bases) = initial_bases { for base_path in initial_bases { + let store_params = write_store_params_for_base(params, &base_path.path); let (store, extracted_path) = ObjectStore::from_uri_and_params( store_registry.clone(), &base_path.path, - store_params, + &store_params, ) .await?; append_external_base_candidate( base_path, store.store_prefix.clone(), extracted_path, + store_params, candidates, seen_base_ids, ); @@ -712,13 +752,13 @@ async fn build_external_base_resolver( let store_registry = dataset .map(|ds| ds.session.store_registry()) .unwrap_or_else(|| params.store_registry()); - let store_params = params.store_params.clone().unwrap_or_default(); let mut seen_base_ids = HashSet::new(); let mut candidates = vec![]; if let Some(dataset) = dataset { for base_path in dataset.manifest.base_paths.values() { + let store_params = dataset_store_params_for_base(dataset, &base_path.path); let (store, extracted_path) = ObjectStore::from_uri_and_params( store_registry.clone(), &base_path.path, @@ -729,6 +769,7 @@ async fn build_external_base_resolver( base_path, store.store_prefix.clone(), extracted_path, + store_params, &mut candidates, &mut seen_base_ids, ); @@ -738,17 +779,13 @@ async fn build_external_base_resolver( append_external_initial_bases( params.initial_bases.as_ref(), store_registry.clone(), - &store_params, + params, &mut candidates, &mut seen_base_ids, ) .await?; - Ok(ExternalBaseResolver::new( - candidates, - store_registry, - store_params, - )) + Ok(ExternalBaseResolver::new(candidates, store_registry)) } /// Writes the given data to the dataset and returns fragments. @@ -1330,6 +1367,7 @@ impl Iterator for SpillStreamIter { #[cfg(test)] mod tests { use super::*; + use std::collections::HashMap; use arrow_array::{Int32Array, RecordBatchIterator, RecordBatchReader, StructArray}; use arrow_schema::{DataType, Field as ArrowField, Fields, Schema as ArrowSchema}; @@ -1338,7 +1376,9 @@ mod tests { use futures::TryStreamExt; use lance_datagen::{BatchCount, RowCount, array, gen_batch}; use lance_file::previous::reader::FileReader as PreviousFileReader; + use lance_io::object_store::StorageOptionsAccessor; use lance_io::traits::Reader; + use lance_table::format::BasePath; #[tokio::test] async fn test_chunking_large_batches() { @@ -1784,6 +1824,65 @@ mod tests { assert_eq!(batch, data); } + #[cfg(feature = "azure")] + fn azure_store_params(account_name: &str) -> ObjectStoreParams { + ObjectStoreParams { + storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( + HashMap::from([ + ("account_name".to_string(), account_name.to_string()), + ("account_key".to_string(), "dGVzdA==".to_string()), + ]), + ))), + ..Default::default() + } + } + + #[cfg(feature = "azure")] + #[tokio::test] + async fn test_validate_and_resolve_target_bases_uses_base_store_params() { + let mut params = WriteParams::default() + .with_target_bases(vec![1, 2]) + .with_base_store_params("az://container/path-a", azure_store_params("account-a")) + .with_base_store_params("az://container/path-b", azure_store_params("account-b")); + + let existing_base_paths = HashMap::from([ + ( + 1, + BasePath::new( + 1, + "az://container/path-a".to_string(), + Some("base-a".to_string()), + false, + ), + ), + ( + 2, + BasePath::new( + 2, + "az://container/path-b".to_string(), + Some("base-b".to_string()), + false, + ), + ), + ]); + + let target_bases = + validate_and_resolve_target_bases(&mut params, Some(&existing_base_paths)) + .await + .unwrap() + .unwrap(); + + assert_eq!(target_bases.len(), 2); + assert_eq!( + target_bases[0].object_store.store_prefix, + "az$container@account-a" + ); + assert_eq!( + target_bases[1].object_store.store_prefix, + "az$container@account-b" + ); + } + #[tokio::test] async fn test_explicit_data_file_bases_writer_generator() { use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema}; diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index ddf1b769425..de64fde8987 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -404,6 +404,7 @@ impl<'a> CommitBuilder<'a> { metadata_cache, file_reader_options: None, store_params: self.store_params.clone().map(Box::new), + base_store_params: None, }) } }