From 0d7a0f5aefb1fca9f77868131af0a307ce3e511e Mon Sep 17 00:00:00 2001 From: Young-Flash <871946895@qq.com> Date: Sun, 15 Oct 2023 15:52:31 +0800 Subject: [PATCH] refactor(oay): import dav-server-opendalfs --- Cargo.lock | 1 + bin/oay/Cargo.toml | 3 +- bin/oay/src/services/webdav/mod.rs | 6 +- bin/oay/src/services/webdav/service.rs | 8 +- .../src/services/webdav/webdav_dir_entry.rs | 54 ----- bin/oay/src/services/webdav/webdav_file.rs | 94 -------- .../src/services/webdav/webdav_metadata.rs | 62 ----- bin/oay/src/services/webdav/webdavfs.rs | 220 ------------------ 8 files changed, 8 insertions(+), 440 deletions(-) delete mode 100644 bin/oay/src/services/webdav/webdav_dir_entry.rs delete mode 100644 bin/oay/src/services/webdav/webdav_file.rs delete mode 100644 bin/oay/src/services/webdav/webdav_metadata.rs delete mode 100644 bin/oay/src/services/webdav/webdavfs.rs diff --git a/Cargo.lock b/Cargo.lock index 601e740cdb81..8c328a97a3e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3829,6 +3829,7 @@ dependencies = [ "chrono", "clap 4.3.10", "dav-server", + "dav-server-opendalfs", "dirs 5.0.1", "futures", "futures-util", diff --git a/bin/oay/Cargo.toml b/bin/oay/Cargo.toml index 4e3f57d83565..d8df51753920 100644 --- a/bin/oay/Cargo.toml +++ b/bin/oay/Cargo.toml @@ -33,7 +33,7 @@ version.workspace = true default = ["frontends-webdav", "frontends-s3"] frontends-s3 = [] -frontends-webdav = ["dep:dav-server", "dep:bytes", "dep:futures-util"] +frontends-webdav = ["dep:dav-server", "dep:dav-server-opendalfs", "dep:bytes", "dep:futures-util"] [dependencies] anyhow = "1" @@ -42,6 +42,7 @@ bytes = { version = "1.4.0", optional = true } chrono = "0.4.28" clap = { version = "4", features = ["cargo", "string"] } dav-server = { version = "0.5.5", optional = true } +dav-server-opendalfs = { path = "../../integrations/dav-server", optional = true } dirs = "5.0.0" futures = "0.3" futures-util = { version = "0.3.16", optional = true } diff --git a/bin/oay/src/services/webdav/mod.rs b/bin/oay/src/services/webdav/mod.rs index e432889a8cb5..bcf580055efe 100644 --- a/bin/oay/src/services/webdav/mod.rs +++ b/bin/oay/src/services/webdav/mod.rs @@ -16,8 +16,4 @@ // under the License. mod service; -mod webdav_dir_entry; -mod webdav_file; -mod webdav_metadata; -pub mod webdavfs; -pub use service::*; +pub use service::WebdavService; diff --git a/bin/oay/src/services/webdav/service.rs b/bin/oay/src/services/webdav/service.rs index 6a1bcf55269b..71bc31499e14 100644 --- a/bin/oay/src/services/webdav/service.rs +++ b/bin/oay/src/services/webdav/service.rs @@ -25,19 +25,19 @@ use axum::Router; use dav_server::DavHandler; use opendal::Operator; -use super::webdavfs::WebdavFs; use crate::Config; +use dav_server_opendalfs::OpendalFs; pub struct WebdavService { cfg: Arc, - webdavfs: Box, + opendalfs: Box, } impl WebdavService { pub fn new(cfg: Arc, op: Operator) -> Self { Self { cfg, - webdavfs: WebdavFs::new(op), + opendalfs: OpendalFs::new(op), } } @@ -45,7 +45,7 @@ impl WebdavService { let webdav_cfg = &self.cfg.frontends.webdav; let webdav_handler = DavHandler::builder() - .filesystem(self.webdavfs.clone()) + .filesystem(self.opendalfs.clone()) .build_handler(); let webdav_service = tower::service_fn(move |req: Request| { diff --git a/bin/oay/src/services/webdav/webdav_dir_entry.rs b/bin/oay/src/services/webdav/webdav_dir_entry.rs deleted file mode 100644 index c3836db3ef48..000000000000 --- a/bin/oay/src/services/webdav/webdav_dir_entry.rs +++ /dev/null @@ -1,54 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use dav_server::fs::DavDirEntry; -use futures::FutureExt; -use opendal::Entry; -use opendal::Operator; - -use super::webdav_file::convert_error; -use super::webdav_metadata::WebdavMetaData; - -pub struct WebDAVDirEntry { - dir_entry: Entry, - op: Operator, -} - -impl DavDirEntry for WebDAVDirEntry { - fn name(&self) -> Vec { - self.dir_entry.name().as_bytes().to_vec() - } - - fn metadata(&self) -> dav_server::fs::FsFuture> { - async move { - self.op - .stat(self.dir_entry.path()) - .await - .map(|metadata| { - Box::new(WebdavMetaData::new(metadata)) as Box - }) - .map_err(convert_error) - } - .boxed() - } -} - -impl WebDAVDirEntry { - pub fn new(dir_entry: Entry, op: Operator) -> Self { - WebDAVDirEntry { dir_entry, op } - } -} diff --git a/bin/oay/src/services/webdav/webdav_file.rs b/bin/oay/src/services/webdav/webdav_file.rs deleted file mode 100644 index 53e99c07fceb..000000000000 --- a/bin/oay/src/services/webdav/webdav_file.rs +++ /dev/null @@ -1,94 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::io::SeekFrom; - -use bytes::Bytes; -use dav_server::davpath::DavPath; -use dav_server::fs::DavFile; -use dav_server::fs::DavMetaData; -use dav_server::fs::FsFuture; -use dav_server::fs::OpenOptions; -use futures::FutureExt; -use opendal::Operator; - -use super::webdav_metadata::WebdavMetaData; - -#[derive(Debug)] -pub struct WebdavFile { - pub op: Operator, - pub path: DavPath, - pub options: OpenOptions, -} - -impl DavFile for WebdavFile { - fn read_bytes(&mut self, count: usize) -> FsFuture { - async move { - let file_path = self.path.as_url_string(); - self.op - .read_with(&file_path) - .range(0..count as u64) - .await - .map(Bytes::from) - .map_err(convert_error) - } - .boxed() - } - - fn metadata(&mut self) -> FsFuture> { - async move { - self.op - .stat(self.path.as_url_string().as_str()) - .await - .map(|opendal_metadata| { - Box::new(WebdavMetaData::new(opendal_metadata)) as Box - }) - .map_err(convert_error) - } - .boxed() - } - - fn write_buf(&mut self, buf: Box) -> FsFuture<()> { - self.write_bytes(Bytes::copy_from_slice(buf.chunk())) - } - - fn write_bytes(&mut self, buf: Bytes) -> FsFuture<()> { - async move { - let file_path = self.path.as_url_string(); - self.op.write(&file_path, buf).await.map_err(convert_error) - } - .boxed() - } - - fn seek(&mut self, _pos: SeekFrom) -> FsFuture { - futures_util::future::err(dav_server::fs::FsError::NotImplemented).boxed() - } - - fn flush(&mut self) -> FsFuture<()> { - futures_util::future::ok(()).boxed() - } -} - -pub fn convert_error(opendal_error: opendal::Error) -> dav_server::fs::FsError { - match opendal_error.kind() { - opendal::ErrorKind::AlreadyExists | opendal::ErrorKind::IsSameFile => { - dav_server::fs::FsError::Exists - } - opendal::ErrorKind::NotFound => dav_server::fs::FsError::NotFound, - _ => dav_server::fs::FsError::GeneralFailure, - } -} diff --git a/bin/oay/src/services/webdav/webdav_metadata.rs b/bin/oay/src/services/webdav/webdav_metadata.rs deleted file mode 100644 index 03295125d5ee..000000000000 --- a/bin/oay/src/services/webdav/webdav_metadata.rs +++ /dev/null @@ -1,62 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use dav_server::fs::DavMetaData; -use dav_server::fs::FsError; -use opendal::Metadata; - -#[derive(Debug, Clone)] -pub struct WebdavMetaData { - metadata: Metadata, -} - -impl WebdavMetaData { - pub fn new(metadata: Metadata) -> Self { - WebdavMetaData { metadata } - } -} - -impl DavMetaData for WebdavMetaData { - fn len(&self) -> u64 { - self.metadata.content_length() - } - - fn modified(&self) -> dav_server::fs::FsResult { - match self.metadata.last_modified() { - Some(t) => Ok(t.into()), - None => Err(FsError::GeneralFailure), - } - } - - fn is_dir(&self) -> bool { - self.metadata.is_dir() - } - - fn is_file(&self) -> bool { - self.metadata.is_file() - } - - fn etag(&self) -> Option { - self.metadata.etag().map(|s| s.to_string()) - } - - fn status_changed(&self) -> dav_server::fs::FsResult { - self.metadata - .last_modified() - .map_or(Err(FsError::GeneralFailure), |t| Ok(t.into())) - } -} diff --git a/bin/oay/src/services/webdav/webdavfs.rs b/bin/oay/src/services/webdav/webdavfs.rs deleted file mode 100644 index b4167dbaac81..000000000000 --- a/bin/oay/src/services/webdav/webdavfs.rs +++ /dev/null @@ -1,220 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::path::Path; -use std::pin::Pin; -use std::task::Poll::Pending; -use std::task::Poll::Ready; - -use dav_server::davpath::DavPath; -use dav_server::fs::DavDirEntry; -use dav_server::fs::DavFile; -use dav_server::fs::DavFileSystem; -use dav_server::fs::DavMetaData; -use dav_server::fs::FsError; -use futures::FutureExt; -use futures_util::Stream; -use futures_util::StreamExt; -use opendal::Lister; -use opendal::Operator; - -use super::webdav_file::convert_error; -use super::webdav_file::WebdavFile; -use super::webdav_metadata::WebdavMetaData; -use crate::services::webdav::webdav_dir_entry::WebDAVDirEntry; - -#[derive(Clone)] -pub struct WebdavFs { - pub op: Operator, -} - -impl DavFileSystem for WebdavFs { - fn open<'a>( - &'a self, - path: &'a dav_server::davpath::DavPath, - options: dav_server::fs::OpenOptions, - ) -> dav_server::fs::FsFuture> { - async move { - let file = WebdavFile { - op: self.op.clone(), - path: path.clone(), - options, - }; - Ok(Box::new(file) as Box) - } - .boxed() - } - - fn read_dir<'a>( - &'a self, - path: &'a dav_server::davpath::DavPath, - _meta: dav_server::fs::ReadDirMeta, - ) -> dav_server::fs::FsFuture>> - { - async move { - self.op - .lister(path.as_url_string().as_str()) - .await - .map(|lister| DavStream::new(self.op.clone(), lister).boxed()) - .map_err(convert_error) - } - .boxed() - } - - fn metadata<'a>( - &'a self, - path: &'a dav_server::davpath::DavPath, - ) -> dav_server::fs::FsFuture> { - async move { - let opendal_metadata = self.op.stat(path.as_url_string().as_str()).await; - match opendal_metadata { - Ok(metadata) => { - let webdav_metadata = WebdavMetaData::new(metadata); - Ok(Box::new(webdav_metadata) as Box) - } - Err(e) => Err(convert_error(e)), - } - } - .boxed() - } - - fn create_dir<'a>(&'a self, path: &'a DavPath) -> dav_server::fs::FsFuture<()> { - async move { - let path = path.as_url_string(); - - // check if the parent path is exist. - // During MKCOL processing, a server MUST make the Request-URI a member of its parent collection, unless the Request-URI is "/". If no such ancestor exists, the method MUST fail. - // refer to https://datatracker.ietf.org/doc/html/rfc2518#section-8.3.1 - let parent = Path::new(&path).parent().unwrap(); - match self.op.is_exist(parent.to_str().unwrap()).await { - Ok(exist) => { - if !exist && parent != Path::new("/") { - return Err(dav_server::fs::FsError::NotFound); - } - } - Err(e) => { - return Err(convert_error(e)); - } - } - - let path = path.as_str(); - // check if the given path is exist (MKCOL on existing collection should fail (RFC2518:8.3.1)) - let exist = self.op.is_exist(path).await; - match exist { - Ok(exist) => match exist { - true => Err(dav_server::fs::FsError::Exists), - false => { - let res = self.op.create_dir(path).await; - match res { - Ok(_) => Ok(()), - Err(e) => Err(convert_error(e)), - } - } - }, - Err(e) => Err(convert_error(e)), - } - } - .boxed() - } - - fn remove_file<'a>(&'a self, path: &'a DavPath) -> dav_server::fs::FsFuture<()> { - async move { - self.op - .delete(path.as_url_string().as_str()) - .await - .map_err(convert_error) - } - .boxed() - } - - fn remove_dir<'a>(&'a self, path: &'a DavPath) -> dav_server::fs::FsFuture<()> { - self.remove_file(path) - } - - fn copy<'a>(&'a self, from: &'a DavPath, to: &'a DavPath) -> dav_server::fs::FsFuture<()> { - async move { - let from_path = from - .as_rel_ospath() - .to_str() - .ok_or(FsError::GeneralFailure)?; - let to_path = to.as_rel_ospath().to_str().ok_or(FsError::GeneralFailure)?; - self.op - .copy(from_path, to_path) - .await - .map_err(convert_error) - } - .boxed() - } - - fn rename<'a>(&'a self, from: &'a DavPath, to: &'a DavPath) -> dav_server::fs::FsFuture<()> { - async move { - let from_path = from - .as_rel_ospath() - .to_str() - .ok_or(FsError::GeneralFailure)?; - let to_path = to.as_rel_ospath().to_str().ok_or(FsError::GeneralFailure)?; - if from.is_collection() { - let _ = self.remove_file(to).await; - } - self.op - .rename(from_path, to_path) - .await - .map_err(convert_error) - } - .boxed() - } -} - -impl WebdavFs { - pub fn new(op: Operator) -> Box { - Box::new(WebdavFs { op }) - } -} - -struct DavStream { - op: Operator, - lister: Lister, -} - -impl Stream for DavStream { - type Item = Box; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let dav_stream = self.get_mut(); - let lister = Pin::new(&mut dav_stream.lister).get_mut(); - - match Pin::new(lister).poll_next(cx) { - Ready(entry) => match entry { - Some(entry) => { - let webdav_entry = WebDAVDirEntry::new(entry.unwrap(), dav_stream.op.clone()); - Ready(Some(Box::new(webdav_entry) as Box)) - } - None => Ready(None), - }, - Pending => Pending, - } - } -} - -impl DavStream { - fn new(op: Operator, lister: Lister) -> Self { - DavStream { op, lister } - } -}