diff --git a/.env.example b/.env.example index 2bc8a6b74353..2b83073a7cdf 100644 --- a/.env.example +++ b/.env.example @@ -61,6 +61,12 @@ OPENDAL_REDIS_DB=0 OPENDAL_ROCKSDB_TEST=false OPENDAL_ROCKSDB_DATADIR=/path/to/database OPENDAL_ROCKSDB_ROOT=/path/to/root +# sftp +OPENDAL_SFTP_TEST=false +OPENDAL_SFTP_ENDPOINT=ssh:// +OPENDAL_SFTP_ROOT=/path/to/dir +OPENDAL_SFTP_USER= +OPENDAL_SFTP_KEY= # sled OPENDAL_SLED_TEST=false OPENDAL_SLED_DATADIR=/path/to/database diff --git a/Cargo.lock b/Cargo.lock index 15b2aa908dd9..ae8f0cc82e76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -104,6 +104,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" +[[package]] +name = "array-init" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc" + [[package]] name = "assert-json-diff" version = "2.0.2" @@ -312,6 +318,22 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "awaitable" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70af449c9a763cb655c6a1e5338b42d99c67190824ff90658c1e30be844c0775" +dependencies = [ + "awaitable-error", + "cfg-if", +] + +[[package]] +name = "awaitable-error" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5b3469636cdf8543cceab175efca534471f36eee12fb8374aba00eb5e7e7f8a" + [[package]] name = "backon" version = "0.4.0" @@ -716,12 +738,31 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "concurrent_arena" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24bfeb060a299f86521bb3940344800fc861cc506356e44a273a42cb552afde5" +dependencies = [ + "arc-swap", + 
"array-init", + "const_fn_assert", + "parking_lot 0.12.1", + "triomphe", +] + [[package]] name = "const-oid" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "520fbf3c07483f94e3e3ca9d0cfd913d7718ef2483d2cfd91c0d9e91474ab913" +[[package]] +name = "const_fn_assert" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27d614f23f34f7b5165a77dc1591f497e2518f9cec4b4f4b92bfc4dc6cf7a190" + [[package]] name = "convert_case" version = "0.6.0" @@ -994,6 +1035,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "derive_destructure2" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35cb7e5875e1028a73e551747d6d0118f25c3d6dbba2dadf97cc0f4d0c53f2f5" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "diff" version = "0.1.13" @@ -1018,13 +1070,33 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys 0.3.7", +] + [[package]] name = "dirs" version = "5.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dece029acd3353e3a58ac2e3eb3c8d6c35827a892edc6cc4138ef9c33df46ecd" dependencies = [ - "dirs-sys", + "dirs-sys 0.4.0", +] + +[[package]] +name = "dirs-sys" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users", + "winapi", ] [[package]] @@ -2301,6 +2373,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-derive" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d" +dependencies = [ + "proc-macro2", + "quote", + 
"syn 1.0.109", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -2348,7 +2431,7 @@ version = "0.33.2" dependencies = [ "anyhow", "clap 4.2.5", - "dirs", + "dirs 5.0.0", "env_logger", "futures", "log", @@ -2398,7 +2481,7 @@ dependencies = [ "anyhow", "assert_cmd", "clap 4.2.5", - "dirs", + "dirs 5.0.0", "env_logger", "futures", "log", @@ -2453,8 +2536,11 @@ dependencies = [ "minitrace", "moka", "once_cell", + "openssh", + "openssh-sftp-client", "opentelemetry 0.19.0", "opentelemetry-jaeger", + "owning_ref", "parking_lot 0.12.1", "paste", "percent-encoding", @@ -2535,6 +2621,95 @@ dependencies = [ "rb-sys-env", ] +[[package]] +name = "openssh" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ca6c277973fb549b36dd8980941b5ea3ecebea026f5b1f0060acde74d893c22" +dependencies = [ + "dirs 4.0.0", + "libc", + "once_cell", + "shell-escape", + "tempfile", + "thiserror", + "tokio", + "tokio-pipe", +] + +[[package]] +name = "openssh-sftp-client" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4fa8e5f26e549bd266d9bcd9e5b4fd344729985ef1a7f5ac3e51f3f96a4a620" +dependencies = [ + "bytes", + "derive_destructure2", + "once_cell", + "openssh-sftp-client-lowlevel", + "openssh-sftp-error", + "scopeguard", + "tokio", + "tokio-io-utility", + "tokio-util", +] + +[[package]] +name = "openssh-sftp-client-lowlevel" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "406bf41d8372365497d5645e802a8dfe22008b8183edbe6c79e4b75614431daa" +dependencies = [ + "awaitable", + "bytes", + "concurrent_arena", + "derive_destructure2", + "openssh-sftp-error", + "openssh-sftp-protocol", + "pin-project", + "tokio", + "tokio-io-utility", +] + +[[package]] +name = "openssh-sftp-error" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d836b428ead150165d1178ed0aa672791c13b3ae9616ea1e34d13730a2cb486" 
+dependencies = [ + "awaitable-error", + "openssh-sftp-protocol-error", + "ssh_format_error", + "thiserror", + "tokio", +] + +[[package]] +name = "openssh-sftp-protocol" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf38532d784978966f95d241226223823f351d5bb2a4bebcf6b20b9cb1e393e0" +dependencies = [ + "bitflags 2.0.2", + "num-derive", + "num-traits", + "openssh-sftp-protocol-error", + "serde", + "ssh_format", + "vec-strings", +] + +[[package]] +name = "openssh-sftp-protocol-error" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0719269eb3f037866ae07ec89cb44ed2c1d63b72b2390cef8e1aa3016a956ff8" +dependencies = [ + "serde", + "thiserror", + "vec-strings", +] + [[package]] name = "openssl" version = "0.10.47" @@ -2717,6 +2892,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owning_ref" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ff55baddef9e4ad00f88b6c743a2a8062d4c6ade126c2a528644b8e444d52ce" +dependencies = [ + "stable_deref_trait", +] + [[package]] name = "parking" version = "2.0.0" @@ -3425,7 +3609,7 @@ dependencies = [ "base64 0.21.0", "bytes", "chrono", - "dirs", + "dirs 5.0.0", "form_urlencoded", "hex", "hmac", @@ -3841,6 +4025,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shell-escape" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f" + [[package]] name = "shell-words" version = "1.1.0" @@ -3993,6 +4183,32 @@ dependencies = [ "der", ] +[[package]] +name = "ssh_format" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24ab31081d1c9097c327ec23550858cb5ffb4af6b866c1ef4d728455f01f3304" +dependencies 
= [ + "bytes", + "serde", + "ssh_format_error", +] + +[[package]] +name = "ssh_format_error" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be3c6519de7ca611f71ef7e8a56eb57aa1c818fecb5242d0a0f39c83776c210c" +dependencies = [ + "serde", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "strsim" version = "0.10.0" @@ -4088,6 +4304,12 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" +[[package]] +name = "thin-vec" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aac81b6fd6beb5884b0cf3321b8117e6e5d47ecb6fc89f414cfdcca8b2fe2dd8" + [[package]] name = "thiserror" version = "1.0.40" @@ -4222,6 +4444,16 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "tokio-io-utility" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d672654d175710e52c7c41f6aec77c62b3c0954e2a7ebce9049d1e94ed7c263" +dependencies = [ + "bytes", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.0.0" @@ -4243,6 +4475,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-pipe" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784" +dependencies = [ + "libc", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.23.4" @@ -4256,9 +4498,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.7" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" +checksum = 
"806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ "bytes", "futures-core", @@ -4398,6 +4640,11 @@ name = "triomphe" version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" +dependencies = [ + "arc-swap", + "serde", + "stable_deref_trait", +] [[package]] name = "trust-dns-proto" @@ -4566,6 +4813,16 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "vec-strings" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8509489e2a7ee219522238ad45fd370bec6808811ac15ac6b07453804e77659" +dependencies = [ + "serde", + "thin-vec", +] + [[package]] name = "version_check" version = "0.9.4" diff --git a/core/Cargo.toml b/core/Cargo.toml index b88de207a9cb..5dc6170c4690 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -135,6 +135,12 @@ services-s3 = [ "reqsign?/services-aws", "reqsign?/reqwest_request", ] +services-sftp = [ + "dep:openssh", + "dep:openssh-sftp-client", + "dep:bb8", + "dep:owning_ref", +] services-sled = ["dep:sled"] services-supabase = [] services-wasabi = [ @@ -176,7 +182,10 @@ metrics = { version = "0.20", optional = true } minitrace = { version = "0.4.0", optional = true } moka = { version = "0.10", optional = true, features = ["future"] } once_cell = "1" +openssh = { version = "0.9.9", optional = true } +openssh-sftp-client = { version = "0.12.2", optional = true } opentelemetry = { version = "0.19.0", optional = true } +owning_ref = { version = "0.4.1", optional = true } parking_lot = "0.12" percent-encoding = "2" pin-project = "1" diff --git a/core/README.md b/core/README.md index 8bbce48e84ea..ca758fa8e806 100644 --- a/core/README.md +++ b/core/README.md @@ -36,6 +36,7 @@ - 
[redis](https://docs.rs/opendal/latest/opendal/services/struct.Redis.html): [Redis](https://redis.io/) services support. - [rocksdb](https://docs.rs/opendal/latest/opendal/services/struct.Rocksdb.html): [RocksDB](http://rocksdb.org/) services support. - [s3](https://docs.rs/opendal/latest/opendal/services/struct.S3.html): [AWS S3](https://aws.amazon.com/s3/) alike services. +- [sftp](https://docs.rs/opendal/latest/opendal/services/struct.Sftp.html): [SFTP](https://datatracker.ietf.org/doc/html/draft-ietf-secsh-filexfer-02) services support. - [sled](https://docs.rs/opendal/latest/opendal/services/sled/struct.Sled.html): [sled](https://crates.io/crates/sled) services support. - [webdav](https://docs.rs/opendal/latest/opendal/services/struct.Webdav.html): [WebDAV](https://datatracker.ietf.org/doc/html/rfc4918) Service Support. - [webhdfs](https://docs.rs/opendal/latest/opendal/services/struct.Webhdfs.html): [WebHDFS](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html) Service Support. 
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs index 323389806609..33ad34a6046c 100644 --- a/core/src/services/ftp/backend.rs +++ b/core/src/services/ftp/backend.rs @@ -60,10 +60,10 @@ use crate::*; /// /// # Configuration /// -/// - `endpoint`: set the endpoint for connection +/// - `endpoint`: Set the endpoint for connection /// - `root`: Set the work directory for backend -/// - `credential`: login credentials -/// - `tls`: tls mode +/// - `user`: Set the login user +/// - `password`: Set the login password /// /// You can refer to [`FtpBuilder`]'s docs for more information /// diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 026219868a96..d08135b06306 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -114,6 +114,11 @@ mod s3; #[cfg(feature = "services-s3")] pub use s3::S3; +#[cfg(feature = "services-sftp")] +mod sftp; +#[cfg(feature = "services-sftp")] +pub use sftp::Sftp; + #[cfg(feature = "services-sled")] mod sled; #[cfg(feature = "services-sled")] diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs new file mode 100644 index 000000000000..4ae80d7f5b9b --- /dev/null +++ b/core/src/services/sftp/backend.rs @@ -0,0 +1,507 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cmp::min; +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; + +use async_trait::async_trait; +use bb8::PooledConnection; +use futures::executor::block_on; +use log::debug; +use openssh::RemoteChild; +use openssh::Session; +use openssh::SessionBuilder; +use openssh::Stdio; +use openssh_sftp_client::Sftp; +use owning_ref::OwningHandle; +use tokio::sync::OnceCell; + +use super::error::is_not_found; +use super::error::is_sftp_protocol_error; +use super::error::SftpError; +use super::pager::SftpPager; +use super::utils::SftpReader; +use super::writer::SftpWriter; +use crate::ops::*; +use crate::raw::*; +use crate::*; + +/// SFTP services support. (only works on unix) +/// +/// # Capabilities +/// +/// This service can be used to: +/// +/// - [x] read +/// - [x] write +/// - [x] list +/// - [ ] ~~scan~~ +/// - [ ] ~~presign~~ +/// - [ ] blocking +/// +/// # Configuration +/// +/// - `endpoint`: Set the endpoint for connection +/// - `root`: Set the work directory for backend, defaults to `/home/<user>/` for the configured `user` +/// - `user`: Set the login user +/// - `key`: Set the path of the private key file used for public key login +/// +/// Password login is not supported; use public key authentication instead. 
+/// +/// You can refer to [`SftpBuilder`]'s docs for more information +/// +/// # Example +/// +/// ## Via Builder +/// +/// ```no_run +/// use anyhow::Result; +/// use opendal::services::Sftp; +/// use opendal::Object; +/// use opendal::Operator; +/// +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// // create backend builder +/// let mut builder = Sftp::default(); +/// +/// builder.endpoint("127.0.0.1").user("test").key("/path/to/private_key"); +/// +/// let op: Operator = Operator::new(builder)?.finish(); +/// let _obj: Object = op.object("test_file"); +/// Ok(()) +/// } +/// ``` + +#[derive(Default)] +pub struct SftpBuilder { + endpoint: Option, + root: Option, + user: Option, + key: Option, +} + +impl Debug for SftpBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Builder") + .field("endpoint", &self.endpoint) + .field("root", &self.root) + .finish() + } +} + +impl SftpBuilder { + /// set endpoint for sftp backend. + pub fn endpoint(&mut self, endpoint: &str) -> &mut Self { + self.endpoint = if endpoint.is_empty() { + None + } else { + Some(endpoint.to_string()) + }; + + self + } + + /// set root path for sftp backend. + pub fn root(&mut self, root: &str) -> &mut Self { + self.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + + self + } + + /// set user for sftp backend. + pub fn user(&mut self, user: &str) -> &mut Self { + self.user = if user.is_empty() { + None + } else { + Some(user.to_string()) + }; + + self + } + + /// set key path for sftp backend. 
+ pub fn key(&mut self, key: &str) -> &mut Self { + self.key = if key.is_empty() { + None + } else { + Some(key.to_string()) + }; + + self + } +} + +impl Builder for SftpBuilder { + const SCHEME: Scheme = Scheme::Sftp; + type Accessor = SftpBackend; + + fn build(&mut self) -> Result { + debug!("sftp backend build started: {:?}", &self); + let endpoint = match self.endpoint.clone() { + Some(v) => v, + None => return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")), + }; + + let user = match self.user.clone() { + Some(v) => v, + None => return Err(Error::new(ErrorKind::ConfigInvalid, "user is empty")), + }; + + let root = self + .root + .clone() + .map(|r| normalize_root(r.as_str())) + .unwrap_or(format!("/home/{}/", user)); + + debug!("sftp backend finished: {:?}", &self); + + Ok(SftpBackend { + endpoint, + root, + user, + key: self.key.clone(), + sftp: OnceCell::new(), + }) + } + + fn from_map(map: HashMap) -> Self { + let mut builder = SftpBuilder::default(); + + map.get("root").map(|v| builder.root(v)); + map.get("endpoint").map(|v| builder.endpoint(v)); + map.get("user").map(|v| builder.user(v)); + map.get("key").map(|v| builder.key(v)); + + builder + } +} + +#[derive(Clone)] +pub struct Manager { + endpoint: String, + user: String, + key: Option, +} + +pub struct Connection { + // the remote child owns the ref to session, so we need to use owning handle + // The session will only create one child, so we can make sure the child can live + // as long as the session. 
(the session will be dropped when the connection is dropped) + // Related: https://stackoverflow.com/a/47260399 + child: OwningHandle, Box>>, + pub sftp: Sftp, +} + +#[async_trait] +impl bb8::ManageConnection for Manager { + type Connection = Connection; + type Error = SftpError; + + async fn connect(&self) -> std::result::Result { + let mut session = SessionBuilder::default(); + + session.user(self.user.clone()); + + if let Some(key) = &self.key { + session.keyfile(key); + } + + let session = session.connect(self.endpoint.clone()).await?; + + let sess = Box::new(session); + let mut oref = OwningHandle::new_with_fn(sess, unsafe { + |x| { + Box::new( + block_on( + (*x).subsystem("sftp") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn(), + ) + .unwrap(), + ) + } + }); + + let sftp = Sftp::new( + oref.stdin().take().unwrap(), + oref.stdout().take().unwrap(), + Default::default(), + ) + .await?; + + Ok(Connection { child: oref, sftp }) + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> { + conn.child.session().check().await?; + Ok(()) + } + + /// Always allow reuse conn. + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} + +/// Backend is used to serve `Accessor` support for sftp. 
+pub struct SftpBackend { + endpoint: String, + root: String, + user: String, + key: Option, + sftp: OnceCell>, +} + +impl Debug for SftpBackend { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backend").finish() + } +} + +#[async_trait] +impl Accessor for SftpBackend { + type Reader = SftpReader; + type BlockingReader = (); + type Writer = SftpWriter; + type BlockingWriter = (); + type Pager = SftpPager; + type BlockingPager = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_root(self.root.as_str()) + .set_scheme(Scheme::Sftp) + .set_capability(Capability { + stat: true, + read: true, + write: true, + list: true, + list_with_limit: true, + + ..Default::default() + }); + + am + } + + async fn create_dir(&self, path: &str, _: OpCreate) -> Result { + let client = self.sftp_connect().await?; + let mut fs = client.sftp.fs(); + fs.set_cwd(self.root.clone()); + + let paths: Vec<&str> = path.split_inclusive('/').collect(); + let mut current = self.root.clone(); + for p in paths { + if p.is_empty() { + continue; + } + + current.push_str(p); + let res = fs.create_dir(p).await; + + if let Err(e) = res { + // ignore error if dir already exists + if !is_sftp_protocol_error(&e) { + return Err(e.into()); + } + } + fs.set_cwd(current.clone()); + } + + return Ok(RpCreate::default()); + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let client = self.sftp_connect().await?; + + let mut fs = client.sftp.fs(); + fs.set_cwd(self.root.clone()); + let path = fs.canonicalize(path).await?; + + let mut file = client.sftp.open(path.as_path()).await?; + + let total_length = file.metadata().await?.len().ok_or(Error::new( + ErrorKind::NotFound, + format!("file not found: {}", path.to_str().unwrap()).as_str(), + ))?; + + let br = args.range(); + let (start, end) = match (br.offset(), br.size()) { + // Read a specific range. 
+ (Some(offset), Some(size)) => (offset, min(offset + size, total_length)), + // Read from offset. + (Some(offset), None) => (offset, total_length), + // Read the last size bytes. + (None, Some(size)) => ( + if total_length > size { + total_length - size + } else { + 0 + }, + total_length, + ), + // Read the whole file. + (None, None) => (0, total_length), + }; + + let r = SftpReader::new(self.sftp_connect_owned().await?, path, start, end).await?; + + Ok((RpRead::new(end - start), r)) + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + if args.content_length().is_none() { + return Err(Error::new( + ErrorKind::Unsupported, + "write without content length is not supported", + )); + } + + if let Some((dir, _)) = path.rsplit_once('/') { + self.create_dir(dir, OpCreate::default()).await?; + } + + let path = format!("{}{}", self.root, path); + + Ok(( + RpWrite::new(), + SftpWriter::new(self.sftp_connect_owned().await?, path), + )) + } + + async fn stat(&self, path: &str, _: OpStat) -> Result { + let client = self.sftp_connect().await?; + let mut fs = client.sftp.fs(); + fs.set_cwd(self.root.clone()); + + let meta = fs.metadata(path).await?; + + Ok(RpStat::new(meta.into())) + } + + async fn delete(&self, path: &str, _: OpDelete) -> Result { + let client = self.sftp_connect().await?; + + let mut fs = client.sftp.fs(); + fs.set_cwd(self.root.clone()); + + if path.ends_with('/') { + let file_path = format!("./{}", path); + let dir = match fs.open_dir(file_path.clone()).await { + Ok(dir) => dir, + Err(e) => { + if is_not_found(&e) { + return Ok(RpDelete::default()); + } else { + return Err(e.into()); + } + } + } + .read_dir() + .await?; + + for file in &dir { + let file_name = file.filename().to_str().unwrap(); + if file_name == "." || file_name == ".." 
{ + continue; + } + let file_path = format!("{}{}", path, file_name); + self.delete(file_path.as_str(), OpDelete::default()).await?; + } + + match fs.remove_dir(path).await { + Err(e) if !is_not_found(&e) => { + return Err(e.into()); + } + _ => {} + } + } else { + match fs.remove_file(path).await { + Err(e) if !is_not_found(&e) => { + return Err(e.into()); + } + _ => {} + } + }; + + Ok(RpDelete::default()) + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { + let client = self.sftp_connect().await?; + let mut fs = client.sftp.fs(); + fs.set_cwd(self.root.clone()); + + let file_path = format!("./{}", path); + + let mut dir = match fs.open_dir(file_path.clone()).await { + Ok(dir) => dir, + Err(e) => { + if is_not_found(&e) { + return Ok((RpList::default(), SftpPager::empty())); + } else { + return Err(e.into()); + } + } + }; + let dir = dir.read_dir().await?; + + Ok(( + RpList::default(), + SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()), + )) + } +} + +impl SftpBackend { + async fn pool(&self) -> Result<&bb8::Pool> { + let pool = self + .sftp + .get_or_try_init(|| async { + let manager = Manager { + endpoint: self.endpoint.clone(), + user: self.user.clone(), + key: self.key.clone(), + }; + + bb8::Pool::builder().max_size(10).build(manager).await + }) + .await?; + + Ok(pool) + } + + pub async fn sftp_connect(&self) -> Result> { + let conn = self.pool().await?.get().await?; + + Ok(conn) + } + + pub async fn sftp_connect_owned(&self) -> Result> { + let conn = self.pool().await?.get_owned().await?; + + Ok(conn) + } +} diff --git a/core/src/services/sftp/error.rs b/core/src/services/sftp/error.rs new file mode 100644 index 000000000000..0e7dc2ef4573 --- /dev/null +++ b/core/src/services/sftp/error.rs @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bb8::RunError; +use openssh::Error as SshError; +use openssh_sftp_client::{error::SftpErrorKind, Error as SftpClientError}; + +use crate::{Error, ErrorKind}; + +#[derive(Debug)] +pub enum SftpError { + SftpClientError(SftpClientError), + SshError(SshError), +} + +impl From for Error { + fn from(e: SftpClientError) -> Self { + let kind = match &e { + SftpClientError::UnsupportedSftpProtocol { version: _ } => ErrorKind::Unsupported, + SftpClientError::SftpError(kind, _msg) => match kind { + SftpErrorKind::NoSuchFile => ErrorKind::NotFound, + SftpErrorKind::PermDenied => ErrorKind::PermissionDenied, + SftpErrorKind::OpUnsupported => ErrorKind::Unsupported, + _ => ErrorKind::Unexpected, + }, + _ => ErrorKind::Unexpected, + }; + + Error::new(kind, "sftp error").set_source(e) + } +} + +impl From for Error { + fn from(e: SshError) -> Self { + Error::new(ErrorKind::Unexpected, "ssh error").set_source(e) + } +} + +impl From for SftpError { + fn from(e: SftpClientError) -> Self { + SftpError::SftpClientError(e) + } +} + +impl From for SftpError { + fn from(e: SshError) -> Self { + SftpError::SshError(e) + } +} + +impl From for Error { + fn from(e: SftpError) -> Self { + match e { + SftpError::SftpClientError(e) => e.into(), + SftpError::SshError(e) => e.into(), + } + } +} + 
+impl From> for Error { + fn from(e: RunError) -> Self { + match e { + RunError::User(err) => err.into(), + RunError::TimedOut => { + Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary() + } + } + } +} + +pub(super) fn is_not_found(e: &SftpClientError) -> bool { + matches!(e, SftpClientError::SftpError(SftpErrorKind::NoSuchFile, _)) +} + +pub(super) fn is_sftp_protocol_error(e: &SftpClientError) -> bool { + matches!(e, SftpClientError::SftpError(_, _)) +} diff --git a/core/src/services/sftp/mod.rs b/core/src/services/sftp/mod.rs new file mode 100644 index 000000000000..d854429bc7a0 --- /dev/null +++ b/core/src/services/sftp/mod.rs @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub use backend::SftpBuilder as Sftp; + +mod backend; +mod error; +mod pager; +mod utils; +mod writer; diff --git a/core/src/services/sftp/pager.rs b/core/src/services/sftp/pager.rs new file mode 100644 index 000000000000..e6b557c250f6 --- /dev/null +++ b/core/src/services/sftp/pager.rs @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use openssh_sftp_client::fs::DirEntry; + +use crate::raw::oio; +use crate::Result; + +pub struct SftpPager { + dir: Box<[DirEntry]>, + path: String, + limit: Option, + complete: bool, +} + +impl SftpPager { + pub fn new(dir: Box<[DirEntry]>, path: String, limit: Option) -> Self { + Self { + dir, + path, + limit, + complete: false, + } + } + + pub fn empty() -> Self { + Self { + dir: Box::new([]), + path: String::new(), + limit: None, + complete: true, + } + } +} + +#[async_trait] +impl oio::Page for SftpPager { + async fn next(&mut self) -> Result>> { + if self.complete { + return Ok(None); + } + + // when listing the root directory, the prefix should be empty + if self.path == "/" { + self.path = "".to_owned(); + } + + let iter = self + .dir + .iter() + .filter(|e| { + // filter out "." and ".." + e.filename().to_str().unwrap() != "." && e.filename().to_str().unwrap() != ".." 
+ }) + .map(|e| map_entry(self.path.clone(), e.clone())); + + let v: Vec = if let Some(limit) = self.limit { + iter.take(limit).collect() + } else { + iter.collect() + }; + + self.complete = true; + + if v.is_empty() { + Ok(None) + } else { + Ok(Some(v)) + } + } +} + +fn map_entry(prefix: String, value: DirEntry) -> oio::Entry { + let path = format!( + "{}{}{}", + prefix, + value.filename().to_str().unwrap(), + if value.file_type().unwrap().is_dir() { + "/" + } else { + "" + } + ); + + oio::Entry::new(path.as_str(), value.metadata().into()) +} diff --git a/core/src/services/sftp/utils.rs b/core/src/services/sftp/utils.rs new file mode 100644 index 000000000000..91387aad03b0 --- /dev/null +++ b/core/src/services/sftp/utils.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::io::SeekFrom; +use std::path::PathBuf; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use async_compat::Compat; +use bb8::PooledConnection; +use futures::executor::block_on; +use openssh_sftp_client::file::TokioCompatFile; +use openssh_sftp_client::metadata::MetaData as SftpMeta; +use owning_ref::OwningHandle; + +use super::backend::Manager; +use crate::raw::oio; +use crate::raw::oio::into_reader::FdReader; +use crate::raw::oio::ReadExt; +use crate::EntryMode; +use crate::Metadata; +use crate::Result; + +pub struct SftpReader { + // similar situation to connection struct + // We can make sure the file can live as long as the connection. + file: OwningHandle< + Box>, + Box>>>, + >, +} + +impl SftpReader { + pub async fn new( + conn: PooledConnection<'static, Manager>, + path: PathBuf, + start: u64, + end: u64, + ) -> Result { + let mut file = OwningHandle::new_with_fn(Box::new(conn), |conn| unsafe { + let file = block_on((*conn).sftp.open(path)).unwrap(); + let f = Compat::new(TokioCompatFile::from(file)); + Box::new(oio::into_reader::from_fd(f, start, end)) + }); + + file.seek(SeekFrom::Start(0)).await?; + + Ok(SftpReader { file }) + } +} + +impl oio::Read for SftpReader { + fn poll_read(&mut self, cx: &mut Context, buf: &mut [u8]) -> Poll> { + Pin::new(&mut *self.file).poll_read(cx, buf) + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { + Pin::new(&mut *self.file).poll_seek(cx, pos) + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + Pin::new(&mut *self.file).poll_next(cx) + } +} + +impl From for Metadata { + fn from(meta: SftpMeta) -> Self { + let mode = meta + .file_type() + .map(|filetype| { + if filetype.is_file() { + EntryMode::FILE + } else if filetype.is_dir() { + EntryMode::DIR + } else { + EntryMode::Unknown + } + }) + .unwrap_or(EntryMode::Unknown); + + let mut metadata = Metadata::new(mode); + + if let Some(size) = meta.len() { + metadata.set_content_length(size); + } + + 
if let Some(modified) = meta.modified() { + metadata.set_last_modified(modified.as_system_time().into()); + } + + metadata + } +} diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs new file mode 100644 index 000000000000..2fa5215d1307 --- /dev/null +++ b/core/src/services/sftp/writer.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use async_trait::async_trait; +use bb8::PooledConnection; +use bytes::Bytes; + +use super::backend::Manager; +use crate::raw::oio; +use crate::{Error, ErrorKind, Result}; + +pub struct SftpWriter { + conn: PooledConnection<'static, Manager>, + path: String, +} + +impl SftpWriter { + pub fn new(conn: PooledConnection<'static, Manager>, path: String) -> Self { + SftpWriter { conn, path } + } +} + +#[async_trait] +impl oio::Write for SftpWriter { + async fn write(&mut self, bs: Bytes) -> Result<()> { + let mut file = self.conn.sftp.create(&self.path).await?; + + file.write_all(&bs).await?; + + Ok(()) + } + + async fn abort(&mut self) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "SFTP does not support aborting writes", + )) + } + + async fn close(&mut self) -> Result<()> { + Ok(()) + } +} diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 5019d22e39a0..7e52ab84ea0d 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -71,6 +71,8 @@ pub enum Scheme { Rocksdb, /// [s3][crate::services::S3]: AWS S3 alike services. S3, + /// [sftp][crate::services::Sftp]: SFTP services + Sftp, /// [sled][crate::services::Sled]: Sled services Sled, /// [Supabase][crate::services::Supabase]: Supabase storage service @@ -166,6 +168,7 @@ impl From for &'static str { Scheme::Redis => "redis", Scheme::Rocksdb => "rocksdb", Scheme::S3 => "s3", + Scheme::Sftp => "sftp", Scheme::Sled => "sled", Scheme::Supabase => "supabase", Scheme::Oss => "oss", diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs index c9038d05e8f5..991b0a6c9f58 100644 --- a/core/tests/behavior/main.rs +++ b/core/tests/behavior/main.rs @@ -110,6 +110,7 @@ cfg_if::cfg_if! { if #[cfg(feature = "services-redis")] { behavior_tests!(Redis) cfg_if::cfg_if! { if #[cfg(feature = "services-rocksdb")] { behavior_tests!(Rocksdb); }} behavior_tests!(Oss); behavior_tests!(S3); +cfg_if::cfg_if! 
{ if #[cfg(feature = "services-sftp")] { behavior_tests!(Sftp); }} cfg_if::cfg_if! { if #[cfg(feature = "services-sled")] { behavior_tests!(Sled); }} behavior_tests!(Webdav); behavior_tests!(Webhdfs);