Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ use std::task::Poll;
use async_trait::async_trait;
use bytes::Bytes;

use crate::raw::oio::into_reader::RangeReader;
use crate::raw::oio::to_flat_pager;
use crate::raw::oio::to_hierarchy_pager;
use crate::raw::oio::into_flat_page;
use crate::raw::oio::into_hierarchy_page;
use crate::raw::oio::ByRangeSeekableReader;
use crate::raw::oio::Entry;
use crate::raw::oio::IntoStreamableReader;
use crate::raw::oio::ToFlatPager;
use crate::raw::oio::ToHierarchyPager;
use crate::raw::oio::FlatPager;
use crate::raw::oio::HierarchyPager;
use crate::raw::oio::StreamableReader;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -172,7 +172,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
match (seekable, streamable) {
(true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))),
(true, false) => {
let r = oio::into_streamable_reader(r, 256 * 1024);
let r = oio::into_streamable_read(r, 256 * 1024);
Ok((rp, CompleteReader::NeedStreamable(r)))
}
_ => {
Expand All @@ -193,12 +193,12 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
(offset, size)
}
};
let r = oio::into_reader::by_range(self.inner.clone(), path, r, offset, size);
let r = oio::into_seekable_read_by_range(self.inner.clone(), path, r, offset, size);

if streamable {
Ok((rp, CompleteReader::NeedSeekable(r)))
} else {
let r = oio::into_streamable_reader(r, 256 * 1024);
let r = oio::into_streamable_read(r, 256 * 1024);
Ok((rp, CompleteReader::NeedBoth(r)))
}
}
Expand All @@ -223,7 +223,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
match (seekable, streamable) {
(true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))),
(true, false) => {
let r = oio::into_streamable_reader(r, 256 * 1024);
let r = oio::into_streamable_read(r, 256 * 1024);
Ok((rp, CompleteReader::NeedStreamable(r)))
}
(false, _) => Err(Error::new(
Expand Down Expand Up @@ -254,7 +254,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
let (rp, p) = self.inner.list(path, args).await?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else {
let p = to_flat_pager(
let p = into_flat_page(
self.inner.clone(),
path,
args.with_delimiter("/").limit().unwrap_or(1000),
Expand All @@ -269,7 +269,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
Ok((rp, CompletePager::AlreadyComplete(p)))
} else {
let (_, p) = self.inner.list(path, args.with_delimiter("")).await?;
let p = to_hierarchy_pager(p, path);
let p = into_hierarchy_page(p, path);
Ok((RpList::default(), CompletePager::NeedHierarchy(p)))
};
}
Expand Down Expand Up @@ -303,7 +303,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
let (rp, p) = self.inner.blocking_list(path, args)?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else {
let p = to_flat_pager(
let p = into_flat_page(
self.inner.clone(),
path,
args.with_delimiter("/").limit().unwrap_or(1000),
Expand All @@ -318,8 +318,8 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
Ok((rp, CompletePager::AlreadyComplete(p)))
} else {
let (_, p) = self.inner.blocking_list(path, args.with_delimiter(""))?;
let p: ToHierarchyPager<<A as Accessor>::BlockingPager> =
to_hierarchy_pager(p, path);
let p: HierarchyPager<<A as Accessor>::BlockingPager> =
into_hierarchy_page(p, path);
Ok((RpList::default(), CompletePager::NeedHierarchy(p)))
};
}
Expand Down Expand Up @@ -532,9 +532,9 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {

pub enum CompleteReader<A: Accessor, R> {
AlreadyComplete(R),
NeedSeekable(RangeReader<A>),
NeedStreamable(IntoStreamableReader<R>),
NeedBoth(IntoStreamableReader<RangeReader<A>>),
NeedSeekable(ByRangeSeekableReader<A>),
NeedStreamable(StreamableReader<R>),
NeedBoth(StreamableReader<ByRangeSeekableReader<A>>),
}

impl<A, R> oio::Read for CompleteReader<A, R>
Expand Down Expand Up @@ -614,8 +614,8 @@ where

pub enum CompletePager<A: Accessor, P> {
AlreadyComplete(P),
NeedFlat(ToFlatPager<Arc<A>, P>),
NeedHierarchy(ToHierarchyPager<P>),
NeedFlat(FlatPager<Arc<A>, P>),
NeedHierarchy(HierarchyPager<P>),
}

#[async_trait]
Expand Down
7 changes: 2 additions & 5 deletions core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use http::Response;
use super::body::IncomingAsyncBody;
use super::parse_content_length;
use super::AsyncBody;
use crate::raw::oio::into_stream;
use crate::raw::*;
use crate::Error;
use crate::ErrorKind;
use crate::Result;
Expand Down Expand Up @@ -150,10 +150,7 @@ impl HttpClient {
.set_source(err)
});

let body = IncomingAsyncBody::new(
Box::new(into_stream::from_futures_stream(stream)),
content_length,
);
let body = IncomingAsyncBody::new(Box::new(oio::into_stream(stream)), content_length);

let resp = hr.body(body).expect("response must build succeed");

Expand Down
4 changes: 2 additions & 2 deletions core/src/raw/http_util/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use http::Version;
use super::new_request_build_error;
use super::AsyncBody;
use super::IncomingAsyncBody;
use crate::raw::oio::into_stream;
use crate::raw::oio;
use crate::*;

/// Multipart is a builder for multipart/form-data.
Expand Down Expand Up @@ -320,7 +320,7 @@ impl MixedPart {
let bs: Bytes = self.content;
let length = bs.len();
let body = IncomingAsyncBody::new(
Box::new(into_stream::from_futures_stream(stream::iter(vec![Ok(bs)]))),
Box::new(oio::into_stream(stream::iter(vec![Ok(bs)]))),
Some(length as u64),
);

Expand Down
21 changes: 5 additions & 16 deletions core/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,16 @@
//! them whenever possible.

mod accessor;
pub use accessor::Accessor;
pub use accessor::AccessorInfo;
pub use accessor::FusedAccessor;
pub use accessor::*;

mod layer;
pub use layer::Layer;
pub use layer::LayeredAccessor;

pub mod oio;
pub use layer::*;

mod path;
pub use path::build_abs_path;
pub use path::build_rel_path;
pub use path::build_rooted_abs_path;
pub use path::get_basename;
pub use path::get_parent;
pub use path::normalize_path;
pub use path::normalize_root;
pub use path::validate_path;
pub use path::*;

mod operation;
pub use operation::Operation;
pub use operation::*;

mod version;
pub use version::VERSION;
Expand All @@ -70,3 +58,4 @@ pub use chrono_util::*;

// Expose as a pub mod to avoid confusing.
pub mod adapters;
pub mod oio;
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,5 @@
// specific language governing permissions and limitations
// under the License.

//! into_stream will provide different implementations to convert into
//! [`oio::Stream`][crate::raw::oio::Stream]

mod from_futures_stream;
pub use from_futures_stream::from_futures_stream;

mod from_futures_reader;
pub use from_futures_reader::from_futures_reader;
mod api;
pub use api::*;
119 changes: 0 additions & 119 deletions core/src/raw/oio/into_blocking_reader/from_fd.rs

This file was deleted.

46 changes: 0 additions & 46 deletions core/src/raw/oio/into_reader/mod.rs

This file was deleted.

Loading