diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 14bf7d287a3..b9db4f8c37f 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -953,7 +953,7 @@ mod tests { use std::rc::Rc; use std::sync::Arc; - fn make_column_chuncks( + fn make_column_chunks( column_desc: ColumnDescPtr, encoding: Encoding, num_levels: usize, @@ -964,11 +964,11 @@ mod tests { values: &mut Vec, page_lists: &mut Vec>, use_v2: bool, - num_chuncks: usize, + num_chunks: usize, ) where T::T: PartialOrd + SampleUniform + Copy, { - for _i in 0..num_chuncks { + for _i in 0..num_chunks { let mut pages = VecDeque::new(); let mut data = Vec::new(); let mut page_def_levels = Vec::new(); @@ -1039,7 +1039,7 @@ mod tests { { let mut data = Vec::new(); let mut page_lists = Vec::new(); - make_column_chuncks::( + make_column_chunks::( column_desc.clone(), Encoding::PLAIN, 100, @@ -1061,7 +1061,7 @@ mod tests { ) .unwrap(); - // Read first 50 values, which are all from the first column chunck + // Read first 50 values, which are all from the first column chunk let array = array_reader.next_batch(50).unwrap(); let array = array .as_any() @@ -1120,7 +1120,7 @@ mod tests { { let mut data = Vec::new(); let mut page_lists = Vec::new(); - make_column_chuncks::<$arrow_parquet_type>( + make_column_chunks::<$arrow_parquet_type>( column_desc.clone(), Encoding::PLAIN, 100, @@ -1225,7 +1225,7 @@ mod tests { let mut def_levels = Vec::new(); let mut rep_levels = Vec::new(); let mut page_lists = Vec::new(); - make_column_chuncks::( + make_column_chunks::( column_desc.clone(), Encoding::PLAIN, 100, @@ -1250,7 +1250,7 @@ mod tests { let mut accu_len: usize = 0; - // Read first 50 values, which are all from the first column chunck + // Read first 50 values, which are all from the first column chunk let array = array_reader.next_batch(50).unwrap(); assert_eq!( Some(&def_levels[accu_len..(accu_len + array.len())]), diff --git a/rust/parquet/src/file/footer.rs b/rust/parquet/src/file/footer.rs new file mode 100644 index 00000000000..240381c3038 --- /dev/null +++ b/rust/parquet/src/file/footer.rs @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
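+
+//! Parsing of the Parquet file footer: the fixed 8 byte footer (little-endian
+//! metadata length followed by the `PAR1` magic) is read first, then the
+//! Thrift-encoded file metadata that precedes it is decoded into `ParquetMetaData`.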
+ +use std::{ + cmp::min, + io::{Cursor, Read, Seek, SeekFrom}, + rc::Rc, +}; + +use byteorder::{ByteOrder, LittleEndian}; +use parquet_format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +use thrift::protocol::TCompactInputProtocol; + +use crate::basic::ColumnOrder; + +use crate::errors::{ParquetError, Result}; +use crate::file::{ + metadata::*, reader::ChunkReader, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, + PARQUET_MAGIC, +}; + +use crate::schema::types::{self, SchemaDescriptor}; + +/// Layout of Parquet file +/// +---------------------------+-----+---+ +/// | Rest of file | B | A | +/// +---------------------------+-----+---+ +/// where A: parquet footer, B: parquet metadata. +/// +/// The reader first reads DEFAULT_FOOTER_SIZE bytes from the end of the file. +/// If it is not enough according to the length indicated in the footer, it reads more bytes. +pub fn parse_metadata(chunk_reader: &R) -> Result { + // check file is large enough to hold footer + let file_size = chunk_reader.len(); + if file_size < (FOOTER_SIZE as u64) { + return Err(general_err!( + "Invalid Parquet file. Size is smaller than footer" + )); + } + + // read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer + let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, chunk_reader.len() as usize); + let mut default_end_reader = chunk_reader + .get_read(chunk_reader.len() - default_end_len as u64, default_end_len)?; + let mut default_len_end_buf = vec![0; default_end_len]; + default_end_reader.read_exact(&mut default_len_end_buf)?; + + // check this is indeed a parquet file + if default_len_end_buf[default_end_len - 4..] != PARQUET_MAGIC { + return Err(general_err!("Invalid Parquet file. Corrupt footer")); + } + + // get the metadata length from the footer + let metadata_len = LittleEndian::read_i32( + &default_len_end_buf[default_end_len - 8..default_end_len - 4], + ) as i64; + if metadata_len < 0 { + return Err(general_err!( + "Invalid Parquet file. Metadata length is less than zero ({})", + metadata_len + )); + } + let footer_metadata_len = FOOTER_SIZE + metadata_len as usize; + + // build up the reader covering the entire metadata + let mut default_end_cursor = Cursor::new(default_len_end_buf); + let metadata_read: Box; + if footer_metadata_len > file_size as usize { + return Err(general_err!( + "Invalid Parquet file. 
Metadata start is less than zero ({})", + file_size as i64 - footer_metadata_len as i64 + )); + } else if footer_metadata_len < DEFAULT_FOOTER_READ_SIZE { + // the whole metadata is in the bytes we already read + default_end_cursor.seek(SeekFrom::End(-(footer_metadata_len as i64)))?; + metadata_read = Box::new(default_end_cursor); + } else { + // the end of file read by default is not long enough, read missing bytes + let complementary_end_read = chunk_reader.get_read( + file_size - footer_metadata_len as u64, + FOOTER_SIZE + metadata_len as usize - default_end_len, + )?; + metadata_read = Box::new(complementary_end_read.chain(default_end_cursor)); + } + + // TODO: row group filtering + let mut prot = TCompactInputProtocol::new(metadata_read); + let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) + .map_err(|e| ParquetError::General(format!("Could not parse metadata: {}", e)))?; + let schema = types::from_thrift(&t_file_metadata.schema)?; + let schema_descr = Rc::new(SchemaDescriptor::new(schema.clone())); + let mut row_groups = Vec::new(); + for rg in t_file_metadata.row_groups { + row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?); + } + let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr); + + let file_metadata = FileMetaData::new( + t_file_metadata.version, + t_file_metadata.num_rows, + t_file_metadata.created_by, + t_file_metadata.key_value_metadata, + schema, + schema_descr, + column_orders, + ); + Ok(ParquetMetaData::new(file_metadata, row_groups)) +} + +/// Parses column orders from Thrift definition. +/// If no column orders are defined, returns `None`. +fn parse_column_orders( + t_column_orders: Option>, + schema_descr: &SchemaDescriptor, +) -> Option> { + match t_column_orders { + Some(orders) => { + // Should always be the case + assert_eq!( + orders.len(), + schema_descr.num_columns(), + "Column order length mismatch" + ); + let mut res = Vec::new(); + for (i, column) in schema_descr.columns().iter().enumerate() { + match orders[i] { + TColumnOrder::TYPEORDER(_) => { + let sort_order = ColumnOrder::get_sort_order( + column.logical_type(), + column.physical_type(), + ); + res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); + } + } + } + Some(res) + } + None => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::basic::SortOrder; + use crate::basic::Type; + use crate::schema::types::Type as SchemaType; + use crate::util::test_common::get_temp_file; + use parquet_format::TypeDefinedOrder; + + #[test] + fn test_parse_metadata_size_smaller_than_footer() { + let test_file = get_temp_file("corrupt-1.parquet", &[]); + let reader_result = parse_metadata(&test_file); + assert!(reader_result.is_err()); + assert_eq!( + reader_result.err().unwrap(), + general_err!("Invalid Parquet file. Size is smaller than footer") + ); + } + + #[test] + fn test_parse_metadata_corrupt_footer() { + let test_file = get_temp_file("corrupt-2.parquet", &[1, 2, 3, 4, 5, 6, 7, 8]); + let reader_result = parse_metadata(&test_file); + assert!(reader_result.is_err()); + assert_eq!( + reader_result.err().unwrap(), + general_err!("Invalid Parquet file. Corrupt footer") + ); + } + + #[test] + fn test_parse_metadata_invalid_length() { + let test_file = + get_temp_file("corrupt-3.parquet", &[0, 0, 0, 255, b'P', b'A', b'R', b'1']); + let reader_result = parse_metadata(&test_file); + assert!(reader_result.is_err()); + assert_eq!( + reader_result.err().unwrap(), + general_err!( + "Invalid Parquet file. 
Metadata length is less than zero (-16777216)" + ) + ); + } + + #[test] + fn test_parse_metadata_invalid_start() { + let test_file = + get_temp_file("corrupt-4.parquet", &[255, 0, 0, 0, b'P', b'A', b'R', b'1']); + let reader_result = parse_metadata(&test_file); + assert!(reader_result.is_err()); + assert_eq!( + reader_result.err().unwrap(), + general_err!("Invalid Parquet file. Metadata start is less than zero (-255)") + ); + } + + #[test] + fn test_metadata_column_orders_parse() { + // Define simple schema, we do not need to provide logical types. + let mut fields = vec![ + Rc::new( + SchemaType::primitive_type_builder("col1", Type::INT32) + .build() + .unwrap(), + ), + Rc::new( + SchemaType::primitive_type_builder("col2", Type::FLOAT) + .build() + .unwrap(), + ), + ]; + let schema = SchemaType::group_type_builder("schema") + .with_fields(&mut fields) + .build() + .unwrap(); + let schema_descr = SchemaDescriptor::new(Rc::new(schema)); + + let t_column_orders = Some(vec![ + TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), + TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), + ]); + + assert_eq!( + parse_column_orders(t_column_orders, &schema_descr), + Some(vec![ + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED), + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED) + ]) + ); + + // Test when no column orders are defined. + assert_eq!(parse_column_orders(None, &schema_descr), None); + } + + #[test] + #[should_panic(expected = "Column order length mismatch")] + fn test_metadata_column_orders_len_mismatch() { + let schema = SchemaType::group_type_builder("schema").build().unwrap(); + let schema_descr = SchemaDescriptor::new(Rc::new(schema)); + + let t_column_orders = + Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]); + + parse_column_orders(t_column_orders, &schema_descr); + } +} diff --git a/rust/parquet/src/file/mod.rs b/rust/parquet/src/file/mod.rs index 6dbf131b9c3..8b026e361f7 100644 --- a/rust/parquet/src/file/mod.rs +++ b/rust/parquet/src/file/mod.rs @@ -96,11 +96,16 @@ //! println!("{}", row); //! } //! ``` +pub mod footer; pub mod metadata; pub mod properties; pub mod reader; +pub mod serialized_reader; pub mod statistics; pub mod writer; const FOOTER_SIZE: usize = 8; const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1']; + +/// The number of bytes read at the end of the parquet file on first read +const DEFAULT_FOOTER_READ_SIZE: usize = 64 * 1024; diff --git a/rust/parquet/src/file/reader.rs b/rust/parquet/src/file/reader.rs index a6e5be0fa90..50991872eaf 100644 --- a/rust/parquet/src/file/reader.rs +++ b/rust/parquet/src/file/reader.rs @@ -18,35 +18,37 @@ //! Contains file reader API and provides methods to access file metadata, row group //! readers to read individual column chunks, or access record iterator. 
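+//!
+//! Concrete implementations backed by the `serialized_reader` module are
+//! re-exported here as `SerializedFileReader` and `SerializedPageReader`.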
-use std::{ - convert::TryFrom, - fs::File, - io::{BufReader, Cursor, Read, Seek, SeekFrom}, - path::Path, - rc::Rc, -}; +use std::{boxed::Box, io::Read, rc::Rc}; -use byteorder::{ByteOrder, LittleEndian}; -use parquet_format::{ - ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData, PageHeader, PageType, -}; -use thrift::protocol::TCompactInputProtocol; - -use crate::basic::{ColumnOrder, Compression, Encoding, Type}; use crate::column::page::PageIterator; -use crate::column::{ - page::{Page, PageReader}, - reader::{ColumnReader, ColumnReaderImpl}, -}; -use crate::compression::{create_codec, Codec}; +use crate::column::{page::PageReader, reader::ColumnReader}; use crate::errors::{ParquetError, Result}; -use crate::file::{metadata::*, statistics, FOOTER_SIZE, PARQUET_MAGIC}; +use crate::file::metadata::*; +pub use crate::file::serialized_reader::{SerializedFileReader, SerializedPageReader}; use crate::record::reader::RowIter; -use crate::record::Row; -use crate::schema::types::{ - self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, Type as SchemaType, -}; -use crate::util::{io::FileSource, memory::ByteBufferPtr}; +use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, Type as SchemaType}; + +use crate::basic::Type; + +use crate::column::reader::ColumnReaderImpl; + +/// Length should return the total number of bytes in the input source. +/// It's mainly used to read the metadata, which is at the end of the source. +#[allow(clippy::len_without_is_empty)] +pub trait Length { + /// Returns the amount of bytes of the inner source. + fn len(&self) -> u64; +} + +/// The ChunkReader trait generates readers of chunks of a source. +/// For a file system reader, each chunk might contain a clone of File bounded on a given range. +/// For an object store reader, each read can be mapped to a range request. +pub trait ChunkReader: Length { + type T: Read; + /// get a serialy readeable slice of the current reader + /// This should fail if the slice exceeds the current bounds + fn get_read(&self, start: u64, length: usize) -> Result; +} // ---------------------------------------------------------------------- // APIs for file & row group readers @@ -85,299 +87,8 @@ pub trait RowGroupReader { fn get_column_page_reader(&self, i: usize) -> Result>; /// Get value reader for the `i`th column chunk. - fn get_column_reader(&self, i: usize) -> Result; - - /// Get iterator of `Row`s from this row group. - /// - /// Projected schema can be a subset of or equal to the file schema, when it is None, - /// full file schema is assumed. - fn get_row_iter(&self, projection: Option) -> Result; -} - -// ---------------------------------------------------------------------- -// Serialized impl for file & row group readers - -/// Length should return the amount of bytes that implementor contains. -/// It's mainly used to read the metadata, which is at the end of the source. -#[allow(clippy::len_without_is_empty)] -pub trait Length { - /// Returns the amount of bytes of the inner source. - fn len(&self) -> u64; -} - -/// TryClone tries to clone the type and should maintain the `Seek` position of the given -/// instance. -pub trait TryClone: Sized { - /// Clones the type returning a new instance or an error if it's not possible - /// to clone it. 
- fn try_clone(&self) -> Result; -} - -impl Length for File { - fn len(&self) -> u64 { - self.metadata().map(|m| m.len()).unwrap_or(0u64) - } -} - -impl TryClone for File { - fn try_clone(&self) -> Result { - self.try_clone().map_err(|e| e.into()) - } -} - -impl<'a> Length for Cursor<&'a [u8]> { - fn len(&self) -> u64 { - self.get_ref().len() as u64 - } -} - -impl<'a> TryClone for Cursor<&'a [u8]> { - fn try_clone(&self) -> Result { - Ok(self.clone()) - } -} - -impl Length for Cursor> { - fn len(&self) -> u64 { - self.get_ref().len() as u64 - } -} - -impl TryClone for Cursor> { - fn try_clone(&self) -> Result { - Ok(self.clone()) - } -} - -/// ParquetReader is the interface which needs to be fulfilled to be able to parse a -/// parquet source. -pub trait ParquetReader: Read + Seek + Length + TryClone {} -impl ParquetReader for T {} - -/// A serialized implementation for Parquet [`FileReader`]. -pub struct SerializedFileReader { - buf: BufReader, - metadata: ParquetMetaData, -} - -impl SerializedFileReader { - /// Creates file reader from a Parquet file. - /// Returns error if Parquet file does not exist or is corrupt. - pub fn new(reader: R) -> Result { - let mut buf = BufReader::new(reader); - let metadata = Self::parse_metadata(&mut buf)?; - Ok(Self { buf, metadata }) - } - - // Layout of Parquet file - // +---------------------------+---+-----+ - // | Rest of file | B | A | - // +---------------------------+---+-----+ - // where A: parquet footer, B: parquet metadata. - // - fn parse_metadata(buf: &mut BufReader) -> Result { - let file_size = buf.get_ref().len(); - if file_size < (FOOTER_SIZE as u64) { - return Err(general_err!( - "Invalid Parquet file. Size is smaller than footer" - )); - } - let mut footer_buffer: [u8; FOOTER_SIZE] = [0; FOOTER_SIZE]; - buf.seek(SeekFrom::End(-(FOOTER_SIZE as i64)))?; - buf.read_exact(&mut footer_buffer)?; - if footer_buffer[4..] != PARQUET_MAGIC { - return Err(general_err!("Invalid Parquet file. Corrupt footer")); - } - let metadata_len = LittleEndian::read_i32(&footer_buffer[0..4]) as i64; - if metadata_len < 0 { - return Err(general_err!( - "Invalid Parquet file. Metadata length is less than zero ({})", - metadata_len - )); - } - let metadata_start: i64 = file_size as i64 - FOOTER_SIZE as i64 - metadata_len; - if metadata_start < 0 { - return Err(general_err!( - "Invalid Parquet file. Metadata start is less than zero ({})", - metadata_start - )); - } - buf.seek(SeekFrom::Start(metadata_start as u64))?; - let metadata_buf = buf.take(metadata_len as u64).into_inner(); - - // TODO: row group filtering - let mut prot = TCompactInputProtocol::new(metadata_buf); - let t_file_metadata: TFileMetaData = - TFileMetaData::read_from_in_protocol(&mut prot).map_err(|e| { - ParquetError::General(format!("Could not parse metadata: {}", e)) - })?; - let schema = types::from_thrift(&t_file_metadata.schema)?; - let schema_descr = Rc::new(SchemaDescriptor::new(schema.clone())); - let mut row_groups = Vec::new(); - for rg in t_file_metadata.row_groups { - row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?); - } - let column_orders = - Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr); - - let file_metadata = FileMetaData::new( - t_file_metadata.version, - t_file_metadata.num_rows, - t_file_metadata.created_by, - t_file_metadata.key_value_metadata, - schema, - schema_descr, - column_orders, - ); - Ok(ParquetMetaData::new(file_metadata, row_groups)) - } - - /// Parses column orders from Thrift definition. 
- /// If no column orders are defined, returns `None`. - fn parse_column_orders( - t_column_orders: Option>, - schema_descr: &SchemaDescriptor, - ) -> Option> { - match t_column_orders { - Some(orders) => { - // Should always be the case - assert_eq!( - orders.len(), - schema_descr.num_columns(), - "Column order length mismatch" - ); - let mut res = Vec::new(); - for (i, column) in schema_descr.columns().iter().enumerate() { - match orders[i] { - TColumnOrder::TYPEORDER(_) => { - let sort_order = ColumnOrder::get_sort_order( - column.logical_type(), - column.physical_type(), - ); - res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); - } - } - } - Some(res) - } - None => None, - } - } -} - -impl FileReader for SerializedFileReader { - fn metadata(&self) -> &ParquetMetaData { - &self.metadata - } - - fn num_row_groups(&self) -> usize { - self.metadata.num_row_groups() - } - - fn get_row_group(&self, i: usize) -> Result> { - let row_group_metadata = self.metadata.row_group(i); - // Row groups should be processed sequentially. - let f = self.buf.get_ref().try_clone()?; - Ok(Box::new(SerializedRowGroupReader::new( - f, - row_group_metadata, - ))) - } - - fn get_row_iter(&self, projection: Option) -> Result { - RowIter::from_file(projection, self) - } -} - -impl TryFrom for SerializedFileReader { - type Error = ParquetError; - - fn try_from(file: File) -> Result { - Self::new(file) - } -} - -impl<'a> TryFrom<&'a Path> for SerializedFileReader { - type Error = ParquetError; - - fn try_from(path: &Path) -> Result { - let file = File::open(path)?; - Self::try_from(file) - } -} - -impl TryFrom for SerializedFileReader { - type Error = ParquetError; - - fn try_from(path: String) -> Result { - Self::try_from(Path::new(&path)) - } -} - -impl<'a> TryFrom<&'a str> for SerializedFileReader { - type Error = ParquetError; - - fn try_from(path: &str) -> Result { - Self::try_from(Path::new(&path)) - } -} - -/// Conversion into a [`RowIter`](crate::record::reader::RowIter) -/// using the full file schema over all row groups. -impl IntoIterator for SerializedFileReader { - type Item = Row; - type IntoIter = RowIter<'static>; - - fn into_iter(self) -> Self::IntoIter { - RowIter::from_file_into(Box::new(self)) - } -} - -/// A serialized implementation for Parquet [`RowGroupReader`]. -pub struct SerializedRowGroupReader<'a, R: ParquetReader> { - buf: BufReader, - metadata: &'a RowGroupMetaData, -} - -impl<'a, R: 'static + ParquetReader> SerializedRowGroupReader<'a, R> { - /// Creates new row group reader from a file and row group metadata. 
- fn new(file: R, metadata: &'a RowGroupMetaData) -> Self { - let buf = BufReader::new(file); - Self { buf, metadata } - } -} - -impl<'a, R: 'static + ParquetReader> RowGroupReader for SerializedRowGroupReader<'a, R> { - fn metadata(&self) -> &RowGroupMetaData { - &self.metadata - } - - fn num_columns(&self) -> usize { - self.metadata.num_columns() - } - - // TODO: fix PARQUET-816 - fn get_column_page_reader(&self, i: usize) -> Result> { - let col = self.metadata.column(i); - let col_start = if col.has_dictionary_page() { - col.dictionary_page_offset().unwrap() - } else { - col.data_page_offset() - }; - let col_length = col.compressed_size(); - let file_chunk = - FileSource::new(self.buf.get_ref(), col_start as u64, col_length as usize); - let page_reader = SerializedPageReader::new( - file_chunk, - col.num_values(), - col.compression(), - col.column_descr().physical_type(), - )?; - Ok(Box::new(page_reader)) - } - fn get_column_reader(&self, i: usize) -> Result { - let schema_descr = self.metadata.schema_descr(); + let schema_descr = self.metadata().schema_descr(); let col_descr = schema_descr.column(i); let col_page_reader = self.get_column_page_reader(i)?; let col_reader = match col_descr.physical_type() { @@ -415,174 +126,15 @@ impl<'a, R: 'static + ParquetReader> RowGroupReader for SerializedRowGroupReader Ok(col_reader) } - fn get_row_iter(&self, projection: Option) -> Result { - RowIter::from_row_group(projection, self) - } -} - -/// A serialized implementation for Parquet [`PageReader`]. -pub struct SerializedPageReader { - // The file source buffer which references exactly the bytes for the column trunk - // to be read by this page reader. - buf: T, - - // The compression codec for this column chunk. Only set for non-PLAIN codec. - decompressor: Option>, - - // The number of values we have seen so far. - seen_num_values: i64, - - // The number of total values in this column chunk. - total_num_values: i64, - - // Column chunk type. - physical_type: Type, -} - -impl SerializedPageReader { - /// Creates a new serialized page reader from file source. - pub fn new( - buf: T, - total_num_values: i64, - compression: Compression, - physical_type: Type, - ) -> Result { - let decompressor = create_codec(compression)?; - let result = Self { - buf, - total_num_values, - seen_num_values: 0, - decompressor, - physical_type, - }; - Ok(result) - } - - /// Reads Page header from Thrift. - fn read_page_header(&mut self) -> Result { - let mut prot = TCompactInputProtocol::new(&mut self.buf); - let page_header = PageHeader::read_from_in_protocol(&mut prot)?; - Ok(page_header) - } + /// Get iterator of `Row`s from this row group. + /// + /// Projected schema can be a subset of or equal to the file schema, when it is None, + /// full file schema is assumed. + fn get_row_iter(&self, projection: Option) -> Result; } -impl PageReader for SerializedPageReader { - fn get_next_page(&mut self) -> Result> { - while self.seen_num_values < self.total_num_values { - let page_header = self.read_page_header()?; - - // When processing data page v2, depending on enabled compression for the - // page, we should account for uncompressed data ('offset') of - // repetition and definition levels. 
- // - // We always use 0 offset for other pages other than v2, `true` flag means - // that compression will be applied if decompressor is defined - let mut offset: usize = 0; - let mut can_decompress = true; - - if let Some(ref header_v2) = page_header.data_page_header_v2 { - offset = (header_v2.definition_levels_byte_length - + header_v2.repetition_levels_byte_length) - as usize; - // When is_compressed flag is missing the page is considered compressed - can_decompress = header_v2.is_compressed.unwrap_or(true); - } - - let compressed_len = page_header.compressed_page_size as usize - offset; - let uncompressed_len = page_header.uncompressed_page_size as usize - offset; - // We still need to read all bytes from buffered stream - let mut buffer = vec![0; offset + compressed_len]; - self.buf.read_exact(&mut buffer)?; - - // TODO: page header could be huge because of statistics. We should set a - // maximum page header size and abort if that is exceeded. - if let Some(decompressor) = self.decompressor.as_mut() { - if can_decompress { - let mut decompressed_buffer = Vec::with_capacity(uncompressed_len); - let decompressed_size = decompressor - .decompress(&buffer[offset..], &mut decompressed_buffer)?; - if decompressed_size != uncompressed_len { - return Err(general_err!( - "Actual decompressed size doesn't match the expected one ({} vs {})", - decompressed_size, - uncompressed_len - )); - } - if offset == 0 { - buffer = decompressed_buffer; - } else { - // Prepend saved offsets to the buffer - buffer.truncate(offset); - buffer.append(&mut decompressed_buffer); - } - } - } - - let result = match page_header.type_ { - PageType::DictionaryPage => { - assert!(page_header.dictionary_page_header.is_some()); - let dict_header = - page_header.dictionary_page_header.as_ref().unwrap(); - let is_sorted = dict_header.is_sorted.unwrap_or(false); - Page::DictionaryPage { - buf: ByteBufferPtr::new(buffer), - num_values: dict_header.num_values as u32, - encoding: Encoding::from(dict_header.encoding), - is_sorted, - } - } - PageType::DataPage => { - assert!(page_header.data_page_header.is_some()); - let header = page_header.data_page_header.unwrap(); - self.seen_num_values += header.num_values as i64; - Page::DataPage { - buf: ByteBufferPtr::new(buffer), - num_values: header.num_values as u32, - encoding: Encoding::from(header.encoding), - def_level_encoding: Encoding::from( - header.definition_level_encoding, - ), - rep_level_encoding: Encoding::from( - header.repetition_level_encoding, - ), - statistics: statistics::from_thrift( - self.physical_type, - header.statistics, - ), - } - } - PageType::DataPageV2 => { - assert!(page_header.data_page_header_v2.is_some()); - let header = page_header.data_page_header_v2.unwrap(); - let is_compressed = header.is_compressed.unwrap_or(true); - self.seen_num_values += header.num_values as i64; - Page::DataPageV2 { - buf: ByteBufferPtr::new(buffer), - num_values: header.num_values as u32, - encoding: Encoding::from(header.encoding), - num_nulls: header.num_nulls as u32, - num_rows: header.num_rows as u32, - def_levels_byte_len: header.definition_levels_byte_length as u32, - rep_levels_byte_len: header.repetition_levels_byte_length as u32, - is_compressed, - statistics: statistics::from_thrift( - self.physical_type, - header.statistics, - ), - } - } - _ => { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. - continue; - } - }; - return Ok(Some(result)); - } - - // We are at the end of this column chunk and no more page left. Return None. 
- Ok(None) - } -} +// ---------------------------------------------------------------------- +// Iterator /// Implementation of page iterator for parquet file. pub struct FilePageIterator { @@ -652,477 +204,3 @@ impl PageIterator for FilePageIterator { self.schema().map(|s| s.column(self.column_index)) } } - -#[cfg(test)] -mod tests { - use super::*; - - use parquet_format::TypeDefinedOrder; - - use crate::basic::SortOrder; - use crate::record::RowAccessor; - use crate::schema::parser::parse_message_type; - use crate::util::test_common::{get_temp_file, get_test_file, get_test_path}; - - #[test] - fn test_file_reader_metadata_size_smaller_than_footer() { - let test_file = get_temp_file("corrupt-1.parquet", &[]); - let reader_result = SerializedFileReader::new(test_file); - assert!(reader_result.is_err()); - assert_eq!( - reader_result.err().unwrap(), - general_err!("Invalid Parquet file. Size is smaller than footer") - ); - } - - #[test] - fn test_cursor_and_file_has_the_same_behaviour() { - let mut buf: Vec = Vec::new(); - get_test_file("alltypes_plain.parquet") - .read_to_end(&mut buf) - .unwrap(); - let cursor = Cursor::new(buf); - let read_from_cursor = SerializedFileReader::new(cursor).unwrap(); - - let test_file = get_test_file("alltypes_plain.parquet"); - let read_from_file = SerializedFileReader::new(test_file).unwrap(); - - let file_iter = read_from_file.get_row_iter(None).unwrap(); - let cursor_iter = read_from_cursor.get_row_iter(None).unwrap(); - - assert!(file_iter.eq(cursor_iter)); - } - - #[test] - fn test_file_reader_metadata_corrupt_footer() { - let test_file = get_temp_file("corrupt-2.parquet", &[1, 2, 3, 4, 5, 6, 7, 8]); - let reader_result = SerializedFileReader::new(test_file); - assert!(reader_result.is_err()); - assert_eq!( - reader_result.err().unwrap(), - general_err!("Invalid Parquet file. Corrupt footer") - ); - } - - #[test] - fn test_file_reader_metadata_invalid_length() { - let test_file = - get_temp_file("corrupt-3.parquet", &[0, 0, 0, 255, b'P', b'A', b'R', b'1']); - let reader_result = SerializedFileReader::new(test_file); - assert!(reader_result.is_err()); - assert_eq!( - reader_result.err().unwrap(), - general_err!( - "Invalid Parquet file. Metadata length is less than zero (-16777216)" - ) - ); - } - - #[test] - fn test_file_reader_metadata_invalid_start() { - let test_file = - get_temp_file("corrupt-4.parquet", &[255, 0, 0, 0, b'P', b'A', b'R', b'1']); - let reader_result = SerializedFileReader::new(test_file); - assert!(reader_result.is_err()); - assert_eq!( - reader_result.err().unwrap(), - general_err!("Invalid Parquet file. Metadata start is less than zero (-255)") - ); - } - - #[test] - fn test_file_reader_column_orders_parse() { - // Define simple schema, we do not need to provide logical types. 
- let mut fields = vec![ - Rc::new( - SchemaType::primitive_type_builder("col1", Type::INT32) - .build() - .unwrap(), - ), - Rc::new( - SchemaType::primitive_type_builder("col2", Type::FLOAT) - .build() - .unwrap(), - ), - ]; - let schema = SchemaType::group_type_builder("schema") - .with_fields(&mut fields) - .build() - .unwrap(); - let schema_descr = SchemaDescriptor::new(Rc::new(schema)); - - let t_column_orders = Some(vec![ - TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), - TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), - ]); - - assert_eq!( - SerializedFileReader::::parse_column_orders( - t_column_orders, - &schema_descr - ), - Some(vec![ - ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED), - ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED) - ]) - ); - - // Test when no column orders are defined. - assert_eq!( - SerializedFileReader::::parse_column_orders(None, &schema_descr), - None - ); - } - - #[test] - #[should_panic(expected = "Column order length mismatch")] - fn test_file_reader_column_orders_len_mismatch() { - let schema = SchemaType::group_type_builder("schema").build().unwrap(); - let schema_descr = SchemaDescriptor::new(Rc::new(schema)); - - let t_column_orders = - Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]); - - SerializedFileReader::::parse_column_orders(t_column_orders, &schema_descr); - } - - #[test] - fn test_file_reader_try_from() { - // Valid file path - let test_file = get_test_file("alltypes_plain.parquet"); - let test_path_buf = get_test_path("alltypes_plain.parquet"); - let test_path = test_path_buf.as_path(); - let test_path_str = test_path.to_str().unwrap(); - - let reader = SerializedFileReader::try_from(test_file); - assert!(reader.is_ok()); - - let reader = SerializedFileReader::try_from(test_path); - assert!(reader.is_ok()); - - let reader = SerializedFileReader::try_from(test_path_str); - assert!(reader.is_ok()); - - let reader = SerializedFileReader::try_from(test_path_str.to_string()); - assert!(reader.is_ok()); - - // Invalid file path - let test_path = Path::new("invalid.parquet"); - let test_path_str = test_path.to_str().unwrap(); - - let reader = SerializedFileReader::try_from(test_path); - assert!(reader.is_err()); - - let reader = SerializedFileReader::try_from(test_path_str); - assert!(reader.is_err()); - - let reader = SerializedFileReader::try_from(test_path_str.to_string()); - assert!(reader.is_err()); - } - - #[test] - fn test_file_reader_into_iter() -> Result<()> { - let path = get_test_path("alltypes_plain.parquet"); - let vec = vec![path.clone(), path] - .iter() - .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) - .flat_map(|r| r.into_iter()) - .flat_map(|r| r.get_int(0)) - .collect::>(); - - // rows in the parquet file are not sorted by "id" - // each file contains [id:4, id:5, id:6, id:7, id:2, id:3, id:0, id:1] - assert_eq!(vec, vec![4, 5, 6, 7, 2, 3, 0, 1, 4, 5, 6, 7, 2, 3, 0, 1]); - - Ok(()) - } - - #[test] - fn test_file_reader_into_iter_project() -> Result<()> { - let path = get_test_path("alltypes_plain.parquet"); - let result = vec![path] - .iter() - .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) - .flat_map(|r| { - let schema = "message schema { OPTIONAL INT32 id; }"; - let proj = parse_message_type(&schema).ok(); - - r.into_iter().project(proj).unwrap() - }) - .map(|r| format!("{}", r)) - .collect::>() - .join(","); - - assert_eq!( - result, - "{id: 4},{id: 5},{id: 6},{id: 7},{id: 2},{id: 3},{id: 0},{id: 1}" - ); - - Ok(()) - } - - #[test] - fn test_reuse_file_chunk() { - // 
This test covers the case of maintaining the correct start position in a file - // stream for each column reader after initializing and moving to the next one - // (without necessarily reading the entire column). - let test_file = get_test_file("alltypes_plain.parquet"); - let reader = SerializedFileReader::new(test_file).unwrap(); - let row_group = reader.get_row_group(0).unwrap(); - - let mut page_readers = Vec::new(); - for i in 0..row_group.num_columns() { - page_readers.push(row_group.get_column_page_reader(i).unwrap()); - } - - // Now buffer each col reader, we do not expect any failures like: - // General("underlying Thrift error: end of file") - for mut page_reader in page_readers { - assert!(page_reader.get_next_page().is_ok()); - } - } - - #[test] - fn test_file_reader() { - let test_file = get_test_file("alltypes_plain.parquet"); - let reader_result = SerializedFileReader::new(test_file); - assert!(reader_result.is_ok()); - let reader = reader_result.unwrap(); - - // Test contents in Parquet metadata - let metadata = reader.metadata(); - assert_eq!(metadata.num_row_groups(), 1); - - // Test contents in file metadata - let file_metadata = metadata.file_metadata(); - assert!(file_metadata.created_by().is_some()); - assert_eq!( - file_metadata.created_by().as_ref().unwrap(), - "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)" - ); - assert!(file_metadata.key_value_metadata().is_none()); - assert_eq!(file_metadata.num_rows(), 8); - assert_eq!(file_metadata.version(), 1); - assert_eq!(file_metadata.column_orders(), None); - - // Test contents in row group metadata - let row_group_metadata = metadata.row_group(0); - assert_eq!(row_group_metadata.num_columns(), 11); - assert_eq!(row_group_metadata.num_rows(), 8); - assert_eq!(row_group_metadata.total_byte_size(), 671); - // Check each column order - for i in 0..row_group_metadata.num_columns() { - assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED); - } - - // Test row group reader - let row_group_reader_result = reader.get_row_group(0); - assert!(row_group_reader_result.is_ok()); - let row_group_reader: Box = row_group_reader_result.unwrap(); - assert_eq!( - row_group_reader.num_columns(), - row_group_metadata.num_columns() - ); - assert_eq!( - row_group_reader.metadata().total_byte_size(), - row_group_metadata.total_byte_size() - ); - - // Test page readers - // TODO: test for every column - let page_reader_0_result = row_group_reader.get_column_page_reader(0); - assert!(page_reader_0_result.is_ok()); - let mut page_reader_0: Box = page_reader_0_result.unwrap(); - let mut page_count = 0; - while let Ok(Some(page)) = page_reader_0.get_next_page() { - let is_expected_page = match page { - Page::DictionaryPage { - buf, - num_values, - encoding, - is_sorted, - } => { - assert_eq!(buf.len(), 32); - assert_eq!(num_values, 8); - assert_eq!(encoding, Encoding::PLAIN_DICTIONARY); - assert_eq!(is_sorted, false); - true - } - Page::DataPage { - buf, - num_values, - encoding, - def_level_encoding, - rep_level_encoding, - statistics, - } => { - assert_eq!(buf.len(), 11); - assert_eq!(num_values, 8); - assert_eq!(encoding, Encoding::PLAIN_DICTIONARY); - assert_eq!(def_level_encoding, Encoding::RLE); - assert_eq!(rep_level_encoding, Encoding::BIT_PACKED); - assert!(statistics.is_none()); - true - } - _ => false, - }; - assert!(is_expected_page); - page_count += 1; - } - assert_eq!(page_count, 2); - } - - #[test] - fn test_file_reader_datapage_v2() { - let test_file = 
get_test_file("datapage_v2.snappy.parquet"); - let reader_result = SerializedFileReader::new(test_file); - assert!(reader_result.is_ok()); - let reader = reader_result.unwrap(); - - // Test contents in Parquet metadata - let metadata = reader.metadata(); - assert_eq!(metadata.num_row_groups(), 1); - - // Test contents in file metadata - let file_metadata = metadata.file_metadata(); - assert!(file_metadata.created_by().is_some()); - assert_eq!( - file_metadata.created_by().as_ref().unwrap(), - "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)" - ); - assert!(file_metadata.key_value_metadata().is_some()); - assert_eq!( - file_metadata.key_value_metadata().to_owned().unwrap().len(), - 1 - ); - - assert_eq!(file_metadata.num_rows(), 5); - assert_eq!(file_metadata.version(), 1); - assert_eq!(file_metadata.column_orders(), None); - - let row_group_metadata = metadata.row_group(0); - - // Check each column order - for i in 0..row_group_metadata.num_columns() { - assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED); - } - - // Test row group reader - let row_group_reader_result = reader.get_row_group(0); - assert!(row_group_reader_result.is_ok()); - let row_group_reader: Box = row_group_reader_result.unwrap(); - assert_eq!( - row_group_reader.num_columns(), - row_group_metadata.num_columns() - ); - assert_eq!( - row_group_reader.metadata().total_byte_size(), - row_group_metadata.total_byte_size() - ); - - // Test page readers - // TODO: test for every column - let page_reader_0_result = row_group_reader.get_column_page_reader(0); - assert!(page_reader_0_result.is_ok()); - let mut page_reader_0: Box = page_reader_0_result.unwrap(); - let mut page_count = 0; - while let Ok(Some(page)) = page_reader_0.get_next_page() { - let is_expected_page = match page { - Page::DictionaryPage { - buf, - num_values, - encoding, - is_sorted, - } => { - assert_eq!(buf.len(), 7); - assert_eq!(num_values, 1); - assert_eq!(encoding, Encoding::PLAIN); - assert_eq!(is_sorted, false); - true - } - Page::DataPageV2 { - buf, - num_values, - encoding, - num_nulls, - num_rows, - def_levels_byte_len, - rep_levels_byte_len, - is_compressed, - statistics, - } => { - assert_eq!(buf.len(), 4); - assert_eq!(num_values, 5); - assert_eq!(encoding, Encoding::RLE_DICTIONARY); - assert_eq!(num_nulls, 1); - assert_eq!(num_rows, 5); - assert_eq!(def_levels_byte_len, 2); - assert_eq!(rep_levels_byte_len, 0); - assert_eq!(is_compressed, true); - assert!(statistics.is_some()); - true - } - _ => false, - }; - assert!(is_expected_page); - page_count += 1; - } - assert_eq!(page_count, 2); - } - - #[test] - fn test_page_iterator() { - let file = get_test_file("alltypes_plain.parquet"); - let file_reader = Rc::new(SerializedFileReader::new(file).unwrap()); - - let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap(); - - // read first page - let page = page_iterator.next(); - assert!(page.is_some()); - assert!(page.unwrap().is_ok()); - - // reach end of file - let page = page_iterator.next(); - assert!(page.is_none()); - - let row_group_indices = Box::new(0..1); - let mut page_iterator = - FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap(); - - // read first page - let page = page_iterator.next(); - assert!(page.is_some()); - assert!(page.unwrap().is_ok()); - - // reach end of file - let page = page_iterator.next(); - assert!(page.is_none()); - } - - #[test] - fn test_file_reader_key_value_metadata() { - let file = get_test_file("binary.parquet"); - let 
file_reader = Rc::new(SerializedFileReader::new(file).unwrap()); - - let metadata = file_reader - .metadata - .file_metadata() - .key_value_metadata() - .as_ref() - .unwrap(); - - assert_eq!(metadata.len(), 3); - - assert_eq!(metadata.get(0).unwrap().key, "parquet.proto.descriptor"); - - assert_eq!(metadata.get(1).unwrap().key, "writer.model.name"); - assert_eq!(metadata.get(1).unwrap().value, Some("protobuf".to_owned())); - - assert_eq!(metadata.get(2).unwrap().key, "parquet.proto.class"); - assert_eq!( - metadata.get(2).unwrap().value, - Some("foo.baz.Foobaz$Event".to_owned()) - ); - } -} diff --git a/rust/parquet/src/file/serialized_reader.rs b/rust/parquet/src/file/serialized_reader.rs new file mode 100644 index 00000000000..82997005e76 --- /dev/null +++ b/rust/parquet/src/file/serialized_reader.rs @@ -0,0 +1,747 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader +//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM) + +use std::{convert::TryFrom, fs::File, io::Read, path::Path, rc::Rc}; + +use parquet_format::{PageHeader, PageType}; +use thrift::protocol::TCompactInputProtocol; + +use crate::basic::{Compression, Encoding, Type}; +use crate::column::page::{Page, PageReader}; +use crate::compression::{create_codec, Codec}; +use crate::errors::{ParquetError, Result}; +use crate::file::{footer, metadata::*, reader::*, statistics}; +use crate::record::reader::RowIter; +use crate::record::Row; +use crate::schema::types::Type as SchemaType; +use crate::util::{ + cursor::SliceableCursor, + io::{FileSource, TryClone}, + memory::ByteBufferPtr, +}; + +// ---------------------------------------------------------------------- +// Implementations of traits facilitating the creation of a new reader + +impl Length for File { + fn len(&self) -> u64 { + self.metadata().map(|m| m.len()).unwrap_or(0u64) + } +} + +impl TryClone for File { + fn try_clone(&self) -> std::io::Result { + self.try_clone() + } +} + +impl ChunkReader for File { + type T = FileSource; + + fn get_read(&self, start: u64, length: usize) -> Result { + Ok(FileSource::new(self, start, length)) + } +} + +impl Length for SliceableCursor { + fn len(&self) -> u64 { + SliceableCursor::len(self) + } +} + +impl ChunkReader for SliceableCursor { + type T = SliceableCursor; + + fn get_read(&self, start: u64, length: usize) -> Result { + self.slice(start, length).map_err(|e| e.into()) + } +} + +impl TryFrom for SerializedFileReader { + type Error = ParquetError; + + fn try_from(file: File) -> Result { + Self::new(file) + } +} + +impl<'a> TryFrom<&'a Path> for SerializedFileReader { + type Error = ParquetError; + + fn try_from(path: &Path) -> Result { + let 
file = File::open(path)?; + Self::try_from(file) + } +} + +impl TryFrom for SerializedFileReader { + type Error = ParquetError; + + fn try_from(path: String) -> Result { + Self::try_from(Path::new(&path)) + } +} + +impl<'a> TryFrom<&'a str> for SerializedFileReader { + type Error = ParquetError; + + fn try_from(path: &str) -> Result { + Self::try_from(Path::new(&path)) + } +} + +/// Conversion into a [`RowIter`](crate::record::reader::RowIter) +/// using the full file schema over all row groups. +impl IntoIterator for SerializedFileReader { + type Item = Row; + type IntoIter = RowIter<'static>; + + fn into_iter(self) -> Self::IntoIter { + RowIter::from_file_into(Box::new(self)) + } +} + +// ---------------------------------------------------------------------- +// Implementations of file & row group readers + +/// A serialized implementation for Parquet [`FileReader`]. +pub struct SerializedFileReader { + chunk_reader: Rc, + metadata: ParquetMetaData, +} + +impl SerializedFileReader { + /// Creates file reader from a Parquet file. + /// Returns error if Parquet file does not exist or is corrupt. + pub fn new(chunk_reader: R) -> Result { + let metadata = footer::parse_metadata(&chunk_reader)?; + Ok(Self { + chunk_reader: Rc::new(chunk_reader), + metadata, + }) + } +} + +impl FileReader for SerializedFileReader { + fn metadata(&self) -> &ParquetMetaData { + &self.metadata + } + + fn num_row_groups(&self) -> usize { + self.metadata.num_row_groups() + } + + fn get_row_group(&self, i: usize) -> Result> { + let row_group_metadata = self.metadata.row_group(i); + // Row groups should be processed sequentially. + let f = Rc::clone(&self.chunk_reader); + Ok(Box::new(SerializedRowGroupReader::new( + f, + row_group_metadata, + ))) + } + + fn get_row_iter(&self, projection: Option) -> Result { + RowIter::from_file(projection, self) + } +} + +/// A serialized implementation for Parquet [`RowGroupReader`]. +pub struct SerializedRowGroupReader<'a, R: ChunkReader> { + chunk_reader: Rc, + metadata: &'a RowGroupMetaData, +} + +impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> { + /// Creates new row group reader from a file and row group metadata. + fn new(chunk_reader: Rc, metadata: &'a RowGroupMetaData) -> Self { + Self { + chunk_reader, + metadata, + } + } +} + +impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'a, R> { + fn metadata(&self) -> &RowGroupMetaData { + &self.metadata + } + + fn num_columns(&self) -> usize { + self.metadata.num_columns() + } + + // TODO: fix PARQUET-816 + fn get_column_page_reader(&self, i: usize) -> Result> { + let col = self.metadata.column(i); + let col_start = if col.has_dictionary_page() { + col.dictionary_page_offset().unwrap() + } else { + col.data_page_offset() + }; + let col_length = col.compressed_size(); + let file_chunk = self + .chunk_reader + .get_read(col_start as u64, col_length as usize)?; + let page_reader = SerializedPageReader::new( + file_chunk, + col.num_values(), + col.compression(), + col.column_descr().physical_type(), + )?; + Ok(Box::new(page_reader)) + } + + fn get_row_iter(&self, projection: Option) -> Result { + RowIter::from_row_group(projection, self) + } +} + +/// A serialized implementation for Parquet [`PageReader`]. +pub struct SerializedPageReader { + // The file source buffer which references exactly the bytes for the column trunk + // to be read by this page reader. + buf: T, + + // The compression codec for this column chunk. Only set for non-PLAIN codec. 
+ decompressor: Option>, + + // The number of values we have seen so far. + seen_num_values: i64, + + // The number of total values in this column chunk. + total_num_values: i64, + + // Column chunk type. + physical_type: Type, +} + +impl SerializedPageReader { + /// Creates a new serialized page reader from file source. + pub fn new( + buf: T, + total_num_values: i64, + compression: Compression, + physical_type: Type, + ) -> Result { + let decompressor = create_codec(compression)?; + let result = Self { + buf, + total_num_values, + seen_num_values: 0, + decompressor, + physical_type, + }; + Ok(result) + } + + /// Reads Page header from Thrift. + fn read_page_header(&mut self) -> Result { + let mut prot = TCompactInputProtocol::new(&mut self.buf); + let page_header = PageHeader::read_from_in_protocol(&mut prot)?; + Ok(page_header) + } +} + +impl PageReader for SerializedPageReader { + fn get_next_page(&mut self) -> Result> { + while self.seen_num_values < self.total_num_values { + let page_header = self.read_page_header()?; + + // When processing data page v2, depending on enabled compression for the + // page, we should account for uncompressed data ('offset') of + // repetition and definition levels. + // + // We always use 0 offset for other pages other than v2, `true` flag means + // that compression will be applied if decompressor is defined + let mut offset: usize = 0; + let mut can_decompress = true; + + if let Some(ref header_v2) = page_header.data_page_header_v2 { + offset = (header_v2.definition_levels_byte_length + + header_v2.repetition_levels_byte_length) + as usize; + // When is_compressed flag is missing the page is considered compressed + can_decompress = header_v2.is_compressed.unwrap_or(true); + } + + let compressed_len = page_header.compressed_page_size as usize - offset; + let uncompressed_len = page_header.uncompressed_page_size as usize - offset; + // We still need to read all bytes from buffered stream + let mut buffer = vec![0; offset + compressed_len]; + self.buf.read_exact(&mut buffer)?; + + // TODO: page header could be huge because of statistics. We should set a + // maximum page header size and abort if that is exceeded. 
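+            // Decompress the page body when a codec is configured and the page is
+            // not already stored uncompressed. For data page v2 the first `offset`
+            // bytes (repetition and definition levels) are never compressed, so only
+            // the remainder is decompressed and the level bytes are kept in front.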
+ if let Some(decompressor) = self.decompressor.as_mut() { + if can_decompress { + let mut decompressed_buffer = Vec::with_capacity(uncompressed_len); + let decompressed_size = decompressor + .decompress(&buffer[offset..], &mut decompressed_buffer)?; + if decompressed_size != uncompressed_len { + return Err(general_err!( + "Actual decompressed size doesn't match the expected one ({} vs {})", + decompressed_size, + uncompressed_len + )); + } + if offset == 0 { + buffer = decompressed_buffer; + } else { + // Prepend saved offsets to the buffer + buffer.truncate(offset); + buffer.append(&mut decompressed_buffer); + } + } + } + + let result = match page_header.type_ { + PageType::DictionaryPage => { + assert!(page_header.dictionary_page_header.is_some()); + let dict_header = + page_header.dictionary_page_header.as_ref().unwrap(); + let is_sorted = dict_header.is_sorted.unwrap_or(false); + Page::DictionaryPage { + buf: ByteBufferPtr::new(buffer), + num_values: dict_header.num_values as u32, + encoding: Encoding::from(dict_header.encoding), + is_sorted, + } + } + PageType::DataPage => { + assert!(page_header.data_page_header.is_some()); + let header = page_header.data_page_header.unwrap(); + self.seen_num_values += header.num_values as i64; + Page::DataPage { + buf: ByteBufferPtr::new(buffer), + num_values: header.num_values as u32, + encoding: Encoding::from(header.encoding), + def_level_encoding: Encoding::from( + header.definition_level_encoding, + ), + rep_level_encoding: Encoding::from( + header.repetition_level_encoding, + ), + statistics: statistics::from_thrift( + self.physical_type, + header.statistics, + ), + } + } + PageType::DataPageV2 => { + assert!(page_header.data_page_header_v2.is_some()); + let header = page_header.data_page_header_v2.unwrap(); + let is_compressed = header.is_compressed.unwrap_or(true); + self.seen_num_values += header.num_values as i64; + Page::DataPageV2 { + buf: ByteBufferPtr::new(buffer), + num_values: header.num_values as u32, + encoding: Encoding::from(header.encoding), + num_nulls: header.num_nulls as u32, + num_rows: header.num_rows as u32, + def_levels_byte_len: header.definition_levels_byte_length as u32, + rep_levels_byte_len: header.repetition_levels_byte_length as u32, + is_compressed, + statistics: statistics::from_thrift( + self.physical_type, + header.statistics, + ), + } + } + _ => { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + continue; + } + }; + return Ok(Some(result)); + } + + // We are at the end of this column chunk and no more page left. Return None. 
+ Ok(None) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::basic::ColumnOrder; + use crate::record::RowAccessor; + use crate::schema::parser::parse_message_type; + use crate::util::test_common::{get_test_file, get_test_path}; + use std::rc::Rc; + + #[test] + fn test_cursor_and_file_has_the_same_behaviour() { + let mut buf: Vec = Vec::new(); + get_test_file("alltypes_plain.parquet") + .read_to_end(&mut buf) + .unwrap(); + let cursor = SliceableCursor::new(buf); + let read_from_cursor = SerializedFileReader::new(cursor).unwrap(); + + let test_file = get_test_file("alltypes_plain.parquet"); + let read_from_file = SerializedFileReader::new(test_file).unwrap(); + + let file_iter = read_from_file.get_row_iter(None).unwrap(); + let cursor_iter = read_from_cursor.get_row_iter(None).unwrap(); + + assert!(file_iter.eq(cursor_iter)); + } + + #[test] + fn test_file_reader_try_from() { + // Valid file path + let test_file = get_test_file("alltypes_plain.parquet"); + let test_path_buf = get_test_path("alltypes_plain.parquet"); + let test_path = test_path_buf.as_path(); + let test_path_str = test_path.to_str().unwrap(); + + let reader = SerializedFileReader::try_from(test_file); + assert!(reader.is_ok()); + + let reader = SerializedFileReader::try_from(test_path); + assert!(reader.is_ok()); + + let reader = SerializedFileReader::try_from(test_path_str); + assert!(reader.is_ok()); + + let reader = SerializedFileReader::try_from(test_path_str.to_string()); + assert!(reader.is_ok()); + + // Invalid file path + let test_path = Path::new("invalid.parquet"); + let test_path_str = test_path.to_str().unwrap(); + + let reader = SerializedFileReader::try_from(test_path); + assert!(reader.is_err()); + + let reader = SerializedFileReader::try_from(test_path_str); + assert!(reader.is_err()); + + let reader = SerializedFileReader::try_from(test_path_str.to_string()); + assert!(reader.is_err()); + } + + #[test] + fn test_file_reader_into_iter() -> Result<()> { + let path = get_test_path("alltypes_plain.parquet"); + let vec = vec![path.clone(), path] + .iter() + .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) + .flat_map(|r| r.into_iter()) + .flat_map(|r| r.get_int(0)) + .collect::>(); + + // rows in the parquet file are not sorted by "id" + // each file contains [id:4, id:5, id:6, id:7, id:2, id:3, id:0, id:1] + assert_eq!(vec, vec![4, 5, 6, 7, 2, 3, 0, 1, 4, 5, 6, 7, 2, 3, 0, 1]); + + Ok(()) + } + + #[test] + fn test_file_reader_into_iter_project() -> Result<()> { + let path = get_test_path("alltypes_plain.parquet"); + let result = vec![path] + .iter() + .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) + .flat_map(|r| { + let schema = "message schema { OPTIONAL INT32 id; }"; + let proj = parse_message_type(&schema).ok(); + + r.into_iter().project(proj).unwrap() + }) + .map(|r| format!("{}", r)) + .collect::>() + .join(","); + + assert_eq!( + result, + "{id: 4},{id: 5},{id: 6},{id: 7},{id: 2},{id: 3},{id: 0},{id: 1}" + ); + + Ok(()) + } + + #[test] + fn test_reuse_file_chunk() { + // This test covers the case of maintaining the correct start position in a file + // stream for each column reader after initializing and moving to the next one + // (without necessarily reading the entire column). 
+ let test_file = get_test_file("alltypes_plain.parquet"); + let reader = SerializedFileReader::new(test_file).unwrap(); + let row_group = reader.get_row_group(0).unwrap(); + + let mut page_readers = Vec::new(); + for i in 0..row_group.num_columns() { + page_readers.push(row_group.get_column_page_reader(i).unwrap()); + } + + // Now buffer each col reader, we do not expect any failures like: + // General("underlying Thrift error: end of file") + for mut page_reader in page_readers { + assert!(page_reader.get_next_page().is_ok()); + } + } + + #[test] + fn test_file_reader() { + let test_file = get_test_file("alltypes_plain.parquet"); + let reader_result = SerializedFileReader::new(test_file); + assert!(reader_result.is_ok()); + let reader = reader_result.unwrap(); + + // Test contents in Parquet metadata + let metadata = reader.metadata(); + assert_eq!(metadata.num_row_groups(), 1); + + // Test contents in file metadata + let file_metadata = metadata.file_metadata(); + assert!(file_metadata.created_by().is_some()); + assert_eq!( + file_metadata.created_by().as_ref().unwrap(), + "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)" + ); + assert!(file_metadata.key_value_metadata().is_none()); + assert_eq!(file_metadata.num_rows(), 8); + assert_eq!(file_metadata.version(), 1); + assert_eq!(file_metadata.column_orders(), None); + + // Test contents in row group metadata + let row_group_metadata = metadata.row_group(0); + assert_eq!(row_group_metadata.num_columns(), 11); + assert_eq!(row_group_metadata.num_rows(), 8); + assert_eq!(row_group_metadata.total_byte_size(), 671); + // Check each column order + for i in 0..row_group_metadata.num_columns() { + assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED); + } + + // Test row group reader + let row_group_reader_result = reader.get_row_group(0); + assert!(row_group_reader_result.is_ok()); + let row_group_reader: Box = row_group_reader_result.unwrap(); + assert_eq!( + row_group_reader.num_columns(), + row_group_metadata.num_columns() + ); + assert_eq!( + row_group_reader.metadata().total_byte_size(), + row_group_metadata.total_byte_size() + ); + + // Test page readers + // TODO: test for every column + let page_reader_0_result = row_group_reader.get_column_page_reader(0); + assert!(page_reader_0_result.is_ok()); + let mut page_reader_0: Box = page_reader_0_result.unwrap(); + let mut page_count = 0; + while let Ok(Some(page)) = page_reader_0.get_next_page() { + let is_expected_page = match page { + Page::DictionaryPage { + buf, + num_values, + encoding, + is_sorted, + } => { + assert_eq!(buf.len(), 32); + assert_eq!(num_values, 8); + assert_eq!(encoding, Encoding::PLAIN_DICTIONARY); + assert_eq!(is_sorted, false); + true + } + Page::DataPage { + buf, + num_values, + encoding, + def_level_encoding, + rep_level_encoding, + statistics, + } => { + assert_eq!(buf.len(), 11); + assert_eq!(num_values, 8); + assert_eq!(encoding, Encoding::PLAIN_DICTIONARY); + assert_eq!(def_level_encoding, Encoding::RLE); + assert_eq!(rep_level_encoding, Encoding::BIT_PACKED); + assert!(statistics.is_none()); + true + } + _ => false, + }; + assert!(is_expected_page); + page_count += 1; + } + assert_eq!(page_count, 2); + } + + #[test] + fn test_file_reader_datapage_v2() { + let test_file = get_test_file("datapage_v2.snappy.parquet"); + let reader_result = SerializedFileReader::new(test_file); + assert!(reader_result.is_ok()); + let reader = reader_result.unwrap(); + + // Test contents in Parquet metadata + let metadata = 
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        // Test contents in file metadata
+        let file_metadata = metadata.file_metadata();
+        assert!(file_metadata.created_by().is_some());
+        assert_eq!(
+            file_metadata.created_by().as_ref().unwrap(),
+            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
+        );
+        assert!(file_metadata.key_value_metadata().is_some());
+        assert_eq!(
+            file_metadata.key_value_metadata().to_owned().unwrap().len(),
+            1
+        );
+
+        assert_eq!(file_metadata.num_rows(), 5);
+        assert_eq!(file_metadata.version(), 1);
+        assert_eq!(file_metadata.column_orders(), None);
+
+        let row_group_metadata = metadata.row_group(0);
+
+        // Check each column order
+        for i in 0..row_group_metadata.num_columns() {
+            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
+        }
+
+        // Test row group reader
+        let row_group_reader_result = reader.get_row_group(0);
+        assert!(row_group_reader_result.is_ok());
+        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
+        assert_eq!(
+            row_group_reader.num_columns(),
+            row_group_metadata.num_columns()
+        );
+        assert_eq!(
+            row_group_reader.metadata().total_byte_size(),
+            row_group_metadata.total_byte_size()
+        );
+
+        // Test page readers
+        // TODO: test for every column
+        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
+        assert!(page_reader_0_result.is_ok());
+        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
+        let mut page_count = 0;
+        while let Ok(Some(page)) = page_reader_0.get_next_page() {
+            let is_expected_page = match page {
+                Page::DictionaryPage {
+                    buf,
+                    num_values,
+                    encoding,
+                    is_sorted,
+                } => {
+                    assert_eq!(buf.len(), 7);
+                    assert_eq!(num_values, 1);
+                    assert_eq!(encoding, Encoding::PLAIN);
+                    assert_eq!(is_sorted, false);
+                    true
+                }
+                Page::DataPageV2 {
+                    buf,
+                    num_values,
+                    encoding,
+                    num_nulls,
+                    num_rows,
+                    def_levels_byte_len,
+                    rep_levels_byte_len,
+                    is_compressed,
+                    statistics,
+                } => {
+                    assert_eq!(buf.len(), 4);
+                    assert_eq!(num_values, 5);
+                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
+                    assert_eq!(num_nulls, 1);
+                    assert_eq!(num_rows, 5);
+                    assert_eq!(def_levels_byte_len, 2);
+                    assert_eq!(rep_levels_byte_len, 0);
+                    assert_eq!(is_compressed, true);
+                    assert!(statistics.is_some());
+                    true
+                }
+                _ => false,
+            };
+            assert!(is_expected_page);
+            page_count += 1;
+        }
+        assert_eq!(page_count, 2);
+    }
+
+    #[test]
+    fn test_page_iterator() {
+        let file = get_test_file("alltypes_plain.parquet");
+        let file_reader = Rc::new(SerializedFileReader::new(file).unwrap());
+
+        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
+
+        // read first page
+        let page = page_iterator.next();
+        assert!(page.is_some());
+        assert!(page.unwrap().is_ok());
+
+        // reach end of file
+        let page = page_iterator.next();
+        assert!(page.is_none());
+
+        let row_group_indices = Box::new(0..1);
+        let mut page_iterator =
+            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
+
+        // read first page
+        let page = page_iterator.next();
+        assert!(page.is_some());
+        assert!(page.unwrap().is_ok());
+
+        // reach end of file
+        let page = page_iterator.next();
+        assert!(page.is_none());
+    }
+
+    #[test]
+    fn test_file_reader_key_value_metadata() {
+        let file = get_test_file("binary.parquet");
+        let file_reader = Rc::new(SerializedFileReader::new(file).unwrap());
+
+        let metadata = file_reader
+            .metadata
+            .file_metadata()
+            .key_value_metadata()
+            .as_ref()
+            .unwrap();
+
+        assert_eq!(metadata.len(), 3);
+
+        assert_eq!(metadata.get(0).unwrap().key, "parquet.proto.descriptor");
+
+        assert_eq!(metadata.get(1).unwrap().key, "writer.model.name");
+        assert_eq!(metadata.get(1).unwrap().value, Some("protobuf".to_owned()));
+
+        assert_eq!(metadata.get(2).unwrap().key, "parquet.proto.class");
+        assert_eq!(
+            metadata.get(2).unwrap().value,
+            Some("foo.baz.Foobaz$Event".to_owned())
+        );
+    }
+}
diff --git a/rust/parquet/src/file/writer.rs b/rust/parquet/src/file/writer.rs
index c65b88ebef7..3509a23ce1d 100644
--- a/rust/parquet/src/file/writer.rs
+++ b/rust/parquet/src/file/writer.rs
@@ -34,11 +34,11 @@ use crate::column::{
 };
 use crate::errors::{ParquetError, Result};
 use crate::file::{
-    metadata::*, properties::WriterPropertiesPtr, reader::TryClone,
+    metadata::*, properties::WriterPropertiesPtr,
     statistics::to_thrift as statistics_to_thrift, FOOTER_SIZE, PARQUET_MAGIC,
 };
 use crate::schema::types::{self, SchemaDescPtr, SchemaDescriptor, TypePtr};
-use crate::util::io::{FileSink, Position};
+use crate::util::io::{FileSink, Position, TryClone};
 
 // ----------------------------------------------------------------------
 // APIs for file & row group writers
diff --git a/rust/parquet/src/util/cursor.rs b/rust/parquet/src/util/cursor.rs
new file mode 100644
index 00000000000..ae3c8ddc2ef
--- /dev/null
+++ b/rust/parquet/src/util/cursor.rs
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp;
+use std::io::{self, Error, ErrorKind, Read};
+use std::rc::Rc;
+
+/// This is the object to use if your file is already in memory.
+/// The sliceable cursor is similar to `std::io::Cursor`, except that it makes it easy to create "cursor slices".
+/// To achieve this, it uses `Rc` instead of shared references. Indeed, reference fields are painful
+/// because the lack of Generic Associated Types means you would need complex lifetime propagation when
+/// returning such a cursor.
+pub struct SliceableCursor {
+    inner: Rc<Vec<u8>>,
+    start: u64,
+    length: usize,
+    pos: u64,
+}
+
+impl SliceableCursor {
+    pub fn new(content: Vec<u8>) -> Self {
+        let size = content.len();
+        SliceableCursor {
+            inner: Rc::new(content),
+            start: 0,
+            pos: 0,
+            length: size,
+        }
+    }
+
+    /// Create a slice cursor using the same data as the current one.
+    pub fn slice(&self, start: u64, length: usize) -> io::Result<Self> {
+        let new_start = self.start + start;
+        if new_start >= self.inner.len() as u64
+            || new_start as usize + length > self.inner.len()
+        {
+            return Err(Error::new(ErrorKind::InvalidInput, "out of bounds"));
+        }
+        Ok(SliceableCursor {
+            inner: Rc::clone(&self.inner),
+            start: new_start,
+            pos: new_start,
+            length,
+        })
+    }
+
+    fn remaining_slice(&self) -> &[u8] {
+        let end = self.start as usize + self.length;
+        let offset = cmp::min(self.pos, end as u64) as usize;
+        &self.inner[offset..end]
+    }
+
+    /// Get the length of the current cursor slice
+    pub fn len(&self) -> u64 {
+        self.length as u64
+    }
+}
+
+/// Implementation inspired by std::io::Cursor
+impl Read for SliceableCursor {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let n = Read::read(&mut self.remaining_slice(), buf)?;
+        self.pos += n as u64;
+        Ok(n)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Create a SliceableCursor of all u8 values in ascending order
+    fn get_u8_range() -> SliceableCursor {
+        let data: Vec<u8> = (0u8..=255).collect();
+        SliceableCursor::new(data)
+    }
+
+    /// Reads all the bytes in the slice and checks that it matches the u8 range from start to end_included
+    fn check_read_all(mut cursor: SliceableCursor, start: u8, end_included: u8) {
+        let mut target = vec![];
+        let cursor_res = cursor.read_to_end(&mut target);
+        println!("{:?}", cursor_res);
+        assert!(cursor_res.is_ok(), "reading error");
+        assert_eq!((end_included - start) as usize + 1, cursor_res.unwrap());
+        assert_eq!((start..=end_included).collect::<Vec<_>>(), target);
+    }
+
+    #[test]
+    fn read_all_whole() {
+        let cursor = get_u8_range();
+        check_read_all(cursor, 0, 255);
+    }
+
+    #[test]
+    fn read_all_slice() {
+        let cursor = get_u8_range().slice(10, 10).expect("error while slicing");
+        check_read_all(cursor, 10, 19);
+    }
+}
diff --git a/rust/parquet/src/util/io.rs b/rust/parquet/src/util/io.rs
index af740cd5c0f..223b8d5e676 100644
--- a/rust/parquet/src/util/io.rs
+++ b/rust/parquet/src/util/io.rs
@@ -17,11 +17,25 @@
 
 use std::{cell::RefCell, cmp, io::*};
 
-use crate::file::{reader::ParquetReader, writer::ParquetWriter};
+use crate::file::{reader::Length, writer::ParquetWriter};
 
 const DEFAULT_BUF_SIZE: usize = 8 * 1024;
 
 // ----------------------------------------------------------------------
+
+/// TryClone tries to clone the type and should maintain the `Seek` position of the given
+/// instance.
+pub trait TryClone: Sized {
+    /// Clones the type returning a new instance or an error if it's not possible
+    /// to clone it.
+    fn try_clone(&self) -> Result<Self>;
+}
+
+/// ParquetReader is the interface which needs to be fulfilled to be able to parse a
+/// parquet source.
+pub trait ParquetReader: Read + Seek + Length + TryClone {}
+impl<T: Read + Seek + Length + TryClone> ParquetReader for T {}
+
 // Read/Write wrappers for `File`.
 
 /// Position trait returns the current position in the stream.
@@ -121,6 +135,12 @@ impl<R: ParquetReader> Position for FileSource<R> {
     }
 }
 
+impl<R: ParquetReader> Length for FileSource<R> {
+    fn len(&self) -> u64 {
+        self.end - self.start
+    }
+}
+
 /// Struct that represents `File` output stream with position tracking.
 /// Used as a sink in file writer.
 pub struct FileSink {
diff --git a/rust/parquet/src/util/mod.rs b/rust/parquet/src/util/mod.rs
index 669cc3c0a49..af9a1aa1eba 100644
--- a/rust/parquet/src/util/mod.rs
+++ b/rust/parquet/src/util/mod.rs
@@ -20,6 +20,7 @@ pub mod memory;
 #[macro_use]
 pub mod bit_util;
 mod bit_packing;
+pub mod cursor;
 pub mod hash_util;
 
 #[cfg(test)]
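
For reviewers, here is a minimal sketch of how the new SliceableCursor is meant to be used from user code once a Parquet file is already in memory, mirroring the test_cursor_and_file_has_the_same_behaviour test above. The crate paths, the file name, the helper function name and the row-count return value are illustrative assumptions, not part of this diff; it also assumes the ChunkReader/Length implementations for SliceableCursor added elsewhere in this change.

    use std::fs::File;
    use std::io::Read;

    use parquet::errors::Result;
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use parquet::util::cursor::SliceableCursor;

    // Hypothetical helper: read a parquet file fully into memory, then parse it
    // through SliceableCursor instead of handing the File to the reader directly.
    fn count_rows_from_memory(path: &str) -> Result<usize> {
        // Load the raw bytes (e.g. as if they had arrived over the network).
        let mut buf: Vec<u8> = Vec::new();
        File::open(path)?.read_to_end(&mut buf)?;

        // Wrap the in-memory bytes; SerializedFileReader accepts the cursor
        // just like a file-backed source.
        let cursor = SliceableCursor::new(buf);
        let reader = SerializedFileReader::new(cursor)?;

        // Iterate rows exactly as with a file-backed reader.
        Ok(reader.get_row_iter(None)?.count())
    }

The point of the design is that the reader never needs the original File handle: anything satisfying the reader's source trait bounds works, and the Rc-backed cursor keeps slicing cheap because slices share the same underlying buffer.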