From dde688c214440b66a7f23cb68cc429579733e85f Mon Sep 17 00:00:00 2001 From: rdettai Date: Fri, 24 Apr 2020 10:36:14 +0200 Subject: [PATCH 1/5] [fix] use custom BufReader to limit seeks to the strict necessary --- rust/parquet/src/util/io.rs | 72 +++++++++++++++++++++++++++---------- 1 file changed, 54 insertions(+), 18 deletions(-) diff --git a/rust/parquet/src/util/io.rs b/rust/parquet/src/util/io.rs index b36a710722f..29a1fc5a13f 100644 --- a/rust/parquet/src/util/io.rs +++ b/rust/parquet/src/util/io.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. -use std::{cmp, io::*, sync::Mutex}; +use std::{cell::RefCell, cmp, io::*}; use crate::file::{reader::ParquetReader, writer::ParquetWriter}; +const DEFAULT_BUF_SIZE: usize = 8 * 1024; + // ---------------------------------------------------------------------- // Read/Write wrappers for `File`. @@ -31,47 +33,81 @@ pub trait Position { } /// Struct that represents a slice of a file data with independent start position and -/// length. Internally clones provided file handle, wraps with BufReader and resets -/// position before any read. +/// length. Internally clones provided file handle, wraps with a custom implementation +/// of BufReader that resets position before any read. /// /// This is workaround and alternative for `file.try_clone()` method. It clones `File` /// while preserving independent position, which is not available with `try_clone()`. /// -/// Designed after `arrow::io::RandomAccessFile`.
+/// Designed after `arrow::io::RandomAccessFile` and `std::io::BufReader` pub struct FileSource { - reader: Mutex>, - start: u64, // start position in a file - end: u64, // end position in a file + reader: RefCell, + start: u64, // start position in a file + end: u64, // end position in a file + buf: Vec, // the internal buffer `buf` in BufReader + buf_pos: usize, // equivalent to the `pos` param in BufReader + buf_cap: usize, // equivalent to the `cap` param in BufReader } impl FileSource { /// Creates new file reader with start and length from a file handle pub fn new(fd: &R, start: u64, length: usize) -> Self { + let reader = RefCell::new(fd.try_clone().unwrap()); Self { - reader: Mutex::new(BufReader::new(fd.try_clone().unwrap())), + reader, start, end: start + length as u64, + buf: vec![0 as u8; DEFAULT_BUF_SIZE], + buf_pos: 0, + buf_cap: 0, + } + } + + // inspired from BufReader + fn fill_buf(&mut self) -> Result<&[u8]> { + if self.buf_pos >= self.buf_cap { + // If we've reached the end of our internal buffer then we need to fetch + // some more data from the underlying reader. + // Branch using `>=` instead of the more correct `==` + // to tell the compiler that the pos..cap slice is always valid. 
+ debug_assert!(self.buf_pos == self.buf_cap); + let mut reader = self.reader.borrow_mut(); + reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading + self.buf_cap = reader.read(&mut self.buf)?; + self.buf_pos = 0; } + Ok(&self.buf[self.buf_pos..self.buf_cap]) } } impl Read for FileSource { fn read(&mut self, buf: &mut [u8]) -> Result { - let mut reader = self - .reader - .lock() - .map_err(|err| Error::new(ErrorKind::Other, err.to_string()))?; - let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize); let buf = &mut buf[0..bytes_to_read]; - reader.seek(SeekFrom::Start(self.start as u64))?; - let res = reader.read(buf); - if let Ok(bytes_read) = res { - self.start += bytes_read as u64; + // If we don't have any buffered data and we're doing a massive read + // (larger than our internal buffer), bypass our internal buffer + // entirely. + if self.buf_pos == self.buf_cap && buf.len() >= self.buf.len() { + // discard buffer + self.buf_pos = 0; + self.buf_cap = 0; + // read directly into param buffer + let mut reader = self.reader.borrow_mut(); + reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading + return reader.read(&mut self.buf); } + let nread = { + let mut rem = self.fill_buf()?; + // copy the data from the inner buffer to the param buffer + rem.read(buf)? 
+ }; + // consume from buffer + self.buf_pos = cmp::min(self.buf_pos + nread, self.buf_cap); + + self.start += nread as u64; - res + Ok(nread) } } From d912cea7ac63fce232f4ba8cd9bb5aa924e565c2 Mon Sep 17 00:00:00 2001 From: rdettai Date: Fri, 24 Apr 2020 10:36:14 +0200 Subject: [PATCH 2/5] [fix] case where large read is asked at once This should have been caught by tests, need test on large column --- rust/parquet/src/util/io.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/parquet/src/util/io.rs b/rust/parquet/src/util/io.rs index 29a1fc5a13f..55c4a5c5e5f 100644 --- a/rust/parquet/src/util/io.rs +++ b/rust/parquet/src/util/io.rs @@ -95,7 +95,9 @@ impl Read for FileSource { // read directly into param buffer let mut reader = self.reader.borrow_mut(); reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading - return reader.read(&mut self.buf); + let nread = reader.read(buf)?; + self.start += nread as u64; + return Ok(nread); } let nread = { let mut rem = self.fill_buf()?; From ee8c7077f006d3127b935b52efa62545b80c93a1 Mon Sep 17 00:00:00 2001 From: rdettai Date: Fri, 24 Apr 2020 10:36:14 +0200 Subject: [PATCH 3/5] [test] read large chunk --- rust/parquet/src/util/io.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/rust/parquet/src/util/io.rs b/rust/parquet/src/util/io.rs index 55c4a5c5e5f..c6e28747bd8 100644 --- a/rust/parquet/src/util/io.rs +++ b/rust/parquet/src/util/io.rs @@ -170,6 +170,8 @@ impl<'a> Position for Cursor<&'a mut Vec> { mod tests { use super::*; + use std::iter; + use crate::util::test_common::{get_temp_file, get_test_file}; #[test] @@ -256,4 +258,38 @@ mod tests { assert_eq!(res, vec![b'a', b'b', b'c', b'd', b'e', b'f', b'g']); } + + #[test] + fn test_io_large_read() { + // Generate repeated 'abcdef' pattern and write it into a file + let patterned_data: Vec = + iter::repeat(vec![b'a', b'b', b'c', b'd', b'e', b'f']) + .flatten() + .take(3 *
DEFAULT_BUF_SIZE) + .collect(); + let mut file = get_temp_file("file_sink_test", &patterned_data); + + // seek the underlying file to the first 'd' + file.seek(SeekFrom::Start(3)).unwrap(); + + // create the FileSource reader that starts at pos 1 ('b') + let mut chunk = FileSource::new(&file, 1, patterned_data.len() - 1); + + // read the 'b' at pos 1 + let mut res = vec![0u8; 1]; + chunk.read_exact(&mut res).unwrap(); + assert_eq!(res, &[b'b']); + + // the underlying file is seeked to 'e' + file.seek(SeekFrom::Start(4)).unwrap(); + + // now read large chunk that starts with 'c' (after 'b') + let mut res = vec![0u8; 2 * DEFAULT_BUF_SIZE]; + chunk.read_exact(&mut res).unwrap(); + assert_eq!( + res, + &patterned_data[2..2 + 2 * DEFAULT_BUF_SIZE], + "read buf should start with 'c' [99u8]" + ); + } } From f06c2147eb562db6ac4bc89fe75087800e2a05e4 Mon Sep 17 00:00:00 2001 From: rdettai Date: Fri, 24 Apr 2020 14:53:30 +0200 Subject: [PATCH 4/5] [fix] comments and tiny refacto --- rust/parquet/src/util/io.rs | 51 ++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/rust/parquet/src/util/io.rs b/rust/parquet/src/util/io.rs index c6e28747bd8..a7d7a49f4a8 100644 --- a/rust/parquet/src/util/io.rs +++ b/rust/parquet/src/util/io.rs @@ -44,9 +44,9 @@ pub struct FileSource { reader: RefCell, start: u64, // start position in a file end: u64, // end position in a file - buf: Vec, // the internal buffer `buf` in BufReader - buf_pos: usize, // equivalent to the `pos` param in BufReader - buf_cap: usize, // equivalent to the `cap` param in BufReader + buf: Vec, // buffer where bytes read in advance are stored + buf_pos: usize, // current position of the reader in the buffer + buf_cap: usize, // current number of bytes read into the buffer } impl FileSource { @@ -63,8 +63,7 @@ impl FileSource { } } - // inspired from BufReader - fn fill_buf(&mut self) -> Result<&[u8]> { + fn fill_inner_buf(&mut self) -> Result<&[u8]> { if self.buf_pos >= 
self.buf_cap { // If we've reached the end of our internal buffer then we need to fetch // some more data from the underlying reader. @@ -78,6 +77,18 @@ impl FileSource { } Ok(&self.buf[self.buf_pos..self.buf_cap]) } + + fn skip_inner_buf(&mut self, buf: &mut [u8]) -> Result { + // discard buffer + self.buf_pos = 0; + self.buf_cap = 0; + // read directly into param buffer + let mut reader = self.reader.borrow_mut(); + reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading + let nread = reader.read(buf)?; + self.start += nread as u64; + Ok(nread) + } } impl Read for FileSource { @@ -89,18 +100,10 @@ impl Read for FileSource { // (larger than our internal buffer), bypass our internal buffer // entirely. if self.buf_pos == self.buf_cap && buf.len() >= self.buf.len() { - // discard buffer - self.buf_pos = 0; - self.buf_cap = 0; - // read directly into param buffer - let mut reader = self.reader.borrow_mut(); - reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading - let nread = reader.read(buf)?; - self.start += nread as u64; - return Ok(nread); + return self.skip_inner_buf(buf); } let nread = { - let mut rem = self.fill_buf()?; + let mut rem = self.fill_inner_buf()?; // copy the data from the inner buffer to the param buffer rem.read(buf)? 
}; @@ -108,7 +111,7 @@ impl Read for FileSource { self.buf_pos = cmp::min(self.buf_pos + nread, self.buf_cap); self.start += nread as u64; - + println!("{}", nread); Ok(nread) } } @@ -262,12 +265,12 @@ mod tests { #[test] fn test_io_large_read() { // Generate repeated 'abcdef' pattern and write it into a file - let patterned_data: Vec = - iter::repeat(vec![b'a', b'b', b'c', b'd', b'e', b'f']) - .flatten() - .take(3 * DEFAULT_BUF_SIZE) - .collect(); - let mut file = get_temp_file("file_sink_test", &patterned_data); + let patterned_data: Vec = iter::repeat(vec![0, 1, 2, 3, 4, 5]) + .flatten() + .take(3 * DEFAULT_BUF_SIZE) + .collect(); + // always use different temp files as test might be run in parallel + let mut file = get_temp_file("large_file_sink_test", &patterned_data); // seek the underlying file to the first 'd' file.seek(SeekFrom::Start(3)).unwrap(); @@ -278,7 +281,7 @@ mod tests { // read the 'b' at pos 1 let mut res = vec![0u8; 1]; chunk.read_exact(&mut res).unwrap(); - assert_eq!(res, &[b'b']); + assert_eq!(res, &[1]); // the underlying file is seeked to 'e' file.seek(SeekFrom::Start(4)).unwrap(); @@ -289,7 +292,7 @@ mod tests { assert_eq!( res, &patterned_data[2..2 + 2 * DEFAULT_BUF_SIZE], - "read buf should start with 'c' [99u8]" + "read buf and original data are not equal" ); } } From 7bb4049ef536bdf9437126a5b56fbe8f13830283 Mon Sep 17 00:00:00 2001 From: rdettai Date: Fri, 24 Apr 2020 16:03:51 +0200 Subject: [PATCH 5/5] [fix] println forgotten --- rust/parquet/src/util/io.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rust/parquet/src/util/io.rs b/rust/parquet/src/util/io.rs index a7d7a49f4a8..467c240d0c8 100644 --- a/rust/parquet/src/util/io.rs +++ b/rust/parquet/src/util/io.rs @@ -111,7 +111,6 @@ impl Read for FileSource { self.buf_pos = cmp::min(self.buf_pos + nread, self.buf_cap); self.start += nread as u64; - println!("{}", nread); Ok(nread) } } @@ -255,8 +254,7 @@ mod tests { // Read data using file chunk let mut res = 
vec![0u8; 7]; - let mut chunk = - FileSource::new(&file, 0, file.metadata().unwrap().len() as usize); + let mut chunk = FileSource::new(&file, 0, file.metadata().unwrap().len() as usize); chunk.read(&mut res[..]).unwrap(); assert_eq!(res, vec![b'a', b'b', b'c', b'd', b'e', b'f', b'g']);