Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/parquet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#![feature(bufreader_seek_relative)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's our intention to move to stable Rust at some point; we can revisit these feature flags in the future.

#![feature(seek_convenience)]
#![feature(specialization)]
#![allow(dead_code)]
#![allow(non_camel_case_types)]
Expand Down
13 changes: 11 additions & 2 deletions rust/parquet/src/util/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ pub struct FileSource<R: ParquetReader> {
impl<R: ParquetReader> FileSource<R> {
/// Creates new file reader with start and length from a file handle
pub fn new(fd: &R, start: u64, length: usize) -> Self {
let reader = BufReader::new(fd.try_clone().unwrap());

Self {
reader: Mutex::new(BufReader::new(fd.try_clone().unwrap())),
reader: Mutex::new(reader),
start,
end: start + length as u64,
}
Expand All @@ -65,7 +67,14 @@ impl<R: ParquetReader> Read for FileSource<R> {
let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize);
let buf = &mut buf[0..bytes_to_read];

reader.seek(SeekFrom::Start(self.start as u64))?;
let pos = reader.stream_position()?;
let seek_offset = self.start as i64 - pos as i64;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that casting a u64 to an i64 may overflow, can we change the code to:

if self.start > pos {
 ...
} else {
 ...
}

if seek_offset != 0 {
// BufReader::seek will discard its internal buffer on every seek.
// Using seek_relative will retain the buffer if the seek position
// lands within the buffer bounds.
reader.seek_relative(seek_offset)?;
}
let res = reader.read(buf);
if let Ok(bytes_read) = res {
self.start += bytes_read as u64;
Expand Down