From a53a3fe1fb668df3b67eb1152fd4097e7313797f Mon Sep 17 00:00:00 2001 From: salvor Date: Sun, 30 Apr 2023 11:06:46 +0800 Subject: [PATCH 1/7] add sftp basic skeleton, exist life cycle problem --- .env.example | 6 + Cargo.lock | 263 +++++++++++++++++++- core/Cargo.toml | 3 + core/README.md | 1 + core/src/services/ftp/backend.rs | 6 +- core/src/services/mod.rs | 5 + core/src/services/sftp/backend.rs | 391 ++++++++++++++++++++++++++++++ core/src/services/sftp/error.rs | 107 ++++++++ core/src/services/sftp/mod.rs | 24 ++ core/src/services/sftp/pager.rs | 16 ++ core/src/services/sftp/utils.rs | 133 ++++++++++ core/src/services/sftp/writer.rs | 55 +++++ core/src/types/scheme.rs | 3 + core/tests/behavior/main.rs | 1 + 14 files changed, 1003 insertions(+), 11 deletions(-) create mode 100644 core/src/services/sftp/backend.rs create mode 100644 core/src/services/sftp/error.rs create mode 100644 core/src/services/sftp/mod.rs create mode 100644 core/src/services/sftp/pager.rs create mode 100644 core/src/services/sftp/utils.rs create mode 100644 core/src/services/sftp/writer.rs diff --git a/.env.example b/.env.example index 2bc8a6b74353..2b83073a7cdf 100644 --- a/.env.example +++ b/.env.example @@ -61,6 +61,12 @@ OPENDAL_REDIS_DB=0 OPENDAL_ROCKSDB_TEST=false OPENDAL_ROCKSDB_DATADIR=/path/to/database OPENDAL_ROCKSDB_ROOT=/path/to/root +# sftp +OPENDAL_SFTP_TEST=false +OPENDAL_SFTP_ENDPOINT=ssh:// +OPENDAL_SFTP_ROOT=/path/to/dir +OPENDAL_SFTP_USER= +OPENDAL_SFTP_KEY= # sled OPENDAL_SLED_TEST=false OPENDAL_SLED_DATADIR=/path/to/database diff --git a/Cargo.lock b/Cargo.lock index 15b2aa908dd9..cfbc891d73dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -104,6 +104,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" +[[package]] +name = "array-init" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc" + [[package]] name = "assert-json-diff" version = "2.0.2" @@ -312,6 +318,22 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "awaitable" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70af449c9a763cb655c6a1e5338b42d99c67190824ff90658c1e30be844c0775" +dependencies = [ + "awaitable-error", + "cfg-if", +] + +[[package]] +name = "awaitable-error" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5b3469636cdf8543cceab175efca534471f36eee12fb8374aba00eb5e7e7f8a" + [[package]] name = "backon" version = "0.4.0" @@ -716,12 +738,31 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "concurrent_arena" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24bfeb060a299f86521bb3940344800fc861cc506356e44a273a42cb552afde5" +dependencies = [ + "arc-swap", + "array-init", + "const_fn_assert", + "parking_lot 0.12.1", + "triomphe", +] + [[package]] name = "const-oid" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "520fbf3c07483f94e3e3ca9d0cfd913d7718ef2483d2cfd91c0d9e91474ab913" +[[package]] +name = "const_fn_assert" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27d614f23f34f7b5165a77dc1591f497e2518f9cec4b4f4b92bfc4dc6cf7a190" + [[package]] name = "convert_case" version = "0.6.0" @@ -994,6 +1035,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "derive_destructure2" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35cb7e5875e1028a73e551747d6d0118f25c3d6dbba2dadf97cc0f4d0c53f2f5" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "diff" version = "0.1.13" @@ -1018,13 +1070,33 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys 0.3.7", +] + [[package]] name = "dirs" version = "5.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dece029acd3353e3a58ac2e3eb3c8d6c35827a892edc6cc4138ef9c33df46ecd" dependencies = [ - "dirs-sys", + "dirs-sys 0.4.0", +] + +[[package]] +name = "dirs-sys" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users", + "winapi", ] [[package]] @@ -2301,6 +2373,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-derive" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -2347,8 +2430,8 @@ name = "oay" version = "0.33.2" dependencies = [ "anyhow", - "clap 4.2.5", - "dirs", + "clap 4.1.11", + "dirs 5.0.0", "env_logger", "futures", "log", @@ -2397,8 +2480,8 @@ version = "0.33.2" dependencies = [ "anyhow", "assert_cmd", - "clap 4.2.5", - "dirs", + "clap 4.1.11", + "dirs 5.0.0", "env_logger", "futures", "log", @@ -2453,6 +2536,8 @@ dependencies = [ "minitrace", "moka", "once_cell", + "openssh", + "openssh-sftp-client", "opentelemetry 0.19.0", "opentelemetry-jaeger", "parking_lot 0.12.1", @@ -2535,6 +2620,95 @@ dependencies = [ "rb-sys-env", ] +[[package]] +name = "openssh" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ca6c277973fb549b36dd8980941b5ea3ecebea026f5b1f0060acde74d893c22" +dependencies = [ + "dirs 4.0.0", + "libc", + "once_cell", + "shell-escape", + "tempfile", + "thiserror", + "tokio", + "tokio-pipe", +] + +[[package]] +name = "openssh-sftp-client" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4fa8e5f26e549bd266d9bcd9e5b4fd344729985ef1a7f5ac3e51f3f96a4a620" +dependencies = [ + "bytes", + "derive_destructure2", + "once_cell", + "openssh-sftp-client-lowlevel", + "openssh-sftp-error", + "scopeguard", + "tokio", + "tokio-io-utility", + "tokio-util", +] + +[[package]] +name = "openssh-sftp-client-lowlevel" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "406bf41d8372365497d5645e802a8dfe22008b8183edbe6c79e4b75614431daa" +dependencies = [ + "awaitable", + "bytes", + "concurrent_arena", + "derive_destructure2", + "openssh-sftp-error", + "openssh-sftp-protocol", + "pin-project", + "tokio", + "tokio-io-utility", +] + +[[package]] +name = "openssh-sftp-error" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d836b428ead150165d1178ed0aa672791c13b3ae9616ea1e34d13730a2cb486" +dependencies = [ + "awaitable-error", + "openssh-sftp-protocol-error", + "ssh_format_error", + "thiserror", + "tokio", +] + +[[package]] +name = "openssh-sftp-protocol" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf38532d784978966f95d241226223823f351d5bb2a4bebcf6b20b9cb1e393e0" +dependencies = [ + "bitflags 2.0.2", + "num-derive", + "num-traits", + "openssh-sftp-protocol-error", + "serde", + "ssh_format", + "vec-strings", +] + +[[package]] +name = "openssh-sftp-protocol-error" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0719269eb3f037866ae07ec89cb44ed2c1d63b72b2390cef8e1aa3016a956ff8" +dependencies = [ + "serde", + "thiserror", + "vec-strings", +] + [[package]] name = "openssl" version = "0.10.47" @@ -3425,7 +3599,7 @@ dependencies = [ "base64 0.21.0", "bytes", "chrono", - "dirs", + "dirs 5.0.0", "form_urlencoded", "hex", "hmac", @@ -3841,6 +4015,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shell-escape" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f" + [[package]] name = "shell-words" version = "1.1.0" @@ -3993,6 +4173,32 @@ dependencies = [ "der", ] +[[package]] +name = "ssh_format" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24ab31081d1c9097c327ec23550858cb5ffb4af6b866c1ef4d728455f01f3304" +dependencies = [ + "bytes", + "serde", + "ssh_format_error", +] + +[[package]] +name = "ssh_format_error" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be3c6519de7ca611f71ef7e8a56eb57aa1c818fecb5242d0a0f39c83776c210c" +dependencies = [ + "serde", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "strsim" version = "0.10.0" @@ -4088,6 +4294,12 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" +[[package]] +name = "thin-vec" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aac81b6fd6beb5884b0cf3321b8117e6e5d47ecb6fc89f414cfdcca8b2fe2dd8" + [[package]] name = "thiserror" version = "1.0.40" @@ -4222,6 +4434,16 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "tokio-io-utility" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d672654d175710e52c7c41f6aec77c62b3c0954e2a7ebce9049d1e94ed7c263" +dependencies = [ + "bytes", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.0.0" @@ -4243,6 +4465,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-pipe" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784" +dependencies = [ + "libc", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.23.4" @@ -4256,9 +4488,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.7" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ "bytes", "futures-core", @@ -4398,6 +4630,11 @@ name = "triomphe" version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" +dependencies = [ + "arc-swap", + "serde", + "stable_deref_trait", +] [[package]] name = "trust-dns-proto" @@ -4566,6 +4803,16 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "vec-strings" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8509489e2a7ee219522238ad45fd370bec6808811ac15ac6b07453804e77659" +dependencies = [ + "serde", + "thin-vec", +] + [[package]] name = "version_check" version = "0.9.4" diff --git a/core/Cargo.toml b/core/Cargo.toml index b88de207a9cb..18713d881456 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -135,6 +135,7 @@ services-s3 = [ "reqsign?/services-aws", "reqsign?/reqwest_request", ] +services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"] services-sled = ["dep:sled"] services-supabase = [] services-wasabi = [ @@ -176,6 +177,8 @@ metrics = { version = "0.20", optional = true } minitrace = { version = "0.4.0", optional = true } moka = { version = "0.10", optional = true, features = ["future"] } once_cell = "1" +openssh = { version = "0.9.9", optional = true } +openssh-sftp-client = { version = "0.12.2", optional = true } opentelemetry = { version = "0.19.0", optional = true } parking_lot = "0.12" percent-encoding = "2" diff --git a/core/README.md b/core/README.md index 8bbce48e84ea..ca758fa8e806 100644 --- a/core/README.md +++ b/core/README.md @@ -36,6 +36,7 @@ - [redis](https://docs.rs/opendal/latest/opendal/services/struct.Redis.html): [Redis](https://redis.io/) services support. - [rocksdb](https://docs.rs/opendal/latest/opendal/services/struct.Rocksdb.html): [RocksDB](http://rocksdb.org/) services support. - [s3](https://docs.rs/opendal/latest/opendal/services/struct.S3.html): [AWS S3](https://aws.amazon.com/s3/) alike services. +- [sftp](https://docs.rs/opendal/latest/opendal/services/struct.Sftp.html): [SFTP](https://datatracker.ietf.org/doc/html/draft-ietf-secsh-filexfer-02) services support. - [sled](https://docs.rs/opendal/latest/opendal/services/sled/struct.Sled.html): [sled](https://crates.io/crates/sled) services support. - [webdav](https://docs.rs/opendal/latest/opendal/services/struct.Webdav.html): [WebDAV](https://datatracker.ietf.org/doc/html/rfc4918) Service Support. - [webhdfs](https://docs.rs/opendal/latest/opendal/services/struct.Webhdfs.html): [WebHDFS](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html) Service Support. diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs index 323389806609..33ad34a6046c 100644 --- a/core/src/services/ftp/backend.rs +++ b/core/src/services/ftp/backend.rs @@ -60,10 +60,10 @@ use crate::*; /// /// # Configuration /// -/// - `endpoint`: set the endpoint for connection +/// - `endpoint`: Set the endpoint for connection /// - `root`: Set the work directory for backend -/// - `credential`: login credentials -/// - `tls`: tls mode +/// - `user`: Set the login user +/// - `password`: Set the login password /// /// You can refer to [`FtpBuilder`]'s docs for more information /// diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 026219868a96..d08135b06306 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -114,6 +114,11 @@ mod s3; #[cfg(feature = "services-s3")] pub use s3::S3; +#[cfg(feature = "services-sftp")] +mod sftp; +#[cfg(feature = "services-sftp")] +pub use sftp::Sftp; + #[cfg(feature = "services-sled")] mod sled; #[cfg(feature = "services-sled")] diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs new file mode 100644 index 000000000000..a23b362b2578 --- /dev/null +++ b/core/src/services/sftp/backend.rs @@ -0,0 +1,391 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cmp::min; +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::io::SeekFrom; +use std::sync::Arc; + +use async_compat::Compat; +use async_trait::async_trait; +use bb8::PooledConnection; +use bb8::RunError; +use futures::AsyncSeekExt; +use log::debug; +use openssh::SessionBuilder; +use openssh::Stdio; +use openssh_sftp_client::file::TokioCompatFile; +use openssh_sftp_client::Sftp; +use tokio::sync::OnceCell; + +use super::error::parse_io_error; +use super::error::SftpError; +use super::utils::SftpReader; +//use super::utils::SftpReader; +use super::writer::SftpWriter; +use crate::ops::*; +use crate::raw::oio::into_reader::FdReader; +use crate::raw::oio::ReadExt; +use crate::raw::*; +use crate::*; + +/// SFTP services support. (only works on unix) +/// +/// # Capabilities +/// +/// This service can be used to: +/// +/// - [x] read +/// - [x] write +/// - [x] list +/// - [ ] ~~scan~~ +/// - [ ] ~~presign~~ +/// - [ ] blocking +/// +/// # Configuration +/// +/// - `endpoint`: Set the endpoint for connection +/// - `root`: Set the work directory for backend +/// - `user`: Set the login user +/// - `key`: Set the public key for login +/// +/// It doesn't support password login, you can use public key instead. +/// +/// You can refer to [`SftpBuilder`]'s docs for more information +/// +/// # Example +/// +/// ## Via Builder +/// +/// ```no_run +/// use anyhow::Result; +/// use opendal::services::Ftp; +/// use opendal::Object; +/// use opendal::Operator; +/// +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// // create backend builder +/// let mut builder = Sftp::default(); +/// +/// builder.endpoint("127.0.0.1").user("test").password("test"); +/// +/// let op: Operator = Operator::new(builder)?.finish(); +/// let _obj: Object = op.object("test_file"); +/// Ok(()) +/// } +/// ``` + +#[derive(Default)] +pub struct SftpBuilder { + endpoint: Option, + root: Option, + user: Option, + key: Option, +} + +impl Debug for SftpBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Builder") + .field("endpoint", &self.endpoint) + .field("root", &self.root) + .finish() + } +} + +impl SftpBuilder { + /// set endpoint for sftp backend. + pub fn endpoint(&mut self, endpoint: &str) -> &mut Self { + self.endpoint = if endpoint.is_empty() { + None + } else { + Some(endpoint.to_string()) + }; + + self + } + + /// set root path for sftp backend. + pub fn root(&mut self, root: &str) -> &mut Self { + self.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + + self + } + + /// set user for sftp backend. + pub fn user(&mut self, user: &str) -> &mut Self { + self.user = if user.is_empty() { + None + } else { + Some(user.to_string()) + }; + + self + } + + /// set key path for sftp backend. + pub fn key(&mut self, key: &str) -> &mut Self { + self.key = if key.is_empty() { + None + } else { + Some(key.to_string()) + }; + + self + } +} + +impl Builder for SftpBuilder { + const SCHEME: Scheme = Scheme::Sftp; + type Accessor = SftpBackend; + + fn build(&mut self) -> Result { + debug!("sftp backend build started: {:?}", &self); + let endpoint = match self.endpoint.clone() { + None => return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")), + Some(v) => v, + }; + + let root = normalize_root(&self.root.clone().unwrap_or_default()); + + let user = match &self.user { + None => "".to_string(), + Some(v) => v.clone(), + }; + + debug!("sftp backend finished: {:?}", &self); + + Ok(SftpBackend { + endpoint, + root, + user, + key: self.key.clone(), + sftp: OnceCell::new(), + }) + } + + fn from_map(map: HashMap) -> Self { + let mut builder = SftpBuilder::default(); + + map.get("root").map(|v| builder.root(v)); + map.get("endpoint").map(|v| builder.endpoint(v)); + map.get("user").map(|v| builder.user(v)); + map.get("key").map(|v| builder.key(v)); + + builder + } +} + +#[derive(Clone)] +pub struct Manager { + endpoint: String, + root: String, + user: String, + key: Option, +} + +#[async_trait] +impl bb8::ManageConnection for Manager { + type Connection = Sftp; + type Error = SftpError; + + async fn connect(&self) -> std::result::Result { + let mut session = SessionBuilder::default(); + + session.user(self.user.clone()); + + if let Some(key) = &self.key { + session.keyfile(key); + } + + let session = session.connect(self.endpoint.clone()).await?; + + let mut child = session + .subsystem("sftp") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .await?; + + let sftp = Sftp::new( + child.stdin().take().unwrap(), + child.stdout().take().unwrap(), + Default::default(), + ) + .await?; + + Ok(sftp) + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> { + conn.fs().metadata(".").await?; + Ok(()) + } + + /// Always allow reuse conn. + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} + +/// Backend is used to serve `Accessor` support for sftp. +pub struct SftpBackend { + endpoint: String, + root: String, + user: String, + key: Option, + sftp: OnceCell>, +} + +impl Debug for SftpBackend { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backend").finish() + } +} + +impl Clone for SftpBackend { + fn clone(&self) -> Self { + Self { + endpoint: self.endpoint.clone(), + root: self.root.clone(), + user: self.user.clone(), + key: self.key.clone(), + sftp: OnceCell::new(), + } + } +} + +#[async_trait] +impl Accessor for SftpBackend { + type Reader = SftpReader; + type BlockingReader = (); + type Writer = SftpWriter; + type BlockingWriter = (); + type Pager = (); + type BlockingPager = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::Sftp) + .set_root(&self.root) + .set_capability(Capability { + read: true, + write: true, + // list: true, + ..Default::default() + }); + + am + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let client = self.sftp_connect().await?; + let mut file = client.open(path).await?; + + let total_length = file.metadata().await?.len().ok_or(Error::new( + ErrorKind::NotFound, + format!("file not found: {}", path).as_str(), + ))?; + + let br = args.range(); + let (start, end) = match (br.offset(), br.size()) { + // Read a specific range. + (Some(offset), Some(size)) => (offset, min(offset + size, total_length)), + // Read from offset. + (Some(offset), None) => (offset, total_length), + // Read the last size bytes. + (None, Some(size)) => ( + if total_length > size { + total_length - size + } else { + 0 + }, + total_length, + ), + // Read the whole file. + (None, None) => (0, total_length), + }; + + let mut r = SftpReader::new(self.clone(), path, start, end - start); + + Ok((RpRead::new(end - start), r)) + } + + async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + Ok(( + RpWrite::new(), + SftpWriter::new(self.sftp_connect().await?, path.to_owned()), + )) + } +} + +impl SftpBackend { + pub async fn sftp_connect(&self) -> Result> { + let pool = self + .sftp + .get_or_try_init(|| async { + let manager = Manager { + endpoint: self.endpoint.clone(), + root: self.root.clone(), + user: self.user.clone(), + key: self.key.clone(), + }; + + bb8::Pool::builder().max_size(10).build(manager).await + }) + .await?; + + let conn = pool.get_owned().await?; + + Ok(conn) + } + /* + async fn sftp_connect(&self) -> Result { + let mut session = SessionBuilder::default(); + + session.user(self.user.clone()); + + if let Some(key) = &self.key { + session.keyfile(key); + } + + let session = session.connect(self.endpoint.clone()).await?; + + let mut child = session + .subsystem("sftp") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .await?; + + let sftp = Sftp::new( + child.stdin().take().unwrap(), + child.stdout().take().unwrap(), + Default::default(), + ) + .await?; + + Ok(sftp) + } + */ +} diff --git a/core/src/services/sftp/error.rs b/core/src/services/sftp/error.rs new file mode 100644 index 000000000000..5dc4296e679f --- /dev/null +++ b/core/src/services/sftp/error.rs @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::io; + +use bb8::RunError; +use openssh::Error as SshError; +use openssh_sftp_client::Error as SftpClientError; + +use crate::{Error, ErrorKind}; + +use super::backend::Manager; + +#[derive(Debug)] +pub enum SftpError { + SftpClientError(SftpClientError), + SshError(SshError), +} + +impl From for Error { + fn from(e: SftpClientError) -> Self { + let kind = match e { + SftpClientError::UnsupportedSftpProtocol { version: _ } => ErrorKind::Unsupported, + _ => ErrorKind::Unexpected, + }; + + let err = Error::new(kind, "sftp error").set_source(e); + + err + } +} + +impl From for Error { + fn from(e: SshError) -> Self { + let kind = ErrorKind::Unexpected; // todo + + let err = Error::new(kind, "ssh error").set_source(e); + + err + } +} + +impl From for SftpError { + fn from(e: SftpClientError) -> Self { + SftpError::SftpClientError(e) + } +} + +impl From for SftpError { + fn from(e: SshError) -> Self { + SftpError::SshError(e) + } +} + +impl From for Error { + fn from(e: SftpError) -> Self { + match e { + SftpError::SftpClientError(e) => e.into(), + SftpError::SshError(e) => e.into(), + } + } +} + +impl From> for Error { + fn from(e: RunError) -> Self { + match e { + RunError::User(err) => err.into(), + RunError::TimedOut => { + Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary() + } + } + } +} + +/// Parse all io related errors. +pub fn parse_io_error(err: io::Error) -> Error { + use io::ErrorKind::*; + + let (kind, retryable) = match err.kind() { + NotFound => (ErrorKind::NotFound, false), + PermissionDenied => (ErrorKind::PermissionDenied, false), + Interrupted | UnexpectedEof | TimedOut | WouldBlock => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, true), + }; + + let mut err = Error::new(kind, &err.kind().to_string()).set_source(err); + + if retryable { + err = err.set_temporary(); + } + + err +} diff --git a/core/src/services/sftp/mod.rs b/core/src/services/sftp/mod.rs new file mode 100644 index 000000000000..37b6ad057c76 --- /dev/null +++ b/core/src/services/sftp/mod.rs @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub use backend::SftpBuilder as Sftp; + +mod backend; +mod utils; +mod error; +mod pager; +mod writer; diff --git a/core/src/services/sftp/pager.rs b/core/src/services/sftp/pager.rs new file mode 100644 index 000000000000..6e778edd7530 --- /dev/null +++ b/core/src/services/sftp/pager.rs @@ -0,0 +1,16 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. \ No newline at end of file diff --git a/core/src/services/sftp/utils.rs b/core/src/services/sftp/utils.rs new file mode 100644 index 000000000000..53b10c354f35 --- /dev/null +++ b/core/src/services/sftp/utils.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::io; +use std::io::SeekFrom; +use std::path::PathBuf; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; + +use bb8::Pool; +use bb8::PooledConnection; +use futures::executor::block_on; +use futures::ready; +use openssh_sftp_client::file::File; +use openssh_sftp_client::file::TokioCompatFile; +use openssh_sftp_client::metadata::MetaData as SftpMeta; +use openssh_sftp_client::Sftp; +use tokio::io::AsyncBufRead; +use tokio::io::AsyncRead; +use tokio::io::AsyncSeek; + +use super::backend::Manager; +use super::backend::SftpBackend; +use super::error::parse_io_error; +use crate::raw::oio; +use crate::EntryMode; +use crate::Error; +use crate::ErrorKind; +use crate::Metadata; +use crate::Result; + +pub struct SftpReader { + backend: SftpBackend, + path: PathBuf, + offset: u64, + size: u64, +} + +unsafe impl Sync for SftpReader {} + +impl SftpReader { + pub fn new(backend: SftpBackend, path: &str, offset: u64, size: u64) -> Self { + SftpReader { + backend, + path: path.into(), + offset, + size, + } + } +} + +impl oio::Read for SftpReader { + fn poll_read(&mut self, cx: &mut Context, buf: &mut [u8]) -> Poll> { + let client = block_on(self.backend.sftp_connect())?; + let mut file = TokioCompatFile::from(block_on(client.open(self.path.clone()))?); + + if self.offset > 0 { + Pin::new(&mut file) + .start_seek(SeekFrom::Start(self.offset)) + .map_err(parse_io_error)?; + ready!(Pin::new(&mut file).poll_complete(cx)).map_err(parse_io_error)?; + } + + let mut buf = tokio::io::ReadBuf::new(buf); + ready!(Pin::new(&mut file).poll_read(cx, &mut buf)).map_err(parse_io_error)?; + Poll::Ready(Ok(buf.filled().len())) + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { + let (_, _) = (cx, pos); + + Poll::Ready(Err(Error::new( + ErrorKind::Unsupported, + "output reader doesn't support seeking", + ))) + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + // let mut file = TokioCompatFile::from(self.backend.open(self.path).map_err(parse_io_error)?); + + // let mut buf = bytes::BytesMut::with_capacity(1024 * 1024); + // let n = futures::ready!(Pin::new(&mut file).poll_read(cx, &mut buf))?; + + // if n == 0 { + // Poll::Ready(None) + // } else { + // buf.truncate(n); + // Poll::Ready(Some(Ok(buf.freeze()))) + // } + todo!() + } +} + +impl From for Metadata { + fn from(meta: SftpMeta) -> Self { + let mode = meta + .file_type() + .map(|filetype| { + if filetype.is_file() { + EntryMode::FILE + } else if filetype.is_dir() { + EntryMode::DIR + } else { + EntryMode::Unknown + } + }) + .unwrap_or(EntryMode::Unknown); + + let metadata = Metadata::new(mode); + + // if let Some(modified) = meta.modified() { + // metadata.set_last_modified(modified.into()); + // } + + metadata + } +} diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs new file mode 100644 index 000000000000..200c4b3c24a8 --- /dev/null +++ b/core/src/services/sftp/writer.rs @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use bb8::PooledConnection; +use bytes::Bytes; +use openssh_sftp_client::Sftp; + +use crate::raw::oio; +use crate::Result; +use super::backend::Manager; + +pub struct SftpWriter { + backend: PooledConnection<'static, Manager>, + path: String, +} + +impl<'a> SftpWriter { + pub fn new(backend: PooledConnection<'static, Manager>, path: String) -> Self { + SftpWriter { backend, path } + } +} + +#[async_trait] +impl oio::Write for SftpWriter { + async fn write(&mut self, bs: Bytes) -> Result<()> { + self.backend.fs().write(&self.path, bs.as_ref()).await?; + + Ok(()) + } + + async fn abort(&mut self) -> Result<()> { + Ok(()) + } + + async fn close(&mut self) -> Result<()> { + Ok(()) + } +} diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 5019d22e39a0..7e52ab84ea0d 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -71,6 +71,8 @@ pub enum Scheme { Rocksdb, /// [s3][crate::services::S3]: AWS S3 alike services. S3, + /// [sftp][crate::services::Sftp]: SFTP services + Sftp, /// [sled][crate::services::Sled]: Sled services Sled, /// [Supabase][crate::services::Supabase]: Supabase storage service @@ -166,6 +168,7 @@ impl From for &'static str { Scheme::Redis => "redis", Scheme::Rocksdb => "rocksdb", Scheme::S3 => "s3", + Scheme::Sftp => "sftp", Scheme::Sled => "sled", Scheme::Supabase => "supabase", Scheme::Oss => "oss", diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs index c9038d05e8f5..991b0a6c9f58 100644 --- a/core/tests/behavior/main.rs +++ b/core/tests/behavior/main.rs @@ -110,6 +110,7 @@ cfg_if::cfg_if! { if #[cfg(feature = "services-redis")] { behavior_tests!(Redis) cfg_if::cfg_if! { if #[cfg(feature = "services-rocksdb")] { behavior_tests!(Rocksdb); }} behavior_tests!(Oss); behavior_tests!(S3); +cfg_if::cfg_if! { if #[cfg(feature = "services-sftp")] { behavior_tests!(Sftp); }} cfg_if::cfg_if! { if #[cfg(feature = "services-sled")] { behavior_tests!(Sled); }} behavior_tests!(Webdav); behavior_tests!(Webhdfs); From d7a0ef89f14a5cfe5e473089e3c83cdb88e26bb2 Mon Sep 17 00:00:00 2001 From: salvor Date: Sun, 30 Apr 2023 16:22:40 +0800 Subject: [PATCH 2/7] complete sftp basic skeleton --- Cargo.lock | 10 ++ core/Cargo.toml | 8 +- core/src/services/sftp/backend.rs | 172 ++++++++++++++++++------------ core/src/services/sftp/error.rs | 24 ----- core/src/services/sftp/pager.rs | 45 +++++++- core/src/services/sftp/utils.rs | 88 +++++---------- core/src/services/sftp/writer.rs | 16 +-- 7 files changed, 198 insertions(+), 165 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cfbc891d73dc..1ffaeeb7e404 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2540,6 +2540,7 @@ dependencies = [ "openssh-sftp-client", "opentelemetry 0.19.0", "opentelemetry-jaeger", + "owning_ref", "parking_lot 0.12.1", "paste", "percent-encoding", @@ -2891,6 +2892,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owning_ref" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ff55baddef9e4ad00f88b6c743a2a8062d4c6ade126c2a528644b8e444d52ce" +dependencies = [ + "stable_deref_trait", +] + [[package]] name = "parking" version = "2.0.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index 18713d881456..5dc6170c4690 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -135,7 +135,12 @@ services-s3 = [ "reqsign?/services-aws", "reqsign?/reqwest_request", ] -services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"] +services-sftp = [ + "dep:openssh", + "dep:openssh-sftp-client", + "dep:bb8", + "dep:owning_ref", +] services-sled = ["dep:sled"] services-supabase = [] services-wasabi = [ @@ -180,6 +185,7 @@ once_cell = "1" openssh = { version = "0.9.9", optional = true } openssh-sftp-client = { version = "0.12.2", optional = true } opentelemetry = { version = "0.19.0", optional = true } +owning_ref = { version = "0.4.1", optional = true } parking_lot = "0.12" percent-encoding = "2" pin-project = "1" diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index a23b362b2578..1be2c8ab9d63 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -19,29 +19,24 @@ use std::cmp::min; use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; -use std::io::SeekFrom; -use std::sync::Arc; -use async_compat::Compat; use async_trait::async_trait; use bb8::PooledConnection; -use bb8::RunError; -use futures::AsyncSeekExt; +use futures::executor::block_on; use log::debug; +use openssh::RemoteChild; +use openssh::Session; use openssh::SessionBuilder; use openssh::Stdio; -use openssh_sftp_client::file::TokioCompatFile; use openssh_sftp_client::Sftp; +use owning_ref::OwningHandle; use tokio::sync::OnceCell; -use super::error::parse_io_error; use super::error::SftpError; +use super::pager::SftpPager; use super::utils::SftpReader; -//use super::utils::SftpReader; use super::writer::SftpWriter; use crate::ops::*; -use crate::raw::oio::into_reader::FdReader; -use crate::raw::oio::ReadExt; use crate::raw::*; use crate::*; @@ -199,14 +194,22 @@ impl Builder for SftpBuilder { #[derive(Clone)] pub struct Manager { endpoint: String, - root: String, user: String, key: Option, } +pub struct Connection { + // the remote child owns the ref to session, so we need to use owning handle + // safety explanation will be talked about deeply in the future + // todo: add safety explanation + // Related: https://stackoverflow.com/a/47260399 + child: OwningHandle, Box>>, + pub sftp: Sftp, +} + #[async_trait] impl bb8::ManageConnection for Manager { - type Connection = Sftp; + type Connection = Connection; type Error = SftpError; async fn connect(&self) -> std::result::Result { @@ -220,25 +223,33 @@ impl bb8::ManageConnection for Manager { let session = session.connect(self.endpoint.clone()).await?; - let mut child = session - .subsystem("sftp") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn() - .await?; + let sess = Box::new(session); + let mut oref = OwningHandle::new_with_fn(sess, unsafe { + |x| { + Box::new( + block_on( + (*x).subsystem("sftp") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn(), + ) + .unwrap(), + ) + } + }); let sftp = Sftp::new( - child.stdin().take().unwrap(), - child.stdout().take().unwrap(), + oref.stdin().take().unwrap(), + oref.stdout().take().unwrap(), Default::default(), ) .await?; - Ok(sftp) + Ok(Connection { child: oref, sftp }) } async fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> { - conn.fs().metadata(".").await?; + conn.child.session().check().await?; Ok(()) } @@ -263,25 +274,13 @@ impl Debug for SftpBackend { } } -impl Clone for SftpBackend { - fn clone(&self) -> Self { - Self { - endpoint: self.endpoint.clone(), - root: self.root.clone(), - user: self.user.clone(), - key: self.key.clone(), - sftp: OnceCell::new(), - } - } -} - #[async_trait] impl Accessor for SftpBackend { type Reader = SftpReader; type BlockingReader = (); type Writer = SftpWriter; type BlockingWriter = (); - type Pager = (); + type Pager = SftpPager; type BlockingPager = (); fn info(&self) -> AccessorInfo { @@ -289,22 +288,41 @@ impl Accessor for SftpBackend { am.set_scheme(Scheme::Sftp) .set_root(&self.root) .set_capability(Capability { + stat: true, read: true, write: true, - // list: true, + list: true, + list_with_limit: true, + ..Default::default() }); am } + async fn create_dir(&self, path: &str, _: OpCreate) -> Result { + let client = self.sftp_connect().await?; + + let mut fs = client.sftp.fs(); + fs.set_cwd(self.root.clone()); + + fs.create_dir(path).await?; + + return Ok(RpCreate::default()); + } + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { let client = self.sftp_connect().await?; - let mut file = client.open(path).await?; + + let mut fs = client.sftp.fs(); + fs.set_cwd(self.root.clone()); + let path = fs.canonicalize(path).await?; + + let mut file = client.sftp.open(path.as_path()).await?; let total_length = file.metadata().await?.len().ok_or(Error::new( ErrorKind::NotFound, - format!("file not found: {}", path).as_str(), + format!("file not found: {}", path.to_str().unwrap()).as_str(), ))?; let br = args.range(); @@ -326,7 +344,7 @@ impl Accessor for SftpBackend { (None, None) => (0, total_length), }; - let mut r = SftpReader::new(self.clone(), path, start, end - start); + let r = SftpReader::new(self.sftp_connect().await?, path, start, end); Ok((RpRead::new(end - start), r)) } @@ -334,7 +352,51 @@ impl Accessor for SftpBackend { async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> { Ok(( RpWrite::new(), - SftpWriter::new(self.sftp_connect().await?, path.to_owned()), + SftpWriter::new( + self.sftp_connect().await?, + path.to_owned(), + self.root.clone(), + ), + )) + } + + async fn stat(&self, path: &str, _: OpStat) -> Result { + let client = self.sftp_connect().await?; + + let mut fs = client.sftp.fs(); + fs.set_cwd(self.root.clone()); + + let meta = fs.metadata(path).await?; + + Ok(RpStat::new(meta.into())) + } + + async fn delete(&self, path: &str, _: OpDelete) -> Result { + let client = self.sftp_connect().await?; + + let mut fs = client.sftp.fs(); + fs.set_cwd(self.root.clone()); + + if path.ends_with('/') { + fs.remove_dir(path).await? + } else { + fs.remove_file(path).await? + }; + + Ok(RpDelete::default()) + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { + let client = self.sftp_connect().await?; + + let mut fs = client.sftp.fs(); + fs.set_cwd(self.root.clone()); + + let dir = fs.open_dir(path).await?.read_dir().await?; + + Ok(( + RpList::default(), + SftpPager::new(dir, args.limit()), )) } } @@ -346,7 +408,6 @@ impl SftpBackend { .get_or_try_init(|| async { let manager = Manager { endpoint: self.endpoint.clone(), - root: self.root.clone(), user: self.user.clone(), key: self.key.clone(), }; @@ -359,33 +420,4 @@ impl SftpBackend { Ok(conn) } - /* - async fn sftp_connect(&self) -> Result { - let mut session = SessionBuilder::default(); - - session.user(self.user.clone()); - - if let Some(key) = &self.key { - session.keyfile(key); - } - - let session = session.connect(self.endpoint.clone()).await?; - - let mut child = session - .subsystem("sftp") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn() - .await?; - - let sftp = Sftp::new( - child.stdin().take().unwrap(), - child.stdout().take().unwrap(), - Default::default(), - ) - .await?; - - Ok(sftp) - } - */ } diff --git a/core/src/services/sftp/error.rs b/core/src/services/sftp/error.rs index 5dc4296e679f..f625332fb93c 100644 --- a/core/src/services/sftp/error.rs +++ b/core/src/services/sftp/error.rs @@ -15,16 +15,12 @@ // specific language governing permissions and limitations // under the License. -use std::io; - use bb8::RunError; use openssh::Error as SshError; use openssh_sftp_client::Error as SftpClientError; use crate::{Error, ErrorKind}; -use super::backend::Manager; - #[derive(Debug)] pub enum SftpError { SftpClientError(SftpClientError), @@ -85,23 +81,3 @@ impl From> for Error { } } } - -/// Parse all io related errors. -pub fn parse_io_error(err: io::Error) -> Error { - use io::ErrorKind::*; - - let (kind, retryable) = match err.kind() { - NotFound => (ErrorKind::NotFound, false), - PermissionDenied => (ErrorKind::PermissionDenied, false), - Interrupted | UnexpectedEof | TimedOut | WouldBlock => (ErrorKind::Unexpected, true), - _ => (ErrorKind::Unexpected, true), - }; - - let mut err = Error::new(kind, &err.kind().to_string()).set_source(err); - - if retryable { - err = err.set_temporary(); - } - - err -} diff --git a/core/src/services/sftp/pager.rs b/core/src/services/sftp/pager.rs index 6e778edd7530..d40ddeb8534f 100644 --- a/core/src/services/sftp/pager.rs +++ b/core/src/services/sftp/pager.rs @@ -13,4 +13,47 @@ // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations -// under the License. \ No newline at end of file +// under the License. + +use async_trait::async_trait; +use openssh_sftp_client::fs::{DirEntry, ReadDir}; + +use crate::raw::oio; +use crate::Result; + +pub struct SftpPager { + dir: ReadDir, + limit: Option, +} + +impl SftpPager { + pub fn new(dir: ReadDir, limit: Option) -> Self { + Self { dir, limit } + } +} + +#[async_trait] +impl oio::Page for SftpPager { + async fn next(&mut self) -> Result>> { + let v: Vec = if let Some(limit) = self.limit { + self.dir.clone().into_iter().map(|e| e.into()).take(limit).collect() + } else { + self.dir.clone().into_iter().map(|e| e.into()).collect() + }; + + if v.len() == 0 { + Ok(None) + } else { + Ok(Some(v)) + } + } +} + +impl From for oio::Entry { + fn from(value: DirEntry) -> Self { + oio::Entry::new( + value.filename().as_os_str().to_str().unwrap(), + value.metadata().into(), + ) + } +} diff --git a/core/src/services/sftp/utils.rs b/core/src/services/sftp/utils.rs index 53b10c354f35..16a45b278de2 100644 --- a/core/src/services/sftp/utils.rs +++ b/core/src/services/sftp/utils.rs @@ -15,95 +15,61 @@ // specific language governing permissions and limitations // under the License. -use std::io; use std::io::SeekFrom; use std::path::PathBuf; use std::pin::Pin; -use std::sync::Arc; use std::task::Context; use std::task::Poll; -use bb8::Pool; +use async_compat::Compat; use bb8::PooledConnection; use futures::executor::block_on; -use futures::ready; -use openssh_sftp_client::file::File; use openssh_sftp_client::file::TokioCompatFile; use openssh_sftp_client::metadata::MetaData as SftpMeta; -use openssh_sftp_client::Sftp; -use tokio::io::AsyncBufRead; -use tokio::io::AsyncRead; -use tokio::io::AsyncSeek; +use owning_ref::OwningHandle; use super::backend::Manager; -use super::backend::SftpBackend; -use super::error::parse_io_error; use crate::raw::oio; use crate::EntryMode; -use crate::Error; -use crate::ErrorKind; use crate::Metadata; use crate::Result; pub struct SftpReader { - backend: SftpBackend, - path: PathBuf, - offset: u64, - size: u64, + // todo: add safety explanation + file: OwningHandle< + Box>, + Box>>>, + >, } -unsafe impl Sync for SftpReader {} - impl SftpReader { - pub fn new(backend: SftpBackend, path: &str, offset: u64, size: u64) -> Self { - SftpReader { - backend, - path: path.into(), - offset, - size, - } + pub fn new( + conn: PooledConnection<'static, Manager>, + path: PathBuf, + start: u64, + end: u64, + ) -> Self { + let file = OwningHandle::new_with_fn(Box::new(conn), |conn| unsafe { + let file = block_on((*conn).sftp.open(path)).unwrap(); + let f = Compat::new(TokioCompatFile::from(file)); + Box::new(oio::into_reader::from_fd(f, start, end)) + }); + + SftpReader { file } } } impl oio::Read for SftpReader { fn poll_read(&mut self, cx: &mut Context, buf: &mut [u8]) -> Poll> { - let client = block_on(self.backend.sftp_connect())?; - let mut file = TokioCompatFile::from(block_on(client.open(self.path.clone()))?); - - if self.offset > 0 { - Pin::new(&mut file) - .start_seek(SeekFrom::Start(self.offset)) - .map_err(parse_io_error)?; - ready!(Pin::new(&mut file).poll_complete(cx)).map_err(parse_io_error)?; - } - - let mut buf = tokio::io::ReadBuf::new(buf); - ready!(Pin::new(&mut file).poll_read(cx, &mut buf)).map_err(parse_io_error)?; - Poll::Ready(Ok(buf.filled().len())) + Pin::new(&mut *self.file).poll_read(cx, buf) } fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { - let (_, _) = (cx, pos); - - Poll::Ready(Err(Error::new( - ErrorKind::Unsupported, - "output reader doesn't support seeking", - ))) + Pin::new(&mut *self.file).poll_seek(cx, pos) } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - // let mut file = TokioCompatFile::from(self.backend.open(self.path).map_err(parse_io_error)?); - - // let mut buf = bytes::BytesMut::with_capacity(1024 * 1024); - // let n = futures::ready!(Pin::new(&mut file).poll_read(cx, &mut buf))?; - - // if n == 0 { - // Poll::Ready(None) - // } else { - // buf.truncate(n); - // Poll::Ready(Some(Ok(buf.freeze()))) - // } - todo!() + Pin::new(&mut *self.file).poll_next(cx) } } @@ -122,11 +88,11 @@ impl From for Metadata { }) .unwrap_or(EntryMode::Unknown); - let metadata = Metadata::new(mode); + let mut metadata = Metadata::new(mode); - // if let Some(modified) = meta.modified() { - // metadata.set_last_modified(modified.into()); - // } + if let Some(modified) = meta.modified() { + metadata.set_last_modified(modified.as_system_time().into()); + } metadata } diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 200c4b3c24a8..4ef41a6e6765 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -15,32 +15,32 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; - use async_trait::async_trait; use bb8::PooledConnection; use bytes::Bytes; -use openssh_sftp_client::Sftp; +use super::backend::Manager; use crate::raw::oio; use crate::Result; -use super::backend::Manager; pub struct SftpWriter { - backend: PooledConnection<'static, Manager>, + conn: PooledConnection<'static, Manager>, path: String, + root: String, } impl<'a> SftpWriter { - pub fn new(backend: PooledConnection<'static, Manager>, path: String) -> Self { - SftpWriter { backend, path } + pub fn new(conn: PooledConnection<'static, Manager>, path: String, root: String) -> Self { + SftpWriter { conn, path, root } } } #[async_trait] impl oio::Write for SftpWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - self.backend.fs().write(&self.path, bs.as_ref()).await?; + let mut fs = self.conn.sftp.fs(); + fs.set_cwd(self.root.clone()); + fs.write(&self.path, bs.as_ref()).await?; Ok(()) } From c072a8b3cbb0e06b9466cd3cde6bfc9214dc694f Mon Sep 17 00:00:00 2001 From: salvor Date: Mon, 1 May 2023 16:12:55 +0800 Subject: [PATCH 3/7] fix some buf && pass alomost all tests --- core/src/services/sftp/backend.rs | 139 ++++++++++++++++++++++++------ core/src/services/sftp/error.rs | 27 +++++- core/src/services/sftp/pager.rs | 70 ++++++++++++--- core/src/services/sftp/utils.rs | 18 ++-- core/src/services/sftp/writer.rs | 18 ++-- 5 files changed, 218 insertions(+), 54 deletions(-) diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index 1be2c8ab9d63..fe71d5bf8ad7 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -32,6 +32,8 @@ use openssh_sftp_client::Sftp; use owning_ref::OwningHandle; use tokio::sync::OnceCell; +use super::error::is_not_found; +use super::error::is_sftp_protocol_error; use super::error::SftpError; use super::pager::SftpPager; use super::utils::SftpReader; @@ -56,7 +58,7 @@ use crate::*; /// # Configuration /// /// - `endpoint`: Set the endpoint for connection -/// - `root`: Set the work directory for backend +/// - `root`: Set the work directory for backend, default to `/home/$USER/` /// - `user`: Set the login user /// - `key`: Set the public key for login /// @@ -157,17 +159,21 @@ impl Builder for SftpBuilder { fn build(&mut self) -> Result { debug!("sftp backend build started: {:?}", &self); let endpoint = match self.endpoint.clone() { - None => return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")), Some(v) => v, + None => return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")), }; - let root = normalize_root(&self.root.clone().unwrap_or_default()); - - let user = match &self.user { - None => "".to_string(), - Some(v) => v.clone(), + let user = match self.user.clone() { + Some(v) => v, + None => return Err(Error::new(ErrorKind::ConfigInvalid, "user is empty")), }; + let root = self + .root + .clone() + .map(|r| normalize_root(r.as_str())) + .unwrap_or(format!("/home/{}/", user)); + debug!("sftp backend finished: {:?}", &self); Ok(SftpBackend { @@ -285,15 +291,15 @@ impl Accessor for SftpBackend { fn info(&self) -> AccessorInfo { let mut am = AccessorInfo::default(); - am.set_scheme(Scheme::Sftp) - .set_root(&self.root) + am.set_root(self.root.as_str()) + .set_scheme(Scheme::Sftp) .set_capability(Capability { stat: true, read: true, write: true, list: true, list_with_limit: true, - + ..Default::default() }); @@ -302,11 +308,26 @@ impl Accessor for SftpBackend { async fn create_dir(&self, path: &str, _: OpCreate) -> Result { let client = self.sftp_connect().await?; - let mut fs = client.sftp.fs(); fs.set_cwd(self.root.clone()); - fs.create_dir(path).await?; + let paths: Vec<&str> = path.split_inclusive('/').collect(); + let mut current = self.root.clone(); + for p in paths { + if p.is_empty() { + continue; + } + + current.push_str(p); + let res = fs.create_dir(p).await; + + if let Err(e) = res { + if !is_sftp_protocol_error(&e) { + return Err(e.into()); + } + } + fs.set_cwd(current.clone()); + } return Ok(RpCreate::default()); } @@ -344,25 +365,33 @@ impl Accessor for SftpBackend { (None, None) => (0, total_length), }; - let r = SftpReader::new(self.sftp_connect().await?, path, start, end); + let r = SftpReader::new(self.sftp_connect_owned().await?, path, start, end).await?; Ok((RpRead::new(end - start), r)) } - async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + if args.content_length().is_none() { + return Err(Error::new( + ErrorKind::Unsupported, + "write without content length is not supported", + )); + } + + if let Some((dir, _)) = path.rsplit_once("/") { + self.create_dir(dir, OpCreate::default()).await?; + } + + let path = format!("{}{}", self.root, path); + Ok(( RpWrite::new(), - SftpWriter::new( - self.sftp_connect().await?, - path.to_owned(), - self.root.clone(), - ), + SftpWriter::new(self.sftp_connect_owned().await?, path), )) } async fn stat(&self, path: &str, _: OpStat) -> Result { let client = self.sftp_connect().await?; - let mut fs = client.sftp.fs(); fs.set_cwd(self.root.clone()); @@ -378,9 +407,44 @@ impl Accessor for SftpBackend { fs.set_cwd(self.root.clone()); if path.ends_with('/') { - fs.remove_dir(path).await? + let file_path = format!("./{}", path); + let dir = match fs.open_dir(file_path.clone()).await { + Ok(dir) => dir, + Err(e) => { + if is_not_found(&e) { + return Ok(RpDelete::default()); + } else { + return Err(e.into()); + } + } + } + .read_dir() + .await?; + + for file in &dir { + let file_name = file.filename().to_str().unwrap(); + if file_name == "." || file_name == ".." { + continue; + } + let file_path = format!("{}{}", path, file_name); + self.delete(file_path.as_str(), OpDelete::default()).await?; + } + + if let Err(e) = fs.remove_dir(path).await { + if is_not_found(&e) { + return Ok(RpDelete::default()); + } else { + return Err(e.into()); + } + } } else { - fs.remove_file(path).await? + if let Err(e) = fs.remove_file(path).await { + if is_not_found(&e) { + return Ok(RpDelete::default()); + } else { + return Err(e.into()); + } + } }; Ok(RpDelete::default()) @@ -388,21 +452,32 @@ impl Accessor for SftpBackend { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { let client = self.sftp_connect().await?; - let mut fs = client.sftp.fs(); fs.set_cwd(self.root.clone()); - let dir = fs.open_dir(path).await?.read_dir().await?; + let file_path = format!("./{}", path); + + let mut dir = match fs.open_dir(file_path.clone()).await { + Ok(dir) => dir, + Err(e) => { + if is_not_found(&e) { + return Ok((RpList::default(), SftpPager::empty())); + } else { + return Err(e.into()); + } + } + }; + let dir = dir.read_dir().await?; Ok(( RpList::default(), - SftpPager::new(dir, args.limit()), + SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()), )) } } impl SftpBackend { - pub async fn sftp_connect(&self) -> Result> { + async fn pool(&self) -> Result<&bb8::Pool> { let pool = self .sftp .get_or_try_init(|| async { @@ -416,7 +491,17 @@ impl SftpBackend { }) .await?; - let conn = pool.get_owned().await?; + Ok(pool) + } + + pub async fn sftp_connect(&self) -> Result> { + let conn = self.pool().await?.get().await?; + + Ok(conn) + } + + pub async fn sftp_connect_owned(&self) -> Result> { + let conn = self.pool().await?.get_owned().await?; Ok(conn) } diff --git a/core/src/services/sftp/error.rs b/core/src/services/sftp/error.rs index f625332fb93c..78fc112941a7 100644 --- a/core/src/services/sftp/error.rs +++ b/core/src/services/sftp/error.rs @@ -17,7 +17,7 @@ use bb8::RunError; use openssh::Error as SshError; -use openssh_sftp_client::Error as SftpClientError; +use openssh_sftp_client::{error::SftpErrorKind, Error as SftpClientError}; use crate::{Error, ErrorKind}; @@ -29,8 +29,14 @@ pub enum SftpError { impl From for Error { fn from(e: SftpClientError) -> Self { - let kind = match e { + let kind = match &e { SftpClientError::UnsupportedSftpProtocol { version: _ } => ErrorKind::Unsupported, + SftpClientError::SftpError(kind, _msg) => match kind { + SftpErrorKind::NoSuchFile => ErrorKind::NotFound, + SftpErrorKind::PermDenied => ErrorKind::PermissionDenied, + SftpErrorKind::OpUnsupported => ErrorKind::Unsupported, + _ => ErrorKind::Unexpected, + }, _ => ErrorKind::Unexpected, }; @@ -81,3 +87,20 @@ impl From> for Error { } } } + +pub(super) fn is_not_found(e: &SftpClientError) -> bool { + match e { + SftpClientError::SftpError(kind, _msg) => match kind { + SftpErrorKind::NoSuchFile => true, + _ => false, + }, + _ => false, + } +} + +pub(super) fn is_sftp_protocol_error(e: &SftpClientError) -> bool { + match e { + SftpClientError::SftpError(_kind, _msg) => true, + _ => false, + } +} diff --git a/core/src/services/sftp/pager.rs b/core/src/services/sftp/pager.rs index d40ddeb8534f..de27ee24c913 100644 --- a/core/src/services/sftp/pager.rs +++ b/core/src/services/sftp/pager.rs @@ -16,31 +16,65 @@ // under the License. use async_trait::async_trait; -use openssh_sftp_client::fs::{DirEntry, ReadDir}; +use openssh_sftp_client::fs::DirEntry; use crate::raw::oio; use crate::Result; pub struct SftpPager { - dir: ReadDir, + dir: Box<[DirEntry]>, + path: String, limit: Option, + complete: bool, } impl SftpPager { - pub fn new(dir: ReadDir, limit: Option) -> Self { - Self { dir, limit } + pub fn new(dir: Box<[DirEntry]>, path: String, limit: Option) -> Self { + Self { + dir, + path, + limit, + complete: false, + } + } + + pub fn empty() -> Self { + Self { + dir: Box::new([]), + path: String::new(), + limit: None, + complete: true, + } } } #[async_trait] impl oio::Page for SftpPager { async fn next(&mut self) -> Result>> { + if self.complete { + return Ok(None); + } + + if self.path == "/" { + self.path = "".to_owned(); + } + + let iter = self + .dir + .iter() + .filter(|e| { + e.filename().to_str().unwrap() != "." && e.filename().to_str().unwrap() != ".." + }) + .map(|e| map_entry(self.path.clone(), e.clone())); + let v: Vec = if let Some(limit) = self.limit { - self.dir.clone().into_iter().map(|e| e.into()).take(limit).collect() + iter.take(limit).collect() } else { - self.dir.clone().into_iter().map(|e| e.into()).collect() + iter.collect() }; + self.complete = true; + if v.len() == 0 { Ok(None) } else { @@ -49,11 +83,23 @@ impl oio::Page for SftpPager { } } -impl From for oio::Entry { - fn from(value: DirEntry) -> Self { - oio::Entry::new( - value.filename().as_os_str().to_str().unwrap(), - value.metadata().into(), - ) +fn map_entry(prefix: String, value: DirEntry) -> oio::Entry { + if value.filename().to_str().unwrap() == "file-71" { + println!("map_entry: {:?}", value); } + + oio::Entry::new( + format!( + "{}{}{}", + prefix, + value.filename().to_str().unwrap(), + if value.file_type().unwrap().is_dir() { + "/" + } else { + "" + } + ) + .as_str(), + value.metadata().into(), + ) } diff --git a/core/src/services/sftp/utils.rs b/core/src/services/sftp/utils.rs index 16a45b278de2..f90c32bff646 100644 --- a/core/src/services/sftp/utils.rs +++ b/core/src/services/sftp/utils.rs @@ -30,6 +30,8 @@ use owning_ref::OwningHandle; use super::backend::Manager; use crate::raw::oio; +use crate::raw::oio::into_reader::FdReader; +use crate::raw::oio::ReadExt; use crate::EntryMode; use crate::Metadata; use crate::Result; @@ -38,24 +40,26 @@ pub struct SftpReader { // todo: add safety explanation file: OwningHandle< Box>, - Box>>>, + Box>>>, >, } impl SftpReader { - pub fn new( + pub async fn new( conn: PooledConnection<'static, Manager>, path: PathBuf, start: u64, end: u64, - ) -> Self { - let file = OwningHandle::new_with_fn(Box::new(conn), |conn| unsafe { + ) -> Result { + let mut file = OwningHandle::new_with_fn(Box::new(conn), |conn| unsafe { let file = block_on((*conn).sftp.open(path)).unwrap(); let f = Compat::new(TokioCompatFile::from(file)); Box::new(oio::into_reader::from_fd(f, start, end)) }); - SftpReader { file } + file.seek(SeekFrom::Start(0)).await?; + + Ok(SftpReader { file }) } } @@ -90,6 +94,10 @@ impl From for Metadata { let mut metadata = Metadata::new(mode); + if let Some(size) = meta.len() { + metadata.set_content_length(size); + } + if let Some(modified) = meta.modified() { metadata.set_last_modified(modified.as_system_time().into()); } diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 4ef41a6e6765..c394e51fcd61 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -21,32 +21,34 @@ use bytes::Bytes; use super::backend::Manager; use crate::raw::oio; -use crate::Result; +use crate::{Error, ErrorKind, Result}; pub struct SftpWriter { conn: PooledConnection<'static, Manager>, path: String, - root: String, } impl<'a> SftpWriter { - pub fn new(conn: PooledConnection<'static, Manager>, path: String, root: String) -> Self { - SftpWriter { conn, path, root } + pub fn new(conn: PooledConnection<'static, Manager>, path: String) -> Self { + SftpWriter { conn, path } } } #[async_trait] impl oio::Write for SftpWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - let mut fs = self.conn.sftp.fs(); - fs.set_cwd(self.root.clone()); - fs.write(&self.path, bs.as_ref()).await?; + let mut file = self.conn.sftp.create(&self.path).await?; + + file.write_all(&bs).await?; Ok(()) } async fn abort(&mut self) -> Result<()> { - Ok(()) + Err(Error::new( + ErrorKind::Unsupported, + "SFTP does not support aborting writes", + )) } async fn close(&mut self) -> Result<()> { From c06f2a3fc11ac6423b36f8f7b5b99e3fd2a8fb38 Mon Sep 17 00:00:00 2001 From: salvor Date: Mon, 1 May 2023 16:22:21 +0800 Subject: [PATCH 4/7] typo --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ffaeeb7e404..ae8f0cc82e76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2430,7 +2430,7 @@ name = "oay" version = "0.33.2" dependencies = [ "anyhow", - "clap 4.1.11", + "clap 4.2.5", "dirs 5.0.0", "env_logger", "futures", @@ -2480,7 +2480,7 @@ version = "0.33.2" dependencies = [ "anyhow", "assert_cmd", - "clap 4.1.11", + "clap 4.2.5", "dirs 5.0.0", "env_logger", "futures", From 71b5ec7e8571b36d9c9362d95797bb0896290669 Mon Sep 17 00:00:00 2001 From: salvor Date: Mon, 1 May 2023 16:56:09 +0800 Subject: [PATCH 5/7] add some comments --- core/src/services/sftp/backend.rs | 5 +++-- core/src/services/sftp/error.rs | 2 +- core/src/services/sftp/pager.rs | 30 +++++++++++++----------------- core/src/services/sftp/utils.rs | 3 ++- 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index fe71d5bf8ad7..229781d48e09 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -206,8 +206,8 @@ pub struct Manager { pub struct Connection { // the remote child owns the ref to session, so we need to use owning handle - // safety explanation will be talked about deeply in the future - // todo: add safety explanation + // The session will only create one child, so we can make sure the child can live + // as long as the session. (the session will be dropped when the connection is dropped) // Related: https://stackoverflow.com/a/47260399 child: OwningHandle, Box>>, pub sftp: Sftp, @@ -322,6 +322,7 @@ impl Accessor for SftpBackend { let res = fs.create_dir(p).await; if let Err(e) = res { + // ignore error if dir already exists if !is_sftp_protocol_error(&e) { return Err(e.into()); } diff --git a/core/src/services/sftp/error.rs b/core/src/services/sftp/error.rs index 78fc112941a7..9e1155ea9913 100644 --- a/core/src/services/sftp/error.rs +++ b/core/src/services/sftp/error.rs @@ -48,7 +48,7 @@ impl From for Error { impl From for Error { fn from(e: SshError) -> Self { - let kind = ErrorKind::Unexpected; // todo + let kind = ErrorKind::Unexpected; let err = Error::new(kind, "ssh error").set_source(e); diff --git a/core/src/services/sftp/pager.rs b/core/src/services/sftp/pager.rs index de27ee24c913..2f3fd57ec928 100644 --- a/core/src/services/sftp/pager.rs +++ b/core/src/services/sftp/pager.rs @@ -55,6 +55,7 @@ impl oio::Page for SftpPager { return Ok(None); } + // when listing the root directory, the prefix should be empty if self.path == "/" { self.path = "".to_owned(); } @@ -63,6 +64,7 @@ impl oio::Page for SftpPager { .dir .iter() .filter(|e| { + // filter out "." and ".." e.filename().to_str().unwrap() != "." && e.filename().to_str().unwrap() != ".." }) .map(|e| map_entry(self.path.clone(), e.clone())); @@ -84,22 +86,16 @@ impl oio::Page for SftpPager { } fn map_entry(prefix: String, value: DirEntry) -> oio::Entry { - if value.filename().to_str().unwrap() == "file-71" { - println!("map_entry: {:?}", value); - } + let path = format!( + "{}{}{}", + prefix, + value.filename().to_str().unwrap(), + if value.file_type().unwrap().is_dir() { + "/" + } else { + "" + } + ); - oio::Entry::new( - format!( - "{}{}{}", - prefix, - value.filename().to_str().unwrap(), - if value.file_type().unwrap().is_dir() { - "/" - } else { - "" - } - ) - .as_str(), - value.metadata().into(), - ) + oio::Entry::new(path.as_str(), value.metadata().into()) } diff --git a/core/src/services/sftp/utils.rs b/core/src/services/sftp/utils.rs index f90c32bff646..91387aad03b0 100644 --- a/core/src/services/sftp/utils.rs +++ b/core/src/services/sftp/utils.rs @@ -37,7 +37,8 @@ use crate::Metadata; use crate::Result; pub struct SftpReader { - // todo: add safety explanation + // similar situation to connection struct + // We can make sure the file can live as long as the connection. file: OwningHandle< Box>, Box>>>, From 77f0ec5602348b978590c07d3aca7f9773490eb7 Mon Sep 17 00:00:00 2001 From: salvor Date: Mon, 1 May 2023 17:02:54 +0800 Subject: [PATCH 6/7] fix format --- core/src/services/sftp/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/services/sftp/mod.rs b/core/src/services/sftp/mod.rs index 37b6ad057c76..d854429bc7a0 100644 --- a/core/src/services/sftp/mod.rs +++ b/core/src/services/sftp/mod.rs @@ -18,7 +18,7 @@ pub use backend::SftpBuilder as Sftp; mod backend; -mod utils; mod error; mod pager; +mod utils; mod writer; From 1d185a7e419c313dddef712ca540175dfb71ddc5 Mon Sep 17 00:00:00 2001 From: salvor Date: Mon, 1 May 2023 17:43:22 +0800 Subject: [PATCH 7/7] fix clippy warning --- core/src/services/sftp/backend.rs | 16 +++++++--------- core/src/services/sftp/error.rs | 23 ++++------------------- core/src/services/sftp/pager.rs | 2 +- core/src/services/sftp/writer.rs | 2 +- 4 files changed, 13 insertions(+), 30 deletions(-) diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index 229781d48e09..4ae80d7f5b9b 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -379,7 +379,7 @@ impl Accessor for SftpBackend { )); } - if let Some((dir, _)) = path.rsplit_once("/") { + if let Some((dir, _)) = path.rsplit_once('/') { self.create_dir(dir, OpCreate::default()).await?; } @@ -431,20 +431,18 @@ impl Accessor for SftpBackend { self.delete(file_path.as_str(), OpDelete::default()).await?; } - if let Err(e) = fs.remove_dir(path).await { - if is_not_found(&e) { - return Ok(RpDelete::default()); - } else { + match fs.remove_dir(path).await { + Err(e) if !is_not_found(&e) => { return Err(e.into()); } + _ => {} } } else { - if let Err(e) = fs.remove_file(path).await { - if is_not_found(&e) { - return Ok(RpDelete::default()); - } else { + match fs.remove_file(path).await { + Err(e) if !is_not_found(&e) => { return Err(e.into()); } + _ => {} } }; diff --git a/core/src/services/sftp/error.rs b/core/src/services/sftp/error.rs index 9e1155ea9913..0e7dc2ef4573 100644 --- a/core/src/services/sftp/error.rs +++ b/core/src/services/sftp/error.rs @@ -40,19 +40,13 @@ impl From for Error { _ => ErrorKind::Unexpected, }; - let err = Error::new(kind, "sftp error").set_source(e); - - err + Error::new(kind, "sftp error").set_source(e) } } impl From for Error { fn from(e: SshError) -> Self { - let kind = ErrorKind::Unexpected; - - let err = Error::new(kind, "ssh error").set_source(e); - - err + Error::new(ErrorKind::Unexpected, "ssh error").set_source(e) } } @@ -89,18 +83,9 @@ impl From> for Error { } pub(super) fn is_not_found(e: &SftpClientError) -> bool { - match e { - SftpClientError::SftpError(kind, _msg) => match kind { - SftpErrorKind::NoSuchFile => true, - _ => false, - }, - _ => false, - } + matches!(e, SftpClientError::SftpError(SftpErrorKind::NoSuchFile, _)) } pub(super) fn is_sftp_protocol_error(e: &SftpClientError) -> bool { - match e { - SftpClientError::SftpError(_kind, _msg) => true, - _ => false, - } + matches!(e, SftpClientError::SftpError(_, _)) } diff --git a/core/src/services/sftp/pager.rs b/core/src/services/sftp/pager.rs index 2f3fd57ec928..e6b557c250f6 100644 --- a/core/src/services/sftp/pager.rs +++ b/core/src/services/sftp/pager.rs @@ -77,7 +77,7 @@ impl oio::Page for SftpPager { self.complete = true; - if v.len() == 0 { + if v.is_empty() { Ok(None) } else { Ok(Some(v)) diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index c394e51fcd61..2fa5215d1307 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -28,7 +28,7 @@ pub struct SftpWriter { path: String, } -impl<'a> SftpWriter { +impl SftpWriter { pub fn new(conn: PooledConnection<'static, Manager>, path: String) -> Self { SftpWriter { conn, path } }