diff --git a/Cargo.toml b/Cargo.toml index 0406f31e4ea..4f07fff49de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ exclude = ["python", "java/lance-jni"] # Python package needs to be built by maturin. resolver = "2" + [workspace.package] version = "0.40.0-beta.1" edition = "2021" diff --git a/java/src/test/java/com/lancedb/lance/FileReaderWriterTest.java b/java/src/test/java/com/lancedb/lance/FileReaderWriterTest.java index 0e52e351614..04f570b569a 100644 --- a/java/src/test/java/com/lancedb/lance/FileReaderWriterTest.java +++ b/java/src/test/java/com/lancedb/lance/FileReaderWriterTest.java @@ -219,14 +219,18 @@ void testWriteNoData(@TempDir Path tempDir) throws Exception { } @Test - void testWriteWithStorage(@TempDir Path tempDir) { + void testWriteWithStorage(@TempDir Path tempDir) throws IOException { String filePath = "az://fail_bucket" + tempDir.resolve("test_write_with_storage"); BufferAllocator allocator = new RootAllocator(); Map storageOptions = new HashMap<>(); try { LanceFileWriter.open(filePath, allocator, null, storageOptions); - } catch (IOException e) { - assertTrue(e.getMessage().contains("Account must be specified")); + } catch (IllegalArgumentException e) { + assertTrue( + e.getMessage() + .contains( + "Unable to find object store prefix: no Azure account " + + "name in URI, and no storage account configured.")); } storageOptions.put("account_name", "some_account"); diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index d9fcbbc9290..94967eb93b6 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -145,13 +145,9 @@ impl std::fmt::Display for ObjectStore { pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync { /// Wrap an object store with additional functionality /// - /// The storage_options contain namespace information (e.g., azure_storage_account_name) - /// that wrappers may need for proper isolation - fn wrap( - &self, - original: Arc, - storage_options: Option<&HashMap>, - ) -> Arc; + /// The store_prefix is a string which uniquely identifies the object + /// store being wrapped. + fn wrap(&self, store_prefix: &str, original: Arc) -> Arc; } #[derive(Debug, Clone)] @@ -170,14 +166,10 @@ impl ChainedWrappingObjectStore { } impl WrappingObjectStore for ChainedWrappingObjectStore { - fn wrap( - &self, - original: Arc, - storage_options: Option<&HashMap>, - ) -> Arc { + fn wrap(&self, store_prefix: &str, original: Arc) -> Arc { self.wrappers .iter() - .fold(original, |acc, wrapper| wrapper.wrap(acc, storage_options)) + .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc)) } } @@ -353,8 +345,10 @@ impl ObjectStore { #[allow(deprecated)] if let Some((store, path)) = params.object_store.as_ref() { let mut inner = store.clone(); + let store_prefix = + registry.calculate_object_store_prefix(uri, params.storage_options.as_ref())?; if let Some(wrapper) = params.object_store_wrapper.as_ref() { - inner = wrapper.wrap(inner, params.storage_options.as_ref()); + inner = wrapper.wrap(&store_prefix, inner); } let store = Self { inner, @@ -734,6 +728,9 @@ impl From> for StorageOptions { } } +static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock = + std::sync::LazyLock::new(ObjectStoreRegistry::default); + impl ObjectStore { #[allow(clippy::too_many_arguments)] pub fn new( @@ -751,7 +748,12 @@ impl ObjectStore { let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme)); let store = match wrapper { - Some(wrapper) => wrapper.wrap(store, storage_options), + Some(wrapper) => { + let store_prefix = DEFAULT_OBJECT_STORE_REGISTRY + .calculate_object_store_prefix(location.as_ref(), storage_options) + .unwrap(); + wrapper.wrap(&store_prefix, store) + } None => store, }; @@ -984,8 +986,8 @@ mod tests { impl WrappingObjectStore for TestWrapper { fn wrap( &self, + _store_prefix: &str, _original: Arc, - _storage_options: Option<&HashMap>, ) -> Arc { self.called.store(true, Ordering::Relaxed); diff --git a/rust/lance-io/src/object_store/providers.rs b/rust/lance-io/src/object_store/providers.rs index 3ac6be93d6f..633017d4aeb 100644 --- a/rust/lance-io/src/object_store/providers.rs +++ b/rust/lance-io/src/object_store/providers.rs @@ -41,23 +41,25 @@ pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send { }) } - /// Generate a cache URL for this provider. + /// Calculate the unique prefix that should be used for this object store. /// - /// Providers can override this to implement custom cache key generation - /// that takes into account provider-specific requirements like namespace - /// isolation. - fn cache_url(&self, url: &Url) -> String { - if ["file", "file-object-store", "memory"].contains(&url.scheme()) { - // For file URLs, cache the URL without the path. - // The path can be different for different object stores, - // but we want to cache the object store itself. - format!("{}://", url.scheme()) - } else { - // Bucket is parsed as domain, so drop the path. - let mut url = url.clone(); - url.set_path(""); - url.to_string() - } + /// For object stores that don't have the concept of buckets, this will just be something like + /// 'file' or 'memory'. + /// + /// In object stores where all bucket names are unique, like s3, this will be + /// simply 's3$my_bucket_name' or similar. + /// + /// In Azure, only the combination of (account name, container name) is unique, so + /// this will be something like 'az$account_name@container' + /// + /// Providers should override this if they have special requirements like Azure's. + fn calculate_object_store_prefix( + &self, + scheme: &str, + authority: &str, + _storage_options: Option<&HashMap>, + ) -> Result { + Ok(format!("{}${}", scheme, authority)) } } @@ -141,6 +143,15 @@ impl ObjectStoreRegistry { output } + fn scheme_not_found_error(&self, scheme: &str) -> Error { + let mut message = format!("No object store provider found for scheme: '{}'", scheme); + if let Ok(providers) = self.providers.read() { + let valid_schemes = providers.keys().cloned().collect::>().join(", "); + message.push_str(&format!("\nValid schemes: {}", valid_schemes)); + } + Error::invalid_input(message, location!()) + } + /// Get an object store for a given base path and parameters. /// /// If the object store is already in use, it will return a strong reference @@ -153,16 +164,15 @@ impl ObjectStoreRegistry { ) -> Result> { let scheme = base_path.scheme(); let Some(provider) = self.get_provider(scheme) else { - let mut message = format!("No object store provider found for scheme: '{}'", scheme); - if let Ok(providers) = self.providers.read() { - let valid_schemes = providers.keys().cloned().collect::>().join(", "); - message.push_str(&format!("\nValid schemes: {}", valid_schemes)); - } - return Err(Error::invalid_input(message, location!())); + return Err(self.scheme_not_found_error(scheme)); }; - let cache_path = provider.cache_url(&base_path); - let cache_key = (cache_path, params.clone()); + let cache_path = provider.calculate_object_store_prefix( + base_path.scheme(), + base_path.authority(), + params.storage_options.as_ref(), + )?; + let cache_key = (cache_path.clone(), params.clone()); // Check if we have a cached store for this base path and params { @@ -197,7 +207,7 @@ impl ObjectStoreRegistry { store.inner = store.inner.traced(); if let Some(wrapper) = ¶ms.object_store_wrapper { - store.inner = wrapper.wrap(store.inner, params.storage_options.as_ref()); + store.inner = wrapper.wrap(&cache_path, store.inner); } let store = Arc::new(store); @@ -210,6 +220,45 @@ impl ObjectStoreRegistry { Ok(store) } + + /// Calculate the datastore prefix based on the URI and the storage options. + /// The data store prefix should uniquely identify the datastore. + pub fn calculate_object_store_prefix( + &self, + uri: &str, + storage_options: Option<&HashMap>, + ) -> Result { + let (scheme, authority) = match uri.find("://") { + None => { + // If there is no scheme, this is a file:// URI. + return Ok("file".to_string()); + } + Some(index) => { + let scheme = &uri[..index]; + let remainder = &uri[index + 3..]; + let authority = match remainder.find("/") { + None => remainder, + Some(sindex) => &remainder[..sindex], + }; + (scheme, authority) + } + }; + match self.get_provider(scheme) { + None => { + if scheme.len() == 1 { + // On Windows, drive letters such as C:/ can sometimes be confused for schemes. + // So if there is no known object store for this single-letter scheme, treat it + // as the local store. + Ok("file".to_string()) + } else { + Err(self.scheme_not_found_error(scheme)) + } + } + Some(provider) => { + provider.calculate_object_store_prefix(scheme, authority, storage_options) + } + } + } } impl Default for ObjectStoreRegistry { @@ -262,39 +311,77 @@ impl ObjectStoreRegistry { mod tests { use super::*; - #[test] - fn test_cache_url() { - // Test the default cache_url implementation using a dummy provider - #[derive(Debug)] - struct DummyProvider; - - #[async_trait::async_trait] - impl ObjectStoreProvider for DummyProvider { - async fn new_store( - &self, - _base_path: Url, - _params: &ObjectStoreParams, - ) -> Result { - unreachable!("This test doesn't create stores") - } + #[derive(Debug)] + struct DummyProvider; + + #[async_trait::async_trait] + impl ObjectStoreProvider for DummyProvider { + async fn new_store( + &self, + _base_path: Url, + _params: &ObjectStoreParams, + ) -> Result { + unreachable!("This test doesn't create stores") } + } + #[test] + fn test_calculate_object_store_prefix() { let provider = DummyProvider; - let cases = [ - ("s3://bucket/path?param=value", "s3://bucket?param=value"), - ("file:///path/to/file", "file://"), - ("file-object-store:///path/to/file", "file-object-store://"), - ("memory:///", "memory://"), - ( - "http://example.com/path?param=value", - "http://example.com/?param=value", - ), - ]; - - for (url, expected_cache_url) in cases { - let url = Url::parse(url).unwrap(); - let cache_url = provider.cache_url(&url); - assert_eq!(cache_url, expected_cache_url); - } + assert_eq!( + "dummy$blah", + provider + .calculate_object_store_prefix("dummy", "blah", None) + .unwrap() + ); + } + + #[test] + fn test_calculate_object_store_scheme_not_found() { + let registry = ObjectStoreRegistry::empty(); + registry.insert("dummy", Arc::new(DummyProvider)); + let s = "Invalid user input: No object store provider found for scheme: 'dummy2'\nValid schemes: dummy"; + let result = registry + .calculate_object_store_prefix("dummy2://mybucket/my/long/path", None) + .expect_err("expected error") + .to_string(); + assert_eq!(s, &result[..s.len()]); + } + + // Test that paths without a scheme get treated as local paths. + #[test] + fn test_calculate_object_store_prefix_for_local() { + let registry = ObjectStoreRegistry::empty(); + assert_eq!( + "file", + registry + .calculate_object_store_prefix("/tmp/foobar", None) + .unwrap() + ); + } + + // Test that paths with a single-letter scheme that is not registered for anything get treated as local paths. + #[test] + fn test_calculate_object_store_prefix_for_local_windows_path() { + let registry = ObjectStoreRegistry::empty(); + assert_eq!( + "file", + registry + .calculate_object_store_prefix("c://dos/path", None) + .unwrap() + ); + } + + // Test that paths with a given scheme get mapped to that storage provider. + #[test] + fn test_calculate_object_store_prefix_for_dummy_path() { + let registry = ObjectStoreRegistry::empty(); + registry.insert("dummy", Arc::new(DummyProvider)); + assert_eq!( + "dummy$mybucket", + registry + .calculate_object_store_prefix("dummy://mybucket/my/long/path", None) + .unwrap() + ); } } diff --git a/rust/lance-io/src/object_store/providers/azure.rs b/rust/lance-io/src/object_store/providers/azure.rs index b79ca8498d0..b3461591e07 100644 --- a/rust/lance-io/src/object_store/providers/azure.rs +++ b/rust/lance-io/src/object_store/providers/azure.rs @@ -1,7 +1,12 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + str::FromStr, + sync::{Arc, LazyLock}, + time::Duration, +}; use object_store::ObjectStore as OSObjectStore; use object_store_opendal::OpendalStore; @@ -119,21 +124,68 @@ impl ObjectStoreProvider for AzureBlobStoreProvider { download_retry_count, }) } + + fn calculate_object_store_prefix( + &self, + scheme: &str, + authority: &str, + storage_options: Option<&HashMap>, + ) -> Result { + let (container, account) = match authority.find("@") { + Some(at_index) => { + // The URI looks like 'az://container@account.dfs.core.windows.net/path-part/file', + // or possibly 'az://container@account/path-part/file'. + let container = &authority[..at_index]; + let account = &authority[at_index + 1..]; + ( + container, + account.split(".").next().unwrap_or_default().to_string(), + ) + } + None => { + // The URI looks like 'az://container/path-part/file'. + // We must look at the storage options to find the account. + let mut account = match storage_options { + Some(opts) => StorageOptions::find_configured_storage_account(opts), + None => None, + }; + if account.is_none() { + account = StorageOptions::find_configured_storage_account(&ENV_OPTIONS.0); + } + let account = account.ok_or(Error::invalid_input( + "Unable to find object store prefix: no Azure account name in URI, and no storage account configured.", + location!(), + ))?; + (authority, account) + } + }; + Ok(format!("{}${}@{}", scheme, container, account)) + } } +static ENV_OPTIONS: LazyLock = LazyLock::new(StorageOptions::from_env); + impl StorageOptions { - /// Add values from the environment to storage options - pub fn with_env_azure(&mut self) { + /// Iterate over all environment variables, looking for anything related to Azure. + fn from_env() -> Self { + let mut opts = HashMap::::new(); for (os_key, os_value) in std::env::vars_os() { if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) { if let Ok(config_key) = AzureConfigKey::from_str(&key.to_ascii_lowercase()) { - if !self.0.contains_key(config_key.as_ref()) { - self.0 - .insert(config_key.as_ref().to_string(), value.to_string()); - } + opts.insert(config_key.as_ref().to_string(), value.to_string()); } } } + Self(opts) + } + + /// Add values from the environment to storage options + pub fn with_env_azure(&mut self) { + for (os_key, os_value) in &ENV_OPTIONS.0 { + if !self.0.contains_key(os_key) { + self.0.insert(os_key.clone(), os_value.clone()); + } + } } /// Subset of options relevant for azure storage @@ -146,6 +198,17 @@ impl StorageOptions { }) .collect() } + + #[allow(clippy::manual_map)] + fn find_configured_storage_account(map: &HashMap) -> Option { + if let Some(account) = map.get("azure_storage_account_name") { + Some(account.clone()) + } else if let Some(account) = map.get("account_name") { + Some(account.clone()) + } else { + None + } + } } #[cfg(test)] @@ -187,4 +250,61 @@ mod tests { .unwrap(); assert_eq!(store.scheme, "az"); } + + #[test] + fn test_find_configured_storage_account() { + assert_eq!( + Some("myaccount".to_string()), + StorageOptions::find_configured_storage_account(&HashMap::from_iter( + [ + ("access_key".to_string(), "myaccesskey".to_string()), + ( + "azure_storage_account_name".to_string(), + "myaccount".to_string() + ) + ] + .into_iter() + )) + ); + } + + #[test] + fn test_calculate_object_store_prefix_from_url_and_options() { + let provider = AzureBlobStoreProvider; + let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]); + assert_eq!( + "az$container@bob", + provider + .calculate_object_store_prefix("az", "container", Some(&options)) + .unwrap() + ); + } + + #[test] + fn test_calculate_object_store_prefix_from_url_and_ignored_options() { + let provider = AzureBlobStoreProvider; + let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]); + assert_eq!( + "az$container@account", + provider + .calculate_object_store_prefix( + "az", + "container@account.dfs.core.windows.net", + Some(&options) + ) + .unwrap() + ); + } + + #[test] + fn test_fail_to_calculate_object_store_prefix_from_url() { + let provider = AzureBlobStoreProvider; + let options = HashMap::from_iter([("access_key".to_string(), "myaccesskey".to_string())]); + let expected = "Invalid user input: Unable to find object store prefix: no Azure account name in URI, and no storage account configured."; + let result = provider + .calculate_object_store_prefix("az", "container", Some(&options)) + .expect_err("expected error") + .to_string(); + assert_eq!(expected, &result[..expected.len()]); + } } diff --git a/rust/lance-io/src/object_store/providers/local.rs b/rust/lance-io/src/object_store/providers/local.rs index 9915ae45679..89bdfa5ab62 100644 --- a/rust/lance-io/src/object_store/providers/local.rs +++ b/rust/lance-io/src/object_store/providers/local.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use crate::object_store::{ ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_LOCAL_BLOCK_SIZE, @@ -48,6 +48,15 @@ impl ObjectStoreProvider for FileStoreProvider { ) }) } + + fn calculate_object_store_prefix( + &self, + scheme: &str, + _authority: &str, + _storage_options: Option<&HashMap>, + ) -> Result { + Ok(scheme.to_string()) + } } #[cfg(test)] @@ -74,6 +83,28 @@ mod tests { } } + #[test] + fn test_calculate_object_store_prefix() { + let provider = FileStoreProvider; + assert_eq!( + "file", + provider + .calculate_object_store_prefix("file", "etc", None) + .unwrap() + ); + } + + #[test] + fn test_calculate_object_store_prefix_for_file_object_store() { + let provider = FileStoreProvider; + assert_eq!( + "file-object-store", + provider + .calculate_object_store_prefix("file-object-store", "etc", None) + .unwrap() + ); + } + #[test] #[cfg(windows)] fn test_file_store_path_windows() { diff --git a/rust/lance-io/src/object_store/providers/memory.rs b/rust/lance-io/src/object_store/providers/memory.rs index f80ce410a43..b03baea104f 100644 --- a/rust/lance-io/src/object_store/providers/memory.rs +++ b/rust/lance-io/src/object_store/providers/memory.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use crate::object_store::{ ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, @@ -41,6 +41,15 @@ impl ObjectStoreProvider for MemoryStoreProvider { output.push_str(url.path()); Ok(Path::from(output)) } + + fn calculate_object_store_prefix( + &self, + _scheme: &str, + _authority: &str, + _storage_options: Option<&HashMap>, + ) -> Result { + Ok("memory".to_string()) + } } #[cfg(test)] @@ -56,4 +65,15 @@ mod tests { let expected_path = Path::from("path/to/file"); assert_eq!(path, expected_path); } + + #[test] + fn test_calculate_object_store_prefix() { + let provider = MemoryStoreProvider; + assert_eq!( + "memory", + provider + .calculate_object_store_prefix("memory", "etc", None) + .unwrap() + ); + } } diff --git a/rust/lance-io/src/utils/tracking_store.rs b/rust/lance-io/src/utils/tracking_store.rs index a1b4f3b0a77..eebce400dcc 100644 --- a/rust/lance-io/src/utils/tracking_store.rs +++ b/rust/lance-io/src/utils/tracking_store.rs @@ -32,11 +32,7 @@ impl IOTracker { } impl WrappingObjectStore for IOTracker { - fn wrap( - &self, - target: Arc, - _storage_options: Option<&std::collections::HashMap>, - ) -> Arc { + fn wrap(&self, _store_prefix: &str, target: Arc) -> Arc { Arc::new(IoTrackingStore::new(target, self.0.clone())) } } diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index c5ec80ffd69..75cf4a60996 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -667,8 +667,8 @@ mod tests { impl WrappingObjectStore for MockObjectStore { fn wrap( &self, + _storage_prefix: &str, original: Arc, - _storage_options: Option<&std::collections::HashMap>, ) -> Arc { Arc::new(ProxyObjectStore::new(original, self.policy.clone())) } diff --git a/rust/lance/src/utils/test/throttle_store.rs b/rust/lance/src/utils/test/throttle_store.rs index c78cd66583c..c146841e365 100644 --- a/rust/lance/src/utils/test/throttle_store.rs +++ b/rust/lance/src/utils/test/throttle_store.rs @@ -15,11 +15,7 @@ pub struct ThrottledStoreWrapper { } impl WrappingObjectStore for ThrottledStoreWrapper { - fn wrap( - &self, - original: Arc, - _storage_options: Option<&std::collections::HashMap>, - ) -> Arc { + fn wrap(&self, _prefix: &str, original: Arc) -> Arc { let throttle_store = ThrottledStore::new(original, self.config); Arc::new(throttle_store) }