From dc6b163ac1d872b82846e7c5284e09424c642275 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 28 Apr 2023 18:13:23 +0800 Subject: [PATCH 1/6] Refactor multipart Signed-off-by: Xuanwo --- core/src/raw/http_util/body.rs | 2 +- core/src/raw/http_util/mod.rs | 2 +- core/src/raw/http_util/multipart.rs | 50 +++++++++++++++++------------ core/src/services/ipmfs/writer.rs | 2 +- 4 files changed, 32 insertions(+), 24 deletions(-) diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs index 4d874e06f235..3f7d5e735854 100644 --- a/core/src/raw/http_util/body.rs +++ b/core/src/raw/http_util/body.rs @@ -47,7 +47,7 @@ pub enum AsyncBody { /// /// If input with this field, we will goto the internal multipart /// handle logic. - Multipart(Multipart), + Multipart(Multipart), } type BytesStream = Box> + Send + Sync + Unpin>; diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs index f94098ceb3cc..e579d5425394 100644 --- a/core/src/raw/http_util/mod.rs +++ b/core/src/raw/http_util/mod.rs @@ -61,5 +61,5 @@ mod bytes_content_range; pub use bytes_content_range::BytesContentRange; mod multipart; +pub use multipart::FormDataPart; pub use multipart::Multipart; -pub use multipart::Part; diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index 797478eb01b4..feb57edf1dfe 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -20,18 +20,18 @@ use http::{header::CONTENT_DISPOSITION, HeaderMap, HeaderName, HeaderValue}; /// Multipart is a builder for multipart/form-data. #[derive(Debug)] -pub struct Multipart { +pub struct Multipart { boundary: String, - parts: Vec, + parts: Vec, } -impl Default for Multipart { +impl Default for Multipart { fn default() -> Self { Self::new() } } -impl Multipart { +impl Multipart { /// Create a new multipart with random boundary. pub fn new() -> Self { Multipart { @@ -48,11 +48,13 @@ impl Multipart { } /// Insert a part into multipart. - pub fn part(mut self, part: Part) -> Self { + pub fn part(mut self, part: T) -> Self { self.parts.push(part); self } +} +impl> Multipart { pub(crate) fn build(self) -> (String, Bytes) { let mut bs = BytesMut::new(); @@ -63,7 +65,7 @@ impl Multipart { bs.extend_from_slice(self.boundary.as_bytes()); bs.extend_from_slice(b"\r\n"); - bs.extend_from_slice(&v.build()); + bs.extend_from_slice(v.into().as_ref()); } // Write the last boundary @@ -76,14 +78,14 @@ impl Multipart { } } -/// Part is a builder for multipart/form-data part. +/// FormDataPart is a builder for multipart/form-data part. #[derive(Debug)] -pub struct Part { +pub struct FormDataPart { headers: HeaderMap, content: Bytes, } -impl Part { +impl FormDataPart { /// Create a new part builder /// /// # Panics @@ -135,6 +137,12 @@ impl Part { } } +impl From for Bytes { + fn from(val: FormDataPart) -> Bytes { + val.build() + } +} + #[cfg(test)] mod tests { use super::*; @@ -145,8 +153,8 @@ mod tests { #[test] fn test_multipart_basic() { let multipart = Multipart::new() - .part(Part::new("foo").content(Bytes::from("bar"))) - .part(Part::new("hello").content(Bytes::from("world"))); + .part(FormDataPart::new("foo").content(Bytes::from("bar"))) + .part(FormDataPart::new("hello").content(Bytes::from("world"))); let (boundary, body) = multipart.build(); @@ -170,18 +178,18 @@ mod tests { fn test_multipart_s3_form_upload() { let multipart = Multipart::new() .with_boundary("9431149156168") - .part(Part::new("key").content("user/eric/MyPicture.jpg")) - .part(Part::new("acl").content("public-read")) - .part(Part::new("success_action_redirect").content( + .part(FormDataPart::new("key").content("user/eric/MyPicture.jpg")) + .part(FormDataPart::new("acl").content("public-read")) + .part(FormDataPart::new("success_action_redirect").content( "https://awsexamplebucket1.s3.us-west-1.amazonaws.com/successful_upload.html", )) - .part(Part::new("Content-Type").content("image/jpeg")) - .part(Part::new("x-amz-meta-uuid").content("14365123651274")) - .part(Part::new("x-amz-meta-tag").content("Some,Tag,For,Picture")) - .part(Part::new("AWSAccessKeyId").content("AKIAIOSFODNN7EXAMPLE")) - .part(Part::new("Policy").content("eyAiZXhwaXJhdGlvbiI6ICIyMDA3LTEyLTAxVDEyOjAwOjAwLjAwMFoiLAogICJjb25kaXRpb25zIjogWwogICAgeyJidWNrZXQiOiAiam9obnNtaXRoIn0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiRrZXkiLCAidXNlci9lcmljLyJdLAogICAgeyJhY2wiOiAicHVibGljLXJlYWQifSwKICAgIHsic3VjY2Vzc19hY3Rpb25fcmVkaXJlY3QiOiAiaHR0cDovL2pvaG5zbWl0aC5zMy5hbWF6b25hd3MuY29tL3N1Y2Nlc3NmdWxfdXBsb2FkLmh0bWwifSwKICAgIFsic3RhcnRzLXdpdGgiLCAiJENvbnRlbnQtVHlwZSIsICJpbWFnZS8iXSwKICAgIHsieC1hbXotbWV0YS11dWlkIjogIjE0MzY1MTIzNjUxMjc0In0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiR4LWFtei1tZXRhLXRhZyIsICIiXQogIF0KfQo=")) - .part(Part::new("Signature").content("0RavWzkygo6QX9caELEqKi9kDbU=")) - .part(Part::new("file").header(CONTENT_TYPE, "image/jpeg".parse().unwrap()).content("...file content...")).part(Part::new("submit").content("Upload to Amazon S3")); + .part(FormDataPart::new("Content-Type").content("image/jpeg")) + .part(FormDataPart::new("x-amz-meta-uuid").content("14365123651274")) + .part(FormDataPart::new("x-amz-meta-tag").content("Some,Tag,For,Picture")) + .part(FormDataPart::new("AWSAccessKeyId").content("AKIAIOSFODNN7EXAMPLE")) + .part(FormDataPart::new("Policy").content("eyAiZXhwaXJhdGlvbiI6ICIyMDA3LTEyLTAxVDEyOjAwOjAwLjAwMFoiLAogICJjb25kaXRpb25zIjogWwogICAgeyJidWNrZXQiOiAiam9obnNtaXRoIn0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiRrZXkiLCAidXNlci9lcmljLyJdLAogICAgeyJhY2wiOiAicHVibGljLXJlYWQifSwKICAgIHsic3VjY2Vzc19hY3Rpb25fcmVkaXJlY3QiOiAiaHR0cDovL2pvaG5zbWl0aC5zMy5hbWF6b25hd3MuY29tL3N1Y2Nlc3NmdWxfdXBsb2FkLmh0bWwifSwKICAgIFsic3RhcnRzLXdpdGgiLCAiJENvbnRlbnQtVHlwZSIsICJpbWFnZS8iXSwKICAgIHsieC1hbXotbWV0YS11dWlkIjogIjE0MzY1MTIzNjUxMjc0In0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiR4LWFtei1tZXRhLXRhZyIsICIiXQogIF0KfQo=")) + .part(FormDataPart::new("Signature").content("0RavWzkygo6QX9caELEqKi9kDbU=")) + .part(FormDataPart::new("file").header(CONTENT_TYPE, "image/jpeg".parse().unwrap()).content("...file content...")).part(FormDataPart::new("submit").content("Upload to Amazon S3")); let (_, body) = multipart.build(); diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 9b72d8906a2b..de721345a4ae 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -43,7 +43,7 @@ impl oio::Write for IpmfsWriter { .backend .ipmfs_write( &self.path, - AsyncBody::Multipart(Multipart::new().part(Part::new("data").content(bs))), + AsyncBody::Multipart(Multipart::new().part(FormDataPart::new("data").content(bs))), ) .await?; From e3ed71f2ef76b3da8829ed8790af52b15af12c84 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 28 Apr 2023 19:04:08 +0800 Subject: [PATCH 2/6] Implement multipart mixed support Signed-off-by: Xuanwo --- core/src/raw/http_util/mod.rs | 1 + core/src/raw/http_util/multipart.rs | 350 +++++++++++++++++++++++++++- 2 files changed, 345 insertions(+), 6 deletions(-) diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs index e579d5425394..135aa675c0b5 100644 --- a/core/src/raw/http_util/mod.rs +++ b/core/src/raw/http_util/mod.rs @@ -62,4 +62,5 @@ pub use bytes_content_range::BytesContentRange; mod multipart; pub use multipart::FormDataPart; +pub use multipart::MixedPart; pub use multipart::Multipart; diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index feb57edf1dfe..24cfb8a2c824 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -15,8 +15,13 @@ // specific language governing permissions and limitations // under the License. +use std::str::FromStr; + use bytes::{Bytes, BytesMut}; -use http::{header::CONTENT_DISPOSITION, HeaderMap, HeaderName, HeaderValue}; +use http::{ + header::{CONTENT_DISPOSITION, CONTENT_TYPE}, + HeaderMap, HeaderName, HeaderValue, Method, Request, Uri, Version, +}; /// Multipart is a builder for multipart/form-data. #[derive(Debug)] @@ -143,6 +148,132 @@ impl From for Bytes { } } +/// MixedPart is a builder for multipart/mixed part. +#[derive(Debug)] +pub struct MixedPart { + part_headers: HeaderMap, + + method: Method, + uri: Uri, + version: Version, + headers: HeaderMap, + content: Bytes, +} + +impl MixedPart { + /// Create a new mixed part with gien uri. + pub fn new(uri: &str) -> Self { + let mut part_headers = HeaderMap::new(); + part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap()); + part_headers.insert("content-transfer-encoding", "binary".parse().unwrap()); + + let uri = Uri::from_str(uri).expect("the uri used to build a mixed part must be valid"); + + Self { + part_headers, + method: Method::GET, + uri, + version: Version::HTTP_11, + headers: HeaderMap::new(), + content: Bytes::new(), + } + } + + /// Build a mixed part from a request. + pub fn from_request(req: Request) -> Self { + let mut part_headers = HeaderMap::new(); + part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap()); + part_headers.insert("content-transfer-encoding", "binary".parse().unwrap()); + + let (parts, body) = req.into_parts(); + + Self { + part_headers, + method: parts.method, + // TODO: Maybe we should support query too?; + uri: Uri::from_str(parts.uri.path()) + .expect("the uri used to build a mixed part must be valid"), + version: parts.version, + headers: parts.headers, + content: body, + } + } + + /// Insert a part header into part. + pub fn part_header(mut self, key: HeaderName, value: HeaderValue) -> Self { + self.part_headers.insert(key, value); + self + } + + /// Set the method for request in this part. + pub fn method(mut self, method: Method) -> Self { + self.method = method; + self + } + + /// Set the version for request in this part. + pub fn version(mut self, version: Version) -> Self { + self.version = version; + self + } + + /// Insert a header into part. + pub fn header(mut self, key: HeaderName, value: HeaderValue) -> Self { + self.headers.insert(key, value); + self + } + + /// Set the content for this part. + pub fn content(mut self, content: impl Into) -> Self { + self.content = content.into(); + self + } + + pub(crate) fn build(self) -> Bytes { + let mut bs = BytesMut::new(); + + // Write parts headers. + for (k, v) in self.part_headers.iter() { + bs.extend_from_slice(k.as_str().as_bytes()); + bs.extend_from_slice(b": "); + bs.extend_from_slice(v.as_bytes()); + bs.extend_from_slice(b"\r\n"); + } + + // Write request line: `DELETE /container0/blob0 HTTP/1.1` + bs.extend_from_slice(b"\r\n"); + bs.extend_from_slice(self.method.as_str().as_bytes()); + bs.extend_from_slice(b" "); + bs.extend_from_slice(self.uri.path().as_bytes()); + bs.extend_from_slice(b" "); + bs.extend_from_slice(format!("{:?}", self.version).as_bytes()); + bs.extend_from_slice(b"\r\n"); + + // Write request headers. + for (k, v) in self.headers.iter() { + bs.extend_from_slice(k.as_str().as_bytes()); + bs.extend_from_slice(b": "); + bs.extend_from_slice(v.as_bytes()); + bs.extend_from_slice(b"\r\n"); + } + + // Write content. + bs.extend_from_slice(b"\r\n"); + if !self.content.is_empty() { + bs.extend_from_slice(&self.content); + bs.extend_from_slice(b"\r\n"); + } + + bs.freeze() + } +} + +impl From for Bytes { + fn from(val: MixedPart) -> Bytes { + val.build() + } +} + #[cfg(test)] mod tests { use super::*; @@ -151,7 +282,7 @@ mod tests { use pretty_assertions::assert_eq; #[test] - fn test_multipart_basic() { + fn test_multipart_formdata_basic() { let multipart = Multipart::new() .part(FormDataPart::new("foo").content(Bytes::from("bar"))) .part(FormDataPart::new("hello").content(Bytes::from("world"))); @@ -173,9 +304,9 @@ mod tests { assert_eq!(expected, String::from_utf8(body.to_vec()).unwrap()); } - /// This test is inspired from + /// This test is inspired by #[test] - fn test_multipart_s3_form_upload() { + fn test_multipart_formdata_s3_form_upload() { let multipart = Multipart::new() .with_boundary("9431149156168") .part(FormDataPart::new("key").content("user/eric/MyPicture.jpg")) @@ -183,7 +314,7 @@ mod tests { .part(FormDataPart::new("success_action_redirect").content( "https://awsexamplebucket1.s3.us-west-1.amazonaws.com/successful_upload.html", )) - .part(FormDataPart::new("Content-Type").content("image/jpeg")) + .part(FormDataPart::new("content-type").content("image/jpeg")) .part(FormDataPart::new("x-amz-meta-uuid").content("14365123651274")) .part(FormDataPart::new("x-amz-meta-tag").content("Some,Tag,For,Picture")) .part(FormDataPart::new("AWSAccessKeyId").content("AKIAIOSFODNN7EXAMPLE")) @@ -206,7 +337,7 @@ content-disposition: form-data; name="success_action_redirect" https://awsexamplebucket1.s3.us-west-1.amazonaws.com/successful_upload.html --9431149156168 -content-disposition: form-data; name="Content-Type" +content-disposition: form-data; name="content-type" image/jpeg --9431149156168 @@ -239,6 +370,213 @@ content-disposition: form-data; name="submit" Upload to Amazon S3 --9431149156168-- +"#; + + assert_eq!( + expected, + // Rust can't represent `\r` in a string literal, so we + // replace `\r\n` with `\n` for comparison + String::from_utf8(body.to_vec()) + .unwrap() + .replace("\r\n", "\n") + ); + } + + /// This test is inspired by + #[test] + fn test_multipart_mixed_gcs_batch_metadata() { + let multipart = Multipart::new() + .with_boundary("===============7330845974216740156==") + .part( + MixedPart::new("/storage/v1/b/example-bucket/o/obj1") + .method(Method::PATCH) + .part_header( + "content-id".parse().unwrap(), + "".parse().unwrap(), + ) + .header( + "content-type".parse().unwrap(), + "application/json".parse().unwrap(), + ) + .header( + "accept".parse().unwrap(), + "application/json".parse().unwrap(), + ) + .header("content-length".parse().unwrap(), "31".parse().unwrap()) + .content(r#"{"metadata": {"type": "tabby"}}"#), + ) + .part( + MixedPart::new("/storage/v1/b/example-bucket/o/obj2") + .method(Method::PATCH) + .part_header( + "content-id".parse().unwrap(), + "".parse().unwrap(), + ) + .header( + "content-type".parse().unwrap(), + "application/json".parse().unwrap(), + ) + .header( + "accept".parse().unwrap(), + "application/json".parse().unwrap(), + ) + .header("content-length".parse().unwrap(), "32".parse().unwrap()) + .content(r#"{"metadata": {"type": "tuxedo"}}"#), + ) + .part( + MixedPart::new("/storage/v1/b/example-bucket/o/obj3") + .method(Method::PATCH) + .part_header( + "content-id".parse().unwrap(), + "".parse().unwrap(), + ) + .header( + "content-type".parse().unwrap(), + "application/json".parse().unwrap(), + ) + .header( + "accept".parse().unwrap(), + "application/json".parse().unwrap(), + ) + .header("content-length".parse().unwrap(), "32".parse().unwrap()) + .content(r#"{"metadata": {"type": "calico"}}"#), + ); + + let (_, body) = multipart.build(); + + let expected = r#"--===============7330845974216740156== +content-type: application/http +content-transfer-encoding: binary +content-id: + +PATCH /storage/v1/b/example-bucket/o/obj1 HTTP/1.1 +content-type: application/json +accept: application/json +content-length: 31 + +{"metadata": {"type": "tabby"}} +--===============7330845974216740156== +content-type: application/http +content-transfer-encoding: binary +content-id: + +PATCH /storage/v1/b/example-bucket/o/obj2 HTTP/1.1 +content-type: application/json +accept: application/json +content-length: 32 + +{"metadata": {"type": "tuxedo"}} +--===============7330845974216740156== +content-type: application/http +content-transfer-encoding: binary +content-id: + +PATCH /storage/v1/b/example-bucket/o/obj3 HTTP/1.1 +content-type: application/json +accept: application/json +content-length: 32 + +{"metadata": {"type": "calico"}} +--===============7330845974216740156==-- +"#; + + assert_eq!( + expected, + // Rust can't represent `\r` in a string literal, so we + // replace `\r\n` with `\n` for comparison + String::from_utf8(body.to_vec()) + .unwrap() + .replace("\r\n", "\n") + ); + } + + /// This test is inspired by + #[test] + fn test_multipart_mixed_azblob_batch_delete() { + let multipart = Multipart::new() + .with_boundary("batch_357de4f7-6d0b-4e02-8cd2-6361411a9525") + .part( + MixedPart::new("/container0/blob0") + .method(Method::DELETE) + .part_header("content-id".parse().unwrap(), "0".parse().unwrap()) + .header( + "x-ms-date".parse().unwrap(), + "Thu, 14 Jun 2018 16:46:54 GMT".parse().unwrap(), + ) + .header( + "authorization".parse().unwrap(), + "SharedKey account:G4jjBXA7LI/RnWKIOQ8i9xH4p76pAQ+4Fs4R1VxasaE=" + .parse() + .unwrap(), + ) + .header("content-length".parse().unwrap(), "0".parse().unwrap()), + ) + .part( + MixedPart::new("/container1/blob1") + .method(Method::DELETE) + .part_header("content-id".parse().unwrap(), "1".parse().unwrap()) + .header( + "x-ms-date".parse().unwrap(), + "Thu, 14 Jun 2018 16:46:54 GMT".parse().unwrap(), + ) + .header( + "authorization".parse().unwrap(), + "SharedKey account:IvCoYDQ+0VcaA/hKFjUmQmIxXv2RT3XwwTsOTHL39HI=" + .parse() + .unwrap(), + ) + .header("content-length".parse().unwrap(), "0".parse().unwrap()), + ) + .part( + MixedPart::new("/container2/blob2") + .method(Method::DELETE) + .part_header("content-id".parse().unwrap(), "2".parse().unwrap()) + .header( + "x-ms-date".parse().unwrap(), + "Thu, 14 Jun 2018 16:46:54 GMT".parse().unwrap(), + ) + .header( + "authorization".parse().unwrap(), + "SharedKey account:S37N2JTjcmOQVLHLbDmp2johz+KpTJvKhbVc4M7+UqI=" + .parse() + .unwrap(), + ) + .header("content-length".parse().unwrap(), "0".parse().unwrap()), + ); + + let (_, body) = multipart.build(); + + let expected = r#"--batch_357de4f7-6d0b-4e02-8cd2-6361411a9525 +content-type: application/http +content-transfer-encoding: binary +content-id: 0 + +DELETE /container0/blob0 HTTP/1.1 +x-ms-date: Thu, 14 Jun 2018 16:46:54 GMT +authorization: SharedKey account:G4jjBXA7LI/RnWKIOQ8i9xH4p76pAQ+4Fs4R1VxasaE= +content-length: 0 + +--batch_357de4f7-6d0b-4e02-8cd2-6361411a9525 +content-type: application/http +content-transfer-encoding: binary +content-id: 1 + +DELETE /container1/blob1 HTTP/1.1 +x-ms-date: Thu, 14 Jun 2018 16:46:54 GMT +authorization: SharedKey account:IvCoYDQ+0VcaA/hKFjUmQmIxXv2RT3XwwTsOTHL39HI= +content-length: 0 + +--batch_357de4f7-6d0b-4e02-8cd2-6361411a9525 +content-type: application/http +content-transfer-encoding: binary +content-id: 2 + +DELETE /container2/blob2 HTTP/1.1 +x-ms-date: Thu, 14 Jun 2018 16:46:54 GMT +authorization: SharedKey account:S37N2JTjcmOQVLHLbDmp2johz+KpTJvKhbVc4M7+UqI= +content-length: 0 + +--batch_357de4f7-6d0b-4e02-8cd2-6361411a9525-- "#; assert_eq!( From 5c4d1b3e274acb2759e25959507ae7974574ee5d Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 28 Apr 2023 19:06:15 +0800 Subject: [PATCH 3/6] Add request support Signed-off-by: Xuanwo --- core/src/raw/http_util/body.rs | 9 ++++----- core/src/raw/http_util/client.rs | 15 ++++++++++++++- core/src/services/ipmfs/writer.rs | 4 +++- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs index 3f7d5e735854..7a23b659d5de 100644 --- a/core/src/raw/http_util/body.rs +++ b/core/src/raw/http_util/body.rs @@ -43,11 +43,10 @@ pub enum AsyncBody { Empty, /// Body with bytes. Bytes(Bytes), - /// Body with a multipart field. - /// - /// If input with this field, we will goto the internal multipart - /// handle logic. - Multipart(Multipart), + /// Body with a multipart form-data. + MultipartFormData(Multipart), + /// Body with a multipart mixed. + MultipartMixed(Multipart), } type BytesStream = Box> + Send + Sync + Unpin>; diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 6b478ef810c0..8d27bd3ef23b 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -99,7 +99,7 @@ impl HttpClient { req_builder = match body { AsyncBody::Empty => req_builder.body(reqwest::Body::from("")), AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)), - AsyncBody::Multipart(mp) => { + AsyncBody::MultipartFormData(mp) => { let (boundary, bs) = mp.build(); // Insert content type with correct boundary. @@ -110,6 +110,19 @@ impl HttpClient { // Insert content length with calculated size. req_builder = req_builder.header(CONTENT_LENGTH, bs.len()); + req_builder.body(reqwest::Body::from(bs)) + } + AsyncBody::MultipartMixed(mp) => { + let (boundary, bs) = mp.build(); + + // Insert content type with correct boundary. + req_builder = req_builder.header( + CONTENT_TYPE, + format!("multipart/mixed; boundary={boundary}").as_str(), + ); + // Insert content length with calculated size. + req_builder = req_builder.header(CONTENT_LENGTH, bs.len()); + req_builder.body(reqwest::Body::from(bs)) } }; diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index de721345a4ae..81d9c6a5bca6 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -43,7 +43,9 @@ impl oio::Write for IpmfsWriter { .backend .ipmfs_write( &self.path, - AsyncBody::Multipart(Multipart::new().part(FormDataPart::new("data").content(bs))), + AsyncBody::MultipartFormData( + Multipart::new().part(FormDataPart::new("data").content(bs)), + ), ) .await?; From a0fd6d5add967d78b1340228940850d8b0e34103 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 28 Apr 2023 21:01:56 +0800 Subject: [PATCH 4/6] Refactor multipart Signed-off-by: Xuanwo --- core/src/raw/http_util/body.rs | 6 -- core/src/raw/http_util/client.rs | 28 ----- core/src/raw/http_util/mod.rs | 1 + core/src/raw/http_util/multipart.rs | 103 ++++++++++++------ core/src/services/azblob/backend.rs | 1 + core/src/services/azblob/batch.rs | 162 ++-------------------------- core/src/services/azblob/core.rs | 40 +++---- core/src/services/ipmfs/backend.rs | 8 +- core/src/services/ipmfs/writer.rs | 10 +- 9 files changed, 97 insertions(+), 262 deletions(-) diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs index 7a23b659d5de..f6359ec4cee2 100644 --- a/core/src/raw/http_util/body.rs +++ b/core/src/raw/http_util/body.rs @@ -33,8 +33,6 @@ use crate::Error; use crate::ErrorKind; use crate::Result; -use super::multipart::Multipart; - /// Body used in async HTTP requests. #[derive(Default)] pub enum AsyncBody { @@ -43,10 +41,6 @@ pub enum AsyncBody { Empty, /// Body with bytes. Bytes(Bytes), - /// Body with a multipart form-data. - MultipartFormData(Multipart), - /// Body with a multipart mixed. - MultipartMixed(Multipart), } type BytesStream = Box> + Send + Sync + Unpin>; diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 8d27bd3ef23b..c6cb38a468a4 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -21,8 +21,6 @@ use std::mem; use std::str::FromStr; use futures::TryStreamExt; -use http::header::CONTENT_LENGTH; -use http::header::CONTENT_TYPE; use http::Request; use http::Response; use reqwest::redirect::Policy; @@ -99,32 +97,6 @@ impl HttpClient { req_builder = match body { AsyncBody::Empty => req_builder.body(reqwest::Body::from("")), AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)), - AsyncBody::MultipartFormData(mp) => { - let (boundary, bs) = mp.build(); - - // Insert content type with correct boundary. - req_builder = req_builder.header( - CONTENT_TYPE, - format!("multipart/form-data; boundary={boundary}").as_str(), - ); - // Insert content length with calculated size. - req_builder = req_builder.header(CONTENT_LENGTH, bs.len()); - - req_builder.body(reqwest::Body::from(bs)) - } - AsyncBody::MultipartMixed(mp) => { - let (boundary, bs) = mp.build(); - - // Insert content type with correct boundary. - req_builder = req_builder.header( - CONTENT_TYPE, - format!("multipart/mixed; boundary={boundary}").as_str(), - ); - // Insert content length with calculated size. - req_builder = req_builder.header(CONTENT_LENGTH, bs.len()); - - req_builder.body(reqwest::Body::from(bs)) - } }; let mut resp = req_builder.send().await.map_err(|err| { diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs index 135aa675c0b5..dc6822b88011 100644 --- a/core/src/raw/http_util/mod.rs +++ b/core/src/raw/http_util/mod.rs @@ -64,3 +64,4 @@ mod multipart; pub use multipart::FormDataPart; pub use multipart::MixedPart; pub use multipart::Multipart; +pub use multipart::Part; diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index 24cfb8a2c824..d9ec1013caba 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -17,26 +17,29 @@ use std::str::FromStr; +use crate::*; use bytes::{Bytes, BytesMut}; use http::{ - header::{CONTENT_DISPOSITION, CONTENT_TYPE}, + header::{CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE}, HeaderMap, HeaderName, HeaderValue, Method, Request, Uri, Version, }; +use super::{new_request_build_error, AsyncBody}; + /// Multipart is a builder for multipart/form-data. #[derive(Debug)] -pub struct Multipart { +pub struct Multipart { boundary: String, parts: Vec, } -impl Default for Multipart { +impl Default for Multipart { fn default() -> Self { Self::new() } } -impl Multipart { +impl Multipart { /// Create a new multipart with random boundary. pub fn new() -> Self { Multipart { @@ -57,20 +60,18 @@ impl Multipart { self.parts.push(part); self } -} -impl> Multipart { - pub(crate) fn build(self) -> (String, Bytes) { + pub(crate) fn build(&self) -> Bytes { let mut bs = BytesMut::new(); // Write headers. - for v in self.parts { + for v in self.parts.iter() { // Write the first boundary bs.extend_from_slice(b"--"); bs.extend_from_slice(self.boundary.as_bytes()); bs.extend_from_slice(b"\r\n"); - bs.extend_from_slice(v.into().as_ref()); + bs.extend_from_slice(v.build().as_ref()); } // Write the last boundary @@ -79,8 +80,40 @@ impl> Multipart { bs.extend_from_slice(b"--"); bs.extend_from_slice(b"\r\n"); - (self.boundary, bs.freeze()) + bs.freeze() } + + /// Consume the input and generate a request with multipart body. + /// + /// This founction will make sure content_type and content_length set correctly. + pub fn apply(self, mut builder: http::request::Builder) -> Result> { + let bs = self.build(); + + // Insert content type with correct boundary. + builder = builder.header( + CONTENT_TYPE, + format!("multipart/{}; boundary={}", T::TYPE, self.boundary).as_str(), + ); + // Insert content length with calculated size. + builder = builder.header(CONTENT_LENGTH, bs.len()); + + log::debug!("current body:\n{}", String::from_utf8_lossy(&bs)); + + builder + .body(AsyncBody::Bytes(bs)) + .map_err(new_request_build_error) + } +} + +/// Part is a trait for multipart part. +pub trait Part { + /// TYPE is the type of multipart. + /// + /// Current available types are: `form-data` and `mixed` + const TYPE: &'static str; + + /// Build will consume this part and generates the bytes. + fn build(&self) -> Bytes; } /// FormDataPart is a builder for multipart/form-data part. @@ -121,8 +154,12 @@ impl FormDataPart { self.content = content.into(); self } +} + +impl Part for FormDataPart { + const TYPE: &'static str = "form-data"; - pub(crate) fn build(self) -> Bytes { + fn build(&self) -> Bytes { let mut bs = BytesMut::new(); // Write headers. @@ -142,12 +179,6 @@ impl FormDataPart { } } -impl From for Bytes { - fn from(val: FormDataPart) -> Bytes { - val.build() - } -} - /// MixedPart is a builder for multipart/mixed part. #[derive(Debug)] pub struct MixedPart { @@ -180,13 +211,18 @@ impl MixedPart { } /// Build a mixed part from a request. - pub fn from_request(req: Request) -> Self { + pub fn from_request(req: Request) -> Self { let mut part_headers = HeaderMap::new(); part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap()); part_headers.insert("content-transfer-encoding", "binary".parse().unwrap()); let (parts, body) = req.into_parts(); + let content = match body { + AsyncBody::Empty => Bytes::new(), + AsyncBody::Bytes(bs) => bs, + }; + Self { part_headers, method: parts.method, @@ -195,7 +231,7 @@ impl MixedPart { .expect("the uri used to build a mixed part must be valid"), version: parts.version, headers: parts.headers, - content: body, + content, } } @@ -228,8 +264,12 @@ impl MixedPart { self.content = content.into(); self } +} + +impl Part for MixedPart { + const TYPE: &'static str = "mixed"; - pub(crate) fn build(self) -> Bytes { + fn build(&self) -> Bytes { let mut bs = BytesMut::new(); // Write parts headers. @@ -268,12 +308,6 @@ impl MixedPart { } } -impl From for Bytes { - fn from(val: MixedPart) -> Bytes { - val.build() - } -} - #[cfg(test)] mod tests { use super::*; @@ -284,22 +318,21 @@ mod tests { #[test] fn test_multipart_formdata_basic() { let multipart = Multipart::new() + .with_boundary("lalala") .part(FormDataPart::new("foo").content(Bytes::from("bar"))) .part(FormDataPart::new("hello").content(Bytes::from("world"))); - let (boundary, body) = multipart.build(); + let body = multipart.build(); - let expected = format!( - "--{boundary}\r\n\ + let expected = "--lalala\r\n\ content-disposition: form-data; name=\"foo\"\r\n\ \r\n\ bar\r\n\ - --{boundary}\r\n\ + --lalala\r\n\ content-disposition: form-data; name=\"hello\"\r\n\ \r\n\ world\r\n\ - --{boundary}--\r\n", - ); + --lalala--\r\n"; assert_eq!(expected, String::from_utf8(body.to_vec()).unwrap()); } @@ -322,7 +355,7 @@ mod tests { .part(FormDataPart::new("Signature").content("0RavWzkygo6QX9caELEqKi9kDbU=")) .part(FormDataPart::new("file").header(CONTENT_TYPE, "image/jpeg".parse().unwrap()).content("...file content...")).part(FormDataPart::new("submit").content("Upload to Amazon S3")); - let (_, body) = multipart.build(); + let body = multipart.build(); let expected = r#"--9431149156168 content-disposition: form-data; name="key" @@ -442,7 +475,7 @@ Upload to Amazon S3 .content(r#"{"metadata": {"type": "calico"}}"#), ); - let (_, body) = multipart.build(); + let body = multipart.build(); let expected = r#"--===============7330845974216740156== content-type: application/http @@ -544,7 +577,7 @@ content-length: 32 .header("content-length".parse().unwrap(), "0".parse().unwrap()), ); - let (_, body) = multipart.build(); + let body = multipart.build(); let expected = r#"--batch_357de4f7-6d0b-4e02-8cd2-6361411a9525 content-type: application/http diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index 69a59fe0a785..5cb3dae7cb5e 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -648,6 +648,7 @@ impl Accessor for AzblobBackend { "batch delete limit exceeded", )); } + // construct and complete batch request let resp = self.core.azblob_batch_delete(&paths).await?; diff --git a/core/src/services/azblob/batch.rs b/core/src/services/azblob/batch.rs index 8386aa1621d8..41cbefabb933 100644 --- a/core/src/services/azblob/batch.rs +++ b/core/src/services/azblob/batch.rs @@ -15,100 +15,12 @@ // specific language governing permissions and limitations // under the License. -use bytes::BufMut; -use bytes::BytesMut; -use http::header::CONTENT_LENGTH; -use http::header::CONTENT_TYPE; -use http::Request; use http::StatusCode; -use uuid::Uuid; use super::error::parse_http_error; use crate::raw::*; use crate::*; -const AZURE_BATCH_LIMIT: usize = 256; - -/// helper struct for batch requests -pub(crate) struct BatchDeleteRequestBuilder { - url: String, - sub_requests: Vec>, -} - -impl BatchDeleteRequestBuilder { - /// create a new batch request builder - pub fn new(url: &str) -> Self { - Self { - url: url.to_string(), - sub_requests: vec![], - } - } - /// append request in batch - /// - /// # Note - /// `sub_req` must have been signed by signer - pub fn append(&mut self, sub_req: Request) -> &mut Self { - debug_assert!(self.sub_requests.len() < AZURE_BATCH_LIMIT); - self.sub_requests.push(sub_req); - self - } - /// create an batch request, not signed by signer - pub fn try_into_req(self) -> Result> { - let boundary = format!("opendal-{}", Uuid::new_v4()); - let mut body = BytesMut::new(); - - let req_builder = Request::post(&self.url).header( - CONTENT_TYPE, - format!("multipart/mixed; boundary={}", boundary), - ); - - for (idx, req) in self.sub_requests.into_iter().enumerate() { - let headers: String = req - .headers() - .iter() - .map(|(k, v)| { - let (k, v) = (k.as_str(), v.to_str().unwrap()); - format!("{}: {}", k, v) - }) - .collect::>() - .join("\n"); - let path = req - .uri() - .path_and_query() - .expect("delete request comes with no path") - .to_string(); - - let block = format!( - r#"--{boundary} -Content-Type: application/http -Content-Transfer-Encoding: binary -Content-ID: {idx} - -{} {} HTTP/1.1 -{} - -"#, - req.method(), - path, - headers - ); - // replace LF with CRLF, required by Azure Storage Blobs service. - // - // The Rust compiler converts all CRLF sequences to LF when reading source files - // since 2019, so it is safe to convert here - let block = block.replace('\n', "\r\n"); - body.put(block.as_bytes()); - } - body.put(format!("--{}--", boundary).as_bytes()); - - let content_length = body.len(); - req_builder - .header(CONTENT_LENGTH, content_length) - .body(AsyncBody::Bytes(body.freeze())) - .map_err(new_request_build_error) - } -} - pub(super) fn parse_batch_delete_response( boundary: &str, body: String, @@ -117,12 +29,12 @@ pub(super) fn parse_batch_delete_response( let mut reps = Vec::with_capacity(expect.len()); let mut resp_packs: Vec<&str> = body.trim().split(&format!("--{boundary}")).collect(); - if resp_packs.len() != (expect.len() + 2) { - return Err(Error::new( - ErrorKind::Unexpected, - "invalid batch delete response", - )); - } + // if resp_packs.len() != (expect.len() + 1) { + // return Err(Error::new( + // ErrorKind::Unexpected, + // "invalid batch delete response", + // )); + // } // drop the tail resp_packs.pop(); for (resp_pack, name) in resp_packs[1..].iter().zip(expect.into_iter()) { @@ -177,67 +89,7 @@ pub(super) fn parse_batch_delete_response( #[cfg(test)] mod test { - use anyhow::anyhow; - use anyhow::Result; - use http::header::CONTENT_LENGTH; - use http::header::CONTENT_TYPE; - use http::Request; - - use super::BatchDeleteRequestBuilder; - use crate::raw::AsyncBody; - use crate::services::azblob::batch::parse_batch_delete_response; - - #[test] - fn batch_delete_req_builder_test() -> Result<()> { - let url = "https://test.blob.core.windows.net/test"; - let delete_url = "https://test.blob.core.windows.net/test/test.txt"; - let delete_req = Request::delete(delete_url) - .header(CONTENT_LENGTH, 0) - .body(AsyncBody::Empty) - .expect("must success"); - - let mut builder = BatchDeleteRequestBuilder::new(url); - builder.append(delete_req); - - let req = builder.try_into_req().expect("must success"); - - let (header, body) = req.into_parts(); - let content_type = header - .headers - .get(CONTENT_TYPE) - .expect("expect header in request: CONTENT_TYPE: application/mixed.") - .to_str() - .unwrap(); - let boundary = content_type - .split("boundary=") - .collect::>() - .get(1) - .expect("get invalid CONTENT_TYPE header in response") - .to_owned(); - - let bs = match body { - AsyncBody::Bytes(bs) => bs, - _ => return Err(anyhow!("wrong body type")), - }; - - let s = String::from_utf8_lossy(&bs); - let splits: Vec<&str> = s.split(&format!("--{}", boundary)).collect(); - assert_eq!(splits.len(), 3); - - let expect_body_str = r#" -Content-Type: application/http -Content-Transfer-Encoding: binary -Content-ID: 0 - -DELETE /test/test.txt HTTP/1.1 -content-length: 0 - -"# - .replace('\n', "\r\n"); - let actual_body_str = splits[1]; - assert_eq!(actual_body_str, expect_body_str); - Ok(()) - } + use super::*; #[test] fn test_break_down_batch() { diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs index 6f4e9b195c19..02e210bb7c70 100644 --- a/core/src/services/azblob/core.rs +++ b/core/src/services/azblob/core.rs @@ -19,7 +19,6 @@ use std::fmt; use std::fmt::Debug; use std::fmt::Formatter; use std::fmt::Write; -use std::str::FromStr; use http::header::HeaderName; use http::header::CONTENT_LENGTH; @@ -28,12 +27,10 @@ use http::header::IF_MATCH; use http::header::IF_NONE_MATCH; use http::Request; use http::Response; -use http::Uri; use reqsign::AzureStorageCredential; use reqsign::AzureStorageLoader; use reqsign::AzureStorageSigner; -use super::batch::BatchDeleteRequestBuilder; use crate::raw::*; use crate::*; @@ -274,7 +271,7 @@ impl AzblobCore { self.send(req).await } - pub async fn azblob_delete_blob(&self, path: &str) -> Result> { + pub fn azblob_delete_blob_request(&self, path: &str) -> Result> { let p = build_abs_path(&self.root, path); let url = format!( @@ -286,9 +283,13 @@ impl AzblobCore { let req = Request::delete(&url); - let mut req = req + req.header(CONTENT_LENGTH, 0) .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; + .map_err(new_request_build_error) + } + + pub async fn azblob_delete_blob(&self, path: &str) -> Result> { + let mut req = self.azblob_delete_blob_request(path)?; self.sign(&mut req).await?; self.send(req).await @@ -364,35 +365,22 @@ impl AzblobCore { &self, paths: &[String], ) -> Result> { - // init batch request let url = format!( "{}/{}?restype=container&comp=batch", self.endpoint, self.container ); - let mut batch_delete_req_builder = BatchDeleteRequestBuilder::new(&url); - for path in paths.iter() { - // build sub requests - let p = build_abs_path(&self.root, path); - let encoded_path = percent_encode_path(&p); - - let url = Uri::from_str(&format!( - "{}/{}/{}", - self.endpoint, self.container, encoded_path - )) - .unwrap(); + let mut multipart = Multipart::new(); - let mut sub_req = Request::delete(&url.to_string()) - .header(CONTENT_LENGTH, 0) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.batch_sign(&mut sub_req).await?; + for path in paths.iter() { + let mut req = self.azblob_delete_blob_request(path)?; - batch_delete_req_builder.append(sub_req); + self.batch_sign(&mut req).await?; + multipart = multipart.part(MixedPart::from_request(req)); } - let mut req = batch_delete_req_builder.try_into_req()?; + let req = Request::post(url); + let mut req = multipart.apply(req)?; self.sign(&mut req).await?; self.send(req).await diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs index 61c18232aef3..7432be7f4ea9 100644 --- a/core/src/services/ipmfs/backend.rs +++ b/core/src/services/ipmfs/backend.rs @@ -21,6 +21,7 @@ use std::str; use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use http::Request; use http::Response; use http::StatusCode; @@ -281,7 +282,7 @@ impl IpmfsBackend { pub async fn ipmfs_write( &self, path: &str, - body: AsyncBody, + body: Bytes, ) -> Result> { let p = build_rooted_abs_path(&self.root, path); @@ -291,9 +292,10 @@ impl IpmfsBackend { percent_encode_path(&p) ); - let req = Request::post(url); + let multipart = Multipart::new().part(FormDataPart::new("data").content(body)); - let req = req.body(body).map_err(new_request_build_error)?; + let req: http::request::Builder = Request::post(url); + let req = multipart.apply(req)?; self.client.send(req).await } diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 81d9c6a5bca6..f704dcd0ebfb 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -39,15 +39,7 @@ impl IpmfsWriter { #[async_trait] impl oio::Write for IpmfsWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - let resp = self - .backend - .ipmfs_write( - &self.path, - AsyncBody::MultipartFormData( - Multipart::new().part(FormDataPart::new("data").content(bs)), - ), - ) - .await?; + let resp = self.backend.ipmfs_write(&self.path, bs).await?; let status = resp.status(); From ae1756c7ab11820f8924230b41bbce8909a54cf4 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 28 Apr 2023 21:12:32 +0800 Subject: [PATCH 5/6] Fix Signed-off-by: Xuanwo --- core/src/raw/http_util/multipart.rs | 6 ++++-- core/src/services/azblob/batch.rs | 12 ++++++------ core/src/services/azblob/core.rs | 6 ++++-- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index d9ec1013caba..1bff5df23d30 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -97,8 +97,6 @@ impl Multipart { // Insert content length with calculated size. builder = builder.header(CONTENT_LENGTH, bs.len()); - log::debug!("current body:\n{}", String::from_utf8_lossy(&bs)); - builder .body(AsyncBody::Bytes(bs)) .map_err(new_request_build_error) @@ -211,6 +209,10 @@ impl MixedPart { } /// Build a mixed part from a request. + /// + /// # Notes + /// + /// Mixed parts only takes the path from the request uri. pub fn from_request(req: Request) -> Self { let mut part_headers = HeaderMap::new(); part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap()); diff --git a/core/src/services/azblob/batch.rs b/core/src/services/azblob/batch.rs index 41cbefabb933..c6004be022c6 100644 --- a/core/src/services/azblob/batch.rs +++ b/core/src/services/azblob/batch.rs @@ -29,12 +29,12 @@ pub(super) fn parse_batch_delete_response( let mut reps = Vec::with_capacity(expect.len()); let mut resp_packs: Vec<&str> = body.trim().split(&format!("--{boundary}")).collect(); - // if resp_packs.len() != (expect.len() + 1) { - // return Err(Error::new( - // ErrorKind::Unexpected, - // "invalid batch delete response", - // )); - // } + if resp_packs.len() != (expect.len() + 2) { + return Err(Error::new( + ErrorKind::Unexpected, + "invalid batch delete response", + )); + } // drop the tail resp_packs.pop(); for (resp_pack, name) in resp_packs[1..].iter().zip(expect.into_iter()) { diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs index 02e210bb7c70..62665a0e5480 100644 --- a/core/src/services/azblob/core.rs +++ b/core/src/services/azblob/core.rs @@ -372,11 +372,13 @@ impl AzblobCore { let mut multipart = Multipart::new(); - for path in paths.iter() { + for (idx, path) in paths.iter().enumerate() { let mut req = self.azblob_delete_blob_request(path)?; self.batch_sign(&mut req).await?; - multipart = multipart.part(MixedPart::from_request(req)); + multipart = multipart.part( + MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()), + ); } let req = Request::post(url); From 36c7cc02d886e61278c9101eaa5a902555dff456 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 28 Apr 2023 21:37:56 +0800 Subject: [PATCH 6/6] Enable batch delete Signed-off-by: Xuanwo --- core/src/services/azblob/backend.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index 5cb3dae7cb5e..5c07de8d6ca4 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -475,6 +475,7 @@ impl Accessor for AzblobBackend { copy: true, presign: self.has_sas_token, batch: true, + batch_delete: true, batch_max_operations: Some(AZBLOB_BATCH_LIMIT), ..Default::default() });