From c8bea51af458ac05833594b6290bd61af96b12e5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 20 May 2021 11:10:29 -0400 Subject: [PATCH 1/2] Return Vec from PruningPredicateBuilder rather than an `Fn` --- datafusion/src/physical_optimizer/pruning.rs | 85 ++++++++++---------- datafusion/src/physical_plan/parquet.rs | 37 +++++++-- 2 files changed, 72 insertions(+), 50 deletions(-) diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs index a13ca56630bc0..51c1b2b9f1e7e 100644 --- a/datafusion/src/physical_optimizer/pruning.rs +++ b/datafusion/src/physical_optimizer/pruning.rs @@ -15,9 +15,14 @@ // specific language governing permissions and limitations // under the License. -//! This module contains code to rule out row groups / partitions / -//! etc based on statistics prior in order to skip evaluating entire -//! swaths of rows. +//! This module contains code to prune "containers" of row groups +//! based on statistics prior to execution. This can lead to +//! significant performance improvements by avoiding the need +//! to evaluate a plan on entire containers (e.g. an entire file) +//! +//! For example, it is used to prune (skip) row groups while reading +//! parquet files if it can be determined from the predicate that +//! nothing in the row group can match. //! //! This code is currently specific to Parquet, but soon (TM), via //! 
https://github.com/apache/arrow-datafusion/issues/363 it will @@ -85,24 +90,24 @@ impl PruningPredicateBuilder { }) } - /// Generate a predicate function used to filter based on - /// statistics + /// For each set of statistics, evaluates the predicate in this + /// builder and returns a `bool` with the following meaning for a + /// container with those statistics: + /// + /// `true`: The container MAY contain rows that match the predicate /// - /// This function takes a slice of statistics as parameter, so - /// that DataFusion's physical expressions can be executed once - /// against a single RecordBatch, containing statistics arrays, on - /// which the physical predicate expression is executed to - /// generate a row group filter array. + /// `false`: The container definitely does NOT contain rows that match the predicate /// - /// The generated filter function is then used in the returned - /// closure to filter row groups. NOTE this is parquet specific at the moment + /// Note this function takes a slice of statistics as a parameter + /// to amortize the cost of the evaluation of the predicate + /// against a single record batch. 
pub fn build_pruning_predicate( &self, - row_group_metadata: &[RowGroupMetaData], - ) -> Box bool> { + statistics: &[RowGroupMetaData], + ) -> Result> { // build statistics record batch - let predicate_result = build_statistics_record_batch( - row_group_metadata, + let predicate_array = build_statistics_record_batch( + statistics, &self.schema, &self.stat_column_req, ) @@ -112,33 +117,29 @@ impl PruningPredicateBuilder { }) .and_then(|v| match v { ColumnarValue::Array(array) => Ok(array), - ColumnarValue::Scalar(_) => Err(DataFusionError::Plan( + ColumnarValue::Scalar(_) => Err(DataFusionError::Internal( "predicate expression didn't return an array".to_string(), )), - }); - - let predicate_array = match predicate_result { - Ok(array) => array, - // row group filter array could not be built - // return a closure which will not filter out any row groups - _ => return Box::new(|_r, _i| true), - }; + })?; - let predicate_array = predicate_array.as_any().downcast_ref::(); - match predicate_array { - // return row group predicate function - Some(array) => { - // when the result of the predicate expression for a row group is null / undefined, - // e.g. due to missing statistics, this row group can't be filtered out, - // so replace with true - let predicate_values = - array.iter().map(|x| x.unwrap_or(true)).collect::>(); - Box::new(move |_, i| predicate_values[i]) - } - // predicate result is not a BooleanArray - // return a closure which will not filter out any row groups - _ => Box::new(|_r, _i| true), - } + let predicate_array = predicate_array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal(format!( + "Expected pruning predicate evaluation to be BooleanArray, \ + but was {:?}", + predicate_array + )) + })?; + + // when the result of the predicate expression for a row group is null / undefined, + // e.g. 
due to missing statistics, this row group can't be filtered out, + // so replace with true + Ok(predicate_array + .into_iter() + .map(|x| x.unwrap_or(true)) + .collect::>()) } } @@ -146,7 +147,7 @@ impl PruningPredicateBuilder { /// [`RowGroupMetadata`] structs), creating arrays, one for each /// statistics column, as requested in the stat_column_req parameter. fn build_statistics_record_batch( - row_groups: &[RowGroupMetaData], + statistics: &[RowGroupMetaData], schema: &Schema, stat_column_req: &[(String, StatisticsType, Field)], ) -> Result { @@ -154,7 +155,7 @@ fn build_statistics_record_batch( let mut arrays = Vec::::new(); for (column_name, statistics_type, stat_field) in stat_column_req { if let Some((column_index, _)) = schema.column_with_name(column_name) { - let statistics = row_groups + let statistics = statistics .iter() .map(|g| g.column(column_index).statistics()) .collect::>(); diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index 66b1253db3d40..f36171cdb73f4 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -38,7 +38,10 @@ use arrow::{ error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; -use parquet::file::reader::{FileReader, SerializedFileReader}; +use parquet::file::{ + metadata::RowGroupMetaData, + reader::{FileReader, SerializedFileReader}, +}; use fmt::Debug; use parquet::arrow::{ArrowReader, ParquetFileArrowReader}; @@ -454,6 +457,22 @@ fn send_result( Ok(()) } +fn build_row_group_predicate( + predicate_builder: &PruningPredicateBuilder, + row_group_metadata: &[RowGroupMetaData], +) -> Box bool> { + let predicate_values = predicate_builder.build_pruning_predicate(row_group_metadata); + + let predicate_values = match predicate_values { + Ok(values) => values, + // stats filter array could not be built + // return a closure which will not filter out any row groups + _ => return Box::new(|_r, _i| true), + }; + + Box::new(move 
|_, i| predicate_values[i]) +} + fn read_files( filenames: &[String], projection: &[usize], @@ -467,8 +486,10 @@ fn read_files( let file = File::open(&filename)?; let mut file_reader = SerializedFileReader::new(file)?; if let Some(predicate_builder) = predicate_builder { - let row_group_predicate = predicate_builder - .build_pruning_predicate(file_reader.metadata().row_groups()); + let row_group_predicate = build_row_group_predicate( + predicate_builder, + file_reader.metadata().row_groups(), + ); file_reader.filter_row_groups(&row_group_predicate); } let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); @@ -643,7 +664,7 @@ mod tests { ); let row_group_metadata = vec![rgm1, rgm2]; let row_group_predicate = - predicate_builder.build_pruning_predicate(&row_group_metadata); + build_row_group_predicate(&predicate_builder, &row_group_metadata); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -673,7 +694,7 @@ mod tests { ); let row_group_metadata = vec![rgm1, rgm2]; let row_group_predicate = - predicate_builder.build_pruning_predicate(&row_group_metadata); + build_row_group_predicate(&predicate_builder, &row_group_metadata); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -718,7 +739,7 @@ mod tests { ); let row_group_metadata = vec![rgm1, rgm2]; let row_group_predicate = - predicate_builder.build_pruning_predicate(&row_group_metadata); + build_row_group_predicate(&predicate_builder, &row_group_metadata); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -733,7 +754,7 @@ mod tests { let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?; let row_group_predicate = - predicate_builder.build_pruning_predicate(&row_group_metadata); + build_row_group_predicate(&predicate_builder, &row_group_metadata); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -777,7 +798,7 @@ mod tests { ); let row_group_metadata = 
vec![rgm1, rgm2]; let row_group_predicate = - predicate_builder.build_pruning_predicate(&row_group_metadata); + build_row_group_predicate(&predicate_builder, &row_group_metadata); let row_group_filter = row_group_metadata .iter() .enumerate() From 1db93632de7d785320413723a55dc6833eee49cb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 21 May 2021 13:48:30 -0400 Subject: [PATCH 2/2] Update datafusion/src/physical_optimizer/pruning.rs Co-authored-by: Jorge Leitao --- datafusion/src/physical_optimizer/pruning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs index 51c1b2b9f1e7e..0446904eae030 100644 --- a/datafusion/src/physical_optimizer/pruning.rs +++ b/datafusion/src/physical_optimizer/pruning.rs @@ -96,7 +96,7 @@ impl PruningPredicateBuilder { /// /// `true`: The container MAY contain rows that match the predicate /// - /// `false`: The container definitely does NOT contain rows that match the predicate + /// `false`: The container MUST NOT contain rows that match the predicate /// /// Note this function takes a slice of statistics as a parameter /// to amortize the cost of the evaluation of the predicate