Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 96 additions & 21 deletions rust/parquet/src/util/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use std::{cmp, io::*, sync::Mutex};
use std::{cell::RefCell, cmp, io::*};

use crate::file::{reader::ParquetReader, writer::ParquetWriter};

const DEFAULT_BUF_SIZE: usize = 8 * 1024;

// ----------------------------------------------------------------------
// Read/Write wrappers for `File`.

Expand All @@ -31,47 +33,85 @@ pub trait Position {
}

/// Struct that represents a slice of a file data with independent start position and
/// length. Internally clones provided file handle, wraps with BufReader and resets
/// position before any read.
/// length. Internally clones provided file handle, wraps with a custom implementation
/// of BufReader that resets position before any read.
///
/// This is workaround and alternative for `file.try_clone()` method. It clones `File`
/// while preserving independent position, which is not available with `try_clone()`.
///
/// Designed after `arrow::io::RandomAccessFile`.
/// Designed after `arrow::io::RandomAccessFile` and `std::io::BufReader`
pub struct FileSource<R: ParquetReader> {
reader: Mutex<BufReader<R>>,
start: u64, // start position in a file
end: u64, // end position in a file
reader: RefCell<R>,
start: u64, // start position in a file
end: u64, // end position in a file
buf: Vec<u8>, // buffer where bytes read in advance are stored
buf_pos: usize, // current position of the reader in the buffer
buf_cap: usize, // current number of bytes read into the buffer
}

impl<R: ParquetReader> FileSource<R> {
/// Creates new file reader with start and length from a file handle
pub fn new(fd: &R, start: u64, length: usize) -> Self {
let reader = RefCell::new(fd.try_clone().unwrap());
Self {
reader: Mutex::new(BufReader::new(fd.try_clone().unwrap())),
reader,
start,
end: start + length as u64,
buf: vec![0 as u8; DEFAULT_BUF_SIZE],
buf_pos: 0,
buf_cap: 0,
}
}

fn fill_inner_buf(&mut self) -> Result<&[u8]> {
if self.buf_pos >= self.buf_cap {
// If we've reached the end of our internal buffer then we need to fetch
// some more data from the underlying reader.
// Branch using `>=` instead of the more correct `==`
// to tell the compiler that the pos..cap slice is always valid.
debug_assert!(self.buf_pos == self.buf_cap);
let mut reader = self.reader.borrow_mut();
reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading
self.buf_cap = reader.read(&mut self.buf)?;
self.buf_pos = 0;
}
Ok(&self.buf[self.buf_pos..self.buf_cap])
}

fn skip_inner_buf(&mut self, buf: &mut [u8]) -> Result<usize> {
// discard buffer
self.buf_pos = 0;
self.buf_cap = 0;
// read directly into param buffer
let mut reader = self.reader.borrow_mut();
reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading
let nread = reader.read(buf)?;
self.start += nread as u64;
Ok(nread)
}
}

impl<R: ParquetReader> Read for FileSource<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let mut reader = self
.reader
.lock()
.map_err(|err| Error::new(ErrorKind::Other, err.to_string()))?;

let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize);
let buf = &mut buf[0..bytes_to_read];

reader.seek(SeekFrom::Start(self.start as u64))?;
let res = reader.read(buf);
if let Ok(bytes_read) = res {
self.start += bytes_read as u64;
// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
if self.buf_pos == self.buf_cap && buf.len() >= self.buf.len() {
return self.skip_inner_buf(buf);
}

res
let nread = {
let mut rem = self.fill_inner_buf()?;
// copy the data from the inner buffer to the param buffer
rem.read(buf)?
};
// consume from buffer
self.buf_pos = cmp::min(self.buf_pos + nread, self.buf_cap);

self.start += nread as u64;
Ok(nread)
}
}

Expand Down Expand Up @@ -132,6 +172,8 @@ impl<'a> Position for Cursor<&'a mut Vec<u8>> {
mod tests {
use super::*;

use std::iter;

use crate::util::test_common::{get_temp_file, get_test_file};

#[test]
Expand Down Expand Up @@ -212,10 +254,43 @@ mod tests {

// Read data using file chunk
let mut res = vec![0u8; 7];
let mut chunk =
FileSource::new(&file, 0, file.metadata().unwrap().len() as usize);
let mut chunk = FileSource::new(&file, 0, file.metadata().unwrap().len() as usize);
chunk.read(&mut res[..]).unwrap();

assert_eq!(res, vec![b'a', b'b', b'c', b'd', b'e', b'f', b'g']);
}

#[test]
fn test_io_large_read() {
// Generate repeated 'abcdef' pattern and write it into a file
let patterned_data: Vec<u8> = iter::repeat(vec![0, 1, 2, 3, 4, 5])
.flatten()
.take(3 * DEFAULT_BUF_SIZE)
.collect();
// always use different temp files as test might be run in parallel
let mut file = get_temp_file("large_file_sink_test", &patterned_data);

// seek the underlying file to the first 'd'
file.seek(SeekFrom::Start(3)).unwrap();

// create the FileSource reader that starts at pos 1 ('b')
let mut chunk = FileSource::new(&file, 1, patterned_data.len() - 1);

// read the 'b' at pos 1
let mut res = vec![0u8; 1];
chunk.read_exact(&mut res).unwrap();
assert_eq!(res, &[1]);

// the underlying file is seeked to 'e'
file.seek(SeekFrom::Start(4)).unwrap();

// now read large chunk that starts with 'c' (after 'b')
let mut res = vec![0u8; 2 * DEFAULT_BUF_SIZE];
chunk.read_exact(&mut res).unwrap();
assert_eq!(
res,
&patterned_data[2..2 + 2 * DEFAULT_BUF_SIZE],
"read buf and original data are not equal"
);
}
}