From 0da6cc4d0a0344a1ff6941f536c31ceab8aa8943 Mon Sep 17 00:00:00 2001 From: alamb Date: Wed, 19 Aug 2020 09:19:23 -0400 Subject: [PATCH 1/2] ARROW-9790: [Rust][Parquet] Fix ParquetFileArrowReader boundary conditions --- rust/parquet/src/arrow/array_reader.rs | 4 +-- rust/parquet/src/arrow/arrow_reader.rs | 39 ++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index ae6ca497b858..b890d9c779d2 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -136,10 +136,8 @@ impl ArrayReader for PrimitiveArrayReader { while records_read < batch_size { let records_to_read = batch_size - records_read; + // NB can be 0 if at end of page let records_read_once = self.record_reader.read_records(records_to_read)?; - if records_read_once == 0 { - break; // record reader has no record - } records_read = records_read + records_read_once; // Record reader exhausted diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs index f052d0f36e5c..180514b39883 100644 --- a/rust/parquet/src/arrow/arrow_reader.rs +++ b/rust/parquet/src/arrow/arrow_reader.rs @@ -304,6 +304,45 @@ mod tests { >(2, 100, 2, message_type, 15, 50, converter); } + #[test] + fn test_bool_single_column_reader_test_batch_size_divides_into_row_group_size() { + let message_type = " + message test_schema { + REQUIRED BOOLEAN leaf; + } + "; + + // Use a batch size (5) so that batches fall on + // row group boundaries (25 rows in 3 row groups --> row + // groups of 10, 10, and 5) to test refilling edge cases. 
+ let converter = FromConverter::new(); + single_column_reader_test::< + BoolType, + BooleanArray, + FromConverter>, BooleanArray>, + BoolType, + >(3, 25, 2, message_type, 5, 50, converter); + } + + #[test] + fn test_bool_single_column_reader_test_batch_size_divides_into_row_group_size2() { + let message_type = " + message test_schema { + REQUIRED BOOLEAN leaf; + } + "; + + // Ensure that every batch size (25) falls exactly on a row group + // boundary (25 in this case) to test this edge case. + let converter = FromConverter::new(); + single_column_reader_test::< + BoolType, + BooleanArray, + FromConverter>, BooleanArray>, + BoolType, + >(4, 100, 2, message_type, 25, 50, converter); + } + struct RandFixedLenGen {} impl RandGen for RandFixedLenGen { From cfdea8349a8f41229a4c8fb2cfddec3bb11df112 Mon Sep 17 00:00:00 2001 From: alamb Date: Wed, 19 Aug 2020 10:16:31 -0400 Subject: [PATCH 2/2] Add a test for reading empty pages --- rust/parquet/src/arrow/array_reader.rs | 58 +++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index b890d9c779d2..05ec9a0fa069 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -932,7 +932,7 @@ mod tests { use super::*; use crate::arrow::converter::Utf8Converter; use crate::basic::{Encoding, Type as PhysicalType}; - use crate::column::page::Page; + use crate::column::page::{Page, PageReader}; use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type}; use crate::errors::Result; use crate::file::reader::{FileReader, SerializedFileReader}; @@ -998,6 +998,33 @@ mod tests { } } + #[test] + fn test_primitive_array_reader_empty_pages() { + // Construct column schema + let message_type = " + message test_schema { + REQUIRED INT32 leaf; + } + "; + + let schema = parse_message_type(message_type) + .map(|t| Rc::new(SchemaDescriptor::new(Rc::new(t)))) + .unwrap(); + + let column_desc = 
schema.column(0); + let page_iterator = EmptyPageIterator::new(schema.clone()); + + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc.clone(), + ) + .unwrap(); + + // expect no values to be read + let array = array_reader.next_batch(50).unwrap(); + assert!(array.is_empty()); + } + #[test] fn test_primitive_array_reader_data() { // Construct column schema @@ -1450,6 +1477,35 @@ mod tests { } } + /// Iterator for testing reading empty columns + struct EmptyPageIterator { + schema: SchemaDescPtr, + } + + impl EmptyPageIterator { + fn new(schema: SchemaDescPtr) -> Self { + EmptyPageIterator { schema } + } + } + + impl Iterator for EmptyPageIterator { + type Item = Result>; + + fn next(&mut self) -> Option { + None + } + } + + impl PageIterator for EmptyPageIterator { + fn schema(&mut self) -> Result { + Ok(self.schema.clone()) + } + + fn column_schema(&mut self) -> Result { + Ok(self.schema.column(0)) + } + } + #[test] fn test_struct_array_reader() { let array_1 = Arc::new(PrimitiveArray::::from(vec![1, 2, 3, 4, 5]));