Add support for default HTTP headers supplied through storage options.

`StorageOptions::client_options` collects every `headers.<name>` storage
option into a `HeaderMap` and returns it as the default headers of an
`object_store::ClientOptions`. An invalid header name or value is reported
as an invalid-input error. The AWS, Azure and GCS providers hand the result
to their builders via `with_client_options` before applying the remaining
per-key configuration.

diff --git a/Cargo.lock b/Cargo.lock
index 4a76885be57..f9a5443d2b4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5182,6 +5182,7 @@ dependencies = [
  "criterion",
  "deepsize",
  "futures",
+ "http 1.4.0",
  "lance-arrow",
  "lance-core",
  "lance-namespace",
diff --git a/python/Cargo.lock b/python/Cargo.lock
index 0a41ecccc5a..5e365395797 100644
--- a/python/Cargo.lock
+++ b/python/Cargo.lock
@@ -4267,6 +4267,7 @@ dependencies = [
  "chrono",
  "deepsize",
  "futures",
+ "http 1.4.0",
  "lance-arrow",
  "lance-core",
  "lance-namespace",
diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml
index fd6e0345c2f..0cd8f75747a 100644
--- a/rust/lance-io/Cargo.toml
+++ b/rust/lance-io/Cargo.toml
@@ -36,6 +36,7 @@ bytes.workspace = true
 chrono.workspace = true
 deepsize.workspace = true
 futures.workspace = true
+http.workspace = true
 log.workspace = true
 pin-project.workspace = true
 prost.workspace = true
diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs
index 626055a2b11..8644351cf97 100644
--- a/rust/lance-io/src/object_store.rs
+++ b/rust/lance-io/src/object_store.rs
@@ -24,6 +24,8 @@ use object_store::aws::AwsCredentialProvider;
 use object_store::DynObjectStore;
 use object_store::Error as ObjectStoreError;
 use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
+#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
+use object_store::{ClientOptions, HeaderMap, HeaderValue};
 use providers::local::FileStoreProvider;
 use providers::memory::MemoryStoreProvider;
 use snafu::location;
@@ -894,6 +896,42 @@ impl StorageOptions {
         self.0.get(key)
     }
 
+    /// Build [`ClientOptions`] with default headers extracted from `headers.*` keys.
+    ///
+    /// Keys prefixed with `headers.` are parsed into HTTP headers. For example,
+    /// `headers.x-ms-version = 2023-11-03` results in a default header
+    /// `x-ms-version: 2023-11-03`.
+    ///
+    /// Returns an error if any `headers.*` key has an invalid header name or value.
+    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
+    pub fn client_options(&self) -> Result<ClientOptions> {
+        let mut headers = HeaderMap::new();
+        for (key, value) in &self.0 {
+            if let Some(header_name) = key.strip_prefix("headers.") {
+                let name = header_name
+                    .parse::<http::HeaderName>()
+                    .map_err(|e| {
+                        Error::invalid_input(
+                            format!("invalid header name '{header_name}': {e}"),
+                            location!(),
+                        )
+                    })?;
+                let val = HeaderValue::from_str(value).map_err(|e| {
+                    Error::invalid_input(
+                        format!("invalid header value for '{header_name}': {e}"),
+                        location!(),
+                    )
+                })?;
+                headers.insert(name, val);
+            }
+        }
+        let mut client_options = ClientOptions::default();
+        if !headers.is_empty() {
+            client_options = client_options.with_default_headers(headers);
+        }
+        Ok(client_options)
+    }
+
     /// Get the expiration time in milliseconds since epoch, if present
     pub fn expires_at_millis(&self) -> Option<u64> {
         self.0
@@ -1380,4 +1418,66 @@ mod tests {
         let copied_content = std::fs::read(&dest_file).unwrap();
         assert_eq!(copied_content, b"test content");
     }
+
+    #[test]
+    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
+    fn test_client_options_extracts_headers() {
+        let opts = StorageOptions(HashMap::from([
+            ("headers.x-custom-foo".to_string(), "bar".to_string()),
+            ("headers.x-ms-version".to_string(), "2023-11-03".to_string()),
+            ("region".to_string(), "us-west-2".to_string()),
+        ]));
+        let client_options = opts.client_options().unwrap();
+
+        // Verify non-header keys are not consumed as headers by creating
+        // another StorageOptions with no headers.* keys.
+        let opts_no_headers = StorageOptions(HashMap::from([(
+            "region".to_string(),
+            "us-west-2".to_string(),
+        )]));
+        opts_no_headers.client_options().unwrap();
+
+        // Smoke test: the client_options with headers should be usable
+        // in a builder (we can't inspect the headers directly, but building
+        // should not fail).
+        #[cfg(feature = "gcp")]
+        {
+            use object_store::gcp::GoogleCloudStorageBuilder;
+            let _builder = GoogleCloudStorageBuilder::new()
+                .with_client_options(client_options)
+                .with_url("gs://test-bucket");
+        }
+    }
+
+    #[test]
+    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
+    fn test_client_options_rejects_invalid_header_name() {
+        let opts = StorageOptions(HashMap::from([(
+            "headers.bad header".to_string(),
+            "value".to_string(),
+        )]));
+        let err = opts.client_options().unwrap_err();
+        assert!(err.to_string().contains("invalid header name"));
+    }
+
+    #[test]
+    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
+    fn test_client_options_rejects_invalid_header_value() {
+        let opts = StorageOptions(HashMap::from([(
+            "headers.x-good-name".to_string(),
+            "bad\x01value".to_string(),
+        )]));
+        let err = opts.client_options().unwrap_err();
+        assert!(err.to_string().contains("invalid header value"));
+    }
+
+    #[test]
+    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
+    fn test_client_options_empty_when_no_header_keys() {
+        let opts = StorageOptions(HashMap::from([
+            ("region".to_string(), "us-east-1".to_string()),
+            ("access_key_id".to_string(), "AKID".to_string()),
+        ]));
+        opts.client_options().unwrap();
+    }
 }
diff --git a/rust/lance-io/src/object_store/providers/aws.rs b/rust/lance-io/src/object_store/providers/aws.rs
index 982470581f2..a69eb1c8ec3 100644
--- a/rust/lance-io/src/object_store/providers/aws.rs
+++ b/rust/lance-io/src/object_store/providers/aws.rs
@@ -78,7 +78,8 @@ impl AwsStoreProvider {
         base_path.set_query(None);
 
         // we can't use parse_url_opts here because we need to manually set the credentials provider
-        let mut builder = AmazonS3Builder::new();
+        let mut builder =
+            AmazonS3Builder::new().with_client_options(storage_options.client_options()?);
         for (key, value) in s3_storage_options {
             builder = builder.with_config(key, value);
         }
diff --git a/rust/lance-io/src/object_store/providers/azure.rs b/rust/lance-io/src/object_store/providers/azure.rs
index 7bf566c8972..6b5e70227ec 100644
--- a/rust/lance-io/src/object_store/providers/azure.rs
+++ b/rust/lance-io/src/object_store/providers/azure.rs
@@ -81,7 +81,8 @@ impl AzureBlobStoreProvider {
 
         let mut builder = MicrosoftAzureBuilder::new()
             .with_url(base_path.as_ref())
-            .with_retry(retry_config);
+            .with_retry(retry_config)
+            .with_client_options(storage_options.client_options()?);
         for (key, value) in storage_options.as_azure_options() {
             builder = builder.with_config(key, value);
         }
diff --git a/rust/lance-io/src/object_store/providers/gcp.rs b/rust/lance-io/src/object_store/providers/gcp.rs
index dba5cd8dd40..52c6cbbbdc9 100644
--- a/rust/lance-io/src/object_store/providers/gcp.rs
+++ b/rust/lance-io/src/object_store/providers/gcp.rs
@@ -74,7 +74,8 @@ impl GcsStoreProvider {
 
         let mut builder = GoogleCloudStorageBuilder::new()
             .with_url(base_path.as_ref())
-            .with_retry(retry_config);
+            .with_retry(retry_config)
+            .with_client_options(storage_options.client_options()?);
         for (key, value) in storage_options.as_gcs_options() {
             builder = builder.with_config(key, value);
         }