diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index d83a8a3761e2..78236f2f6613 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -268,7 +268,7 @@ pub struct CosBackend { impl Accessor for CosBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = CosWriter; + type Writer = oio::MultipartUploadWriter; type BlockingWriter = (); type Appender = CosAppender; type Pager = CosPager; @@ -294,6 +294,7 @@ impl Accessor for CosBackend { write_can_sink: true, write_with_content_type: true, write_with_cache_control: true, + write_with_content_disposition: true, write_without_content_length: true, append: true, @@ -323,7 +324,7 @@ impl Accessor for CosBackend { async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { let mut req = self.core - .cos_put_object_request(path, Some(0), None, None, AsyncBody::Empty)?; + .cos_put_object_request(path, Some(0), None, None, None, AsyncBody::Empty)?; self.core.sign(&mut req).await?; @@ -437,6 +438,7 @@ impl Accessor for CosBackend { path, None, v.content_type(), + v.content_disposition(), v.cache_control(), AsyncBody::Empty, )?, diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs index 56081cb18abf..6a81c7d7ef7b 100644 --- a/core/src/services/cos/core.rs +++ b/core/src/services/cos/core.rs @@ -153,6 +153,7 @@ impl CosCore { path: &str, size: Option, content_type: Option<&str>, + content_disposition: Option<&str>, cache_control: Option<&str>, body: AsyncBody, ) -> Result> { @@ -168,7 +169,9 @@ impl CosCore { if let Some(cache_control) = cache_control { req = req.header(CACHE_CONTROL, cache_control) } - + if let Some(pos) = content_disposition { + req = req.header(CONTENT_DISPOSITION, pos) + } if let Some(mime) = content_type { req = req.header(CONTENT_TYPE, mime) } @@ -368,7 +371,7 @@ impl CosCore { path: &str, upload_id: &str, part_number: usize, - size: Option, + size: u64, body: AsyncBody, ) -> Result> { let p = build_abs_path(&self.root, path); @@ -382,11 +385,7 @@ impl CosCore { ); let mut req = Request::put(&url); - - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size); - } - + req = req.header(CONTENT_LENGTH, size); // Set body let mut req = req.body(body).map_err(new_request_build_error)?; @@ -399,7 +398,7 @@ impl CosCore { &self, path: &str, upload_id: &str, - parts: &[CompleteMultipartUploadRequestPart], + parts: Vec, ) -> Result> { let p = build_abs_path(&self.root, path); @@ -412,10 +411,8 @@ impl CosCore { let req = Request::post(&url); - let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { - part: parts.to_vec(), - }) - .map_err(new_xml_deserialize_error)?; + let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts }) + .map_err(new_xml_deserialize_error)?; // Make sure content length has been set to avoid post with chunked encoding. let req = req.header(CONTENT_LENGTH, content.len()); // Set content-type to `application/xml` to avoid mixed with form post. diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index 676e123cf1a6..702680ea3678 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Buf; -use bytes::Bytes; use http::StatusCode; use super::core::*; @@ -32,33 +31,29 @@ pub struct CosWriter { op: OpWrite, path: String, - upload_id: Option, - - parts: Vec, - buffer: oio::VectorCursor, - buffer_size: usize, } impl CosWriter { - pub fn new(core: Arc, path: &str, op: OpWrite) -> Self { - let buffer_size = core.write_min_size; - CosWriter { + pub fn new(core: Arc, path: &str, op: OpWrite) -> oio::MultipartUploadWriter { + let write_min_size = core.write_min_size; + let total_size = op.content_length(); + let cos_writer = CosWriter { core, path: path.to_string(), op, - - upload_id: None, - parts: vec![], - buffer: oio::VectorCursor::new(), - buffer_size, - } + }; + oio::MultipartUploadWriter::new(cos_writer, total_size).with_write_min_size(write_min_size) } +} - async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> { +#[async_trait] +impl oio::MultipartUploadWrite for CosWriter { + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { let mut req = self.core.cos_put_object_request( &self.path, Some(size), self.op.content_type(), + self.op.content_disposition(), self.op.cache_control(), body, )?; @@ -78,7 +73,7 @@ impl CosWriter { } } - async fn initiate_upload(&self) -> Result { + async fn initiate_part(&self) -> Result { let resp = self .core .cos_initiate_multipart_upload( @@ -107,20 +102,16 @@ impl CosWriter { async fn write_part( &self, upload_id: &str, - bs: Bytes, - ) -> Result { + part_number: usize, + size: u64, + body: AsyncBody, + ) -> Result { // COS requires part number must between [1..=10000] - let part_number = self.parts.len() + 1; + let part_number = part_number + 1; let resp = self .core - .cos_upload_part_request( - &self.path, - upload_id, - part_number, - Some(bs.len() as u64), - AsyncBody::Bytes(bs), - ) + .cos_upload_part_request(&self.path, upload_id, part_number, size, body) .await?; let status = resp.status(); @@ -138,78 +129,43 @@ impl CosWriter { resp.into_body().consume().await?; - Ok(CompleteMultipartUploadRequestPart { part_number, etag }) + Ok(oio::MultipartUploadPart { part_number, etag }) } _ => Err(parse_error(resp).await?), } } -} -#[async_trait] -impl oio::Write for CosWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let upload_id = match &self.upload_id { - Some(upload_id) => upload_id, - None => { - if self.op.content_length().unwrap_or_default() == bs.len() as u64 { - return self - .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await; - } else { - let upload_id = self.initiate_upload().await?; - self.upload_id = Some(upload_id); - self.upload_id.as_deref().unwrap() - } - } - }; + async fn complete_part( + &self, + upload_id: &str, + parts: &[oio::MultipartUploadPart], + ) -> Result<()> { + let parts = parts + .iter() + .map(|p| CompleteMultipartUploadRequestPart { + part_number: p.part_number, + etag: p.etag.clone(), + }) + .collect(); - // Ignore empty bytes - if bs.is_empty() { - return Ok(()); - } + let resp = self + .core + .cos_complete_multipart_upload(&self.path, upload_id, parts) + .await?; - self.buffer.push(bs); - // Return directly if the buffer is not full - if self.buffer.len() <= self.buffer_size { - return Ok(()); - } + let status = resp.status(); - let bs = self.buffer.peak_at_least(self.buffer_size); - let size = bs.len(); + match status { + StatusCode::OK => { + resp.into_body().consume().await?; - match self.write_part(upload_id, bs).await { - Ok(part) => { - self.buffer.take(size); - self.parts.push(part); Ok(()) } - Err(e) => { - // If the upload fails, we should pop the given bs to make sure - // write is re-enter safe. - self.buffer.pop(); - Err(e) - } - } - } - - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - if self.op.content_length().unwrap_or_default() == size { - return self.write_oneshot(size, AsyncBody::Stream(s)).await; - } else { - return Err(Error::new( - ErrorKind::Unsupported, - "COS does not support streaming multipart upload", - )); + _ => Err(parse_error(resp).await?), } } - async fn abort(&mut self) -> Result<()> { - let upload_id = if let Some(upload_id) = &self.upload_id { - upload_id - } else { - return Ok(()); - }; - + async fn abort_part(&self, upload_id: &str) -> Result<()> { let resp = self .core .cos_abort_multipart_upload(&self.path, upload_id) @@ -224,43 +180,4 @@ impl oio::Write for CosWriter { _ => Err(parse_error(resp).await?), } } - - async fn close(&mut self) -> Result<()> { - let upload_id = if let Some(upload_id) = &self.upload_id { - upload_id - } else { - return Ok(()); - }; - - // Make sure internal buffer has been flushed. - if !self.buffer.is_empty() { - let bs = self.buffer.peak_exact(self.buffer.len()); - - match self.write_part(upload_id, bs).await { - Ok(part) => { - self.buffer.clear(); - self.parts.push(part); - } - Err(e) => { - return Err(e); - } - } - } - - let resp = self - .core - .cos_complete_multipart_upload(&self.path, upload_id, &self.parts) - .await?; - - let status = resp.status(); - - match status { - StatusCode::OK => { - resp.into_body().consume().await?; - - Ok(()) - } - _ => Err(parse_error(resp).await?), - } - } }