Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/services/cos/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ pub struct CosBackend {
impl Accessor for CosBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
type Writer = CosWriter;
type Writer = oio::MultipartUploadWriter<CosWriter>;
type BlockingWriter = ();
type Appender = CosAppender;
type Pager = CosPager;
Expand All @@ -294,6 +294,7 @@ impl Accessor for CosBackend {
write_can_sink: true,
write_with_content_type: true,
write_with_cache_control: true,
write_with_content_disposition: true,
write_without_content_length: true,

append: true,
Expand Down Expand Up @@ -323,7 +324,7 @@ impl Accessor for CosBackend {
async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let mut req =
self.core
.cos_put_object_request(path, Some(0), None, None, AsyncBody::Empty)?;
.cos_put_object_request(path, Some(0), None, None, None, AsyncBody::Empty)?;

self.core.sign(&mut req).await?;

Expand Down Expand Up @@ -437,6 +438,7 @@ impl Accessor for CosBackend {
path,
None,
v.content_type(),
v.content_disposition(),
v.cache_control(),
AsyncBody::Empty,
)?,
Expand Down
21 changes: 9 additions & 12 deletions core/src/services/cos/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ impl CosCore {
path: &str,
size: Option<u64>,
content_type: Option<&str>,
content_disposition: Option<&str>,
cache_control: Option<&str>,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
Expand All @@ -168,7 +169,9 @@ impl CosCore {
if let Some(cache_control) = cache_control {
req = req.header(CACHE_CONTROL, cache_control)
}

if let Some(pos) = content_disposition {
req = req.header(CONTENT_DISPOSITION, pos)
Comment thread
Xuanwo marked this conversation as resolved.
}
if let Some(mime) = content_type {
req = req.header(CONTENT_TYPE, mime)
}
Expand Down Expand Up @@ -368,7 +371,7 @@ impl CosCore {
path: &str,
upload_id: &str,
part_number: usize,
size: Option<u64>,
size: u64,
body: AsyncBody,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
Expand All @@ -382,11 +385,7 @@ impl CosCore {
);

let mut req = Request::put(&url);

if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size);
}

req = req.header(CONTENT_LENGTH, size);
// Set body
let mut req = req.body(body).map_err(new_request_build_error)?;

Expand All @@ -399,7 +398,7 @@ impl CosCore {
&self,
path: &str,
upload_id: &str,
parts: &[CompleteMultipartUploadRequestPart],
parts: Vec<CompleteMultipartUploadRequestPart>,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);

Expand All @@ -412,10 +411,8 @@ impl CosCore {

let req = Request::post(&url);

let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest {
part: parts.to_vec(),
})
.map_err(new_xml_deserialize_error)?;
let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
.map_err(new_xml_deserialize_error)?;
// Make sure content length has been set to avoid post with chunked encoding.
let req = req.header(CONTENT_LENGTH, content.len());
// Set content-type to `application/xml` to avoid mixed with form post.
Expand Down
165 changes: 41 additions & 124 deletions core/src/services/cos/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::Buf;
use bytes::Bytes;
use http::StatusCode;

use super::core::*;
Expand All @@ -32,33 +31,29 @@ pub struct CosWriter {

op: OpWrite,
path: String,
upload_id: Option<String>,

parts: Vec<CompleteMultipartUploadRequestPart>,
buffer: oio::VectorCursor,
buffer_size: usize,
}

impl CosWriter {
pub fn new(core: Arc<CosCore>, path: &str, op: OpWrite) -> Self {
let buffer_size = core.write_min_size;
CosWriter {
pub fn new(core: Arc<CosCore>, path: &str, op: OpWrite) -> oio::MultipartUploadWriter<Self> {
let write_min_size = core.write_min_size;
let total_size = op.content_length();
let cos_writer = CosWriter {
core,
path: path.to_string(),
op,

upload_id: None,
parts: vec![],
buffer: oio::VectorCursor::new(),
buffer_size,
}
};
oio::MultipartUploadWriter::new(cos_writer, total_size).with_write_min_size(write_min_size)
}
}

async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> {
#[async_trait]
impl oio::MultipartUploadWrite for CosWriter {
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req = self.core.cos_put_object_request(
&self.path,
Some(size),
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
body,
)?;
Expand All @@ -78,7 +73,7 @@ impl CosWriter {
}
}

async fn initiate_upload(&self) -> Result<String> {
async fn initiate_part(&self) -> Result<String> {
let resp = self
.core
.cos_initiate_multipart_upload(
Expand Down Expand Up @@ -107,20 +102,16 @@ impl CosWriter {
async fn write_part(
&self,
upload_id: &str,
bs: Bytes,
) -> Result<CompleteMultipartUploadRequestPart> {
part_number: usize,
size: u64,
body: AsyncBody,
) -> Result<oio::MultipartUploadPart> {
// COS requires part number must between [1..=10000]
let part_number = self.parts.len() + 1;
let part_number = part_number + 1;

let resp = self
.core
.cos_upload_part_request(
&self.path,
upload_id,
part_number,
Some(bs.len() as u64),
AsyncBody::Bytes(bs),
)
.cos_upload_part_request(&self.path, upload_id, part_number, size, body)
.await?;

let status = resp.status();
Expand All @@ -138,78 +129,43 @@ impl CosWriter {

resp.into_body().consume().await?;

Ok(CompleteMultipartUploadRequestPart { part_number, etag })
Ok(oio::MultipartUploadPart { part_number, etag })
}
_ => Err(parse_error(resp).await?),
}
}
}

#[async_trait]
impl oio::Write for CosWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let upload_id = match &self.upload_id {
Some(upload_id) => upload_id,
None => {
if self.op.content_length().unwrap_or_default() == bs.len() as u64 {
return self
.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
.await;
} else {
let upload_id = self.initiate_upload().await?;
self.upload_id = Some(upload_id);
self.upload_id.as_deref().unwrap()
}
}
};
async fn complete_part(
&self,
upload_id: &str,
parts: &[oio::MultipartUploadPart],
) -> Result<()> {
let parts = parts
.iter()
.map(|p| CompleteMultipartUploadRequestPart {
part_number: p.part_number,
etag: p.etag.clone(),
})
.collect();

// Ignore empty bytes
if bs.is_empty() {
return Ok(());
}
let resp = self
.core
.cos_complete_multipart_upload(&self.path, upload_id, parts)
.await?;

self.buffer.push(bs);
// Return directly if the buffer is not full
if self.buffer.len() <= self.buffer_size {
return Ok(());
}
let status = resp.status();

let bs = self.buffer.peak_at_least(self.buffer_size);
let size = bs.len();
match status {
StatusCode::OK => {
resp.into_body().consume().await?;

match self.write_part(upload_id, bs).await {
Ok(part) => {
self.buffer.take(size);
self.parts.push(part);
Ok(())
}
Err(e) => {
// If the upload fails, we should pop the given bs to make sure
// write is re-enter safe.
self.buffer.pop();
Err(e)
}
}
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
if self.op.content_length().unwrap_or_default() == size {
return self.write_oneshot(size, AsyncBody::Stream(s)).await;
} else {
return Err(Error::new(
ErrorKind::Unsupported,
"COS does not support streaming multipart upload",
));
_ => Err(parse_error(resp).await?),
}
}

async fn abort(&mut self) -> Result<()> {
let upload_id = if let Some(upload_id) = &self.upload_id {
upload_id
} else {
return Ok(());
};

async fn abort_part(&self, upload_id: &str) -> Result<()> {
let resp = self
.core
.cos_abort_multipart_upload(&self.path, upload_id)
Expand All @@ -224,43 +180,4 @@ impl oio::Write for CosWriter {
_ => Err(parse_error(resp).await?),
}
}

async fn close(&mut self) -> Result<()> {
let upload_id = if let Some(upload_id) = &self.upload_id {
upload_id
} else {
return Ok(());
};

// Make sure internal buffer has been flushed.
if !self.buffer.is_empty() {
let bs = self.buffer.peak_exact(self.buffer.len());

match self.write_part(upload_id, bs).await {
Ok(part) => {
self.buffer.clear();
self.parts.push(part);
}
Err(e) => {
return Err(e);
}
}
}

let resp = self
.core
.cos_complete_multipart_upload(&self.path, upload_id, &self.parts)
.await?;

let status = resp.status();

match status {
StatusCode::OK => {
resp.into_body().consume().await?;

Ok(())
}
_ => Err(parse_error(resp).await?),
}
}
}