From 2cf1fb53b98af089cfae4e0762c433ec897f00fe Mon Sep 17 00:00:00 2001 From: shuai_yang Date: Sun, 12 Nov 2023 14:44:53 +0800 Subject: [PATCH] feat(service): alluxio support write --- core/src/services/alluxio/backend.rs | 5 +- core/src/services/alluxio/core.rs | 4 +- core/src/services/alluxio/writer.rs | 124 ++++++++++++++++++++++++--- 3 files changed, 116 insertions(+), 17 deletions(-) diff --git a/core/src/services/alluxio/backend.rs b/core/src/services/alluxio/backend.rs index 58d33c07a3a3..e047bf89ce06 100644 --- a/core/src/services/alluxio/backend.rs +++ b/core/src/services/alluxio/backend.rs @@ -24,7 +24,6 @@ use async_trait::async_trait; use log::debug; use serde::Deserialize; -use crate::raw::oio::OneShotWriter; use crate::raw::*; use crate::*; @@ -201,6 +200,7 @@ impl Accessor for AlluxioBackend { write: true, /// https://github.com/Alluxio/alluxio/issues/8212 write_can_append: false, + write_can_multi: true, create_dir: true, delete: true, @@ -229,8 +229,7 @@ impl Accessor for AlluxioBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let w = AlluxioWriter::new(self.core.clone(), args, path.to_string()); - let w = OneShotWriter::new(w); + let w = AlluxioWriter::new(self.core.clone(), args.clone(), path.to_string()); Ok((RpWrite::default(), w)) } diff --git a/core/src/services/alluxio/core.rs b/core/src/services/alluxio/core.rs index 86bf36923cf1..a68e0bcdc1ce 100644 --- a/core/src/services/alluxio/core.rs +++ b/core/src/services/alluxio/core.rs @@ -343,7 +343,9 @@ impl AlluxioCore { match status { StatusCode::OK => { let body = resp.into_body().bytes().await?; - Ok(body.len()) + let size: usize = + serde_json::from_slice(&body).map_err(new_json_serialize_error)?; + Ok(size) } _ => Err(parse_error(resp).await?), } diff --git a/core/src/services/alluxio/writer.rs b/core/src/services/alluxio/writer.rs index 4eb52ae9059f..404304d84466 100644 --- a/core/src/services/alluxio/writer.rs +++ b/core/src/services/alluxio/writer.rs @@ -16,42 +16,140 @@ // under the License. use std::sync::Arc; +use std::task::ready; +use std::task::Context; +use std::task::Poll; use async_trait::async_trait; +use futures::future::BoxFuture; +use crate::raw::oio::WriteBuf; use crate::raw::*; -use crate::Result; + +use crate::*; use super::core::AlluxioCore; -pub type AlluxioWriters = oio::OneShotWriter; +pub type AlluxioWriters = AlluxioWriter; pub struct AlluxioWriter { - core: Arc, + state: State, _op: OpWrite, path: String, + stream_id: Option, +} + +enum State { + Idle(Option>), + Init(BoxFuture<'static, (Arc, Result)>), + Write(BoxFuture<'static, (Arc, Result)>), + Close(BoxFuture<'static, (Arc, Result<()>)>), } impl AlluxioWriter { pub fn new(core: Arc, _op: OpWrite, path: String) -> Self { - AlluxioWriter { core, _op, path } + AlluxioWriter { + state: State::Idle(Some(core)), + _op, + path, + stream_id: None, + } } } +/// # Safety +/// +/// We will only take `&mut Self` reference for State. +unsafe impl Sync for State {} + #[async_trait] -impl oio::OneShotWrite for AlluxioWriter { - async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> { - let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); +impl oio::Write for AlluxioWriter { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) -> Poll> { + loop { + match &mut self.state { + State::Idle(w) => match self.stream_id.as_ref() { + Some(stream_id) => { + let size = bs.remaining(); + let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)).clone(); + + let stream_id = *stream_id; - let stream_id = self.core.create_file(&self.path).await?; + let w = w.take().expect("writer must be valid"); - self.core - .write(stream_id, AsyncBody::ChunkedBytes(bs)) - .await?; + self.state = State::Write(Box::pin(async move { + let part = w.write(stream_id, AsyncBody::ChunkedBytes(cb)).await; - self.core.close(stream_id).await?; + (w, part) + })); + } + None => { + let path = self.path.clone(); + let w = w.take().expect("writer must be valid"); + self.state = State::Init(Box::pin(async move { + let upload_id = w.create_file(&path).await; + (w, upload_id) + })); + } + }, + State::Init(fut) => { + let (w, stream_id) = ready!(fut.as_mut().poll(cx)); + self.state = State::Idle(Some(w)); + self.stream_id = Some(stream_id?); + } + State::Write(fut) => { + let (w, part) = ready!(fut.as_mut().poll(cx)); + self.state = State::Idle(Some(w)); + return Poll::Ready(Ok(part?)); + } + State::Close(_) => { + unreachable!( + "MultipartUploadWriter must not go into State::Close during poll_write" + ) + } + } + } + } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + match &mut self.state { + State::Idle(w) => { + let w = w.take().expect("writer must be valid"); + match self.stream_id { + Some(stream_id) => { + self.state = State::Close(Box::pin(async move { + let res = w.close(stream_id).await; + (w, res) + })); + } + None => { + return Poll::Ready(Ok(())); + } + } + } + State::Close(fut) => { + let (w, res) = futures::ready!(fut.as_mut().poll(cx)); + self.state = State::Idle(Some(w)); + + res?; + + return Poll::Ready(Ok(())); + } + State::Init(_) => { + unreachable!("AlluxioWriter must not go into State::Init during poll_close") + } + State::Write(_) => unreachable! { + "AlluxioWriter must not go into State::Write during poll_close" + }, + } + } + } - Ok(()) + fn poll_abort(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Err(Error::new( + ErrorKind::Unsupported, + "AlluxioWriter doesn't support abort", + ))) } }