From f62bc22c453ea4be5f5946ccb4ffdc83548a1977 Mon Sep 17 00:00:00 2001 From: GXD Date: Wed, 19 Jul 2023 15:27:07 +0800 Subject: [PATCH 1/6] feat(services/etcd): introduce new service backend etcd --- Cargo.lock | 160 +++++++++++++++++- bin/oli/Cargo.toml | 2 + bin/oli/src/config/mod.rs | 5 + core/Cargo.toml | 2 + core/src/services/etcd/backend.rs | 259 +++++++++++++++++++++++++++++ core/src/services/etcd/docs.md | 43 +++++ core/src/services/etcd/mod.rs | 19 +++ core/src/services/mod.rs | 5 + core/src/types/operator/builder.rs | 2 + core/src/types/scheme.rs | 4 + 10 files changed, 500 insertions(+), 1 deletion(-) create mode 100644 core/src/services/etcd/backend.rs create mode 100644 core/src/services/etcd/docs.md create mode 100644 core/src/services/etcd/mod.rs diff --git a/Cargo.lock b/Cargo.lock index a9170c01aeaf..5836a8aab614 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -500,7 +500,7 @@ dependencies = [ "lazy_static", "lazycell", "peeking_take_while", - "prettyplease", + "prettyplease 0.2.9", "proc-macro2", "quote", "regex", @@ -1361,6 +1361,22 @@ dependencies = [ "version_check", ] +[[package]] +name = "etcd-client" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4b0ea5ef6dc2388a4b1669fa32097249bc03a15417b97cb75e38afb309e4a89" +dependencies = [ + "http", + "prost", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tower", + "tower-service", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -1376,6 +1392,12 @@ dependencies = [ "instant", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flagset" version = "0.4.3" @@ -1851,6 +1873,18 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -2572,6 +2606,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "naive-timer" version = "0.2.0" @@ -2885,6 +2925,7 @@ dependencies = [ "dashmap", "dirs 5.0.1", "dotenvy", + "etcd-client", "flagset", "futures", "governor", @@ -3418,6 +3459,16 @@ dependencies = [ "zigzag", ] +[[package]] +name = "petgraph" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4" +dependencies = [ + "fixedbitset", + "indexmap 1.9.3", +] + [[package]] name = "pin-project" version = "1.1.2" @@ -3585,6 +3636,16 @@ dependencies = [ "yansi", ] +[[package]] +name = "prettyplease" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" +dependencies = [ + "proc-macro2", + "syn 1.0.109", +] + [[package]] name = "prettyplease" version = "0.2.9" @@ -3674,6 +3735,28 @@ dependencies = [ "prost-derive", ] +[[package]] +name = "prost-build" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" +dependencies = [ + "bytes", + "heck", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prettyplease 0.1.25", + "prost", + "prost-types", + "regex", + "syn 1.0.109", + "tempfile", + "which", +] + [[package]] name = "prost-derive" version = "0.11.9" @@ -3687,6 +3770,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost-types" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +dependencies = [ + "prost", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -5046,6 +5138,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-io-utility" version = "0.7.6" @@ -5165,6 +5267,47 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +dependencies = [ + "async-trait", + "axum", + "base64 0.21.2", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07" +dependencies = [ + "prettyplease 0.1.25", + "proc-macro2", + "prost-build", + "quote", + "syn 1.0.109", +] + [[package]] name = "tower" version = "0.4.13" @@ -5173,9 +5316,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand 0.8.5", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -5652,6 +5799,17 @@ dependencies = [ "webpki", ] +[[package]] +name = "which" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2441c784c52b289a054b7201fc93253e288f094e2f4be9058343127c4226a269" +dependencies = [ + "either", + "libc", + "once_cell", +] + [[package]] name = "widestring" version = "1.0.2" diff --git a/bin/oli/Cargo.toml b/bin/oli/Cargo.toml index dbfa7cc053f9..c62408ccb2ba 100644 --- a/bin/oli/Cargo.toml +++ b/bin/oli/Cargo.toml @@ -50,6 +50,8 @@ services-redis = ["opendal/services-redis"] services-rocksdb = ["opendal/services-rocksdb"] # Enable services sled support services-sled = ["opendal/services-sled"] +# Enable services etcd support +services-etcd = ["opendal/services-etcd"] [dependencies] anyhow = "1" diff --git a/bin/oli/src/config/mod.rs b/bin/oli/src/config/mod.rs index 2a155e436c67..41188fd30fea 100644 --- a/bin/oli/src/config/mod.rs +++ b/bin/oli/src/config/mod.rs @@ -253,6 +253,11 @@ impl Config { Operator::from_map::(profile.clone())?.finish(), path, )), + #[cfg(feature = "services-etcd")] + Scheme::Etcd => Ok(( + Operator::from_map::(profile.clone())?.finish(), + path, + )), _ => Err(anyhow!( "unknown type '{}' in profile '{}'", scheme, diff --git a/core/Cargo.toml b/core/Cargo.toml index ee6f2995efb0..c6be981565df 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -161,6 +161,7 @@ services-wasabi = [ ] services-webdav = [] services-webhdfs = [] +services-etcd = ["dep:etcd-client"] [lib] bench = false @@ -190,6 +191,7 @@ cacache = { version = "11.6", default-features = false, features = [ chrono = "0.4.26" dashmap = { version = "5.4", optional = true } dirs = { version = "5.0.1", optional = true } +etcd-client = { version = "0.11", optional = true} flagset = "0.4" futures = { version = "0.3", default-features = false, features = ["std"] } governor = { version = "0.5", optional = true, features = ["std"] } diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs new file mode 100644 index 000000000000..f54fe080cbd4 --- /dev/null +++ b/core/src/services/etcd/backend.rs @@ -0,0 +1,259 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; + +use async_trait::async_trait; +use etcd_client::Error as EtcdError; +use etcd_client::{Client, ConnectOptions, GetOptions}; +use tokio::sync::OnceCell; + +use crate::raw::adapters::kv; +use crate::raw::*; +use crate::*; + +const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379"; + +/// [Etcd](https://etcd.io/) services support. +#[doc = include_str!("docs.md")] +#[derive(Clone, Default)] +pub struct EtcdBuilder { + /// network address of the Etcd services. Can be "http://127.0.0.1:23790,http://127.0.0.1:23791,http://127.0.0.1:23792", e.g. + /// + /// default is "http://127.0.0.1:2379" + endpoints: Option, + /// the username to connect etcd service. + /// + /// default is None + username: Option, + /// the password for authentication + /// + /// default is None + password: Option, + /// the working directory of the etcd service. Can be "/path/to/dir" + /// + /// default is "/" + root: Option, + +} + +impl Debug for EtcdBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("Builder"); + ds.field("root", &self.root); + if let Some(endpoints) = self.endpoints.clone() { + ds.field("endpoints", &endpoints); + } + if let Some(username) = self.username.clone() { + ds.field("username", &username); + } + if self.password.is_some() { + ds.field("password", &""); + } + ds.finish() + } +} + +impl EtcdBuilder { + /// set the network address of etcd service. + /// + /// default: "http://127.0.0.1:2379" + pub fn endpoints(&mut self, endpoints: &str) -> &mut Self { + if !endpoints.is_empty() { + self.endpoints = Some(endpoints.to_owned()); + } + self + } + + /// set the username for etcd + /// + /// default: no username + pub fn username(&mut self, username: &str) -> &mut Self { + if !username.is_empty() { + self.username = Some(username.to_owned()); + } + self + } + + /// set the password for etcd + /// + /// default: no password + pub fn password(&mut self, password: &str) -> &mut Self { + if !password.is_empty() { + self.password = Some(password.to_owned()); + } + self + } + + /// set the working directory, all operations will be performed under it. + /// + /// default: "/" + pub fn root(&mut self, root: &str) -> &mut Self { + if !root.is_empty() { + self.root = Some(root.to_owned()); + } + self + } +} + +impl Builder for EtcdBuilder { + const SCHEME: Scheme = Scheme::Etcd; + type Accessor = EtcdBackend; + + fn from_map(map: HashMap) -> Self { + let mut builder = EtcdBuilder::default(); + + map.get("root").map(|v| builder.root(v)); + map.get("endpoints").map(|v| builder.endpoints(v)); + map.get("username").map(|v| builder.username(v)); + map.get("password").map(|v| builder.password(v)); + + builder + } + + fn build(&mut self) -> Result { + let endpoints = self + .endpoints + .clone() + .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string()); + + let endpoints: Vec = endpoints.split(",").map(|s| s.to_string()).collect(); + let mut options = ConnectOptions::new(); + if let Some(username) = self.username.clone() { + options = options.with_user(username.to_owned(), self.password.clone().unwrap_or("".to_string())); + } + + let root = normalize_root( + self.root + .clone() + .unwrap_or_else(|| "/".to_string()) + .as_str(), + ); + + let client = OnceCell::new(); + Ok(EtcdBackend::new(Adapter { + root, + endpoints, + client, + options, + })) + } +} + +/// Backend for etcd services. +pub type EtcdBackend = kv::Backend; + +#[derive(Clone)] +pub struct Adapter { + root: String, + endpoints: Vec, + client: OnceCell, + options: ConnectOptions, +} + +// implement `Debug` manually, or password may be leaked. +impl Debug for Adapter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("Adapter"); + + ds.field("root", &self.root); + ds.field("endpoints", &self.endpoints.join(",")); + ds.field("options", &self.options.clone()); + ds.finish() + } +} + +impl Adapter { + async fn conn(&self) -> Result { + Ok(self + .client + .get_or_try_init(|| async { + Client::connect(self.endpoints.clone(), Some(self.options.clone())).await + }) + .await? + .clone()) + } +} + +#[async_trait] +impl kv::Adapter for Adapter { + fn metadata(&self) -> kv::Metadata { + kv::Metadata::new( + Scheme::Etcd, + &self.endpoints.join(","), + Capability { + read: true, + write: true, + create_dir: true, + list: true, + + ..Default::default() + }, + ) + } + + async fn get(&self, key: &str) -> Result>> { + let p = build_rooted_abs_path(&self.root, key); + let mut client = self.conn().await?; + let resp = client.get(p, None).await?; + if let Some(kv) = resp.kvs().first() { + Ok(Some(kv.value().to_vec())) + } else { + Ok(None) + } + } + + async fn set(&self, key: &str, value: &[u8]) -> Result<()> { + let p = build_rooted_abs_path(&self.root, key); + let mut client = self.conn().await?; + let _ = client.put(p, value, None).await?; + Ok(()) + } + + async fn delete(&self, key: &str) -> Result<()> { + let p = build_rooted_abs_path(&self.root, key); + let mut client = self.conn().await?; + let _ = client.delete(p, None).await?; + Ok(()) + } + + async fn scan(&self, path: &str) -> Result> { + let p = build_rooted_abs_path(&self.root, path); + let mut client = self.conn().await?; + let get_options = Some(GetOptions::new().with_prefix().with_keys_only()); + let resp = client.get(p, get_options).await?; + let mut res = Vec::default(); + for kv in resp.kvs() { + res.push(kv.key_str().map(|e| String::from(e)).map_err(|err| { + Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string") + .set_source(err) + })?); + } + + Ok(res) + } +} + +impl From for Error { + fn from(e: EtcdError) -> Self { + Error::new(ErrorKind::Unexpected, e.to_string().as_str()) + .set_source(e) + .set_temporary() + } +} diff --git a/core/src/services/etcd/docs.md b/core/src/services/etcd/docs.md new file mode 100644 index 000000000000..50b2c8bfa337 --- /dev/null +++ b/core/src/services/etcd/docs.md @@ -0,0 +1,43 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [x] copy +- [x] rename +- [ ] ~~list~~ +- [x] scan +- [ ] ~~presign~~ +- [ ] blocking + +## Configuration + +- `root`: Set the working directory of `OpenDAL` +- `endpoints`: Set the network address of etcd servers +- `username`: Set the username of Redis +- `password`: Set the password for authentication + +You can refer to [`EtcdBuilder`]'s docs for more information + +## Example + +### Via Builder + +```rust +use anyhow::Result; +use opendal::services::Etcd; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + let mut builder = Etcd::default(); + + // this will build a Operator accessing etcd which runs on http://127.0.0.1:2379 + let op: Operator = Operator::new(builder)?.finish(); + Ok(()) +} +``` diff --git a/core/src/services/etcd/mod.rs b/core/src/services/etcd/mod.rs new file mode 100644 index 000000000000..c68fce5f8d81 --- /dev/null +++ b/core/src/services/etcd/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::EtcdBuilder as Etcd; diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index b484fcc301f1..8f2db0a47a46 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -188,3 +188,8 @@ pub use vercel_artifacts::VercelArtifacts; mod redb; #[cfg(feature = "services-redb")] pub use self::redb::Redb; + +#[cfg(feature = "services-etcd")] +mod etcd; +#[cfg(feature = "services-etcd")] +pub use self::etcd::Etcd; diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 3f78d3cedf7f..d5a8c637b70d 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -219,6 +219,8 @@ impl Operator { Scheme::Webhdfs => Self::from_map::(map)?.finish(), #[cfg(feature = "services-redb")] Scheme::Redb => Self::from_map::(map)?.finish(), + #[cfg(feature = "services-etcd")] + Scheme::Etcd => Self::from_map::(map)?.finish(), v => { return Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 475732e7b7c9..1f4b18e5fe48 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -100,6 +100,8 @@ pub enum Scheme { Webhdfs, /// [redb][crate::services::Redb]: Redb Services Redb, + /// [etcd][crate::services::Etcd]: Etcd Services + Etcd, /// Custom that allow users to implement services outside of OpenDAL. /// /// # NOTE @@ -163,6 +165,7 @@ impl FromStr for Scheme { "wasabi" => Ok(Scheme::Wasabi), "webdav" => Ok(Scheme::Webdav), "webhdfs" => Ok(Scheme::Webhdfs), + "etcd" => Ok(Scheme::Etcd), _ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))), } } @@ -205,6 +208,7 @@ impl From for &'static str { Scheme::Webdav => "webdav", Scheme::Webhdfs => "webhdfs", Scheme::Redb => "redb", + Scheme::Etcd => "etcd", Scheme::Custom(v) => v, } } From f308fb22763c42409cd91404612bb3f36c845645 Mon Sep 17 00:00:00 2001 From: GXD Date: Wed, 19 Jul 2023 19:28:41 +0800 Subject: [PATCH 2/6] feat(services/etcd): add tls support --- Cargo.lock | 3 ++ core/Cargo.toml | 2 +- core/src/services/etcd/backend.rs | 83 +++++++++++++++++++++++++++++-- core/src/services/etcd/docs.md | 3 ++ 4 files changed, 86 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5836a8aab614..552f51d639bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5273,6 +5273,7 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ + "async-stream", "async-trait", "axum", "base64 0.21.2", @@ -5287,7 +5288,9 @@ dependencies = [ "percent-encoding", "pin-project", "prost", + "rustls-pemfile", "tokio", + "tokio-rustls", "tokio-stream", "tower", "tower-layer", diff --git a/core/Cargo.toml b/core/Cargo.toml index c6be981565df..fb4370ecec9e 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -191,7 +191,7 @@ cacache = { version = "11.6", default-features = false, features = [ chrono = "0.4.26" dashmap = { version = "5.4", optional = true } dirs = { version = "5.0.1", optional = true } -etcd-client = { version = "0.11", optional = true} +etcd-client = { version = "0.11", optional = true, features = ["tls"]} flagset = "0.4" futures = { version = "0.3", default-features = false, features = ["std"] } governor = { version = "0.5", optional = true, features = ["std"] } diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs index f54fe080cbd4..d217a8842b32 100644 --- a/core/src/services/etcd/backend.rs +++ b/core/src/services/etcd/backend.rs @@ -20,7 +20,10 @@ use std::fmt::Debug; use std::fmt::Formatter; use async_trait::async_trait; +use etcd_client::Certificate; use etcd_client::Error as EtcdError; +use etcd_client::Identity; +use etcd_client::TlsOptions; use etcd_client::{Client, ConnectOptions, GetOptions}; use tokio::sync::OnceCell; @@ -34,8 +37,10 @@ const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379"; #[doc = include_str!("docs.md")] #[derive(Clone, Default)] pub struct EtcdBuilder { - /// network address of the Etcd services. Can be "http://127.0.0.1:23790,http://127.0.0.1:23791,http://127.0.0.1:23792", e.g. - /// + /// network address of the Etcd services. + /// If use https, must set TLS options: `ca_path`, `cert_path`, `key_path`. + /// e.g. "127.0.0.1:23790,127.0.0.1:23791,127.0.0.1:23792" or "http://127.0.0.1:23790,http://127.0.0.1:23791,http://127.0.0.1:23792" or "https://127.0.0.1:23790,https://127.0.0.1:23791,https://127.0.0.1:23792" + /// /// default is "http://127.0.0.1:2379" endpoints: Option, /// the username to connect etcd service. @@ -50,7 +55,18 @@ pub struct EtcdBuilder { /// /// default is "/" root: Option, - + /// certificate authority file path + /// + /// default is None + ca_path: Option, + /// cert path + /// + /// default is None + cert_path: Option, + /// key path + /// + /// default is None + key_path: Option, } impl Debug for EtcdBuilder { @@ -110,6 +126,36 @@ impl EtcdBuilder { } self } + + /// Set the certificate authority file path. + /// + /// default is None + pub fn ca_path(&mut self, ca_path: &str) -> &mut Self { + if !ca_path.is_empty() { + self.ca_path = Some(ca_path.to_string()) + } + self + } + + /// Set the certificate file path. + /// + /// default is None + pub fn cert_path(&mut self, cert_path: &str) -> &mut Self { + if !cert_path.is_empty() { + self.cert_path = Some(cert_path.to_string()) + } + self + } + + /// Set the key file path. + /// + /// default is None + pub fn key_path(&mut self, key_path: &str) -> &mut Self { + if !key_path.is_empty() { + self.key_path = Some(key_path.to_string()) + } + self + } } impl Builder for EtcdBuilder { @@ -123,6 +169,9 @@ impl Builder for EtcdBuilder { map.get("endpoints").map(|v| builder.endpoints(v)); map.get("username").map(|v| builder.username(v)); map.get("password").map(|v| builder.password(v)); + map.get("ca_path").map(|v| builder.ca_path(v)); + map.get("cert_path").map(|v| builder.cert_path(v)); + map.get("key_path").map(|v| builder.key_path(v)); builder } @@ -134,9 +183,25 @@ impl Builder for EtcdBuilder { .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string()); let endpoints: Vec = endpoints.split(",").map(|s| s.to_string()).collect(); + let mut options = ConnectOptions::new(); + + if self.ca_path.is_some() && self.cert_path.is_some() && self.key_path.is_some() { + let ca = self.load_pem( self.ca_path.clone().unwrap().as_str())?; + let key = self.load_pem( self.key_path.clone().unwrap().as_str())?; + let cert = self.load_pem(self.cert_path.clone().unwrap().as_str())?; + + let tls_options = TlsOptions::default() + .ca_certificate(Certificate::from_pem(ca)) + .identity(Identity::from_pem(cert, key)); + options = options.with_tls(tls_options); + } + if let Some(username) = self.username.clone() { - options = options.with_user(username.to_owned(), self.password.clone().unwrap_or("".to_string())); + options = options.with_user( + username.clone(), + self.password.clone().unwrap_or("".to_string()), + ); } let root = normalize_root( @@ -156,6 +221,16 @@ impl Builder for EtcdBuilder { } } +impl EtcdBuilder { + fn load_pem(&self, path: &str) -> Result { + let content = std::fs::read_to_string(path).map_err(|err| { + Error::new(ErrorKind::Unexpected, "invalid file path") + .set_source(err) + }); + content + } +} + /// Backend for etcd services. pub type EtcdBackend = kv::Backend; diff --git a/core/src/services/etcd/docs.md b/core/src/services/etcd/docs.md index 50b2c8bfa337..affbad71dc87 100644 --- a/core/src/services/etcd/docs.md +++ b/core/src/services/etcd/docs.md @@ -20,6 +20,9 @@ This service can be used to: - `endpoints`: Set the network address of etcd servers - `username`: Set the username of Redis - `password`: Set the password for authentication +- `ca_path`: Set the ca path to the etcd connection +- `cert_path`: Set the cert path to the etcd connection +- `key_path`: Set the key path to the etcd connection You can refer to [`EtcdBuilder`]'s docs for more information From 6597cff6c1842d4ff15250e16eb8cf4583b025d2 Mon Sep 17 00:00:00 2001 From: GXD Date: Wed, 19 Jul 2023 19:57:51 +0800 Subject: [PATCH 3/6] style: code format --- core/src/services/etcd/backend.rs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs index d217a8842b32..9f41847cb296 100644 --- a/core/src/services/etcd/backend.rs +++ b/core/src/services/etcd/backend.rs @@ -40,7 +40,7 @@ pub struct EtcdBuilder { /// network address of the Etcd services. /// If use https, must set TLS options: `ca_path`, `cert_path`, `key_path`. /// e.g. "127.0.0.1:23790,127.0.0.1:23791,127.0.0.1:23792" or "http://127.0.0.1:23790,http://127.0.0.1:23791,http://127.0.0.1:23792" or "https://127.0.0.1:23790,https://127.0.0.1:23791,https://127.0.0.1:23792" - /// + /// /// default is "http://127.0.0.1:2379" endpoints: Option, /// the username to connect etcd service. @@ -60,11 +60,11 @@ pub struct EtcdBuilder { /// default is None ca_path: Option, /// cert path - /// + /// /// default is None cert_path: Option, /// key path - /// + /// /// default is None key_path: Option, } @@ -128,7 +128,7 @@ impl EtcdBuilder { } /// Set the certificate authority file path. - /// + /// /// default is None pub fn ca_path(&mut self, ca_path: &str) -> &mut Self { if !ca_path.is_empty() { @@ -138,7 +138,7 @@ impl EtcdBuilder { } /// Set the certificate file path. - /// + /// /// default is None pub fn cert_path(&mut self, cert_path: &str) -> &mut Self { if !cert_path.is_empty() { @@ -148,7 +148,7 @@ impl EtcdBuilder { } /// Set the key file path. - /// + /// /// default is None pub fn key_path(&mut self, key_path: &str) -> &mut Self { if !key_path.is_empty() { @@ -187,8 +187,8 @@ impl Builder for EtcdBuilder { let mut options = ConnectOptions::new(); if self.ca_path.is_some() && self.cert_path.is_some() && self.key_path.is_some() { - let ca = self.load_pem( self.ca_path.clone().unwrap().as_str())?; - let key = self.load_pem( self.key_path.clone().unwrap().as_str())?; + let ca = self.load_pem(self.ca_path.clone().unwrap().as_str())?; + let key = self.load_pem(self.key_path.clone().unwrap().as_str())?; let cert = self.load_pem(self.cert_path.clone().unwrap().as_str())?; let tls_options = TlsOptions::default() @@ -196,7 +196,7 @@ impl Builder for EtcdBuilder { .identity(Identity::from_pem(cert, key)); options = options.with_tls(tls_options); } - + if let Some(username) = self.username.clone() { options = options.with_user( username.clone(), @@ -223,10 +223,8 @@ impl Builder for EtcdBuilder { impl EtcdBuilder { fn load_pem(&self, path: &str) -> Result { - let content = std::fs::read_to_string(path).map_err(|err| { - Error::new(ErrorKind::Unexpected, "invalid file path") - .set_source(err) - }); + let content = std::fs::read_to_string(path) + .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err)); content } } From 3fe04a2888385f6e2f6fc65252d567da3c153d0e Mon Sep 17 00:00:00 2001 From: GXD Date: Thu, 20 Jul 2023 11:39:21 +0800 Subject: [PATCH 4/6] refactor: alphabet order --- bin/oli/Cargo.toml | 4 ++-- bin/oli/src/config/mod.rs | 10 +++++----- core/Cargo.toml | 2 +- core/src/services/mod.rs | 10 +++++----- core/src/types/operator/builder.rs | 4 ++-- core/src/types/scheme.rs | 8 ++++---- 6 files changed, 19 insertions(+), 19 deletions(-) diff --git a/bin/oli/Cargo.toml b/bin/oli/Cargo.toml index c62408ccb2ba..407f80f9e1e3 100644 --- a/bin/oli/Cargo.toml +++ b/bin/oli/Cargo.toml @@ -32,6 +32,8 @@ version.workspace = true [features] # Enable services dashmap support services-dashmap = ["opendal/services-dashmap"] +# Enable services etcd support +services-etcd = ["opendal/services-etcd"] # Enable services ftp support services-ftp = ["opendal/services-ftp"] # Enable services hdfs support @@ -50,8 +52,6 @@ services-redis = ["opendal/services-redis"] services-rocksdb = ["opendal/services-rocksdb"] # Enable services sled support services-sled = ["opendal/services-sled"] -# Enable services etcd support -services-etcd = ["opendal/services-etcd"] [dependencies] anyhow = "1" diff --git a/bin/oli/src/config/mod.rs b/bin/oli/src/config/mod.rs index 41188fd30fea..18f7b818bf0e 100644 --- a/bin/oli/src/config/mod.rs +++ b/bin/oli/src/config/mod.rs @@ -170,6 +170,11 @@ impl Config { Operator::from_map::(profile.clone())?.finish(), path, )), + #[cfg(feature = "services-etcd")] + Scheme::Etcd => Ok(( + Operator::from_map::(profile.clone())?.finish(), + path, + )), Scheme::Gcs => Ok(( Operator::from_map::(profile.clone())?.finish(), path, @@ -253,11 +258,6 @@ impl Config { Operator::from_map::(profile.clone())?.finish(), path, )), - #[cfg(feature = "services-etcd")] - Scheme::Etcd => Ok(( - Operator::from_map::(profile.clone())?.finish(), - path, - )), _ => Err(anyhow!( "unknown type '{}' in profile '{}'", scheme, diff --git a/core/Cargo.toml b/core/Cargo.toml index fb4370ecec9e..a2d4231d94c1 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -113,6 +113,7 @@ services-cos = [ ] services-dashmap = ["dep:dashmap"] services-dropbox = [] +services-etcd = ["dep:etcd-client"] services-fs = ["tokio/fs"] services-ftp = ["dep:suppaftp", "dep:lazy-regex", "dep:bb8", "dep:async-tls"] services-gcs = [ @@ -161,7 +162,6 @@ services-wasabi = [ ] services-webdav = [] services-webhdfs = [] -services-etcd = ["dep:etcd-client"] [lib] bench = false diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 8f2db0a47a46..faa541629dc0 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -39,6 +39,11 @@ mod dashmap; #[cfg(feature = "services-dashmap")] pub use self::dashmap::Dashmap; +#[cfg(feature = "services-etcd")] +mod etcd; +#[cfg(feature = "services-etcd")] +pub use self::etcd::Etcd; + #[cfg(feature = "services-fs")] mod fs; #[cfg(feature = "services-fs")] @@ -188,8 +193,3 @@ pub use vercel_artifacts::VercelArtifacts; mod redb; #[cfg(feature = "services-redb")] pub use self::redb::Redb; - -#[cfg(feature = "services-etcd")] -mod etcd; -#[cfg(feature = "services-etcd")] -pub use self::etcd::Etcd; diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index d5a8c637b70d..1f5e88d3f145 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -163,6 +163,8 @@ impl Operator { Scheme::Cos => Self::from_map::(map)?.finish(), #[cfg(feature = "services-dashmap")] Scheme::Dashmap => Self::from_map::(map)?.finish(), + #[cfg(feature = "services-etcd")] + Scheme::Etcd => Self::from_map::(map)?.finish(), #[cfg(feature = "services-fs")] Scheme::Fs => Self::from_map::(map)?.finish(), #[cfg(feature = "services-ftp")] @@ -219,8 +221,6 @@ impl Operator { Scheme::Webhdfs => Self::from_map::(map)?.finish(), #[cfg(feature = "services-redb")] Scheme::Redb => Self::from_map::(map)?.finish(), - #[cfg(feature = "services-etcd")] - Scheme::Etcd => Self::from_map::(map)?.finish(), v => { return Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 1f4b18e5fe48..3b29260c9005 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -41,6 +41,8 @@ pub enum Scheme { Cos, /// [dashmap][crate::services::Dashmap]: dashmap backend support. Dashmap, + /// [etcd][crate::services::Etcd]: Etcd Services + Etcd, /// [fs][crate::services::Fs]: POSIX alike file system. Fs, /// [ftp][crate::services::Ftp]: FTP backend. @@ -100,8 +102,6 @@ pub enum Scheme { Webhdfs, /// [redb][crate::services::Redb]: Redb Services Redb, - /// [etcd][crate::services::Etcd]: Etcd Services - Etcd, /// Custom that allow users to implement services outside of OpenDAL. /// /// # NOTE @@ -141,6 +141,7 @@ impl FromStr for Scheme { "cacache" => Ok(Scheme::Cacache), "cos" => Ok(Scheme::Cos), "dashmap" => Ok(Scheme::Dashmap), + "etcd" => Ok(Scheme::Etcd), "fs" => Ok(Scheme::Fs), "gcs" => Ok(Scheme::Gcs), "ghac" => Ok(Scheme::Ghac), @@ -165,7 +166,6 @@ impl FromStr for Scheme { "wasabi" => Ok(Scheme::Wasabi), "webdav" => Ok(Scheme::Webdav), "webhdfs" => Ok(Scheme::Webhdfs), - "etcd" => Ok(Scheme::Etcd), _ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))), } } @@ -179,6 +179,7 @@ impl From for &'static str { Scheme::Cacache => "cacache", Scheme::Cos => "cos", Scheme::Dashmap => "dashmap", + Scheme::Etcd => "etcd", Scheme::Fs => "fs", Scheme::Gcs => "gcs", Scheme::Ghac => "ghac", @@ -208,7 +209,6 @@ impl From for &'static str { Scheme::Webdav => "webdav", Scheme::Webhdfs => "webhdfs", Scheme::Redb => "redb", - Scheme::Etcd => "etcd", Scheme::Custom(v) => v, } } From e30fcc9e199f8e5523cd9dbc10e028d403ee5c43 Mon Sep 17 00:00:00 2001 From: GXD Date: Thu, 20 Jul 2023 14:32:13 +0800 Subject: [PATCH 5/6] trigger build From 3cba806a889d443a9e8dbf7939e9243b4303864c Mon Sep 17 00:00:00 2001 From: GXD Date: Thu, 20 Jul 2023 15:41:58 +0800 Subject: [PATCH 6/6] refactor: code optimization --- core/src/services/etcd/backend.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs index 9f41847cb296..035e9a58bd21 100644 --- a/core/src/services/etcd/backend.rs +++ b/core/src/services/etcd/backend.rs @@ -182,7 +182,7 @@ impl Builder for EtcdBuilder { .clone() .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string()); - let endpoints: Vec = endpoints.split(",").map(|s| s.to_string()).collect(); + let endpoints: Vec = endpoints.split(',').map(|s| s.to_string()).collect(); let mut options = ConnectOptions::new(); @@ -198,10 +198,7 @@ impl Builder for EtcdBuilder { } if let Some(username) = self.username.clone() { - options = options.with_user( - username.clone(), - self.password.clone().unwrap_or("".to_string()), - ); + options = options.with_user(username, self.password.clone().unwrap_or("".to_string())); } let root = normalize_root( @@ -223,9 +220,8 @@ impl Builder for EtcdBuilder { impl EtcdBuilder { fn load_pem(&self, path: &str) -> Result { - let content = std::fs::read_to_string(path) - .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err)); - content + std::fs::read_to_string(path) + .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err)) } } @@ -313,7 +309,7 @@ impl kv::Adapter for Adapter { let resp = client.get(p, get_options).await?; let mut res = Vec::default(); for kv in resp.kvs() { - res.push(kv.key_str().map(|e| String::from(e)).map_err(|err| { + res.push(kv.key_str().map(String::from).map_err(|err| { Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string") .set_source(err) })?);