From 7952b47358359abb41d703f2bfe4e83892ab4138 Mon Sep 17 00:00:00 2001 From: Manjusaka Date: Tue, 16 May 2023 01:53:44 +0800 Subject: [PATCH] feat(services/dropbox): Support create/read/delete for Dropbox Signed-off-by: Manjusaka --- core/Cargo.toml | 1 + core/src/services/dropbox/backend.rs | 108 +++++++++++++++++++ core/src/services/dropbox/builder.rs | 149 ++++++++++++++++++++++++++ core/src/services/dropbox/core.rs | 150 +++++++++++++++++++++++++++ core/src/services/dropbox/error.rs | 79 ++++++++++++++ core/src/services/dropbox/mod.rs | 24 +++++ core/src/services/dropbox/writer.rs | 77 ++++++++++++++ core/src/services/mod.rs | 6 ++ core/src/types/scheme.rs | 3 + 9 files changed, 597 insertions(+) create mode 100644 core/src/services/dropbox/backend.rs create mode 100644 core/src/services/dropbox/builder.rs create mode 100644 core/src/services/dropbox/core.rs create mode 100644 core/src/services/dropbox/error.rs create mode 100644 core/src/services/dropbox/mod.rs create mode 100644 core/src/services/dropbox/writer.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index fd5bcc264743..16fcf4417b07 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -118,6 +118,7 @@ services-gcs = [ "reqsign?/services-google", "reqsign?/reqwest_request", ] +services-dropbox = [] services-gdrive = [] services-ghac = [] services-hdfs = ["dep:hdrs"] diff --git a/core/src/services/dropbox/backend.rs b/core/src/services/dropbox/backend.rs new file mode 100644 index 000000000000..1c7784e1f87d --- /dev/null +++ b/core/src/services/dropbox/backend.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use http::StatusCode;
+
+use std::{fmt::Debug, sync::Arc};
+
+use super::{core::DropboxCore, error::parse_error, writer::DropboxWriter};
+use crate::{
+    raw::{
+        parse_into_metadata, Accessor, AccessorInfo, HttpClient, IncomingAsyncBody, OpDelete,
+        OpRead, OpWrite, RpDelete, RpRead, RpWrite,
+    },
+    types::Result,
+    Capability, Error, ErrorKind,
+};
+
+/// Dropbox backend: implements `Accessor` on top of a shared [`DropboxCore`].
+#[derive(Clone, Debug)]
+pub struct DropboxBackend {
+    core: Arc<DropboxCore>,
+}
+
+impl DropboxBackend {
+    /// Bundle the root, access token and HTTP client into a shared core.
+    pub(crate) fn new(root: String, access_token: String, http_client: HttpClient) -> Self {
+        DropboxBackend {
+            core: Arc::new(DropboxCore {
+                token: access_token,
+                client: http_client,
+                root,
+            }),
+        }
+    }
+}
+
+#[async_trait]
+impl Accessor for DropboxBackend {
+    type Reader = IncomingAsyncBody;
+    type BlockingReader = ();
+    type Writer = DropboxWriter;
+    type BlockingWriter = ();
+    type Pager = ();
+    type BlockingPager = ();
+    type Appender = ();
+
+    /// Report the scheme, the configured root and the supported
+    /// capabilities (read, write and delete only).
+    fn info(&self) -> AccessorInfo {
+        let mut ma = AccessorInfo::default();
+        ma.set_scheme(crate::Scheme::Dropbox)
+            .set_root(&self.core.root)
+            .set_capability(Capability {
+                read: true,
+                write: true,
+                delete: true,
+                ..Default::default()
+            });
+        ma
+    }
+
+    /// Download `path` and return its body as the reader.
+    ///
+    /// `_args` is ignored, so options carried by `OpRead` are not forwarded
+    /// to Dropbox. Any status other than `200 OK` is turned into an error by
+    /// `parse_error`.
+    async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let resp = self.core.dropbox_get(path).await?;
+        let status = resp.status();
+        match status {
+            StatusCode::OK => {
+                let meta = parse_into_metadata(path, resp.headers())?;
+                Ok((RpRead::with_metadata(meta), resp.into_body()))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Create a writer for `path`.
+    ///
+    /// No request is sent here; the upload happens in `DropboxWriter`.
+    /// Returns `ErrorKind::Unsupported` when `args` carries no content length.
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        if args.content_length().is_none() {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "write without content length is not supported",
+            ));
+        }
+        Ok((
+            RpWrite::default(),
+            DropboxWriter::new(self.core.clone(), args, String::from(path)),
+        ))
+    }
+
+    /// Delete `path`. Any status other than `200 OK` is turned into an error
+    /// by `parse_error`.
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        let resp = self.core.dropbox_delete(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => Ok(RpDelete::default()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
diff --git a/core/src/services/dropbox/builder.rs b/core/src/services/dropbox/builder.rs
new file mode 100644
index 000000000000..10bb9cda8621
--- /dev/null
+++ b/core/src/services/dropbox/builder.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::{Debug, Formatter};
+
+use std::collections::HashMap;
+
+use super::backend::DropboxBackend;
+use crate::raw::{normalize_root, HttpClient};
+use crate::Scheme;
+use crate::*;
+
+/// [Dropbox](https://www.dropbox.com/) backend support.
+///
+/// # Capabilities
+///
+/// This service can be used to:
+///
+/// - [x] read
+/// - [x] write
+/// - [x] delete
+/// - [ ] copy
+/// - [ ] create
+/// - [ ] list
+/// - [ ] rename
+///
+/// # Notes
+///
+/// - `write` requires a content length; a write without one is rejected
+///   with `ErrorKind::Unsupported`.
+///
+/// # Configuration
+///
+/// - `access_token`: Set the access token for the Dropbox API (required)
+/// - `root`: Set the work directory for backend
+///
+/// You can refer to [`DropboxBuilder`]'s docs for more information
+///
+/// # Example
+///
+/// ## Via Builder
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use opendal::services::Dropbox;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     // create backend builder
+///     let mut builder = Dropbox::default();
+///
+///     builder.access_token("x").root("/");
+///
+///     let op: Operator = Operator::new(builder)?.finish();
+///     let content = "who are you";
+///
+///     op.write_with("abc2.txt", content)
+///         .content_type("application/octet-stream")
+///         .content_length(content.len() as u64)
+///         .await?;
+///     let read = op.read("abc2.txt").await?;
+///     let s = String::from_utf8(read).unwrap();
+///     println!("{}", s);
+///     op.delete("abc2.txt").await?;
+///     Ok(())
+/// }
+/// ```
+#[derive(Default)]
+pub struct DropboxBuilder {
+    access_token: Option<String>,
+    root: Option<String>,
+    http_client: Option<HttpClient>,
+}
+
+impl Debug for DropboxBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        // No fields are printed, so the access token does not appear in
+        // debug output.
+        f.debug_struct("Builder").finish()
+    }
+}
+
+impl DropboxBuilder {
+    /// Set the access token sent as the bearer token of every request.
+    ///
+    /// Required: `build` fails with `ErrorKind::ConfigInvalid` when no
+    /// access token has been set.
+    pub fn access_token(&mut self, access_token: &str) -> &mut Self {
+        self.access_token = Some(access_token.to_string());
+        self
+    }
+
+    /// Set the root (work directory) that all paths are resolved against.
+    ///
+    /// Optional: when unset, `build` passes an empty string to
+    /// `normalize_root`.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.root = Some(root.to_string());
+        self
+    }
+
+    /// Specify the http client that used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, http_client: HttpClient) -> &mut Self {
+        self.http_client = Some(http_client);
+        self
+    }
+}
+
+impl Builder for DropboxBuilder {
+    const SCHEME: Scheme = Scheme::Dropbox;
+    type Accessor = DropboxBackend;
+
+    /// Build a `DropboxBuilder` from string options.
+    ///
+    /// Recognized keys: `root` and `access_token`. Other keys are ignored.
+    fn from_map(map: HashMap<String, String>) -> Self {
+        let mut builder = Self::default();
+        if let Some(v) = map.get("root") {
+            builder.root(v);
+        }
+        if let Some(v) = map.get("access_token") {
+            builder.access_token(v);
+        }
+        builder
+    }
+
+    /// Create the backend.
+    ///
+    /// Takes the configured root and HTTP client out of the builder; a
+    /// default `HttpClient` is created when none was supplied.
+    ///
+    /// # Errors
+    ///
+    /// - the error from `HttpClient::new`, when the default client cannot be
+    ///   created;
+    /// - `ErrorKind::ConfigInvalid` when no access token is set.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        let root = normalize_root(&self.root.take().unwrap_or_default());
+        let client = if let Some(client) = self.http_client.take() {
+            client
+        } else {
+            HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::Dropbox)
+            })?
+        };
+        match self.access_token.clone() {
+            Some(access_token) => Ok(DropboxBackend::new(root, access_token, client)),
+            None => Err(Error::new(
+                ErrorKind::ConfigInvalid,
+                "access_token is required",
+            )),
+        }
+    }
+}
diff --git a/core/src/services/dropbox/core.rs b/core/src/services/dropbox/core.rs
new file mode 100644
index 000000000000..cdaa13e4cfad
--- /dev/null
+++ b/core/src/services/dropbox/core.rs
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::default::Default;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use crate::raw::{build_rooted_abs_path, new_json_serialize_error, HttpClient};
+
+use bytes::Bytes;
+use http::request::Builder;
+use http::{header, Request, Response};
+use serde::{Deserialize, Serialize};
+
+use crate::{
+    raw::{new_request_build_error, AsyncBody, IncomingAsyncBody},
+    types::Result,
+};
+
+/// Shared state and request helpers for the Dropbox service.
+pub struct DropboxCore {
+    /// Access token sent as `Authorization: Bearer <token>`.
+    pub token: String,
+    /// HTTP client used to send every request.
+    pub client: HttpClient,
+    /// Root that request paths are resolved against.
+    pub root: String,
+}
+
+impl Debug for DropboxCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        // No fields are printed, so the access token does not appear in
+        // debug output.
+        f.debug_struct("DropboxCore").finish()
+    }
+}
+
+impl DropboxCore {
+    /// Send a `files/download` request for `path` (resolved against the root).
+    ///
+    /// The arguments travel in the `Dropbox-API-Arg` header; the request body
+    /// is empty. The response is returned as-is, whatever its status.
+    pub async fn dropbox_get(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let url = "https://content.dropboxapi.com/2/files/download";
+        let download_args = DropboxDownloadArgs {
+            path: build_rooted_abs_path(&self.root, path),
+        };
+        let request_payload = header_safe_json(&download_args)?;
+        let request = self
+            .build_auth_header(Request::post(url))
+            .header("Dropbox-API-Arg", request_payload)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        self.client.send(request).await
+    }
+
+    /// Send a `files/upload` request that stores `body` at `path`.
+    ///
+    /// `size` and `content_type`, when given, become the `Content-Length`
+    /// and `Content-Type` headers. The upload arguments use the defaults of
+    /// `DropboxUploadArgs` (mode `overwrite`). The response is returned
+    /// as-is, whatever its status.
+    pub async fn dropbox_update(
+        &self,
+        path: &str,
+        size: Option<usize>,
+        content_type: Option<&str>,
+        body: AsyncBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let url = "https://content.dropboxapi.com/2/files/upload";
+        let args = DropboxUploadArgs {
+            path: build_rooted_abs_path(&self.root, path),
+            ..Default::default()
+        };
+        let request_payload = header_safe_json(&args)?;
+        let mut request_builder = Request::post(url);
+        if let Some(size) = size {
+            request_builder = request_builder.header(header::CONTENT_LENGTH, size);
+        }
+        if let Some(mime) = content_type {
+            request_builder = request_builder.header(header::CONTENT_TYPE, mime);
+        }
+        let request = self
+            .build_auth_header(request_builder)
+            .header("Dropbox-API-Arg", request_payload)
+            .body(body)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(request).await
+    }
+
+    /// Send a `files/delete_v2` request for `path` (resolved against the root).
+    ///
+    /// The arguments are sent as a JSON request body. The response is
+    /// returned as-is, whatever its status.
+    pub async fn dropbox_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let url = "https://api.dropboxapi.com/2/files/delete_v2";
+        let args = DropboxDeleteArgs {
+            path: build_rooted_abs_path(&self.root, path),
+        };
+        let payload = serde_json::to_string(&args).map_err(new_json_serialize_error)?;
+        let request = self
+            .build_auth_header(Request::post(url))
+            .header(header::CONTENT_TYPE, "application/json")
+            .body(AsyncBody::Bytes(Bytes::from(payload)))
+            .map_err(new_request_build_error)?;
+        self.client.send(request).await
+    }
+
+    /// Add the `Authorization: Bearer <token>` header to `req`.
+    fn build_auth_header(&self, req: Builder) -> Builder {
+        req.header(header::AUTHORIZATION, format!("Bearer {}", self.token))
+    }
+}
+
+/// Serialize `args` for use in the `Dropbox-API-Arg` header.
+///
+/// Dropbox requires "HTTP header safe JSON" there: character 0x7F and every
+/// non-ASCII character must be written as a JSON `\uXXXX` escape. In JSON
+/// output such characters can only occur inside string literals, so escaping
+/// them in the serialized text does not change the encoded value.
+///
+/// Returns the serialization error (instead of panicking) when `args` cannot
+/// be serialized.
+fn header_safe_json<T: Serialize>(args: &T) -> Result<String> {
+    let json = serde_json::to_string(args).map_err(new_json_serialize_error)?;
+    if json.bytes().all(|b| b.is_ascii() && b != 0x7f) {
+        return Ok(json);
+    }
+
+    let mut escaped = String::with_capacity(json.len());
+    for ch in json.chars() {
+        if ch.is_ascii() && ch != '\u{7f}' {
+            escaped.push(ch);
+        } else {
+            // Characters outside the BMP become a surrogate pair, i.e. two
+            // `\uXXXX` escapes.
+            let mut units = [0u16; 2];
+            for unit in ch.encode_utf16(&mut units).iter() {
+                escaped.push_str(&format!("\\u{unit:04x}"));
+            }
+        }
+    }
+    Ok(escaped)
+}
+
+/// Arguments of `files/download`.
+#[derive(Clone, Debug, Deserialize, Serialize)]
+struct DropboxDownloadArgs {
+    path: String,
+}
+
+/// Arguments of `files/upload`.
+#[derive(Clone, Debug, Deserialize, Serialize)]
+struct DropboxUploadArgs {
+    path: String,
+    mode: String,
+    mute: bool,
+    autorename: bool,
+    strict_conflict: bool,
+}
+
+/// Arguments of `files/delete_v2`.
+#[derive(Clone, Debug, Deserialize, Serialize)]
+struct DropboxDeleteArgs {
+    path: String,
+}
+
+impl Default for DropboxUploadArgs {
+    /// Upload defaults: empty path, mode `overwrite`, `mute` on,
+    /// `autorename` and `strict_conflict` off.
+    fn default() -> Self {
+        DropboxUploadArgs {
+            mode: "overwrite".to_string(),
+            path: "".to_string(),
+            mute: true,
+            autorename: false,
+            strict_conflict: false,
+        }
+    }
+}
diff --git a/core/src/services/dropbox/error.rs b/core/src/services/dropbox/error.rs
new file mode 100644
index 000000000000..aa7981ba06e4
--- /dev/null
+++ b/core/src/services/dropbox/error.rs
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use http::Response;
+use http::StatusCode;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// JSON error body returned by Dropbox. Missing fields fall back to their
+/// defaults.
+#[derive(Default, Debug, Deserialize)]
+#[serde(default)]
+struct DropboxError {
+    error_summary: String,
+    error: DropboxErrorDetail,
+}
+
+/// The `error` object of a Dropbox error body; only its `.tag` is read.
+#[derive(Default, Debug, Deserialize)]
+#[serde(default)]
+struct DropboxErrorDetail {
+    #[serde(rename(deserialize = ".tag"))]
+    tag: String,
+}
+
+/// Parse error response into Error.
+///
+/// The message is the `error_summary` of the JSON body; when the body cannot
+/// be deserialized, the raw body (lossily decoded as UTF-8) is used instead.
+/// The response head is attached as the `response` context.
+///
+/// Kind mapping:
+/// - `404`, and `409` whose summary reports a missing path -> `NotFound`
+/// - `403` -> `PermissionDenied`
+/// - `500`, `502`, `503`, `504` -> `Unexpected`, marked temporary
+/// - anything else -> `Unexpected`
+///
+/// # Errors
+///
+/// Returns `Err` only when reading the response body fails.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+
+    // `None` means the body is not a Dropbox JSON error object.
+    let summary = serde_json::from_slice::<DropboxError>(&bs)
+        .ok()
+        .map(|e| e.error_summary);
+
+    let (kind, retryable) = match parts.status {
+        StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
+        // Dropbox reports endpoint-specific failures, including a missing
+        // path, as 409 and describes them in the summary.
+        StatusCode::CONFLICT if summary.as_deref().map_or(false, is_not_found_summary) => {
+            (ErrorKind::NotFound, false)
+        }
+        StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
+        StatusCode::INTERNAL_SERVER_ERROR
+        | StatusCode::BAD_GATEWAY
+        | StatusCode::SERVICE_UNAVAILABLE
+        | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+        _ => (ErrorKind::Unexpected, false),
+    };
+
+    let message = summary.unwrap_or_else(|| String::from_utf8_lossy(&bs).into_owned());
+    let mut err = Error::new(kind, &message).with_context("response", format!("{parts:?}"));
+
+    if retryable {
+        err = err.set_temporary();
+    }
+
+    Ok(err)
+}
+
+/// Whether a Dropbox `error_summary` says the requested path does not exist.
+fn is_not_found_summary(summary: &str) -> bool {
+    summary.starts_with("path/not_found") || summary.starts_with("path_lookup/not_found")
+}
diff --git a/core/src/services/dropbox/mod.rs b/core/src/services/dropbox/mod.rs
new file mode 100644
index 000000000000..239974a75d8c
--- /dev/null
+++ b/core/src/services/dropbox/mod.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +mod builder; +mod core; +mod error; +mod writer; + +pub use builder::DropboxBuilder as Dropbox; diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs new file mode 100644 index 000000000000..2f5e9755800f --- /dev/null +++ b/core/src/services/dropbox/writer.rs @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use http::StatusCode;
+
+use super::core::DropboxCore;
+use super::error::parse_error;
+use crate::raw::*;
+use crate::*;
+
+/// Writer that uploads data to a single Dropbox path.
+pub struct DropboxWriter {
+    core: Arc<DropboxCore>,
+    op: OpWrite,
+    path: String,
+}
+
+impl DropboxWriter {
+    /// Create a writer for `path`; nothing is sent until `write` is called.
+    pub fn new(core: Arc<DropboxCore>, op: OpWrite, path: String) -> Self {
+        DropboxWriter { core, op, path }
+    }
+}
+
+#[async_trait]
+impl oio::Write for DropboxWriter {
+    /// Upload `bs` to the writer's path in one request.
+    ///
+    /// The content length is `bs.len()` and the content type comes from the
+    /// `OpWrite` the writer was created with. Every call issues its own
+    /// upload request for the same path. Any status other than `200 OK` is
+    /// turned into an error by `parse_error`.
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let resp = self
+            .core
+            .dropbox_update(
+                &self.path,
+                Some(bs.len()),
+                self.op.content_type(),
+                AsyncBody::Bytes(bs),
+            )
+            .await?;
+        let status = resp.status();
+        match status {
+            StatusCode::OK => {
+                // Drain the body before returning.
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Streaming writes are not supported; always returns
+    /// `ErrorKind::Unsupported`.
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "Write::sink is not supported",
+        ))
+    }
+
+    /// No-op: the writer keeps no state that needs cleaning up.
+    async fn abort(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    /// No-op: each `write` call already completed its own upload.
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 301214719663..37a1bd8a4076 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -156,6 +156,12 @@ pub use onedrive::Onedrive;
 mod gdrive;
 #[cfg(feature = "services-gdrive")]
 pub use gdrive::Gdrive;
+
+#[cfg(feature = "services-dropbox")]
+mod dropbox;
+#[cfg(feature = "services-dropbox")]
+pub use dropbox::Dropbox;
+
 #[cfg(feature = "services-webhdfs")]
 pub use webhdfs::Webhdfs;
 
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 07a80843f7d7..f08465099f06 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -68,6 +68,8 @@ pub enum Scheme {
     Onedrive,
     /// [gdrive][crate::services::Gdrive]: GoogleDrive services.
     Gdrive,
+    /// [dropbox][crate::services::Dropbox]: Dropbox services.
+ Dropbox, /// [oss][crate::services::Oss]: Aliyun Object Storage Services Oss, /// [redis][crate::services::Redis]: Redis services @@ -175,6 +177,7 @@ impl From for &'static str { Scheme::Obs => "obs", Scheme::Onedrive => "onedrive", Scheme::Gdrive => "gdrive", + Scheme::Dropbox => "dropbox", Scheme::Redis => "redis", Scheme::Rocksdb => "rocksdb", Scheme::S3 => "s3",