From 78e4125849dbbba8173aebe14e6e64773868d36d Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 24 Jul 2023 11:44:48 +0800 Subject: [PATCH 01/10] Cleanup use Signed-off-by: Xuanwo --- core/src/raw/mod.rs | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs index e7c2bbcfc3da..34638b9ff9aa 100644 --- a/core/src/raw/mod.rs +++ b/core/src/raw/mod.rs @@ -27,28 +27,16 @@ //! them whenever possible. mod accessor; -pub use accessor::Accessor; -pub use accessor::AccessorInfo; -pub use accessor::FusedAccessor; +pub use accessor::*; mod layer; -pub use layer::Layer; -pub use layer::LayeredAccessor; - -pub mod oio; +pub use layer::*; mod path; -pub use path::build_abs_path; -pub use path::build_rel_path; -pub use path::build_rooted_abs_path; -pub use path::get_basename; -pub use path::get_parent; -pub use path::normalize_path; -pub use path::normalize_root; -pub use path::validate_path; +pub use path::*; mod operation; -pub use operation::Operation; +pub use operation::*; mod version; pub use version::VERSION; @@ -70,3 +58,4 @@ pub use chrono_util::*; // Expose as a pub mod to avoid confusing. 
pub mod adapters; +pub mod oio; From f59978b8c6974405dae2d1226deacd40fa3f8ea7 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 24 Jul 2023 11:49:55 +0800 Subject: [PATCH 02/10] Move append Signed-off-by: Xuanwo --- core/src/raw/oio/{append.rs => append/api.rs} | 0 core/src/raw/oio/append/mod.rs | 19 +++++++++++++++++++ 2 files changed, 19 insertions(+) rename core/src/raw/oio/{append.rs => append/api.rs} (100%) create mode 100644 core/src/raw/oio/append/mod.rs diff --git a/core/src/raw/oio/append.rs b/core/src/raw/oio/append/api.rs similarity index 100% rename from core/src/raw/oio/append.rs rename to core/src/raw/oio/append/api.rs diff --git a/core/src/raw/oio/append/mod.rs b/core/src/raw/oio/append/mod.rs new file mode 100644 index 000000000000..defb32e6b568 --- /dev/null +++ b/core/src/raw/oio/append/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +mod api; +pub use api::*; From 0c550d69ea1c437a30eff9d60df86514115915d7 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 24 Jul 2023 12:11:33 +0800 Subject: [PATCH 03/10] Move reader Signed-off-by: Xuanwo --- core/src/layers/complete.rs | 12 +++---- core/src/raw/oio/into_reader/mod.rs | 4 --- core/src/raw/oio/mod.rs | 8 ++--- core/src/raw/oio/{read.rs => read/api.rs} | 36 +++++++++++-------- .../into_read_by_range.rs} | 18 +++++----- .../src/raw/oio/{ => read}/into_streamable.rs | 12 +++---- core/src/raw/oio/read/mod.rs | 27 ++++++++++++++ 7 files changed, 73 insertions(+), 44 deletions(-) rename core/src/raw/oio/{read.rs => read/api.rs} (95%) rename core/src/raw/oio/{into_reader/by_range.rs => read/into_read_by_range.rs} (97%) rename core/src/raw/oio/{ => read}/into_streamable.rs (91%) create mode 100644 core/src/raw/oio/read/mod.rs diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 0c27b146fef8..a820b5ab21ca 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -25,11 +25,11 @@ use std::task::Poll; use async_trait::async_trait; use bytes::Bytes; -use crate::raw::oio::into_reader::RangeReader; use crate::raw::oio::to_flat_pager; use crate::raw::oio::to_hierarchy_pager; +use crate::raw::oio::ByRangeReader; use crate::raw::oio::Entry; -use crate::raw::oio::IntoStreamableReader; +use crate::raw::oio::StreamableReader; use crate::raw::oio::ToFlatPager; use crate::raw::oio::ToHierarchyPager; use crate::raw::*; @@ -193,7 +193,7 @@ impl CompleteReaderAccessor { (offset, size) } }; - let r = oio::into_reader::by_range(self.inner.clone(), path, r, offset, size); + let r = oio::into_read_by_range(self.inner.clone(), path, r, offset, size); if streamable { Ok((rp, CompleteReader::NeedSeekable(r))) @@ -532,9 +532,9 @@ impl LayeredAccessor for CompleteReaderAccessor { pub enum CompleteReader { AlreadyComplete(R), - NeedSeekable(RangeReader), - NeedStreamable(IntoStreamableReader), - NeedBoth(IntoStreamableReader>), + 
NeedSeekable(ByRangeReader), + NeedStreamable(StreamableReader), + NeedBoth(StreamableReader>), } impl oio::Read for CompleteReader diff --git a/core/src/raw/oio/into_reader/mod.rs b/core/src/raw/oio/into_reader/mod.rs index 609032327d6a..b6d424ae95cd 100644 --- a/core/src/raw/oio/into_reader/mod.rs +++ b/core/src/raw/oio/into_reader/mod.rs @@ -37,10 +37,6 @@ //! user call `poll_read` first, we can get the total_size from returning //! reader. In this way, we can save 40ms in average for every s3 read call. -mod by_range; -pub use by_range::by_range; -pub use by_range::RangeReader; - mod from_fd; pub use from_fd::from_fd; pub use from_fd::FdReader; diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index fb1d31efda82..db01900098ac 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -23,12 +23,16 @@ //! our `output` traits. mod read; +pub use read::into_read_by_range; +pub use read::into_streamable_reader; pub use read::BlockingRead; pub use read::BlockingReader; +pub use read::ByRangeReader; pub use read::Read; pub use read::ReadExt; pub use read::ReadOperation; pub use read::Reader; +pub use read::StreamableReader; pub mod into_blocking_reader; pub mod into_reader; @@ -55,10 +59,6 @@ mod cursor; pub use cursor::Cursor; pub use cursor::VectorCursor; -mod into_streamable; -pub use into_streamable::into_streamable_reader; -pub use into_streamable::IntoStreamableReader; - mod entry; pub use entry::Entry; diff --git a/core/src/raw/oio/read.rs b/core/src/raw/oio/read/api.rs similarity index 95% rename from core/src/raw/oio/read.rs rename to core/src/raw/oio/read/api.rs index e3bce7a06585..9bde2b8018ac 100644 --- a/core/src/raw/oio/read.rs +++ b/core/src/raw/oio/read/api.rs @@ -149,17 +149,6 @@ impl Read for Box { } } -fn convert_to_io_error(err: Error) -> io::Error { - let kind = match err.kind() { - ErrorKind::NotFound => io::ErrorKind::NotFound, - ErrorKind::PermissionDenied => io::ErrorKind::PermissionDenied, - ErrorKind::InvalidInput => 
io::ErrorKind::InvalidInput, - _ => io::ErrorKind::Interrupted, - }; - - io::Error::new(kind, err) -} - impl futures::AsyncRead for dyn Read { fn poll_read( mut self: Pin<&mut Self>, @@ -167,7 +156,7 @@ impl futures::AsyncRead for dyn Read { buf: &mut [u8], ) -> Poll> { let this: &mut dyn Read = &mut *self; - this.poll_read(cx, buf).map_err(convert_to_io_error) + this.poll_read(cx, buf).map_err(format_io_error) } } @@ -178,7 +167,7 @@ impl futures::AsyncSeek for dyn Read { pos: io::SeekFrom, ) -> Poll> { let this: &mut dyn Read = &mut *self; - this.poll_seek(cx, pos).map_err(convert_to_io_error) + this.poll_seek(cx, pos).map_err(format_io_error) } } @@ -359,7 +348,7 @@ impl io::Read for dyn BlockingRead { #[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result { let this: &mut dyn BlockingRead = &mut *self; - this.read(buf).map_err(convert_to_io_error) + this.read(buf).map_err(format_io_error) } } @@ -367,7 +356,7 @@ impl io::Seek for dyn BlockingRead { #[inline] fn seek(&mut self, pos: io::SeekFrom) -> io::Result { let this: &mut dyn BlockingRead = &mut *self; - this.seek(pos).map_err(convert_to_io_error) + this.seek(pos).map_err(format_io_error) } } @@ -380,3 +369,20 @@ impl Iterator for dyn BlockingRead { this.next() } } + +/// helper functions to format `Error` into `io::Error`. +/// +/// This function is added privately by design and only valid in current +/// context (i.e. `oio` crate). We don't want to expose this function to +/// users. 
+#[inline] +fn format_io_error(err: Error) -> io::Error { + let kind = match err.kind() { + ErrorKind::NotFound => io::ErrorKind::NotFound, + ErrorKind::PermissionDenied => io::ErrorKind::PermissionDenied, + ErrorKind::InvalidInput => io::ErrorKind::InvalidInput, + _ => io::ErrorKind::Interrupted, + }; + + io::Error::new(kind, err) +} diff --git a/core/src/raw/oio/into_reader/by_range.rs b/core/src/raw/oio/read/into_read_by_range.rs similarity index 97% rename from core/src/raw/oio/into_reader/by_range.rs rename to core/src/raw/oio/read/into_read_by_range.rs index cb761523bda8..ada195eb93f5 100644 --- a/core/src/raw/oio/into_reader/by_range.rs +++ b/core/src/raw/oio/read/into_read_by_range.rs @@ -36,14 +36,14 @@ use crate::*; /// /// This operation is not zero cost. If the accessor already returns a /// seekable reader, please don't use this. -pub fn by_range( +pub fn into_read_by_range( acc: Arc, path: &str, reader: A::Reader, offset: u64, size: u64, -) -> RangeReader { - RangeReader { +) -> ByRangeReader { + ByRangeReader { acc, path: path.to_string(), offset, @@ -55,8 +55,8 @@ pub fn by_range( } } -/// RangeReader that can do seek on non-seekable reader. -pub struct RangeReader { +/// ByRangeReader that can do seek on non-seekable reader. +pub struct ByRangeReader { acc: Arc, path: String, @@ -84,7 +84,7 @@ enum State { /// Safety: State will only be accessed under &mut. 
unsafe impl Sync for State {} -impl RangeReader { +impl ByRangeReader { fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, A::Reader)>> { let acc = self.acc.clone(); let path = self.path.clone(); @@ -123,7 +123,7 @@ impl RangeReader { } } -impl oio::Read for RangeReader { +impl oio::Read for ByRangeReader { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { match &mut self.state { State::Idle => { @@ -387,7 +387,7 @@ mod tests { let r = MockReader { inner: futures::io::Cursor::new(bs.to_vec()), }; - let mut r = Box::new(by_range(acc, "x", r, 0, bs.len() as u64)) as oio::Reader; + let mut r = Box::new(into_read_by_range(acc, "x", r, 0, bs.len() as u64)) as oio::Reader; let mut buf = Vec::new(); r.read_to_end(&mut buf).await?; @@ -421,7 +421,7 @@ mod tests { let r = MockReader { inner: futures::io::Cursor::new(bs[4096..4096 + 4096].to_vec()), }; - let mut r = Box::new(by_range(acc, "x", r, 4096, 4096)) as oio::Reader; + let mut r = Box::new(into_read_by_range(acc, "x", r, 4096, 4096)) as oio::Reader; let mut buf = Vec::new(); r.read_to_end(&mut buf).await?; diff --git a/core/src/raw/oio/into_streamable.rs b/core/src/raw/oio/read/into_streamable.rs similarity index 91% rename from core/src/raw/oio/into_streamable.rs rename to core/src/raw/oio/read/into_streamable.rs index 50af7d975e59..0f659ff08c31 100644 --- a/core/src/raw/oio/into_streamable.rs +++ b/core/src/raw/oio/read/into_streamable.rs @@ -26,9 +26,9 @@ use tokio::io::ReadBuf; use crate::raw::*; use crate::*; -/// as_streamable is used to make [`oio::Read`] or [`oio::BlockingRead`] streamable. -pub fn into_streamable_reader(r: R, capacity: usize) -> IntoStreamableReader { - IntoStreamableReader { +/// into_streamable is used to make [`oio::Read`] or [`oio::BlockingRead`] streamable. 
+pub fn into_streamable_reader(r: R, capacity: usize) -> StreamableReader { + StreamableReader { r, cap: capacity, buf: Vec::with_capacity(capacity), @@ -36,13 +36,13 @@ pub fn into_streamable_reader(r: R, capacity: usize) -> IntoStreamableReader< } /// Make given read streamable. -pub struct IntoStreamableReader { +pub struct StreamableReader { r: R, cap: usize, buf: Vec, } -impl oio::Read for IntoStreamableReader { +impl oio::Read for StreamableReader { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { self.r.poll_read(cx, buf) } @@ -67,7 +67,7 @@ impl oio::Read for IntoStreamableReader { } } -impl oio::BlockingRead for IntoStreamableReader { +impl oio::BlockingRead for StreamableReader { fn read(&mut self, buf: &mut [u8]) -> Result { self.r.read(buf) } diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs new file mode 100644 index 000000000000..b7ad51c1398b --- /dev/null +++ b/core/src/raw/oio/read/mod.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +mod api; +pub use api::*; + +mod into_streamable; +pub use into_streamable::into_streamable_reader; +pub use into_streamable::StreamableReader; + +mod into_read_by_range; +pub use into_read_by_range::into_read_by_range; +pub use into_read_by_range::ByRangeReader; From 172fc799d58fbfa1f92d82686b56f9b159f9ca29 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 24 Jul 2023 12:44:33 +0800 Subject: [PATCH 04/10] More more convert API Signed-off-by: Xuanwo --- core/src/raw/oio/into_reader/mod.rs | 42 ------------------- core/src/raw/oio/mod.rs | 3 +- .../into_read_from_file.rs} | 10 ++--- ..._streamable.rs => into_streamable_read.rs} | 0 core/src/raw/oio/read/mod.rs | 10 +++-- core/src/services/fs/backend.rs | 4 +- core/src/services/hdfs/backend.rs | 4 +- core/src/services/sftp/utils.rs | 6 +-- 8 files changed, 21 insertions(+), 58 deletions(-) delete mode 100644 core/src/raw/oio/into_reader/mod.rs rename core/src/raw/oio/{into_reader/from_fd.rs => read/into_read_from_file.rs} (94%) rename core/src/raw/oio/read/{into_streamable.rs => into_streamable_read.rs} (100%) diff --git a/core/src/raw/oio/into_reader/mod.rs b/core/src/raw/oio/into_reader/mod.rs deleted file mode 100644 index b6d424ae95cd..000000000000 --- a/core/src/raw/oio/into_reader/mod.rs +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -//! into_reader will provide different implementations to convert into -//! [`oio::Read`][crate::raw::oio::Read] -//! -//! - (Some(offset), Some(size)) => by_range -//! - (Some(offset), None) => by_offset -//! - (None, Some(size)) => by_tail -//! - (None, None) => by_offset -//! -//! The main different is whether and when to call `stat` to get total -//! content length. -//! -//! # TODO -//! -//! We only implement by_range so far. -//! -//! We should implement other types so that they can be zero cost on non-seek -//! cases. -//! -//! For example, for `by_full`, we don't need to do `stat` everytime. If -//! user call `poll_read` first, we can get the total_size from returning -//! reader. In this way, we can save 40ms in average for every s3 read call. - -mod from_fd; -pub use from_fd::from_fd; -pub use from_fd::FdReader; diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index db01900098ac..65695adf5340 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -24,10 +24,12 @@ mod read; pub use read::into_read_by_range; +pub use read::into_read_from_file; pub use read::into_streamable_reader; pub use read::BlockingRead; pub use read::BlockingReader; pub use read::ByRangeReader; +pub use read::FromFileReader; pub use read::Read; pub use read::ReadExt; pub use read::ReadOperation; @@ -35,7 +37,6 @@ pub use read::Reader; pub use read::StreamableReader; pub mod into_blocking_reader; -pub mod into_reader; mod write; pub use write::BlockingWrite; diff --git a/core/src/raw/oio/into_reader/from_fd.rs b/core/src/raw/oio/read/into_read_from_file.rs similarity index 94% rename from core/src/raw/oio/into_reader/from_fd.rs rename to core/src/raw/oio/read/into_read_from_file.rs index 1c3e02d5519c..64b5f725139c 100644 --- a/core/src/raw/oio/into_reader/from_fd.rs +++ b/core/src/raw/oio/read/into_read_from_file.rs @@ -29,11 +29,11 @@ use 
crate::raw::*; use crate::*; /// Convert given fd into [`oio::Reader`]. -pub fn from_fd(fd: R, start: u64, end: u64) -> FdReader +pub fn into_read_from_file(fd: R, start: u64, end: u64) -> FromFileReader where R: AsyncRead + AsyncSeek + Unpin + Send + Sync, { - FdReader { + FromFileReader { inner: fd, start, end, @@ -42,7 +42,7 @@ where } /// FdReader is a wrapper of input fd to implement [`oio::Read`]. -pub struct FdReader { +pub struct FromFileReader { inner: R, start: u64, @@ -50,7 +50,7 @@ pub struct FdReader { offset: u64, } -impl FdReader +impl FromFileReader where R: AsyncRead + AsyncSeek + Unpin + Send + Sync, { @@ -60,7 +60,7 @@ where } } -impl oio::Read for FdReader +impl oio::Read for FromFileReader where R: AsyncRead + AsyncSeek + Unpin + Send + Sync, { diff --git a/core/src/raw/oio/read/into_streamable.rs b/core/src/raw/oio/read/into_streamable_read.rs similarity index 100% rename from core/src/raw/oio/read/into_streamable.rs rename to core/src/raw/oio/read/into_streamable_read.rs diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs index b7ad51c1398b..0ceaa5baa8cb 100644 --- a/core/src/raw/oio/read/mod.rs +++ b/core/src/raw/oio/read/mod.rs @@ -18,10 +18,14 @@ mod api; pub use api::*; -mod into_streamable; -pub use into_streamable::into_streamable_reader; -pub use into_streamable::StreamableReader; +mod into_streamable_read; +pub use into_streamable_read::into_streamable_reader; +pub use into_streamable_read::StreamableReader; mod into_read_by_range; pub use into_read_by_range::into_read_by_range; pub use into_read_by_range::ByRangeReader; + +mod into_read_from_file; +pub use into_read_from_file::into_read_from_file; +pub use into_read_from_file::FromFileReader; diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 505fda60d28a..42d1aa9eaf15 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -244,7 +244,7 @@ impl FsBackend { #[async_trait] impl Accessor for FsBackend 
{ - type Reader = oio::into_reader::FdReader>; + type Reader = oio::FromFileReader>; type BlockingReader = oio::into_blocking_reader::FdReader; type Writer = FsWriter; type BlockingWriter = FsWriter; @@ -358,7 +358,7 @@ impl Accessor for FsBackend { (None, None) => (0, total_length), }; - let mut r = oio::into_reader::from_fd(f, start, end); + let mut r = oio::into_read_from_file(f, start, end); // Rewind to make sure we are on the correct offset. r.seek(SeekFrom::Start(0)).await?; diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index 27e9138cccc7..629a74a595ec 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -161,7 +161,7 @@ unsafe impl Sync for HdfsBackend {} #[async_trait] impl Accessor for HdfsBackend { - type Reader = oio::into_reader::FdReader; + type Reader = oio::FromFileReader; type BlockingReader = oio::into_blocking_reader::FdReader; type Writer = HdfsWriter; type BlockingWriter = HdfsWriter; @@ -231,7 +231,7 @@ impl Accessor for HdfsBackend { (None, None) => (0, meta.len()), }; - let mut r = oio::into_reader::from_fd(f, start, end); + let mut r = oio::into_read_from_file(f, start, end); // Rewind to make sure we are on the correct offset. 
r.seek(SeekFrom::Start(0)).await?; diff --git a/core/src/services/sftp/utils.rs b/core/src/services/sftp/utils.rs index 303aee70946e..fc8bb8a6f64c 100644 --- a/core/src/services/sftp/utils.rs +++ b/core/src/services/sftp/utils.rs @@ -29,7 +29,7 @@ use openssh_sftp_client::file::TokioCompatFile; use openssh_sftp_client::metadata::MetaData as SftpMeta; use crate::raw::oio; -use crate::raw::oio::into_reader::FdReader; +use crate::raw::oio::FromFileReader; use crate::raw::oio::ReadExt; use crate::EntryMode; use crate::Metadata; @@ -38,7 +38,7 @@ use crate::Result; pub struct SftpReaderInner { file: Pin>>, } -pub type SftpReader = FdReader; +pub type SftpReader = FromFileReader; impl SftpReaderInner { pub async fn new(file: File) -> Self { @@ -53,7 +53,7 @@ impl SftpReader { /// Create a new reader from a file, starting at the given offset and ending at the given offset. pub async fn new(file: File, start: u64, end: u64) -> Result { let file = SftpReaderInner::new(file).await; - let mut r = oio::into_reader::from_fd(file, start, end); + let mut r = oio::into_read_from_file(file, start, end); r.seek(SeekFrom::Start(0)).await?; Ok(r) } From f239b1609d0371dcb62ca9e526540a7e14f8e2df Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 24 Jul 2023 12:53:05 +0800 Subject: [PATCH 05/10] Renaming Signed-off-by: Xuanwo --- core/src/layers/complete.rs | 14 +++++------ core/src/raw/oio/mod.rs | 6 ++--- ...ange.rs => into_seekable_read_by_range.rs} | 25 +++++++++++++------ core/src/raw/oio/read/into_streamable_read.rs | 6 ++--- core/src/raw/oio/read/mod.rs | 8 +++--- core/src/types/reader.rs | 2 +- 6 files changed, 35 insertions(+), 26 deletions(-) rename core/src/raw/oio/read/{into_read_by_range.rs => into_seekable_read_by_range.rs} (95%) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index a820b5ab21ca..fdd8f272757d 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -27,7 +27,7 @@ use bytes::Bytes; use crate::raw::oio::to_flat_pager; 
use crate::raw::oio::to_hierarchy_pager; -use crate::raw::oio::ByRangeReader; +use crate::raw::oio::ByRangeSeekableReader; use crate::raw::oio::Entry; use crate::raw::oio::StreamableReader; use crate::raw::oio::ToFlatPager; @@ -172,7 +172,7 @@ impl CompleteReaderAccessor { match (seekable, streamable) { (true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))), (true, false) => { - let r = oio::into_streamable_reader(r, 256 * 1024); + let r = oio::into_streamable_read(r, 256 * 1024); Ok((rp, CompleteReader::NeedStreamable(r))) } _ => { @@ -193,12 +193,12 @@ impl CompleteReaderAccessor { (offset, size) } }; - let r = oio::into_read_by_range(self.inner.clone(), path, r, offset, size); + let r = oio::into_seekable_read_by_range(self.inner.clone(), path, r, offset, size); if streamable { Ok((rp, CompleteReader::NeedSeekable(r))) } else { - let r = oio::into_streamable_reader(r, 256 * 1024); + let r = oio::into_streamable_read(r, 256 * 1024); Ok((rp, CompleteReader::NeedBoth(r))) } } @@ -223,7 +223,7 @@ impl CompleteReaderAccessor { match (seekable, streamable) { (true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))), (true, false) => { - let r = oio::into_streamable_reader(r, 256 * 1024); + let r = oio::into_streamable_read(r, 256 * 1024); Ok((rp, CompleteReader::NeedStreamable(r))) } (false, _) => Err(Error::new( @@ -532,9 +532,9 @@ impl LayeredAccessor for CompleteReaderAccessor { pub enum CompleteReader { AlreadyComplete(R), - NeedSeekable(ByRangeReader), + NeedSeekable(ByRangeSeekableReader), NeedStreamable(StreamableReader), - NeedBoth(StreamableReader>), + NeedBoth(StreamableReader>), } impl oio::Read for CompleteReader diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index 65695adf5340..cf7940ee149c 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -23,12 +23,12 @@ //! our `output` traits. 
mod read; -pub use read::into_read_by_range; pub use read::into_read_from_file; -pub use read::into_streamable_reader; +pub use read::into_seekable_read_by_range; +pub use read::into_streamable_read; pub use read::BlockingRead; pub use read::BlockingReader; -pub use read::ByRangeReader; +pub use read::ByRangeSeekableReader; pub use read::FromFileReader; pub use read::Read; pub use read::ReadExt; diff --git a/core/src/raw/oio/read/into_read_by_range.rs b/core/src/raw/oio/read/into_seekable_read_by_range.rs similarity index 95% rename from core/src/raw/oio/read/into_read_by_range.rs rename to core/src/raw/oio/read/into_seekable_read_by_range.rs index ada195eb93f5..3e76f9b58d3d 100644 --- a/core/src/raw/oio/read/into_read_by_range.rs +++ b/core/src/raw/oio/read/into_seekable_read_by_range.rs @@ -32,18 +32,26 @@ use crate::*; /// Convert given reader into [`oio::Reader`] by range. /// +/// # Input +/// +/// The input is an Accessor will may return a non-seekable reader. +/// +/// # Output +/// +/// The output is a reader that can be seeked by range. +/// /// # Notes /// /// This operation is not zero cost. If the accessor already returns a /// seekable reader, please don't use this. -pub fn into_read_by_range( +pub fn into_seekable_read_by_range( acc: Arc, path: &str, reader: A::Reader, offset: u64, size: u64, -) -> ByRangeReader { - ByRangeReader { +) -> ByRangeSeekableReader { + ByRangeSeekableReader { acc, path: path.to_string(), offset, @@ -56,7 +64,7 @@ pub fn into_read_by_range( } /// ByRangeReader that can do seek on non-seekable reader. -pub struct ByRangeReader { +pub struct ByRangeSeekableReader { acc: Arc, path: String, @@ -84,7 +92,7 @@ enum State { /// Safety: State will only be accessed under &mut. 
unsafe impl Sync for State {} -impl ByRangeReader { +impl ByRangeSeekableReader { fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, A::Reader)>> { let acc = self.acc.clone(); let path = self.path.clone(); @@ -123,7 +131,7 @@ impl ByRangeReader { } } -impl oio::Read for ByRangeReader { +impl oio::Read for ByRangeSeekableReader { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { match &mut self.state { State::Idle => { @@ -387,7 +395,8 @@ mod tests { let r = MockReader { inner: futures::io::Cursor::new(bs.to_vec()), }; - let mut r = Box::new(into_read_by_range(acc, "x", r, 0, bs.len() as u64)) as oio::Reader; + let mut r = + Box::new(into_seekable_read_by_range(acc, "x", r, 0, bs.len() as u64)) as oio::Reader; let mut buf = Vec::new(); r.read_to_end(&mut buf).await?; @@ -421,7 +430,7 @@ mod tests { let r = MockReader { inner: futures::io::Cursor::new(bs[4096..4096 + 4096].to_vec()), }; - let mut r = Box::new(into_read_by_range(acc, "x", r, 4096, 4096)) as oio::Reader; + let mut r = Box::new(into_seekable_read_by_range(acc, "x", r, 4096, 4096)) as oio::Reader; let mut buf = Vec::new(); r.read_to_end(&mut buf).await?; diff --git a/core/src/raw/oio/read/into_streamable_read.rs b/core/src/raw/oio/read/into_streamable_read.rs index 0f659ff08c31..c3f90e24d585 100644 --- a/core/src/raw/oio/read/into_streamable_read.rs +++ b/core/src/raw/oio/read/into_streamable_read.rs @@ -27,7 +27,7 @@ use crate::raw::*; use crate::*; /// into_streamable is used to make [`oio::Read`] or [`oio::BlockingRead`] streamable. 
-pub fn into_streamable_reader(r: R, capacity: usize) -> StreamableReader { +pub fn into_streamable_read(r: R, capacity: usize) -> StreamableReader { StreamableReader { r, cap: capacity, @@ -113,7 +113,7 @@ mod tests { let cap = rng.gen_range(1..1024 * 1024); let r = oio::Cursor::from(content.clone()); - let mut s = into_streamable_reader(Box::new(r) as oio::Reader, cap); + let mut s = into_streamable_read(Box::new(r) as oio::Reader, cap); let mut bs = BytesMut::new(); while let Some(b) = s.next().await { @@ -136,7 +136,7 @@ mod tests { let cap = rng.gen_range(1..1024 * 1024); let r = oio::Cursor::from(content.clone()); - let mut s = into_streamable_reader(Box::new(r) as oio::BlockingReader, cap); + let mut s = into_streamable_read(Box::new(r) as oio::BlockingReader, cap); let mut bs = BytesMut::new(); while let Some(b) = s.next() { diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs index 0ceaa5baa8cb..06a7538ed6b1 100644 --- a/core/src/raw/oio/read/mod.rs +++ b/core/src/raw/oio/read/mod.rs @@ -19,12 +19,12 @@ mod api; pub use api::*; mod into_streamable_read; -pub use into_streamable_read::into_streamable_reader; +pub use into_streamable_read::into_streamable_read; pub use into_streamable_read::StreamableReader; -mod into_read_by_range; -pub use into_read_by_range::into_read_by_range; -pub use into_read_by_range::ByRangeReader; +mod into_seekable_read_by_range; +pub use into_seekable_read_by_range::into_seekable_read_by_range; +pub use into_seekable_read_by_range::ByRangeSeekableReader; mod into_read_from_file; pub use into_read_from_file::into_read_from_file; diff --git a/core/src/types/reader.rs b/core/src/types/reader.rs index 0f3bd2eaaf29..4b328418250b 100644 --- a/core/src/types/reader.rs +++ b/core/src/types/reader.rs @@ -208,7 +208,7 @@ impl BlockingReader { r } else { // Make this capacity configurable. 
- Box::new(oio::into_streamable_reader(r, 256 * 1024)) + Box::new(oio::into_streamable_read(r, 256 * 1024)) }; Ok(BlockingReader { inner: r }) From ae4dd1ad28c61e49a3879440b682ce7e8a5c4532 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 24 Jul 2023 13:36:38 +0800 Subject: [PATCH 06/10] polish Signed-off-by: Xuanwo --- .../raw/oio/into_blocking_reader/from_fd.rs | 119 ------------------ core/src/raw/oio/into_blocking_reader/mod.rs | 23 ---- core/src/raw/oio/mod.rs | 2 - core/src/raw/oio/read/into_read_from_file.rs | 77 ++++++++++-- core/src/services/fs/backend.rs | 4 +- core/src/services/hdfs/backend.rs | 4 +- 6 files changed, 71 insertions(+), 158 deletions(-) delete mode 100644 core/src/raw/oio/into_blocking_reader/from_fd.rs delete mode 100644 core/src/raw/oio/into_blocking_reader/mod.rs diff --git a/core/src/raw/oio/into_blocking_reader/from_fd.rs b/core/src/raw/oio/into_blocking_reader/from_fd.rs deleted file mode 100644 index ac01b33e7562..000000000000 --- a/core/src/raw/oio/into_blocking_reader/from_fd.rs +++ /dev/null @@ -1,119 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use std::cmp; -use std::io::Read; -use std::io::Seek; -use std::io::SeekFrom; - -use bytes::Bytes; - -use crate::raw::*; -use crate::*; - -/// Convert given fd into [`oio::BlockingRead`]. -pub fn from_fd(fd: R, start: u64, end: u64) -> FdReader -where - R: Read + Seek + Send + Sync, -{ - FdReader { - inner: fd, - start, - end, - offset: 0, - } -} - -/// FdReader is a wrapper of input fd to implement [`oio::Read`]. -pub struct FdReader { - inner: R, - - start: u64, - end: u64, - offset: u64, -} - -impl FdReader -where - R: Read + Seek + Send + Sync, -{ - #[inline] - pub(crate) fn current_size(&self) -> i64 { - debug_assert!(self.offset >= self.start, "offset must in range"); - self.end as i64 - self.offset as i64 - } -} - -impl oio::BlockingRead for FdReader -where - R: Read + Seek + Send + Sync + 'static, -{ - fn read(&mut self, buf: &mut [u8]) -> Result { - if self.current_size() <= 0 { - return Ok(0); - } - - let max = cmp::min(buf.len() as u64, self.current_size() as u64) as usize; - // TODO: we can use pread instead. - let n = self.inner.read(&mut buf[..max]).map_err(|err| { - Error::new(ErrorKind::Unexpected, "read data from FdReader") - .with_context("source", "FdReader") - .set_source(err) - })?; - self.offset += n as u64; - Ok(n) - } - - /// TODO: maybe we don't need to do seek really, just call pread instead. - /// - /// We need to wait for tokio's pread support. 
- fn seek(&mut self, pos: SeekFrom) -> Result { - let (base, offset) = match pos { - SeekFrom::Start(n) => (self.start as i64, n as i64), - SeekFrom::End(n) => (self.end as i64, n), - SeekFrom::Current(n) => (self.offset as i64, n), - }; - - match base.checked_add(offset) { - Some(n) if n < 0 => Err(Error::new( - ErrorKind::InvalidInput, - "invalid seek to a negative or overflowing position", - )), - Some(n) => { - let cur = self.inner.seek(SeekFrom::Start(n as u64)).map_err(|err| { - Error::new(ErrorKind::Unexpected, "seek data from FdReader") - .with_context("source", "FdReader") - .set_source(err) - })?; - - self.offset = cur; - Ok(self.offset - self.start) - } - None => Err(Error::new( - ErrorKind::InvalidInput, - "invalid seek to a negative or overflowing position", - )), - } - } - - fn next(&mut self) -> Option> { - Some(Err(Error::new( - ErrorKind::Unsupported, - "output reader doesn't support iterating", - ))) - } -} diff --git a/core/src/raw/oio/into_blocking_reader/mod.rs b/core/src/raw/oio/into_blocking_reader/mod.rs deleted file mode 100644 index 6b476a2133ec..000000000000 --- a/core/src/raw/oio/into_blocking_reader/mod.rs +++ /dev/null @@ -1,23 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -//! into_blocking_reader will provide different implementations to convert -//! into [`oio::BlockingRead`][crate::raw::oio::BlockingRead] - -mod from_fd; -pub use from_fd::from_fd; -pub use from_fd::FdReader; diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index cf7940ee149c..ef4f39c359c1 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -36,8 +36,6 @@ pub use read::ReadOperation; pub use read::Reader; pub use read::StreamableReader; -pub mod into_blocking_reader; - mod write; pub use write::BlockingWrite; pub use write::BlockingWriter; diff --git a/core/src/raw/oio/read/into_read_from_file.rs b/core/src/raw/oio/read/into_read_from_file.rs index 64b5f725139c..3f647fed01a1 100644 --- a/core/src/raw/oio/read/into_read_from_file.rs +++ b/core/src/raw/oio/read/into_read_from_file.rs @@ -16,12 +16,15 @@ // under the License. use std::cmp; +use std::io::Read; +use std::io::Seek; use std::io::SeekFrom; use std::pin::Pin; use std::task::ready; use std::task::Context; use std::task::Poll; +use bytes::Bytes; use futures::AsyncRead; use futures::AsyncSeek; @@ -29,10 +32,7 @@ use crate::raw::*; use crate::*; /// Convert given fd into [`oio::Reader`]. -pub fn into_read_from_file(fd: R, start: u64, end: u64) -> FromFileReader -where - R: AsyncRead + AsyncSeek + Unpin + Send + Sync, -{ +pub fn into_read_from_file(fd: R, start: u64, end: u64) -> FromFileReader { FromFileReader { inner: fd, start, @@ -42,7 +42,7 @@ where } /// FdReader is a wrapper of input fd to implement [`oio::Read`]. 
-pub struct FromFileReader { +pub struct FromFileReader { inner: R, start: u64, @@ -50,10 +50,7 @@ pub struct FromFileReader { offset: u64, } -impl FromFileReader -where - R: AsyncRead + AsyncSeek + Unpin + Send + Sync, -{ +impl FromFileReader { pub(crate) fn current_size(&self) -> i64 { debug_assert!(self.offset >= self.start, "offset must in range"); self.end as i64 - self.offset as i64 @@ -115,7 +112,7 @@ where } } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { let _ = cx; Poll::Ready(Some(Err(Error::new( @@ -124,3 +121,63 @@ where )))) } } + +impl oio::BlockingRead for FromFileReader +where + R: Read + Seek + Send + Sync + 'static, +{ + fn read(&mut self, buf: &mut [u8]) -> Result { + if self.current_size() <= 0 { + return Ok(0); + } + + let max = cmp::min(buf.len() as u64, self.current_size() as u64) as usize; + // TODO: we can use pread instead. + let n = self.inner.read(&mut buf[..max]).map_err(|err| { + Error::new(ErrorKind::Unexpected, "read data from FdReader") + .with_context("source", "FdReader") + .set_source(err) + })?; + self.offset += n as u64; + Ok(n) + } + + /// TODO: maybe we don't need to do seek really, just call pread instead. + /// + /// We need to wait for tokio's pread support. 
+ fn seek(&mut self, pos: SeekFrom) -> Result { + let (base, offset) = match pos { + SeekFrom::Start(n) => (self.start as i64, n as i64), + SeekFrom::End(n) => (self.end as i64, n), + SeekFrom::Current(n) => (self.offset as i64, n), + }; + + match base.checked_add(offset) { + Some(n) if n < 0 => Err(Error::new( + ErrorKind::InvalidInput, + "invalid seek to a negative or overflowing position", + )), + Some(n) => { + let cur = self.inner.seek(SeekFrom::Start(n as u64)).map_err(|err| { + Error::new(ErrorKind::Unexpected, "seek data from FdReader") + .with_context("source", "FdReader") + .set_source(err) + })?; + + self.offset = cur; + Ok(self.offset - self.start) + } + None => Err(Error::new( + ErrorKind::InvalidInput, + "invalid seek to a negative or overflowing position", + )), + } + } + + fn next(&mut self) -> Option> { + Some(Err(Error::new( + ErrorKind::Unsupported, + "output reader doesn't support iterating", + ))) + } +} diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 42d1aa9eaf15..53da83e50dad 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -245,7 +245,7 @@ impl FsBackend { #[async_trait] impl Accessor for FsBackend { type Reader = oio::FromFileReader>; - type BlockingReader = oio::into_blocking_reader::FdReader; + type BlockingReader = oio::FromFileReader; type Writer = FsWriter; type BlockingWriter = FsWriter; type Appender = FsAppender; @@ -558,7 +558,7 @@ impl Accessor for FsBackend { (None, None) => (0, total_length), }; - let mut r = oio::into_blocking_reader::from_fd(f, start, end); + let mut r: oio::FromFileReader = oio::into_read_from_file(f, start, end); // Rewind to make sure we are on the correct offset. 
r.seek(SeekFrom::Start(0))?; diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index 629a74a595ec..0a6a255fbbf3 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -162,7 +162,7 @@ unsafe impl Sync for HdfsBackend {} #[async_trait] impl Accessor for HdfsBackend { type Reader = oio::FromFileReader; - type BlockingReader = oio::into_blocking_reader::FdReader; + type BlockingReader = oio::FromFileReader; type Writer = HdfsWriter; type BlockingWriter = HdfsWriter; type Appender = HdfsAppender; @@ -397,7 +397,7 @@ impl Accessor for HdfsBackend { (None, None) => (0, meta.len()), }; - let mut r = oio::into_blocking_reader::from_fd(f, start, end); + let mut r = oio::into_read_from_file(f, start, end); // Rewind to make sure we are on the correct offset. r.seek(SeekFrom::Start(0))?; From b958c3b4da70cc7fa4b3ead22ea61be7a745b9f7 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 24 Jul 2023 13:57:08 +0800 Subject: [PATCH 07/10] Add stream Signed-off-by: Xuanwo --- core/src/raw/http_util/client.rs | 7 ++----- core/src/raw/http_util/multipart.rs | 4 ++-- core/src/raw/oio/mod.rs | 4 ++-- core/src/raw/oio/read/into_read_from_file.rs | 2 +- core/src/raw/oio/{stream.rs => stream/api.rs} | 0 .../into_stream.rs} | 8 ++++---- .../into_stream_from_reader.rs} | 8 ++++---- core/src/raw/oio/{into_stream => stream}/mod.rs | 13 +++++++------ core/src/services/webhdfs/error.rs | 5 +---- core/src/types/writer.rs | 6 ++---- 10 files changed, 25 insertions(+), 32 deletions(-) rename core/src/raw/oio/{stream.rs => stream/api.rs} (100%) rename core/src/raw/oio/{into_stream/from_futures_stream.rs => stream/into_stream.rs} (87%) rename core/src/raw/oio/{into_stream/from_futures_reader.rs => stream/into_stream_from_reader.rs} (94%) rename core/src/raw/oio/{into_stream => stream}/mod.rs (75%) diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 75137122a5c7..9578af9ce52e 100644 --- 
a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -27,7 +27,7 @@ use http::Response; use super::body::IncomingAsyncBody; use super::parse_content_length; use super::AsyncBody; -use crate::raw::oio::into_stream; +use crate::raw::*; use crate::Error; use crate::ErrorKind; use crate::Result; @@ -150,10 +150,7 @@ impl HttpClient { .set_source(err) }); - let body = IncomingAsyncBody::new( - Box::new(into_stream::from_futures_stream(stream)), - content_length, - ); + let body = IncomingAsyncBody::new(Box::new(oio::into_stream(stream)), content_length); let resp = hr.body(body).expect("response must build succeed"); diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index d9c932cdea62..1b64de3d0dc1 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -38,7 +38,7 @@ use http::Version; use super::new_request_build_error; use super::AsyncBody; use super::IncomingAsyncBody; -use crate::raw::oio::into_stream; +use crate::raw::oio; use crate::*; /// Multipart is a builder for multipart/form-data. 
@@ -320,7 +320,7 @@ impl MixedPart { let bs: Bytes = self.content; let length = bs.len(); let body = IncomingAsyncBody::new( - Box::new(into_stream::from_futures_stream(stream::iter(vec![Ok(bs)]))), + Box::new(oio::into_stream(stream::iter(vec![Ok(bs)]))), Some(length as u64), ); diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index ef4f39c359c1..44cdeb878162 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -49,11 +49,11 @@ pub use append::AppendOperation; pub use append::Appender; mod stream; +pub use stream::into_stream; +pub use stream::into_stream_from_reader; pub use stream::Stream; pub use stream::Streamer; -pub mod into_stream; - mod cursor; pub use cursor::Cursor; pub use cursor::VectorCursor; diff --git a/core/src/raw/oio/read/into_read_from_file.rs b/core/src/raw/oio/read/into_read_from_file.rs index 3f647fed01a1..52ebbfec21d3 100644 --- a/core/src/raw/oio/read/into_read_from_file.rs +++ b/core/src/raw/oio/read/into_read_from_file.rs @@ -31,7 +31,7 @@ use futures::AsyncSeek; use crate::raw::*; use crate::*; -/// Convert given fd into [`oio::Reader`]. +/// Convert given file into [`oio::Reader`]. pub fn into_read_from_file(fd: R, start: u64, end: u64) -> FromFileReader { FromFileReader { inner: fd, diff --git a/core/src/raw/oio/stream.rs b/core/src/raw/oio/stream/api.rs similarity index 100% rename from core/src/raw/oio/stream.rs rename to core/src/raw/oio/stream/api.rs diff --git a/core/src/raw/oio/into_stream/from_futures_stream.rs b/core/src/raw/oio/stream/into_stream.rs similarity index 87% rename from core/src/raw/oio/into_stream/from_futures_stream.rs rename to core/src/raw/oio/stream/into_stream.rs index c04379f71f72..2b21974378e3 100644 --- a/core/src/raw/oio/into_stream/from_futures_stream.rs +++ b/core/src/raw/oio/stream/into_stream.rs @@ -25,18 +25,18 @@ use crate::raw::*; use crate::*; /// Convert given futures stream into [`oio::Stream`]. 
-pub fn from_futures_stream(stream: S) -> FromFuturesStream +pub fn into_stream(stream: S) -> IntoStream where S: futures::Stream> + Send + Sync + Unpin, { - FromFuturesStream { inner: stream } + IntoStream { inner: stream } } -pub struct FromFuturesStream { +pub struct IntoStream { inner: S, } -impl oio::Stream for FromFuturesStream +impl oio::Stream for IntoStream where S: futures::Stream> + Send + Sync + Unpin, { diff --git a/core/src/raw/oio/into_stream/from_futures_reader.rs b/core/src/raw/oio/stream/into_stream_from_reader.rs similarity index 94% rename from core/src/raw/oio/into_stream/from_futures_reader.rs rename to core/src/raw/oio/stream/into_stream_from_reader.rs index 0cdb8c34951d..cead7d11d72d 100644 --- a/core/src/raw/oio/into_stream/from_futures_reader.rs +++ b/core/src/raw/oio/stream/into_stream_from_reader.rs @@ -33,22 +33,22 @@ use crate::*; const DEFAULT_CAPACITY: usize = 64 * 1024; /// Convert given futures reader into [`oio::Stream`]. -pub fn from_futures_reader(r: R) -> FromFuturesReader +pub fn into_stream_from_reader(r: R) -> FromReaderStream where R: AsyncRead + Send + Sync + Unpin, { - FromFuturesReader { + FromReaderStream { inner: Some(r), buf: BytesMut::new(), } } -pub struct FromFuturesReader { +pub struct FromReaderStream { inner: Option, buf: BytesMut, } -impl oio::Stream for FromFuturesReader +impl oio::Stream for FromReaderStream where S: AsyncRead + Send + Sync + Unpin, { diff --git a/core/src/raw/oio/into_stream/mod.rs b/core/src/raw/oio/stream/mod.rs similarity index 75% rename from core/src/raw/oio/into_stream/mod.rs rename to core/src/raw/oio/stream/mod.rs index 8b30b654673d..64798480f42e 100644 --- a/core/src/raw/oio/into_stream/mod.rs +++ b/core/src/raw/oio/stream/mod.rs @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. -//! into_stream will provide different implementations to convert into -//! 
[`oio::Stream`][crate::raw::oio::Stream] +mod api; +pub use api::Stream; +pub use api::Streamer; -mod from_futures_stream; -pub use from_futures_stream::from_futures_stream; +mod into_stream_from_reader; +pub use into_stream_from_reader::into_stream_from_reader; -mod from_futures_reader; -pub use from_futures_reader::from_futures_reader; +mod into_stream; +pub use into_stream::into_stream; diff --git a/core/src/services/webhdfs/error.rs b/core/src/services/webhdfs/error.rs index 7964ea38399f..449725e632e3 100644 --- a/core/src/services/webhdfs/error.rs +++ b/core/src/services/webhdfs/error.rs @@ -83,7 +83,6 @@ mod tests { use serde_json::from_reader; use super::*; - use crate::raw::oio::into_stream; /// Error response example from https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Error%20Responses #[tokio::test] @@ -101,9 +100,7 @@ mod tests { "#, ); let body = IncomingAsyncBody::new( - Box::new(into_stream::from_futures_stream(stream::iter(vec![Ok( - ill_args.clone(), - )]))), + Box::new(oio::into_stream(stream::iter(vec![Ok(ill_args.clone())]))), None, ); let resp = Response::builder() diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index c63288e91656..37a9fe72ca98 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -129,9 +129,7 @@ impl Writer { T: Into, { if let State::Idle(Some(w)) = &mut self.state { - let s = Box::new(oio::into_stream::from_futures_stream( - sink_from.map_ok(|v| v.into()), - )); + let s = Box::new(oio::into_stream(sink_from.map_ok(|v| v.into()))); w.sink(size, s).await } else { unreachable!( @@ -176,7 +174,7 @@ impl Writer { R: futures::AsyncRead + Send + Sync + Unpin + 'static, { if let State::Idle(Some(w)) = &mut self.state { - let s = Box::new(oio::into_stream::from_futures_reader(read_from)); + let s = Box::new(oio::into_stream_from_reader(read_from)); w.sink(size, s).await } else { unreachable!( From c35eac5b4cd03516a0e7d55fe36a9e59588a0208 Mon Sep 17 00:00:00 2001 
From: Xuanwo Date: Mon, 24 Jul 2023 14:11:40 +0800 Subject: [PATCH 08/10] Add page Signed-off-by: Xuanwo --- core/src/layers/complete.rs | 22 +++++----- core/src/raw/oio/mod.rs | 42 +++---------------- core/src/raw/oio/{page.rs => page/api.rs} | 2 +- .../into_flat_page.rs} | 12 +++--- .../into_hierarchy_pager.rs} | 14 +++---- core/src/raw/oio/page/mod.rs | 31 ++++++++++++++ core/src/raw/oio/read/mod.rs | 7 +++- 7 files changed, 67 insertions(+), 63 deletions(-) rename core/src/raw/oio/{page.rs => page/api.rs} (99%) rename core/src/raw/oio/{to_flat_pager.rs => page/into_flat_page.rs} (96%) rename core/src/raw/oio/{to_hierarchy_pager.rs => page/into_hierarchy_pager.rs} (95%) create mode 100644 core/src/raw/oio/page/mod.rs diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index fdd8f272757d..20a1dd7b28ca 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -25,13 +25,13 @@ use std::task::Poll; use async_trait::async_trait; use bytes::Bytes; -use crate::raw::oio::to_flat_pager; -use crate::raw::oio::to_hierarchy_pager; +use crate::raw::oio::into_flat_page; +use crate::raw::oio::into_hierarchy_page; use crate::raw::oio::ByRangeSeekableReader; use crate::raw::oio::Entry; +use crate::raw::oio::FlatPager; +use crate::raw::oio::HierarchyPager; use crate::raw::oio::StreamableReader; -use crate::raw::oio::ToFlatPager; -use crate::raw::oio::ToHierarchyPager; use crate::raw::*; use crate::*; @@ -254,7 +254,7 @@ impl CompleteReaderAccessor { let (rp, p) = self.inner.list(path, args).await?; Ok((rp, CompletePager::AlreadyComplete(p))) } else { - let p = to_flat_pager( + let p = into_flat_page( self.inner.clone(), path, args.with_delimiter("/").limit().unwrap_or(1000), @@ -269,7 +269,7 @@ impl CompleteReaderAccessor { Ok((rp, CompletePager::AlreadyComplete(p))) } else { let (_, p) = self.inner.list(path, args.with_delimiter("")).await?; - let p = to_hierarchy_pager(p, path); + let p = into_hierarchy_page(p, path); 
Ok((RpList::default(), CompletePager::NeedHierarchy(p))) }; } @@ -303,7 +303,7 @@ impl CompleteReaderAccessor { let (rp, p) = self.inner.blocking_list(path, args)?; Ok((rp, CompletePager::AlreadyComplete(p))) } else { - let p = to_flat_pager( + let p = into_flat_page( self.inner.clone(), path, args.with_delimiter("/").limit().unwrap_or(1000), @@ -318,8 +318,8 @@ impl CompleteReaderAccessor { Ok((rp, CompletePager::AlreadyComplete(p))) } else { let (_, p) = self.inner.blocking_list(path, args.with_delimiter(""))?; - let p: ToHierarchyPager<::BlockingPager> = - to_hierarchy_pager(p, path); + let p: HierarchyPager<::BlockingPager> = + into_hierarchy_page(p, path); Ok((RpList::default(), CompletePager::NeedHierarchy(p))) }; } @@ -614,8 +614,8 @@ where pub enum CompletePager { AlreadyComplete(P), - NeedFlat(ToFlatPager, P>), - NeedHierarchy(ToHierarchyPager

), + NeedFlat(FlatPager, P>), + NeedHierarchy(HierarchyPager

), } #[async_trait] diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index 44cdeb878162..b2839acfb922 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -23,36 +23,16 @@ //! our `output` traits. mod read; -pub use read::into_read_from_file; -pub use read::into_seekable_read_by_range; -pub use read::into_streamable_read; -pub use read::BlockingRead; -pub use read::BlockingReader; -pub use read::ByRangeSeekableReader; -pub use read::FromFileReader; -pub use read::Read; -pub use read::ReadExt; -pub use read::ReadOperation; -pub use read::Reader; -pub use read::StreamableReader; +pub use read::*; mod write; -pub use write::BlockingWrite; -pub use write::BlockingWriter; -pub use write::Write; -pub use write::WriteOperation; -pub use write::Writer; +pub use write::*; mod append; -pub use append::Append; -pub use append::AppendOperation; -pub use append::Appender; +pub use append::*; mod stream; -pub use stream::into_stream; -pub use stream::into_stream_from_reader; -pub use stream::Stream; -pub use stream::Streamer; +pub use stream::*; mod cursor; pub use cursor::Cursor; @@ -62,16 +42,4 @@ mod entry; pub use entry::Entry; mod page; -pub use page::BlockingPage; -pub use page::BlockingPager; -pub use page::Page; -pub use page::PageOperation; -pub use page::Pager; - -mod to_flat_pager; -pub use to_flat_pager::to_flat_pager; -pub use to_flat_pager::ToFlatPager; - -mod to_hierarchy_pager; -pub use to_hierarchy_pager::to_hierarchy_pager; -pub use to_hierarchy_pager::ToHierarchyPager; +pub use page::*; diff --git a/core/src/raw/oio/page.rs b/core/src/raw/oio/page/api.rs similarity index 99% rename from core/src/raw/oio/page.rs rename to core/src/raw/oio/page/api.rs index c05ed0f3e097..18bbe4c5dc29 100644 --- a/core/src/raw/oio/page.rs +++ b/core/src/raw/oio/page/api.rs @@ -20,7 +20,7 @@ use std::fmt::Formatter; use async_trait::async_trait; -use super::Entry; +use crate::raw::oio::Entry; use crate::*; /// PageOperation is the name for APIs of 
pager. diff --git a/core/src/raw/oio/to_flat_pager.rs b/core/src/raw/oio/page/into_flat_page.rs similarity index 96% rename from core/src/raw/oio/to_flat_pager.rs rename to core/src/raw/oio/page/into_flat_page.rs index 01c585e65445..1f2ffb992d01 100644 --- a/core/src/raw/oio/to_flat_pager.rs +++ b/core/src/raw/oio/page/into_flat_page.rs @@ -24,7 +24,7 @@ use crate::raw::*; use crate::*; /// to_flat_pager is used to make a hierarchy pager flat. -pub fn to_flat_pager(acc: A, path: &str, size: usize) -> ToFlatPager { +pub fn into_flat_page(acc: A, path: &str, size: usize) -> FlatPager { #[cfg(debug_assertions)] { let meta = acc.info(); @@ -34,7 +34,7 @@ pub fn to_flat_pager(acc: A, path: &str, size: usize) -> ToFlatP ); } - ToFlatPager { + FlatPager { acc, size, root: path.to_string(), @@ -80,7 +80,7 @@ pub fn to_flat_pager(acc: A, path: &str, size: usize) -> ToFlatP /// Especially, for storage services that can't return dirs first, ToFlatPager /// may output parent dirs' files before nested dirs, this is expected because files /// always output directly while listing. 
-pub struct ToFlatPager { +pub struct FlatPager { acc: A, size: usize, root: String, @@ -90,7 +90,7 @@ pub struct ToFlatPager { } #[async_trait] -impl oio::Page for ToFlatPager +impl oio::Page for FlatPager where A: Accessor, P: oio::Page, @@ -150,7 +150,7 @@ where } } -impl oio::BlockingPage for ToFlatPager +impl oio::BlockingPage for FlatPager where A: Accessor, P: oio::BlockingPage, @@ -299,7 +299,7 @@ mod tests { let _ = tracing_subscriber::fmt().with_test_writer().try_init(); let acc = MockService::new(); - let mut pager = to_flat_pager(acc, "x/", 10); + let mut pager = into_flat_page(acc, "x/", 10); let mut entries = Vec::default(); diff --git a/core/src/raw/oio/to_hierarchy_pager.rs b/core/src/raw/oio/page/into_hierarchy_pager.rs similarity index 95% rename from core/src/raw/oio/to_hierarchy_pager.rs rename to core/src/raw/oio/page/into_hierarchy_pager.rs index e301155f7619..10a5449d6c1e 100644 --- a/core/src/raw/oio/to_hierarchy_pager.rs +++ b/core/src/raw/oio/page/into_hierarchy_pager.rs @@ -23,14 +23,14 @@ use crate::raw::*; use crate::*; /// to_hierarchy_pager is used to make a hierarchy pager flat. -pub fn to_hierarchy_pager

(pager: P, path: &str) -> ToHierarchyPager

{ +pub fn into_hierarchy_page

(pager: P, path: &str) -> HierarchyPager

{ let path = if path == "/" { "".to_string() } else { path.to_string() }; - ToHierarchyPager { + HierarchyPager { pager, path, visited: HashSet::default(), @@ -47,13 +47,13 @@ pub fn to_hierarchy_pager

(pager: P, path: &str) -> ToHierarchyPager

{ /// returned. /// /// Please keep calling next_page until we returned `Ok(None)` -pub struct ToHierarchyPager

{ +pub struct HierarchyPager

{ pager: P, path: String, visited: HashSet, } -impl

ToHierarchyPager

{ +impl

HierarchyPager

{ /// TODO: use retain_mut instead after we bump MSRV to 1.61. fn filter_entries(&mut self, entries: Vec) -> Vec { entries @@ -117,7 +117,7 @@ impl

ToHierarchyPager

{ } #[async_trait] -impl oio::Page for ToHierarchyPager

{ +impl oio::Page for HierarchyPager

{ async fn next(&mut self) -> Result>> { let page = self.pager.next().await?; @@ -133,7 +133,7 @@ impl oio::Page for ToHierarchyPager

{ } } -impl oio::BlockingPage for ToHierarchyPager

{ +impl oio::BlockingPage for HierarchyPager

{ fn next(&mut self) -> Result>> { let page = self.pager.next()?; @@ -201,7 +201,7 @@ mod tests { let _ = tracing_subscriber::fmt().with_test_writer().try_init(); let pager = MockPager::new(&["x/x/", "x/y/", "y/", "x/x/x", "y/y", "xy/", "z", "y/a"]); - let mut pager = to_hierarchy_pager(pager, ""); + let mut pager = into_hierarchy_page(pager, ""); let mut entries = Vec::default(); diff --git a/core/src/raw/oio/page/mod.rs b/core/src/raw/oio/page/mod.rs new file mode 100644 index 000000000000..26be17451455 --- /dev/null +++ b/core/src/raw/oio/page/mod.rs @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod api; +pub use api::BlockingPage; +pub use api::BlockingPager; +pub use api::Page; +pub use api::PageOperation; +pub use api::Pager; + +mod into_flat_page; +pub use into_flat_page::into_flat_page; +pub use into_flat_page::FlatPager; + +mod into_hierarchy_pager; +pub use into_hierarchy_pager::into_hierarchy_page; +pub use into_hierarchy_pager::HierarchyPager; diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs index 06a7538ed6b1..64466bbdd60c 100644 --- a/core/src/raw/oio/read/mod.rs +++ b/core/src/raw/oio/read/mod.rs @@ -16,7 +16,12 @@ // under the License. 
mod api; -pub use api::*; +pub use api::BlockingRead; +pub use api::BlockingReader; +pub use api::Read; +pub use api::ReadExt; +pub use api::ReadOperation; +pub use api::Reader; mod into_streamable_read; pub use into_streamable_read::into_streamable_read; From 4fbdfd7284616960c21029ba5318f5b634c4c3f0 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 24 Jul 2023 14:13:34 +0800 Subject: [PATCH 09/10] move write Signed-off-by: Xuanwo --- core/src/raw/oio/mod.rs | 6 +++--- core/src/raw/oio/{write.rs => write/api.rs} | 0 core/src/raw/oio/write/mod.rs | 23 +++++++++++++++++++++ 3 files changed, 26 insertions(+), 3 deletions(-) rename core/src/raw/oio/{write.rs => write/api.rs} (100%) create mode 100644 core/src/raw/oio/write/mod.rs diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index b2839acfb922..2f76090dc0af 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -34,12 +34,12 @@ pub use append::*; mod stream; pub use stream::*; +mod page; +pub use page::*; + mod cursor; pub use cursor::Cursor; pub use cursor::VectorCursor; mod entry; pub use entry::Entry; - -mod page; -pub use page::*; diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write/api.rs similarity index 100% rename from core/src/raw/oio/write.rs rename to core/src/raw/oio/write/api.rs diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs new file mode 100644 index 000000000000..eb809a871de7 --- /dev/null +++ b/core/src/raw/oio/write/mod.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod api; +pub use api::BlockingWrite; +pub use api::BlockingWriter; +pub use api::Write; +pub use api::WriteOperation; +pub use api::Writer; From 80acfd44fda2e7d98f7a2a5b1d0cde9c39267c1e Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 24 Jul 2023 14:16:35 +0800 Subject: [PATCH 10/10] FIx typo Signed-off-by: Xuanwo --- core/src/raw/oio/read/into_seekable_read_by_range.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/raw/oio/read/into_seekable_read_by_range.rs b/core/src/raw/oio/read/into_seekable_read_by_range.rs index 3e76f9b58d3d..a020b883c3cd 100644 --- a/core/src/raw/oio/read/into_seekable_read_by_range.rs +++ b/core/src/raw/oio/read/into_seekable_read_by_range.rs @@ -38,7 +38,7 @@ use crate::*; /// /// # Output /// -/// The output is a reader that can be seeked by range. +/// The output is a reader that supports seeking by range. /// /// # Notes ///