From d187e36f877a5f79422a60e0f1a1451d83cfebd7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 21 Nov 2023 09:51:26 -0500 Subject: [PATCH 01/16] Extract parquet statistics to its own module, add tests --- .../src/datasource/physical_plan/parquet.rs | 23 +- .../physical_plan/parquet/page_filter.rs | 5 +- .../physical_plan/parquet/row_groups.rs | 141 +-- .../physical_plan/parquet/statistics.rs | 807 ++++++++++++++++++ 4 files changed, 834 insertions(+), 142 deletions(-) create mode 100644 datafusion/core/src/datasource/physical_plan/parquet/statistics.rs diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 731672ceb8b89..677f018869b63 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -66,6 +66,7 @@ mod metrics; pub mod page_filter; mod row_filter; mod row_groups; +mod statistics; pub use metrics::ParquetFileMetrics; @@ -718,28 +719,6 @@ pub async fn plan_to_parquet( Ok(()) } -// Copy from the arrow-rs -// https://github.com/apache/arrow-rs/blob/733b7e7fd1e8c43a404c3ce40ecf741d493c21b4/parquet/src/arrow/buffer/bit_util.rs#L55 -// Convert the byte slice to fixed length byte array with the length of 16 -fn sign_extend_be(b: &[u8]) -> [u8; 16] { - assert!(b.len() <= 16, "Array too large, expected less than 16"); - let is_negative = (b[0] & 128u8) == 128u8; - let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] }; - for (d, s) in result.iter_mut().skip(16 - b.len()).zip(b) { - *d = *s; - } - result -} - -// Convert the bytes array to i128. -// The endian of the input bytes array must be big-endian. -pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 { - // The bytes array are from parquet file and must be the big-endian. - // The endian is defined by parquet format, and the reference document - // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66 - i128::from_be_bytes(sign_extend_be(b)) -} - // Convert parquet column schema to arrow data type, and just consider the // decimal data type. pub(crate) fn parquet_to_arrow_decimal_type( diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index b5b5f154f7a0f..42bfef35996e9 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -39,9 +39,8 @@ use parquet::{ }; use std::sync::Arc; -use crate::datasource::physical_plan::parquet::{ - from_bytes_to_i128, parquet_to_arrow_decimal_type, -}; +use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type; +use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use super::metrics::ParquetFileMetrics; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 0079368f9cdd2..bb52c0469932b 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -15,31 +15,26 @@ // specific language governing permissions and limitations // under the License. 
-use arrow::{ - array::ArrayRef, - datatypes::{DataType, Schema}, -}; +use arrow::{array::ArrayRef, datatypes::Schema}; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::{Column, DataFusionError, Result, ScalarValue}; use parquet::{ arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder}, bloom_filter::Sbbf, - file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics}, + file::metadata::RowGroupMetaData, }; use std::{ collections::{HashMap, HashSet}, sync::Arc, }; -use crate::datasource::{ - listing::FileRange, - physical_plan::parquet::{from_bytes_to_i128, parquet_to_arrow_decimal_type}, -}; +use crate::datasource::listing::FileRange; use crate::logical_expr::Operator; use crate::physical_expr::expressions as phys_expr; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use crate::physical_plan::PhysicalExpr; +use super::statistics::RowGroupStatisticsConverter; use super::ParquetFileMetrics; /// Prune row groups based on statistics @@ -303,112 +298,6 @@ struct RowGroupPruningStatistics<'a> { parquet_schema: &'a Schema, } -/// Extract the min/max statistics from a `ParquetStatistics` object -macro_rules! get_statistic { - ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{ - if !$column_statistics.has_min_max_set() { - return None; - } - match $column_statistics { - ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))), - ParquetStatistics::Int32(s) => { - match $target_arrow_type { - // int32 to decimal with the precision and scale - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(*s.$func() as i128), - precision, - scale, - )) - } - _ => Some(ScalarValue::Int32(Some(*s.$func()))), - } - } - ParquetStatistics::Int64(s) => { - match $target_arrow_type { - // int64 to decimal with the precision and scale - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(*s.$func() as i128), - precision, - scale, - )) - } - _ => Some(ScalarValue::Int64(Some(*s.$func()))), - } - } - // 96 bit ints not supported - ParquetStatistics::Int96(_) => None, - ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))), - ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))), - ParquetStatistics::ByteArray(s) => { - match $target_arrow_type { - // decimal data type - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(from_bytes_to_i128(s.$bytes_func())), - precision, - scale, - )) - } - _ => { - let s = std::str::from_utf8(s.$bytes_func()) - .map(|s| s.to_string()) - .ok(); - Some(ScalarValue::Utf8(s)) - } - } - } - // type not supported yet - ParquetStatistics::FixedLenByteArray(s) => { - match $target_arrow_type { - // just support the decimal data type - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(from_bytes_to_i128(s.$bytes_func())), - precision, - scale, - )) - } - _ => None, - } - } - } - }}; -} - -// Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate -macro_rules! 
get_min_max_values {
-    ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
-        let (_column_index, field) =
-            if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
-                (v, f)
-            } else {
-                // Named column was not present
-                return None;
-            };
-
-        let data_type = field.data_type();
-        // The result may be None, because DataFusion doesn't have support for ScalarValues of the column type
-        let null_scalar: ScalarValue = data_type.try_into().ok()?;
-
-        $self.row_group_metadata
-            .columns()
-            .iter()
-            .find(|c| c.column_descr().name() == &$column.name)
-            .and_then(|c| if c.statistics().is_some() {Some((c.statistics().unwrap(), c.column_descr()))} else {None})
-            .map(|(stats, column_descr)|
-                {
-                    let target_data_type = parquet_to_arrow_decimal_type(column_descr);
-                    get_statistic!(stats, $func, $bytes_func, target_data_type)
-                })
-            .flatten()
-            // column either didn't have statistics at all or didn't have min/max values
-            .or_else(|| Some(null_scalar.clone()))
-            .and_then(|s| s.to_array().ok())
-    }}
-}
-
 // Extract the null count value on the ParquetStatistics
 macro_rules! get_null_count_values {
     ($self:expr, $column:expr) => {{
@@ -431,11 +320,29 @@ macro_rules! get_null_count_values {
 
 impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
-        get_min_max_values!(self, column, min, min_bytes)
+        let field = self
+            .parquet_schema
+            .fields()
+            .find(&column.name)
+            .map(|(_idx, field)| field)?;
+
+        RowGroupStatisticsConverter::new(field)
+            .min([self.row_group_metadata])
+            // ignore errors during conversion, and just use no statistics
+            .ok()
     }
 
     fn max_values(&self, column: &Column) -> Option<ArrayRef> {
-        get_min_max_values!(self, column, max, max_bytes)
+        let field = self
+            .parquet_schema
+            .fields()
+            .find(&column.name)
+            .map(|(_idx, field)| field)?;
+
+        RowGroupStatisticsConverter::new(field)
+            .max([self.row_group_metadata])
+            // ignore errors during conversion, and just use no statistics
+            .ok()
     }
 
     fn num_containers(&self) -> usize {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
new file mode 100644
index 0000000000000..e3aa9241d39df
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -0,0 +1,807 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`RowGroupStatisticsConverter`] converts parquet RowGroup statistics to arrow [`ArrayRef`].
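+//!
+//! A minimal usage sketch (`field` and `metadata` stand in for a [`Field`]
+//! and parquet metadata obtained elsewhere; the API is the one defined in
+//! this module):
+//!
+//!     // one output element per row group, NULL where statistics are missing
+//!     let converter = RowGroupStatisticsConverter::new(field);
+//!     let mins = converter.min(metadata.row_groups());  // Result<ArrayRef>
+//!     let maxes = converter.max(metadata.row_groups()); // Result<ArrayRef>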
+
+use arrow::{array::ArrayRef, datatypes::DataType};
+use arrow_array::new_empty_array;
+use arrow_schema::Field;
+use datafusion_common::{Result, ScalarValue};
+use parquet::file::{
+    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
+};
+
+// Convert the bytes array to i128.
+// The endian of the input bytes array must be big-endian.
+pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
+    // The bytes array are from parquet file and must be the big-endian.
+    // The endian is defined by parquet format, and the reference document
+    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
+    i128::from_be_bytes(sign_extend_be(b))
+}
+
+// Copy from arrow-rs
+// https://github.com/apache/arrow-rs/blob/733b7e7fd1e8c43a404c3ce40ecf741d493c21b4/parquet/src/arrow/buffer/bit_util.rs#L55
+// Convert the byte slice to fixed length byte array with the length of 16
+fn sign_extend_be(b: &[u8]) -> [u8; 16] {
+    assert!(b.len() <= 16, "Array too large, expected less than 16");
+    let is_negative = (b[0] & 128u8) == 128u8;
+    let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] };
+    for (d, s) in result.iter_mut().skip(16 - b.len()).zip(b) {
+        *d = *s;
+    }
+    result
+}
+
+/// Converts parquet RowGroup statistics (stored in
+/// [`RowGroupMetaData`]) into an arrow [`ArrayRef`]
+///
+/// For example, given a parquet file with 3 Row Groups, when asked for
+/// statistics for column "A" it will return a single array with 3 elements,
+///
+pub(crate) struct RowGroupStatisticsConverter<'a> {
+    field: &'a Field,
+}
+
+/// Extract a single min/max statistics from a [`ParquetStatistics`] object
+///
+/// * `$column_statistics` is the `ParquetStatistics` object
+/// * `$func` is the function (`min`/`max`) to call to get the value
+/// * `$bytes_func` is the function (`min_bytes`/`max_bytes`) to call to get the value as bytes
+/// * `$target_arrow_type` is the [`DataType`] of the target statistics
+macro_rules! get_statistic {
+    ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
+        if !$column_statistics.has_min_max_set() {
+            return None;
+        }
+        match $column_statistics {
+            ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))),
+            ParquetStatistics::Int32(s) => {
+                match $target_arrow_type {
+                    // int32 to decimal with the precision and scale
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(*s.$func() as i128),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => Some(ScalarValue::Int32(Some(*s.$func()))),
+                }
+            }
+            ParquetStatistics::Int64(s) => {
+                match $target_arrow_type {
+                    // int64 to decimal with the precision and scale
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(*s.$func() as i128),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => Some(ScalarValue::Int64(Some(*s.$func()))),
+                }
+            }
+            // 96 bit ints not supported
+            ParquetStatistics::Int96(_) => None,
+            ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))),
+            ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))),
+            ParquetStatistics::ByteArray(s) => {
+                match $target_arrow_type {
+                    // decimal data type
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(from_bytes_to_i128(s.$bytes_func())),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => {
+                        let s = std::str::from_utf8(s.$bytes_func())
+                            .map(|s| s.to_string())
+                            .ok();
+                        Some(ScalarValue::Utf8(s))
+                    }
+                }
+            }
+            // type not supported yet
+            ParquetStatistics::FixedLenByteArray(s) => {
+                match $target_arrow_type {
+                    // just support the decimal data type
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(from_bytes_to_i128(s.$bytes_func())),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => None,
+                }
+            }
+        }
+    }};
+}
+
+#[derive(Debug, Clone, Copy)]
+enum MinMax {
+    Min,
+    Max,
+}
+
+impl<'a> RowGroupStatisticsConverter<'a> {
+    /// Create a new RowGroupStatisticsConverter that can extract
+    /// statistics for the specified field
+    pub fn new(field: &'a Field) -> Self {
+        Self { field }
+    }
+
+    /// Returns the min value for the column into an array ref.
+    pub fn min<'b>(
+        &self,
+        row_group_meta_data: impl IntoIterator<Item = &'b RowGroupMetaData>,
+    ) -> Result<ArrayRef> {
+        self.min_max_impl(MinMax::Min, row_group_meta_data)
+    }
+
+    /// Returns the max value for the column into an array ref.
+    pub fn max<'b>(
+        &self,
+        row_group_meta_data: impl IntoIterator<Item = &'b RowGroupMetaData>,
+    ) -> Result<ArrayRef> {
+        self.min_max_impl(MinMax::Max, row_group_meta_data)
+    }
+
+    /// Extracts all min/max values for the column into an array ref.
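+    ///
+    /// The returned array holds one element per row group, in order. A row
+    /// group whose statistics are missing or unreadable yields a NULL entry;
+    /// if the column is absent from the row groups entirely, the result is
+    /// an all-NULL array of the field's data type.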
+    fn min_max_impl<'b>(
+        &self,
+        mm: MinMax,
+        row_group_meta_data: impl IntoIterator<Item = &'b RowGroupMetaData>,
+    ) -> Result<ArrayRef> {
+        let mut row_group_meta_data = row_group_meta_data.into_iter().peekable();
+
+        // if it is empty, return empty array
+        if row_group_meta_data.peek().is_none() {
+            return Ok(new_empty_array(self.field.data_type()));
+        }
+
+        let maybe_index = row_group_meta_data.peek().and_then(|rg_meta| {
+            rg_meta
+                .columns()
+                .iter()
+                .enumerate()
+                .find(|(_idx, c)| c.column_descr().name() == self.field.name())
+                .map(|(idx, _c)| idx)
+        });
+
+        // don't have this column, return an array of all NULLs
+        let Some(column_index) = maybe_index else {
+            let num_row_groups = row_group_meta_data.count();
+            let sv = ScalarValue::try_from(self.field.data_type())?;
+            return sv.to_array_of_size(num_row_groups);
+        };
+
+        let stats_iter = row_group_meta_data.map(move |row_group_meta_data| {
+            row_group_meta_data.column(column_index).statistics()
+        });
+
+        // this is the value to use when the statistics are not set
+        let null_value = ScalarValue::try_from(self.field.data_type())?;
+        match mm {
+            MinMax::Min => {
+                let values = stats_iter.map(|column_statistics| {
+                    column_statistics
+                        .and_then(|column_statistics| {
+                            get_statistic!(
+                                column_statistics,
+                                min,
+                                min_bytes,
+                                Some(self.field.data_type().clone())
+                            )
+                        })
+                        .unwrap_or_else(|| null_value.clone())
+                });
+                ScalarValue::iter_to_array(values)
+            }
+            MinMax::Max => {
+                let values = stats_iter.map(|column_statistics| {
+                    column_statistics
+                        .and_then(|column_statistics| {
+                            get_statistic!(
+                                column_statistics,
+                                max,
+                                max_bytes,
+                                Some(self.field.data_type().clone())
+                            )
+                        })
+                        .unwrap_or_else(|| null_value.clone())
+                });
+                ScalarValue::iter_to_array(values)
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use arrow_array::{
+        BinaryArray, BooleanArray, Decimal128Array, Float32Array, Float64Array,
+        Int32Array, Int64Array, RecordBatch, StringArray, TimestampNanosecondArray,
+    };
+    use arrow_schema::SchemaRef;
+    use bytes::Bytes;
+    use datafusion_common::test_util::parquet_test_data;
+    use parquet::arrow::arrow_reader::ArrowReaderBuilder;
+    use parquet::arrow::arrow_writer::ArrowWriter;
+    use parquet::file::metadata::ParquetMetaData;
+    use parquet::file::properties::{EnabledStatistics, WriterProperties};
+    use std::path::PathBuf;
+    use std::sync::Arc;
+
+    // TODO error cases (with parquet statistics that are mismatched in expected type)
+
+    #[test]
+    fn roundtrip_empty() {
+        let empty_bool_array = new_empty_array(&DataType::Boolean);
+        Test {
+            input: empty_bool_array.clone(),
+            expected_min: empty_bool_array.clone(),
+            expected_max: empty_bool_array.clone(),
+        }
+        .run()
+    }
+
+    #[test]
+    fn roundtrip_bool() {
+        Test {
+            input: Arc::new(BooleanArray::from(vec![
+                // row group 1
+                Some(true),
+                None,
+                Some(true),
+                // row group 2
+                Some(true),
+                Some(false),
+                None,
+                // row group 3
+                None,
+                None,
+                None,
+            ])),
+            expected_min: Arc::new(BooleanArray::from(vec![
+                Some(true),
+                Some(false),
+                None,
+            ])),
+            expected_max: Arc::new(BooleanArray::from(vec![
+                Some(true),
+                Some(true),
+                None,
+            ])),
+        }
+        .run()
+    }
+
+    #[test]
+    fn roundtrip_int32() {
+        Test {
+            input: Arc::new(Int32Array::from(vec![
+                // row group 1
+                Some(1),
+                None,
+                Some(3),
+                // row group 2
+                Some(0),
+                Some(5),
+                None,
+                // row group 3
+                None,
+                None,
+                None,
+            ])),
+            expected_min: Arc::new(Int32Array::from(vec![Some(1), Some(0), None])),
+            expected_max: Arc::new(Int32Array::from(vec![Some(3), Some(5), None])),
+        }
+        .run()
+    }
+
+    #[test]
+    fn roundtrip_int64() {
+        Test
{ + input: Arc::new(Int64Array::from(vec![ + // row group 1 + Some(1), + None, + Some(3), + // row group 2 + Some(0), + Some(5), + None, + // row group 3 + None, + None, + None, + ])), + expected_min: Arc::new(Int64Array::from(vec![Some(1), Some(0), None])), + expected_max: Arc::new(Int64Array::from(vec![Some(3), Some(5), None])), + } + .run() + } + + #[test] + fn roundtrip_f32() { + Test { + input: Arc::new(Float32Array::from(vec![ + // row group 1 + Some(1.0), + None, + Some(3.0), + // row group 2 + Some(-1.0), + Some(5.0), + None, + // row group 3 + None, + None, + None, + ])), + expected_min: Arc::new(Float32Array::from(vec![Some(1.0), Some(-1.0), None])), + expected_max: Arc::new(Float32Array::from(vec![Some(3.0), Some(5.0), None])), + } + .run() + } + + #[test] + fn roundtrip_f64() { + Test { + input: Arc::new(Float64Array::from(vec![ + // row group 1 + Some(1.0), + None, + Some(3.0), + // row group 2 + Some(-1.0), + Some(5.0), + None, + // row group 3 + None, + None, + None, + ])), + expected_min: Arc::new(Float64Array::from(vec![Some(1.0), Some(-1.0), None])), + expected_max: Arc::new(Float64Array::from(vec![Some(3.0), Some(5.0), None])), + } + .run() + } + + #[test] + #[should_panic( + expected = "Inconsistent types in ScalarValue::iter_to_array. Expected Int64, got TimestampNanosecond(NULL, None)" + )] + fn roundtrip_timestamp() { + Test { + input: Arc::new(TimestampNanosecondArray::from(vec![ + // row group 1 + Some(1), + None, + Some(3), + // row group 2 + Some(9), + Some(5), + None, + // row group 3 + None, + None, + None, + ])), + expected_min: Arc::new(TimestampNanosecondArray::from(vec![ + Some(1), + Some(5), + None, + ])), + expected_max: Arc::new(TimestampNanosecondArray::from(vec![ + Some(3), + Some(9), + None, + ])), + } + .run() + } + + #[test] + fn roundtrip_decimal() { + Test { + input: Arc::new( + Decimal128Array::from(vec![ + // row group 1 + Some(100), + None, + Some(22000), + // row group 2 + Some(500000), + Some(330000), + None, + // row group 3 + None, + None, + None, + ]) + .with_precision_and_scale(9, 2) + .unwrap(), + ), + expected_min: Arc::new( + Decimal128Array::from(vec![Some(100), Some(330000), None]) + .with_precision_and_scale(9, 2) + .unwrap(), + ), + expected_max: Arc::new( + Decimal128Array::from(vec![Some(22000), Some(500000), None]) + .with_precision_and_scale(9, 2) + .unwrap(), + ), + } + .run() + } + + #[test] + fn roundtrip_utf8() { + Test { + input: Arc::new(StringArray::from(vec![ + // row group 1 + Some("A"), + None, + Some("Q"), + // row group 2 + Some("ZZ"), + Some("AA"), + None, + // row group 3 + None, + None, + None, + ])), + expected_min: Arc::new(StringArray::from(vec![Some("A"), Some("AA"), None])), + expected_max: Arc::new(StringArray::from(vec![Some("Q"), Some("ZZ"), None])), + } + .run() + } + + #[test] + #[should_panic( + expected = "Inconsistent types in ScalarValue::iter_to_array. 
Expected Utf8, got Binary(NULL)"
+    )]
+    fn roundtrip_binary() {
+        Test {
+            input: Arc::new(BinaryArray::from_opt_vec(vec![
+                // row group 1
+                Some(b"A"),
+                None,
+                Some(b"Q"),
+                // row group 2
+                Some(b"ZZ"),
+                Some(b"AA"),
+                None,
+                // row group 3
+                None,
+                None,
+                None,
+            ])),
+            expected_min: Arc::new(BinaryArray::from_opt_vec(vec![
+                Some(b"A"),
+                Some(b"AA"),
+                None,
+            ])),
+            expected_max: Arc::new(BinaryArray::from_opt_vec(vec![
+                Some(b"Q"),
+                Some(b"ZZ"),
+                None,
+            ])),
+        }
+        .run()
+    }
+
+    const ROWS_PER_ROW_GROUP: usize = 3;
+
+    /// Writes the input batch into a parquet file, with every three rows as
+    /// their own row group, and compares the min/maxes to the expected values
+    struct Test {
+        input: ArrayRef,
+        expected_min: ArrayRef,
+        expected_max: ArrayRef,
+    }
+
+    impl Test {
+        fn run(self) {
+            let Self {
+                input,
+                expected_min,
+                expected_max,
+            } = self;
+
+            let input_batch = RecordBatch::try_from_iter([("c1", input)]).unwrap();
+
+            let schema = input_batch.schema();
+
+            let metadata = parquet_metadata(schema.clone(), input_batch);
+
+            for field in schema.fields() {
+                let converter = RowGroupStatisticsConverter::new(field);
+                let row_groups = metadata.row_groups();
+                let min = converter.min(row_groups).unwrap();
+                assert_eq!(
+                    &min,
+                    &expected_min,
+                    "Min. Statistics\n\n{}\n\n",
+                    DisplayStats(row_groups)
+                );
+
+                let max = converter.max(row_groups).unwrap();
+                assert_eq!(
+                    &max,
+                    &expected_max,
+                    "Max. Statistics\n\n{}\n\n",
+                    DisplayStats(row_groups)
+                );
+            }
+        }
+    }
+
+    /// Write the specified batches out as parquet and return the metadata
+    fn parquet_metadata(schema: SchemaRef, batch: RecordBatch) -> Arc<ParquetMetaData> {
+        let props = WriterProperties::builder()
+            .set_statistics_enabled(EnabledStatistics::Chunk)
+            .set_max_row_group_size(ROWS_PER_ROW_GROUP)
+            .build();
+
+        let mut buffer = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let reader = ArrowReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
+        reader.metadata().clone()
+    }
+
+    /// Formats the statistics nicely for display
+    struct DisplayStats<'a>(&'a [RowGroupMetaData]);
+    impl<'a> std::fmt::Display for DisplayStats<'a> {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            let row_groups = self.0;
+            writeln!(f, "  row_groups: {}", row_groups.len())?;
+            for rg in row_groups {
+                for col in rg.columns() {
+                    if let Some(statistics) = col.statistics() {
+                        writeln!(f, "   {}: {:?}", col.column_path(), statistics)?;
+                    }
+                }
+            }
+            Ok(())
+        }
+    }
+
+    struct ExpectedColumn {
+        name: &'static str,
+        expected_min: ArrayRef,
+        expected_max: ArrayRef,
+    }
+
+    /// Reads statistics out of the specified file, and compares them to the expected values
+    struct TestFile {
+        file_name: &'static str,
+        expected_columns: Vec<ExpectedColumn>,
+    }
+
+    impl TestFile {
+        fn new(file_name: &'static str) -> Self {
+            Self {
+                file_name,
+                expected_columns: Vec::new(),
+            }
+        }
+
+        fn with_column(mut self, column: ExpectedColumn) -> Self {
+            self.expected_columns.push(column);
+            self
+        }
+
+        /// Reads the specified parquet file and validates that the expected min/max
+        /// values for the specified columns are as expected.
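+        ///
+        /// A typical invocation looks like the following (the file name and
+        /// column values here are illustrative, not taken from a real file):
+        ///
+        ///     TestFile::new("example.parquet")
+        ///         .with_column(ExpectedColumn {
+        ///             name: "c1",
+        ///             expected_min: Arc::new(Int32Array::from(vec![Some(0)])),
+        ///             expected_max: Arc::new(Int32Array::from(vec![Some(9)])),
+        ///         })
+        ///         .run();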
+ fn run(self) { + let path = PathBuf::from(parquet_test_data()).join(self.file_name); + let file = std::fs::File::open(path).unwrap(); + let reader = ArrowReaderBuilder::try_new(file).unwrap(); + let arrow_schema = reader.schema(); + let metadata = reader.metadata(); + + for rg in metadata.row_groups() { + println!( + "Columns: {}", + rg.columns() + .iter() + .map(|c| c.column_descr().name()) + .collect::>() + .join(", ") + ); + } + println!(" row groups: {}", metadata.row_groups().len()); + + for expected_column in self.expected_columns { + let ExpectedColumn { + name, + expected_min, + expected_max, + } = expected_column; + + let field = arrow_schema + .field_with_name(name) + .expect("can't find field in schema"); + + let converter = RowGroupStatisticsConverter::new(field); + let actual_min = converter.min(metadata.row_groups()).unwrap(); + assert_eq!(&expected_min, &actual_min, "column {name}"); + + let actual_max = converter.max(metadata.row_groups()).unwrap(); + assert_eq!(&expected_max, &actual_max, "column {name}"); + } + } + } + + #[test] + fn alltypes_plain() { + // /parquet-testing/data/datapage_v1-snappy-compressed-checksum.parquet + // row_groups: 1 + // (has no statistics) + TestFile::new("alltypes_plain.parquet") + // No column statistics should be read as NULL, but with the right type + .with_column(ExpectedColumn { + name: "id", + expected_min: Arc::new(Int32Array::from(vec![None])), + expected_max: Arc::new(Int32Array::from(vec![None])), + }) + .with_column(ExpectedColumn { + name: "bool_col", + expected_min: Arc::new(BooleanArray::from(vec![None])), + expected_max: Arc::new(BooleanArray::from(vec![None])), + }) + .run(); + } + + #[test] + fn alltypes_tiny_pages() { + // /parquet-testing/data/alltypes_tiny_pages.parquet + // row_groups: 1 + // "id": Int32({min: Some(0), max: Some(7299), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "bool_col": Boolean({min: Some(false), max: Some(true), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "tinyint_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "smallint_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "int_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "bigint_col": Int64({min: Some(0), max: Some(90), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "float_col": Float({min: Some(0.0), max: Some(9.9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "double_col": Double({min: Some(0.0), max: Some(90.89999999999999), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "date_string_col": ByteArray({min: Some(ByteArray { data: "01/01/09" }), max: Some(ByteArray { data: "12/31/10" }), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "string_col": ByteArray({min: Some(ByteArray { data: "0" }), max: Some(ByteArray { data: "9" }), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "timestamp_col": Int96({min: None, max: None, distinct_count: 
None, null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: true}) + // "year": Int32({min: Some(2009), max: Some(2010), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "month": Int32({min: Some(1), max: Some(12), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + TestFile::new("alltypes_tiny_pages.parquet") + .with_column(ExpectedColumn { + name: "id", + expected_min: Arc::new(Int32Array::from(vec![Some(0)])), + expected_max: Arc::new(Int32Array::from(vec![Some(7299)])), + }) + .with_column(ExpectedColumn { + name: "bool_col", + expected_min: Arc::new(BooleanArray::from(vec![Some(false)])), + expected_max: Arc::new(BooleanArray::from(vec![Some(true)])), + }) + .with_column(ExpectedColumn { + name: "tinyint_col", + expected_min: Arc::new(Int32Array::from(vec![Some(0)])), + expected_max: Arc::new(Int32Array::from(vec![Some(9)])), + }) + .with_column(ExpectedColumn { + name: "smallint_col", + expected_min: Arc::new(Int32Array::from(vec![Some(0)])), + expected_max: Arc::new(Int32Array::from(vec![Some(9)])), + }) + .with_column(ExpectedColumn { + name: "int_col", + expected_min: Arc::new(Int32Array::from(vec![Some(0)])), + expected_max: Arc::new(Int32Array::from(vec![Some(9)])), + }) + .with_column(ExpectedColumn { + name: "bigint_col", + expected_min: Arc::new(Int64Array::from(vec![Some(0)])), + expected_max: Arc::new(Int64Array::from(vec![Some(90)])), + }) + .with_column(ExpectedColumn { + name: "float_col", + expected_min: Arc::new(Float32Array::from(vec![Some(0.0)])), + expected_max: Arc::new(Float32Array::from(vec![Some(9.9)])), + }) + .with_column(ExpectedColumn { + name: "double_col", + expected_min: Arc::new(Float64Array::from(vec![Some(0.0)])), + expected_max: Arc::new(Float64Array::from(vec![Some(90.89999999999999)])), + }) + .with_column(ExpectedColumn { + name: "date_string_col", + expected_min: Arc::new(StringArray::from(vec![Some("01/01/09")])), + expected_max: Arc::new(StringArray::from(vec![Some("12/31/10")])), + }) + .with_column(ExpectedColumn { + name: "string_col", + expected_min: Arc::new(StringArray::from(vec![Some("0")])), + expected_max: Arc::new(StringArray::from(vec![Some("9")])), + }) + // File has no min/max for timestamp_col + .with_column(ExpectedColumn { + name: "timestamp_col", + expected_min: Arc::new(TimestampNanosecondArray::from(vec![None])), + expected_max: Arc::new(TimestampNanosecondArray::from(vec![None])), + }) + .with_column(ExpectedColumn { + name: "year", + expected_min: Arc::new(Int32Array::from(vec![Some(2009)])), + expected_max: Arc::new(Int32Array::from(vec![Some(2010)])), + }) + .with_column(ExpectedColumn { + name: "month", + expected_min: Arc::new(Int32Array::from(vec![Some(1)])), + expected_max: Arc::new(Int32Array::from(vec![Some(12)])), + }) + .run(); + } + + #[test] + fn fixed_length_decimal_legacy() { + // /parquet-testing/data/fixed_length_decimal_legacy.parquet + // row_groups: 1 + // "value": FixedLenByteArray({min: Some(FixedLenByteArray(ByteArray { data: Some(ByteBufferPtr { data: b"\0\0\0\0\0\xc8" }) })), max: Some(FixedLenByteArray(ByteArray { data: "\0\0\0\0\t`" })), distinct_count: None, null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: true}) + + TestFile::new("fixed_length_decimal_legacy.parquet") + .with_column(ExpectedColumn { + name: "value", + expected_min: Arc::new( + Decimal128Array::from(vec![Some(200)]) + .with_precision_and_scale(13, 2) + .unwrap(), + ), + expected_max: 
Arc::new( + Decimal128Array::from(vec![Some(2400)]) + .with_precision_and_scale(13, 2) + .unwrap(), + ), + }) + .run(); + } + + #[test] + fn nan_in_stats() { + // /parquet-testing/data/nan_in_stats.parquet + // row_groups: 1 + // "x": Double({min: Some(1.0), max: Some(NaN), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + + TestFile::new("nan_in_stats.parquet") + .with_column(ExpectedColumn { + name: "x", + expected_min: Arc::new(Float64Array::from(vec![Some(1.0)])), + expected_max: Arc::new(Float64Array::from(vec![Some(f64::NAN)])), + }) + .run(); + } +} From d4e660ab9cf3694bb2da8f887b46126691780233 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 27 Nov 2023 13:38:06 -0500 Subject: [PATCH 02/16] Update datafusion/core/src/datasource/physical_plan/parquet/statistics.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- .../core/src/datasource/physical_plan/parquet/statistics.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index e3aa9241d39df..a9458da9345bf 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -47,8 +47,7 @@ fn sign_extend_be(b: &[u8]) -> [u8; 16] { result } -/// Converts parquet RowGroup statistics (stored in -/// [`RowGroupMetaData`]) into an arrow [`ArrayRef`] +/// Extracts statistics for a single leaf column from [`RowGroupMetaData`] as an arrow [`ArrayRef`] /// /// For example, given a parquet file with 3 Row Groups, when asked for /// statistics for column "A" it will return a single array with 3 elements, From fd2aebce832c27d1cb98de16459fc9b638ba87b8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 27 Nov 2023 13:38:19 -0500 Subject: [PATCH 03/16] rename enum --- .../datasource/physical_plan/parquet/statistics.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index e3aa9241d39df..affd0ee901415 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -137,7 +137,7 @@ macro_rules! get_statistic { } #[derive(Debug, Clone, Copy)] -enum MinMax { +enum Statistic { Min, Max, } @@ -154,7 +154,7 @@ impl<'a> RowGroupStatisticsConverter<'a> { &self, row_group_meta_data: impl IntoIterator, ) -> Result { - self.min_max_impl(MinMax::Min, row_group_meta_data) + self.min_max_impl(Statistic::Min, row_group_meta_data) } /// Returns the max value for the column into an array ref. @@ -162,13 +162,13 @@ impl<'a> RowGroupStatisticsConverter<'a> { &self, row_group_meta_data: impl IntoIterator, ) -> Result { - self.min_max_impl(MinMax::Max, row_group_meta_data) + self.min_max_impl(Statistic::Max, row_group_meta_data) } /// Extracts all min/max values for the column into an array ref. 
 fn min_max_impl<'b>(
         &self,
-        mm: MinMax,
+        mm: Statistic,
         row_group_meta_data: impl IntoIterator<Item = &'b RowGroupMetaData>,
     ) -> Result<ArrayRef> {
         let mut row_group_meta_data = row_group_meta_data.into_iter().peekable();
@@ -201,7 +201,7 @@ impl<'a> RowGroupStatisticsConverter<'a> {
         // this is the value to use when the statistics are not set
         let null_value = ScalarValue::try_from(self.field.data_type())?;
         match mm {
-            MinMax::Min => {
+            Statistic::Min => {
                 let values = stats_iter.map(|column_statistics| {
                     column_statistics
                         .and_then(|column_statistics| {
@@ -216,7 +216,7 @@ impl<'a> RowGroupStatisticsConverter<'a> {
                 });
                 ScalarValue::iter_to_array(values)
             }
-            MinMax::Max => {
+            Statistic::Max => {
                 let values = stats_iter.map(|column_statistics| {
                     column_statistics
                         .and_then(|column_statistics| {

From a128a2002a8caa63e0706c745f24691ee5f8f525 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 27 Nov 2023 13:54:27 -0500
Subject: [PATCH 04/16] Improve API

---
 .../physical_plan/parquet/row_groups.rs       | 20 +++--------
 .../physical_plan/parquet/statistics.rs       | 34 +++++++++----------
 2 files changed, 21 insertions(+), 33 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index bb52c0469932b..95f26619571c3 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -320,27 +320,15 @@ macro_rules! get_null_count_values {
 
 impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
-        let field = self
-            .parquet_schema
-            .fields()
-            .find(&column.name)
-            .map(|(_idx, field)| field)?;
-
-        RowGroupStatisticsConverter::new(field)
-            .min([self.row_group_metadata])
+        RowGroupStatisticsConverter::try_new(self.parquet_schema, &column.name)
+            .and_then(|converter| converter.min([self.row_group_metadata]))
             // ignore errors during conversion, and just use no statistics
             .ok()
     }
 
     fn max_values(&self, column: &Column) -> Option<ArrayRef> {
-        let field = self
-            .parquet_schema
-            .fields()
-            .find(&column.name)
-            .map(|(_idx, field)| field)?;
-
-        RowGroupStatisticsConverter::new(field)
-            .max([self.row_group_metadata])
+        RowGroupStatisticsConverter::try_new(self.parquet_schema, &column.name)
+            .and_then(|converter| converter.max([self.row_group_metadata]))
             // ignore errors during conversion, and just use no statistics
             .ok()
     }
 
     fn num_containers(&self) -> usize {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 423ea2832c39e..72ee7891eaf1e 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -47,15 +47,6 @@ fn sign_extend_be(b: &[u8]) -> [u8; 16] {
     result
 }
 
-/// Extracts statistics for a single leaf column from [`RowGroupMetaData`] as an arrow [`ArrayRef`]
-///
-/// For example, given a parquet file with 3 Row Groups, when asked for
-/// statistics for column "A" it will return a single array with 3 elements,
-///
-pub(crate) struct RowGroupStatisticsConverter<'a> {
-    field: &'a Field,
-}
-
 /// Extract a single min/max statistics from a [`ParquetStatistics`] object
 ///
 /// * `$column_statistics` is the `ParquetStatistics` object
@@ -141,11 +132,21 @@ enum Statistic {
     Max,
 }
 
+/// Extracts statistics for a single leaf column from [`RowGroupMetaData`] as an
+/// arrow [`ArrayRef`]
+///
+/// For example, given a parquet file with 3 Row
Groups, when asked for
+/// statistics for column "A" it will return a single array with 3 elements.
+pub(crate) struct RowGroupStatisticsConverter<'a> {
+    field: &'a Field,
+}
+
 impl<'a> RowGroupStatisticsConverter<'a> {
-    /// Create a new RowGroupStatisticsConverter that can extract
-    /// statistics for the specified field
-    pub fn new(field: &'a Field) -> Self {
-        Self { field }
+    /// Create a new RowGroupStatisticsConverter for extracting the named column
+    /// in the schema.
+    pub fn try_new(schema: &'a arrow_schema::Schema, column_name: &str) -> Result<Self> {
+        let field = schema.field_with_name(column_name)?;
+        Ok(Self { field })
     }
 
     /// Returns the min value for the column into an array ref.
@@ -538,7 +539,8 @@ mod test {
         let metadata = parquet_metadata(schema.clone(), input_batch);
 
         for field in schema.fields() {
-            let converter = RowGroupStatisticsConverter::new(field);
+            let converter =
+                RowGroupStatisticsConverter::try_new(&schema, field.name()).unwrap();
             let row_groups = metadata.row_groups();
             let min = converter.min(row_groups).unwrap();
             assert_eq!(
@@ -645,11 +647,9 @@ mod test {
                 expected_max,
             } = expected_column;
 
-            let field = arrow_schema
-                .field_with_name(name)
+            let converter = RowGroupStatisticsConverter::try_new(&arrow_schema, name)
                 .expect("can't find field in schema");
 
-            let converter = RowGroupStatisticsConverter::new(field);
             let actual_min = converter.min(metadata.row_groups()).unwrap();
             assert_eq!(&expected_min, &actual_min, "column {name}");
 

From b4009c2dd18f3203f8e55ebfb235d512cc785cae Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 27 Nov 2023 15:09:42 -0500
Subject: [PATCH 05/16] Add test for reading struct array statistics

---
 .../physical_plan/parquet/statistics.rs       | 63 ++++++++++++++++++-
 1 file changed, 61 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 72ee7891eaf1e..4a39af09884f1 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -239,8 +239,9 @@ impl<'a> RowGroupStatisticsConverter<'a> {
 mod test {
     use super::*;
     use arrow_array::{
-        BinaryArray, BooleanArray, Decimal128Array, Float32Array, Float64Array,
-        Int32Array, Int64Array, RecordBatch, StringArray, TimestampNanosecondArray,
+        new_null_array, Array, BinaryArray, BooleanArray, Decimal128Array, Float32Array,
+        Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
+        TimestampNanosecondArray,
     };
     use arrow_schema::SchemaRef;
     use bytes::Bytes;
@@ -480,6 +481,45 @@ mod test {
         .run()
     }
 
+    #[test]
+    fn roundtrip_struct() {
+        let mut test = Test {
+            input: make_struct_array(vec![
+                // row group 1
+                (Some(true), Some(1)),
+                (None, None),
+                (Some(true), Some(3)),
+                // row group 2
+                (Some(true), Some(0)),
+                (Some(false), Some(5)),
+                (None, None),
+                // row group 3
+                (None, None),
+                (None, None),
+                (None, None),
+            ]),
+            // TODO not really sure what the min/max values are
+            expected_min: make_struct_array(vec![
+                (Some(true), Some(1)),
+                (Some(true), Some(0)),
+                (None, None),
+            ]),
+
+            expected_max: make_struct_array(vec![
+                (Some(true), Some(3)),
+                (Some(true), Some(0)),
+                (None, None),
+            ]),
+        };
+        // Due to https://github.com/apache/arrow-datafusion/issues/8334,
+        // statistics for struct arrays are not supported
+        test.expected_min =
+            new_null_array(test.input.data_type(), test.expected_min.len());
+        test.expected_max =
new_null_array(test.input.data_type(), test.expected_min.len()); + test.run() + } + #[test] #[should_panic( expected = "Inconsistent types in ScalarValue::iter_to_array. Expected Utf8, got Binary(NULL)" @@ -803,4 +843,23 @@ mod test { }) .run(); } + + // returns a struct array with columns "b" and "i" with the specified values + fn make_struct_array(input: Vec<(Option, Option)>) -> ArrayRef { + let boolean: BooleanArray = input.iter().map(|(b, _i)| b).collect(); + let int: Int32Array = input.iter().map(|(_b, i)| i).collect(); + + let nullable = true; + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("b", DataType::Boolean, nullable)), + Arc::new(boolean) as ArrayRef, + ), + ( + Arc::new(Field::new("i", DataType::Int32, nullable)), + Arc::new(int) as ArrayRef, + ), + ]); + Arc::new(struct_array) + } } From ef79c42ae3b720721581e09b7e8a0e6befd2a355 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 27 Nov 2023 15:18:47 -0500 Subject: [PATCH 06/16] Add test for column after statistics --- .../physical_plan/parquet/statistics.rs | 62 +++++++++++++++---- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 4a39af09884f1..f8878eb58735e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -498,7 +498,6 @@ mod test { (None, None), (None, None), ]), - // TODO not really sure what the min/max values are expected_min: make_struct_array(vec![ (Some(true), Some(1)), (Some(true), Some(0)), @@ -554,6 +553,55 @@ mod test { .run() } + #[test] + fn struct_and_non_struct() { + // Ensures that statistics for an array that appears *after* a struct + // array are not wrong + let struct_col = make_struct_array(vec![ + // row group 1 + (Some(true), Some(1)), + (None, None), + (Some(true), Some(3)), + ]); + let int_col = Arc::new(Int32Array::from(vec![Some(100), Some(200), Some(300)])); + + let expected_min = Arc::new(Int32Array::from(vec![Some(100)])) as ArrayRef; + + let expected_max = Arc::new(Int32Array::from(vec![Some(300)])) as ArrayRef; + + let input_batch = RecordBatch::try_from_iter([ + ("struct_col", struct_col), + ("int_col", int_col), + ]) + .unwrap(); + + let schema = input_batch.schema(); + + let metadata = parquet_metadata(schema.clone(), input_batch); + + // read the int_col statistics + let (_idx, field) = schema.column_with_name("int_col").unwrap(); + + let converter = + RowGroupStatisticsConverter::try_new(&schema, field.name()).unwrap(); + let row_groups = metadata.row_groups(); + let min = converter.min(row_groups).unwrap(); + assert_eq!( + &min, + &expected_min, + "Min. Statistics\n\n{}\n\n", + DisplayStats(row_groups) + ); + + let max = converter.max(row_groups).unwrap(); + assert_eq!( + &max, + &expected_max, + "Max. 
Statistics\n\n{}\n\n", + DisplayStats(row_groups) + ); + } + const ROWS_PER_ROW_GROUP: usize = 3; /// Writes the input batch into a parquet file, with every every three rows as @@ -668,18 +716,6 @@ mod test { let arrow_schema = reader.schema(); let metadata = reader.metadata(); - for rg in metadata.row_groups() { - println!( - "Columns: {}", - rg.columns() - .iter() - .map(|c| c.column_descr().name()) - .collect::>() - .join(", ") - ); - } - println!(" row groups: {}", metadata.row_groups().len()); - for expected_column in self.expected_columns { let ExpectedColumn { name, From 9b914db36e1037c9e264a0fe5c13f14391d8dd68 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 27 Nov 2023 15:37:36 -0500 Subject: [PATCH 07/16] improve tests --- .../physical_plan/parquet/statistics.rs | 396 +++++++++--------- 1 file changed, 209 insertions(+), 187 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index f8878eb58735e..ae98bac173400 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -269,7 +269,7 @@ mod test { #[test] fn roundtrip_bool() { Test { - input: Arc::new(BooleanArray::from(vec![ + input: bool_array([ // row group 1 Some(true), None, @@ -282,17 +282,9 @@ mod test { None, None, None, - ])), - expected_min: Arc::new(BooleanArray::from(vec![ - Some(true), - Some(false), - None, - ])), - expected_max: Arc::new(BooleanArray::from(vec![ - Some(true), - Some(true), - None, - ])), + ]), + expected_min: bool_array([Some(true), Some(false), None]), + expected_max: bool_array([Some(true), Some(true), None]), } .run() } @@ -300,7 +292,7 @@ mod test { #[test] fn roundtrip_int32() { Test { - input: Arc::new(Int32Array::from(vec![ + input: i32_array([ // row group 1 Some(1), None, @@ -313,9 +305,9 @@ mod test { None, None, None, - ])), - expected_min: Arc::new(Int32Array::from(vec![Some(1), Some(0), None])), - expected_max: Arc::new(Int32Array::from(vec![Some(3), Some(5), None])), + ]), + expected_min: i32_array([Some(1), Some(0), None]), + expected_max: i32_array([Some(3), Some(5), None]), } .run() } @@ -323,7 +315,7 @@ mod test { #[test] fn roundtrip_int64() { Test { - input: Arc::new(Int64Array::from(vec![ + input: i64_array([ // row group 1 Some(1), None, @@ -336,9 +328,9 @@ mod test { None, None, None, - ])), - expected_min: Arc::new(Int64Array::from(vec![Some(1), Some(0), None])), - expected_max: Arc::new(Int64Array::from(vec![Some(3), Some(5), None])), + ]), + expected_min: i64_array([Some(1), Some(0), None]), + expected_max: i64_array(vec![Some(3), Some(5), None]), } .run() } @@ -346,7 +338,7 @@ mod test { #[test] fn roundtrip_f32() { Test { - input: Arc::new(Float32Array::from(vec![ + input: f32_array([ // row group 1 Some(1.0), None, @@ -359,9 +351,9 @@ mod test { None, None, None, - ])), - expected_min: Arc::new(Float32Array::from(vec![Some(1.0), Some(-1.0), None])), - expected_max: Arc::new(Float32Array::from(vec![Some(3.0), Some(5.0), None])), + ]), + expected_min: f32_array([Some(1.0), Some(-1.0), None]), + expected_max: f32_array([Some(3.0), Some(5.0), None]), } .run() } @@ -369,7 +361,7 @@ mod test { #[test] fn roundtrip_f64() { Test { - input: Arc::new(Float64Array::from(vec![ + input: f64_array([ // row group 1 Some(1.0), None, @@ -382,9 +374,9 @@ mod test { None, None, None, - ])), - expected_min: Arc::new(Float64Array::from(vec![Some(1.0), Some(-1.0), None])), - 
expected_max: Arc::new(Float64Array::from(vec![Some(3.0), Some(5.0), None])), + ]), + expected_min: f64_array([Some(1.0), Some(-1.0), None]), + expected_max: f64_array([Some(3.0), Some(5.0), None]), } .run() } @@ -395,7 +387,7 @@ mod test { )] fn roundtrip_timestamp() { Test { - input: Arc::new(TimestampNanosecondArray::from(vec![ + input: timestamp_array([ // row group 1 Some(1), None, @@ -408,17 +400,9 @@ mod test { None, None, None, - ])), - expected_min: Arc::new(TimestampNanosecondArray::from(vec![ - Some(1), - Some(5), - None, - ])), - expected_max: Arc::new(TimestampNanosecondArray::from(vec![ - Some(3), - Some(9), - None, - ])), + ]), + expected_min: timestamp_array([Some(1), Some(5), None]), + expected_max: timestamp_array([Some(3), Some(9), None]), } .run() } @@ -461,7 +445,7 @@ mod test { #[test] fn roundtrip_utf8() { Test { - input: Arc::new(StringArray::from(vec![ + input: utf8_array([ // row group 1 Some("A"), None, @@ -474,9 +458,9 @@ mod test { None, None, None, - ])), - expected_min: Arc::new(StringArray::from(vec![Some("A"), Some("AA"), None])), - expected_max: Arc::new(StringArray::from(vec![Some("Q"), Some("ZZ"), None])), + ]), + expected_min: utf8_array([Some("A"), Some("AA"), None]), + expected_max: utf8_array([Some("Q"), Some("ZZ"), None]), } .run() } @@ -484,7 +468,7 @@ mod test { #[test] fn roundtrip_struct() { let mut test = Test { - input: make_struct_array(vec![ + input: struct_array(vec![ // row group 1 (Some(true), Some(1)), (None, None), @@ -498,13 +482,13 @@ mod test { (None, None), (None, None), ]), - expected_min: make_struct_array(vec![ + expected_min: struct_array(vec![ (Some(true), Some(1)), (Some(true), Some(0)), (None, None), ]), - expected_max: make_struct_array(vec![ + expected_max: struct_array(vec![ (Some(true), Some(3)), (Some(true), Some(0)), (None, None), @@ -557,17 +541,17 @@ mod test { fn struct_and_non_struct() { // Ensures that statistics for an array that appears *after* a struct // array are not wrong - let struct_col = make_struct_array(vec![ + let struct_col = struct_array(vec![ // row group 1 (Some(true), Some(1)), (None, None), (Some(true), Some(3)), ]); - let int_col = Arc::new(Int32Array::from(vec![Some(100), Some(200), Some(300)])); + let int_col = i32_array([Some(100), Some(200), Some(300)]); - let expected_min = Arc::new(Int32Array::from(vec![Some(100)])) as ArrayRef; + let expected_min = i32_array([Some(100)]); - let expected_max = Arc::new(Int32Array::from(vec![Some(300)])) as ArrayRef; + let expected_max = i32_array(vec![Some(300)]); let input_batch = RecordBatch::try_from_iter([ ("struct_col", struct_col), @@ -602,6 +586,151 @@ mod test { ); } + #[test] + fn nan_in_stats() { + // /parquet-testing/data/nan_in_stats.parquet + // row_groups: 1 + // "x": Double({min: Some(1.0), max: Some(NaN), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + + TestFile::new("nan_in_stats.parquet") + .with_column(ExpectedColumn { + name: "x", + expected_min: Arc::new(Float64Array::from(vec![Some(1.0)])), + expected_max: Arc::new(Float64Array::from(vec![Some(f64::NAN)])), + }) + .run(); + } + + #[test] + fn alltypes_plain() { + // /parquet-testing/data/datapage_v1-snappy-compressed-checksum.parquet + // row_groups: 1 + // (has no statistics) + TestFile::new("alltypes_plain.parquet") + // No column statistics should be read as NULL, but with the right type + .with_column(ExpectedColumn { + name: "id", + expected_min: Arc::new(Int32Array::from(vec![None])), + expected_max: 
Arc::new(Int32Array::from(vec![None])), + }) + .with_column(ExpectedColumn { + name: "bool_col", + expected_min: Arc::new(BooleanArray::from(vec![None])), + expected_max: Arc::new(BooleanArray::from(vec![None])), + }) + .run(); + } + + #[test] + fn alltypes_tiny_pages() { + // /parquet-testing/data/alltypes_tiny_pages.parquet + // row_groups: 1 + // "id": Int32({min: Some(0), max: Some(7299), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "bool_col": Boolean({min: Some(false), max: Some(true), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "tinyint_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "smallint_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "int_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "bigint_col": Int64({min: Some(0), max: Some(90), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "float_col": Float({min: Some(0.0), max: Some(9.9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "double_col": Double({min: Some(0.0), max: Some(90.89999999999999), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "date_string_col": ByteArray({min: Some(ByteArray { data: "01/01/09" }), max: Some(ByteArray { data: "12/31/10" }), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "string_col": ByteArray({min: Some(ByteArray { data: "0" }), max: Some(ByteArray { data: "9" }), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "timestamp_col": Int96({min: None, max: None, distinct_count: None, null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: true}) + // "year": Int32({min: Some(2009), max: Some(2010), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "month": Int32({min: Some(1), max: Some(12), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + TestFile::new("alltypes_tiny_pages.parquet") + .with_column(ExpectedColumn { + name: "id", + expected_min: Arc::new(Int32Array::from(vec![Some(0)])), + expected_max: Arc::new(Int32Array::from(vec![Some(7299)])), + }) + .with_column(ExpectedColumn { + name: "bool_col", + expected_min: Arc::new(BooleanArray::from(vec![Some(false)])), + expected_max: Arc::new(BooleanArray::from(vec![Some(true)])), + }) + .with_column(ExpectedColumn { + name: "tinyint_col", + expected_min: Arc::new(Int32Array::from(vec![Some(0)])), + expected_max: Arc::new(Int32Array::from(vec![Some(9)])), + }) + .with_column(ExpectedColumn { + name: "smallint_col", + expected_min: Arc::new(Int32Array::from(vec![Some(0)])), + expected_max: Arc::new(Int32Array::from(vec![Some(9)])), + }) + .with_column(ExpectedColumn { + name: "int_col", + expected_min: Arc::new(Int32Array::from(vec![Some(0)])), + expected_max: Arc::new(Int32Array::from(vec![Some(9)])), + }) + .with_column(ExpectedColumn { + name: "bigint_col", + expected_min: Arc::new(Int64Array::from(vec![Some(0)])), 
+ expected_max: Arc::new(Int64Array::from(vec![Some(90)])), + }) + .with_column(ExpectedColumn { + name: "float_col", + expected_min: Arc::new(Float32Array::from(vec![Some(0.0)])), + expected_max: Arc::new(Float32Array::from(vec![Some(9.9)])), + }) + .with_column(ExpectedColumn { + name: "double_col", + expected_min: Arc::new(Float64Array::from(vec![Some(0.0)])), + expected_max: Arc::new(Float64Array::from(vec![Some(90.89999999999999)])), + }) + .with_column(ExpectedColumn { + name: "date_string_col", + expected_min: Arc::new(StringArray::from(vec![Some("01/01/09")])), + expected_max: Arc::new(StringArray::from(vec![Some("12/31/10")])), + }) + .with_column(ExpectedColumn { + name: "string_col", + expected_min: Arc::new(StringArray::from(vec![Some("0")])), + expected_max: Arc::new(StringArray::from(vec![Some("9")])), + }) + // File has no min/max for timestamp_col + .with_column(ExpectedColumn { + name: "timestamp_col", + expected_min: Arc::new(TimestampNanosecondArray::from(vec![None])), + expected_max: Arc::new(TimestampNanosecondArray::from(vec![None])), + }) + .with_column(ExpectedColumn { + name: "year", + expected_min: Arc::new(Int32Array::from(vec![Some(2009)])), + expected_max: Arc::new(Int32Array::from(vec![Some(2010)])), + }) + .with_column(ExpectedColumn { + name: "month", + expected_min: Arc::new(Int32Array::from(vec![Some(1)])), + expected_max: Arc::new(Int32Array::from(vec![Some(12)])), + }) + .run(); + } + + #[test] + fn fixed_length_decimal_legacy() { + // /parquet-testing/data/fixed_length_decimal_legacy.parquet + // row_groups: 1 + // "value": FixedLenByteArray({min: Some(FixedLenByteArray(ByteArray { data: Some(ByteBufferPtr { data: b"\0\0\0\0\0\xc8" }) })), max: Some(FixedLenByteArray(ByteArray { data: "\0\0\0\0\t`" })), distinct_count: None, null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: true}) + + TestFile::new("fixed_length_decimal_legacy.parquet") + .with_column(ExpectedColumn { + name: "value", + expected_min: Arc::new( + Decimal128Array::from(vec![Some(200)]) + .with_precision_and_scale(13, 2) + .unwrap(), + ), + expected_max: Arc::new( + Decimal128Array::from(vec![Some(2400)]) + .with_precision_and_scale(13, 2) + .unwrap(), + ), + }) + .run(); + } + const ROWS_PER_ROW_GROUP: usize = 3; /// Writes the input batch into a parquet file, with every every three rows as @@ -735,153 +864,46 @@ mod test { } } - #[test] - fn alltypes_plain() { - // /parquet-testing/data/datapage_v1-snappy-compressed-checksum.parquet - // row_groups: 1 - // (has no statistics) - TestFile::new("alltypes_plain.parquet") - // No column statistics should be read as NULL, but with the right type - .with_column(ExpectedColumn { - name: "id", - expected_min: Arc::new(Int32Array::from(vec![None])), - expected_max: Arc::new(Int32Array::from(vec![None])), - }) - .with_column(ExpectedColumn { - name: "bool_col", - expected_min: Arc::new(BooleanArray::from(vec![None])), - expected_max: Arc::new(BooleanArray::from(vec![None])), - }) - .run(); + fn bool_array(input: impl IntoIterator>) -> ArrayRef { + let array: BooleanArray = input.into_iter().collect(); + Arc::new(array) } - #[test] - fn alltypes_tiny_pages() { - // /parquet-testing/data/alltypes_tiny_pages.parquet - // row_groups: 1 - // "id": Int32({min: Some(0), max: Some(7299), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "bool_col": Boolean({min: Some(false), max: Some(true), distinct_count: None, null_count: 0, min_max_deprecated: false, 
min_max_backwards_compatible: false}) - // "tinyint_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "smallint_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "int_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "bigint_col": Int64({min: Some(0), max: Some(90), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "float_col": Float({min: Some(0.0), max: Some(9.9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "double_col": Double({min: Some(0.0), max: Some(90.89999999999999), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "date_string_col": ByteArray({min: Some(ByteArray { data: "01/01/09" }), max: Some(ByteArray { data: "12/31/10" }), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "string_col": ByteArray({min: Some(ByteArray { data: "0" }), max: Some(ByteArray { data: "9" }), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "timestamp_col": Int96({min: None, max: None, distinct_count: None, null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: true}) - // "year": Int32({min: Some(2009), max: Some(2010), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "month": Int32({min: Some(1), max: Some(12), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - TestFile::new("alltypes_tiny_pages.parquet") - .with_column(ExpectedColumn { - name: "id", - expected_min: Arc::new(Int32Array::from(vec![Some(0)])), - expected_max: Arc::new(Int32Array::from(vec![Some(7299)])), - }) - .with_column(ExpectedColumn { - name: "bool_col", - expected_min: Arc::new(BooleanArray::from(vec![Some(false)])), - expected_max: Arc::new(BooleanArray::from(vec![Some(true)])), - }) - .with_column(ExpectedColumn { - name: "tinyint_col", - expected_min: Arc::new(Int32Array::from(vec![Some(0)])), - expected_max: Arc::new(Int32Array::from(vec![Some(9)])), - }) - .with_column(ExpectedColumn { - name: "smallint_col", - expected_min: Arc::new(Int32Array::from(vec![Some(0)])), - expected_max: Arc::new(Int32Array::from(vec![Some(9)])), - }) - .with_column(ExpectedColumn { - name: "int_col", - expected_min: Arc::new(Int32Array::from(vec![Some(0)])), - expected_max: Arc::new(Int32Array::from(vec![Some(9)])), - }) - .with_column(ExpectedColumn { - name: "bigint_col", - expected_min: Arc::new(Int64Array::from(vec![Some(0)])), - expected_max: Arc::new(Int64Array::from(vec![Some(90)])), - }) - .with_column(ExpectedColumn { - name: "float_col", - expected_min: Arc::new(Float32Array::from(vec![Some(0.0)])), - expected_max: Arc::new(Float32Array::from(vec![Some(9.9)])), - }) - .with_column(ExpectedColumn { - name: "double_col", - expected_min: Arc::new(Float64Array::from(vec![Some(0.0)])), - expected_max: Arc::new(Float64Array::from(vec![Some(90.89999999999999)])), - }) - .with_column(ExpectedColumn { - name: "date_string_col", - expected_min: Arc::new(StringArray::from(vec![Some("01/01/09")])), - expected_max: 
Arc::new(StringArray::from(vec![Some("12/31/10")])), - }) - .with_column(ExpectedColumn { - name: "string_col", - expected_min: Arc::new(StringArray::from(vec![Some("0")])), - expected_max: Arc::new(StringArray::from(vec![Some("9")])), - }) - // File has no min/max for timestamp_col - .with_column(ExpectedColumn { - name: "timestamp_col", - expected_min: Arc::new(TimestampNanosecondArray::from(vec![None])), - expected_max: Arc::new(TimestampNanosecondArray::from(vec![None])), - }) - .with_column(ExpectedColumn { - name: "year", - expected_min: Arc::new(Int32Array::from(vec![Some(2009)])), - expected_max: Arc::new(Int32Array::from(vec![Some(2010)])), - }) - .with_column(ExpectedColumn { - name: "month", - expected_min: Arc::new(Int32Array::from(vec![Some(1)])), - expected_max: Arc::new(Int32Array::from(vec![Some(12)])), - }) - .run(); + fn i32_array(input: impl IntoIterator>) -> ArrayRef { + let array: Int32Array = input.into_iter().collect(); + Arc::new(array) } - #[test] - fn fixed_length_decimal_legacy() { - // /parquet-testing/data/fixed_length_decimal_legacy.parquet - // row_groups: 1 - // "value": FixedLenByteArray({min: Some(FixedLenByteArray(ByteArray { data: Some(ByteBufferPtr { data: b"\0\0\0\0\0\xc8" }) })), max: Some(FixedLenByteArray(ByteArray { data: "\0\0\0\0\t`" })), distinct_count: None, null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: true}) + fn i64_array(input: impl IntoIterator>) -> ArrayRef { + let array: Int64Array = input.into_iter().collect(); + Arc::new(array) + } - TestFile::new("fixed_length_decimal_legacy.parquet") - .with_column(ExpectedColumn { - name: "value", - expected_min: Arc::new( - Decimal128Array::from(vec![Some(200)]) - .with_precision_and_scale(13, 2) - .unwrap(), - ), - expected_max: Arc::new( - Decimal128Array::from(vec![Some(2400)]) - .with_precision_and_scale(13, 2) - .unwrap(), - ), - }) - .run(); + fn f32_array(input: impl IntoIterator>) -> ArrayRef { + let array: Float32Array = input.into_iter().collect(); + Arc::new(array) } - #[test] - fn nan_in_stats() { - // /parquet-testing/data/nan_in_stats.parquet - // row_groups: 1 - // "x": Double({min: Some(1.0), max: Some(NaN), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + fn f64_array(input: impl IntoIterator>) -> ArrayRef { + let array: Float64Array = input.into_iter().collect(); + Arc::new(array) + } - TestFile::new("nan_in_stats.parquet") - .with_column(ExpectedColumn { - name: "x", - expected_min: Arc::new(Float64Array::from(vec![Some(1.0)])), - expected_max: Arc::new(Float64Array::from(vec![Some(f64::NAN)])), - }) - .run(); + fn timestamp_array(input: impl IntoIterator>) -> ArrayRef { + let array: TimestampNanosecondArray = input.into_iter().collect(); + Arc::new(array) + } + + fn utf8_array<'a>(input: impl IntoIterator>) -> ArrayRef { + let array: StringArray = input + .into_iter() + .map(|s| s.map(|s| s.to_string())) + .collect(); + Arc::new(array) } // returns a struct array with columns "b" and "i" with the specified values - fn make_struct_array(input: Vec<(Option, Option)>) -> ArrayRef { + fn struct_array(input: Vec<(Option, Option)>) -> ArrayRef { let boolean: BooleanArray = input.iter().map(|(b, _i)| b).collect(); let int: Int32Array = input.iter().map(|(_b, i)| i).collect(); From b95dea93dd82c56f32b878dfa8c9a2cad15c6299 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 27 Nov 2023 15:45:29 -0500 Subject: [PATCH 08/16] simplify --- .../physical_plan/parquet/statistics.rs | 60 +++++++++---------- 1 file 
changed, 30 insertions(+), 30 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index ae98bac173400..759b9ced406b9 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -610,13 +610,13 @@ mod test { // No column statistics should be read as NULL, but with the right type .with_column(ExpectedColumn { name: "id", - expected_min: Arc::new(Int32Array::from(vec![None])), - expected_max: Arc::new(Int32Array::from(vec![None])), + expected_min: i32_array([None]), + expected_max: i32_array([None]), }) .with_column(ExpectedColumn { name: "bool_col", - expected_min: Arc::new(BooleanArray::from(vec![None])), - expected_max: Arc::new(BooleanArray::from(vec![None])), + expected_min: bool_array([None]), + expected_max: bool_array([None]), }) .run(); } @@ -641,69 +641,69 @@ mod test { TestFile::new("alltypes_tiny_pages.parquet") .with_column(ExpectedColumn { name: "id", - expected_min: Arc::new(Int32Array::from(vec![Some(0)])), - expected_max: Arc::new(Int32Array::from(vec![Some(7299)])), + expected_min: i32_array([Some(0)]), + expected_max: i32_array([Some(7299)]), }) .with_column(ExpectedColumn { name: "bool_col", - expected_min: Arc::new(BooleanArray::from(vec![Some(false)])), - expected_max: Arc::new(BooleanArray::from(vec![Some(true)])), + expected_min: bool_array([Some(false)]), + expected_max: bool_array([Some(true)]), }) .with_column(ExpectedColumn { name: "tinyint_col", - expected_min: Arc::new(Int32Array::from(vec![Some(0)])), - expected_max: Arc::new(Int32Array::from(vec![Some(9)])), + expected_min: i32_array([Some(0)]), + expected_max: i32_array([Some(9)]), }) .with_column(ExpectedColumn { name: "smallint_col", - expected_min: Arc::new(Int32Array::from(vec![Some(0)])), - expected_max: Arc::new(Int32Array::from(vec![Some(9)])), + expected_min: i32_array([Some(0)]), + expected_max: i32_array([Some(9)]), }) .with_column(ExpectedColumn { name: "int_col", - expected_min: Arc::new(Int32Array::from(vec![Some(0)])), - expected_max: Arc::new(Int32Array::from(vec![Some(9)])), + expected_min: i32_array([Some(0)]), + expected_max: i32_array([Some(9)]), }) .with_column(ExpectedColumn { name: "bigint_col", - expected_min: Arc::new(Int64Array::from(vec![Some(0)])), - expected_max: Arc::new(Int64Array::from(vec![Some(90)])), + expected_min: i64_array([Some(0)]), + expected_max: i64_array([Some(90)]), }) .with_column(ExpectedColumn { name: "float_col", - expected_min: Arc::new(Float32Array::from(vec![Some(0.0)])), - expected_max: Arc::new(Float32Array::from(vec![Some(9.9)])), + expected_min: f32_array([Some(0.0)]), + expected_max: f32_array([Some(9.9)]), }) .with_column(ExpectedColumn { name: "double_col", - expected_min: Arc::new(Float64Array::from(vec![Some(0.0)])), - expected_max: Arc::new(Float64Array::from(vec![Some(90.89999999999999)])), + expected_min: f64_array([Some(0.0)]), + expected_max: f64_array([Some(90.89999999999999)]), }) .with_column(ExpectedColumn { name: "date_string_col", - expected_min: Arc::new(StringArray::from(vec![Some("01/01/09")])), - expected_max: Arc::new(StringArray::from(vec![Some("12/31/10")])), + expected_min: utf8_array([Some("01/01/09")]), + expected_max: utf8_array([Some("12/31/10")]), }) .with_column(ExpectedColumn { name: "string_col", - expected_min: Arc::new(StringArray::from(vec![Some("0")])), - expected_max: Arc::new(StringArray::from(vec![Some("9")])), + 
expected_min: utf8_array([Some("0")]), + expected_max: utf8_array([Some("9")]), }) // File has no min/max for timestamp_col .with_column(ExpectedColumn { name: "timestamp_col", - expected_min: Arc::new(TimestampNanosecondArray::from(vec![None])), - expected_max: Arc::new(TimestampNanosecondArray::from(vec![None])), + expected_min: timestamp_array([None]), + expected_max: timestamp_array([None]), }) .with_column(ExpectedColumn { name: "year", - expected_min: Arc::new(Int32Array::from(vec![Some(2009)])), - expected_max: Arc::new(Int32Array::from(vec![Some(2010)])), + expected_min: i32_array([Some(2009)]), + expected_max: i32_array([Some(2010)]), }) .with_column(ExpectedColumn { name: "month", - expected_min: Arc::new(Int32Array::from(vec![Some(1)])), - expected_max: Arc::new(Int32Array::from(vec![Some(12)])), + expected_min: i32_array([Some(1)]), + expected_max: i32_array([Some(12)]), }) .run(); } From cd3c04283db3b5ef1e523985fe4ea56802444cde Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 27 Nov 2023 15:48:05 -0500 Subject: [PATCH 09/16] clippy --- .../core/src/datasource/physical_plan/parquet/statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 759b9ced406b9..984d3c1b4005a 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -852,7 +852,7 @@ mod test { expected_max, } = expected_column; - let converter = RowGroupStatisticsConverter::try_new(&arrow_schema, name) + let converter = RowGroupStatisticsConverter::try_new(arrow_schema, name) .expect("can't find field in schema"); let actual_min = converter.min(metadata.row_groups()).unwrap(); From ab954533cd016a76a072b18c3f5a5a04e22e8fab Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 27 Nov 2023 15:53:35 -0500 Subject: [PATCH 10/16] Update datafusion/core/src/datasource/physical_plan/parquet/statistics.rs --- .../core/src/datasource/physical_plan/parquet/statistics.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 984d3c1b4005a..22c1d894768ca 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -385,6 +385,7 @@ mod test { #[should_panic( expected = "Inconsistent types in ScalarValue::iter_to_array. 
Expected Int64, got TimestampNanosecond(NULL, None)" )] + // Due to https://github.com/apache/arrow-datafusion/issues/8295 fn roundtrip_timestamp() { Test { input: timestamp_array([ From 0235a9e2e85f52207347a4c9e9dcebeb7070c6ab Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 27 Nov 2023 15:54:31 -0500 Subject: [PATCH 11/16] Update datafusion/core/src/datasource/physical_plan/parquet/statistics.rs --- .../core/src/datasource/physical_plan/parquet/statistics.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 22c1d894768ca..fd02f55c0dd58 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -508,6 +508,7 @@ mod test { #[should_panic( expected = "Inconsistent types in ScalarValue::iter_to_array. Expected Utf8, got Binary(NULL)" )] + // Due to https://github.com/apache/arrow-datafusion/issues/8295 fn roundtrip_binary() { Test { input: Arc::new(BinaryArray::from_opt_vec(vec![ From a601fbffcf3e884f84f80f2058b743483c1e1937 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 27 Nov 2023 17:02:11 -0500 Subject: [PATCH 12/16] Add test showing incorrect statistics --- .../physical_plan/parquet/statistics.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index fd02f55c0dd58..fc6648ffc1a70 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -550,11 +550,17 @@ mod test { (Some(true), Some(3)), ]); let int_col = i32_array([Some(100), Some(200), Some(300)]); - let expected_min = i32_array([Some(100)]); - let expected_max = i32_array(vec![Some(300)]); + // use a name that shadows a name in the struct column + match struct_col.data_type() { + DataType::Struct(fields) => { + assert_eq!(fields.get(1).unwrap().name(), "int_col") + } + _ => panic!("unexpected data type for struct column"), + }; + let input_batch = RecordBatch::try_from_iter([ ("struct_col", struct_col), ("int_col", int_col), @@ -904,7 +910,7 @@ mod test { Arc::new(array) } - // returns a struct array with columns "b" and "i" with the specified values + // returns a struct array with columns "bool_col" and "int_col" with the specified values fn struct_array(input: Vec<(Option, Option)>) -> ArrayRef { let boolean: BooleanArray = input.iter().map(|(b, _i)| b).collect(); let int: Int32Array = input.iter().map(|(_b, i)| i).collect(); @@ -912,11 +918,11 @@ mod test { let nullable = true; let struct_array = StructArray::from(vec![ ( - Arc::new(Field::new("b", DataType::Boolean, nullable)), + Arc::new(Field::new("bool_col", DataType::Boolean, nullable)), Arc::new(boolean) as ArrayRef, ), ( - Arc::new(Field::new("i", DataType::Int32, nullable)), + Arc::new(Field::new("int_col", DataType::Int32, nullable)), Arc::new(int) as ArrayRef, ), ]); From 06b5201988b7bc5a16126f2da139eb94c2dcb09d Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 28 Nov 2023 13:36:01 +0000 Subject: [PATCH 13/16] Rework statistics --- .../datasource/physical_plan/parquet/mod.rs | 1 + .../physical_plan/parquet/row_groups.rs | 79 ++++--- .../physical_plan/parquet/statistics.rs | 214 ++++++++---------- 3 files changed, 137 insertions(+), 157 
deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 677f018869b63..95aae71c779e0 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -507,6 +507,7 @@ impl FileOpener for ParquetOpener { let file_metadata = builder.metadata().clone(); let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); let mut row_groups = row_groups::prune_row_groups_by_statistics( + builder.parquet_schema(), file_metadata.row_groups(), file_range, predicate, diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 95f26619571c3..39618d61d9265 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -16,8 +16,11 @@ // under the License. use arrow::{array::ArrayRef, datatypes::Schema}; +use arrow_schema::FieldRef; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::{Column, DataFusionError, Result, ScalarValue}; +use parquet::file::metadata::ColumnChunkMetaData; +use parquet::schema::types::SchemaDescriptor; use parquet::{ arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder}, bloom_filter::Sbbf, @@ -29,12 +32,14 @@ use std::{ }; use crate::datasource::listing::FileRange; +use crate::datasource::physical_plan::parquet::statistics::{ + max_statistics, min_statistics, parquet_column, +}; use crate::logical_expr::Operator; use crate::physical_expr::expressions as phys_expr; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use crate::physical_plan::PhysicalExpr; -use super::statistics::RowGroupStatisticsConverter; use super::ParquetFileMetrics; /// Prune row groups based on statistics @@ -46,7 +51,10 @@ use super::ParquetFileMetrics; /// /// If an index IS present in the returned Vec it means the predicate /// did not filter out that row group. +/// +/// Note: This method currently ignores ColumnOrder (#8342) pub(crate) fn prune_row_groups_by_statistics( + parquet_schema: &SchemaDescriptor, groups: &[RowGroupMetaData], range: Option, predicate: Option<&PruningPredicate>, @@ -69,8 +77,9 @@ pub(crate) fn prune_row_groups_by_statistics( if let Some(predicate) = predicate { let pruning_stats = RowGroupPruningStatistics { + parquet_schema, row_group_metadata: metadata, - parquet_schema: predicate.schema().as_ref(), + arrow_schema: predicate.schema().as_ref(), }; match predicate.prune(&pruning_stats) { Ok(values) => { @@ -291,46 +300,33 @@ impl BloomFilterPruningPredicate { } } -/// Wraps parquet statistics in a way -/// that implements [`PruningStatistics`] +/// Wraps [`RowGroupMetaData`] in a way that implements [`PruningStatistics`] +/// +/// Note: This should be implemented for an array of [`RowGroupMetaData`] instead +/// of per row-group struct RowGroupPruningStatistics<'a> { + parquet_schema: &'a SchemaDescriptor, row_group_metadata: &'a RowGroupMetaData, - parquet_schema: &'a Schema, + arrow_schema: &'a Schema, } -// Extract the null count value on the ParquetStatistics -macro_rules! 
get_null_count_values { - ($self:expr, $column:expr) => {{ - let value = ScalarValue::UInt64( - if let Some(col) = $self - .row_group_metadata - .columns() - .iter() - .find(|c| c.column_descr().name() == &$column.name) - { - col.statistics().map(|s| s.null_count()) - } else { - Some($self.row_group_metadata.num_rows() as u64) - }, - ); - - value.to_array().ok() - }}; +impl<'a> RowGroupPruningStatistics<'a> { + /// Lookups up the parquet column by name + fn column(&self, name: &str) -> Option<(&ColumnChunkMetaData, &FieldRef)> { + let (idx, field) = parquet_column(self.parquet_schema, self.arrow_schema, name)?; + Some((self.row_group_metadata.column(idx), field)) + } } impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { fn min_values(&self, column: &Column) -> Option { - RowGroupStatisticsConverter::try_new(self.parquet_schema, &column.name) - .and_then(|converter| converter.min([self.row_group_metadata])) - // ignore errors during conversion, and just use no statistics - .ok() + let (column, field) = self.column(&column.name)?; + min_statistics(field.data_type(), std::iter::once(column.statistics())).ok() } fn max_values(&self, column: &Column) -> Option { - RowGroupStatisticsConverter::try_new(self.parquet_schema, &column.name) - .and_then(|converter| converter.max([self.row_group_metadata])) - // ignore errors during conversion, and just use no statistics - .ok() + let (column, field) = self.column(&column.name)?; + max_statistics(field.data_type(), std::iter::once(column.statistics())).ok() } fn num_containers(&self) -> usize { @@ -338,7 +334,9 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { } fn null_counts(&self, column: &Column) -> Option { - get_null_count_values!(self, column) + let (c, _) = self.column(&column.name)?; + let scalar = ScalarValue::UInt64(Some(c.statistics()?.null_count())); + scalar.to_array().ok() } } @@ -358,6 +356,7 @@ mod tests { use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::{create_physical_expr, PhysicalExpr}; use datafusion_sql::planner::ContextProvider; + use parquet::arrow::arrow_to_parquet_schema; use parquet::arrow::async_reader::ParquetObjectReader; use parquet::basic::LogicalType; use parquet::data_type::{ByteArray, FixedLenByteArray}; @@ -435,6 +434,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2], None, Some(&pruning_predicate), @@ -469,6 +469,7 @@ mod tests { // is null / undefined so the first row group can't be filtered out assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2], None, Some(&pruning_predicate), @@ -516,6 +517,7 @@ mod tests { // when conditions are joined using AND assert_eq!( prune_row_groups_by_statistics( + &schema_descr, groups, None, Some(&pruning_predicate), @@ -534,6 +536,7 @@ mod tests { // this bypasses the entire predicate expression and no row groups are filtered out assert_eq!( prune_row_groups_by_statistics( + &schema_descr, groups, None, Some(&pruning_predicate), @@ -573,6 +576,7 @@ mod tests { Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), ])); + let schema_descr = arrow_to_parquet_schema(&schema).unwrap(); let expr = col("c1").gt(lit(15)).and(col("c2").is_null()); let expr = logical2physical(&expr, &schema); let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap(); @@ -582,6 +586,7 @@ mod tests { // First row group was filtered out because it contains no null value on "c2". 
assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &groups, None, Some(&pruning_predicate), @@ -601,6 +606,7 @@ mod tests { Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), ])); + let schema_descr = arrow_to_parquet_schema(&schema).unwrap(); let expr = col("c1") .gt(lit(15)) .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); @@ -613,6 +619,7 @@ mod tests { // pass predicates. Ideally these should both be false assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &groups, None, Some(&pruning_predicate), @@ -671,6 +678,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2, rgm3], None, Some(&pruning_predicate), @@ -734,6 +742,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2, rgm3, rgm4], None, Some(&pruning_predicate), @@ -781,6 +790,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2, rgm3], None, Some(&pruning_predicate), @@ -851,6 +861,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2, rgm3], None, Some(&pruning_predicate), @@ -910,6 +921,7 @@ mod tests { let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( + &schema_descr, &[rgm1, rgm2, rgm3], None, Some(&pruning_predicate), @@ -923,7 +935,6 @@ mod tests { schema_descr: &SchemaDescPtr, column_statistics: Vec, ) -> RowGroupMetaData { - use parquet::file::metadata::ColumnChunkMetaData; let mut columns = vec![]; for (i, s) in column_statistics.iter().enumerate() { let column = ColumnChunkMetaData::builder(schema_descr.column(i)) @@ -941,7 +952,7 @@ mod tests { } fn get_test_schema_descr(fields: Vec) -> SchemaDescPtr { - use parquet::schema::types::{SchemaDescriptor, Type as SchemaType}; + use parquet::schema::types::Type as SchemaType; let schema_fields = fields .iter() .map(|field| { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index fc6648ffc1a70..156f314056d01 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -19,11 +19,12 @@ use arrow::{array::ArrayRef, datatypes::DataType}; use arrow_array::new_empty_array; -use arrow_schema::Field; +use arrow_schema::{Field, FieldRef, Schema}; use datafusion_common::{Result, ScalarValue}; use parquet::file::{ metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics, }; +use parquet::schema::types::SchemaDescriptor; // Convert the bytes array to i128. // The endian of the input bytes array must be big-endian. @@ -66,8 +67,8 @@ macro_rules! get_statistic { Some(DataType::Decimal128(precision, scale)) => { Some(ScalarValue::Decimal128( Some(*s.$func() as i128), - precision, - scale, + *precision, + *scale, )) } _ => Some(ScalarValue::Int32(Some(*s.$func()))), @@ -79,8 +80,8 @@ macro_rules! get_statistic { Some(DataType::Decimal128(precision, scale)) => { Some(ScalarValue::Decimal128( Some(*s.$func() as i128), - precision, - scale, + *precision, + *scale, )) } _ => Some(ScalarValue::Int64(Some(*s.$func()))), @@ -96,8 +97,8 @@ macro_rules! 
get_statistic { Some(DataType::Decimal128(precision, scale)) => { Some(ScalarValue::Decimal128( Some(from_bytes_to_i128(s.$bytes_func())), - precision, - scale, + *precision, + *scale, )) } _ => { @@ -115,8 +116,8 @@ macro_rules! get_statistic { Some(DataType::Decimal128(precision, scale)) => { Some(ScalarValue::Decimal128( Some(from_bytes_to_i128(s.$bytes_func())), - precision, - scale, + *precision, + *scale, )) } _ => None, @@ -126,111 +127,62 @@ macro_rules! get_statistic { }}; } -#[derive(Debug, Clone, Copy)] -enum Statistic { - Min, - Max, -} - -/// Extracts statistics for a single leaf column from [`RowGroupMetaData`] as an -/// arrow [`ArrayRef`] +/// Lookups up the parquet column by name /// -/// For example, given a parquet file with 3 Row Groups, when asked for -/// statistics for column "A" it will return a single array with 3 elements. -pub(crate) struct RowGroupStatisticsConverter<'a> { - field: &'a Field, +/// Returns the parquet column index and the corresponding arrow field +pub fn parquet_column<'a>( + parquet_schema: &SchemaDescriptor, + arrow_schema: &'a Schema, + name: &str, +) -> Option<(usize, &'a FieldRef)> { + let (root_idx, field) = arrow_schema.fields.find(name)?; + if field.data_type().is_nested() { + // Nested fields are not supported and require non-trivial logic + // to correctly walk the parquet schema accounting for the + // logical type rules - + // + // For example a ListArray could correspond to anything from 1 to 3 levels + // in the parquet schema + return None; + } + + // This could be made more efficient (#TBD) + let parquet_idx = (0..parquet_schema.columns().len()) + .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?; + Some((parquet_idx, field)) } -impl<'a> RowGroupStatisticsConverter<'a> { - /// Create a new RowGoupStatisticsConverter for extracting the named column - /// in the schema. - pub fn try_new(schema: &'a arrow_schema::Schema, column_name: &str) -> Result { - let field = schema.field_with_name(column_name)?; - Ok(Self { field }) - } - - /// Returns the min value for the column into an array ref. - pub fn min<'b>( - &self, - row_group_meta_data: impl IntoIterator, - ) -> Result { - self.min_max_impl(Statistic::Min, row_group_meta_data) - } - - /// Returns the max value for the column into an array ref. - pub fn max<'b>( - &self, - row_group_meta_data: impl IntoIterator, - ) -> Result { - self.min_max_impl(Statistic::Max, row_group_meta_data) - } - - /// Extracts all min/max values for the column into an array ref. 
- fn min_max_impl<'b>( - &self, - mm: Statistic, - row_group_meta_data: impl IntoIterator, - ) -> Result { - let mut row_group_meta_data = row_group_meta_data.into_iter().peekable(); - - // if it is empty, return empty array - if row_group_meta_data.peek().is_none() { - return Ok(new_empty_array(self.field.data_type())); - } +/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] +pub fn min_statistics<'a, I: Iterator>>( + data_type: &DataType, + iterator: I, +) -> Result { + let scalars = iterator + .map(|x| x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type)))); + collect_scalars(data_type, scalars) +} - let maybe_index = row_group_meta_data.peek().and_then(|rg_meta| { - rg_meta - .columns() - .iter() - .enumerate() - .find(|(_idx, c)| c.column_descr().name() == self.field.name()) - .map(|(idx, _c)| idx) - }); - - // don't have this column, return an array of all NULLs - let Some(column_index) = maybe_index else { - let num_row_groups = row_group_meta_data.count(); - let sv = ScalarValue::try_from(self.field.data_type())?; - return sv.to_array_of_size(num_row_groups); - }; +/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] +pub fn max_statistics<'a, I: Iterator>>( + data_type: &DataType, + iterator: I, +) -> Result { + let scalars = iterator + .map(|x| x.and_then(|s| get_statistic!(s, max, max_bytes, Some(data_type)))); + collect_scalars(data_type, scalars) +} - let stats_iter = row_group_meta_data.map(move |row_group_meta_data| { - row_group_meta_data.column(column_index).statistics() - }); - - // this is the value to use when the statistics are not set - let null_value = ScalarValue::try_from(self.field.data_type())?; - match mm { - Statistic::Min => { - let values = stats_iter.map(|column_statistics| { - column_statistics - .and_then(|column_statistics| { - get_statistic!( - column_statistics, - min, - min_bytes, - Some(self.field.data_type().clone()) - ) - }) - .unwrap_or_else(|| null_value.clone()) - }); - ScalarValue::iter_to_array(values) - } - Statistic::Max => { - let values = stats_iter.map(|column_statistics| { - column_statistics - .and_then(|column_statistics| { - get_statistic!( - column_statistics, - max, - max_bytes, - Some(self.field.data_type().clone()) - ) - }) - .unwrap_or_else(|| null_value.clone()) - }); - ScalarValue::iter_to_array(values) - } +/// Builds an array from an iterator of ScalarValue +fn collect_scalars>>( + data_type: &DataType, + iterator: I, +) -> Result { + let mut scalars = iterator.peekable(); + match scalars.peek().is_none() { + true => Ok(new_empty_array(data_type)), + false => { + let null = ScalarValue::try_from(data_type)?; + ScalarValue::iter_to_array(scalars.map(|x| x.unwrap_or_else(|| null.clone()))) } } } @@ -570,14 +522,16 @@ mod test { let schema = input_batch.schema(); let metadata = parquet_metadata(schema.clone(), input_batch); + let parquet_schema = metadata.file_metadata().schema_descr(); // read the int_col statistics - let (_idx, field) = schema.column_with_name("int_col").unwrap(); + let (idx, _) = parquet_column(parquet_schema, &schema, "int_col").unwrap(); + assert_eq!(idx, 2); - let converter = - RowGroupStatisticsConverter::try_new(&schema, field.name()).unwrap(); let row_groups = metadata.row_groups(); - let min = converter.min(row_groups).unwrap(); + let iter = row_groups.iter().map(|x| x.column(idx).statistics()); + + let min = min_statistics(&DataType::Int32, iter.clone()).unwrap(); assert_eq!( &min, &expected_min, @@ -585,7 
+539,7 @@ mod test { DisplayStats(row_groups) ); - let max = converter.max(row_groups).unwrap(); + let max = max_statistics(&DataType::Int32, iter).unwrap(); assert_eq!( &max, &expected_max, @@ -762,12 +716,23 @@ mod test { let schema = input_batch.schema(); let metadata = parquet_metadata(schema.clone(), input_batch); + let parquet_schema = metadata.file_metadata().schema_descr(); + + let row_groups = metadata.row_groups(); for field in schema.fields() { - let converter = - RowGroupStatisticsConverter::try_new(&schema, field.name()).unwrap(); - let row_groups = metadata.row_groups(); - let min = converter.min(row_groups).unwrap(); + if field.data_type().is_nested() { + let lookup = parquet_column(parquet_schema, &schema, field.name()); + assert_eq!(lookup, None); + continue; + } + + let (idx, f) = + parquet_column(parquet_schema, &schema, field.name()).unwrap(); + assert_eq!(f, field); + + let iter = row_groups.iter().map(|x| x.column(idx).statistics()); + let min = min_statistics(f.data_type(), iter.clone()).unwrap(); assert_eq!( &min, &expected_min, @@ -775,7 +740,7 @@ mod test { DisplayStats(row_groups) ); - let max = converter.max(row_groups).unwrap(); + let max = max_statistics(f.data_type(), iter).unwrap(); assert_eq!( &max, &expected_max, @@ -852,6 +817,8 @@ mod test { let reader = ArrowReaderBuilder::try_new(file).unwrap(); let arrow_schema = reader.schema(); let metadata = reader.metadata(); + let row_groups = metadata.row_groups(); + let parquet_schema = metadata.file_metadata().schema_descr(); for expected_column in self.expected_columns { let ExpectedColumn { @@ -860,13 +827,14 @@ mod test { expected_max, } = expected_column; - let converter = RowGroupStatisticsConverter::try_new(arrow_schema, name) - .expect("can't find field in schema"); + let (idx, field) = + parquet_column(parquet_schema, arrow_schema, name).unwrap(); - let actual_min = converter.min(metadata.row_groups()).unwrap(); + let iter = row_groups.iter().map(|x| x.column(idx).statistics()); + let actual_min = min_statistics(field.data_type(), iter.clone()).unwrap(); assert_eq!(&expected_min, &actual_min, "column {name}"); - let actual_max = converter.max(metadata.row_groups()).unwrap(); + let actual_max = max_statistics(field.data_type(), iter).unwrap(); assert_eq!(&expected_max, &actual_max, "column {name}"); } } From e5cd8cf506cf28600df2711b10528a3a67cfed77 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 28 Nov 2023 10:34:21 -0500 Subject: [PATCH 14/16] Fix clippy --- .../src/datasource/physical_plan/parquet/statistics.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 156f314056d01..a566b770a9a7f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -19,11 +19,9 @@ use arrow::{array::ArrayRef, datatypes::DataType}; use arrow_array::new_empty_array; -use arrow_schema::{Field, FieldRef, Schema}; +use arrow_schema::{FieldRef, Schema}; use datafusion_common::{Result, ScalarValue}; -use parquet::file::{ - metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics, -}; +use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::schema::types::SchemaDescriptor; // Convert the bytes array to i128. 
@@ -195,12 +193,12 @@ mod test {
 Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
 TimestampNanosecondArray,
 };
- use arrow_schema::SchemaRef;
+ use arrow_schema::{Field, SchemaRef};
 use bytes::Bytes;
 use datafusion_common::test_util::parquet_test_data;
 use parquet::arrow::arrow_reader::ArrowReaderBuilder;
 use parquet::arrow::arrow_writer::ArrowWriter;
- use parquet::file::metadata::ParquetMetaData;
+ use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use parquet::file::properties::{EnabledStatistics, WriterProperties};
 use std::path::PathBuf;
 use std::sync::Arc;

From 702269195c7c09135ffb5df483af1575237952d6 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 28 Nov 2023 10:46:34 -0500
Subject: [PATCH 15/16] Update documentation and make it clear the statistics
 are not publicly accessible

---
 .../src/datasource/physical_plan/parquet/row_groups.rs | 3 ++-
 .../src/datasource/physical_plan/parquet/statistics.rs | 8 ++++----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 39618d61d9265..0ab2046097c41 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -52,7 +52,8 @@ use super::ParquetFileMetrics;
 /// If an index IS present in the returned Vec it means the predicate
 /// did not filter out that row group.
 ///
-/// Note: This method currently ignores ColumnOrder (#8342)
+/// Note: This method currently ignores ColumnOrder
+///
 pub(crate) fn prune_row_groups_by_statistics(
 parquet_schema: &SchemaDescriptor,
 groups: &[RowGroupMetaData],
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index a566b770a9a7f..d4beb1914dbbd 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

-//! [`RowGroupStatisticsConverter`] tp converts parquet RowGroup statistics to arrow [`ArrayRef`].
+//! [`min_statistics`] and [`max_statistics`] convert statistics in parquet format to arrow [`ArrayRef`].

 use arrow::{array::ArrayRef, datatypes::DataType};
 use arrow_array::new_empty_array;
@@ -128,7 +128,7 @@ macro_rules!
get_statistic { /// Lookups up the parquet column by name /// /// Returns the parquet column index and the corresponding arrow field -pub fn parquet_column<'a>( +pub(crate) fn parquet_column<'a>( parquet_schema: &SchemaDescriptor, arrow_schema: &'a Schema, name: &str, @@ -151,7 +151,7 @@ pub fn parquet_column<'a>( } /// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] -pub fn min_statistics<'a, I: Iterator>>( +pub(crate) fn min_statistics<'a, I: Iterator>>( data_type: &DataType, iterator: I, ) -> Result { @@ -161,7 +161,7 @@ pub fn min_statistics<'a, I: Iterator>>( } /// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] -pub fn max_statistics<'a, I: Iterator>>( +pub(crate) fn max_statistics<'a, I: Iterator>>( data_type: &DataType, iterator: I, ) -> Result { From a5e235aec4225c637fcfecf22e0a2337da3769ee Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 28 Nov 2023 10:51:27 -0500 Subject: [PATCH 16/16] Add link to upstream arrow ticket --- .../core/src/datasource/physical_plan/parquet/statistics.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index d4beb1914dbbd..4e472606da515 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -17,6 +17,8 @@ //! [`min_statistics`] and [`max_statistics`] convert statistics in parquet format to arrow [`ArrayRef`]. +// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 + use arrow::{array::ArrayRef, datatypes::DataType}; use arrow_array::new_empty_array; use arrow_schema::{FieldRef, Schema};
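Taken together, patches 13 through 16 replace the per-column RowGroupStatisticsConverter with three free functions: parquet_column resolves an arrow field to its (non-nested) parquet leaf column, while min_statistics and max_statistics turn an iterator of per-row-group statistics into an arrow array with one element per row group. The sketch below shows the resulting call pattern, assembled from the test code in these patches; the file handle, the "int_col" column name, and the unwrap-based error handling are illustrative assumptions, not part of the patches themselves:

    // Open a parquet file and pull out its metadata (setup mirrors TestFile::run above).
    let reader = ArrowReaderBuilder::try_new(file).unwrap();
    let arrow_schema = reader.schema();
    let metadata = reader.metadata();
    let parquet_schema = metadata.file_metadata().schema_descr();
    let row_groups = metadata.row_groups();

    // Map an arrow column name to its parquet leaf column; returns None for nested fields.
    let (idx, field) = parquet_column(parquet_schema, arrow_schema, "int_col").unwrap();

    // One ParquetStatistics entry per row group, in row group order.
    let iter = row_groups.iter().map(|rg| rg.column(idx).statistics());

    // Arrays with one element per row group, typed to match the arrow field;
    // row groups without usable statistics become null entries.
    let min = min_statistics(field.data_type(), iter.clone()).unwrap();
    let max = max_statistics(field.data_type(), iter).unwrap();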