diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 0c27b146fef8..20a1dd7b28ca 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -25,13 +25,13 @@ use std::task::Poll; use async_trait::async_trait; use bytes::Bytes; -use crate::raw::oio::into_reader::RangeReader; -use crate::raw::oio::to_flat_pager; -use crate::raw::oio::to_hierarchy_pager; +use crate::raw::oio::into_flat_page; +use crate::raw::oio::into_hierarchy_page; +use crate::raw::oio::ByRangeSeekableReader; use crate::raw::oio::Entry; -use crate::raw::oio::IntoStreamableReader; -use crate::raw::oio::ToFlatPager; -use crate::raw::oio::ToHierarchyPager; +use crate::raw::oio::FlatPager; +use crate::raw::oio::HierarchyPager; +use crate::raw::oio::StreamableReader; use crate::raw::*; use crate::*; @@ -172,7 +172,7 @@ impl CompleteReaderAccessor { match (seekable, streamable) { (true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))), (true, false) => { - let r = oio::into_streamable_reader(r, 256 * 1024); + let r = oio::into_streamable_read(r, 256 * 1024); Ok((rp, CompleteReader::NeedStreamable(r))) } _ => { @@ -193,12 +193,12 @@ impl CompleteReaderAccessor { (offset, size) } }; - let r = oio::into_reader::by_range(self.inner.clone(), path, r, offset, size); + let r = oio::into_seekable_read_by_range(self.inner.clone(), path, r, offset, size); if streamable { Ok((rp, CompleteReader::NeedSeekable(r))) } else { - let r = oio::into_streamable_reader(r, 256 * 1024); + let r = oio::into_streamable_read(r, 256 * 1024); Ok((rp, CompleteReader::NeedBoth(r))) } } @@ -223,7 +223,7 @@ impl CompleteReaderAccessor { match (seekable, streamable) { (true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))), (true, false) => { - let r = oio::into_streamable_reader(r, 256 * 1024); + let r = oio::into_streamable_read(r, 256 * 1024); Ok((rp, CompleteReader::NeedStreamable(r))) } (false, _) => Err(Error::new( @@ -254,7 +254,7 @@ impl CompleteReaderAccessor { let (rp, p) = self.inner.list(path, args).await?; Ok((rp, CompletePager::AlreadyComplete(p))) } else { - let p = to_flat_pager( + let p = into_flat_page( self.inner.clone(), path, args.with_delimiter("/").limit().unwrap_or(1000), @@ -269,7 +269,7 @@ impl CompleteReaderAccessor { Ok((rp, CompletePager::AlreadyComplete(p))) } else { let (_, p) = self.inner.list(path, args.with_delimiter("")).await?; - let p = to_hierarchy_pager(p, path); + let p = into_hierarchy_page(p, path); Ok((RpList::default(), CompletePager::NeedHierarchy(p))) }; } @@ -303,7 +303,7 @@ impl CompleteReaderAccessor { let (rp, p) = self.inner.blocking_list(path, args)?; Ok((rp, CompletePager::AlreadyComplete(p))) } else { - let p = to_flat_pager( + let p = into_flat_page( self.inner.clone(), path, args.with_delimiter("/").limit().unwrap_or(1000), @@ -318,8 +318,8 @@ impl CompleteReaderAccessor { Ok((rp, CompletePager::AlreadyComplete(p))) } else { let (_, p) = self.inner.blocking_list(path, args.with_delimiter(""))?; - let p: ToHierarchyPager<::BlockingPager> = - to_hierarchy_pager(p, path); + let p: HierarchyPager<::BlockingPager> = + into_hierarchy_page(p, path); Ok((RpList::default(), CompletePager::NeedHierarchy(p))) }; } @@ -532,9 +532,9 @@ impl LayeredAccessor for CompleteReaderAccessor { pub enum CompleteReader { AlreadyComplete(R), - NeedSeekable(RangeReader), - NeedStreamable(IntoStreamableReader), - NeedBoth(IntoStreamableReader>), + NeedSeekable(ByRangeSeekableReader), + NeedStreamable(StreamableReader), + NeedBoth(StreamableReader>), } impl oio::Read for CompleteReader @@ -614,8 +614,8 @@ where pub enum CompletePager { AlreadyComplete(P), - NeedFlat(ToFlatPager, P>), - NeedHierarchy(ToHierarchyPager

), + NeedFlat(FlatPager, P>), + NeedHierarchy(HierarchyPager

), } #[async_trait] diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 75137122a5c7..9578af9ce52e 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -27,7 +27,7 @@ use http::Response; use super::body::IncomingAsyncBody; use super::parse_content_length; use super::AsyncBody; -use crate::raw::oio::into_stream; +use crate::raw::*; use crate::Error; use crate::ErrorKind; use crate::Result; @@ -150,10 +150,7 @@ impl HttpClient { .set_source(err) }); - let body = IncomingAsyncBody::new( - Box::new(into_stream::from_futures_stream(stream)), - content_length, - ); + let body = IncomingAsyncBody::new(Box::new(oio::into_stream(stream)), content_length); let resp = hr.body(body).expect("response must build succeed"); diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index d9c932cdea62..1b64de3d0dc1 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -38,7 +38,7 @@ use http::Version; use super::new_request_build_error; use super::AsyncBody; use super::IncomingAsyncBody; -use crate::raw::oio::into_stream; +use crate::raw::oio; use crate::*; /// Multipart is a builder for multipart/form-data. @@ -320,7 +320,7 @@ impl MixedPart { let bs: Bytes = self.content; let length = bs.len(); let body = IncomingAsyncBody::new( - Box::new(into_stream::from_futures_stream(stream::iter(vec![Ok(bs)]))), + Box::new(oio::into_stream(stream::iter(vec![Ok(bs)]))), Some(length as u64), ); diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs index e7c2bbcfc3da..34638b9ff9aa 100644 --- a/core/src/raw/mod.rs +++ b/core/src/raw/mod.rs @@ -27,28 +27,16 @@ //! them whenever possible. mod accessor; -pub use accessor::Accessor; -pub use accessor::AccessorInfo; -pub use accessor::FusedAccessor; +pub use accessor::*; mod layer; -pub use layer::Layer; -pub use layer::LayeredAccessor; - -pub mod oio; +pub use layer::*; mod path; -pub use path::build_abs_path; -pub use path::build_rel_path; -pub use path::build_rooted_abs_path; -pub use path::get_basename; -pub use path::get_parent; -pub use path::normalize_path; -pub use path::normalize_root; -pub use path::validate_path; +pub use path::*; mod operation; -pub use operation::Operation; +pub use operation::*; mod version; pub use version::VERSION; @@ -70,3 +58,4 @@ pub use chrono_util::*; // Expose as a pub mod to avoid confusing. pub mod adapters; +pub mod oio; diff --git a/core/src/raw/oio/append.rs b/core/src/raw/oio/append/api.rs similarity index 100% rename from core/src/raw/oio/append.rs rename to core/src/raw/oio/append/api.rs diff --git a/core/src/raw/oio/into_stream/mod.rs b/core/src/raw/oio/append/mod.rs similarity index 74% rename from core/src/raw/oio/into_stream/mod.rs rename to core/src/raw/oio/append/mod.rs index 8b30b654673d..defb32e6b568 100644 --- a/core/src/raw/oio/into_stream/mod.rs +++ b/core/src/raw/oio/append/mod.rs @@ -15,11 +15,5 @@ // specific language governing permissions and limitations // under the License. -//! into_stream will provide different implementations to convert into -//! [`oio::Stream`][crate::raw::oio::Stream] - -mod from_futures_stream; -pub use from_futures_stream::from_futures_stream; - -mod from_futures_reader; -pub use from_futures_reader::from_futures_reader; +mod api; +pub use api::*; diff --git a/core/src/raw/oio/into_blocking_reader/from_fd.rs b/core/src/raw/oio/into_blocking_reader/from_fd.rs deleted file mode 100644 index ac01b33e7562..000000000000 --- a/core/src/raw/oio/into_blocking_reader/from_fd.rs +++ /dev/null @@ -1,119 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::cmp; -use std::io::Read; -use std::io::Seek; -use std::io::SeekFrom; - -use bytes::Bytes; - -use crate::raw::*; -use crate::*; - -/// Convert given fd into [`oio::BlockingRead`]. -pub fn from_fd(fd: R, start: u64, end: u64) -> FdReader -where - R: Read + Seek + Send + Sync, -{ - FdReader { - inner: fd, - start, - end, - offset: 0, - } -} - -/// FdReader is a wrapper of input fd to implement [`oio::Read`]. -pub struct FdReader { - inner: R, - - start: u64, - end: u64, - offset: u64, -} - -impl FdReader -where - R: Read + Seek + Send + Sync, -{ - #[inline] - pub(crate) fn current_size(&self) -> i64 { - debug_assert!(self.offset >= self.start, "offset must in range"); - self.end as i64 - self.offset as i64 - } -} - -impl oio::BlockingRead for FdReader -where - R: Read + Seek + Send + Sync + 'static, -{ - fn read(&mut self, buf: &mut [u8]) -> Result { - if self.current_size() <= 0 { - return Ok(0); - } - - let max = cmp::min(buf.len() as u64, self.current_size() as u64) as usize; - // TODO: we can use pread instead. - let n = self.inner.read(&mut buf[..max]).map_err(|err| { - Error::new(ErrorKind::Unexpected, "read data from FdReader") - .with_context("source", "FdReader") - .set_source(err) - })?; - self.offset += n as u64; - Ok(n) - } - - /// TODO: maybe we don't need to do seek really, just call pread instead. - /// - /// We need to wait for tokio's pread support. - fn seek(&mut self, pos: SeekFrom) -> Result { - let (base, offset) = match pos { - SeekFrom::Start(n) => (self.start as i64, n as i64), - SeekFrom::End(n) => (self.end as i64, n), - SeekFrom::Current(n) => (self.offset as i64, n), - }; - - match base.checked_add(offset) { - Some(n) if n < 0 => Err(Error::new( - ErrorKind::InvalidInput, - "invalid seek to a negative or overflowing position", - )), - Some(n) => { - let cur = self.inner.seek(SeekFrom::Start(n as u64)).map_err(|err| { - Error::new(ErrorKind::Unexpected, "seek data from FdReader") - .with_context("source", "FdReader") - .set_source(err) - })?; - - self.offset = cur; - Ok(self.offset - self.start) - } - None => Err(Error::new( - ErrorKind::InvalidInput, - "invalid seek to a negative or overflowing position", - )), - } - } - - fn next(&mut self) -> Option> { - Some(Err(Error::new( - ErrorKind::Unsupported, - "output reader doesn't support iterating", - ))) - } -} diff --git a/core/src/raw/oio/into_reader/mod.rs b/core/src/raw/oio/into_reader/mod.rs deleted file mode 100644 index 609032327d6a..000000000000 --- a/core/src/raw/oio/into_reader/mod.rs +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! into_reader will provide different implementations to convert into -//! [`oio::Read`][crate::raw::oio::Read] -//! -//! - (Some(offset), Some(size)) => by_range -//! - (Some(offset), None) => by_offset -//! - (None, Some(size)) => by_tail -//! - (None, None) => by_offset -//! -//! The main different is whether and when to call `stat` to get total -//! content length. -//! -//! # TODO -//! -//! We only implement by_range so far. -//! -//! We should implement other types so that they can be zero cost on non-seek -//! cases. -//! -//! For example, for `by_full`, we don't need to do `stat` everytime. If -//! user call `poll_read` first, we can get the total_size from returning -//! reader. In this way, we can save 40ms in average for every s3 read call. - -mod by_range; -pub use by_range::by_range; -pub use by_range::RangeReader; - -mod from_fd; -pub use from_fd::from_fd; -pub use from_fd::FdReader; diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index fb1d31efda82..2f76090dc0af 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -23,56 +23,23 @@ //! our `output` traits. mod read; -pub use read::BlockingRead; -pub use read::BlockingReader; -pub use read::Read; -pub use read::ReadExt; -pub use read::ReadOperation; -pub use read::Reader; - -pub mod into_blocking_reader; -pub mod into_reader; +pub use read::*; mod write; -pub use write::BlockingWrite; -pub use write::BlockingWriter; -pub use write::Write; -pub use write::WriteOperation; -pub use write::Writer; +pub use write::*; mod append; -pub use append::Append; -pub use append::AppendOperation; -pub use append::Appender; +pub use append::*; mod stream; -pub use stream::Stream; -pub use stream::Streamer; +pub use stream::*; -pub mod into_stream; +mod page; +pub use page::*; mod cursor; pub use cursor::Cursor; pub use cursor::VectorCursor; -mod into_streamable; -pub use into_streamable::into_streamable_reader; -pub use into_streamable::IntoStreamableReader; - mod entry; pub use entry::Entry; - -mod page; -pub use page::BlockingPage; -pub use page::BlockingPager; -pub use page::Page; -pub use page::PageOperation; -pub use page::Pager; - -mod to_flat_pager; -pub use to_flat_pager::to_flat_pager; -pub use to_flat_pager::ToFlatPager; - -mod to_hierarchy_pager; -pub use to_hierarchy_pager::to_hierarchy_pager; -pub use to_hierarchy_pager::ToHierarchyPager; diff --git a/core/src/raw/oio/page.rs b/core/src/raw/oio/page/api.rs similarity index 99% rename from core/src/raw/oio/page.rs rename to core/src/raw/oio/page/api.rs index c05ed0f3e097..18bbe4c5dc29 100644 --- a/core/src/raw/oio/page.rs +++ b/core/src/raw/oio/page/api.rs @@ -20,7 +20,7 @@ use std::fmt::Formatter; use async_trait::async_trait; -use super::Entry; +use crate::raw::oio::Entry; use crate::*; /// PageOperation is the name for APIs of pager. diff --git a/core/src/raw/oio/to_flat_pager.rs b/core/src/raw/oio/page/into_flat_page.rs similarity index 96% rename from core/src/raw/oio/to_flat_pager.rs rename to core/src/raw/oio/page/into_flat_page.rs index 01c585e65445..1f2ffb992d01 100644 --- a/core/src/raw/oio/to_flat_pager.rs +++ b/core/src/raw/oio/page/into_flat_page.rs @@ -24,7 +24,7 @@ use crate::raw::*; use crate::*; /// to_flat_pager is used to make a hierarchy pager flat. -pub fn to_flat_pager(acc: A, path: &str, size: usize) -> ToFlatPager { +pub fn into_flat_page(acc: A, path: &str, size: usize) -> FlatPager { #[cfg(debug_assertions)] { let meta = acc.info(); @@ -34,7 +34,7 @@ pub fn to_flat_pager(acc: A, path: &str, size: usize) -> ToFlatP ); } - ToFlatPager { + FlatPager { acc, size, root: path.to_string(), @@ -80,7 +80,7 @@ pub fn to_flat_pager(acc: A, path: &str, size: usize) -> ToFlatP /// Especially, for storage services that can't return dirs first, ToFlatPager /// may output parent dirs' files before nested dirs, this is expected because files /// always output directly while listing. -pub struct ToFlatPager { +pub struct FlatPager { acc: A, size: usize, root: String, @@ -90,7 +90,7 @@ pub struct ToFlatPager { } #[async_trait] -impl oio::Page for ToFlatPager +impl oio::Page for FlatPager where A: Accessor, P: oio::Page, @@ -150,7 +150,7 @@ where } } -impl oio::BlockingPage for ToFlatPager +impl oio::BlockingPage for FlatPager where A: Accessor, P: oio::BlockingPage, @@ -299,7 +299,7 @@ mod tests { let _ = tracing_subscriber::fmt().with_test_writer().try_init(); let acc = MockService::new(); - let mut pager = to_flat_pager(acc, "x/", 10); + let mut pager = into_flat_page(acc, "x/", 10); let mut entries = Vec::default(); diff --git a/core/src/raw/oio/to_hierarchy_pager.rs b/core/src/raw/oio/page/into_hierarchy_pager.rs similarity index 95% rename from core/src/raw/oio/to_hierarchy_pager.rs rename to core/src/raw/oio/page/into_hierarchy_pager.rs index e301155f7619..10a5449d6c1e 100644 --- a/core/src/raw/oio/to_hierarchy_pager.rs +++ b/core/src/raw/oio/page/into_hierarchy_pager.rs @@ -23,14 +23,14 @@ use crate::raw::*; use crate::*; /// to_hierarchy_pager is used to make a hierarchy pager flat. -pub fn to_hierarchy_pager

(pager: P, path: &str) -> ToHierarchyPager

{ +pub fn into_hierarchy_page

(pager: P, path: &str) -> HierarchyPager

{ let path = if path == "/" { "".to_string() } else { path.to_string() }; - ToHierarchyPager { + HierarchyPager { pager, path, visited: HashSet::default(), @@ -47,13 +47,13 @@ pub fn to_hierarchy_pager

(pager: P, path: &str) -> ToHierarchyPager

{ /// returned. /// /// Please keep calling next_page until we returned `Ok(None)` -pub struct ToHierarchyPager

{ +pub struct HierarchyPager

{ pager: P, path: String, visited: HashSet, } -impl

ToHierarchyPager

{ +impl

HierarchyPager

{ /// TODO: use retain_mut instead after we bump MSRV to 1.61. fn filter_entries(&mut self, entries: Vec) -> Vec { entries @@ -117,7 +117,7 @@ impl

ToHierarchyPager

{ } #[async_trait] -impl oio::Page for ToHierarchyPager

{ +impl oio::Page for HierarchyPager

{ async fn next(&mut self) -> Result>> { let page = self.pager.next().await?; @@ -133,7 +133,7 @@ impl oio::Page for ToHierarchyPager

{ } } -impl oio::BlockingPage for ToHierarchyPager

{ +impl oio::BlockingPage for HierarchyPager

{ fn next(&mut self) -> Result>> { let page = self.pager.next()?; @@ -201,7 +201,7 @@ mod tests { let _ = tracing_subscriber::fmt().with_test_writer().try_init(); let pager = MockPager::new(&["x/x/", "x/y/", "y/", "x/x/x", "y/y", "xy/", "z", "y/a"]); - let mut pager = to_hierarchy_pager(pager, ""); + let mut pager = into_hierarchy_page(pager, ""); let mut entries = Vec::default(); diff --git a/core/src/raw/oio/page/mod.rs b/core/src/raw/oio/page/mod.rs new file mode 100644 index 000000000000..26be17451455 --- /dev/null +++ b/core/src/raw/oio/page/mod.rs @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod api; +pub use api::BlockingPage; +pub use api::BlockingPager; +pub use api::Page; +pub use api::PageOperation; +pub use api::Pager; + +mod into_flat_page; +pub use into_flat_page::into_flat_page; +pub use into_flat_page::FlatPager; + +mod into_hierarchy_pager; +pub use into_hierarchy_pager::into_hierarchy_page; +pub use into_hierarchy_pager::HierarchyPager; diff --git a/core/src/raw/oio/read.rs b/core/src/raw/oio/read/api.rs similarity index 95% rename from core/src/raw/oio/read.rs rename to core/src/raw/oio/read/api.rs index e3bce7a06585..9bde2b8018ac 100644 --- a/core/src/raw/oio/read.rs +++ b/core/src/raw/oio/read/api.rs @@ -149,17 +149,6 @@ impl Read for Box { } } -fn convert_to_io_error(err: Error) -> io::Error { - let kind = match err.kind() { - ErrorKind::NotFound => io::ErrorKind::NotFound, - ErrorKind::PermissionDenied => io::ErrorKind::PermissionDenied, - ErrorKind::InvalidInput => io::ErrorKind::InvalidInput, - _ => io::ErrorKind::Interrupted, - }; - - io::Error::new(kind, err) -} - impl futures::AsyncRead for dyn Read { fn poll_read( mut self: Pin<&mut Self>, @@ -167,7 +156,7 @@ impl futures::AsyncRead for dyn Read { buf: &mut [u8], ) -> Poll> { let this: &mut dyn Read = &mut *self; - this.poll_read(cx, buf).map_err(convert_to_io_error) + this.poll_read(cx, buf).map_err(format_io_error) } } @@ -178,7 +167,7 @@ impl futures::AsyncSeek for dyn Read { pos: io::SeekFrom, ) -> Poll> { let this: &mut dyn Read = &mut *self; - this.poll_seek(cx, pos).map_err(convert_to_io_error) + this.poll_seek(cx, pos).map_err(format_io_error) } } @@ -359,7 +348,7 @@ impl io::Read for dyn BlockingRead { #[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result { let this: &mut dyn BlockingRead = &mut *self; - this.read(buf).map_err(convert_to_io_error) + this.read(buf).map_err(format_io_error) } } @@ -367,7 +356,7 @@ impl io::Seek for dyn BlockingRead { #[inline] fn seek(&mut self, pos: io::SeekFrom) -> io::Result { let this: &mut dyn BlockingRead = &mut *self; - this.seek(pos).map_err(convert_to_io_error) + this.seek(pos).map_err(format_io_error) } } @@ -380,3 +369,20 @@ impl Iterator for dyn BlockingRead { this.next() } } + +/// helper functions to format `Error` into `io::Error`. +/// +/// This function is added privately by design and only valid in current +/// context (i.e. `oio` crate). We don't want to expose this function to +/// users. +#[inline] +fn format_io_error(err: Error) -> io::Error { + let kind = match err.kind() { + ErrorKind::NotFound => io::ErrorKind::NotFound, + ErrorKind::PermissionDenied => io::ErrorKind::PermissionDenied, + ErrorKind::InvalidInput => io::ErrorKind::InvalidInput, + _ => io::ErrorKind::Interrupted, + }; + + io::Error::new(kind, err) +} diff --git a/core/src/raw/oio/into_reader/from_fd.rs b/core/src/raw/oio/read/into_read_from_file.rs similarity index 61% rename from core/src/raw/oio/into_reader/from_fd.rs rename to core/src/raw/oio/read/into_read_from_file.rs index 1c3e02d5519c..52ebbfec21d3 100644 --- a/core/src/raw/oio/into_reader/from_fd.rs +++ b/core/src/raw/oio/read/into_read_from_file.rs @@ -16,24 +16,24 @@ // under the License. use std::cmp; +use std::io::Read; +use std::io::Seek; use std::io::SeekFrom; use std::pin::Pin; use std::task::ready; use std::task::Context; use std::task::Poll; +use bytes::Bytes; use futures::AsyncRead; use futures::AsyncSeek; use crate::raw::*; use crate::*; -/// Convert given fd into [`oio::Reader`]. -pub fn from_fd(fd: R, start: u64, end: u64) -> FdReader -where - R: AsyncRead + AsyncSeek + Unpin + Send + Sync, -{ - FdReader { +/// Convert given file into [`oio::Reader`]. +pub fn into_read_from_file(fd: R, start: u64, end: u64) -> FromFileReader { + FromFileReader { inner: fd, start, end, @@ -42,7 +42,7 @@ where } /// FdReader is a wrapper of input fd to implement [`oio::Read`]. -pub struct FdReader { +pub struct FromFileReader { inner: R, start: u64, @@ -50,17 +50,14 @@ pub struct FdReader { offset: u64, } -impl FdReader -where - R: AsyncRead + AsyncSeek + Unpin + Send + Sync, -{ +impl FromFileReader { pub(crate) fn current_size(&self) -> i64 { debug_assert!(self.offset >= self.start, "offset must in range"); self.end as i64 - self.offset as i64 } } -impl oio::Read for FdReader +impl oio::Read for FromFileReader where R: AsyncRead + AsyncSeek + Unpin + Send + Sync, { @@ -115,7 +112,7 @@ where } } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { let _ = cx; Poll::Ready(Some(Err(Error::new( @@ -124,3 +121,63 @@ where )))) } } + +impl oio::BlockingRead for FromFileReader +where + R: Read + Seek + Send + Sync + 'static, +{ + fn read(&mut self, buf: &mut [u8]) -> Result { + if self.current_size() <= 0 { + return Ok(0); + } + + let max = cmp::min(buf.len() as u64, self.current_size() as u64) as usize; + // TODO: we can use pread instead. + let n = self.inner.read(&mut buf[..max]).map_err(|err| { + Error::new(ErrorKind::Unexpected, "read data from FdReader") + .with_context("source", "FdReader") + .set_source(err) + })?; + self.offset += n as u64; + Ok(n) + } + + /// TODO: maybe we don't need to do seek really, just call pread instead. + /// + /// We need to wait for tokio's pread support. + fn seek(&mut self, pos: SeekFrom) -> Result { + let (base, offset) = match pos { + SeekFrom::Start(n) => (self.start as i64, n as i64), + SeekFrom::End(n) => (self.end as i64, n), + SeekFrom::Current(n) => (self.offset as i64, n), + }; + + match base.checked_add(offset) { + Some(n) if n < 0 => Err(Error::new( + ErrorKind::InvalidInput, + "invalid seek to a negative or overflowing position", + )), + Some(n) => { + let cur = self.inner.seek(SeekFrom::Start(n as u64)).map_err(|err| { + Error::new(ErrorKind::Unexpected, "seek data from FdReader") + .with_context("source", "FdReader") + .set_source(err) + })?; + + self.offset = cur; + Ok(self.offset - self.start) + } + None => Err(Error::new( + ErrorKind::InvalidInput, + "invalid seek to a negative or overflowing position", + )), + } + } + + fn next(&mut self) -> Option> { + Some(Err(Error::new( + ErrorKind::Unsupported, + "output reader doesn't support iterating", + ))) + } +} diff --git a/core/src/raw/oio/into_reader/by_range.rs b/core/src/raw/oio/read/into_seekable_read_by_range.rs similarity index 95% rename from core/src/raw/oio/into_reader/by_range.rs rename to core/src/raw/oio/read/into_seekable_read_by_range.rs index cb761523bda8..a020b883c3cd 100644 --- a/core/src/raw/oio/into_reader/by_range.rs +++ b/core/src/raw/oio/read/into_seekable_read_by_range.rs @@ -32,18 +32,26 @@ use crate::*; /// Convert given reader into [`oio::Reader`] by range. /// +/// # Input +/// +/// The input is an Accessor will may return a non-seekable reader. +/// +/// # Output +/// +/// The output is a reader that can be seek by range. +/// /// # Notes /// /// This operation is not zero cost. If the accessor already returns a /// seekable reader, please don't use this. -pub fn by_range( +pub fn into_seekable_read_by_range( acc: Arc, path: &str, reader: A::Reader, offset: u64, size: u64, -) -> RangeReader { - RangeReader { +) -> ByRangeSeekableReader { + ByRangeSeekableReader { acc, path: path.to_string(), offset, @@ -55,8 +63,8 @@ pub fn by_range( } } -/// RangeReader that can do seek on non-seekable reader. -pub struct RangeReader { +/// ByRangeReader that can do seek on non-seekable reader. +pub struct ByRangeSeekableReader { acc: Arc, path: String, @@ -84,7 +92,7 @@ enum State { /// Safety: State will only be accessed under &mut. unsafe impl Sync for State {} -impl RangeReader { +impl ByRangeSeekableReader { fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, A::Reader)>> { let acc = self.acc.clone(); let path = self.path.clone(); @@ -123,7 +131,7 @@ impl RangeReader { } } -impl oio::Read for RangeReader { +impl oio::Read for ByRangeSeekableReader { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { match &mut self.state { State::Idle => { @@ -387,7 +395,8 @@ mod tests { let r = MockReader { inner: futures::io::Cursor::new(bs.to_vec()), }; - let mut r = Box::new(by_range(acc, "x", r, 0, bs.len() as u64)) as oio::Reader; + let mut r = + Box::new(into_seekable_read_by_range(acc, "x", r, 0, bs.len() as u64)) as oio::Reader; let mut buf = Vec::new(); r.read_to_end(&mut buf).await?; @@ -421,7 +430,7 @@ mod tests { let r = MockReader { inner: futures::io::Cursor::new(bs[4096..4096 + 4096].to_vec()), }; - let mut r = Box::new(by_range(acc, "x", r, 4096, 4096)) as oio::Reader; + let mut r = Box::new(into_seekable_read_by_range(acc, "x", r, 4096, 4096)) as oio::Reader; let mut buf = Vec::new(); r.read_to_end(&mut buf).await?; diff --git a/core/src/raw/oio/into_streamable.rs b/core/src/raw/oio/read/into_streamable_read.rs similarity index 88% rename from core/src/raw/oio/into_streamable.rs rename to core/src/raw/oio/read/into_streamable_read.rs index 50af7d975e59..c3f90e24d585 100644 --- a/core/src/raw/oio/into_streamable.rs +++ b/core/src/raw/oio/read/into_streamable_read.rs @@ -26,9 +26,9 @@ use tokio::io::ReadBuf; use crate::raw::*; use crate::*; -/// as_streamable is used to make [`oio::Read`] or [`oio::BlockingRead`] streamable. -pub fn into_streamable_reader(r: R, capacity: usize) -> IntoStreamableReader { - IntoStreamableReader { +/// into_streamable is used to make [`oio::Read`] or [`oio::BlockingRead`] streamable. +pub fn into_streamable_read(r: R, capacity: usize) -> StreamableReader { + StreamableReader { r, cap: capacity, buf: Vec::with_capacity(capacity), @@ -36,13 +36,13 @@ pub fn into_streamable_reader(r: R, capacity: usize) -> IntoStreamableReader< } /// Make given read streamable. -pub struct IntoStreamableReader { +pub struct StreamableReader { r: R, cap: usize, buf: Vec, } -impl oio::Read for IntoStreamableReader { +impl oio::Read for StreamableReader { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { self.r.poll_read(cx, buf) } @@ -67,7 +67,7 @@ impl oio::Read for IntoStreamableReader { } } -impl oio::BlockingRead for IntoStreamableReader { +impl oio::BlockingRead for StreamableReader { fn read(&mut self, buf: &mut [u8]) -> Result { self.r.read(buf) } @@ -113,7 +113,7 @@ mod tests { let cap = rng.gen_range(1..1024 * 1024); let r = oio::Cursor::from(content.clone()); - let mut s = into_streamable_reader(Box::new(r) as oio::Reader, cap); + let mut s = into_streamable_read(Box::new(r) as oio::Reader, cap); let mut bs = BytesMut::new(); while let Some(b) = s.next().await { @@ -136,7 +136,7 @@ mod tests { let cap = rng.gen_range(1..1024 * 1024); let r = oio::Cursor::from(content.clone()); - let mut s = into_streamable_reader(Box::new(r) as oio::BlockingReader, cap); + let mut s = into_streamable_read(Box::new(r) as oio::BlockingReader, cap); let mut bs = BytesMut::new(); while let Some(b) = s.next() { diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs new file mode 100644 index 000000000000..64466bbdd60c --- /dev/null +++ b/core/src/raw/oio/read/mod.rs @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod api; +pub use api::BlockingRead; +pub use api::BlockingReader; +pub use api::Read; +pub use api::ReadExt; +pub use api::ReadOperation; +pub use api::Reader; + +mod into_streamable_read; +pub use into_streamable_read::into_streamable_read; +pub use into_streamable_read::StreamableReader; + +mod into_seekable_read_by_range; +pub use into_seekable_read_by_range::into_seekable_read_by_range; +pub use into_seekable_read_by_range::ByRangeSeekableReader; + +mod into_read_from_file; +pub use into_read_from_file::into_read_from_file; +pub use into_read_from_file::FromFileReader; diff --git a/core/src/raw/oio/stream.rs b/core/src/raw/oio/stream/api.rs similarity index 100% rename from core/src/raw/oio/stream.rs rename to core/src/raw/oio/stream/api.rs diff --git a/core/src/raw/oio/into_stream/from_futures_stream.rs b/core/src/raw/oio/stream/into_stream.rs similarity index 87% rename from core/src/raw/oio/into_stream/from_futures_stream.rs rename to core/src/raw/oio/stream/into_stream.rs index c04379f71f72..2b21974378e3 100644 --- a/core/src/raw/oio/into_stream/from_futures_stream.rs +++ b/core/src/raw/oio/stream/into_stream.rs @@ -25,18 +25,18 @@ use crate::raw::*; use crate::*; /// Convert given futures stream into [`oio::Stream`]. -pub fn from_futures_stream(stream: S) -> FromFuturesStream +pub fn into_stream(stream: S) -> IntoStream where S: futures::Stream> + Send + Sync + Unpin, { - FromFuturesStream { inner: stream } + IntoStream { inner: stream } } -pub struct FromFuturesStream { +pub struct IntoStream { inner: S, } -impl oio::Stream for FromFuturesStream +impl oio::Stream for IntoStream where S: futures::Stream> + Send + Sync + Unpin, { diff --git a/core/src/raw/oio/into_stream/from_futures_reader.rs b/core/src/raw/oio/stream/into_stream_from_reader.rs similarity index 94% rename from core/src/raw/oio/into_stream/from_futures_reader.rs rename to core/src/raw/oio/stream/into_stream_from_reader.rs index 0cdb8c34951d..cead7d11d72d 100644 --- a/core/src/raw/oio/into_stream/from_futures_reader.rs +++ b/core/src/raw/oio/stream/into_stream_from_reader.rs @@ -33,22 +33,22 @@ use crate::*; const DEFAULT_CAPACITY: usize = 64 * 1024; /// Convert given futures reader into [`oio::Stream`]. -pub fn from_futures_reader(r: R) -> FromFuturesReader +pub fn into_stream_from_reader(r: R) -> FromReaderStream where R: AsyncRead + Send + Sync + Unpin, { - FromFuturesReader { + FromReaderStream { inner: Some(r), buf: BytesMut::new(), } } -pub struct FromFuturesReader { +pub struct FromReaderStream { inner: Option, buf: BytesMut, } -impl oio::Stream for FromFuturesReader +impl oio::Stream for FromReaderStream where S: AsyncRead + Send + Sync + Unpin, { diff --git a/core/src/raw/oio/into_blocking_reader/mod.rs b/core/src/raw/oio/stream/mod.rs similarity index 79% rename from core/src/raw/oio/into_blocking_reader/mod.rs rename to core/src/raw/oio/stream/mod.rs index 6b476a2133ec..64798480f42e 100644 --- a/core/src/raw/oio/into_blocking_reader/mod.rs +++ b/core/src/raw/oio/stream/mod.rs @@ -15,9 +15,12 @@ // specific language governing permissions and limitations // under the License. -//! into_blocking_reader will provide different implementations to convert -//! into [`oio::BlockingRead`][crate::raw::oio::BlockingRead] +mod api; +pub use api::Stream; +pub use api::Streamer; -mod from_fd; -pub use from_fd::from_fd; -pub use from_fd::FdReader; +mod into_stream_from_reader; +pub use into_stream_from_reader::into_stream_from_reader; + +mod into_stream; +pub use into_stream::into_stream; diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write/api.rs similarity index 100% rename from core/src/raw/oio/write.rs rename to core/src/raw/oio/write/api.rs diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs new file mode 100644 index 000000000000..eb809a871de7 --- /dev/null +++ b/core/src/raw/oio/write/mod.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod api; +pub use api::BlockingWrite; +pub use api::BlockingWriter; +pub use api::Write; +pub use api::WriteOperation; +pub use api::Writer; diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 505fda60d28a..53da83e50dad 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -244,8 +244,8 @@ impl FsBackend { #[async_trait] impl Accessor for FsBackend { - type Reader = oio::into_reader::FdReader>; - type BlockingReader = oio::into_blocking_reader::FdReader; + type Reader = oio::FromFileReader>; + type BlockingReader = oio::FromFileReader; type Writer = FsWriter; type BlockingWriter = FsWriter; type Appender = FsAppender; @@ -358,7 +358,7 @@ impl Accessor for FsBackend { (None, None) => (0, total_length), }; - let mut r = oio::into_reader::from_fd(f, start, end); + let mut r = oio::into_read_from_file(f, start, end); // Rewind to make sure we are on the correct offset. r.seek(SeekFrom::Start(0)).await?; @@ -558,7 +558,7 @@ impl Accessor for FsBackend { (None, None) => (0, total_length), }; - let mut r = oio::into_blocking_reader::from_fd(f, start, end); + let mut r: oio::FromFileReader = oio::into_read_from_file(f, start, end); // Rewind to make sure we are on the correct offset. r.seek(SeekFrom::Start(0))?; diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index 27e9138cccc7..0a6a255fbbf3 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -161,8 +161,8 @@ unsafe impl Sync for HdfsBackend {} #[async_trait] impl Accessor for HdfsBackend { - type Reader = oio::into_reader::FdReader; - type BlockingReader = oio::into_blocking_reader::FdReader; + type Reader = oio::FromFileReader; + type BlockingReader = oio::FromFileReader; type Writer = HdfsWriter; type BlockingWriter = HdfsWriter; type Appender = HdfsAppender; @@ -231,7 +231,7 @@ impl Accessor for HdfsBackend { (None, None) => (0, meta.len()), }; - let mut r = oio::into_reader::from_fd(f, start, end); + let mut r = oio::into_read_from_file(f, start, end); // Rewind to make sure we are on the correct offset. r.seek(SeekFrom::Start(0)).await?; @@ -397,7 +397,7 @@ impl Accessor for HdfsBackend { (None, None) => (0, meta.len()), }; - let mut r = oio::into_blocking_reader::from_fd(f, start, end); + let mut r = oio::into_read_from_file(f, start, end); // Rewind to make sure we are on the correct offset. r.seek(SeekFrom::Start(0))?; diff --git a/core/src/services/sftp/utils.rs b/core/src/services/sftp/utils.rs index 303aee70946e..fc8bb8a6f64c 100644 --- a/core/src/services/sftp/utils.rs +++ b/core/src/services/sftp/utils.rs @@ -29,7 +29,7 @@ use openssh_sftp_client::file::TokioCompatFile; use openssh_sftp_client::metadata::MetaData as SftpMeta; use crate::raw::oio; -use crate::raw::oio::into_reader::FdReader; +use crate::raw::oio::FromFileReader; use crate::raw::oio::ReadExt; use crate::EntryMode; use crate::Metadata; @@ -38,7 +38,7 @@ use crate::Result; pub struct SftpReaderInner { file: Pin>>, } -pub type SftpReader = FdReader; +pub type SftpReader = FromFileReader; impl SftpReaderInner { pub async fn new(file: File) -> Self { @@ -53,7 +53,7 @@ impl SftpReader { /// Create a new reader from a file, starting at the given offset and ending at the given offset. pub async fn new(file: File, start: u64, end: u64) -> Result { let file = SftpReaderInner::new(file).await; - let mut r = oio::into_reader::from_fd(file, start, end); + let mut r = oio::into_read_from_file(file, start, end); r.seek(SeekFrom::Start(0)).await?; Ok(r) } diff --git a/core/src/services/webhdfs/error.rs b/core/src/services/webhdfs/error.rs index 7964ea38399f..449725e632e3 100644 --- a/core/src/services/webhdfs/error.rs +++ b/core/src/services/webhdfs/error.rs @@ -83,7 +83,6 @@ mod tests { use serde_json::from_reader; use super::*; - use crate::raw::oio::into_stream; /// Error response example from https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Error%20Responses #[tokio::test] @@ -101,9 +100,7 @@ mod tests { "#, ); let body = IncomingAsyncBody::new( - Box::new(into_stream::from_futures_stream(stream::iter(vec![Ok( - ill_args.clone(), - )]))), + Box::new(oio::into_stream(stream::iter(vec![Ok(ill_args.clone())]))), None, ); let resp = Response::builder() diff --git a/core/src/types/reader.rs b/core/src/types/reader.rs index 0f3bd2eaaf29..4b328418250b 100644 --- a/core/src/types/reader.rs +++ b/core/src/types/reader.rs @@ -208,7 +208,7 @@ impl BlockingReader { r } else { // Make this capacity configurable. - Box::new(oio::into_streamable_reader(r, 256 * 1024)) + Box::new(oio::into_streamable_read(r, 256 * 1024)) }; Ok(BlockingReader { inner: r }) diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index c63288e91656..37a9fe72ca98 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -129,9 +129,7 @@ impl Writer { T: Into, { if let State::Idle(Some(w)) = &mut self.state { - let s = Box::new(oio::into_stream::from_futures_stream( - sink_from.map_ok(|v| v.into()), - )); + let s = Box::new(oio::into_stream(sink_from.map_ok(|v| v.into()))); w.sink(size, s).await } else { unreachable!( @@ -176,7 +174,7 @@ impl Writer { R: futures::AsyncRead + Send + Sync + Unpin + 'static, { if let State::Idle(Some(w)) = &mut self.state { - let s = Box::new(oio::into_stream::from_futures_reader(read_from)); + let s = Box::new(oio::into_stream_from_reader(read_from)); w.sink(size, s).await } else { unreachable!(