Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 62 additions & 56 deletions rust/lance-io/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::os::windows::fs::FileExt;
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use deepsize::DeepSizeOf;
use futures::future::BoxFuture;
use lance_core::{Error, Result};
use object_store::path::Path;
use snafu::location;
Expand Down Expand Up @@ -153,7 +154,6 @@ impl LocalObjectReader {
}
}

#[async_trait]
impl Reader for LocalObjectReader {
fn path(&self) -> &Path {
&self.path
Expand All @@ -168,80 +168,86 @@ impl Reader for LocalObjectReader {
}

/// Returns the file size.
async fn size(&self) -> object_store::Result<usize> {
let file = self.file.clone();
self.size
.get_or_try_init(|| async move {
let metadata = tokio::task::spawn_blocking(move || {
file.metadata().map_err(|err| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
Box::pin(async move {
let file = self.file.clone();
self.size
.get_or_try_init(|| async move {
let metadata = tokio::task::spawn_blocking(move || {
file.metadata().map_err(|err| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})
})
.await??;
Ok(metadata.len() as usize)
})
.await??;
Ok(metadata.len() as usize)
})
.await
.cloned()
.await
.cloned()
})
}

/// Reads a range of data.
#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, object_store::Result<Bytes>> {
let file = self.file.clone();
let io_tracker = self.io_tracker.clone();
let path = self.path.clone();
let num_bytes = range.len() as u64;
let range_u64 = (range.start as u64)..(range.end as u64);

let result = tokio::task::spawn_blocking(move || {
let mut buf = BytesMut::with_capacity(range.len());
// Safety: `buf` is set with appropriate capacity above. It is
// written to below and we check all data is initialized at that point.
unsafe { buf.set_len(range.len()) };
#[cfg(unix)]
file.read_exact_at(buf.as_mut(), range.start as u64)?;
#[cfg(windows)]
read_exact_at(file, buf.as_mut(), range.start as u64)?;

Ok(buf.freeze())
})
.await?
.map_err(|err: std::io::Error| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
});

if result.is_ok() {
io_tracker.record_read("get_range", path, num_bytes, Some(range_u64));
}
Box::pin(async move {
let result = tokio::task::spawn_blocking(move || {
let mut buf = BytesMut::with_capacity(range.len());
// Safety: `buf` is set with appropriate capacity above. It is
// written to below and we check all data is initialized at that point.
unsafe { buf.set_len(range.len()) };
#[cfg(unix)]
file.read_exact_at(buf.as_mut(), range.start as u64)?;
#[cfg(windows)]
read_exact_at(file, buf.as_mut(), range.start as u64)?;

Ok(buf.freeze())
})
.await?
.map_err(|err: std::io::Error| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
});

if result.is_ok() {
io_tracker.record_read("get_range", path, num_bytes, Some(range_u64));
}

result
result
})
}

/// Reads the entire file.
#[instrument(level = "debug", skip(self))]
async fn get_all(&self) -> object_store::Result<Bytes> {
let mut file = self.file.clone();
let io_tracker = self.io_tracker.clone();
let path = self.path.clone();
fn get_all(&self) -> BoxFuture<'_, object_store::Result<Bytes>> {
Box::pin(async move {
let mut file = self.file.clone();
let io_tracker = self.io_tracker.clone();
let path = self.path.clone();

let result = tokio::task::spawn_blocking(move || {
let mut buf = Vec::new();
file.read_to_end(buf.as_mut())?;
Ok(Bytes::from(buf))
})
.await?
.map_err(|err: std::io::Error| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
});

if let Ok(bytes) = &result {
io_tracker.record_read("get_all", path, bytes.len() as u64, None);
}

let result = tokio::task::spawn_blocking(move || {
let mut buf = Vec::new();
file.read_to_end(buf.as_mut())?;
Ok(Bytes::from(buf))
result
})
.await?
.map_err(|err: std::io::Error| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
});

if let Ok(bytes) = &result {
io_tracker.record_read("get_all", path, bytes.len() as u64, None);
}

result
}
}

Expand Down
Loading