From 7afe36d9694dd15281ac239011ab5e9686a6eabd Mon Sep 17 00:00:00 2001 From: Max Burke Date: Sat, 25 Jan 2020 12:03:36 -0800 Subject: [PATCH 1/2] [rust] Explicitly seeking kills BufRead performance. BufRead will discard its internal buffer when seek() is called. This kills much of the performance of a buffered reader, especially when many tiny (1-byte) reads are performed. --- rust/parquet/src/util/io.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rust/parquet/src/util/io.rs b/rust/parquet/src/util/io.rs index b36a710722f..87635c57ac6 100644 --- a/rust/parquet/src/util/io.rs +++ b/rust/parquet/src/util/io.rs @@ -47,8 +47,11 @@ pub struct FileSource { impl FileSource { /// Creates new file reader with start and length from a file handle pub fn new(fd: &R, start: u64, length: usize) -> Self { + let mut reader = BufReader::new(fd.try_clone().unwrap()); + reader.seek(SeekFrom::Start(start as u64)).unwrap(); + Self { - reader: Mutex::new(BufReader::new(fd.try_clone().unwrap())), + reader: Mutex::new(reader), start, end: start + length as u64, } @@ -65,7 +68,6 @@ impl Read for FileSource { let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize); let buf = &mut buf[0..bytes_to_read]; - reader.seek(SeekFrom::Start(self.start as u64))?; let res = reader.read(buf); if let Ok(bytes_read) = res { self.start += bytes_read as u64; From 7b5b905fa6946d85e3f49513687d8823535f4bad Mon Sep 17 00:00:00 2001 From: Max Burke Date: Sun, 26 Jan 2020 10:10:20 -0800 Subject: [PATCH 2/2] FileSource::read does need to do some seeking as the file position may change if the underlying file descriptor is manipulated. Use relative seeking to prevent BufReader from discarding the internal buffer. --- rust/parquet/src/lib.rs | 2 ++ rust/parquet/src/util/io.rs | 11 +++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/rust/parquet/src/lib.rs b/rust/parquet/src/lib.rs index 8d23e89c3d0..07c72adeccb 100644 --- a/rust/parquet/src/lib.rs +++ b/rust/parquet/src/lib.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#![feature(bufreader_seek_relative)] +#![feature(seek_convenience)] #![feature(specialization)] #![allow(dead_code)] #![allow(non_camel_case_types)] diff --git a/rust/parquet/src/util/io.rs b/rust/parquet/src/util/io.rs index 87635c57ac6..3da95e411d3 100644 --- a/rust/parquet/src/util/io.rs +++ b/rust/parquet/src/util/io.rs @@ -47,8 +47,7 @@ pub struct FileSource { impl FileSource { /// Creates new file reader with start and length from a file handle pub fn new(fd: &R, start: u64, length: usize) -> Self { - let mut reader = BufReader::new(fd.try_clone().unwrap()); - reader.seek(SeekFrom::Start(start as u64)).unwrap(); + let reader = BufReader::new(fd.try_clone().unwrap()); Self { reader: Mutex::new(reader), @@ -68,6 +67,14 @@ impl Read for FileSource { let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize); let buf = &mut buf[0..bytes_to_read]; + let pos = reader.stream_position()?; + let seek_offset = self.start as i64 - pos as i64; + if seek_offset != 0 { + // BufReader::seek will discard its internal buffer on every seek. + // Using seek_relative will retain the buffer if the seek position + // lands within the buffer bounds. + reader.seek_relative(seek_offset)?; + } let res = reader.read(buf); if let Ok(bytes_read) = res { self.start += bytes_read as u64;