Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 100 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ mock_instant = { version = "0.3.1", features = ["sync"] }
moka = { version = "0.12", features = ["future", "sync"] }
num-traits = "0.2"
object_store = { version = "0.12.2" }
opendal = { version = "0.54" }
object_store_opendal = { version = "0.54" }
pin-project = "1.0"
path_abs = "0.5"
pprof = { version = "0.14.0", features = ["flamegraph", "criterion"] }
Expand Down
3 changes: 3 additions & 0 deletions rust/lance-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ rust-version.workspace = true
[dependencies]

object_store = { workspace = true }
opendal = { workspace = true, optional = true }
object_store_opendal = { workspace = true, optional = true }
lance-arrow.workspace = true
lance-core.workspace = true
arrow = { workspace = true, features = ["ffi"] }
Expand Down Expand Up @@ -66,6 +68,7 @@ gcs-test = []
gcp = ["object_store/gcp"]
aws = ["object_store/aws", "aws-config", "aws-credential-types"]
azure = ["object_store/azure"]
oss = ["opendal/services-oss", "object_store_opendal"]

[lints]
workspace = true
4 changes: 4 additions & 0 deletions rust/lance-io/src/object_store/providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub mod azure;
pub mod gcp;
pub mod local;
pub mod memory;
#[cfg(feature = "oss")]
pub mod oss;

#[async_trait::async_trait]
pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send {
Expand Down Expand Up @@ -237,6 +239,8 @@ impl Default for ObjectStoreRegistry {
providers.insert("az".into(), Arc::new(azure::AzureBlobStoreProvider));
#[cfg(feature = "gcp")]
providers.insert("gs".into(), Arc::new(gcp::GcsStoreProvider));
#[cfg(feature = "oss")]
providers.insert("oss".into(), Arc::new(oss::OssStoreProvider));
Self {
providers: RwLock::new(providers),
active_stores: RwLock::new(HashMap::new()),
Expand Down
121 changes: 121 additions & 0 deletions rust/lance-io/src/object_store/providers/oss.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashMap;
use std::sync::Arc;

use object_store_opendal::OpendalStore;
use opendal::{services::Oss, Operator};
use snafu::location;
use url::Url;

use crate::object_store::{
ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE,
DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE,
};
use lance_core::error::{Error, Result};

/// [`ObjectStoreProvider`] that serves `oss://` URLs through OpenDAL's OSS service.
///
/// The provider carries no state of its own; every store it creates is configured
/// from the URL, the process environment and the storage options passed to
/// `new_store`.
#[derive(Debug, Default)]
pub struct OssStoreProvider;

#[async_trait::async_trait]
impl ObjectStoreProvider for OssStoreProvider {
    /// Builds an [`ObjectStore`] for an `oss://<bucket>/<prefix>` URL on top of
    /// OpenDAL's OSS service.
    ///
    /// The OpenDAL configuration is layered, lowest precedence first:
    ///
    /// 1. Environment variables prefixed with `AWS_`, `ALIBABA_CLOUD_` or `OSS_`
    ///    (prefix removed, remainder lower-cased).
    /// 2. `bucket` (URL host) and `root` (URL path, when non-empty).
    /// 3. The storage options `oss_endpoint`, `oss_access_key_id`,
    ///    `oss_secret_access_key` and `oss_region`.
    ///
    /// # Errors
    ///
    /// Returns an invalid-input error when the URL has no host, when no endpoint
    /// is configured by any layer, or when OpenDAL rejects the configuration.
    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
        let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
        let storage_options = StorageOptions(params.storage_options.clone().unwrap_or_default());

        let bucket = base_path
            .host_str()
            .ok_or_else(|| Error::invalid_input("OSS URL must contain bucket name", location!()))?
            .to_string();

        let prefix = base_path.path().trim_start_matches('/');

        // `std::env::vars()` panics on the first non-Unicode entry anywhere in the
        // environment; read the raw environment and skip such entries instead.
        let process_env = std::env::vars_os().filter_map(|(name, value)| {
            Some((name.into_string().ok()?, value.into_string().ok()?))
        });

        // Start with environment variables as base configuration.
        // NOTE(review): the environment layer does not rename `secret_access_key`
        // to `access_key_secret` the way the storage-option layer below does —
        // confirm whether `*_SECRET_ACCESS_KEY` variables are meant to be honoured.
        let mut config_map = oss_config_from_env(process_env);

        config_map.insert("bucket".to_string(), bucket);

        if !prefix.is_empty() {
            config_map.insert("root".to_string(), format!("/{}", prefix));
        }

        // Override with storage options if provided.
        for (option_key, config_key) in OSS_STORAGE_OPTION_KEYS {
            if let Some(value) = storage_options.0.get(option_key) {
                config_map.insert(config_key.to_string(), value.clone());
            }
        }

        if !config_map.contains_key("endpoint") {
            return Err(Error::invalid_input(
                "OSS endpoint is required. Please provide 'oss_endpoint' in storage options or set OSS_ENDPOINT environment variable",
                location!(),
            ));
        }

        let operator = Operator::from_iter::<Oss>(config_map)
            .map_err(|e| {
                Error::invalid_input(format!("Failed to create OSS operator: {}", e), location!())
            })?
            .finish();

        let opendal_store = Arc::new(OpendalStore::new(operator));

        Ok(ObjectStore {
            scheme: "oss".to_string(),
            inner: opendal_store,
            block_size,
            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
            use_constant_size_upload_parts: params.use_constant_size_upload_parts,
            list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or(true),
            io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
            download_retry_count: storage_options.download_retry_count(),
        })
    }
}

/// Environment-variable prefixes scanned for OSS settings, ordered from lowest to
/// highest precedence.
const OSS_ENV_PREFIXES: [&str; 3] = ["AWS_", "ALIBABA_CLOUD_", "OSS_"];

/// Storage-option names and the OpenDAL configuration keys they are written to.
const OSS_STORAGE_OPTION_KEYS: [(&str, &str); 4] = [
    ("oss_endpoint", "endpoint"),
    ("oss_access_key_id", "access_key_id"),
    ("oss_secret_access_key", "access_key_secret"),
    ("oss_region", "region"),
];

/// Converts environment variables into OpenDAL configuration entries.
///
/// Only names starting with one of [`OSS_ENV_PREFIXES`] are kept. Exactly that
/// leading prefix is removed and the remainder is lower-cased, so
/// `OSS_ENDPOINT` becomes `endpoint`. When several prefixes produce the same key,
/// the prefix listed later in [`OSS_ENV_PREFIXES`] wins regardless of the order
/// in which the variables are supplied.
fn oss_config_from_env(
    vars: impl IntoIterator<Item = (String, String)>,
) -> HashMap<String, String> {
    let mut ranked: Vec<(usize, String, String)> = vars
        .into_iter()
        .filter_map(|(name, value)| {
            let (rank, key) = OSS_ENV_PREFIXES
                .iter()
                .enumerate()
                .find_map(|(rank, prefix)| {
                    name.strip_prefix(*prefix)
                        .map(|rest| (rank, rest.to_lowercase()))
                })?;
            Some((rank, key, value))
        })
        .collect();

    // Stable sort: higher-precedence prefixes are inserted last and therefore
    // overwrite lower-precedence entries that share a key.
    ranked.sort_by_key(|(rank, _, _)| *rank);
    ranked
        .into_iter()
        .map(|(_, key, value)| (key, value))
        .collect()
}

#[cfg(test)]
mod tests {
    use super::OssStoreProvider;
    use crate::object_store::ObjectStoreProvider;
    use url::Url;

    /// `extract_path` on an `oss://` URL yields the object path after the bucket.
    #[test]
    fn test_oss_store_path() {
        let location = Url::parse("oss://bucket/path/to/file").unwrap();

        let actual = OssStoreProvider.extract_path(&location);

        assert_eq!(actual, object_store::path::Path::from("/path/to/file"));
    }
}
1 change: 1 addition & 0 deletions rust/lance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ protoc = [
aws = ["lance-io/aws"]
gcp = ["lance-io/gcp"]
azure = ["lance-io/azure"]
oss = ["lance-io/oss"]

[[bin]]
name = "lq"
Expand Down
Loading