Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ pub struct Dataset {
/// Object store parameters used when opening this dataset.
/// These are used when creating object stores for additional base paths.
pub(crate) store_params: Option<Box<ObjectStoreParams>>,
/// Optional runtime-only object store parameters keyed by base path URI.
pub(crate) base_store_params: Option<Arc<HashMap<String, ObjectStoreParams>>>,
}

impl std::fmt::Debug for Dataset {
Expand All @@ -186,6 +188,7 @@ impl std::fmt::Debug for Dataset {
.field("base", &self.base)
.field("version", &self.manifest.version)
.field("cache_num_items", &self.session.approx_num_items())
.field("base_store_params", &self.base_store_params.is_some())
.finish()
}
}
Expand Down Expand Up @@ -577,6 +580,7 @@ impl Dataset {
self.commit_handler.clone(),
self.file_reader_options.clone(),
self.store_params.as_deref().cloned(),
self.base_store_params.clone(),
)
}

Expand Down Expand Up @@ -695,6 +699,7 @@ impl Dataset {
commit_handler: Arc<dyn CommitHandler>,
file_reader_options: Option<FileReaderOptions>,
store_params: Option<ObjectStoreParams>,
base_store_params: Option<Arc<HashMap<String, ObjectStoreParams>>>,
) -> Result<Self> {
let refs = Refs::new(
object_store.clone(),
Expand Down Expand Up @@ -722,6 +727,7 @@ impl Dataset {
index_cache,
file_reader_options,
store_params: store_params.map(Box::new),
base_store_params,
})
}

Expand Down Expand Up @@ -1631,6 +1637,23 @@ impl Dataset {
cloned
}

fn store_params_for_base(
&self,
base_path: Option<&lance_table::format::BasePath>,
) -> ObjectStoreParams {
// Base-specific bindings are exact ObjectStoreParams keyed by
// `BasePath.path`. If a base has no explicit binding then reads fall back
// to the dataset-level default store params.
base_path
.and_then(|base_path| {
self.base_store_params
.as_ref()
.and_then(|params| params.get(&base_path.path))
})
.cloned()
.unwrap_or_else(|| self.store_params.as_deref().cloned().unwrap_or_default())
}

/// Returns the initial storage options used when opening this dataset, if any.
///
/// This returns the static initial options without triggering any refresh.
Expand Down Expand Up @@ -1739,11 +1762,12 @@ impl Dataset {
let base_path = self.manifest.base_paths.get(&base_id).ok_or_else(|| {
Error::invalid_input(format!("Dataset base path with ID {} not found", base_id))
})?;
let store_params = self.store_params_for_base(Some(base_path));

let (store, _) = ObjectStore::from_uri_and_params(
self.session.store_registry(),
&base_path.path,
&self.store_params.as_deref().cloned().unwrap_or_default(),
&store_params,
)
.await?;

Expand Down Expand Up @@ -2564,6 +2588,7 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'
dataset.commit_handler.clone(),
dataset.file_reader_options.clone(),
dataset.store_params.as_deref().cloned(),
dataset.base_store_params.clone(),
)?;
let loaded =
Arc::new(dataset_version.read_transaction().await?.ok_or_else(|| {
Expand Down Expand Up @@ -2595,6 +2620,7 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'
dataset.commit_handler.clone(),
dataset.file_reader_options.clone(),
dataset.store_params.as_deref().cloned(),
dataset.base_store_params.clone(),
)
} else {
// If we didn't get the latest manifest, we can still return the dataset
Expand Down
94 changes: 86 additions & 8 deletions rust/lance/src/dataset/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,39 +45,37 @@ pub(super) struct ExternalBaseCandidate {
pub base_id: u32,
pub store_prefix: String,
pub base_path: Path,
pub store_params: ObjectStoreParams,
}

#[derive(Debug)]
pub(super) struct ExternalBaseResolver {
candidates: Vec<ExternalBaseCandidate>,
store_registry: Arc<ObjectStoreRegistry>,
store_params: ObjectStoreParams,
}

impl ExternalBaseResolver {
pub(super) fn new(
candidates: Vec<ExternalBaseCandidate>,
store_registry: Arc<ObjectStoreRegistry>,
store_params: ObjectStoreParams,
) -> Self {
Self {
candidates,
store_registry,
store_params,
}
}

pub(crate) async fn resolve_external_uri(
&self,
uri: &str,
) -> Result<Option<ResolvedExternalBase>> {
let uri_store_prefix = self
.store_registry
.calculate_object_store_prefix(uri, self.store_params.storage_options())?;
let uri_path = ObjectStore::extract_path_from_uri(self.store_registry.clone(), uri)?;

let mut best_match: Option<(usize, ResolvedExternalBase)> = None;
for candidate in &self.candidates {
let uri_store_prefix = self
.store_registry
.calculate_object_store_prefix(uri, candidate.store_params.storage_options())?;
if candidate.store_prefix != uri_store_prefix {
continue;
}
Expand Down Expand Up @@ -1321,6 +1319,7 @@ fn data_file_key_from_path(path: &str) -> &str {

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;

use arrow::{
Expand All @@ -1338,7 +1337,9 @@ mod tests {
ARROW_EXT_NAME_KEY, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY, BLOB_V2_EXT_NAME, DataTypeExt,
};
use lance_core::datatypes::BlobKind;
use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
use lance_io::object_store::{
ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
};
use lance_io::stream::RecordBatchStream;
use lance_table::format::BasePath;
use object_store::{
Expand All @@ -1354,7 +1355,7 @@ mod tests {
use lance_datagen::{BatchCount, RowCount, array};
use lance_file::version::LanceFileVersion;

use super::{BlobFile, data_file_key_from_path};
use super::{BlobFile, ExternalBaseCandidate, ExternalBaseResolver, data_file_key_from_path};
use crate::{
Dataset,
blob::{BlobArrayBuilder, blob_field},
Expand All @@ -1374,9 +1375,86 @@ mod tests {
expected: Vec<u8>,
}

/// Build `ObjectStoreParams` carrying static Azure credentials for the given
/// account name, with a fixed dummy base64 account key for tests.
#[cfg(feature = "azure")]
fn azure_store_params(account_name: &str) -> ObjectStoreParams {
    let mut options = HashMap::new();
    options.insert("account_name".to_string(), account_name.to_string());
    options.insert("account_key".to_string(), "dGVzdA==".to_string());
    ObjectStoreParams {
        storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
            options,
        ))),
        ..Default::default()
    }
}

// Test double object store; the name suggests it rejects empty-range reads.
// NOTE(review): its trait impls (besides `Display` below) are outside this
// view — confirm the intended rejection behavior against those impls.
#[derive(Debug)]
struct RejectEmptyRangeObjectStore;

#[cfg(feature = "azure")]
#[tokio::test]
async fn test_external_base_resolver_uses_candidate_store_params() {
    // Two external bases live in the same container but are bound to
    // different storage accounts; the resolver must match each URI using the
    // candidate's own store params rather than a single shared set.
    let registry = Arc::new(ObjectStoreRegistry::default());

    let base_a = BasePath::new(
        1,
        "az://container/path-a".to_string(),
        Some("base-a".to_string()),
        false,
    );
    let base_b = BasePath::new(
        2,
        "az://container/path-b".to_string(),
        Some("base-b".to_string()),
        false,
    );

    let params_a = azure_store_params("account-a");
    let params_b = azure_store_params("account-b");

    let (store_a, extracted_a) =
        ObjectStore::from_uri_and_params(registry.clone(), &base_a.path, &params_a)
            .await
            .unwrap();
    let (store_b, extracted_b) =
        ObjectStore::from_uri_and_params(registry.clone(), &base_b.path, &params_b)
            .await
            .unwrap();

    let candidate_a = ExternalBaseCandidate {
        base_id: base_a.id,
        store_prefix: store_a.store_prefix.clone(),
        base_path: extracted_a,
        store_params: params_a,
    };
    let candidate_b = ExternalBaseCandidate {
        base_id: base_b.id,
        store_prefix: store_b.store_prefix.clone(),
        base_path: extracted_b,
        store_params: params_b,
    };
    let resolver = ExternalBaseResolver::new(vec![candidate_a, candidate_b], registry);

    // Each URI resolves against its own base and yields the base-relative path.
    let resolved_a = resolver
        .resolve_external_uri("az://container/path-a/file.bin")
        .await
        .unwrap()
        .unwrap();
    assert_eq!(resolved_a.base_id, 1);
    assert_eq!(resolved_a.relative_path, "file.bin");

    let resolved_b = resolver
        .resolve_external_uri("az://container/path-b/file.bin")
        .await
        .unwrap()
        .unwrap();
    assert_eq!(resolved_b.base_id, 2);
    assert_eq!(resolved_b.relative_path, "file.bin");
}

impl std::fmt::Display for RejectEmptyRangeObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "RejectEmptyRangeObjectStore")
Expand Down
Loading
Loading