From d55065265f87785a2885afdbbd8ace44e90184a8 Mon Sep 17 00:00:00 2001 From: suyanhanx Date: Mon, 22 May 2023 15:57:50 +0800 Subject: [PATCH 1/6] feat(services/oss): add append support Signed-off-by: suyanhanx --- core/src/services/oss/appender.rs | 144 ++++++++++++++++++++++++++++++ core/src/services/oss/backend.rs | 2 + core/src/services/oss/core.rs | 48 ++++++++++ core/src/services/oss/mod.rs | 1 + core/src/types/capability.rs | 3 + 5 files changed, 198 insertions(+) create mode 100644 core/src/services/oss/appender.rs diff --git a/core/src/services/oss/appender.rs b/core/src/services/oss/appender.rs new file mode 100644 index 000000000000..71eb1b69c92d --- /dev/null +++ b/core/src/services/oss/appender.rs @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use http::StatusCode; + +use super::core::*; +use super::error::parse_error; +use crate::ops::OpAppend; +use crate::raw::*; +use crate::*; + +pub const X_OSS_NEXT_APPEND_POSITION: &str = "x-oss-next-append-position"; + +pub struct OssAppender { + core: Arc, + + op: OpAppend, + path: String, + + position: Option, +} + +impl OssAppender { + #[allow(dead_code)] + pub fn new(core: Arc, path: &str, op: OpAppend) -> Self { + Self { + core, + op, + path: path.to_string(), + position: None, + } + } +} + +#[async_trait] +impl oio::Append for OssAppender { + async fn append(&mut self, bs: Bytes) -> Result<()> { + // If the position is not set, we need to get the current position. + if self.position.is_none() { + let resp = self.core.oss_head_object(&self.path, None, None).await?; + + let status = resp.status(); + match status { + StatusCode::OK => { + let position = resp + .headers() + .get(X_OSS_NEXT_APPEND_POSITION) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::().ok()) + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "missing x-oss-next-append-position, the object may not be appendable", + ) + })?; + self.position = Some(position); + } + StatusCode::NOT_FOUND => { + self.position = Some(0); + } + _ => { + return Err(parse_error(resp).await?); + } + } + } + + let mut req = self.core.oss_append_object_request( + &self.path, + self.position.expect("position is not set"), + bs.len(), + &self.op, + AsyncBody::Bytes(bs), + )?; + + self.core.sign(&mut req).await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let position = resp + .headers() + .get(X_OSS_NEXT_APPEND_POSITION) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::().ok()) + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "missing x-oss-next-append-position, the object may not be appendable", + ) + })?; + self.position = Some(position); + Ok(()) + } + StatusCode::CONFLICT => { + // The object is not appendable or the position is not match with the object's length. + // If the position is not match, we could get the current position and retry. + let position = resp + .headers() + .get(X_OSS_NEXT_APPEND_POSITION) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::().ok()) + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "missing x-oss-next-append-position, the object may not be appendable", + ) + })?; + self.position = Some(position); + + // Then return the error to the caller, so the caller could retry. + Err(Error::new( + ErrorKind::ConditionNotMatch, + "the position is not match with the object's length. position has been updated.", + )) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn close(&mut self) -> Result<()> { + Ok(()) + } +} diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 713236f148ef..468c23aed31b 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -469,6 +469,8 @@ impl Accessor for OssBackend { create_dir: true, copy: true, + append: true, + list: true, list_with_delimiter_slash: true, list_without_delimiter: true, diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs index 68297dfcc492..e6eee43090bf 100644 --- a/core/src/services/oss/core.rs +++ b/core/src/services/oss/core.rs @@ -37,6 +37,7 @@ use reqsign::AliyunOssSigner; use serde::Deserialize; use serde::Serialize; +use crate::ops::OpAppend; use crate::ops::OpWrite; use crate::raw::*; use crate::*; @@ -186,6 +187,53 @@ impl OssCore { Ok(req) } + /// Oss append object request + /// + /// # Note + /// + /// This request is used to append data to an existing object or create an appendable object. + /// So you must set the `append` and `position` header. + /// + /// https://www.alibabacloud.com/help/object-storage-service/latest/appendobject + pub fn oss_append_object_request( + &self, + path: &str, + position: u64, + size: usize, + args: &OpAppend, + body: AsyncBody, + ) -> Result> { + let p = build_abs_path(&self.root, path); + let endpoint = self.get_endpoint(false); + let url = format!("{}/{}", endpoint, percent_encode_path(&p)); + + let mut req = Request::post(&url); + + // The header `append` does not need a value. + req = req.header(HeaderName::from_static("append"), ""); + req = req.header(HeaderName::from_static("position"), position); + + req = req.header(CONTENT_LENGTH, size); + + if let Some(mime) = args.content_type() { + req = req.header(CONTENT_TYPE, mime); + } + + if let Some(pos) = args.content_disposition() { + req = req.header(CONTENT_DISPOSITION, pos); + } + + if let Some(cache_control) = args.cache_control() { + req = req.header(CACHE_CONTROL, cache_control) + } + + // set sse headers + req = self.insert_sse_headers(req); + + let req = req.body(body).map_err(new_request_build_error)?; + Ok(req) + } + pub fn oss_get_object_request( &self, path: &str, diff --git a/core/src/services/oss/mod.rs b/core/src/services/oss/mod.rs index 9bfd1692c7d0..21829ea51dd9 100644 --- a/core/src/services/oss/mod.rs +++ b/core/src/services/oss/mod.rs @@ -18,6 +18,7 @@ mod backend; pub use backend::OssBuilder as Oss; +mod appender; mod core; mod error; mod pager; diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs index c6b8e981b944..1153a93f8329 100644 --- a/core/src/types/capability.rs +++ b/core/src/types/capability.rs @@ -151,6 +151,9 @@ impl Debug for Capability { if self.write { s.push("Write"); } + if self.append { + s.push("Append"); + } if self.create_dir { s.push("CreateDir"); } From 49273be52209f7cfbb7dc65cf9b250b91a9f1d4a Mon Sep 17 00:00:00 2001 From: suyanhanx Date: Mon, 22 May 2023 23:10:59 +0800 Subject: [PATCH 2/6] set capability Signed-off-by: suyanhanx --- core/src/services/oss/backend.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 468c23aed31b..7b7320840857 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -470,6 +470,9 @@ impl Accessor for OssBackend { copy: true, append: true, + append_with_cache_control: true, + append_with_content_type: true, + append_with_content_disposition: true, list: true, list_with_delimiter_slash: true, From d0498cc6fcc908f58cbd0b966105480f315ebdb1 Mon Sep 17 00:00:00 2001 From: suyanhanx Date: Mon, 22 May 2023 23:11:47 +0800 Subject: [PATCH 3/6] fix `you` Signed-off-by: suyanhanx --- core/src/services/oss/core.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs index e6eee43090bf..eb9e7979aac5 100644 --- a/core/src/services/oss/core.rs +++ b/core/src/services/oss/core.rs @@ -192,7 +192,7 @@ impl OssCore { /// # Note /// /// This request is used to append data to an existing object or create an appendable object. - /// So you must set the `append` and `position` header. + /// So we must set the `append` and `position` header. /// /// https://www.alibabacloud.com/help/object-storage-service/latest/appendobject pub fn oss_append_object_request( From 46a765b2d66c4f5ace4fcbcc53247188f37f4024 Mon Sep 17 00:00:00 2001 From: suyanhanx Date: Tue, 23 May 2023 10:11:55 +0800 Subject: [PATCH 4/6] add append for oss Signed-off-by: suyanhanx --- core/src/services/oss/appender.rs | 1 - core/src/services/oss/backend.rs | 10 +++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/services/oss/appender.rs b/core/src/services/oss/appender.rs index 71eb1b69c92d..b327d824931a 100644 --- a/core/src/services/oss/appender.rs +++ b/core/src/services/oss/appender.rs @@ -39,7 +39,6 @@ pub struct OssAppender { } impl OssAppender { - #[allow(dead_code)] pub fn new(core: Arc, path: &str, op: OpAppend) -> Self { Self { core, diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 7b7320840857..c436fc98314c 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -30,6 +30,7 @@ use reqsign::AliyunConfig; use reqsign::AliyunLoader; use reqsign::AliyunOssSigner; +use super::appender::OssAppender; use super::core::*; use super::error::parse_error; use super::pager::OssPager; @@ -441,7 +442,7 @@ impl Accessor for OssBackend { type BlockingReader = (); type Writer = OssWriter; type BlockingWriter = (); - type Appender = (); + type Appender = OssAppender; type Pager = OssPager; type BlockingPager = (); @@ -538,6 +539,13 @@ impl Accessor for OssBackend { )) } + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + Ok(( + RpAppend::default(), + OssAppender::new(self.core.clone(), path, args), + )) + } + async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { let resp = self.core.oss_copy_object(from, to).await?; let status = resp.status(); From 61dafaaa8f5419cb831711832536b963a37a00c4 Mon Sep 17 00:00:00 2001 From: suyanhanx Date: Tue, 23 May 2023 10:28:38 +0800 Subject: [PATCH 5/6] fix append Signed-off-by: suyanhanx --- core/src/services/oss/core.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs index eb9e7979aac5..6fd36ce27c70 100644 --- a/core/src/services/oss/core.rs +++ b/core/src/services/oss/core.rs @@ -205,14 +205,15 @@ impl OssCore { ) -> Result> { let p = build_abs_path(&self.root, path); let endpoint = self.get_endpoint(false); - let url = format!("{}/{}", endpoint, percent_encode_path(&p)); + let url = format!( + "{}/{}?append&position={}", + endpoint, + percent_encode_path(&p), + position + ); let mut req = Request::post(&url); - // The header `append` does not need a value. - req = req.header(HeaderName::from_static("append"), ""); - req = req.header(HeaderName::from_static("position"), position); - req = req.header(CONTENT_LENGTH, size); if let Some(mime) = args.content_type() { From 10e114d1278ec7db97e2e6d27dc62cb573b492b6 Mon Sep 17 00:00:00 2001 From: suyanhanx Date: Tue, 23 May 2023 10:42:53 +0800 Subject: [PATCH 6/6] try copy from appender Signed-off-by: suyanhanx --- core/src/types/operator/operator.rs | 6 ++--- core/tests/behavior/append.rs | 39 ++++++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index e97ed5dc5de5..da0cf1c7dc15 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -1006,9 +1006,9 @@ impl Operator { } let bs = bs.into(); - let (_, mut w) = self.inner().append(&path, args).await?; - w.append(bs).await?; - w.close().await?; + let (_, mut a) = self.inner().append(&path, args).await?; + a.append(bs).await?; + a.close().await?; Ok(()) } diff --git a/core/tests/behavior/append.rs b/core/tests/behavior/append.rs index 1c03a3e6a6da..e9e55d2aed80 100644 --- a/core/tests/behavior/append.rs +++ b/core/tests/behavior/append.rs @@ -16,10 +16,13 @@ // under the License. use anyhow::Result; +use log::warn; use opendal::ops::OpAppend; use opendal::EntryMode; use opendal::ErrorKind; use opendal::Operator; +use sha2::Digest; +use sha2::Sha256; use super::utils::*; @@ -68,6 +71,7 @@ macro_rules! behavior_append_tests { test_append_with_content_type, test_append_with_content_disposition, + test_appender_futures_copy, test_fuzz_appender, ); )* @@ -101,7 +105,7 @@ pub async fn test_append(op: Operator) -> Result<()> { /// Test append to a directory path must fail. pub async fn test_append_with_dir_path(op: Operator) -> Result<()> { - let path = uuid::Uuid::new_v4().to_string(); + let path = format!("{}/", uuid::Uuid::new_v4()); let (content, _) = gen_bytes(); let res = op.append(&path, content).await; @@ -197,6 +201,39 @@ pub async fn test_append_with_content_disposition(op: Operator) -> Result<()> { Ok(()) } +/// Copy data from reader to writer +pub async fn test_appender_futures_copy(op: Operator) -> Result<()> { + let path = uuid::Uuid::new_v4().to_string(); + let (content, size): (Vec, usize) = + gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024); + + let mut a = match op.appender(&path).await { + Ok(a) => a, + Err(err) if err.kind() == ErrorKind::Unsupported => { + warn!("service doesn't support write with append"); + return Ok(()); + } + Err(err) => return Err(err.into()), + }; + + futures::io::copy(&mut content.as_slice(), &mut a).await?; + a.close().await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + assert_eq!(meta.content_length(), size as u64); + + let bs = op.read(&path).await?; + assert_eq!(bs.len(), size, "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs[..size])), + format!("{:x}", Sha256::digest(content)), + "read content" + ); + + op.delete(&path).await.expect("delete must succeed"); + Ok(()) +} + /// Test for fuzzing appender. pub async fn test_fuzz_appender(op: Operator) -> Result<()> { let path = uuid::Uuid::new_v4().to_string();