Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 73 additions & 6 deletions core/src/services/webhdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use http::Request;
use http::Response;
use http::StatusCode;
use log::debug;
use serde::Deserialize;
use tokio::sync::OnceCell;

use super::error::parse_error;
Expand All @@ -34,6 +35,7 @@ use super::message::BooleanResp;
use super::message::FileStatusType;
use super::message::FileStatusWrapper;
use super::writer::WebhdfsWriter;
use super::writer::WebhdfsWriters;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -236,6 +238,56 @@ impl WebhdfsBackend {
req.body(body).map_err(new_request_build_error)
}

/// Ask the namenode where an APPEND for `path` should be sent.
///
/// Issues `op=APPEND&noredirect=true`, so rather than replying with an
/// HTTP redirect the namenode returns a JSON body carrying the target
/// URL (presumably a datanode address — see the WebHDFS APPEND docs).
///
/// Returns the `Location` value parsed from that JSON response; any
/// non-200 status is converted into an error via `parse_error`.
pub async fn webhdfs_init_append_request(&self, path: &str) -> Result<String> {
    let absolute = build_abs_path(&self.root, path);
    let mut url = format!(
        "{}/webhdfs/v1/{}?op=APPEND&noredirect=true",
        self.endpoint,
        percent_encode_path(&absolute),
    );
    // Append the configured auth query fragment, if any.
    if let Some(auth) = &self.auth {
        url.push('&');
        url.push_str(auth);
    }

    let request = Request::post(url)
        .body(AsyncBody::Empty)
        .map_err(new_request_build_error)?;

    let response = self.client.send(request).await?;

    // Anything other than 200 OK is a failure.
    if response.status() != StatusCode::OK {
        return Err(parse_error(response).await?);
    }

    let raw = response.into_body().bytes().await?;
    let parsed: InitAppendResponse =
        serde_json::from_slice(&raw).map_err(new_json_deserialize_error)?;
    Ok(parsed.location)
}

/// Build the POST request that ships `size` bytes of `body` to the
/// `location` previously obtained from `webhdfs_init_append_request`.
///
/// The auth query fragment (when configured) is appended with `&`,
/// which assumes `location` already carries a query string — the URL
/// returned by the APPEND init call does.
pub async fn webhdfs_append_request(
    &self,
    location: &str,
    size: u64,
    body: AsyncBody,
) -> Result<Request<AsyncBody>> {
    let url = match &self.auth {
        Some(auth) => format!("{location}&{auth}"),
        None => location.to_string(),
    };

    Request::post(&url)
        .header(CONTENT_LENGTH, size.to_string())
        .body(body)
        .map_err(new_request_build_error)
}

async fn webhdfs_open_request(
&self,
path: &str,
Expand Down Expand Up @@ -329,7 +381,10 @@ impl WebhdfsBackend {
self.client.send(req).await
}

async fn webhdfs_get_file_status(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
pub(super) async fn webhdfs_get_file_status(
&self,
path: &str,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
let mut url = format!(
"{}/webhdfs/v1/{}?op=GETFILESTATUS",
Expand Down Expand Up @@ -395,7 +450,7 @@ impl WebhdfsBackend {
#[async_trait]
impl Accessor for WebhdfsBackend {
type Reader = IncomingAsyncBody;
type Writer = oio::OneShotWriter<WebhdfsWriter>;
type Writer = WebhdfsWriters;
type Lister = oio::PageLister<WebhdfsLister>;
type BlockingReader = ();
type BlockingWriter = ();
Expand All @@ -413,6 +468,7 @@ impl Accessor for WebhdfsBackend {
read_with_range: true,

write: true,
write_can_append: true,
create_dir: true,
delete: true,

Expand Down Expand Up @@ -524,10 +580,15 @@ impl Accessor for WebhdfsBackend {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
Ok((
RpWrite::default(),
oio::OneShotWriter::new(WebhdfsWriter::new(self.clone(), args, path.to_string())),
))
let w = WebhdfsWriter::new(self.clone(), args.clone(), path.to_string());

let w = if args.append() {
WebhdfsWriters::Two(oio::AppendObjectWriter::new(w))
} else {
WebhdfsWriters::One(oio::OneShotWriter::new(w))
};

Ok((RpWrite::default(), w))
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
Expand Down Expand Up @@ -555,3 +616,9 @@ impl Accessor for WebhdfsBackend {
Ok((RpList::default(), oio::PageLister::new(l)))
}
}

/// JSON payload returned by `op=APPEND&noredirect=true`.
///
/// The server answers with a body shaped like `{"Location": "..."}`;
/// `rename_all = "PascalCase"` maps that `Location` key onto the
/// `location` field.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(super) struct InitAppendResponse {
    pub location: String,
}
62 changes: 62 additions & 0 deletions core/src/services/webhdfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;

/// Writer dispatch for the webhdfs service: a one-shot overwrite writer
/// or an append-object writer, selected per-request in `Accessor::write`
/// based on `OpWrite::append()`.
pub type WebhdfsWriters =
    TwoWays<oio::OneShotWriter<WebhdfsWriter>, oio::AppendObjectWriter<WebhdfsWriter>>;

pub struct WebhdfsWriter {
backend: WebhdfsBackend,

Expand Down Expand Up @@ -62,3 +65,62 @@ impl oio::OneShotWrite for WebhdfsWriter {
}
}
}

#[async_trait]
impl oio::AppendObjectWrite for WebhdfsWriter {
    /// Always reports 0. `append` likewise ignores its `_offset`
    /// argument — presumably WebHDFS APPEND always writes at the
    /// server-tracked end of file, so no client offset is needed
    /// (NOTE(review): confirm against the WebHDFS spec).
    async fn offset(&self) -> Result<u64> {
        Ok(0)
    }

    async fn append(&self, _offset: u64, size: u64, body: AsyncBody) -> Result<()> {
        // Probe whether the target file exists before appending.
        let status_resp = self.backend.webhdfs_get_file_status(&self.path).await?;

        // Resolve the datanode URL to append to, creating an empty
        // file first when the path does not exist yet.
        let location = match status_resp.status() {
            StatusCode::OK => self.backend.webhdfs_init_append_request(&self.path).await?,
            StatusCode::NOT_FOUND => {
                let create_req = self.backend.webhdfs_create_object_request(
                    &self.path,
                    None,
                    &self.op,
                    AsyncBody::Empty,
                )?;

                let create_resp = self.backend.client.send(create_req).await?;

                match create_resp.status() {
                    StatusCode::CREATED | StatusCode::OK => {
                        create_resp.into_body().consume().await?;

                        self.backend.webhdfs_init_append_request(&self.path).await?
                    }
                    _ => return Err(parse_error(create_resp).await?),
                }
            }
            _ => return Err(parse_error(status_resp).await?),
        };

        // Ship the bytes to the resolved location.
        let append_req = self
            .backend
            .webhdfs_append_request(&location, size, body)
            .await?;

        let append_resp = self.backend.client.send(append_req).await?;

        match append_resp.status() {
            StatusCode::OK => {
                append_resp.into_body().consume().await?;
                Ok(())
            }
            _ => Err(parse_error(append_resp).await?),
        }
    }
}
1 change: 1 addition & 0 deletions fixtures/webhdfs/docker-compose-webhdfs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ services:
- CLUSTER_NAME=test
- WEBHDFS_CONF_dfs_webhdfs_enabled=true
- CORE_CONF_hadoop_http_staticuser_user=root
- HDFS_CONF_dfs_replication=1