diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs index 4d874e06f235..f6359ec4cee2 100644 --- a/core/src/raw/http_util/body.rs +++ b/core/src/raw/http_util/body.rs @@ -33,8 +33,6 @@ use crate::Error; use crate::ErrorKind; use crate::Result; -use super::multipart::Multipart; - /// Body used in async HTTP requests. #[derive(Default)] pub enum AsyncBody { @@ -43,11 +41,6 @@ pub enum AsyncBody { Empty, /// Body with bytes. Bytes(Bytes), - /// Body with a multipart field. - /// - /// If input with this field, we will goto the internal multipart - /// handle logic. - Multipart(Multipart), } type BytesStream = Box> + Send + Sync + Unpin>; diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 6b478ef810c0..c6cb38a468a4 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -21,8 +21,6 @@ use std::mem; use std::str::FromStr; use futures::TryStreamExt; -use http::header::CONTENT_LENGTH; -use http::header::CONTENT_TYPE; use http::Request; use http::Response; use reqwest::redirect::Policy; @@ -99,19 +97,6 @@ impl HttpClient { req_builder = match body { AsyncBody::Empty => req_builder.body(reqwest::Body::from("")), AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)), - AsyncBody::Multipart(mp) => { - let (boundary, bs) = mp.build(); - - // Insert content type with correct boundary. - req_builder = req_builder.header( - CONTENT_TYPE, - format!("multipart/form-data; boundary={boundary}").as_str(), - ); - // Insert content length with calculated size. 
- req_builder = req_builder.header(CONTENT_LENGTH, bs.len()); - - req_builder.body(reqwest::Body::from(bs)) - } }; let mut resp = req_builder.send().await.map_err(|err| { diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs index f94098ceb3cc..dc6822b88011 100644 --- a/core/src/raw/http_util/mod.rs +++ b/core/src/raw/http_util/mod.rs @@ -61,5 +61,7 @@ mod bytes_content_range; pub use bytes_content_range::BytesContentRange; mod multipart; +pub use multipart::FormDataPart; +pub use multipart::MixedPart; pub use multipart::Multipart; pub use multipart::Part; diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index 797478eb01b4..1bff5df23d30 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -15,23 +15,31 @@ // specific language governing permissions and limitations // under the License. +use std::str::FromStr; + +use crate::*; use bytes::{Bytes, BytesMut}; -use http::{header::CONTENT_DISPOSITION, HeaderMap, HeaderName, HeaderValue}; +use http::{ + header::{CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE}, + HeaderMap, HeaderName, HeaderValue, Method, Request, Uri, Version, +}; + +use super::{new_request_build_error, AsyncBody}; /// Multipart is a builder for multipart/form-data. #[derive(Debug)] -pub struct Multipart { +pub struct Multipart { boundary: String, - parts: Vec, + parts: Vec, } -impl Default for Multipart { +impl Default for Multipart { fn default() -> Self { Self::new() } } -impl Multipart { +impl Multipart { /// Create a new multipart with random boundary. pub fn new() -> Self { Multipart { @@ -48,22 +56,22 @@ impl Multipart { } /// Insert a part into multipart. - pub fn part(mut self, part: Part) -> Self { + pub fn part(mut self, part: T) -> Self { self.parts.push(part); self } - pub(crate) fn build(self) -> (String, Bytes) { + pub(crate) fn build(&self) -> Bytes { let mut bs = BytesMut::new(); // Write headers. 
- for v in self.parts { + for v in self.parts.iter() { // Write the first boundary bs.extend_from_slice(b"--"); bs.extend_from_slice(self.boundary.as_bytes()); bs.extend_from_slice(b"\r\n"); - bs.extend_from_slice(&v.build()); + bs.extend_from_slice(v.build().as_ref()); } // Write the last boundary @@ -72,18 +80,48 @@ impl Multipart { bs.extend_from_slice(b"--"); bs.extend_from_slice(b"\r\n"); - (self.boundary, bs.freeze()) + bs.freeze() + } + + /// Consume the input and generate a request with multipart body. + /// + /// This function will make sure content_type and content_length are set correctly. + pub fn apply(self, mut builder: http::request::Builder) -> Result> { + let bs = self.build(); + + // Insert content type with correct boundary. + builder = builder.header( + CONTENT_TYPE, + format!("multipart/{}; boundary={}", T::TYPE, self.boundary).as_str(), + ); + // Insert content length with calculated size. + builder = builder.header(CONTENT_LENGTH, bs.len()); + + builder + .body(AsyncBody::Bytes(bs)) + .map_err(new_request_build_error) } } -/// Part is a builder for multipart/form-data part. +/// Part is a trait for multipart part. +pub trait Part { + /// TYPE is the type of multipart. + /// + /// Currently available types are: `form-data` and `mixed` + const TYPE: &'static str; + + /// Build borrows this part and generates the bytes. + fn build(&self) -> Bytes; +} + +/// FormDataPart is a builder for multipart/form-data part. #[derive(Debug)] -pub struct Part { +pub struct FormDataPart { headers: HeaderMap, content: Bytes, } -impl Part { +impl FormDataPart { /// Create a new part builder /// /// # Panics @@ -114,8 +152,12 @@ impl Part { self.content = content.into(); self } +} + +impl Part for FormDataPart { + const TYPE: &'static str = "form-data"; - pub(crate) fn build(self) -> Bytes { + fn build(&self) -> Bytes { let mut bs = BytesMut::new(); // Write headers. @@ -135,6 +177,139 @@ impl Part { } } +/// MixedPart is a builder for multipart/mixed part.
+#[derive(Debug)] +pub struct MixedPart { + part_headers: HeaderMap, + + method: Method, + uri: Uri, + version: Version, + headers: HeaderMap, + content: Bytes, +} + +impl MixedPart { + /// Create a new mixed part with given uri. + pub fn new(uri: &str) -> Self { + let mut part_headers = HeaderMap::new(); + part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap()); + part_headers.insert("content-transfer-encoding", "binary".parse().unwrap()); + + let uri = Uri::from_str(uri).expect("the uri used to build a mixed part must be valid"); + + Self { + part_headers, + method: Method::GET, + uri, + version: Version::HTTP_11, + headers: HeaderMap::new(), + content: Bytes::new(), + } + } + + /// Build a mixed part from a request. + /// + /// # Notes + /// + /// Mixed parts only take the path from the request uri. + pub fn from_request(req: Request) -> Self { + let mut part_headers = HeaderMap::new(); + part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap()); + part_headers.insert("content-transfer-encoding", "binary".parse().unwrap()); + + let (parts, body) = req.into_parts(); + + let content = match body { + AsyncBody::Empty => Bytes::new(), + AsyncBody::Bytes(bs) => bs, + }; + + Self { + part_headers, + method: parts.method, + // TODO: Maybe we should support query too?; + uri: Uri::from_str(parts.uri.path()) + .expect("the uri used to build a mixed part must be valid"), + version: parts.version, + headers: parts.headers, + content, + } + } + + /// Insert a part header into part. + pub fn part_header(mut self, key: HeaderName, value: HeaderValue) -> Self { + self.part_headers.insert(key, value); + self + } + + /// Set the method for request in this part. + pub fn method(mut self, method: Method) -> Self { + self.method = method; + self + } + + /// Set the version for request in this part. + pub fn version(mut self, version: Version) -> Self { + self.version = version; + self + } + + /// Insert a header into part.
+ pub fn header(mut self, key: HeaderName, value: HeaderValue) -> Self { + self.headers.insert(key, value); + self + } + + /// Set the content for this part. + pub fn content(mut self, content: impl Into) -> Self { + self.content = content.into(); + self + } +} + +impl Part for MixedPart { + const TYPE: &'static str = "mixed"; + + fn build(&self) -> Bytes { + let mut bs = BytesMut::new(); + + // Write parts headers. + for (k, v) in self.part_headers.iter() { + bs.extend_from_slice(k.as_str().as_bytes()); + bs.extend_from_slice(b": "); + bs.extend_from_slice(v.as_bytes()); + bs.extend_from_slice(b"\r\n"); + } + + // Write request line: `DELETE /container0/blob0 HTTP/1.1` + bs.extend_from_slice(b"\r\n"); + bs.extend_from_slice(self.method.as_str().as_bytes()); + bs.extend_from_slice(b" "); + bs.extend_from_slice(self.uri.path().as_bytes()); + bs.extend_from_slice(b" "); + bs.extend_from_slice(format!("{:?}", self.version).as_bytes()); + bs.extend_from_slice(b"\r\n"); + + // Write request headers. + for (k, v) in self.headers.iter() { + bs.extend_from_slice(k.as_str().as_bytes()); + bs.extend_from_slice(b": "); + bs.extend_from_slice(v.as_bytes()); + bs.extend_from_slice(b"\r\n"); + } + + // Write content. 
+ bs.extend_from_slice(b"\r\n"); + if !self.content.is_empty() { + bs.extend_from_slice(&self.content); + bs.extend_from_slice(b"\r\n"); + } + + bs.freeze() + } +} + #[cfg(test)] mod tests { use super::*; @@ -143,47 +318,46 @@ mod tests { use pretty_assertions::assert_eq; #[test] - fn test_multipart_basic() { + fn test_multipart_formdata_basic() { let multipart = Multipart::new() - .part(Part::new("foo").content(Bytes::from("bar"))) - .part(Part::new("hello").content(Bytes::from("world"))); + .with_boundary("lalala") + .part(FormDataPart::new("foo").content(Bytes::from("bar"))) + .part(FormDataPart::new("hello").content(Bytes::from("world"))); - let (boundary, body) = multipart.build(); + let body = multipart.build(); - let expected = format!( - "--{boundary}\r\n\ + let expected = "--lalala\r\n\ content-disposition: form-data; name=\"foo\"\r\n\ \r\n\ bar\r\n\ - --{boundary}\r\n\ + --lalala\r\n\ content-disposition: form-data; name=\"hello\"\r\n\ \r\n\ world\r\n\ - --{boundary}--\r\n", - ); + --lalala--\r\n"; assert_eq!(expected, String::from_utf8(body.to_vec()).unwrap()); } - /// This test is inspired from + /// This test is inspired by #[test] - fn test_multipart_s3_form_upload() { + fn test_multipart_formdata_s3_form_upload() { let multipart = Multipart::new() .with_boundary("9431149156168") - .part(Part::new("key").content("user/eric/MyPicture.jpg")) - .part(Part::new("acl").content("public-read")) - .part(Part::new("success_action_redirect").content( + .part(FormDataPart::new("key").content("user/eric/MyPicture.jpg")) + .part(FormDataPart::new("acl").content("public-read")) + .part(FormDataPart::new("success_action_redirect").content( "https://awsexamplebucket1.s3.us-west-1.amazonaws.com/successful_upload.html", )) - .part(Part::new("Content-Type").content("image/jpeg")) - .part(Part::new("x-amz-meta-uuid").content("14365123651274")) - .part(Part::new("x-amz-meta-tag").content("Some,Tag,For,Picture")) - 
.part(Part::new("AWSAccessKeyId").content("AKIAIOSFODNN7EXAMPLE")) - .part(Part::new("Policy").content("eyAiZXhwaXJhdGlvbiI6ICIyMDA3LTEyLTAxVDEyOjAwOjAwLjAwMFoiLAogICJjb25kaXRpb25zIjogWwogICAgeyJidWNrZXQiOiAiam9obnNtaXRoIn0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiRrZXkiLCAidXNlci9lcmljLyJdLAogICAgeyJhY2wiOiAicHVibGljLXJlYWQifSwKICAgIHsic3VjY2Vzc19hY3Rpb25fcmVkaXJlY3QiOiAiaHR0cDovL2pvaG5zbWl0aC5zMy5hbWF6b25hd3MuY29tL3N1Y2Nlc3NmdWxfdXBsb2FkLmh0bWwifSwKICAgIFsic3RhcnRzLXdpdGgiLCAiJENvbnRlbnQtVHlwZSIsICJpbWFnZS8iXSwKICAgIHsieC1hbXotbWV0YS11dWlkIjogIjE0MzY1MTIzNjUxMjc0In0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiR4LWFtei1tZXRhLXRhZyIsICIiXQogIF0KfQo=")) - .part(Part::new("Signature").content("0RavWzkygo6QX9caELEqKi9kDbU=")) - .part(Part::new("file").header(CONTENT_TYPE, "image/jpeg".parse().unwrap()).content("...file content...")).part(Part::new("submit").content("Upload to Amazon S3")); + .part(FormDataPart::new("content-type").content("image/jpeg")) + .part(FormDataPart::new("x-amz-meta-uuid").content("14365123651274")) + .part(FormDataPart::new("x-amz-meta-tag").content("Some,Tag,For,Picture")) + .part(FormDataPart::new("AWSAccessKeyId").content("AKIAIOSFODNN7EXAMPLE")) + .part(FormDataPart::new("Policy").content("eyAiZXhwaXJhdGlvbiI6ICIyMDA3LTEyLTAxVDEyOjAwOjAwLjAwMFoiLAogICJjb25kaXRpb25zIjogWwogICAgeyJidWNrZXQiOiAiam9obnNtaXRoIn0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiRrZXkiLCAidXNlci9lcmljLyJdLAogICAgeyJhY2wiOiAicHVibGljLXJlYWQifSwKICAgIHsic3VjY2Vzc19hY3Rpb25fcmVkaXJlY3QiOiAiaHR0cDovL2pvaG5zbWl0aC5zMy5hbWF6b25hd3MuY29tL3N1Y2Nlc3NmdWxfdXBsb2FkLmh0bWwifSwKICAgIFsic3RhcnRzLXdpdGgiLCAiJENvbnRlbnQtVHlwZSIsICJpbWFnZS8iXSwKICAgIHsieC1hbXotbWV0YS11dWlkIjogIjE0MzY1MTIzNjUxMjc0In0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiR4LWFtei1tZXRhLXRhZyIsICIiXQogIF0KfQo=")) + .part(FormDataPart::new("Signature").content("0RavWzkygo6QX9caELEqKi9kDbU=")) + .part(FormDataPart::new("file").header(CONTENT_TYPE, "image/jpeg".parse().unwrap()).content("...file content...")).part(FormDataPart::new("submit").content("Upload to Amazon 
S3")); - let (_, body) = multipart.build(); + let body = multipart.build(); let expected = r#"--9431149156168 content-disposition: form-data; name="key" @@ -198,7 +372,7 @@ content-disposition: form-data; name="success_action_redirect" https://awsexamplebucket1.s3.us-west-1.amazonaws.com/successful_upload.html --9431149156168 -content-disposition: form-data; name="Content-Type" +content-disposition: form-data; name="content-type" image/jpeg --9431149156168 @@ -231,6 +405,213 @@ content-disposition: form-data; name="submit" Upload to Amazon S3 --9431149156168-- +"#; + + assert_eq!( + expected, + // Rust can't represent `\r` in a string literal, so we + // replace `\r\n` with `\n` for comparison + String::from_utf8(body.to_vec()) + .unwrap() + .replace("\r\n", "\n") + ); + } + + /// This test is inspired by + #[test] + fn test_multipart_mixed_gcs_batch_metadata() { + let multipart = Multipart::new() + .with_boundary("===============7330845974216740156==") + .part( + MixedPart::new("/storage/v1/b/example-bucket/o/obj1") + .method(Method::PATCH) + .part_header( + "content-id".parse().unwrap(), + "".parse().unwrap(), + ) + .header( + "content-type".parse().unwrap(), + "application/json".parse().unwrap(), + ) + .header( + "accept".parse().unwrap(), + "application/json".parse().unwrap(), + ) + .header("content-length".parse().unwrap(), "31".parse().unwrap()) + .content(r#"{"metadata": {"type": "tabby"}}"#), + ) + .part( + MixedPart::new("/storage/v1/b/example-bucket/o/obj2") + .method(Method::PATCH) + .part_header( + "content-id".parse().unwrap(), + "".parse().unwrap(), + ) + .header( + "content-type".parse().unwrap(), + "application/json".parse().unwrap(), + ) + .header( + "accept".parse().unwrap(), + "application/json".parse().unwrap(), + ) + .header("content-length".parse().unwrap(), "32".parse().unwrap()) + .content(r#"{"metadata": {"type": "tuxedo"}}"#), + ) + .part( + MixedPart::new("/storage/v1/b/example-bucket/o/obj3") + .method(Method::PATCH) + .part_header( + 
"content-id".parse().unwrap(), + "".parse().unwrap(), + ) + .header( + "content-type".parse().unwrap(), + "application/json".parse().unwrap(), + ) + .header( + "accept".parse().unwrap(), + "application/json".parse().unwrap(), + ) + .header("content-length".parse().unwrap(), "32".parse().unwrap()) + .content(r#"{"metadata": {"type": "calico"}}"#), + ); + + let body = multipart.build(); + + let expected = r#"--===============7330845974216740156== +content-type: application/http +content-transfer-encoding: binary +content-id: + +PATCH /storage/v1/b/example-bucket/o/obj1 HTTP/1.1 +content-type: application/json +accept: application/json +content-length: 31 + +{"metadata": {"type": "tabby"}} +--===============7330845974216740156== +content-type: application/http +content-transfer-encoding: binary +content-id: + +PATCH /storage/v1/b/example-bucket/o/obj2 HTTP/1.1 +content-type: application/json +accept: application/json +content-length: 32 + +{"metadata": {"type": "tuxedo"}} +--===============7330845974216740156== +content-type: application/http +content-transfer-encoding: binary +content-id: + +PATCH /storage/v1/b/example-bucket/o/obj3 HTTP/1.1 +content-type: application/json +accept: application/json +content-length: 32 + +{"metadata": {"type": "calico"}} +--===============7330845974216740156==-- +"#; + + assert_eq!( + expected, + // Rust can't represent `\r` in a string literal, so we + // replace `\r\n` with `\n` for comparison + String::from_utf8(body.to_vec()) + .unwrap() + .replace("\r\n", "\n") + ); + } + + /// This test is inspired by + #[test] + fn test_multipart_mixed_azblob_batch_delete() { + let multipart = Multipart::new() + .with_boundary("batch_357de4f7-6d0b-4e02-8cd2-6361411a9525") + .part( + MixedPart::new("/container0/blob0") + .method(Method::DELETE) + .part_header("content-id".parse().unwrap(), "0".parse().unwrap()) + .header( + "x-ms-date".parse().unwrap(), + "Thu, 14 Jun 2018 16:46:54 GMT".parse().unwrap(), + ) + .header( + 
"authorization".parse().unwrap(), + "SharedKey account:G4jjBXA7LI/RnWKIOQ8i9xH4p76pAQ+4Fs4R1VxasaE=" + .parse() + .unwrap(), + ) + .header("content-length".parse().unwrap(), "0".parse().unwrap()), + ) + .part( + MixedPart::new("/container1/blob1") + .method(Method::DELETE) + .part_header("content-id".parse().unwrap(), "1".parse().unwrap()) + .header( + "x-ms-date".parse().unwrap(), + "Thu, 14 Jun 2018 16:46:54 GMT".parse().unwrap(), + ) + .header( + "authorization".parse().unwrap(), + "SharedKey account:IvCoYDQ+0VcaA/hKFjUmQmIxXv2RT3XwwTsOTHL39HI=" + .parse() + .unwrap(), + ) + .header("content-length".parse().unwrap(), "0".parse().unwrap()), + ) + .part( + MixedPart::new("/container2/blob2") + .method(Method::DELETE) + .part_header("content-id".parse().unwrap(), "2".parse().unwrap()) + .header( + "x-ms-date".parse().unwrap(), + "Thu, 14 Jun 2018 16:46:54 GMT".parse().unwrap(), + ) + .header( + "authorization".parse().unwrap(), + "SharedKey account:S37N2JTjcmOQVLHLbDmp2johz+KpTJvKhbVc4M7+UqI=" + .parse() + .unwrap(), + ) + .header("content-length".parse().unwrap(), "0".parse().unwrap()), + ); + + let body = multipart.build(); + + let expected = r#"--batch_357de4f7-6d0b-4e02-8cd2-6361411a9525 +content-type: application/http +content-transfer-encoding: binary +content-id: 0 + +DELETE /container0/blob0 HTTP/1.1 +x-ms-date: Thu, 14 Jun 2018 16:46:54 GMT +authorization: SharedKey account:G4jjBXA7LI/RnWKIOQ8i9xH4p76pAQ+4Fs4R1VxasaE= +content-length: 0 + +--batch_357de4f7-6d0b-4e02-8cd2-6361411a9525 +content-type: application/http +content-transfer-encoding: binary +content-id: 1 + +DELETE /container1/blob1 HTTP/1.1 +x-ms-date: Thu, 14 Jun 2018 16:46:54 GMT +authorization: SharedKey account:IvCoYDQ+0VcaA/hKFjUmQmIxXv2RT3XwwTsOTHL39HI= +content-length: 0 + +--batch_357de4f7-6d0b-4e02-8cd2-6361411a9525 +content-type: application/http +content-transfer-encoding: binary +content-id: 2 + +DELETE /container2/blob2 HTTP/1.1 +x-ms-date: Thu, 14 Jun 2018 16:46:54 GMT 
+authorization: SharedKey account:S37N2JTjcmOQVLHLbDmp2johz+KpTJvKhbVc4M7+UqI= +content-length: 0 + +--batch_357de4f7-6d0b-4e02-8cd2-6361411a9525-- "#; assert_eq!( diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index 69a59fe0a785..5c07de8d6ca4 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -475,6 +475,7 @@ impl Accessor for AzblobBackend { copy: true, presign: self.has_sas_token, batch: true, + batch_delete: true, batch_max_operations: Some(AZBLOB_BATCH_LIMIT), ..Default::default() }); @@ -648,6 +649,7 @@ impl Accessor for AzblobBackend { "batch delete limit exceeded", )); } + // construct and complete batch request let resp = self.core.azblob_batch_delete(&paths).await?; diff --git a/core/src/services/azblob/batch.rs b/core/src/services/azblob/batch.rs index 8386aa1621d8..c6004be022c6 100644 --- a/core/src/services/azblob/batch.rs +++ b/core/src/services/azblob/batch.rs @@ -15,100 +15,12 @@ // specific language governing permissions and limitations // under the License. 
-use bytes::BufMut; -use bytes::BytesMut; -use http::header::CONTENT_LENGTH; -use http::header::CONTENT_TYPE; -use http::Request; use http::StatusCode; -use uuid::Uuid; use super::error::parse_http_error; use crate::raw::*; use crate::*; -const AZURE_BATCH_LIMIT: usize = 256; - -/// helper struct for batch requests -pub(crate) struct BatchDeleteRequestBuilder { - url: String, - sub_requests: Vec>, -} - -impl BatchDeleteRequestBuilder { - /// create a new batch request builder - pub fn new(url: &str) -> Self { - Self { - url: url.to_string(), - sub_requests: vec![], - } - } - /// append request in batch - /// - /// # Note - /// `sub_req` must have been signed by signer - pub fn append(&mut self, sub_req: Request) -> &mut Self { - debug_assert!(self.sub_requests.len() < AZURE_BATCH_LIMIT); - self.sub_requests.push(sub_req); - self - } - /// create an batch request, not signed by signer - pub fn try_into_req(self) -> Result> { - let boundary = format!("opendal-{}", Uuid::new_v4()); - let mut body = BytesMut::new(); - - let req_builder = Request::post(&self.url).header( - CONTENT_TYPE, - format!("multipart/mixed; boundary={}", boundary), - ); - - for (idx, req) in self.sub_requests.into_iter().enumerate() { - let headers: String = req - .headers() - .iter() - .map(|(k, v)| { - let (k, v) = (k.as_str(), v.to_str().unwrap()); - format!("{}: {}", k, v) - }) - .collect::>() - .join("\n"); - let path = req - .uri() - .path_and_query() - .expect("delete request comes with no path") - .to_string(); - - let block = format!( - r#"--{boundary} -Content-Type: application/http -Content-Transfer-Encoding: binary -Content-ID: {idx} - -{} {} HTTP/1.1 -{} - -"#, - req.method(), - path, - headers - ); - // replace LF with CRLF, required by Azure Storage Blobs service. 
- // - // The Rust compiler converts all CRLF sequences to LF when reading source files - // since 2019, so it is safe to convert here - let block = block.replace('\n', "\r\n"); - body.put(block.as_bytes()); - } - body.put(format!("--{}--", boundary).as_bytes()); - - let content_length = body.len(); - req_builder - .header(CONTENT_LENGTH, content_length) - .body(AsyncBody::Bytes(body.freeze())) - .map_err(new_request_build_error) - } -} - pub(super) fn parse_batch_delete_response( boundary: &str, body: String, @@ -177,67 +89,7 @@ pub(super) fn parse_batch_delete_response( #[cfg(test)] mod test { - use anyhow::anyhow; - use anyhow::Result; - use http::header::CONTENT_LENGTH; - use http::header::CONTENT_TYPE; - use http::Request; - - use super::BatchDeleteRequestBuilder; - use crate::raw::AsyncBody; - use crate::services::azblob::batch::parse_batch_delete_response; - - #[test] - fn batch_delete_req_builder_test() -> Result<()> { - let url = "https://test.blob.core.windows.net/test"; - let delete_url = "https://test.blob.core.windows.net/test/test.txt"; - let delete_req = Request::delete(delete_url) - .header(CONTENT_LENGTH, 0) - .body(AsyncBody::Empty) - .expect("must success"); - - let mut builder = BatchDeleteRequestBuilder::new(url); - builder.append(delete_req); - - let req = builder.try_into_req().expect("must success"); - - let (header, body) = req.into_parts(); - let content_type = header - .headers - .get(CONTENT_TYPE) - .expect("expect header in request: CONTENT_TYPE: application/mixed.") - .to_str() - .unwrap(); - let boundary = content_type - .split("boundary=") - .collect::>() - .get(1) - .expect("get invalid CONTENT_TYPE header in response") - .to_owned(); - - let bs = match body { - AsyncBody::Bytes(bs) => bs, - _ => return Err(anyhow!("wrong body type")), - }; - - let s = String::from_utf8_lossy(&bs); - let splits: Vec<&str> = s.split(&format!("--{}", boundary)).collect(); - assert_eq!(splits.len(), 3); - - let expect_body_str = r#" -Content-Type: 
application/http -Content-Transfer-Encoding: binary -Content-ID: 0 - -DELETE /test/test.txt HTTP/1.1 -content-length: 0 - -"# - .replace('\n', "\r\n"); - let actual_body_str = splits[1]; - assert_eq!(actual_body_str, expect_body_str); - Ok(()) - } + use super::*; #[test] fn test_break_down_batch() { diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs index 6f4e9b195c19..62665a0e5480 100644 --- a/core/src/services/azblob/core.rs +++ b/core/src/services/azblob/core.rs @@ -19,7 +19,6 @@ use std::fmt; use std::fmt::Debug; use std::fmt::Formatter; use std::fmt::Write; -use std::str::FromStr; use http::header::HeaderName; use http::header::CONTENT_LENGTH; @@ -28,12 +27,10 @@ use http::header::IF_MATCH; use http::header::IF_NONE_MATCH; use http::Request; use http::Response; -use http::Uri; use reqsign::AzureStorageCredential; use reqsign::AzureStorageLoader; use reqsign::AzureStorageSigner; -use super::batch::BatchDeleteRequestBuilder; use crate::raw::*; use crate::*; @@ -274,7 +271,7 @@ impl AzblobCore { self.send(req).await } - pub async fn azblob_delete_blob(&self, path: &str) -> Result> { + pub fn azblob_delete_blob_request(&self, path: &str) -> Result> { let p = build_abs_path(&self.root, path); let url = format!( @@ -286,9 +283,13 @@ impl AzblobCore { let req = Request::delete(&url); - let mut req = req + req.header(CONTENT_LENGTH, 0) .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; + .map_err(new_request_build_error) + } + + pub async fn azblob_delete_blob(&self, path: &str) -> Result> { + let mut req = self.azblob_delete_blob_request(path)?; self.sign(&mut req).await?; self.send(req).await @@ -364,35 +365,24 @@ impl AzblobCore { &self, paths: &[String], ) -> Result> { - // init batch request let url = format!( "{}/{}?restype=container&comp=batch", self.endpoint, self.container ); - let mut batch_delete_req_builder = BatchDeleteRequestBuilder::new(&url); - - for path in paths.iter() { - // build sub requests - let p = 
build_abs_path(&self.root, path); - let encoded_path = percent_encode_path(&p); - - let url = Uri::from_str(&format!( - "{}/{}/{}", - self.endpoint, self.container, encoded_path - )) - .unwrap(); - let mut sub_req = Request::delete(&url.to_string()) - .header(CONTENT_LENGTH, 0) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; + let mut multipart = Multipart::new(); - self.batch_sign(&mut sub_req).await?; + for (idx, path) in paths.iter().enumerate() { + let mut req = self.azblob_delete_blob_request(path)?; - batch_delete_req_builder.append(sub_req); + self.batch_sign(&mut req).await?; + multipart = multipart.part( + MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()), + ); } - let mut req = batch_delete_req_builder.try_into_req()?; + let req = Request::post(url); + let mut req = multipart.apply(req)?; self.sign(&mut req).await?; self.send(req).await diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs index 61c18232aef3..7432be7f4ea9 100644 --- a/core/src/services/ipmfs/backend.rs +++ b/core/src/services/ipmfs/backend.rs @@ -21,6 +21,7 @@ use std::str; use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use http::Request; use http::Response; use http::StatusCode; @@ -281,7 +282,7 @@ impl IpmfsBackend { pub async fn ipmfs_write( &self, path: &str, - body: AsyncBody, + body: Bytes, ) -> Result> { let p = build_rooted_abs_path(&self.root, path); @@ -291,9 +292,10 @@ impl IpmfsBackend { percent_encode_path(&p) ); - let req = Request::post(url); + let multipart = Multipart::new().part(FormDataPart::new("data").content(body)); - let req = req.body(body).map_err(new_request_build_error)?; + let req: http::request::Builder = Request::post(url); + let req = multipart.apply(req)?; self.client.send(req).await } diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 9b72d8906a2b..f704dcd0ebfb 100644 --- a/core/src/services/ipmfs/writer.rs +++ 
b/core/src/services/ipmfs/writer.rs @@ -39,13 +39,7 @@ impl IpmfsWriter { #[async_trait] impl oio::Write for IpmfsWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - let resp = self - .backend - .ipmfs_write( - &self.path, - AsyncBody::Multipart(Multipart::new().part(Part::new("data").content(bs))), - ) - .await?; + let resp = self.backend.ipmfs_write(&self.path, bs).await?; let status = resp.status();