Skip to content
Merged
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ exclude = ["python", "java/lance-jni"]
# Python package needs to be built by maturin.
resolver = "2"


[workspace.package]
version = "0.40.0-beta.1"
edition = "2021"
Expand Down
10 changes: 7 additions & 3 deletions java/src/test/java/com/lancedb/lance/FileReaderWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,18 @@ void testWriteNoData(@TempDir Path tempDir) throws Exception {
}

@Test
void testWriteWithStorage(@TempDir Path tempDir) {
void testWriteWithStorage(@TempDir Path tempDir) throws IOException {
String filePath = "az://fail_bucket" + tempDir.resolve("test_write_with_storage");
BufferAllocator allocator = new RootAllocator();
Map<String, String> storageOptions = new HashMap<>();
try {
LanceFileWriter.open(filePath, allocator, null, storageOptions);
} catch (IOException e) {
assertTrue(e.getMessage().contains("Account must be specified"));
} catch (IllegalArgumentException e) {
assertTrue(
e.getMessage()
.contains(
"Unable to find object store prefix: no Azure account "
+ "name in URI, and no storage account configured."));
}

storageOptions.put("account_name", "some_account");
Expand Down
34 changes: 18 additions & 16 deletions rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,9 @@ impl std::fmt::Display for ObjectStore {
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
/// Wrap an object store with additional functionality
///
/// The storage_options contain namespace information (e.g., azure_storage_account_name)
/// that wrappers may need for proper isolation
fn wrap(
&self,
original: Arc<dyn OSObjectStore>,
storage_options: Option<&HashMap<String, String>>,
) -> Arc<dyn OSObjectStore>;
/// The store_prefix is a string which uniquely identifies the object
/// store being wrapped.
fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}

#[derive(Debug, Clone)]
Expand All @@ -170,14 +166,10 @@ impl ChainedWrappingObjectStore {
}

impl WrappingObjectStore for ChainedWrappingObjectStore {
fn wrap(
&self,
original: Arc<dyn OSObjectStore>,
storage_options: Option<&HashMap<String, String>>,
) -> Arc<dyn OSObjectStore> {
fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
self.wrappers
.iter()
.fold(original, |acc, wrapper| wrapper.wrap(acc, storage_options))
.fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
}
}

Expand Down Expand Up @@ -353,8 +345,10 @@ impl ObjectStore {
#[allow(deprecated)]
if let Some((store, path)) = params.object_store.as_ref() {
let mut inner = store.clone();
let store_prefix =
registry.calculate_object_store_prefix(uri, params.storage_options.as_ref())?;
if let Some(wrapper) = params.object_store_wrapper.as_ref() {
inner = wrapper.wrap(inner, params.storage_options.as_ref());
inner = wrapper.wrap(&store_prefix, inner);
}
let store = Self {
inner,
Expand Down Expand Up @@ -734,6 +728,9 @@ impl From<HashMap<String, String>> for StorageOptions {
}
}

static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
std::sync::LazyLock::new(ObjectStoreRegistry::default);

impl ObjectStore {
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand All @@ -751,7 +748,12 @@ impl ObjectStore {
let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));

let store = match wrapper {
Some(wrapper) => wrapper.wrap(store, storage_options),
Some(wrapper) => {
let store_prefix = DEFAULT_OBJECT_STORE_REGISTRY
.calculate_object_store_prefix(location.as_ref(), storage_options)
.unwrap();
wrapper.wrap(&store_prefix, store)
}
None => store,
};

Expand Down Expand Up @@ -984,8 +986,8 @@ mod tests {
impl WrappingObjectStore for TestWrapper {
fn wrap(
&self,
_store_prefix: &str,
_original: Arc<dyn OSObjectStore>,
_storage_options: Option<&HashMap<String, String>>,
) -> Arc<dyn OSObjectStore> {
self.called.store(true, Ordering::Relaxed);

Expand Down
199 changes: 143 additions & 56 deletions rust/lance-io/src/object_store/providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,25 @@ pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send {
})
}

/// Generate a cache URL for this provider.
/// Calculate the unique prefix that should be used for this object store.
///
/// Providers can override this to implement custom cache key generation
/// that takes into account provider-specific requirements like namespace
/// isolation.
fn cache_url(&self, url: &Url) -> String {
if ["file", "file-object-store", "memory"].contains(&url.scheme()) {
// For file URLs, cache the URL without the path.
// The path can be different for different object stores,
// but we want to cache the object store itself.
format!("{}://", url.scheme())
} else {
// Bucket is parsed as domain, so drop the path.
let mut url = url.clone();
url.set_path("");
url.to_string()
}
/// For object stores that don't have the concept of buckets, this will just be something like
/// 'file' or 'memory'.
///
/// In object stores where all bucket names are unique, like s3, this will be
/// simply 's3$my_bucket_name' or similar.
///
/// In Azure, only the combination of (account name, container name) is unique, so
/// this will be something like 'az$account_name@container'
///
/// Providers should override this if they have special requirements like Azure's.
fn calculate_object_store_prefix(
&self,
scheme: &str,
authority: &str,
_storage_options: Option<&HashMap<String, String>>,
) -> Result<String> {
Ok(format!("{}${}", scheme, authority))
}
}

Expand Down Expand Up @@ -141,6 +143,15 @@ impl ObjectStoreRegistry {
output
}

fn scheme_not_found_error(&self, scheme: &str) -> Error {
let mut message = format!("No object store provider found for scheme: '{}'", scheme);
if let Ok(providers) = self.providers.read() {
let valid_schemes = providers.keys().cloned().collect::<Vec<_>>().join(", ");
message.push_str(&format!("\nValid schemes: {}", valid_schemes));
}
Error::invalid_input(message, location!())
}

/// Get an object store for a given base path and parameters.
///
/// If the object store is already in use, it will return a strong reference
Expand All @@ -153,16 +164,15 @@ impl ObjectStoreRegistry {
) -> Result<Arc<ObjectStore>> {
let scheme = base_path.scheme();
let Some(provider) = self.get_provider(scheme) else {
let mut message = format!("No object store provider found for scheme: '{}'", scheme);
if let Ok(providers) = self.providers.read() {
let valid_schemes = providers.keys().cloned().collect::<Vec<_>>().join(", ");
message.push_str(&format!("\nValid schemes: {}", valid_schemes));
}
return Err(Error::invalid_input(message, location!()));
return Err(self.scheme_not_found_error(scheme));
};

let cache_path = provider.cache_url(&base_path);
let cache_key = (cache_path, params.clone());
let cache_path = provider.calculate_object_store_prefix(
base_path.scheme(),
base_path.authority(),
params.storage_options.as_ref(),
)?;
let cache_key = (cache_path.clone(), params.clone());

// Check if we have a cached store for this base path and params
{
Expand Down Expand Up @@ -197,7 +207,7 @@ impl ObjectStoreRegistry {
store.inner = store.inner.traced();

if let Some(wrapper) = &params.object_store_wrapper {
store.inner = wrapper.wrap(store.inner, params.storage_options.as_ref());
store.inner = wrapper.wrap(&cache_path, store.inner);
}

let store = Arc::new(store);
Expand All @@ -210,6 +220,45 @@ impl ObjectStoreRegistry {

Ok(store)
}

/// Calculate the datastore prefix based on the URI and the storage options.
/// The data store prefix should uniquely identify the datastore.
pub fn calculate_object_store_prefix(
&self,
uri: &str,
storage_options: Option<&HashMap<String, String>>,
) -> Result<String> {
let (scheme, authority) = match uri.find("://") {
None => {
// If there is no scheme, this is a file:// URI.
return Ok("file".to_string());
}
Some(index) => {
let scheme = &uri[..index];
let remainder = &uri[index + 3..];
let authority = match remainder.find("/") {
None => remainder,
Some(sindex) => &remainder[..sindex],
};
(scheme, authority)
}
};
match self.get_provider(scheme) {
None => {
if scheme.len() == 1 {
// On Windows, drive letters such as C:/ can sometimes be confused for schemes.
// So if there is no known object store for this single-letter scheme, treat it
// as the local store.
Ok("file".to_string())
} else {
Err(self.scheme_not_found_error(scheme))
}
}
Some(provider) => {
provider.calculate_object_store_prefix(scheme, authority, storage_options)
}
}
}
}

impl Default for ObjectStoreRegistry {
Expand Down Expand Up @@ -262,39 +311,77 @@ impl ObjectStoreRegistry {
mod tests {
use super::*;

#[test]
fn test_cache_url() {
// Test the default cache_url implementation using a dummy provider
#[derive(Debug)]
struct DummyProvider;

#[async_trait::async_trait]
impl ObjectStoreProvider for DummyProvider {
async fn new_store(
&self,
_base_path: Url,
_params: &ObjectStoreParams,
) -> Result<ObjectStore> {
unreachable!("This test doesn't create stores")
}
#[derive(Debug)]
struct DummyProvider;

#[async_trait::async_trait]
impl ObjectStoreProvider for DummyProvider {
async fn new_store(
&self,
_base_path: Url,
_params: &ObjectStoreParams,
) -> Result<ObjectStore> {
unreachable!("This test doesn't create stores")
}
}

#[test]
fn test_calculate_object_store_prefix() {
let provider = DummyProvider;
let cases = [
("s3://bucket/path?param=value", "s3://bucket?param=value"),
("file:///path/to/file", "file://"),
("file-object-store:///path/to/file", "file-object-store://"),
("memory:///", "memory://"),
(
"http://example.com/path?param=value",
"http://example.com/?param=value",
),
];

for (url, expected_cache_url) in cases {
let url = Url::parse(url).unwrap();
let cache_url = provider.cache_url(&url);
assert_eq!(cache_url, expected_cache_url);
}
assert_eq!(
"dummy$blah",
provider
.calculate_object_store_prefix("dummy", "blah", None)
.unwrap()
);
}

#[test]
fn test_calculate_object_store_scheme_not_found() {
let registry = ObjectStoreRegistry::empty();
registry.insert("dummy", Arc::new(DummyProvider));
let s = "Invalid user input: No object store provider found for scheme: 'dummy2'\nValid schemes: dummy";
let result = registry
.calculate_object_store_prefix("dummy2://mybucket/my/long/path", None)
.expect_err("expected error")
.to_string();
assert_eq!(s, &result[..s.len()]);
}

// Test that paths without a scheme get treated as local paths.
#[test]
fn test_calculate_object_store_prefix_for_local() {
let registry = ObjectStoreRegistry::empty();
assert_eq!(
"file",
registry
.calculate_object_store_prefix("/tmp/foobar", None)
.unwrap()
);
}

// Test that paths with a single-letter scheme that is not registered for anything get treated as local paths.
#[test]
fn test_calculate_object_store_prefix_for_local_windows_path() {
let registry = ObjectStoreRegistry::empty();
assert_eq!(
"file",
registry
.calculate_object_store_prefix("c://dos/path", None)
.unwrap()
);
}

// Test that paths with a given scheme get mapped to that storage provider.
#[test]
fn test_calculate_object_store_prefix_for_dummy_path() {
let registry = ObjectStoreRegistry::empty();
registry.insert("dummy", Arc::new(DummyProvider));
assert_eq!(
"dummy$mybucket",
registry
.calculate_object_store_prefix("dummy://mybucket/my/long/path", None)
.unwrap()
);
}
}
Loading
Loading