From a2ec651fbe5ab1b49bc5ed0073bd01b2dcd510bc Mon Sep 17 00:00:00 2001 From: Morris Tai Date: Wed, 28 Jun 2023 18:58:31 -0400 Subject: [PATCH 1/8] feat: service add dbfs api 2.0 support --- core/Cargo.toml | 1 + core/src/services/dbfs/backend.rs | 519 ++++++++++++++++++++++++++++++ core/src/services/dbfs/docs.md | 58 ++++ core/src/services/dbfs/error.rs | 108 +++++++ core/src/services/dbfs/mod.rs | 23 ++ core/src/services/dbfs/pager.rs | 105 ++++++ core/src/services/dbfs/reader.rs | 311 ++++++++++++++++++ core/src/services/dbfs/writer.rs | 67 ++++ core/src/services/mod.rs | 5 + core/src/types/scheme.rs | 4 + 10 files changed, 1201 insertions(+) create mode 100644 core/src/services/dbfs/backend.rs create mode 100644 core/src/services/dbfs/docs.md create mode 100644 core/src/services/dbfs/error.rs create mode 100644 core/src/services/dbfs/mod.rs create mode 100644 core/src/services/dbfs/pager.rs create mode 100644 core/src/services/dbfs/reader.rs create mode 100644 core/src/services/dbfs/writer.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index 3679f3955a68..acb7595d41d3 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -124,6 +124,7 @@ services-cos = [ ] services-d1 = [] services-dashmap = ["dep:dashmap"] +services-dbfs = [] services-dropbox = [] services-etcd = ["dep:etcd-client", "dep:bb8"] services-foundationdb = ["dep:foundationdb"] diff --git a/core/src/services/dbfs/backend.rs b/core/src/services/dbfs/backend.rs new file mode 100644 index 000000000000..702e8f779322 --- /dev/null +++ b/core/src/services/dbfs/backend.rs @@ -0,0 +1,519 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; + +use async_trait::async_trait; +use base64::prelude::BASE64_STANDARD; +use base64::Engine; +use bytes::Bytes; +use http::header; +use http::Request; +use http::Response; +use http::StatusCode; +use log::debug; +use serde::Deserialize; +use serde_json::json; + +use super::error::parse_dbfs_read_error; +use super::error::parse_error; +use super::pager::DbfsPager; +use super::reader::IncomingDbfsAsyncBody; +use super::writer::DbfsWriter; +use crate::raw::*; +use crate::*; + +/// [Dbfs](https://docs.databricks.com/api/azure/workspace/dbfs)'s REST API support. +#[doc = include_str!("docs.md")] +#[derive(Default, Clone)] +pub struct DbfsBuilder { + root: Option, + endpoint: String, + token: Option, +} + +impl Debug for DbfsBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("Builder"); + + ds.field("root", &self.root); + ds.field("endpoint", &self.endpoint); + + if self.token.is_some() { + ds.field("token", &""); + } + + ds.finish() + } +} + +impl DbfsBuilder { + /// Set root of this backend. + /// + /// All operations will happen under this root. + pub fn root(&mut self, root: &str) -> &mut Self { + if !root.is_empty() { + self.root = Some(root.to_string()) + } + + self + } + + /// Set endpoint of this backend. + /// + /// Endpoint must be full uri, e.g. + /// + /// - Azure: `https://adb-1234567890123456.78.azuredatabricks.net` + /// - Aws: `https://dbc-123a5678-90bc.cloud.databricks.com` + pub fn endpoint(&mut self, endpoint: &str) -> &mut Self { + assert!(!endpoint.is_empty()); + self.endpoint = endpoint.trim_end_matches('/').to_string(); + self + } + + /// Set the token of this backend. + pub fn token(&mut self, token: &str) -> &mut Self { + if !token.is_empty() { + self.token = Some(token.to_string()); + } + self + } +} + +impl Builder for DbfsBuilder { + const SCHEME: Scheme = Scheme::Dbfs; + type Accessor = DbfsBackend; + + fn from_map(map: HashMap) -> Self { + let mut builder = DbfsBuilder::default(); + + map.get("endpoint").map(|v| builder.endpoint(v)); + map.get("token").map(|v| builder.token(v)); + + builder + } + + /// Build a DbfsBackend. + fn build(&mut self) -> Result { + debug!("backend build started: {:?}", &self); + + let root = normalize_root(&self.root.take().unwrap_or_default()); + debug!("backend use root {}", root); + + let endpoint = match self.endpoint.is_empty() { + false => Ok(&self.endpoint), + true => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::Dbfs)), + }?; + debug!("backend use endpoint: {}", &endpoint); + + let token = match self.token.take() { + Some(token) => token, + None => { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "missing token for Dbfs", + )); + } + }; + + let client = HttpClient::new()?; + + debug!("backend build finished: {:?}", &self); + Ok(DbfsBackend { + root, + endpoint: self.endpoint.clone(), + token, + client, + }) + } +} + +/// Backend for DBFS service +#[derive(Debug, Clone)] +pub struct DbfsBackend { + root: String, + endpoint: String, + token: String, + pub(super) client: HttpClient, +} + +impl DbfsBackend { + fn dbfs_create_dir_request(&self, path: &str) -> Result> { + let url = format!("{}/api/2.0/dbfs/mkdirs", self.endpoint); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let req_body = format!("{{\"path\": \"{}\"}}", percent_encode_path(&p)); + let body = AsyncBody::Bytes(Bytes::from(req_body)); + + let req = req.body(body).map_err(new_request_build_error)?; + + Ok(req) + } + + async fn dbfs_delete(&self, path: &str) -> Result> { + let url = format!("{}/api/2.0/dbfs/delete", self.endpoint); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let request_body = &json!({ + "path": percent_encode_path(&p), + // TODO: support recursive toggle, should we add a new field in OpDelete? + "recursive": true, + }); + + let body = AsyncBody::Bytes(Bytes::from(request_body.to_string())); + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send(req).await + } + + async fn dbfs_rename(&self, from: &str, to: &str) -> Result> { + let source = build_rooted_abs_path(&self.root, from); + let target = build_rooted_abs_path(&self.root, to); + + let url = format!("{}/api/2.0/dbfs/move", self.endpoint); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req_body = &json!({ + "source_path": percent_encode_path(&source), + "destination_path": percent_encode_path(&target), + }); + + let body = AsyncBody::Bytes(Bytes::from(req_body.to_string())); + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_list(&self, path: &str) -> Result> { + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let url = format!( + "{}/api/2.0/dbfs/list?path={}", + self.endpoint, + percent_encode_path(&p) + ); + let mut req = Request::get(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub fn dbfs_create_file_request(&self, path: &str, body: Bytes) -> Result> { + let url = format!("{}/api/2.0/dbfs/put", self.endpoint); + + let contents = BASE64_STANDARD.encode(body); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req_body = &json!({ + "path": path, + "contents": contents, + // TODO: support overwrite toggle, should we add a new field in OpWrite? + "overwrite": true, + }); + + let body = AsyncBody::Bytes(Bytes::from(req_body.to_string())); + + req.body(body).map_err(new_request_build_error) + } + + async fn dbfs_read( + &self, + path: &str, + range: BytesRange, + ) -> Result> { + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let mut url = format!( + "{}/api/2.0/dbfs/read?path={}", + self.endpoint, + percent_encode_path(&p) + ); + + if let Some(offset) = range.offset() { + url.push_str(&format!("&offset={}", offset)); + } + + if let Some(length) = range.size() { + url.push_str(&format!("&length={}", length)); + } + + let mut req = Request::get(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send_dbfs(req).await + } + + async fn dbfs_get_properties(&self, path: &str) -> Result> { + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let url = format!( + "{}/api/2.0/dbfs/get-status?path={}", + &self.endpoint, + percent_encode_path(&p) + ); + + let mut req = Request::get(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + async fn dbfs_ensure_parent_path(&self, path: &str) -> Result<()> { + let resp = self.dbfs_get_properties(path).await?; + + match resp.status() { + StatusCode::OK => return Ok(()), + StatusCode::NOT_FOUND => { + self.create_dir(path, OpCreateDir::default()).await?; + } + _ => return Err(parse_error(resp).await?), + } + Ok(()) + } +} + +#[async_trait] +impl Accessor for DbfsBackend { + type Reader = IncomingDbfsAsyncBody; + type BlockingReader = (); + type Writer = oio::OneShotWriter; + type BlockingWriter = (); + type Pager = DbfsPager; + type BlockingPager = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::Dbfs) + .set_root(&self.root) + .set_native_capability(Capability { + stat: true, + + read: true, + read_can_next: true, + read_with_range: true, + + write: true, + create_dir: true, + delete: true, + rename: true, + + list: true, + list_with_delimiter_slash: true, + + ..Default::default() + }); + am + } + + async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { + let req = self.dbfs_create_dir_request(path)?; + + let resp = self.client.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::CREATED | StatusCode::OK => { + resp.into_body().consume().await?; + Ok(RpCreateDir::default()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let resp = self.dbfs_read(path, args.range()).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + // NOTE: If range is not specified, we need to get content length from stat API. + if let Some(size) = args.range().size() { + let mut meta = parse_into_metadata(path, resp.headers())?; + meta.set_content_length(size); + Ok((RpRead::with_metadata(meta), resp.into_body())) + } else { + let stat_resp = self.dbfs_get_properties(path).await?; + let meta = match stat_resp.status() { + StatusCode::OK => { + let mut meta = parse_into_metadata(path, stat_resp.headers())?; + let bs = stat_resp.into_body().bytes().await?; + let decoded_response = serde_json::from_slice::(&bs) + .map_err(new_json_deserialize_error)?; + meta.set_last_modified(parse_datetime_from_from_timestamp_millis( + decoded_response.modification_time, + )?); + match decoded_response.is_dir { + true => meta.set_mode(EntryMode::DIR), + false => { + meta.set_mode(EntryMode::FILE); + meta.set_content_length(decoded_response.file_size as u64) + } + }; + meta + } + _ => return Err(parse_error(stat_resp).await?), + }; + Ok((RpRead::with_metadata(meta), resp.into_body())) + } + } + _ => Err(parse_dbfs_read_error(resp).await?), + } + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + Ok(( + RpWrite::default(), + oio::OneShotWriter::new(DbfsWriter::new(self.clone(), args, path.to_string())), + )) + } + + async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { + self.dbfs_ensure_parent_path(to).await?; + + let resp = self.dbfs_rename(from, to).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + Ok(RpRename::default()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn stat(&self, path: &str, _: OpStat) -> Result { + // Stat root always returns a DIR. + if path == "/" { + return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); + } + + let resp = self.dbfs_get_properties(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let mut meta = parse_into_metadata(path, resp.headers())?; + let bs = resp.into_body().bytes().await?; + let decoded_response = serde_json::from_slice::(&bs) + .map_err(new_json_deserialize_error)?; + meta.set_last_modified(parse_datetime_from_from_timestamp_millis( + decoded_response.modification_time, + )?); + match decoded_response.is_dir { + true => meta.set_mode(EntryMode::DIR), + false => { + meta.set_mode(EntryMode::FILE); + meta.set_content_length(decoded_response.file_size as u64) + } + }; + Ok(RpStat::new(meta)) + } + StatusCode::NOT_FOUND if path.ends_with('/') => { + Ok(RpStat::new(Metadata::new(EntryMode::DIR))) + } + _ => Err(parse_error(resp).await?), + } + } + + /// NOTE: Server will return 200 even if the path doesn't exist. + async fn delete(&self, path: &str, _: OpDelete) -> Result { + let resp = self.dbfs_delete(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(RpDelete::default()), + _ => { + let err = parse_error(resp).await?; + match err.kind() { + ErrorKind::NotFound => Ok(RpDelete::default()), + _ => Err(err), + } + } + } + } + + async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Pager)> { + let op = DbfsPager::new(self.clone(), path.to_string()); + + Ok((RpList::default(), op)) + } +} + +#[derive(Deserialize)] +struct DbfsStatus { + // Not used fields. + // path: String, + is_dir: bool, + file_size: i64, + modification_time: i64, +} diff --git a/core/src/services/dbfs/docs.md b/core/src/services/dbfs/docs.md new file mode 100644 index 000000000000..0e9cfa2ec4d8 --- /dev/null +++ b/core/src/services/dbfs/docs.md @@ -0,0 +1,58 @@ +This service will visit the [DBFS API](https://docs.databricks.com/api/azure/workspace/dbfs) supported by [Databricks File System](https://docs.databricks.com/en/dbfs/index.html). + +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [ ] copy +- [x] rename +- [x] list +- [ ] ~~scan~~ +- [ ] ~~presign~~ +- [ ] blocking + +## Configurations + +- `root`: Set the work directory for backend. +- `endpoint`: Set the endpoint for backend. +- `token`: Databricks personal access token. + +Refer to [`Builder`]'s public API docs for more information. + +## Examples + +### Via Builder + +```rust +use std::sync::Arc; + +use anyhow::Result; +use opendal::services::Dbfs; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + let mut builder = Dbfs::default(); + // set the root for Dbfs, all operations will happen under this root + // + // Note: + // if the root is not exists, the builder will automatically create the + // root directory for you + // if the root exists and is a directory, the builder will continue working + // if the root exists and is a folder, the builder will fail on building backend + builder.root("/path/to/dir"); + // set the endpoint of Dbfs workspace + builder.endpoint("https://adb-1234567890123456.78.azuredatabricks.net"); + // set the personal access token for builder + builder.token("access_token"); + + let op: Operator = Operator::new(builder)?.finish(); + + Ok(()) +} +``` diff --git a/core/src/services/dbfs/error.rs b/core/src/services/dbfs/error.rs new file mode 100644 index 000000000000..8e0cef99d254 --- /dev/null +++ b/core/src/services/dbfs/error.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; + +use http::Response; +use http::StatusCode; +use serde::Deserialize; + +use super::reader::IncomingDbfsAsyncBody; +use crate::raw::*; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +/// DbfsError is the error returned by DBFS service. +#[derive(Default, Deserialize)] +struct DbfsError { + error_code: String, + message: String, +} + +impl Debug for DbfsError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut de = f.debug_struct("DbfsError"); + de.field("error_code", &self.error_code); + // replace `\n` to ` ` for better reading. + de.field("message", &self.message.replace('\n', " ")); + + de.finish() + } +} + +pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (kind, retryable) = match parts.status { + StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), + StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), + StatusCode::PRECONDITION_FAILED => (ErrorKind::ConditionNotMatch, false), + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let message = match serde_json::from_slice::(&bs) { + Ok(dbfs_error) => format!("{:?}", dbfs_error.message), + Err(_) => String::from_utf8_lossy(&bs).into_owned(), + }; + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} + +pub async fn parse_dbfs_read_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (kind, retryable) = match parts.status { + StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), + StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), + StatusCode::PRECONDITION_FAILED => (ErrorKind::ConditionNotMatch, false), + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let message = match serde_json::from_slice::(&bs) { + Ok(dbfs_error) => format!("{:?}", dbfs_error.message), + Err(_) => String::from_utf8_lossy(&bs).into_owned(), + }; + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} diff --git a/core/src/services/dbfs/mod.rs b/core/src/services/dbfs/mod.rs new file mode 100644 index 000000000000..094ebf76b4ed --- /dev/null +++ b/core/src/services/dbfs/mod.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::DbfsBuilder as Dbfs; +mod error; +mod pager; +mod reader; +mod writer; diff --git a/core/src/services/dbfs/pager.rs b/core/src/services/dbfs/pager.rs new file mode 100644 index 000000000000..98306bd27aad --- /dev/null +++ b/core/src/services/dbfs/pager.rs @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use http::StatusCode; +use serde::Deserialize; + +use super::backend::DbfsBackend; +use super::error::parse_error; +use crate::raw::*; +use crate::*; + +pub struct DbfsPager { + backend: DbfsBackend, + path: String, + done: bool, +} + +impl DbfsPager { + pub fn new(backend: DbfsBackend, path: String) -> Self { + Self { + backend, + path, + done: false, + } + } +} + +#[async_trait] +impl oio::Page for DbfsPager { + async fn next(&mut self) -> Result>> { + if self.done { + return Ok(None); + } + + let response = self.backend.dbfs_list(&self.path).await?; + + let status_code = response.status(); + if !status_code.is_success() { + if status_code == StatusCode::NOT_FOUND { + return Ok(None); + } + let error = parse_error(response).await?; + return Err(error); + } + + let bytes = response.into_body().bytes().await?; + let mut decoded_response = + serde_json::from_slice::(&bytes).map_err(new_json_deserialize_error)?; + + self.done = true; + + let mut entries = Vec::with_capacity(decoded_response.files.len()); + + while let Some(status) = decoded_response.files.pop() { + let entry: oio::Entry = match status.is_dir { + true => { + let normalized_path = format!("{}/", &status.path); + let mut meta = Metadata::new(EntryMode::DIR); + meta.set_last_modified(parse_datetime_from_from_timestamp_millis( + status.modification_time, + )?); + oio::Entry::new(&normalized_path, meta) + } + false => { + let mut meta = Metadata::new(EntryMode::FILE); + meta.set_last_modified(parse_datetime_from_from_timestamp_millis( + status.modification_time, + )?); + meta.set_content_length(status.file_size as u64); + oio::Entry::new(&status.path, meta) + } + }; + entries.push(entry); + } + Ok(Some(entries)) + } +} + +#[derive(Debug, Deserialize)] +struct DbfsOutputList { + files: Vec, +} + +#[derive(Debug, Deserialize)] +struct DbfsStatus { + path: String, + is_dir: bool, + file_size: i64, + modification_time: i64, +} diff --git a/core/src/services/dbfs/reader.rs b/core/src/services/dbfs/reader.rs new file mode 100644 index 000000000000..58fb440aba18 --- /dev/null +++ b/core/src/services/dbfs/reader.rs @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cmp::Ordering; +use std::io; +use std::mem; +use std::str::FromStr; +use std::task::ready; +use std::task::Context; +use std::task::Poll; + +use base64::engine::general_purpose; +use base64::Engine; +use bytes::Buf; +use bytes::BufMut; +use bytes::Bytes; +use futures::StreamExt; +use futures::TryStreamExt; +use http::Request; +use http::Response; +use serde::Deserialize; + +use crate::raw::*; +use crate::*; + +impl HttpClient { + /// Send a request in async way. + pub async fn send_dbfs( + &self, + req: Request, + ) -> Result> { + // Uri stores all string alike data in `Bytes` which means + // the clone here is cheap. + let uri = req.uri().clone(); + let is_head = req.method() == http::Method::HEAD; + + let (parts, body) = req.into_parts(); + + let mut req_builder = self + .client + .request( + parts.method, + reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"), + ) + .version(parts.version) + .headers(parts.headers); + + req_builder = match body { + AsyncBody::Empty => req_builder.body(reqwest::Body::from("")), + AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)), + AsyncBody::ChunkedBytes(bs) => req_builder.body(reqwest::Body::wrap_stream(bs)), + AsyncBody::Stream(s) => req_builder.body(reqwest::Body::wrap_stream(s)), + }; + + let mut resp = req_builder.send().await.map_err(|err| { + let is_temporary = !( + // Builder related error should not be retried. + err.is_builder() || + // Error returned by RedirectPolicy. + // + // We don't set this by hand, just don't allow retry. + err.is_redirect() || + // We never use `Response::error_for_status`, just don't allow retry. + // + // Status should be checked by our services. + err.is_status() + ); + + let mut oerr = Error::new(ErrorKind::Unexpected, "send async request") + .with_operation("http_util::Client::send_async") + .with_context("url", uri.to_string()) + .set_source(err); + if is_temporary { + oerr = oerr.set_temporary(); + } + + oerr + })?; + + // Get content length from header so that we can check it. + // If the request method is HEAD, we will ignore this. + let content_length = if is_head { + None + } else { + parse_content_length(resp.headers()).expect("response content length must be valid") + }; + + let mut hr = Response::builder() + .version(resp.version()) + .status(resp.status()) + // Insert uri into response extension so that we can fetch + // it later. + .extension(uri.clone()); + // Swap headers directly instead of copy the entire map. + mem::swap(hr.headers_mut().unwrap(), resp.headers_mut()); + + let stream = resp.bytes_stream().map_err(move |err| { + // If stream returns a body related error, we can convert + // it to interrupt so we can retry it. + Error::new(ErrorKind::Unexpected, "read data from http stream") + .map(|v| if err.is_body() { v.set_temporary() } else { v }) + .with_context("url", uri.to_string()) + .set_source(err) + }); + + let body = IncomingDbfsAsyncBody::new(Box::new(oio::into_stream(stream)), content_length); + + let resp = hr.body(body).expect("response must build succeed"); + + Ok(resp) + } +} + +/// IncomingDbfsAsyncBody carries the content returned by remote servers. +/// +/// # Notes +/// +/// Client SHOULD NEVER construct this body. +pub struct IncomingDbfsAsyncBody { + inner: oio::Streamer, + size: Option, + consumed: u64, + chunk: Option, +} + +impl IncomingDbfsAsyncBody { + /// Construct a new incoming async body + pub fn new(s: oio::Streamer, size: Option) -> Self { + Self { + inner: s, + size, + consumed: 0, + chunk: None, + } + } + + /// Consume the entire body. + pub async fn consume(mut self) -> Result<()> { + use oio::ReadExt; + + while let Some(bs) = self.next().await { + bs.map_err(|err| { + Error::new(ErrorKind::Unexpected, "fetch bytes from stream") + .with_operation("http_util::IncomingDbfsAsyncBody::consume") + .set_source(err) + })?; + } + + Ok(()) + } + + /// Consume the response to bytes. + pub async fn bytes(mut self) -> Result { + use oio::ReadExt; + + // If there's only 1 chunk, we can just return Buf::to _bytes() + let mut first = if let Some(buf) = self.next().await { + buf? + } else { + return Ok(Bytes::new()); + }; + + let second = if let Some(buf) = self.next().await { + buf? + } else { + return Ok(first.copy_to_bytes(first.remaining())); + }; + + // With more than 1 buf, we gotta flatten into a Vec first. + let cap = first.remaining() + second.remaining() + self.size.unwrap_or_default() as usize; + let mut vec = Vec::with_capacity(cap); + vec.put(first); + vec.put(second); + + while let Some(buf) = self.next().await { + vec.put(buf?); + } + + Ok(vec.into()) + } + + #[inline] + fn check(expect: u64, actual: u64) -> Result<()> { + match actual.cmp(&expect) { + Ordering::Equal => Ok(()), + Ordering::Less => Err(Error::new( + ErrorKind::ContentIncomplete, + &format!("reader got too less data, expect: {expect}, actual: {actual}"), + ) + .set_temporary()), + Ordering::Greater => Err(Error::new( + ErrorKind::ContentTruncated, + &format!("reader got too much data, expect: {expect}, actual: {actual}"), + ) + .set_temporary()), + } + } +} + +impl oio::Read for IncomingDbfsAsyncBody { + fn poll_read(&mut self, cx: &mut Context<'_>, mut buf: &mut [u8]) -> Poll> { + if buf.is_empty() { + return Poll::Ready(Ok(0)); + } + + // We must get a valid bytes from underlying stream + let bs = loop { + match ready!(self.poll_next(cx)) { + Some(Ok(bs)) if bs.is_empty() => continue, + Some(Ok(bs)) => break bs, + Some(Err(err)) => return Poll::Ready(Err(err)), + None => return Poll::Ready(Ok(0)), + } + }; + + // TODO: reqwest::Body::bytes_stream() in poll_next() will not return whole response at once if the file is too big, which causes serde failed + let mut response_body = match serde_json::from_slice::(&bs) { + Ok(v) => v, + Err(err) => { + return Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "parse response content failed", + ) + .with_operation("http_util::IncomingDbfsAsyncBody::poll_read") + .set_source(err))); + } + }; + + response_body.data = general_purpose::STANDARD + .decode(response_body.data) + .map_err(|err| { + Error::new(ErrorKind::Unexpected, "decode response content failed") + .with_operation("http_util::IncomingDbfsAsyncBody::poll_read") + .set_source(err) + }) + .and_then(|v| { + String::from_utf8(v).map_err(|err| { + Error::new( + ErrorKind::Unexpected, + "response data contains invalid utf8 bytes", + ) + .with_operation("http_util::IncomingDbfsAsyncBody::poll_read") + .set_source(err) + }) + })?; + + let amt = response_body.bytes_read as usize; + + buf.put_slice(response_body.data.as_ref()); + + // TODO: will handle chunk here till we find a way to get whole bytes at once + + Poll::Ready(Ok(amt)) + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll> { + let (_, _) = (cx, pos); + + Poll::Ready(Err(Error::new( + ErrorKind::Unsupported, + "output reader doesn't support seeking", + ))) + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + if let Some(bs) = self.chunk.take() { + return Poll::Ready(Some(Ok(bs))); + } + + // NOTE: inner: Stream> + // Reader::poll_next() -> Stream::poll_next() + // control by reqwest::Body::bytes_stream() + let res = match ready!(self.inner.poll_next_unpin(cx)) { + Some(Ok(bs)) => { + self.consumed += bs.len() as u64; + Some(Ok(bs)) + } + Some(Err(err)) => Some(Err(err)), + None => { + if let Some(size) = self.size { + Self::check(size, self.consumed)?; + } + + None + } + }; + + Poll::Ready(res) + } +} + +#[derive(Debug, Default, Deserialize)] +#[serde(default)] +struct ReadContentJsonResponse { + bytes_read: u64, + data: String, +} diff --git a/core/src/services/dbfs/writer.rs b/core/src/services/dbfs/writer.rs new file mode 100644 index 000000000000..3dfe17b94a1b --- /dev/null +++ b/core/src/services/dbfs/writer.rs @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use http::StatusCode; + +use super::backend::DbfsBackend; +use super::error::parse_error; +use crate::raw::oio::WriteBuf; +use crate::raw::*; +use crate::*; + +pub struct DbfsWriter { + backend: DbfsBackend, + path: String, +} + +impl DbfsWriter { + const MAX_SIMPLE_SIZE: usize = 1 * 1024 * 1024; + + pub fn new(backend: DbfsBackend, _op: OpWrite, path: String) -> Self { + DbfsWriter { backend, path } + } +} + +#[async_trait] +impl oio::OneShotWrite for DbfsWriter { + async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { + let bs = bs.bytes(bs.remaining()); + let size = bs.len(); + + // MAX_BLOCK_SIZE_EXCEEDED will be thrown if this limit(1MB) is exceeded. + if size >= Self::MAX_SIMPLE_SIZE { + return Err(Error::new( + ErrorKind::Unsupported, + "AppendObjectWrite has not been implemented for Dbfs", + )); + } + + let req = self.backend.dbfs_create_file_request(&self.path, bs)?; + + let resp = self.backend.client.send(req).await?; + + let status = resp.status(); + match status { + StatusCode::CREATED | StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } +} diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index d6d1b80806f5..06e12933e2e5 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -248,3 +248,8 @@ pub use self::azfile::Azfile; mod mongodb; #[cfg(feature = "services-mongodb")] pub use self::mongodb::Mongodb; + +#[cfg(feature = "services-dbfs")] +mod dbfs; +#[cfg(feature = "services-dbfs")] +pub use self::dbfs::Dbfs; diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 231d617295c1..879dc6b9e19a 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -52,6 +52,8 @@ pub enum Scheme { Etcd, /// [foundationdb][crate::services::Foundationdb]: Foundationdb services. Foundationdb, + /// [dbfs][crate::services::Dbfs]: DBFS backend support. + Dbfs, /// [fs][crate::services::Fs]: POSIX alike file system. Fs, /// [ftp][crate::services::Ftp]: FTP backend. @@ -279,6 +281,7 @@ impl FromStr for Scheme { "dashmap" => Ok(Scheme::Dashmap), "dropbox" => Ok(Scheme::Dropbox), "etcd" => Ok(Scheme::Etcd), + "dbfs" => Ok(Scheme::Dbfs), "fs" => Ok(Scheme::Fs), "gcs" => Ok(Scheme::Gcs), "gdrive" => Ok(Scheme::Gdrive), @@ -331,6 +334,7 @@ impl From for &'static str { Scheme::D1 => "d1", Scheme::Dashmap => "dashmap", Scheme::Etcd => "etcd", + Scheme::Dbfs => "dbfs", Scheme::Fs => "fs", Scheme::Gcs => "gcs", Scheme::Ghac => "ghac", From 8ebe67142900d39b59cf84a1761ae2f432864edf Mon Sep 17 00:00:00 2001 From: Morris Tai Date: Wed, 18 Oct 2023 09:41:53 -0400 Subject: [PATCH 2/8] feat: re-use Http client in crate and formatting --- core/src/raw/http_util/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 783eaf8fbd1f..0a70a9b43fd1 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -38,7 +38,7 @@ const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(60); /// HttpClient that used across opendal. #[derive(Clone)] pub struct HttpClient { - client: reqwest::Client, + pub(crate) client: reqwest::Client, } /// We don't want users to know details about our clients. @@ -113,7 +113,7 @@ impl HttpClient { // // We don't set this by hand, just don't allow retry. err.is_redirect() || - // We never use `Response::error_for_status`, just don't allow retry. + // We never use `Response::error_for_status`, just don't allow retry. // // Status should be checked by our services. err.is_status() From 2e26409f6c5ce88b8e51c8870894b71bc05bf92e Mon Sep 17 00:00:00 2001 From: Morris Tai Date: Wed, 18 Oct 2023 09:55:59 -0400 Subject: [PATCH 3/8] chore: format modify for clippy check --- core/src/services/dbfs/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/services/dbfs/writer.rs b/core/src/services/dbfs/writer.rs index 3dfe17b94a1b..3d2d23c582b6 100644 --- a/core/src/services/dbfs/writer.rs +++ b/core/src/services/dbfs/writer.rs @@ -30,7 +30,7 @@ pub struct DbfsWriter { } impl DbfsWriter { - const MAX_SIMPLE_SIZE: usize = 1 * 1024 * 1024; + const MAX_SIMPLE_SIZE: usize = 1024 * 1024; pub fn new(backend: DbfsBackend, _op: OpWrite, path: String) -> Self { DbfsWriter { backend, path } From da1d8e00095196898416739a2cb80ae2c5c2b597 Mon Sep 17 00:00:00 2001 From: Morris Tai Date: Wed, 18 Oct 2023 14:47:27 -0400 Subject: [PATCH 4/8] feat: create dedicated core struct for backend --- core/src/services/dbfs/backend.rs | 309 +++++++----------------------- core/src/services/dbfs/core.rs | 236 +++++++++++++++++++++++ core/src/services/dbfs/mod.rs | 2 + core/src/services/dbfs/pager.rs | 15 +- core/src/services/dbfs/writer.rs | 17 +- 5 files changed, 323 insertions(+), 256 deletions(-) create mode 100644 core/src/services/dbfs/core.rs diff --git a/core/src/services/dbfs/backend.rs b/core/src/services/dbfs/backend.rs index 702e8f779322..e8e9654ee623 100644 --- a/core/src/services/dbfs/backend.rs +++ b/core/src/services/dbfs/backend.rs @@ -18,26 +18,21 @@ use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; +use std::sync::Arc; use async_trait::async_trait; -use base64::prelude::BASE64_STANDARD; -use base64::Engine; -use bytes::Bytes; -use http::header; -use http::Request; -use http::Response; use http::StatusCode; use log::debug; use serde::Deserialize; -use serde_json::json; -use super::error::parse_dbfs_read_error; +use crate::raw::*; +use crate::*; + +use super::core::DbfsCore; use super::error::parse_error; use super::pager::DbfsPager; -use super::reader::IncomingDbfsAsyncBody; +use super::reader::DbfsReader; use super::writer::DbfsWriter; -use crate::raw::*; -use crate::*; /// [Dbfs](https://docs.databricks.com/api/azure/workspace/dbfs)'s REST API support. #[doc = include_str!("docs.md")] @@ -138,10 +133,12 @@ impl Builder for DbfsBuilder { debug!("backend build finished: {:?}", &self); Ok(DbfsBackend { - root, - endpoint: self.endpoint.clone(), - token, - client, + core: Arc::new(DbfsCore { + root, + endpoint: endpoint.to_string(), + token, + client, + }), }) } } @@ -149,196 +146,12 @@ impl Builder for DbfsBuilder { /// Backend for DBFS service #[derive(Debug, Clone)] pub struct DbfsBackend { - root: String, - endpoint: String, - token: String, - pub(super) client: HttpClient, -} - -impl DbfsBackend { - fn dbfs_create_dir_request(&self, path: &str) -> Result> { - let url = format!("{}/api/2.0/dbfs/mkdirs", self.endpoint); - let mut req = Request::post(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let p = build_rooted_abs_path(&self.root, path) - .trim_end_matches('/') - .to_string(); - - let req_body = format!("{{\"path\": \"{}\"}}", percent_encode_path(&p)); - let body = AsyncBody::Bytes(Bytes::from(req_body)); - - let req = req.body(body).map_err(new_request_build_error)?; - - Ok(req) - } - - async fn dbfs_delete(&self, path: &str) -> Result> { - let url = format!("{}/api/2.0/dbfs/delete", self.endpoint); - let mut req = Request::post(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let p = build_rooted_abs_path(&self.root, path) - .trim_end_matches('/') - .to_string(); - - let request_body = &json!({ - "path": percent_encode_path(&p), - // TODO: support recursive toggle, should we add a new field in OpDelete? - "recursive": true, - }); - - let body = AsyncBody::Bytes(Bytes::from(request_body.to_string())); - - let req = req.body(body).map_err(new_request_build_error)?; - - self.client.send(req).await - } - - async fn dbfs_rename(&self, from: &str, to: &str) -> Result> { - let source = build_rooted_abs_path(&self.root, from); - let target = build_rooted_abs_path(&self.root, to); - - let url = format!("{}/api/2.0/dbfs/move", self.endpoint); - let mut req = Request::post(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let req_body = &json!({ - "source_path": percent_encode_path(&source), - "destination_path": percent_encode_path(&target), - }); - - let body = AsyncBody::Bytes(Bytes::from(req_body.to_string())); - - let req = req.body(body).map_err(new_request_build_error)?; - - self.client.send(req).await - } - - pub async fn dbfs_list(&self, path: &str) -> Result> { - let p = build_rooted_abs_path(&self.root, path) - .trim_end_matches('/') - .to_string(); - - let url = format!( - "{}/api/2.0/dbfs/list?path={}", - self.endpoint, - percent_encode_path(&p) - ); - let mut req = Request::get(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.client.send(req).await - } - - pub fn dbfs_create_file_request(&self, path: &str, body: Bytes) -> Result> { - let url = format!("{}/api/2.0/dbfs/put", self.endpoint); - - let contents = BASE64_STANDARD.encode(body); - let mut req = Request::post(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let req_body = &json!({ - "path": path, - "contents": contents, - // TODO: support overwrite toggle, should we add a new field in OpWrite? - "overwrite": true, - }); - - let body = AsyncBody::Bytes(Bytes::from(req_body.to_string())); - - req.body(body).map_err(new_request_build_error) - } - - async fn dbfs_read( - &self, - path: &str, - range: BytesRange, - ) -> Result> { - let p = build_rooted_abs_path(&self.root, path) - .trim_end_matches('/') - .to_string(); - - let mut url = format!( - "{}/api/2.0/dbfs/read?path={}", - self.endpoint, - percent_encode_path(&p) - ); - - if let Some(offset) = range.offset() { - url.push_str(&format!("&offset={}", offset)); - } - - if let Some(length) = range.size() { - url.push_str(&format!("&length={}", length)); - } - - let mut req = Request::get(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.client.send_dbfs(req).await - } - - async fn dbfs_get_properties(&self, path: &str) -> Result> { - let p = build_rooted_abs_path(&self.root, path) - .trim_end_matches('/') - .to_string(); - - let url = format!( - "{}/api/2.0/dbfs/get-status?path={}", - &self.endpoint, - percent_encode_path(&p) - ); - - let mut req = Request::get(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.client.send(req).await - } - - async fn dbfs_ensure_parent_path(&self, path: &str) -> Result<()> { - let resp = self.dbfs_get_properties(path).await?; - - match resp.status() { - StatusCode::OK => return Ok(()), - StatusCode::NOT_FOUND => { - self.create_dir(path, OpCreateDir::default()).await?; - } - _ => return Err(parse_error(resp).await?), - } - Ok(()) - } + core: Arc, } #[async_trait] impl Accessor for DbfsBackend { - type Reader = IncomingDbfsAsyncBody; + type Reader = DbfsReader; type BlockingReader = (); type Writer = oio::OneShotWriter; type BlockingWriter = (); @@ -348,7 +161,7 @@ impl Accessor for DbfsBackend { fn info(&self) -> AccessorInfo { let mut am = AccessorInfo::default(); am.set_scheme(Scheme::Dbfs) - .set_root(&self.root) + .set_root(&self.core.root) .set_native_capability(Capability { stat: true, @@ -370,9 +183,7 @@ impl Accessor for DbfsBackend { } async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { - let req = self.dbfs_create_dir_request(path)?; - - let resp = self.client.send(req).await?; + let resp = self.core.dbfs_create_dir(path).await?; let status = resp.status(); @@ -386,57 +197,69 @@ impl Accessor for DbfsBackend { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.dbfs_read(path, args.range()).await?; + let resp = self.core.dbfs_read(path, args.range()).await?; let status = resp.status(); match status { - StatusCode::OK => { - // NOTE: If range is not specified, we need to get content length from stat API. - if let Some(size) = args.range().size() { - let mut meta = parse_into_metadata(path, resp.headers())?; - meta.set_content_length(size); - Ok((RpRead::with_metadata(meta), resp.into_body())) - } else { - let stat_resp = self.dbfs_get_properties(path).await?; - let meta = match stat_resp.status() { - StatusCode::OK => { - let mut meta = parse_into_metadata(path, stat_resp.headers())?; - let bs = stat_resp.into_body().bytes().await?; - let decoded_response = serde_json::from_slice::(&bs) - .map_err(new_json_deserialize_error)?; - meta.set_last_modified(parse_datetime_from_from_timestamp_millis( - decoded_response.modification_time, - )?); - match decoded_response.is_dir { - true => meta.set_mode(EntryMode::DIR), - false => { - meta.set_mode(EntryMode::FILE); - meta.set_content_length(decoded_response.file_size as u64) - } - }; - meta - } - _ => return Err(parse_error(stat_resp).await?), - }; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => { + let meta = parse_into_metadata(path, resp.headers())?; + Ok((RpRead::with_metadata(meta), resp.into_body())) } - _ => Err(parse_dbfs_read_error(resp).await?), + _ => Err(parse_error(resp).await?), } + + // let resp = self.core.dbfs_read(path, args.range()).await?; + // + // let status = resp.status(); + // + // match status { + // StatusCode::OK => { + // // NOTE: If range is not specified, we need to get content length from stat API. + // if let Some(size) = args.range().size() { + // let mut meta = parse_into_metadata(path, resp.headers())?; + // meta.set_content_length(size); + // Ok((RpRead::with_metadata(meta), resp.into_body())) + // } else { + // let stat_resp = self.core.dbfs_get_status(path).await?; + // let meta = match stat_resp.status() { + // StatusCode::OK => { + // let mut meta = parse_into_metadata(path, stat_resp.headers())?; + // let bs = stat_resp.into_body().bytes().await?; + // let decoded_response = serde_json::from_slice::(&bs) + // .map_err(new_json_deserialize_error)?; + // meta.set_last_modified(parse_datetime_from_from_timestamp_millis( + // decoded_response.modification_time, + // )?); + // match decoded_response.is_dir { + // true => meta.set_mode(EntryMode::DIR), + // false => { + // meta.set_mode(EntryMode::FILE); + // meta.set_content_length(decoded_response.file_size as u64) + // } + // }; + // meta + // } + // _ => return Err(parse_error(stat_resp).await?), + // }; + // Ok((RpRead::with_metadata(meta), resp.into_body())) + // } + // } + // _ => Err(parse_dbfs_read_error(resp).await?), + // } } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { Ok(( RpWrite::default(), - oio::OneShotWriter::new(DbfsWriter::new(self.clone(), args, path.to_string())), + oio::OneShotWriter::new(DbfsWriter::new(self.core.clone(), args, path.to_string())), )) } async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { - self.dbfs_ensure_parent_path(to).await?; + self.core.dbfs_ensure_parent_path(to).await?; - let resp = self.dbfs_rename(from, to).await?; + let resp = self.core.dbfs_rename(from, to).await?; let status = resp.status(); @@ -455,7 +278,7 @@ impl Accessor for DbfsBackend { return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); } - let resp = self.dbfs_get_properties(path).await?; + let resp = self.core.dbfs_get_status(path).await?; let status = resp.status(); @@ -486,7 +309,7 @@ impl Accessor for DbfsBackend { /// NOTE: Server will return 200 even if the path doesn't exist. async fn delete(&self, path: &str, _: OpDelete) -> Result { - let resp = self.dbfs_delete(path).await?; + let resp = self.core.dbfs_delete(path).await?; let status = resp.status(); @@ -503,7 +326,7 @@ impl Accessor for DbfsBackend { } async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Pager)> { - let op = DbfsPager::new(self.clone(), path.to_string()); + let op = DbfsPager::new(self.core.clone(), path.to_string()); Ok((RpList::default(), op)) } diff --git a/core/src/services/dbfs/core.rs b/core/src/services/dbfs/core.rs new file mode 100644 index 000000000000..e03e14a1f015 --- /dev/null +++ b/core/src/services/dbfs/core.rs @@ -0,0 +1,236 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; + +use base64::prelude::BASE64_STANDARD; +use base64::Engine; +use bytes::Bytes; +use http::header; +use http::Request; +use http::Response; +use http::StatusCode; +use serde_json::json; + +use crate::raw::build_rooted_abs_path; +use crate::raw::new_request_build_error; +use crate::raw::percent_encode_path; +use crate::raw::AsyncBody; +use crate::raw::BytesRange; +use crate::raw::HttpClient; +use crate::raw::IncomingAsyncBody; +use crate::*; + +use super::error::parse_error; +use super::reader::IncomingDbfsAsyncBody; + +pub struct DbfsCore { + pub root: String, + pub endpoint: String, + pub token: String, + pub client: HttpClient, +} + +impl Debug for DbfsCore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DbfsCore") + .field("root", &self.root) + .field("endpoint", &self.endpoint) + .field("token", &self.token) + .finish_non_exhaustive() + } +} + +impl DbfsCore { + pub async fn dbfs_create_dir(&self, path: &str) -> Result> { + let url = format!("{}/api/2.0/dbfs/mkdirs", self.endpoint); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let req_body = format!("{{\"path\": \"{}\"}}", percent_encode_path(&p)); + let body = AsyncBody::Bytes(Bytes::from(req_body)); + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_delete(&self, path: &str) -> Result> { + let url = format!("{}/api/2.0/dbfs/delete", self.endpoint); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let request_body = &json!({ + "path": percent_encode_path(&p), + // TODO: support recursive toggle, should we add a new field in OpDelete? + "recursive": true, + }); + + let body = AsyncBody::Bytes(Bytes::from(request_body.to_string())); + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_rename(&self, from: &str, to: &str) -> Result> { + let source = build_rooted_abs_path(&self.root, from); + let target = build_rooted_abs_path(&self.root, to); + + let url = format!("{}/api/2.0/dbfs/move", self.endpoint); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req_body = &json!({ + "source_path": percent_encode_path(&source), + "destination_path": percent_encode_path(&target), + }); + + let body = AsyncBody::Bytes(Bytes::from(req_body.to_string())); + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_list(&self, path: &str) -> Result> { + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let url = format!( + "{}/api/2.0/dbfs/list?path={}", + self.endpoint, + percent_encode_path(&p) + ); + let mut req = Request::get(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub fn dbfs_create_file_request(&self, path: &str, body: Bytes) -> Result> { + let url = format!("{}/api/2.0/dbfs/put", self.endpoint); + + let contents = BASE64_STANDARD.encode(body); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req_body = &json!({ + "path": path, + "contents": contents, + "overwrite": true, + }); + + let body = AsyncBody::Bytes(Bytes::from(req_body.to_string())); + + req.body(body).map_err(new_request_build_error) + } + + pub async fn dbfs_read( + &self, + path: &str, + range: BytesRange, + ) -> Result> { + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let mut url = format!( + "{}/api/2.0/dbfs/read?path={}", + self.endpoint, + percent_encode_path(&p) + ); + + if let Some(offset) = range.offset() { + url.push_str(&format!("&offset={}", offset)); + } + + if let Some(length) = range.size() { + url.push_str(&format!("&length={}", length)); + } + + let mut req = Request::get(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_get_status(&self, path: &str) -> Result> { + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let url = format!( + "{}/api/2.0/dbfs/get-status?path={}", + &self.endpoint, + percent_encode_path(&p) + ); + + let mut req = Request::get(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_ensure_parent_path(&self, path: &str) -> Result<()> { + let resp = self.dbfs_get_status(path).await?; + + match resp.status() { + StatusCode::OK => return Ok(()), + StatusCode::NOT_FOUND => { + self.dbfs_create_dir(path).await?; + } + _ => return Err(parse_error(resp).await?), + } + Ok(()) + } +} diff --git a/core/src/services/dbfs/mod.rs b/core/src/services/dbfs/mod.rs index 094ebf76b4ed..bb72050318ff 100644 --- a/core/src/services/dbfs/mod.rs +++ b/core/src/services/dbfs/mod.rs @@ -17,6 +17,8 @@ mod backend; pub use backend::DbfsBuilder as Dbfs; + +mod core; mod error; mod pager; mod reader; diff --git a/core/src/services/dbfs/pager.rs b/core/src/services/dbfs/pager.rs index 98306bd27aad..58aef5d757dc 100644 --- a/core/src/services/dbfs/pager.rs +++ b/core/src/services/dbfs/pager.rs @@ -15,25 +15,28 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use async_trait::async_trait; use http::StatusCode; use serde::Deserialize; -use super::backend::DbfsBackend; -use super::error::parse_error; use crate::raw::*; +use crate::services::dbfs::core::DbfsCore; use crate::*; +use super::error::parse_error; + pub struct DbfsPager { - backend: DbfsBackend, + core: Arc, path: String, done: bool, } impl DbfsPager { - pub fn new(backend: DbfsBackend, path: String) -> Self { + pub fn new(core: Arc, path: String) -> Self { Self { - backend, + core, path, done: false, } @@ -47,7 +50,7 @@ impl oio::Page for DbfsPager { return Ok(None); } - let response = self.backend.dbfs_list(&self.path).await?; + let response = self.core.dbfs_list(&self.path).await?; let status_code = response.status(); if !status_code.is_success() { diff --git a/core/src/services/dbfs/writer.rs b/core/src/services/dbfs/writer.rs index 3d2d23c582b6..04dbe4611812 100644 --- a/core/src/services/dbfs/writer.rs +++ b/core/src/services/dbfs/writer.rs @@ -15,25 +15,28 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use async_trait::async_trait; use http::StatusCode; -use super::backend::DbfsBackend; -use super::error::parse_error; use crate::raw::oio::WriteBuf; use crate::raw::*; +use crate::services::dbfs::core::DbfsCore; use crate::*; +use super::error::parse_error; + pub struct DbfsWriter { - backend: DbfsBackend, + core: Arc, path: String, } impl DbfsWriter { const MAX_SIMPLE_SIZE: usize = 1024 * 1024; - pub fn new(backend: DbfsBackend, _op: OpWrite, path: String) -> Self { - DbfsWriter { backend, path } + pub fn new(core: Arc, _op: OpWrite, path: String) -> Self { + DbfsWriter { core, path } } } @@ -51,9 +54,9 @@ impl oio::OneShotWrite for DbfsWriter { )); } - let req = self.backend.dbfs_create_file_request(&self.path, bs)?; + let req = self.core.dbfs_create_file_request(&self.path, bs)?; - let resp = self.backend.client.send(req).await?; + let resp = self.core.client.send(req).await?; let status = resp.status(); match status { From a06ad4c5f817ad28e26af36241cd3f70af4c3dc7 Mon Sep 17 00:00:00 2001 From: Morris Tai Date: Wed, 25 Oct 2023 10:29:09 -0400 Subject: [PATCH 5/8] feat: implement DBFS reader --- core/src/services/dbfs/backend.rs | 75 +++---- core/src/services/dbfs/core.rs | 25 +-- core/src/services/dbfs/error.rs | 32 --- core/src/services/dbfs/reader.rs | 330 +++++++++--------------------- 4 files changed, 142 insertions(+), 320 deletions(-) diff --git a/core/src/services/dbfs/backend.rs b/core/src/services/dbfs/backend.rs index e8e9654ee623..df035027ea68 100644 --- a/core/src/services/dbfs/backend.rs +++ b/core/src/services/dbfs/backend.rs @@ -197,56 +197,37 @@ impl Accessor for DbfsBackend { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.core.dbfs_read(path, args.range()).await?; - - let status = resp.status(); - - match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) + let mut meta = Metadata::new(EntryMode::FILE); + + if let Some(length) = args.range().size() { + meta.set_content_length(length); + } else { + let stat_resp = self.core.dbfs_get_status(path).await?; + meta = parse_into_metadata(path, stat_resp.headers())?; + let decoded_response = + serde_json::from_slice::(&stat_resp.into_body().bytes().await?) + .map_err(new_json_deserialize_error)?; + meta.set_last_modified(parse_datetime_from_from_timestamp_millis( + decoded_response.modification_time, + )?); + meta.set_mode(if decoded_response.is_dir { + EntryMode::DIR + } else { + EntryMode::FILE + }); + if !decoded_response.is_dir { + meta.set_content_length(decoded_response.file_size as u64); } - _ => Err(parse_error(resp).await?), } - // let resp = self.core.dbfs_read(path, args.range()).await?; - // - // let status = resp.status(); - // - // match status { - // StatusCode::OK => { - // // NOTE: If range is not specified, we need to get content length from stat API. - // if let Some(size) = args.range().size() { - // let mut meta = parse_into_metadata(path, resp.headers())?; - // meta.set_content_length(size); - // Ok((RpRead::with_metadata(meta), resp.into_body())) - // } else { - // let stat_resp = self.core.dbfs_get_status(path).await?; - // let meta = match stat_resp.status() { - // StatusCode::OK => { - // let mut meta = parse_into_metadata(path, stat_resp.headers())?; - // let bs = stat_resp.into_body().bytes().await?; - // let decoded_response = serde_json::from_slice::(&bs) - // .map_err(new_json_deserialize_error)?; - // meta.set_last_modified(parse_datetime_from_from_timestamp_millis( - // decoded_response.modification_time, - // )?); - // match decoded_response.is_dir { - // true => meta.set_mode(EntryMode::DIR), - // false => { - // meta.set_mode(EntryMode::FILE); - // meta.set_content_length(decoded_response.file_size as u64) - // } - // }; - // meta - // } - // _ => return Err(parse_error(stat_resp).await?), - // }; - // Ok((RpRead::with_metadata(meta), resp.into_body())) - // } - // } - // _ => Err(parse_dbfs_read_error(resp).await?), - // } + let op = DbfsReader::new( + self.core.clone(), + args, + path.to_string(), + meta.content_length(), + ); + + Ok((RpRead::with_metadata(meta), op)) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { diff --git a/core/src/services/dbfs/core.rs b/core/src/services/dbfs/core.rs index e03e14a1f015..34aff70066c8 100644 --- a/core/src/services/dbfs/core.rs +++ b/core/src/services/dbfs/core.rs @@ -26,17 +26,10 @@ use http::Response; use http::StatusCode; use serde_json::json; -use crate::raw::build_rooted_abs_path; -use crate::raw::new_request_build_error; -use crate::raw::percent_encode_path; -use crate::raw::AsyncBody; -use crate::raw::BytesRange; -use crate::raw::HttpClient; -use crate::raw::IncomingAsyncBody; +use crate::raw::*; use crate::*; use super::error::parse_error; -use super::reader::IncomingDbfsAsyncBody; pub struct DbfsCore { pub root: String, @@ -166,7 +159,8 @@ impl DbfsCore { pub async fn dbfs_read( &self, path: &str, - range: BytesRange, + offset: u64, + length: u64, ) -> Result> { let p = build_rooted_abs_path(&self.root, path) .trim_end_matches('/') @@ -178,11 +172,11 @@ impl DbfsCore { percent_encode_path(&p) ); - if let Some(offset) = range.offset() { + if offset > 0 { url.push_str(&format!("&offset={}", offset)); } - if let Some(length) = range.size() { + if length > 0 { url.push_str(&format!("&length={}", length)); } @@ -195,7 +189,14 @@ impl DbfsCore { .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - self.client.send(req).await + let resp = self.client.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(resp), + _ => Err(parse_error(resp).await?), + } } pub async fn dbfs_get_status(&self, path: &str) -> Result> { diff --git a/core/src/services/dbfs/error.rs b/core/src/services/dbfs/error.rs index 8e0cef99d254..41069d45fb0d 100644 --- a/core/src/services/dbfs/error.rs +++ b/core/src/services/dbfs/error.rs @@ -21,7 +21,6 @@ use http::Response; use http::StatusCode; use serde::Deserialize; -use super::reader::IncomingDbfsAsyncBody; use crate::raw::*; use crate::Error; use crate::ErrorKind; @@ -75,34 +74,3 @@ pub async fn parse_error(resp: Response) -> Result { Ok(err) } - -pub async fn parse_dbfs_read_error(resp: Response) -> Result { - let (parts, body) = resp.into_parts(); - let bs = body.bytes().await?; - - let (kind, retryable) = match parts.status { - StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), - StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), - StatusCode::PRECONDITION_FAILED => (ErrorKind::ConditionNotMatch, false), - StatusCode::INTERNAL_SERVER_ERROR - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), - _ => (ErrorKind::Unexpected, false), - }; - - let message = match serde_json::from_slice::(&bs) { - Ok(dbfs_error) => format!("{:?}", dbfs_error.message), - Err(_) => String::from_utf8_lossy(&bs).into_owned(), - }; - - let mut err = Error::new(kind, &message); - - err = with_error_response_context(err, parts); - - if retryable { - err = err.set_temporary(); - } - - Ok(err) -} diff --git a/core/src/services/dbfs/reader.rs b/core/src/services/dbfs/reader.rs index 58fb440aba18..2e71a688d3ee 100644 --- a/core/src/services/dbfs/reader.rs +++ b/core/src/services/dbfs/reader.rs @@ -15,228 +15,62 @@ // specific language governing permissions and limitations // under the License. -use std::cmp::Ordering; -use std::io; -use std::mem; -use std::str::FromStr; +use std::cmp; +use std::io::SeekFrom; +use std::sync::Arc; use std::task::ready; use std::task::Context; use std::task::Poll; +use async_trait::async_trait; use base64::engine::general_purpose; use base64::Engine; -use bytes::Buf; use bytes::BufMut; use bytes::Bytes; -use futures::StreamExt; -use futures::TryStreamExt; -use http::Request; +use bytes::BytesMut; +use futures::future::BoxFuture; use http::Response; use serde::Deserialize; +use super::core::DbfsCore; + +use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; -impl HttpClient { - /// Send a request in async way. - pub async fn send_dbfs( - &self, - req: Request, - ) -> Result> { - // Uri stores all string alike data in `Bytes` which means - // the clone here is cheap. - let uri = req.uri().clone(); - let is_head = req.method() == http::Method::HEAD; - - let (parts, body) = req.into_parts(); - - let mut req_builder = self - .client - .request( - parts.method, - reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"), - ) - .version(parts.version) - .headers(parts.headers); - - req_builder = match body { - AsyncBody::Empty => req_builder.body(reqwest::Body::from("")), - AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)), - AsyncBody::ChunkedBytes(bs) => req_builder.body(reqwest::Body::wrap_stream(bs)), - AsyncBody::Stream(s) => req_builder.body(reqwest::Body::wrap_stream(s)), - }; - - let mut resp = req_builder.send().await.map_err(|err| { - let is_temporary = !( - // Builder related error should not be retried. - err.is_builder() || - // Error returned by RedirectPolicy. - // - // We don't set this by hand, just don't allow retry. - err.is_redirect() || - // We never use `Response::error_for_status`, just don't allow retry. - // - // Status should be checked by our services. - err.is_status() - ); - - let mut oerr = Error::new(ErrorKind::Unexpected, "send async request") - .with_operation("http_util::Client::send_async") - .with_context("url", uri.to_string()) - .set_source(err); - if is_temporary { - oerr = oerr.set_temporary(); - } - - oerr - })?; - - // Get content length from header so that we can check it. - // If the request method is HEAD, we will ignore this. - let content_length = if is_head { - None - } else { - parse_content_length(resp.headers()).expect("response content length must be valid") - }; - - let mut hr = Response::builder() - .version(resp.version()) - .status(resp.status()) - // Insert uri into response extension so that we can fetch - // it later. - .extension(uri.clone()); - // Swap headers directly instead of copy the entire map. - mem::swap(hr.headers_mut().unwrap(), resp.headers_mut()); - - let stream = resp.bytes_stream().map_err(move |err| { - // If stream returns a body related error, we can convert - // it to interrupt so we can retry it. - Error::new(ErrorKind::Unexpected, "read data from http stream") - .map(|v| if err.is_body() { v.set_temporary() } else { v }) - .with_context("url", uri.to_string()) - .set_source(err) - }); - - let body = IncomingDbfsAsyncBody::new(Box::new(oio::into_stream(stream)), content_length); - - let resp = hr.body(body).expect("response must build succeed"); - - Ok(resp) - } -} +const DBFS_READ_LIMIT: usize = 1048576; -/// IncomingDbfsAsyncBody carries the content returned by remote servers. -/// -/// # Notes -/// -/// Client SHOULD NEVER construct this body. -pub struct IncomingDbfsAsyncBody { - inner: oio::Streamer, - size: Option, - consumed: u64, - chunk: Option, +pub struct DbfsReader { + state: State, + path: String, + offset: u64, + buffer: BytesMut, } -impl IncomingDbfsAsyncBody { - /// Construct a new incoming async body - pub fn new(s: oio::Streamer, size: Option) -> Self { - Self { - inner: s, - size, - consumed: 0, - chunk: None, - } - } - - /// Consume the entire body. - pub async fn consume(mut self) -> Result<()> { - use oio::ReadExt; - - while let Some(bs) = self.next().await { - bs.map_err(|err| { - Error::new(ErrorKind::Unexpected, "fetch bytes from stream") - .with_operation("http_util::IncomingDbfsAsyncBody::consume") - .set_source(err) - })?; - } - - Ok(()) - } - - /// Consume the response to bytes. - pub async fn bytes(mut self) -> Result { - use oio::ReadExt; - - // If there's only 1 chunk, we can just return Buf::to _bytes() - let mut first = if let Some(buf) = self.next().await { - buf? - } else { - return Ok(Bytes::new()); - }; - - let second = if let Some(buf) = self.next().await { - buf? - } else { - return Ok(first.copy_to_bytes(first.remaining())); - }; - - // With more than 1 buf, we gotta flatten into a Vec first. - let cap = first.remaining() + second.remaining() + self.size.unwrap_or_default() as usize; - let mut vec = Vec::with_capacity(cap); - vec.put(first); - vec.put(second); - - while let Some(buf) = self.next().await { - vec.put(buf?); +impl DbfsReader { + pub fn new(core: Arc, op: OpRead, path: String, content_length: u64) -> Self { + DbfsReader { + state: State::Idle(Some(core)), + path, + offset: op.range().offset().unwrap_or(0), + buffer: BytesMut::with_capacity(content_length as usize), } - - Ok(vec.into()) } #[inline] - fn check(expect: u64, actual: u64) -> Result<()> { - match actual.cmp(&expect) { - Ordering::Equal => Ok(()), - Ordering::Less => Err(Error::new( - ErrorKind::ContentIncomplete, - &format!("reader got too less data, expect: {expect}, actual: {actual}"), - ) - .set_temporary()), - Ordering::Greater => Err(Error::new( - ErrorKind::ContentTruncated, - &format!("reader got too much data, expect: {expect}, actual: {actual}"), - ) - .set_temporary()), - } + fn set_offset(&mut self, offset: u64) { + self.offset = offset; } -} -impl oio::Read for IncomingDbfsAsyncBody { - fn poll_read(&mut self, cx: &mut Context<'_>, mut buf: &mut [u8]) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - // We must get a valid bytes from underlying stream - let bs = loop { - match ready!(self.poll_next(cx)) { - Some(Ok(bs)) if bs.is_empty() => continue, - Some(Ok(bs)) => break bs, - Some(Err(err)) => return Poll::Ready(Err(err)), - None => return Poll::Ready(Ok(0)), - } - }; - - // TODO: reqwest::Body::bytes_stream() in poll_next() will not return whole response at once if the file is too big, which causes serde failed - let mut response_body = match serde_json::from_slice::(&bs) { + fn serde_json_decode(&self, bs: &Bytes) -> Result { + let mut response_body = match serde_json::from_slice::(bs) { Ok(v) => v, Err(err) => { - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "parse response content failed", - ) - .with_operation("http_util::IncomingDbfsAsyncBody::poll_read") - .set_source(err))); + return Err( + Error::new(ErrorKind::Unexpected, "parse response content failed") + .with_operation("http_util::IncomingDbfsAsyncBody::poll_read") + .set_source(err), + ); } }; @@ -258,48 +92,86 @@ impl oio::Read for IncomingDbfsAsyncBody { }) })?; - let amt = response_body.bytes_read as usize; - - buf.put_slice(response_body.data.as_ref()); - - // TODO: will handle chunk here till we find a way to get whole bytes at once - - Poll::Ready(Ok(amt)) + Ok(response_body.data.into()) } +} - fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll> { - let (_, _) = (cx, pos); +enum State { + Idle(Option>), + Read(BoxFuture<'static, (Arc, Result>)>), + Decode(BoxFuture<'static, (Arc, Result)>), +} - Poll::Ready(Err(Error::new( - ErrorKind::Unsupported, - "output reader doesn't support seeking", - ))) - } +/// # Safety +/// +/// We will only take `&mut Self` reference for DbfsReader. +unsafe impl Sync for DbfsReader {} - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - if let Some(bs) = self.chunk.take() { - return Poll::Ready(Some(Ok(bs))); - } +#[async_trait] +impl oio::Read for DbfsReader { + fn poll_read(&mut self, cx: &mut Context<'_>, mut buf: &mut [u8]) -> Poll> { + while self.buffer.remaining() != buf.len() { + match &mut self.state { + State::Idle(core) => { + let core = core.take().expect("DbfsReader must be initialized"); + + let path = self.path.clone(); + let offset = self.offset; + let len = cmp::min(buf.len(), DBFS_READ_LIMIT); + + let fut = async move { + let resp = async { core.dbfs_read(&path, offset, len as u64).await }.await; + (core, resp) + }; + self.state = State::Read(Box::pin(fut)); + } + State::Read(fut) => { + let (core, resp) = ready!(fut.as_mut().poll(cx)); + let body = resp?.into_body(); + + let fut = async move { + let bs = async { body.bytes().await }.await; + (core, bs) + }; + self.state = State::Decode(Box::pin(fut)); + } + State::Decode(fut) => { + let (core, bs) = ready!(fut.as_mut().poll(cx)); + let data = self.serde_json_decode(&bs?)?; - // NOTE: inner: Stream> - // Reader::poll_next() -> Stream::poll_next() - // control by reqwest::Body::bytes_stream() - let res = match ready!(self.inner.poll_next_unpin(cx)) { - Some(Ok(bs)) => { - self.consumed += bs.len() as u64; - Some(Ok(bs)) - } - Some(Err(err)) => Some(Err(err)), - None => { - if let Some(size) = self.size { - Self::check(size, self.consumed)?; + self.buffer.put_slice(&data[..]); + self.set_offset(self.offset + data.len() as u64); + self.state = State::Idle(Some(core)); } + } + } + buf.put_slice(&self.buffer[..]); + Poll::Ready(Ok(self.buffer.remaining())) + } - None + fn poll_seek(&mut self, _cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { + // TODO: drop existing buf and change the offset? + match pos { + SeekFrom::Start(n) => { + self.set_offset(n); + } + SeekFrom::End(n) => { + self.set_offset((self.buffer.remaining() as i64 + n) as u64); + } + SeekFrom::Current(n) => { + self.set_offset((self.offset as i64 + n) as u64); } }; + Poll::Ready(Ok(0)) + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + let _ = cx; - Poll::Ready(res) + Poll::Ready(Some(Err(Error::new( + ErrorKind::Unsupported, + "output reader doesn't support iterating", + )))) } } From 281638052f21ba582aeebef9c294d0b92f90e548 Mon Sep 17 00:00:00 2001 From: Morris Tai Date: Wed, 25 Oct 2023 10:34:10 -0400 Subject: [PATCH 6/8] chore: drop scope change for HttpClient --- core/src/raw/http_util/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 0a70a9b43fd1..9ac5ac4e26b3 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -38,7 +38,7 @@ const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(60); /// HttpClient that used across opendal. #[derive(Clone)] pub struct HttpClient { - pub(crate) client: reqwest::Client, + client: reqwest::Client, } /// We don't want users to know details about our clients. From a7520334f551accd6cf902f5dcaf5d01463c18cf Mon Sep 17 00:00:00 2001 From: Morris Tai Date: Wed, 25 Oct 2023 13:31:21 -0400 Subject: [PATCH 7/8] feat: fix PR review #3334 --- core/src/services/dbfs/backend.rs | 7 +-- core/src/services/dbfs/reader.rs | 74 +++++++++++++------------------ 2 files changed, 31 insertions(+), 50 deletions(-) diff --git a/core/src/services/dbfs/backend.rs b/core/src/services/dbfs/backend.rs index df035027ea68..520489515db2 100644 --- a/core/src/services/dbfs/backend.rs +++ b/core/src/services/dbfs/backend.rs @@ -220,12 +220,7 @@ impl Accessor for DbfsBackend { } } - let op = DbfsReader::new( - self.core.clone(), - args, - path.to_string(), - meta.content_length(), - ); + let op = DbfsReader::new(self.core.clone(), args, path.to_string()); Ok((RpRead::with_metadata(meta), op)) } diff --git a/core/src/services/dbfs/reader.rs b/core/src/services/dbfs/reader.rs index 2e71a688d3ee..97a4c101d1f8 100644 --- a/core/src/services/dbfs/reader.rs +++ b/core/src/services/dbfs/reader.rs @@ -27,14 +27,11 @@ use base64::engine::general_purpose; use base64::Engine; use bytes::BufMut; use bytes::Bytes; -use bytes::BytesMut; use futures::future::BoxFuture; -use http::Response; use serde::Deserialize; use super::core::DbfsCore; -use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; @@ -44,16 +41,16 @@ pub struct DbfsReader { state: State, path: String, offset: u64, - buffer: BytesMut, + has_filled: u64, } impl DbfsReader { - pub fn new(core: Arc, op: OpRead, path: String, content_length: u64) -> Self { + pub fn new(core: Arc, op: OpRead, path: String) -> Self { DbfsReader { - state: State::Idle(Some(core)), + state: State::Reading(Some(core)), path, offset: op.range().offset().unwrap_or(0), - buffer: BytesMut::with_capacity(content_length as usize), + has_filled: 0, } } @@ -63,7 +60,7 @@ impl DbfsReader { } fn serde_json_decode(&self, bs: &Bytes) -> Result { - let mut response_body = match serde_json::from_slice::(bs) { + let response_body = match serde_json::from_slice::(bs) { Ok(v) => v, Err(err) => { return Err( @@ -74,7 +71,7 @@ impl DbfsReader { } }; - response_body.data = general_purpose::STANDARD + let decoded_data = general_purpose::STANDARD .decode(response_body.data) .map_err(|err| { Error::new(ErrorKind::Unexpected, "decode response content failed") @@ -92,14 +89,13 @@ impl DbfsReader { }) })?; - Ok(response_body.data.into()) + Ok(decoded_data.into()) } } enum State { - Idle(Option>), - Read(BoxFuture<'static, (Arc, Result>)>), - Decode(BoxFuture<'static, (Arc, Result)>), + Reading(Option>), + Finalize(BoxFuture<'static, (Arc, Result)>), } /// # Safety @@ -110,9 +106,9 @@ unsafe impl Sync for DbfsReader {} #[async_trait] impl oio::Read for DbfsReader { fn poll_read(&mut self, cx: &mut Context<'_>, mut buf: &mut [u8]) -> Poll> { - while self.buffer.remaining() != buf.len() { + while self.has_filled as usize != buf.len() { match &mut self.state { - State::Idle(core) => { + State::Reading(core) => { let core = core.take().expect("DbfsReader must be initialized"); let path = self.path.clone(); @@ -121,48 +117,38 @@ impl oio::Read for DbfsReader { let fut = async move { let resp = async { core.dbfs_read(&path, offset, len as u64).await }.await; - (core, resp) - }; - self.state = State::Read(Box::pin(fut)); - } - State::Read(fut) => { - let (core, resp) = ready!(fut.as_mut().poll(cx)); - let body = resp?.into_body(); - - let fut = async move { + let body = match resp { + Ok(resp) => resp.into_body(), + Err(err) => { + return (core, Err(err)); + } + }; let bs = async { body.bytes().await }.await; (core, bs) }; - self.state = State::Decode(Box::pin(fut)); + self.state = State::Finalize(Box::pin(fut)); } - State::Decode(fut) => { + State::Finalize(fut) => { let (core, bs) = ready!(fut.as_mut().poll(cx)); let data = self.serde_json_decode(&bs?)?; - self.buffer.put_slice(&data[..]); + buf.put_slice(&data[..]); self.set_offset(self.offset + data.len() as u64); - self.state = State::Idle(Some(core)); + self.has_filled += data.len() as u64; + self.state = State::Reading(Some(core)); } } } - buf.put_slice(&self.buffer[..]); - Poll::Ready(Ok(self.buffer.remaining())) + Poll::Ready(Ok(self.has_filled as usize)) } - fn poll_seek(&mut self, _cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { - // TODO: drop existing buf and change the offset? - match pos { - SeekFrom::Start(n) => { - self.set_offset(n); - } - SeekFrom::End(n) => { - self.set_offset((self.buffer.remaining() as i64 + n) as u64); - } - SeekFrom::Current(n) => { - self.set_offset((self.offset as i64 + n) as u64); - } - }; - Poll::Ready(Ok(0)) + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { + let (_, _) = (cx, pos); + + Poll::Ready(Err(Error::new( + ErrorKind::Unsupported, + "output reader doesn't support seeking", + ))) } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { From cfd5d11f88eb212e74fefe9790d2e7c36f2fa27b Mon Sep 17 00:00:00 2001 From: Morris Tai Date: Wed, 25 Oct 2023 23:47:14 -0400 Subject: [PATCH 8/8] feat: fix PR review #3334 --- core/src/services/dbfs/backend.rs | 23 ++++++++++------------- core/src/services/dbfs/core.rs | 6 ++++-- core/src/services/dbfs/reader.rs | 14 +++----------- 3 files changed, 17 insertions(+), 26 deletions(-) diff --git a/core/src/services/dbfs/backend.rs b/core/src/services/dbfs/backend.rs index 520489515db2..c01acb043ae6 100644 --- a/core/src/services/dbfs/backend.rs +++ b/core/src/services/dbfs/backend.rs @@ -39,7 +39,7 @@ use super::writer::DbfsWriter; #[derive(Default, Clone)] pub struct DbfsBuilder { root: Option, - endpoint: String, + endpoint: Option, token: Option, } @@ -77,8 +77,11 @@ impl DbfsBuilder { /// - Azure: `https://adb-1234567890123456.78.azuredatabricks.net` /// - Aws: `https://dbc-123a5678-90bc.cloud.databricks.com` pub fn endpoint(&mut self, endpoint: &str) -> &mut Self { - assert!(!endpoint.is_empty()); - self.endpoint = endpoint.trim_end_matches('/').to_string(); + self.endpoint = if endpoint.is_empty() { + None + } else { + Some(endpoint.trim_end_matches('/').to_string()) + }; self } @@ -111,9 +114,9 @@ impl Builder for DbfsBuilder { let root = normalize_root(&self.root.take().unwrap_or_default()); debug!("backend use root {}", root); - let endpoint = match self.endpoint.is_empty() { - false => Ok(&self.endpoint), - true => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty") + let endpoint = match &self.endpoint { + Some(endpoint) => Ok(endpoint.clone()), + None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty") .with_operation("Builder::build") .with_context("service", Scheme::Dbfs)), }?; @@ -291,13 +294,7 @@ impl Accessor for DbfsBackend { match status { StatusCode::OK => Ok(RpDelete::default()), - _ => { - let err = parse_error(resp).await?; - match err.kind() { - ErrorKind::NotFound => Ok(RpDelete::default()), - _ => Err(err), - } - } + _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/dbfs/core.rs b/core/src/services/dbfs/core.rs index 34aff70066c8..44e20e571099 100644 --- a/core/src/services/dbfs/core.rs +++ b/core/src/services/dbfs/core.rs @@ -60,8 +60,10 @@ impl DbfsCore { .trim_end_matches('/') .to_string(); - let req_body = format!("{{\"path\": \"{}\"}}", percent_encode_path(&p)); - let body = AsyncBody::Bytes(Bytes::from(req_body)); + let req_body = &json!({ + "path": percent_encode_path(&p), + }); + let body = AsyncBody::Bytes(Bytes::from(req_body.to_string())); let req = req.body(body).map_err(new_request_build_error)?; diff --git a/core/src/services/dbfs/reader.rs b/core/src/services/dbfs/reader.rs index 97a4c101d1f8..a74ddf71987e 100644 --- a/core/src/services/dbfs/reader.rs +++ b/core/src/services/dbfs/reader.rs @@ -35,7 +35,9 @@ use super::core::DbfsCore; use crate::raw::*; use crate::*; -const DBFS_READ_LIMIT: usize = 1048576; +// The number of bytes to read starting from the offset. This has a limit of 1 MB +// Reference: https://docs.databricks.com/api/azure/workspace/dbfs/read +const DBFS_READ_LIMIT: usize = 1024 * 1024; pub struct DbfsReader { state: State, @@ -77,16 +79,6 @@ impl DbfsReader { Error::new(ErrorKind::Unexpected, "decode response content failed") .with_operation("http_util::IncomingDbfsAsyncBody::poll_read") .set_source(err) - }) - .and_then(|v| { - String::from_utf8(v).map_err(|err| { - Error::new( - ErrorKind::Unexpected, - "response data contains invalid utf8 bytes", - ) - .with_operation("http_util::IncomingDbfsAsyncBody::poll_read") - .set_source(err) - }) })?; Ok(decoded_data.into())