diff --git a/core/Cargo.toml b/core/Cargo.toml index b8bbd0e989c0..a8579cf4d1b5 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -49,6 +49,7 @@ default = [ "services-s3", "services-webdav", "services-webhdfs", + "services-onedrive", ] # Build docs or not. @@ -121,6 +122,7 @@ services-obs = [ "reqsign?/services-huaweicloud", "reqsign?/reqwest_request", ] +services-onedrive = [] services-oss = [ "dep:reqsign", "reqsign?/services-aliyun", diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index ab0aec0a816e..a74de6064e51 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -136,5 +136,11 @@ pub use webdav::Webdav; #[cfg(feature = "services-webhdfs")] mod webhdfs; + +#[cfg(feature = "services-onedrive")] +mod onedrive; +#[cfg(feature = "services-onedrive")] +pub use onedrive::Onedrive; + #[cfg(feature = "services-webhdfs")] pub use webhdfs::Webhdfs; diff --git a/core/src/services/onedrive/backend.rs b/core/src/services/onedrive/backend.rs new file mode 100644 index 000000000000..ce15965e44ff --- /dev/null +++ b/core/src/services/onedrive/backend.rs @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use http::{header, Request, Response, StatusCode}; +use std::fmt::Debug; + +use crate::{ + ops::{OpRead, OpWrite}, + raw::{ + build_rooted_abs_path, new_request_build_error, parse_into_metadata, parse_location, + percent_encode_path, Accessor, AccessorInfo, AsyncBody, HttpClient, IncomingAsyncBody, + RpRead, RpWrite, + }, + types::Result, + Capability, Error, ErrorKind, +}; + +use super::{error::parse_error, writer::OneDriveWriter}; + +#[derive(Clone)] +pub struct OnedriveBackend { + root: String, + access_token: String, + client: HttpClient, +} + +impl OnedriveBackend { + pub(crate) fn new(root: String, access_token: String, http_client: HttpClient) -> Self { + Self { + root, + access_token, + client: http_client, + } + } +} + +impl Debug for OnedriveBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut de = f.debug_struct("OneDriveBackend"); + de.field("root", &self.root); + de.field("access_token", &self.access_token); + de.finish() + } +} + +#[async_trait] +impl Accessor for OnedriveBackend { + type Reader = IncomingAsyncBody; + type BlockingReader = (); + type Writer = OneDriveWriter; + type BlockingWriter = (); + type Pager = (); + type BlockingPager = (); + + fn info(&self) -> AccessorInfo { + let mut ma = AccessorInfo::default(); + ma.set_scheme(crate::Scheme::Onedrive) + .set_root(&self.root) + .set_capability(Capability { + read: true, + read_can_next: true, + write: true, + list: true, + copy: true, + rename: true, + ..Default::default() + }); + + ma + } + + async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { + let resp = self.onedrive_get(path).await?; + + let status = resp.status(); + + if status.is_redirection() { + let headers = resp.headers(); + let location = parse_location(headers)?; + + match location { + None => { + return Err(Error::new( + ErrorKind::ContentIncomplete, + "redirect location not found in response", + )); + } + Some(location) => { + let resp = self.onedrive_get_redirection(location).await?; + let meta = parse_into_metadata(path, resp.headers())?; + Ok((RpRead::with_metadata(meta), resp.into_body())) + } + } + } else { + match status { + StatusCode::OK | StatusCode::PARTIAL_CONTENT => { + let meta = parse_into_metadata(path, resp.headers())?; + Ok((RpRead::with_metadata(meta), resp.into_body())) + } + + _ => Err(parse_error(resp).await?), + } + } + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + if args.content_length().is_none() { + return Err(Error::new( + ErrorKind::Unsupported, + "write without content length is not supported", + )); + } + + let path = build_rooted_abs_path(&self.root, path); + + Ok(( + RpWrite::default(), + OneDriveWriter::new(self.clone(), args, path), + )) + } +} + +impl OnedriveBackend { + async fn onedrive_get(&self, path: &str) -> Result> { + let path = build_rooted_abs_path(&self.root, path); + let url: String = format!( + "https://graph.microsoft.com/v1.0/me/drive/root:{}:/content", + percent_encode_path(&path), + ); + + let mut req = Request::get(&url); + + let auth_header_content = format!("Bearer {}", self.access_token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + async fn onedrive_get_redirection(&self, url: &str) -> Result> { + let mut req = Request::get(url); + + let auth_header_content = format!("Bearer {}", self.access_token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn onedrive_put( + &self, + path: &str, + size: Option, + content_type: Option<&str>, + body: AsyncBody, + ) -> Result> { + let url = format!( + "https://graph.microsoft.com/v1.0/me/drive/root:{}:/content", + percent_encode_path(path) + ); + + let mut req = Request::put(&url); + + let auth_header_content = format!("Bearer {}", self.access_token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + if let Some(size) = size { + req = req.header(header::CONTENT_LENGTH, size) + } + + if let Some(mime) = content_type { + req = req.header(header::CONTENT_TYPE, mime) + } + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send(req).await + } +} diff --git a/core/src/services/onedrive/builder.rs b/core/src/services/onedrive/builder.rs new file mode 100644 index 000000000000..b1d020aac9eb --- /dev/null +++ b/core/src/services/onedrive/builder.rs @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; + +use log::debug; + +use super::backend::OnedriveBackend; +use crate::raw::{normalize_root, HttpClient}; +use crate::Scheme; +use crate::*; + +/// [OneDrive](https://onedrive.com) backend support. +/// +/// # Capabilities +/// +/// This service can be used to: +/// +/// - [x] read +/// - [x] write +/// - [ ] copy +/// - [ ] rename +/// - [ ] list +/// - [ ] ~~scan~~ +/// - [ ] ~~presign~~ +/// - [ ] blocking +/// +/// # Notes +/// +/// Currently, only OneDrive Personal is supported. +/// For uploading, only files under 4MB are supported via the Simple Upload API (). +/// +/// # Configuration +/// +/// - `access_token`: set the access_token for Graph API +/// - `root`: Set the work directory for backend +/// +/// You can refer to [`OneDriveBuilder`]'s docs for more information +/// +/// # Example +/// +/// ## Via Builder +/// +/// ```no_run +/// use anyhow::Result; +/// use opendal::services::Onedrive; +/// use opendal::Operator; +/// +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// // create backend builder +/// let mut builder = Onedrive::default(); +/// +/// builder +/// .access_token("xxx") +/// .root("/path/to/root"); +/// +/// let op: Operator = Operator::new(builder)?.finish(); +/// Ok(()) +/// } +/// ``` +#[derive(Default)] +pub struct OnedriveBuilder { + access_token: Option, + root: Option, + http_client: Option, +} + +impl Debug for OnedriveBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backend").field("root", &self.root).finish() + } +} + +impl OnedriveBuilder { + /// set the bearer access token for OneDrive + /// + /// default: no access token, which leads to failure + pub fn access_token(&mut self, access_token: &str) -> &mut Self { + self.access_token = Some(access_token.to_string()); + self + } + + /// Set root path of OneDrive folder. + pub fn root(&mut self, root: &str) -> &mut Self { + self.root = Some(root.to_string()); + self + } + + /// Specify the http client that used by this service. + /// + /// # Notes + /// + /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed + /// during minor updates. + pub fn http_client(&mut self, http_client: HttpClient) -> &mut Self { + self.http_client = Some(http_client); + self + } +} + +impl Builder for OnedriveBuilder { + const SCHEME: Scheme = Scheme::Onedrive; + + type Accessor = OnedriveBackend; + + fn from_map(map: HashMap) -> Self { + let mut builder = Self::default(); + + map.get("root").map(|v| builder.root(v)); + map.get("access_token").map(|v| builder.access_token(v)); + + builder + } + + fn build(&mut self) -> Result { + let root = normalize_root(&self.root.take().unwrap_or_default()); + debug!("backend use root {}", root); + + let client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::Onedrive) + })? + }; + + match self.access_token.clone() { + Some(access_token) => Ok(OnedriveBackend::new(root, access_token, client)), + None => Err(Error::new(ErrorKind::ConfigInvalid, "access_token not set")), + } + } +} diff --git a/core/src/services/onedrive/error.rs b/core/src/services/onedrive/error.rs new file mode 100644 index 000000000000..6fe6b57d42aa --- /dev/null +++ b/core/src/services/onedrive/error.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use http::Response; +use http::StatusCode; + +use crate::raw::*; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +/// Parse error response into Error. +pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (kind, retryable) = match parts.status { + StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), + StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let mut err = Error::new(kind, &String::from_utf8_lossy(&bs)) + .with_context("response", format!("{parts:?}")); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} diff --git a/core/src/services/onedrive/mod.rs b/core/src/services/onedrive/mod.rs new file mode 100644 index 000000000000..0b58fc8cc8f3 --- /dev/null +++ b/core/src/services/onedrive/mod.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +mod builder; +mod error; + +pub use builder::OnedriveBuilder as Onedrive; +mod writer; diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs new file mode 100644 index 000000000000..4ac6062a4c75 --- /dev/null +++ b/core/src/services/onedrive/writer.rs @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use bytes::Bytes; +use http::StatusCode; + +use super::backend::OnedriveBackend; +use super::error::parse_error; +use crate::ops::OpWrite; +use crate::raw::*; +use crate::*; + +pub struct OneDriveWriter { + backend: OnedriveBackend, + + op: OpWrite, + path: String, +} + +impl OneDriveWriter { + pub fn new(backend: OnedriveBackend, op: OpWrite, path: String) -> Self { + OneDriveWriter { backend, op, path } + } +} + +#[async_trait] +impl oio::Write for OneDriveWriter { + async fn write(&mut self, bs: Bytes) -> Result<()> { + let resp = self + .backend + .onedrive_put( + &self.path, + Some(bs.len()), + self.op.content_type(), + AsyncBody::Bytes(bs), + ) + .await?; + + let status = resp.status(); + + match status { + // Typical response code: 201 Created + // Reference: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online#response + StatusCode::CREATED => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn abort(&mut self) -> Result<()> { + Ok(()) + } + + async fn close(&mut self) -> Result<()> { + Ok(()) + } +} diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs index 4a073d6e9fea..2e9bd7e7afbd 100644 --- a/core/src/services/webdav/backend.rs +++ b/core/src/services/webdav/backend.rs @@ -433,7 +433,7 @@ impl WebdavBackend { ) -> Result> { let p = build_rooted_abs_path(&self.root, path); - let url = format!("{}{}", self.endpoint, percent_encode_path(&p)); + let url: String = format!("{}{}", self.endpoint, percent_encode_path(&p)); let mut req = Request::get(&url); diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 3673b172b0ef..5019d22e39a0 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -61,6 +61,8 @@ pub enum Scheme { Moka, /// [obs][crate::services::Obs]: Huawei Cloud OBS services. Obs, + /// [onedrive][crate::services::Onedrive]: Microsoft OneDrive services. + Onedrive, /// [oss][crate::services::Oss]: Aliyun Object Storage Services Oss, /// [redis][crate::services::Redis]: Redis services @@ -160,6 +162,7 @@ impl From for &'static str { Scheme::Memory => "memory", Scheme::Moka => "moka", Scheme::Obs => "obs", + Scheme::Onedrive => "onedrive", Scheme::Redis => "redis", Scheme::Rocksdb => "rocksdb", Scheme::S3 => "s3",