diff --git a/core/src/services/azdls/backend.rs b/core/src/services/azdls/backend.rs index 612ce98f71aa..33e3a0efe6a1 100644 --- a/core/src/services/azdls/backend.rs +++ b/core/src/services/azdls/backend.rs @@ -32,6 +32,7 @@ use super::error::parse_error; use super::pager::AzdlsPager; use super::writer::AzdlsWriter; use crate::raw::*; +use crate::services::azdls::writer::AzdlsWriters; use crate::*; /// Known endpoint suffix Azure Data Lake Storage Gen2 URI syntax. @@ -230,7 +231,7 @@ pub struct AzdlsBackend { impl Accessor for AzdlsBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::OneShotWriter; + type Writer = AzdlsWriters; type BlockingWriter = (); type Pager = AzdlsPager; type BlockingPager = (); @@ -248,6 +249,7 @@ impl Accessor for AzdlsBackend { read_with_range: true, write: true, + write_can_append: true, create_dir: true, delete: true, rename: true, @@ -299,10 +301,13 @@ impl Accessor for AzdlsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - Ok(( - RpWrite::default(), - oio::OneShotWriter::new(AzdlsWriter::new(self.core.clone(), args, path.to_string())), - )) + let w = AzdlsWriter::new(self.core.clone(), args.clone(), path.to_string()); + let w = if args.append() { + AzdlsWriters::Two(oio::AppendObjectWriter::new(w)) + } else { + AzdlsWriters::One(oio::OneShotWriter::new(w)) + }; + Ok((RpWrite::default(), w)) } async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { diff --git a/core/src/services/azdls/core.rs b/core/src/services/azdls/core.rs index d83f981a414c..d17296f9cb88 100644 --- a/core/src/services/azdls/core.rs +++ b/core/src/services/azdls/core.rs @@ -203,7 +203,8 @@ impl AzdlsCore { pub fn azdls_update_request( &self, path: &str, - size: Option, + size: Option, + position: u64, body: AsyncBody, ) -> Result> { let p = build_abs_path(&self.root, path); @@ -211,10 +212,11 @@ impl AzdlsCore { // - close: Make this is the final action to this file. // - flush: Flush the file directly. let url = format!( - "{}/{}/{}?action=append&close=true&flush=true&position=0", + "{}/{}/{}?action=append&close=true&flush=true&position={}", self.endpoint, self.filesystem, - percent_encode_path(&p) + percent_encode_path(&p), + position ); let mut req = Request::patch(&url); diff --git a/core/src/services/azdls/writer.rs b/core/src/services/azdls/writer.rs index 2cbbac4f32fe..bc0201fbf4d4 100644 --- a/core/src/services/azdls/writer.rs +++ b/core/src/services/azdls/writer.rs @@ -26,6 +26,9 @@ use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; +pub type AzdlsWriters = + oio::TwoWaysWriter, oio::AppendObjectWriter>; + pub struct AzdlsWriter { core: Arc, @@ -65,7 +68,8 @@ impl oio::OneShotWrite for AzdlsWriter { let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); let mut req = self.core.azdls_update_request( &self.path, - Some(bs.len()), + Some(bs.len() as u64), + 0, AsyncBody::ChunkedBytes(bs), )?; @@ -85,3 +89,40 @@ impl oio::OneShotWrite for AzdlsWriter { } } } + +#[async_trait] +impl oio::AppendObjectWrite for AzdlsWriter { + async fn offset(&self) -> Result { + let resp = self.core.azdls_get_properties(&self.path).await?; + + let status = resp.status(); + let headers = resp.headers(); + + match status { + StatusCode::OK => Ok(parse_content_length(headers)?.unwrap_or_default()), + StatusCode::NOT_FOUND => Ok(0), + _ => Err(parse_error(resp).await?), + } + } + + async fn append(&self, offset: u64, size: u64, body: AsyncBody) -> Result<()> { + let mut req = self + .core + .azdls_update_request(&self.path, Some(size), offset, body)?; + + self.core.sign(&mut req).await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + match status { + StatusCode::OK | StatusCode::ACCEPTED => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp) + .await? + .with_operation("Backend::azdls_update_request")), + } + } +}