diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index 18f4b55ca1cc..ba804db99596 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -25,6 +25,7 @@ use http::Request; use http::Response; use http::StatusCode; use log::debug; +use serde::Deserialize; use tokio::sync::OnceCell; use super::error::parse_error; @@ -34,6 +35,7 @@ use super::message::BooleanResp; use super::message::FileStatusType; use super::message::FileStatusWrapper; use super::writer::WebhdfsWriter; +use super::writer::WebhdfsWriters; use crate::raw::*; use crate::*; @@ -236,6 +238,56 @@ impl WebhdfsBackend { req.body(body).map_err(new_request_build_error) } + pub async fn webhdfs_init_append_request(&self, path: &str) -> Result { + let p = build_abs_path(&self.root, path); + let mut url = format!( + "{}/webhdfs/v1/{}?op=APPEND&noredirect=true", + self.endpoint, + percent_encode_path(&p), + ); + if let Some(auth) = &self.auth { + url += &format!("&{auth}"); + } + + let req = Request::post(url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.client.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + let resp: InitAppendResponse = + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + + Ok(resp.location) + } + _ => Err(parse_error(resp).await?), + } + } + + pub async fn webhdfs_append_request( + &self, + location: &str, + size: u64, + body: AsyncBody, + ) -> Result> { + let mut url = location.to_string(); + + if let Some(auth) = &self.auth { + url += &format!("&{auth}"); + } + + let mut req = Request::post(&url); + + req = req.header(CONTENT_LENGTH, size.to_string()); + + req.body(body).map_err(new_request_build_error) + } + async fn webhdfs_open_request( &self, path: &str, @@ -329,7 +381,10 @@ impl WebhdfsBackend { self.client.send(req).await } - async fn webhdfs_get_file_status(&self, path: &str) -> Result> { + pub(super) async fn webhdfs_get_file_status( + &self, + path: &str, + ) -> Result> { let p = build_abs_path(&self.root, path); let mut url = format!( "{}/webhdfs/v1/{}?op=GETFILESTATUS", @@ -395,7 +450,7 @@ impl WebhdfsBackend { #[async_trait] impl Accessor for WebhdfsBackend { type Reader = IncomingAsyncBody; - type Writer = oio::OneShotWriter; + type Writer = WebhdfsWriters; type Lister = oio::PageLister; type BlockingReader = (); type BlockingWriter = (); @@ -413,6 +468,7 @@ impl Accessor for WebhdfsBackend { read_with_range: true, write: true, + write_can_append: true, create_dir: true, delete: true, @@ -524,10 +580,15 @@ impl Accessor for WebhdfsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - Ok(( - RpWrite::default(), - oio::OneShotWriter::new(WebhdfsWriter::new(self.clone(), args, path.to_string())), - )) + let w = WebhdfsWriter::new(self.clone(), args.clone(), path.to_string()); + + let w = if args.append() { + WebhdfsWriters::Two(oio::AppendObjectWriter::new(w)) + } else { + WebhdfsWriters::One(oio::OneShotWriter::new(w)) + }; + + Ok((RpWrite::default(), w)) } async fn delete(&self, path: &str, _: OpDelete) -> Result { @@ -555,3 +616,9 @@ impl Accessor for WebhdfsBackend { Ok((RpList::default(), oio::PageLister::new(l))) } } + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub(super) struct InitAppendResponse { + pub location: String, +} diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index e6d866b9e29f..296f3edb1063 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -24,6 +24,9 @@ use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; +pub type WebhdfsWriters = + TwoWays, oio::AppendObjectWriter>; + pub struct WebhdfsWriter { backend: WebhdfsBackend, @@ -62,3 +65,62 @@ impl oio::OneShotWrite for WebhdfsWriter { } } } + +#[async_trait] +impl oio::AppendObjectWrite for WebhdfsWriter { + async fn offset(&self) -> Result { + Ok(0) + } + + async fn append(&self, _offset: u64, size: u64, body: AsyncBody) -> Result<()> { + let resp = self.backend.webhdfs_get_file_status(&self.path).await?; + + let status = resp.status(); + + let location; + + match status { + StatusCode::OK => { + location = self.backend.webhdfs_init_append_request(&self.path).await?; + } + StatusCode::NOT_FOUND => { + let req = self.backend.webhdfs_create_object_request( + &self.path, + None, + &self.op, + AsyncBody::Empty, + )?; + + let resp = self.backend.client.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::CREATED | StatusCode::OK => { + resp.into_body().consume().await?; + + location = self.backend.webhdfs_init_append_request(&self.path).await?; + } + _ => return Err(parse_error(resp).await?), + } + } + _ => return Err(parse_error(resp).await?), + } + + let req = self + .backend + .webhdfs_append_request(&location, size, body) + .await?; + + let resp = self.backend.client.send(req).await?; + + let status = resp.status(); + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } +} diff --git a/fixtures/webhdfs/docker-compose-webhdfs.yml b/fixtures/webhdfs/docker-compose-webhdfs.yml index 24f16a28a7e8..3b93f6df3782 100644 --- a/fixtures/webhdfs/docker-compose-webhdfs.yml +++ b/fixtures/webhdfs/docker-compose-webhdfs.yml @@ -40,3 +40,4 @@ services: - CLUSTER_NAME=test - WEBHDFS_CONF_dfs_webhdfs_enabled=true - CORE_CONF_hadoop_http_staticuser_user=root + - HDFS_CONF_dfs_replication=1