diff --git a/rust/parquet/src/util/io.rs b/rust/parquet/src/util/io.rs index b36a710722f..467c240d0c8 100644 --- a/rust/parquet/src/util/io.rs +++ b/rust/parquet/src/util/io.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. -use std::{cmp, io::*, sync::Mutex}; +use std::{cell::RefCell, cmp, io::*}; use crate::file::{reader::ParquetReader, writer::ParquetWriter}; +const DEFAULT_BUF_SIZE: usize = 8 * 1024; + // ---------------------------------------------------------------------- // Read/Write wrappers for `File`. @@ -31,47 +33,85 @@ pub trait Position { } /// Struct that represents a slice of a file data with independent start position and -/// length. Internally clones provided file handle, wraps with BufReader and resets -/// position before any read. +/// length. Internally clones provided file handle, wraps with a custom implementation +/// of BufReader that resets position before any read. /// /// This is workaround and alternative for `file.try_clone()` method. It clones `File` /// while preserving independent position, which is not available with `try_clone()`. /// -/// Designed after `arrow::io::RandomAccessFile`. +/// Designed after `arrow::io::RandomAccessFile` and `std::io::BufReader` pub struct FileSource { - reader: Mutex>, - start: u64, // start position in a file - end: u64, // end position in a file + reader: RefCell, + start: u64, // start position in a file + end: u64, // end position in a file + buf: Vec, // buffer where bytes read in advance are stored + buf_pos: usize, // current position of the reader in the buffer + buf_cap: usize, // current number of bytes read into the buffer } impl FileSource { /// Creates new file reader with start and length from a file handle pub fn new(fd: &R, start: u64, length: usize) -> Self { + let reader = RefCell::new(fd.try_clone().unwrap()); Self { - reader: Mutex::new(BufReader::new(fd.try_clone().unwrap())), + reader, start, end: start + length as u64, + buf: vec![0 as u8; DEFAULT_BUF_SIZE], + buf_pos: 0, + buf_cap: 0, + } + } + + fn fill_inner_buf(&mut self) -> Result<&[u8]> { + if self.buf_pos >= self.buf_cap { + // If we've reached the end of our internal buffer then we need to fetch + // some more data from the underlying reader. + // Branch using `>=` instead of the more correct `==` + // to tell the compiler that the pos..cap slice is always valid. + debug_assert!(self.buf_pos == self.buf_cap); + let mut reader = self.reader.borrow_mut(); + reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading + self.buf_cap = reader.read(&mut self.buf)?; + self.buf_pos = 0; } + Ok(&self.buf[self.buf_pos..self.buf_cap]) + } + + fn skip_inner_buf(&mut self, buf: &mut [u8]) -> Result { + // discard buffer + self.buf_pos = 0; + self.buf_cap = 0; + // read directly into param buffer + let mut reader = self.reader.borrow_mut(); + reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading + let nread = reader.read(buf)?; + self.start += nread as u64; + Ok(nread) } } impl Read for FileSource { fn read(&mut self, buf: &mut [u8]) -> Result { - let mut reader = self - .reader - .lock() - .map_err(|err| Error::new(ErrorKind::Other, err.to_string()))?; - let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize); let buf = &mut buf[0..bytes_to_read]; - reader.seek(SeekFrom::Start(self.start as u64))?; - let res = reader.read(buf); - if let Ok(bytes_read) = res { - self.start += bytes_read as u64; + // If we don't have any buffered data and we're doing a massive read + // (larger than our internal buffer), bypass our internal buffer + // entirely. + if self.buf_pos == self.buf_cap && buf.len() >= self.buf.len() { + return self.skip_inner_buf(buf); } - - res + let nread = { + let mut rem = self.fill_inner_buf()?; + // copy the data from the inner buffer to the param buffer + rem.read(buf)? + }; + // consume from buffer + self.buf_pos = cmp::min(self.buf_pos + nread, self.buf_cap); + + self.start += nread as u64; + Ok(nread) } } @@ -132,6 +172,8 @@ impl<'a> Position for Cursor<&'a mut Vec> { mod tests { use super::*; + use std::iter; + use crate::util::test_common::{get_temp_file, get_test_file}; #[test] @@ -212,10 +254,43 @@ mod tests { // Read data using file chunk let mut res = vec![0u8; 7]; - let mut chunk = - FileSource::new(&file, 0, file.metadata().unwrap().len() as usize); + let mut chunk = FileSource::new(&file, 0, file.metadata().unwrap().len() as usize); chunk.read(&mut res[..]).unwrap(); assert_eq!(res, vec![b'a', b'b', b'c', b'd', b'e', b'f', b'g']); } + + #[test] + fn test_io_large_read() { + // Generate repeated 'abcdef' pattern and write it into a file + let patterned_data: Vec = iter::repeat(vec![0, 1, 2, 3, 4, 5]) + .flatten() + .take(3 * DEFAULT_BUF_SIZE) + .collect(); + // always use different temp files as test might be run in parallel + let mut file = get_temp_file("large_file_sink_test", &patterned_data); + + // seek the underlying file to the first 'd' + file.seek(SeekFrom::Start(3)).unwrap(); + + // create the FileSource reader that starts at pos 1 ('b') + let mut chunk = FileSource::new(&file, 1, patterned_data.len() - 1); + + // read the 'b' at pos 1 + let mut res = vec![0u8; 1]; + chunk.read_exact(&mut res).unwrap(); + assert_eq!(res, &[1]); + + // the underlying file is seeked to 'e' + file.seek(SeekFrom::Start(4)).unwrap(); + + // now read large chunk that starts with 'c' (after 'b') + let mut res = vec![0u8; 2 * DEFAULT_BUF_SIZE]; + chunk.read_exact(&mut res).unwrap(); + assert_eq!( + res, + &patterned_data[2..2 + 2 * DEFAULT_BUF_SIZE], + "read buf and original data are not equal" + ); + } }