diff --git a/.env.example b/.env.example index b42f03bddcf0..d50da55072dc 100644 --- a/.env.example +++ b/.env.example @@ -186,3 +186,10 @@ OPENDAL_D1_DATABASE_ID= OPENDAL_D1_TABLE= OPENDAL_D1_KEY_FIELD= OPENDAL_D1_VALUE_FIELD= +# azfile +OPENDAL_AZFILE_TEST=false +OPENDAL_AZFILE_ENDPOINT= +OPENDAL_AZFILE_ROOT=/tmp/opendal/ +OPENDAL_AZFILE_ACCOUNT_NAME= +OPENDAL_AZFILE_ACCOUNT_KEY= +OPENDAL_AZFILE_SHARE_NAME= diff --git a/core/Cargo.toml b/core/Cargo.toml index 5e20e949e731..ba4c955c58e3 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -50,6 +50,7 @@ default = [ "services-s3", "services-webdav", "services-webhdfs", + "services-azfile", ] # Build docs or not. @@ -180,6 +181,7 @@ services-wasabi = [ ] services-webdav = [] services-webhdfs = [] +services-azfile = [] [lib] bench = false diff --git a/core/src/services/azdls/docs.md b/core/src/services/azdls/docs.md index 04c72245df38..57a289d1c562 100644 --- a/core/src/services/azdls/docs.md +++ b/core/src/services/azdls/docs.md @@ -45,9 +45,9 @@ use opendal::Operator; #[tokio::main] async fn main() -> Result<()> { - // Create azblob backend builder. + // Create azdls backend builder. let mut builder = Azdls::default(); - // Set the root for azblob, all operations will happen under this root. + // Set the root for azdls, all operations will happen under this root. // // NOTE: the root must be absolute path. builder.root("/path/to/dir"); diff --git a/core/src/services/azdls/error.rs b/core/src/services/azdls/error.rs index 70b165398d86..04b839ce9f72 100644 --- a/core/src/services/azdls/error.rs +++ b/core/src/services/azdls/error.rs @@ -77,7 +77,7 @@ pub async fn parse_error(resp: Response) -> Result { }; let mut message = match de::from_reader::<_, AzdlsError>(bs.clone().reader()) { - Ok(azblob_err) => format!("{azblob_err:?}"), + Ok(azdls_err) => format!("{azdls_err:?}"), Err(_) => String::from_utf8_lossy(&bs).into_owned(), }; // If there is no body here, fill with error code. 
diff --git a/core/src/services/azfile/backend.rs b/core/src/services/azfile/backend.rs new file mode 100644 index 000000000000..c7d8ad41cb91 --- /dev/null +++ b/core/src/services/azfile/backend.rs @@ -0,0 +1,442 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use async_trait::async_trait; +use http::StatusCode; +use log::debug; +use reqsign::AzureStorageConfig; +use reqsign::AzureStorageLoader; +use reqsign::AzureStorageSigner; + +use crate::raw::*; +use crate::services::azfile::pager::AzfilePager; +use crate::*; + +use super::core::AzfileCore; +use super::error::parse_error; +use super::writer::AzfileWriter; +use super::writer::AzfileWriters; + +/// Default endpoint of Azure File services. +const DEFAULT_AZFILE_ENDPOINT_SUFFIX: &str = "file.core.windows.net"; + +/// Azure File services support. 
+#[doc = include_str!("docs.md")] +#[derive(Default, Clone)] +pub struct AzfileBuilder { + root: Option, + endpoint: Option, + account_name: Option, + share_name: String, + account_key: Option, + sas_token: Option, + http_client: Option, +} + +impl Debug for AzfileBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("Builder"); + + ds.field("root", &self.root); + ds.field("endpoint", &self.endpoint); + ds.field("share_name", &self.share_name); + if self.account_name.is_some() { + ds.field("account_name", &""); + } + if self.account_key.is_some() { + ds.field("account_key", &""); + } + + ds.finish() + } +} + +impl AzfileBuilder { + /// Set root of this backend. + /// + /// All operations will happen under this root. + pub fn root(&mut self, root: &str) -> &mut Self { + if !root.is_empty() { + self.root = Some(root.to_string()) + } + + self + } + + /// Set endpoint of this backend. + pub fn endpoint(&mut self, endpoint: &str) -> &mut Self { + if !endpoint.is_empty() { + // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/` + self.endpoint = Some(endpoint.trim_end_matches('/').to_string()); + } + + self + } + + /// Set account_name of this backend. + /// + /// - If account_name is set, we will take user's input first. + /// - If not, we will try to load it from environment. + pub fn account_name(&mut self, account_name: &str) -> &mut Self { + if !account_name.is_empty() { + self.account_name = Some(account_name.to_string()); + } + + self + } + + /// Set account_key of this backend. + /// + /// - If account_key is set, we will take user's input first. + /// - If not, we will try to load it from environment. + pub fn account_key(&mut self, account_key: &str) -> &mut Self { + if !account_key.is_empty() { + self.account_key = Some(account_key.to_string()); + } + + self + } + + /// Set file share name of this backend. 
+ /// + /// # Notes + /// You can find more about from: https://learn.microsoft.com/en-us/rest/api/storageservices/operations-on-shares--file-service + pub fn share_name(&mut self, share_name: &str) -> &mut Self { + if !share_name.is_empty() { + self.share_name = share_name.to_string(); + } + + self + } + + /// Specify the http client that used by this service. + /// + /// # Notes + /// + /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed + /// during minor updates. + pub fn http_client(&mut self, client: HttpClient) -> &mut Self { + self.http_client = Some(client); + self + } +} + +impl Builder for AzfileBuilder { + const SCHEME: Scheme = Scheme::Azfile; + type Accessor = AzfileBackend; + + fn from_map(map: HashMap) -> Self { + let mut builder = AzfileBuilder::default(); + + map.get("root").map(|v| builder.root(v)); + map.get("endpoint").map(|v| builder.endpoint(v)); + map.get("account_name").map(|v| builder.account_name(v)); + map.get("account_key").map(|v| builder.account_key(v)); + map.get("share_name").map(|v| builder.share_name(v)); + + builder + } + + fn build(&mut self) -> Result { + debug!("backend build started: {:?}", &self); + + let root = normalize_root(&self.root.take().unwrap_or_default()); + debug!("backend use root {}", root); + + let endpoint = match &self.endpoint { + Some(endpoint) => Ok(endpoint.clone()), + None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::Azfile)), + }?; + debug!("backend use endpoint {}", &endpoint); + + let client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::Azfile) + })? 
+ }; + + let account_name_option = self + .account_name + .clone() + .or_else(|| infer_account_name_from_endpoint(endpoint.as_str())); + + let account_name = match account_name_option { + Some(account_name) => Ok(account_name), + None => Err( + Error::new(ErrorKind::ConfigInvalid, "account_name is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::Azfile), + ), + }?; + + let config_loader = AzureStorageConfig { + account_name: Some(account_name), + sas_token: self.sas_token.clone(), + ..Default::default() + }; + + let cred_loader = AzureStorageLoader::new(config_loader); + + let signer = AzureStorageSigner::new(); + + debug!("backend build finished: {:?}", &self); + Ok(AzfileBackend { + core: Arc::new(AzfileCore { + root, + endpoint, + loader: cred_loader, + client, + signer, + share_name: self.share_name.clone(), + }), + }) + } +} + +fn infer_account_name_from_endpoint(endpoint: &str) -> Option { + let endpoint: &str = endpoint + .strip_prefix("http://") + .or_else(|| endpoint.strip_prefix("https://")) + .unwrap_or(endpoint); + + let mut parts = endpoint.splitn(2, '.'); + let account_name = parts.next(); + let endpoint_suffix = parts + .next() + .unwrap_or_default() + .trim_end_matches('/') + .to_lowercase(); + + if endpoint_suffix == DEFAULT_AZFILE_ENDPOINT_SUFFIX { + account_name.map(|s| s.to_string()) + } else { + None + } +} + +/// Backend for azfile services. 
+#[derive(Debug, Clone)] +pub struct AzfileBackend { + core: Arc, +} + +#[async_trait] +impl Accessor for AzfileBackend { + type Reader = IncomingAsyncBody; + type BlockingReader = (); + type Writer = AzfileWriters; + type BlockingWriter = (); + type Pager = AzfilePager; + type BlockingPager = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::Azfile) + .set_root(&self.core.root) + .set_native_capability(Capability { + stat: true, + + read: true, + read_can_next: true, + read_with_range: true, + + write: true, + create_dir: true, + delete: true, + rename: true, + + list: true, + list_with_delimiter_slash: true, + + ..Default::default() + }); + + am + } + + async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { + self.core.ensure_parent_dir_exists(path).await?; + let resp = self.core.azfile_create_dir(path).await?; + let status = resp.status(); + + match status { + StatusCode::CREATED => { + resp.into_body().consume().await?; + Ok(RpCreateDir::default()) + } + _ => { + if resp + .headers() + .get("x-ms-error-code") + .map(|value| value.to_str().unwrap_or("")) + .unwrap_or_else(|| "") + == "ResourceAlreadyExists" + { + Ok(RpCreateDir::default()) + } else { + Err(parse_error(resp).await?) 
+ } + } + } + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let resp = self.core.azfile_read(path, args.range()).await?; + + let status = resp.status(); + + match status { + StatusCode::OK | StatusCode::PARTIAL_CONTENT => { + let meta = parse_into_metadata(path, resp.headers())?; + Ok((RpRead::with_metadata(meta), resp.into_body())) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + self.core.ensure_parent_dir_exists(path).await?; + let w = AzfileWriter::new(self.core.clone(), args.clone(), path.to_string()); + let w = if args.append() { + AzfileWriters::Two(oio::AppendObjectWriter::new(w)) + } else { + AzfileWriters::One(oio::OneShotWriter::new(w)) + }; + return Ok((RpWrite::default(), w)); + } + + async fn stat(&self, path: &str, _: OpStat) -> Result { + // Stat root always returns a DIR. + if path == "/" { + return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); + } + + if path.ends_with('/') { + let resp = self.core.azfile_get_path_properties(path).await?; + let status = resp.status(); + match status { + StatusCode::OK => { + let meta = parse_into_metadata(path, resp.headers())?; + Ok(RpStat::new(meta)) + } + _ => Err(parse_error(resp).await?), + } + } else { + let resp = self.core.azfile_get_file_properties(path).await?; + let status = resp.status(); + match status { + StatusCode::OK => { + let meta = parse_into_metadata(path, resp.headers())?; + Ok(RpStat::new(meta)) + } + _ => Err(parse_error(resp).await?), + } + } + } + + async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result { + self.core.ensure_parent_dir_exists(to).await?; + let resp = self.core.azfile_rename(from, to).await?; + let status = resp.status(); + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + Ok(RpRename::default()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn delete(&self, path: &str, _: OpDelete) 
-> Result { + let resp = if path.ends_with('/') { + self.core.azfile_delete_dir(path).await? + } else { + self.core.azfile_delete_file(path).await? + }; + + let status = resp.status(); + match status { + StatusCode::ACCEPTED => { + resp.into_body().consume().await?; + Ok(RpDelete::default()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { + let op = AzfilePager::new(self.core.clone(), path.to_string(), args.limit()); + + Ok((RpList::default(), op)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Builder; + + #[test] + fn test_infer_storage_name_from_endpoint() { + let cases = vec![ + ( + "test infer account name from endpoint", + "https://account.file.core.windows.net", + "account", + ), + ( + "test infer account name from endpoint with trailing slash", + "https://account.file.core.windows.net/", + "account", + ), + ]; + for (desc, endpoint, expected) in cases { + let account_name = infer_account_name_from_endpoint(endpoint); + assert_eq!(account_name, Some(expected.to_string()), "{}", desc); + } + } + + #[test] + fn test_builder_from_endpoint_and_key_infer_account_name() { + let mut azfile_builder = AzfileBuilder::default(); + azfile_builder.endpoint("https://account.file.core.windows.net/"); + azfile_builder.account_key("account-key"); + let azfile = azfile_builder + .build() + .expect("build Azdls should be succeeded."); + + assert_eq!( + azfile.core.endpoint, + "https://account.file.core.windows.net" + ); + + assert_eq!( + azfile_builder.account_key.unwrap(), + "account-key".to_string() + ); + } +} diff --git a/core/src/services/azfile/core.rs b/core/src/services/azfile/core.rs new file mode 100644 index 000000000000..ce9717ac5536 --- /dev/null +++ b/core/src/services/azfile/core.rs @@ -0,0 +1,418 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::VecDeque; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::fmt::Write; + +use http::header::CONTENT_DISPOSITION; +use http::header::CONTENT_LENGTH; +use http::header::CONTENT_TYPE; +use http::header::RANGE; +use http::HeaderName; +use http::HeaderValue; +use http::Request; +use http::Response; +use http::StatusCode; +use reqsign::AzureStorageCredential; +use reqsign::AzureStorageLoader; +use reqsign::AzureStorageSigner; + +use crate::raw::*; +use crate::*; + +const X_MS_VERSION: &str = "x-ms-version"; +const X_MS_WRITE: &str = "x-ms-write"; +const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source"; +const X_MS_CONTENT_LENGTH: &str = "x-ms-content-length"; +const X_MS_TYPE: &str = "x-ms-type"; + +pub struct AzfileCore { + pub root: String, + pub endpoint: String, + pub share_name: String, + pub client: HttpClient, + pub loader: AzureStorageLoader, + pub signer: AzureStorageSigner, +} + +impl Debug for AzfileCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AzfileCore") + .field("root", &self.root) + .field("endpoint", &self.endpoint) + .field("share_name", &self.share_name) + .finish_non_exhaustive() + } +} + +impl AzfileCore { + async fn load_credential(&self) -> Result { + let cred = self + .loader + .load() 
+ .await + .map_err(new_request_credential_error)?; + + if let Some(cred) = cred { + Ok(cred) + } else { + Err(Error::new( + ErrorKind::ConfigInvalid, + "no valid credential found", + )) + } + } + + pub async fn sign(&self, req: &mut Request) -> Result<()> { + let cred = self.load_credential().await?; + // Insert x-ms-version header for normal requests. + req.headers_mut().insert( + HeaderName::from_static(X_MS_VERSION), + // consistent with azdls and azblob + HeaderValue::from_static("2022-11-02"), + ); + self.signer.sign(req, &cred).map_err(new_request_sign_error) + } + + #[inline] + pub async fn send(&self, req: Request) -> Result> { + self.client.send(req).await + } + + pub async fn azfile_read( + &self, + path: &str, + range: BytesRange, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "{}/{}/{}", + self.endpoint, + self.share_name, + percent_encode_path(&p) + ); + + let mut req = Request::get(&url); + + req = req.header(RANGE, range.to_header()); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn azfile_create_file( + &self, + path: &str, + size: usize, + args: &OpWrite, + ) -> Result> { + let p = build_abs_path(&self.root, path) + .trim_start_matches('/') + .to_string(); + let url = format!( + "{}/{}/{}", + self.endpoint, + self.share_name, + percent_encode_path(&p) + ); + + let mut req = Request::put(&url); + + // x-ms-content-length specifies the maximum size for the file, up to 4 tebibytes (TiB) + // https://learn.microsoft.com/en-us/rest/api/storageservices/create-file + req = req.header(X_MS_CONTENT_LENGTH, size); + + req = req.header(X_MS_TYPE, "file"); + + // Content length must be 0 for create request. 
+ req = req.header(CONTENT_LENGTH, 0); + + if let Some(ty) = args.content_type() { + req = req.header(CONTENT_TYPE, ty); + } + + if let Some(pos) = args.content_disposition() { + req = req.header(CONTENT_DISPOSITION, pos); + } + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn azfile_update( + &self, + path: &str, + size: u64, + position: u64, + body: AsyncBody, + ) -> Result> { + let p = build_abs_path(&self.root, path) + .trim_start_matches('/') + .to_string(); + + let url = format!( + "{}/{}/{}?comp=range", + self.endpoint, + self.share_name, + percent_encode_path(&p) + ); + + let mut req = Request::put(&url); + + req = req.header(CONTENT_LENGTH, size); + + req = req.header(X_MS_WRITE, "update"); + + req = req.header( + RANGE, + BytesRange::from(position..position + size).to_header(), + ); + + let mut req = req.body(body).map_err(new_request_build_error)?; + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn azfile_get_file_properties( + &self, + path: &str, + ) -> Result> { + let p = build_abs_path(&self.root, path) + .trim_start_matches('/') + .to_string(); + + let url = format!( + "{}/{}/{}", + self.endpoint, + self.share_name, + percent_encode_path(&p) + ); + + let req = Request::head(&url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn azfile_get_path_properties( + &self, + path: &str, + ) -> Result> { + let p = build_abs_path(&self.root, path) + .trim_start_matches('/') + .to_string(); + + let url = format!( + "{}/{}/{}?restype=directory", + self.endpoint, + self.share_name, + percent_encode_path(&p) + ); + + let req = Request::head(&url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn azfile_rename( + 
&self, + path: &str, + new_path: &str, + ) -> Result> { + let p = build_abs_path(&self.root, path) + .trim_start_matches('/') + .to_string(); + + let new_p = build_abs_path(&self.root, new_path) + .trim_start_matches('/') + .to_string(); + + let url = if path.ends_with('/') { + format!( + "{}/{}/{}?restype=directory&comp=rename", + self.endpoint, + self.share_name, + percent_encode_path(&p) + ) + } else { + format!( + "{}/{}/{}?comp=rename", + self.endpoint, + self.share_name, + percent_encode_path(&p) + ) + }; + + let mut req = Request::put(&url); + + req = req.header(CONTENT_LENGTH, 0); + + req = req.header(X_MS_RENAME_SOURCE, percent_encode_path(&new_p)); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn azfile_create_dir(&self, path: &str) -> Result> { + let p = build_abs_path(&self.root, path) + .trim_start_matches('/') + .to_string(); + + let url = format!( + "{}/{}/{}?restype=directory", + self.endpoint, + self.share_name, + percent_encode_path(&p) + ); + + let mut req = Request::put(&url); + + req = req.header(CONTENT_LENGTH, 0); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn azfile_delete_file(&self, path: &str) -> Result> { + let p = build_abs_path(&self.root, path) + .trim_start_matches('/') + .to_string(); + + let url = format!( + "{}/{}/{}", + self.endpoint, + self.share_name, + percent_encode_path(&p) + ); + + let req = Request::delete(&url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn azfile_delete_dir(&self, path: &str) -> Result> { + let p = build_abs_path(&self.root, path) + .trim_start_matches('/') + .to_string(); + + let url = format!( + "{}/{}/{}?restype=directory", + self.endpoint, + self.share_name, + 
percent_encode_path(&p) + ); + + let req = Request::delete(&url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn azfile_list( + &self, + path: &str, + limit: &Option, + continuation: &String, + ) -> Result> { + let p = build_abs_path(&self.root, path) + .trim_start_matches('/') + .to_string(); + + let mut url = format!( + "{}/{}/{}?restype=directory&comp=list&include=Timestamps,ETag", + self.endpoint, + self.share_name, + percent_encode_path(&p), + ); + + if !continuation.is_empty() { + write!(url, "&marker={}", &continuation).expect("write into string must succeed"); + } + + if let Some(limit) = limit { + write!(url, "&maxresults={}", limit).expect("write into string must succeed"); + } + + let req = Request::get(&url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn ensure_parent_dir_exists(&self, path: &str) -> Result<()> { + let mut dirs = VecDeque::default(); + // azure file service does not support recursive directory creation + let mut p = path; + while p != "/" { + p = get_parent(p); + dirs.push_front(p); + } + for dir in dirs { + let resp = self.azfile_create_dir(dir).await?; + + if resp.status() != StatusCode::CREATED { + if resp + .headers() + .get("x-ms-error-code") + .map(|value| value.to_str().unwrap_or("")) + .unwrap_or_else(|| "") + == "ResourceAlreadyExists" + { + continue; + } + return Err(Error::new( + ErrorKind::Unexpected, + format!("failed to create directory: {}", dir).as_str(), + )); + } + } + + Ok(()) + } +} diff --git a/core/src/services/azfile/docs.md b/core/src/services/azfile/docs.md new file mode 100644 index 000000000000..8bb7c4d76d4c --- /dev/null +++ b/core/src/services/azfile/docs.md @@ -0,0 +1,66 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] 
delete +- [ ] copy +- [x] rename +- [x] list +- [ ] ~~scan~~ +- [ ] presign +- [ ] blocking + +## Configuration + +- `root`: Set the work dir for backend. +- `endpoint`: Set the endpoint for backend. +- `account_name`: Set the account_name for backend. +- `account_key`: Set the account_key for backend. +- `share_name`: Set the share_name for backend. + +Refer to public API docs for more information. + +## Example + +### Via Builder + +```rust +use std::sync::Arc; + +use anyhow::Result; +use opendal::services::Azfile; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + // Create azfile backend builder. + let mut builder = Azfile::default(); + // Set the root for azfile, all operations will happen under this root. + // + // NOTE: the root must be an absolute path. + builder.root("/path/to/dir"); + // Set the share name, this is required. + builder.share_name("test"); + // Set the endpoint, this is required. + // + // For example: + // - "https://accountname.file.core.windows.net" + builder.endpoint("https://accountname.file.core.windows.net"); + // Set the account_name and account_key. + // + // OpenDAL will try to load credentials from the env. + // If no credential is set and none can be loaded, requests fail + // with a `ConfigInvalid` error ("no valid credential found"). + builder.account_name("account_name"); + builder.account_key("account_key"); + + // `Accessor` provides the low level APIs, we will use `Operator` normally. + let op: Operator = Operator::new(builder)?.finish(); + + Ok(()) +} +``` diff --git a/core/src/services/azfile/error.rs b/core/src/services/azfile/error.rs new file mode 100644 index 000000000000..026f5b3980f7 --- /dev/null +++ b/core/src/services/azfile/error.rs @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; + +use bytes::Buf; +use http::Response; +use http::StatusCode; +use quick_xml::de; +use serde::Deserialize; + +use crate::raw::*; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +/// AzfileError is the error returned by azure file service. +#[derive(Default, Deserialize)] +#[serde(default, rename_all = "PascalCase")] +struct AzfileError { + code: String, + message: String, + query_parameter_name: String, + query_parameter_value: String, + reason: String, +} + +impl Debug for AzfileError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut de = f.debug_struct("AzfileError"); + de.field("code", &self.code); + // replace `\n` to ` ` for better reading. + de.field("message", &self.message.replace('\n', " ")); + + if !self.query_parameter_name.is_empty() { + de.field("query_parameter_name", &self.query_parameter_name); + } + if !self.query_parameter_value.is_empty() { + de.field("query_parameter_value", &self.query_parameter_value); + } + if !self.reason.is_empty() { + de.field("reason", &self.reason); + } + + de.finish() + } +} + +/// Parse error response into Error. 
+pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (kind, retryable) = match parts.status { + StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), + StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), + StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => { + (ErrorKind::ConditionNotMatch, false) + } + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let mut message = match de::from_reader::<_, AzfileError>(bs.clone().reader()) { + Ok(azfile_err) => format!("{azfile_err:?}"), + Err(_) => String::from_utf8_lossy(&bs).into_owned(), + }; + + // If there is no body here, fill with error code. + if message.is_empty() { + if let Some(v) = parts.headers.get("x-ms-error-code") { + if let Ok(code) = v.to_str() { + message = format!( + "{:?}", + AzfileError { + code: code.to_string(), + ..Default::default() + } + ) + } + } + } + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} diff --git a/core/src/services/azfile/mod.rs b/core/src/services/azfile/mod.rs new file mode 100644 index 000000000000..8aeebac0fdac --- /dev/null +++ b/core/src/services/azfile/mod.rs @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub use backend::AzfileBuilder as Azfile; + +mod backend; +mod core; +mod error; +mod pager; +mod writer; diff --git a/core/src/services/azfile/pager.rs b/core/src/services/azfile/pager.rs new file mode 100644 index 000000000000..8272d19f0c2a --- /dev/null +++ b/core/src/services/azfile/pager.rs @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use async_trait::async_trait; +use http::StatusCode; +use quick_xml::de::from_str; +use serde::Deserialize; + +use crate::raw::*; +use crate::*; + +use super::core::AzfileCore; +use super::error::parse_error; + +pub struct AzfilePager { + core: Arc, + path: String, + limit: Option, + done: bool, + continuation: String, +} + +impl AzfilePager { + pub fn new(core: Arc, path: String, limit: Option) -> Self { + Self { + core, + path, + limit, + done: false, + continuation: "".to_string(), + } + } +} + +#[async_trait] +impl oio::Page for AzfilePager { + async fn next(&mut self) -> Result>> { + if self.done { + return Ok(None); + } + + let resp = self + .core + .azfile_list(&self.path, &self.limit, &self.continuation) + .await?; + + let status = resp.status(); + + if status != StatusCode::OK { + if status == StatusCode::NOT_FOUND { + return Ok(None); + } + return Err(parse_error(resp).await?); + } + + let bs = resp.into_body().bytes().await?; + + let text = String::from_utf8(bs.to_vec()).expect("response convert to string must success"); + + let results: EnumerationResults = from_str(&text).map_err(|e| { + Error::new(ErrorKind::Unexpected, "deserialize xml from response").set_source(e) + })?; + + let mut entries = Vec::new(); + + for file in results.entries.file { + let meta = Metadata::new(EntryMode::FILE) + .with_etag(file.properties.etag) + .with_content_length(file.properties.content_length.unwrap_or(0)) + .with_last_modified(parse_datetime_from_rfc2822(&file.properties.last_modified)?); + let path = self.path.clone().trim_start_matches('/').to_string() + &file.name; + entries.push(oio::Entry::new(&path, meta)); + } + + for dir in results.entries.directory { + let meta = Metadata::new(EntryMode::DIR) + .with_etag(dir.properties.etag) + .with_last_modified(parse_datetime_from_rfc2822(&dir.properties.last_modified)?); + let path = self.path.clone().trim_start_matches('/').to_string() + &dir.name + "/"; + entries.push(oio::Entry::new(&path, 
meta)); + } + + if results.next_marker.is_empty() { + self.done = true; + } else { + self.continuation = results.next_marker; + } + + if entries.is_empty() { + Ok(None) + } else { + Ok(Some(entries)) + } + } +} + +#[derive(Debug, Deserialize, PartialEq)] +#[serde(rename_all = "PascalCase")] +struct EnumerationResults { + marker: Option, + prefix: Option, + max_results: Option, + directory_id: Option, + entries: Entries, + #[serde(default)] + next_marker: String, +} + +#[derive(Debug, Deserialize, PartialEq)] +#[serde(rename_all = "PascalCase")] +struct Entries { + #[serde(default)] + file: Vec, + #[serde(default)] + directory: Vec, +} + +#[derive(Debug, Deserialize, PartialEq)] +#[serde(rename_all = "PascalCase")] +struct File { + #[serde(rename = "FileId")] + file_id: String, + name: String, + properties: Properties, +} + +#[derive(Debug, Deserialize, PartialEq)] +#[serde(rename_all = "PascalCase")] +struct Directory { + #[serde(rename = "FileId")] + file_id: String, + name: String, + properties: Properties, +} + +#[derive(Debug, Deserialize, PartialEq)] +#[serde(rename_all = "PascalCase")] +struct Properties { + #[serde(rename = "Content-Length")] + content_length: Option, + #[serde(rename = "CreationTime")] + creation_time: String, + #[serde(rename = "LastAccessTime")] + last_access_time: String, + #[serde(rename = "LastWriteTime")] + last_write_time: String, + #[serde(rename = "ChangeTime")] + change_time: String, + #[serde(rename = "Last-Modified")] + last_modified: String, + #[serde(rename = "Etag")] + etag: String, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_list_result() { + let xml = r#" + + + string-value + string-value + 100 + directory-id + + + Rust By Example.pdf + 13835093239654252544 + + 5832374 + 2023-09-25T12:43:05.8483527Z + 2023-09-25T12:43:05.8483527Z + 2023-09-25T12:43:08.6337775Z + 2023-09-25T12:43:08.6337775Z + Mon, 25 Sep 2023 12:43:08 GMT + \"0x8DBBDC4F8AC4AEF\" + + + + test_list_rich_dir + 
12105702186650959872 + + 2023-10-15T12:03:40.7194774Z + 2023-10-15T12:03:40.7194774Z + 2023-10-15T12:03:40.7194774Z + 2023-10-15T12:03:40.7194774Z + Sun, 15 Oct 2023 12:03:40 GMT + \"0x8DBCD76C58C3E96\" + + + + + + "#; + + let results: EnumerationResults = from_str(xml).unwrap(); + + assert_eq!(results.entries.file[0].name, "Rust By Example.pdf"); + + assert_eq!( + results.entries.file[0].properties.etag, + "\\\"0x8DBBDC4F8AC4AEF\\\"" + ); + + assert_eq!(results.entries.directory[0].name, "test_list_rich_dir"); + + assert_eq!( + results.entries.directory[0].properties.etag, + "\\\"0x8DBCD76C58C3E96\\\"" + ); + } +} diff --git a/core/src/services/azfile/writer.rs b/core/src/services/azfile/writer.rs new file mode 100644 index 000000000000..4f903a9b22a9 --- /dev/null +++ b/core/src/services/azfile/writer.rs @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
use std::sync::Arc;

use async_trait::async_trait;
use http::StatusCode;

use crate::raw::*;
use crate::*;

use super::core::AzfileCore;
use super::error::parse_error;

/// Writer type of the azfile service: either a one-shot or an append writer.
// NOTE(review): the generic arguments were lost in the extracted source; they
// are restored from the two trait impls below — confirm against upstream.
pub type AzfileWriters =
    oio::TwoWaysWriter<oio::OneShotWriter<AzfileWriter>, oio::AppendObjectWriter<AzfileWriter>>;

/// Writes a single file at `path` through `AzfileCore`.
pub struct AzfileWriter {
    core: Arc<AzfileCore>,
    op: OpWrite,
    path: String,
}

impl AzfileWriter {
    /// Bundles the shared core, the write options and the target path.
    pub fn new(core: Arc<AzfileCore>, op: OpWrite, path: String) -> Self {
        AzfileWriter { core, op, path }
    }
}

#[async_trait]
impl oio::OneShotWrite for AzfileWriter {
    /// Creates the file with the buffer's length, then uploads the whole
    /// buffer at offset 0.
    ///
    /// # Errors
    ///
    /// A status other than `200`/`201` from either request is converted by
    /// `parse_error` and tagged with the operation that failed.
    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
        let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));

        // Step 1: create the file, passing the final size.
        let resp = self
            .core
            .azfile_create_file(&self.path, bs.len(), &self.op)
            .await?;

        match resp.status() {
            StatusCode::OK | StatusCode::CREATED => {
                resp.into_body().consume().await?;
            }
            _ => {
                return Err(parse_error(resp)
                    .await?
                    .with_operation("Backend::azfile_create_file"));
            }
        }

        // Step 2: upload the content starting at offset 0.
        let resp = self
            .core
            .azfile_update(&self.path, bs.len() as u64, 0, AsyncBody::ChunkedBytes(bs))
            .await?;

        match resp.status() {
            StatusCode::OK | StatusCode::CREATED => {
                resp.into_body().consume().await?;
                Ok(())
            }
            _ => Err(parse_error(resp)
                .await?
                .with_operation("Backend::azfile_update")),
        }
    }
}

#[async_trait]
impl oio::AppendObjectWrite for AzfileWriter {
    /// Returns the content length reported by the file-properties request,
    /// or `0` when the response carries no content length.
    async fn offset(&self) -> Result<u64> {
        let resp = self.core.azfile_get_file_properties(&self.path).await?;

        match resp.status() {
            StatusCode::OK => Ok(parse_content_length(resp.headers())?.unwrap_or_default()),
            _ => Err(parse_error(resp).await?),
        }
    }

    /// Uploads `size` bytes of `body` at `offset`.
    async fn append(&self, offset: u64, size: u64, body: AsyncBody) -> Result<()> {
        let resp = self
            .core
            .azfile_update(&self.path, size, offset, body)
            .await?;

        match resp.status() {
            StatusCode::OK | StatusCode::CREATED => {
                resp.into_body().consume().await?;
                Ok(())
            }
            _ => Err(parse_error(resp)
                .await?
                .with_operation("Backend::azfile_update")),
        }
    }
}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 93da8b917b0a..d69ea5b0728f 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -233,3 +233,8 @@ pub use self::sqlite::Sqlite;
 mod d1;
 #[cfg(feature = "services-d1")]
 pub use self::d1::D1;
+
+#[cfg(feature = "services-azfile")]
+mod azfile;
+#[cfg(feature = "services-azfile")]
+pub use self::azfile::Azfile;
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index fad9d7f6e207..16692e53c404 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -118,6 +118,8 @@ pub enum Scheme {
     Redb,
     /// [tikv][crate::services::tikv]: Tikv Services
     Tikv,
+    /// [azfile][crate::services::azfile]: Azfile Services
+    Azfile,
     /// Custom that allow users to implement services outside of OpenDAL.
/// /// # NOTE @@ -198,6 +200,7 @@ impl FromStr for Scheme { "webdav" => Ok(Scheme::Webdav), "webhdfs" => Ok(Scheme::Webhdfs), "tikv" => Ok(Scheme::Tikv), + "azfile" => Ok(Scheme::Azfile), _ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))), } } @@ -248,6 +251,7 @@ impl From for &'static str { Scheme::Webhdfs => "webhdfs", Scheme::Redb => "redb", Scheme::Tikv => "tikv", + Scheme::Azfile => "azfile", Scheme::Sqlite => "sqlite", Scheme::Custom(v) => v, } diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs index cc0bf667f154..3d7cc54d37de 100644 --- a/core/tests/behavior/main.rs +++ b/core/tests/behavior/main.rs @@ -187,6 +187,8 @@ fn main() -> anyhow::Result<()> { tests.extend(behavior_test::()); #[cfg(feature = "services-d1")] tests.extend(behavior_test::()); + #[cfg(feature = "services-azfile")] + tests.extend(behavior_test::()); // Don't init logging while building operator which may break cargo // nextest output