From e7b419abc856e7c285da660ac5e46c0024476cbd Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 17 Jul 2025 17:11:38 +0800 Subject: [PATCH 1/3] feat: add native oss support for lance Signed-off-by: Xuanwo --- Cargo.lock | 103 ++++++++++++++- rust/lance-io/Cargo.toml | 3 + rust/lance-io/src/object_store/providers.rs | 4 + .../src/object_store/providers/oss.rs | 121 ++++++++++++++++++ 4 files changed, 229 insertions(+), 2 deletions(-) create mode 100644 rust/lance-io/src/object_store/providers/oss.rs diff --git a/Cargo.lock b/Cargo.lock index 771bd88bf6b..1571d31797f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1026,6 +1026,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "backon" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "302eaff5357a264a2c42f127ecb8bac761cf99749fc3dc95677e2743991f99e7" +dependencies = [ + "fastrand", + "gloo-timers", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.75" @@ -1391,8 +1402,10 @@ checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] @@ -2630,6 +2643,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", + "const-oid", "crypto-common", "subtle", ] @@ -3873,6 +3887,17 @@ version = "4.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d762194228a2f1c11063e46e32e5acb96e66e906382b9eb5441f2e0504bbd5a" +[[package]] +name = "io-uring" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013" +dependencies = [ + "bitflags 2.9.1", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -4443,6 +4468,8 @@ dependencies = [ "log", "mockall", 
"object_store", + "object_store_opendal", + "opendal", "path_abs", "pin-project", "pprof", @@ -5364,6 +5391,21 @@ dependencies = [ "web-time", ] +[[package]] +name = "object_store_opendal" +version = "0.52.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28eaaf0a67714fa27acb7771668c1eba78fa9b01494704fde05faa5f3e1d3d89" +dependencies = [ + "async-trait", + "bytes", + "futures", + "object_store", + "opendal", + "pin-project", + "tokio", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -5410,6 +5452,34 @@ version = "11.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" +[[package]] +name = "opendal" +version = "0.53.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f947c4efbca344c1a125753366033c8107f552b2e3f8251815ed1908f116ca3e" +dependencies = [ + "anyhow", + "async-trait", + "backon", + "base64 0.22.1", + "bytes", + "chrono", + "futures", + "getrandom 0.2.16", + "http 1.3.1", + "http-body 1.0.1", + "log", + "md-5", + "percent-encoding", + "quick-xml 0.37.5", + "reqsign", + "reqwest", + "serde", + "serde_json", + "tokio", + "uuid", +] + [[package]] name = "openssl" version = "0.10.73" @@ -6439,6 +6509,33 @@ version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" +[[package]] +name = "reqsign" +version = "0.16.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43451dbf3590a7590684c25fb8d12ecdcc90ed3ac123433e500447c7d77ed701" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.22.1", + "chrono", + "form_urlencoded", + "getrandom 0.2.16", + "hex", + "hmac", + "home", + "http 1.3.1", + "log", + "once_cell", + "percent-encoding", + "rand 0.8.5", + "reqwest", + "serde", + "serde_json", + "sha1", + "sha2", +] + [[package]] name = "reqwest" version = "0.12.20" @@ 
-7825,16 +7922,18 @@ dependencies = [ [[package]] name = "tokio" -version = "1.45.1" +version = "1.46.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" +checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio", "pin-project-lite", "signal-hook-registry", + "slab", "socket2", "tokio-macros", "windows-sys 0.52.0", diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml index 244839582ab..b8282a309bc 100644 --- a/rust/lance-io/Cargo.toml +++ b/rust/lance-io/Cargo.toml @@ -14,6 +14,8 @@ rust-version.workspace = true [dependencies] object_store = { workspace = true } +opendal = { version = "0.53", optional = true, features = ["services-oss"] } +object_store_opendal = { version = "0.52", optional = true } lance-arrow.workspace = true lance-core.workspace = true arrow = { workspace = true, features = ["ffi"] } @@ -66,6 +68,7 @@ gcs-test = [] gcp = ["object_store/gcp"] aws = ["object_store/aws", "aws-config", "aws-credential-types"] azure = ["object_store/azure"] +oss = ["opendal", "object_store_opendal"] [lints] workspace = true diff --git a/rust/lance-io/src/object_store/providers.rs b/rust/lance-io/src/object_store/providers.rs index d07e4719ba2..10185f1e2e7 100644 --- a/rust/lance-io/src/object_store/providers.rs +++ b/rust/lance-io/src/object_store/providers.rs @@ -21,6 +21,8 @@ pub mod azure; pub mod gcp; pub mod local; pub mod memory; +#[cfg(feature = "oss")] +pub mod oss; #[async_trait::async_trait] pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send { @@ -237,6 +239,8 @@ impl Default for ObjectStoreRegistry { providers.insert("az".into(), Arc::new(azure::AzureBlobStoreProvider)); #[cfg(feature = "gcp")] providers.insert("gs".into(), Arc::new(gcp::GcsStoreProvider)); + #[cfg(feature = "oss")] + providers.insert("oss".into(), Arc::new(oss::OssStoreProvider)); Self { 
providers: RwLock::new(providers), active_stores: RwLock::new(HashMap::new()), diff --git a/rust/lance-io/src/object_store/providers/oss.rs b/rust/lance-io/src/object_store/providers/oss.rs new file mode 100644 index 00000000000..8e70302a5db --- /dev/null +++ b/rust/lance-io/src/object_store/providers/oss.rs @@ -0,0 +1,121 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::collections::HashMap; +use std::sync::Arc; + +use object_store_opendal::OpendalStore; +use opendal::{services::Oss, Operator}; +use snafu::location; +use url::Url; + +use crate::object_store::{ + ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE, + DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, +}; +use lance_core::error::{Error, Result}; + +#[derive(Default, Debug)] +pub struct OssStoreProvider; + +#[async_trait::async_trait] +impl ObjectStoreProvider for OssStoreProvider { + async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> { + let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE); + let storage_options = StorageOptions(params.storage_options.clone().unwrap_or_default()); + + let bucket = base_path + .host_str() + .ok_or_else(|| Error::invalid_input("OSS URL must contain bucket name", location!()))? 
+ .to_string(); + + let prefix = base_path.path().trim_start_matches('/').to_string(); + + // Start with environment variables as base configuration + let mut config_map: HashMap<String, String> = std::env::vars() + .filter(|(k, _)| { + k.starts_with("OSS_") || k.starts_with("AWS_") || k.starts_with("ALIBABA_CLOUD_") + }) + .map(|(k, v)| { + // Convert env var names to opendal config keys + let key = k + .to_lowercase() + .replace("oss_", "") + .replace("aws_", "") + .replace("alibaba_cloud_", ""); + (key, v) + }) + .collect(); + + config_map.insert("bucket".to_string(), bucket); + + if !prefix.is_empty() { + config_map.insert("root".to_string(), format!("/{}", prefix)); + } + + // Override with storage options if provided + if let Some(endpoint) = storage_options.0.get("oss_endpoint") { + config_map.insert("endpoint".to_string(), endpoint.clone()); + } + + if let Some(access_key_id) = storage_options.0.get("oss_access_key_id") { + config_map.insert("access_key_id".to_string(), access_key_id.clone()); + } + + if let Some(secret_access_key) = storage_options.0.get("oss_secret_access_key") { + config_map.insert("access_key_secret".to_string(), secret_access_key.clone()); + } + + if let Some(region) = storage_options.0.get("oss_region") { + config_map.insert("region".to_string(), region.clone()); + } + + if !config_map.contains_key("endpoint") { + return Err(Error::invalid_input( + "OSS endpoint is required. Please provide 'oss_endpoint' in storage options or set OSS_ENDPOINT environment variable", + location!(), + )); + } + + let operator = Operator::from_iter::<Oss>(config_map) + .map_err(|e| { + Error::invalid_input(format!("Failed to create OSS operator: {}", e), location!()) + })? 
+ .finish(); + + let opendal_store = Arc::new(OpendalStore::new(operator)); + + let mut url = base_path; + if !url.path().ends_with('/') { + url.set_path(&format!("{}/", url.path())); + } + + Ok(ObjectStore { + scheme: "oss".to_string(), + inner: opendal_store, + block_size, + max_iop_size: *DEFAULT_MAX_IOP_SIZE, + use_constant_size_upload_parts: params.use_constant_size_upload_parts, + list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or(true), + io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, + download_retry_count: storage_options.download_retry_count(), + }) + } +} + +#[cfg(test)] +mod tests { + use super::OssStoreProvider; + use crate::object_store::ObjectStoreProvider; + use url::Url; + + #[test] + fn test_oss_store_path() { + let provider = OssStoreProvider; + + let url = Url::parse("oss://bucket/path/to/file").unwrap(); + let path = provider.extract_path(&url); + let expected_path = object_store::path::Path::from("/path/to/file"); + assert_eq!(path, expected_path); + } +} From f26c46e1110f22cd3d044ff35c3ae6e8c1b0d8c4 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 17 Jul 2025 17:15:04 +0800 Subject: [PATCH 2/3] tune cargo toml Signed-off-by: Xuanwo --- Cargo.lock | 9 ++++----- Cargo.toml | 2 ++ rust/lance-io/Cargo.toml | 6 +++--- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1571d31797f..033472b6ee7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5393,9 +5393,9 @@ dependencies = [ [[package]] name = "object_store_opendal" -version = "0.52.1" +version = "0.54.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28eaaf0a67714fa27acb7771668c1eba78fa9b01494704fde05faa5f3e1d3d89" +checksum = "5ce697ee723fdc3eaf6c457abf4059034be15167022b18b619993802cd1443d5" dependencies = [ "async-trait", "bytes", @@ -5454,12 +5454,11 @@ checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" [[package]] name = "opendal" -version = "0.53.3" +version = "0.54.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "f947c4efbca344c1a125753366033c8107f552b2e3f8251815ed1908f116ca3e" +checksum = "ffb9838d0575c6dbaf3fcec7255af8d5771996d4af900bbb6fa9a314dec00a1a" dependencies = [ "anyhow", - "async-trait", "backon", "base64 0.22.1", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 9bc79e142f3..0ab118ff1b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,6 +129,8 @@ mock_instant = { version = "0.3.1", features = ["sync"] } moka = { version = "0.12", features = ["future", "sync"] } num-traits = "0.2" object_store = { version = "0.12.2" } +opendal = { version = "0.54" } +object_store_opendal = { version = "0.54" } pin-project = "1.0" path_abs = "0.5" pprof = { version = "0.14.0", features = ["flamegraph", "criterion"] } diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml index b8282a309bc..84763269b54 100644 --- a/rust/lance-io/Cargo.toml +++ b/rust/lance-io/Cargo.toml @@ -14,8 +14,8 @@ rust-version.workspace = true [dependencies] object_store = { workspace = true } -opendal = { version = "0.53", optional = true, features = ["services-oss"] } -object_store_opendal = { version = "0.52", optional = true } +opendal = { workspace = true, optional = true } +object_store_opendal = { workspace = true, optional = true } lance-arrow.workspace = true lance-core.workspace = true arrow = { workspace = true, features = ["ffi"] } @@ -68,7 +68,7 @@ gcs-test = [] gcp = ["object_store/gcp"] aws = ["object_store/aws", "aws-config", "aws-credential-types"] azure = ["object_store/azure"] -oss = ["opendal", "object_store_opendal"] +oss = ["opendal/services-oss", "object_store_opendal"] [lints] workspace = true From e6871c82741836c823b3a4e0ecec717fd315191a Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 17 Jul 2025 18:19:38 +0800 Subject: [PATCH 3/3] add new feature Signed-off-by: Xuanwo --- rust/lance/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 
f783f4db471..c61566bda8c 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -124,6 +124,7 @@ protoc = [ aws = ["lance-io/aws"] gcp = ["lance-io/gcp"] azure = ["lance-io/azure"] +oss = ["lance-io/oss"] [[bin]] name = "lq"