Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/lance-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ bytes.workspace = true
chrono.workspace = true
deepsize.workspace = true
futures.workspace = true
http.workspace = true
log.workspace = true
pin-project.workspace = true
prost.workspace = true
Expand Down
100 changes: 100 additions & 0 deletions rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use object_store::aws::AwsCredentialProvider;
use object_store::DynObjectStore;
use object_store::Error as ObjectStoreError;
use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
use object_store::{ClientOptions, HeaderMap, HeaderValue};
use providers::local::FileStoreProvider;
use providers::memory::MemoryStoreProvider;
use snafu::location;
Expand Down Expand Up @@ -894,6 +896,42 @@ impl StorageOptions {
self.0.get(key)
}

/// Build [`ClientOptions`] with default headers extracted from `headers.*` keys.
///
/// Keys prefixed with `headers.` are parsed into HTTP headers. For example,
/// `headers.x-ms-version = 2023-11-03` results in a default header
/// `x-ms-version: 2023-11-03`.
///
/// Returns an error if any `headers.*` key has an invalid header name or value.
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
pub fn client_options(&self) -> Result<ClientOptions> {
let mut headers = HeaderMap::new();
for (key, value) in &self.0 {
if let Some(header_name) = key.strip_prefix("headers.") {
let name = header_name
.parse::<http::header::HeaderName>()
.map_err(|e| {
Error::invalid_input(
format!("invalid header name '{header_name}': {e}"),
location!(),
)
})?;
let val = HeaderValue::from_str(value).map_err(|e| {
Error::invalid_input(
format!("invalid header value for '{header_name}': {e}"),
location!(),
)
})?;
headers.insert(name, val);
}
}
let mut client_options = ClientOptions::default();
if !headers.is_empty() {
client_options = client_options.with_default_headers(headers);
}
Ok(client_options)
}

/// Get the expiration time in milliseconds since epoch, if present
pub fn expires_at_millis(&self) -> Option<u64> {
self.0
Expand Down Expand Up @@ -1380,4 +1418,66 @@ mod tests {
let copied_content = std::fs::read(&dest_file).unwrap();
assert_eq!(copied_content, b"test content");
}

#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_extracts_headers() {
let opts = StorageOptions(HashMap::from([
("headers.x-custom-foo".to_string(), "bar".to_string()),
("headers.x-ms-version".to_string(), "2023-11-03".to_string()),
("region".to_string(), "us-west-2".to_string()),
]));
let client_options = opts.client_options().unwrap();

// Verify non-header keys are not consumed as headers by creating
// another StorageOptions with no headers.* keys.
let opts_no_headers = StorageOptions(HashMap::from([(
"region".to_string(),
"us-west-2".to_string(),
)]));
opts_no_headers.client_options().unwrap();

// Smoke test: the client_options with headers should be usable
// in a builder (we can't inspect the headers directly, but building
// should not fail).
#[cfg(feature = "gcp")]
{
use object_store::gcp::GoogleCloudStorageBuilder;
let _builder = GoogleCloudStorageBuilder::new()
.with_client_options(client_options)
.with_url("gs://test-bucket");
}
}

#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_rejects_invalid_header_name() {
let opts = StorageOptions(HashMap::from([(
"headers.bad header".to_string(),
"value".to_string(),
)]));
let err = opts.client_options().unwrap_err();
assert!(err.to_string().contains("invalid header name"));
}

#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_rejects_invalid_header_value() {
let opts = StorageOptions(HashMap::from([(
"headers.x-good-name".to_string(),
"bad\x01value".to_string(),
)]));
let err = opts.client_options().unwrap_err();
assert!(err.to_string().contains("invalid header value"));
}

#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_empty_when_no_header_keys() {
let opts = StorageOptions(HashMap::from([
("region".to_string(), "us-east-1".to_string()),
("access_key_id".to_string(), "AKID".to_string()),
]));
opts.client_options().unwrap();
}
}
3 changes: 2 additions & 1 deletion rust/lance-io/src/object_store/providers/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ impl AwsStoreProvider {
base_path.set_query(None);

// we can't use parse_url_opts here because we need to manually set the credentials provider
let mut builder = AmazonS3Builder::new();
let mut builder =
AmazonS3Builder::new().with_client_options(storage_options.client_options()?);
for (key, value) in s3_storage_options {
builder = builder.with_config(key, value);
}
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-io/src/object_store/providers/azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ impl AzureBlobStoreProvider {

let mut builder = MicrosoftAzureBuilder::new()
.with_url(base_path.as_ref())
.with_retry(retry_config);
.with_retry(retry_config)
.with_client_options(storage_options.client_options()?);
for (key, value) in storage_options.as_azure_options() {
builder = builder.with_config(key, value);
}
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-io/src/object_store/providers/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ impl GcsStoreProvider {

let mut builder = GoogleCloudStorageBuilder::new()
.with_url(base_path.as_ref())
.with_retry(retry_config);
.with_retry(retry_config)
.with_client_options(storage_options.client_options()?);
for (key, value) in storage_options.as_gcs_options() {
builder = builder.with_config(key, value);
}
Expand Down
Loading