From be017d88141d7c54965cb8dc2b0648ed10b4de5f Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 20 May 2021 13:20:26 -0400
Subject: [PATCH 1/4] Rewrite pruning logic in terms of PruningStatistics using
 Array trait

---
 datafusion/src/physical_optimizer/pruning.rs  | 702 +++++++++++-------
 .../src/physical_optimizer/repartition.rs     |   6 +-
 datafusion/src/physical_plan/parquet.rs       | 133 +++-
 3 files changed, 571 insertions(+), 270 deletions(-)

diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs
index 0446904eae030..3a5a64c6f6689 100644
--- a/datafusion/src/physical_optimizer/pruning.rs
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -31,19 +31,11 @@ use std::{collections::HashSet, sync::Arc};
 
 use arrow::{
-    array::{
-        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
-        BooleanBufferBuilder,
-    },
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema},
+    array::{new_null_array, ArrayRef, BooleanArray},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
-};
-
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
@@ -52,26 +44,81 @@ use crate::{
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
 };
 
+/// Interface to pass statistics information to [`PruningPredicate`]
+///
+/// Returns statistics for containers / files of data in Arrays.
+///
+/// For example, for the following three files with a single column:
+///
+/// ```text
+/// file1: column a: min=5, max=10
+/// file2: column a: No stats
+/// file3: column a: min=20, max=30
+/// ```
+///
+/// PruningStatistics should return:
+///
+/// ```text
+/// min_values("a") -> Some([5, Null, 20])
+/// max_values("a") -> Some([10, Null, 30])
+/// min_values("X") -> None
+/// ```
+pub trait PruningStatistics {
+    /// Return the minimum values for the named column, if known.
+    /// Note: the returned array must contain `num_containers()` rows
+    fn min_values(&self, column: &str) -> Option<ArrayRef>;
+
+    /// Return the maximum values for the named column, if known.
+    /// Note: the returned array must contain `num_containers()` rows.
+    fn max_values(&self, column: &str) -> Option<ArrayRef>;
+
+    /// Return the number of containers (e.g. row groups) being
+    /// pruned with these statistics
+    fn num_containers(&self) -> usize;
+}
+
+/// Evaluates filter expressions on statistics in order to
+/// prune data containers (e.g. parquet row groups)
+///
+/// See [`try_new`] for more information.
 #[derive(Debug, Clone)]
-/// Builder used for generating predicate functions that can be used
-/// to prune data based on statistics (e.g. parquet row group metadata)
-pub struct PruningPredicateBuilder {
-    schema: Schema,
+pub struct PruningPredicate {
+    /// The input schema against which the predicate will be evaluated
+    schema: SchemaRef,
+    /// Actual pruning predicate (rewritten in terms of column min/max statistics)
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    /// The statistics required to evaluate this predicate:
+    /// * The column name in the input schema
+    /// * Statistics type (e.g. Min or Max)
+    /// * The field the statistics value should be placed in for
+    ///   pruning predicate evaluation
     stat_column_req: Vec<(String, StatisticsType, Field)>,
 }
 
-impl PruningPredicateBuilder {
-    /// Try to create a new instance of [`PruningPredicateBuilder`]
+impl PruningPredicate {
+    /// Try to create a new instance of [`PruningPredicate`]
+    ///
+    /// This will translate the provided `expr` filter expression into
+    /// a *pruning predicate*.
+    ///
+    /// A pruning predicate is one that has been rewritten in terms of
+    /// the min and max values of column references and that evaluates
+    /// to FALSE if the filter predicate would evaluate FALSE *for
+    /// every row* whose values fell within the min / max ranges (aka
+    /// could be pruned).
     ///
-    /// This will translate the filter expression into a statistics predicate expression
+    /// The pruning predicate evaluates to TRUE or NULL
+    /// if the filter predicate *might* evaluate to TRUE for at least
+    /// one row whose values fell within the min/max ranges (in other
+    /// words, they might pass the predicate)
     ///
-    /// For example, `(column / 2) = 4` becomes `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
-    pub fn try_new(expr: &Expr, schema: Schema) -> Result<Self> {
+    /// For example, the filter expression `(column / 2) = 4` becomes
+    /// the pruning predicate
+    /// `(column_min / 2) <= 4 && 4 <= (column_max / 2)`
+    pub fn try_new(expr: &Expr, schema: SchemaRef) -> Result<Self> {
         // build predicate expression once
         let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
         let logical_predicate_expr =
-            build_predicate_expression(expr, &schema, &mut stat_column_req)?;
+            build_predicate_expression(expr, schema.as_ref(), &mut stat_column_req)?;
         let stat_fields = stat_column_req
             .iter()
             .map(|(_, _, f)| f.clone())
             .collect::<Vec<_>>();
@@ -90,37 +137,31 @@ impl PruningPredicateBuilder {
         })
     }
 
-    /// For each set of statistics, evalates the predicate in this
-    /// builder and returns a `bool` with the following meaning for a
-    /// container with those statistics:
+    /// For each set of statistics, evaluates the pruning predicate
+    /// and returns a `bool` with the following meaning for
+    /// all rows whose values match the statistics:
     ///
-    /// `true`: The container MAY contain rows that match the predicate
+    /// `true`: There MAY be rows that match the predicate
    ///
-    /// `false`: The container MUST NOT contain rows that match the predicate
+    /// `false`: There are no rows that could match the predicate
     ///
     /// Note this function takes a slice of statistics as a parameter
     /// to amortize the cost of the evaluation of the predicate
     /// against a single record batch.
-    pub fn build_pruning_predicate(
-        &self,
-        statistics: &[RowGroupMetaData],
-    ) -> Result<Vec<bool>> {
+    pub fn prune<S: PruningStatistics>(&self, statistics: &S) -> Result<Vec<bool>> {
         // build statistics record batch
-        let predicate_array = build_statistics_record_batch(
-            statistics,
-            &self.schema,
-            &self.stat_column_req,
-        )
-        .and_then(|statistics_batch| {
-            // execute predicate expression
-            self.predicate_expr.evaluate(&statistics_batch)
-        })
-        .and_then(|v| match v {
-            ColumnarValue::Array(array) => Ok(array),
-            ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
-                "predicate expression didn't return an array".to_string(),
-            )),
-        })?;
+        let predicate_array =
+            build_statistics_record_batch(statistics, &self.stat_column_req)
+                .and_then(|statistics_batch| {
+                    // execute predicate expression
+                    self.predicate_expr.evaluate(&statistics_batch)
+                })
+                .and_then(|v| match v {
+                    ColumnarValue::Array(array) => Ok(array),
+                    ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
+                        "predicate expression didn't return an array".to_string(),
+                    )),
+                })?;
 
         let predicate_array = predicate_array
             .as_any()
             .downcast_ref::<BooleanArray>()
@@ -141,39 +182,78 @@ impl PruningPredicateBuilder {
             .map(|x| x.unwrap_or(true))
             .collect::<Vec<_>>())
     }
+
+    /// Return a reference to the input schema
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
 }
 
-/// Build a RecordBatch from a list of statistics (currently parquet
-/// [`RowGroupMetadata`] structs), creating arrays, one for each
-/// statistics column, as requested in the stat_column_req parameter.
-fn build_statistics_record_batch(
-    statistics: &[RowGroupMetaData],
-    schema: &Schema,
+/// Build a RecordBatch from a list of statistics, creating arrays,
+/// with one row for each PruningStatistics container and the columns
+/// specified in the stat_column_req parameter.
+///
+/// For example, if the requested columns are
+/// ```text
+/// ("s1", Min, Field:s1_min)
+/// ("s2", Max, field:s2_max)
+/// ```
+///
+/// And the input statistics had
+/// ```text
+/// S1(Min: 5, Max: 10)
+/// S2(Min: 99, Max: 1000)
+/// S3(Min: 1, Max: 2)
+/// ```
+///
+/// Then this function would build a record batch with 2 columns and
+/// one row, s1_min and s2_max, as follows (s3 is not requested):
+///
+/// ```text
+/// s1_min | s2_max
+/// -------+--------
+/// 5      | 1000
+/// ```
+fn build_statistics_record_batch<S: PruningStatistics>(
+    statistics: &S,
     stat_column_req: &[(String, StatisticsType, Field)],
 ) -> Result<RecordBatch> {
     let mut fields = Vec::<Field>::new();
     let mut arrays = Vec::<ArrayRef>::new();
+    // For each needed statistics column:
     for (column_name, statistics_type, stat_field) in stat_column_req {
-        if let Some((column_index, _)) = schema.column_with_name(column_name) {
-            let statistics = statistics
-                .iter()
-                .map(|g| g.column(column_index).statistics())
-                .collect::<Vec<_>>();
-            let array = build_statistics_array(
-                &statistics,
-                *statistics_type,
-                stat_field.data_type(),
-            );
-            fields.push(stat_field.clone());
-            arrays.push(array);
+        let data_type = stat_field.data_type();
+
+        let num_containers = statistics.num_containers();
+
+        let array = match statistics_type {
+            StatisticsType::Min => statistics.min_values(column_name),
+            StatisticsType::Max => statistics.max_values(column_name),
+        };
+        let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
+
+        if num_containers != array.len() {
+            return Err(DataFusionError::Internal(format!(
+                "mismatched statistics length. Expected {}, got {}",
+                num_containers,
+                array.len()
+            )));
         }
+
+        // cast statistics array to required data type (e.g.
parquet + // provides timestamp statistics as "Int64") + let array = arrow::compute::cast(&array, data_type)?; + + fields.push(stat_field.clone()); + arrays.push(array); } + let schema = Arc::new(Schema::new(fields)); RecordBatch::try_new(schema, arrays) .map_err(|err| DataFusionError::Plan(err.to_string())) } -struct StatisticsExpressionBuilder<'a> { +struct PruningExpressionBuilder<'a> { column_name: String, column_expr: &'a Expr, scalar_expr: &'a Expr, @@ -182,7 +262,7 @@ struct StatisticsExpressionBuilder<'a> { reverse_operator: bool, } -impl<'a> StatisticsExpressionBuilder<'a> { +impl<'a> PruningExpressionBuilder<'a> { fn try_new( left: &'a Expr, right: &'a Expr, @@ -303,7 +383,11 @@ fn rewrite_column_expr( utils::rewrite_expression(&expr, &expressions) } -/// Translate logical filter expression into statistics predicate expression +/// Translate logical filter expression into pruning predicate +/// expression that will evaluate to FALSE if it can be determined no +/// rows between the min/max values could pass the predicates. +/// +/// Returns the pruning predicate as an [`Expr`] fn build_predicate_expression( expr: &Expr, schema: &Schema, @@ -328,7 +412,7 @@ fn build_predicate_expression( } let expr_builder = - StatisticsExpressionBuilder::try_new(left, right, schema, stat_column_req); + PruningExpressionBuilder::try_new(left, right, schema, stat_column_req); let mut expr_builder = match expr_builder { Ok(builder) => builder, // allow partial failure in predicate expression generation @@ -384,210 +468,307 @@ enum StatisticsType { Max, } -fn build_statistics_array( - statistics: &[Option<&ParquetStatistics>], - statistics_type: StatisticsType, - data_type: &DataType, -) -> ArrayRef { - let statistics_count = statistics.len(); - let first_group_stats = statistics.iter().find(|s| s.is_some()); - let first_group_stats = if let Some(Some(statistics)) = first_group_stats { - // found first row group with statistics defined - statistics - } else { - // no row group has statistics defined - return new_null_array(data_type, statistics_count); +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + use crate::logical_plan::{col, lit}; + use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType}; + use arrow::{ + array::{BinaryArray, Int32Array, Int64Array, StringArray}, + datatypes::{DataType, TimeUnit}, }; - let (data_size, arrow_type) = match first_group_stats { - ParquetStatistics::Int32(_) => (std::mem::size_of::(), DataType::Int32), - ParquetStatistics::Int64(_) => (std::mem::size_of::(), DataType::Int64), - ParquetStatistics::Float(_) => (std::mem::size_of::(), DataType::Float32), - ParquetStatistics::Double(_) => (std::mem::size_of::(), DataType::Float64), - ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => { - (0, DataType::Utf8) + #[derive(Debug)] + /// Test for container stats + struct ContainerStats { + min: ArrayRef, + max: ArrayRef, + } + + impl ContainerStats { + fn new_i32( + min: impl IntoIterator>, + max: impl IntoIterator>, + ) -> Self { + Self { + min: Arc::new(min.into_iter().collect::()), + max: Arc::new(max.into_iter().collect::()), + } } - _ => { - // type of statistics not supported - return new_null_array(data_type, statistics_count); + + fn new_utf8<'a>( + min: impl IntoIterator>, + max: impl IntoIterator>, + ) -> Self { + Self { + min: Arc::new(min.into_iter().collect::()), + max: Arc::new(max.into_iter().collect::()), + } } - }; - let statistics = statistics.iter().map(|s| { - s.filter(|s| 
s.has_min_max_set()) - .map(|s| match statistics_type { - StatisticsType::Min => s.min_bytes(), - StatisticsType::Max => s.max_bytes(), - }) - }); - - if arrow_type == DataType::Utf8 { - let data_size = statistics - .clone() - .map(|x| x.map(|b| b.len()).unwrap_or(0)) - .sum(); - let mut builder = - arrow::array::StringBuilder::with_capacity(statistics_count, data_size); - let string_statistics = - statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok())); - for maybe_string in string_statistics { - match maybe_string { - Some(string_value) => builder.append_value(string_value).unwrap(), - None => builder.append_null().unwrap(), - }; + fn min(&self) -> Option { + Some(self.min.clone()) + } + + fn max(&self) -> Option { + Some(self.max.clone()) } - return Arc::new(builder.finish()); - } - - let mut data_buffer = MutableBuffer::new(statistics_count * data_size); - let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count); - let mut null_count = 0; - for s in statistics { - if let Some(stat_data) = s { - bitmap_builder.append(true); - data_buffer.extend_from_slice(stat_data); - } else { - bitmap_builder.append(false); - data_buffer.resize(data_buffer.len() + data_size, 0); - null_count += 1; + + fn len(&self) -> usize { + assert_eq!(self.min.len(), self.max.len()); + self.min.len() } } - let mut builder = ArrayData::builder(arrow_type) - .len(statistics_count) - .add_buffer(data_buffer.into()); - if null_count > 0 { - builder = builder.null_bit_buffer(bitmap_builder.finish()); + #[derive(Debug, Default)] + struct TestStatistics { + // key: column name + stats: HashMap, } - let array_data = builder.build(); - let statistics_array = make_array(array_data); - if statistics_array.data_type() == data_type { - return statistics_array; + + impl TestStatistics { + fn new() -> Self { + Self::default() + } + + fn with( + mut self, + name: impl Into, + container_stats: ContainerStats, + ) -> Self { + self.stats.insert(name.into(), container_stats); + self + } } - // cast statistics array to required data type - arrow::compute::cast(&statistics_array, data_type) - .unwrap_or_else(|_| new_null_array(data_type, statistics_count)) -} -#[cfg(test)] -mod tests { - use super::*; - use crate::physical_optimizer::pruning::StatisticsType; - use arrow::{ - array::{Int32Array, StringArray}, - datatypes::DataType, - }; - use parquet::file::statistics::Statistics as ParquetStatistics; + impl PruningStatistics for TestStatistics { + fn min_values(&self, column: &str) -> Option { + self.stats + .get(column) + .map(|container_stats| container_stats.min()) + .unwrap_or(None) + } + + fn max_values(&self, column: &str) -> Option { + self.stats + .get(column) + .map(|container_stats| container_stats.max()) + .unwrap_or(None) + } + + fn num_containers(&self) -> usize { + self.stats + .values() + .next() + .map(|container_stats| container_stats.len()) + .unwrap_or(0) + } + } + + /// Returns the specified min/max container values + struct OneContainerStats { + min_values: Option, + max_values: Option, + num_containers: usize, + } + + impl PruningStatistics for OneContainerStats { + fn min_values(&self, _column: &str) -> Option { + self.min_values.clone() + } + + fn max_values(&self, _column: &str) -> Option { + self.max_values.clone() + } + + fn num_containers(&self) -> usize { + self.num_containers + } + } #[test] - fn build_statistics_array_int32() { - // build row group metadata array - let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false); - let s2 = ParquetStatistics::int32(Some(2), 
Some(20), None, 0, false); - let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false); - let statistics = vec![Some(&s1), Some(&s2), Some(&s3)]; - - let statistics_array = - build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32); - let int32_array = statistics_array - .as_any() - .downcast_ref::() - .unwrap(); - let int32_vec = int32_array.into_iter().collect::>(); - assert_eq!(int32_vec, vec![None, Some(2), Some(3)]); - - let statistics_array = - build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32); - let int32_array = statistics_array - .as_any() - .downcast_ref::() - .unwrap(); - let int32_vec = int32_array.into_iter().collect::>(); - // here the first max value is None and not the Some(10) value which was actually set - // because the min value is None - assert_eq!(int32_vec, vec![None, Some(20), Some(30)]); + fn test_build_statistics_record_batch() { + // Request a record batch with of s1_min, s2_max, s3_max, s3_min + let stat_column_req = vec![ + // min of original column s1, named s1_min + ( + "s1".to_string(), + StatisticsType::Min, + Field::new("s1_min", DataType::Int32, true), + ), + // max of original column s2, named s2_max + ( + "s2".to_string(), + StatisticsType::Max, + Field::new("s2_max", DataType::Int32, true), + ), + // max of original column s3, named s3_max + ( + "s3".to_string(), + StatisticsType::Max, + Field::new("s3_max", DataType::Utf8, true), + ), + // min of original column s3, named s3_min + ( + "s3".to_string(), + StatisticsType::Min, + Field::new("s3_min", DataType::Utf8, true), + ), + ]; + + let statistics = TestStatistics::new() + .with( + "s1", + ContainerStats::new_i32( + vec![None, None, Some(9), None], // min + vec![Some(10), None, None, None], // max + ), + ) + .with( + "s2", + ContainerStats::new_i32( + vec![Some(2), None, None, None], // min + vec![Some(20), None, None, None], // max + ), + ) + .with( + "s3", + ContainerStats::new_utf8( + vec![Some("a"), None, None, None], // min + vec![Some("q"), None, Some("r"), None], // max + ), + ); + + let batch = build_statistics_record_batch(&statistics, &stat_column_req).unwrap(); + let expected = vec![ + "+--------+--------+--------+--------+", + "| s1_min | s2_max | s3_max | s3_min |", + "+--------+--------+--------+--------+", + "| | 20 | q | a |", + "| | | | |", + "| 9 | | r | |", + "| | | | |", + "+--------+--------+--------+--------+", + ]; + + assert_batches_eq!(expected, &[batch]); } #[test] - fn build_statistics_array_utf8() { - // build row group metadata array - let s1 = ParquetStatistics::byte_array(None, Some("10".into()), None, 0, false); - let s2 = ParquetStatistics::byte_array( - Some("2".into()), - Some("20".into()), - None, - 0, - false, - ); - let s3 = ParquetStatistics::byte_array( - Some("3".into()), - Some("30".into()), - None, - 0, - false, - ); - let statistics = vec![Some(&s1), Some(&s2), Some(&s3)]; + fn test_build_statistics_casting() { + // Test requesting a Timestamp column, but getting statistics as Int64 + // which is what Parquet does + + // Request a record batch with of s1_min as a timestamp + let stat_column_req = vec![( + "s1".to_string(), + StatisticsType::Min, + Field::new( + "s1_min", + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + ), + )]; + + // Note the statistics pass back i64 (not timestamp) + let statistics = OneContainerStats { + min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))), + max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))), + num_containers: 1, + }; - let 
statistics_array = - build_statistics_array(&statistics, StatisticsType::Min, &DataType::Utf8); - let string_array = statistics_array - .as_any() - .downcast_ref::() - .unwrap(); - let string_vec = string_array.into_iter().collect::>(); - assert_eq!(string_vec, vec![None, Some("2"), Some("3")]); - - let statistics_array = - build_statistics_array(&statistics, StatisticsType::Max, &DataType::Utf8); - let string_array = statistics_array - .as_any() - .downcast_ref::() - .unwrap(); - let string_vec = string_array.into_iter().collect::>(); - // here the first max value is None and not the Some("10") value which was actually set - // because the min value is None - assert_eq!(string_vec, vec![None, Some("20"), Some("30")]); + let batch = build_statistics_record_batch(&statistics, &stat_column_req).unwrap(); + let expected = vec![ + "+-------------------------------+", + "| s1_min |", + "+-------------------------------+", + "| 1970-01-01 00:00:00.000000010 |", + "+-------------------------------+", + ]; + + assert_batches_eq!(expected, &[batch]); } #[test] - fn build_statistics_array_empty_stats() { - let data_type = DataType::Int32; - let statistics = vec![]; - let statistics_array = - build_statistics_array(&statistics, StatisticsType::Min, &data_type); - assert_eq!(statistics_array.len(), 0); - - let statistics = vec![None, None]; - let statistics_array = - build_statistics_array(&statistics, StatisticsType::Min, &data_type); - assert_eq!(statistics_array.len(), statistics.len()); - assert_eq!(statistics_array.data_type(), &data_type); - for i in 0..statistics_array.len() { - assert_eq!(statistics_array.is_null(i), true); - assert_eq!(statistics_array.is_valid(i), false); - } + fn test_build_statistics_no_stats() { + let stat_column_req = vec![]; + + let statistics = OneContainerStats { + min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))), + max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))), + num_containers: 1, + }; + + let result = + build_statistics_record_batch(&statistics, &stat_column_req).unwrap_err(); + assert!( + result.to_string().contains("Invalid argument error"), + "{}", + result + ); } #[test] - fn build_statistics_array_unsupported_type() { - // boolean is not currently a supported type for statistics - let s1 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false); - let s2 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false); - let statistics = vec![Some(&s1), Some(&s2)]; - let data_type = DataType::Boolean; - let statistics_array = - build_statistics_array(&statistics, StatisticsType::Min, &data_type); - assert_eq!(statistics_array.len(), statistics.len()); - assert_eq!(statistics_array.data_type(), &data_type); - for i in 0..statistics_array.len() { - assert_eq!(statistics_array.is_null(i), true); - assert_eq!(statistics_array.is_valid(i), false); - } + fn test_build_statistics_inconsistent_types() { + // Test requesting a Utf8 column when the stats return some other type + + // Request a record batch with of s1_min as a timestamp + let stat_column_req = vec![( + "s1".to_string(), + StatisticsType::Min, + Field::new("s1_min", DataType::Utf8, true), + )]; + + // Note the statistics return binary (which can't be cast to string) + let statistics = OneContainerStats { + min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))), + max_values: None, + num_containers: 1, + }; + + let batch = build_statistics_record_batch(&statistics, &stat_column_req).unwrap(); + let expected = vec![ + "+--------+", + "| s1_min |", 
+ "+--------+", + "| |", + "+--------+", + ]; + + assert_batches_eq!(expected, &[batch]); + } + + #[test] + fn test_build_statistics_inconsistent_length() { + // return an inconsistent length to the actual statistics arrays + let stat_column_req = vec![( + "s1".to_string(), + StatisticsType::Min, + Field::new("s1_min", DataType::Int64, true), + )]; + + // Note the statistics pass back i64 (not timestamp) + let statistics = OneContainerStats { + min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))), + max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))), + num_containers: 3, + }; + + let result = + build_statistics_record_batch(&statistics, &stat_column_req).unwrap_err(); + assert!( + result + .to_string() + .contains("mismatched statistics length. Expected 3, got 1"), + "{}", + result + ); } #[test] fn row_group_predicate_eq() -> Result<()> { - use crate::logical_plan::{col, lit}; let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); let expected_expr = "#c1_min LtEq Int32(1) And Int32(1) LtEq #c1_max"; @@ -606,7 +787,6 @@ mod tests { #[test] fn row_group_predicate_gt() -> Result<()> { - use crate::logical_plan::{col, lit}; let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); let expected_expr = "#c1_max Gt Int32(1)"; @@ -625,7 +805,6 @@ mod tests { #[test] fn row_group_predicate_gt_eq() -> Result<()> { - use crate::logical_plan::{col, lit}; let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); let expected_expr = "#c1_max GtEq Int32(1)"; @@ -643,7 +822,6 @@ mod tests { #[test] fn row_group_predicate_lt() -> Result<()> { - use crate::logical_plan::{col, lit}; let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); let expected_expr = "#c1_min Lt Int32(1)"; @@ -662,7 +840,6 @@ mod tests { #[test] fn row_group_predicate_lt_eq() -> Result<()> { - use crate::logical_plan::{col, lit}; let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); let expected_expr = "#c1_min LtEq Int32(1)"; @@ -680,7 +857,6 @@ mod tests { #[test] fn row_group_predicate_and() -> Result<()> { - use crate::logical_plan::{col, lit}; let schema = Schema::new(vec![ Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Int32, false), @@ -697,7 +873,6 @@ mod tests { #[test] fn row_group_predicate_or() -> Result<()> { - use crate::logical_plan::{col, lit}; let schema = Schema::new(vec![ Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Int32, false), @@ -713,7 +888,6 @@ mod tests { #[test] fn row_group_predicate_stat_column_req() -> Result<()> { - use crate::logical_plan::{col, lit}; let schema = Schema::new(vec![ Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Int32, false), @@ -749,4 +923,34 @@ mod tests { Ok(()) } + + #[test] + fn prune_api() { + let schema = Arc::new(Schema::new(vec![ + Field::new("s1", DataType::Utf8, false), + Field::new("s2", DataType::Int32, false), + ])); + + // Prune using s2 > 5 + let expr = col("s2").gt(lit(5)); + + let statistics = TestStatistics::new().with( + "s2", + ContainerStats::new_i32( + vec![Some(0), Some(4), None, Some(3)], // min + vec![Some(5), Some(6), None, None], // max + ), + ); + + // s2 [0, 5] ==> no rows should pass + // s2 [4, 6] ==> some rows could pass + // No stats for s2 ==> some rows could pass + // s2 [3, None] (null max) ==> some rows could pass + + let p = PruningPredicate::try_new(&expr, schema).unwrap(); + let result = p.prune(&statistics).unwrap(); + let expected = vec![false, true, true, true]; + + 
assert_eq!(result, expected); + } } diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index fee4b3e11e5d2..011db64aaf8a2 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -115,6 +115,7 @@ mod tests { #[test] fn added_repartition_to_single_partition() -> Result<()> { + let schema = Arc::new(Schema::empty()); let parquet_project = ProjectionExec::try_new( vec![], Arc::new(ParquetExec::new( @@ -122,7 +123,7 @@ mod tests { filenames: vec!["x".to_string()], statistics: Statistics::default(), }], - Schema::empty(), + schema, None, None, 2048, @@ -149,6 +150,7 @@ mod tests { #[test] fn repartition_deepest_node() -> Result<()> { + let schema = Arc::new(Schema::empty()); let parquet_project = ProjectionExec::try_new( vec![], Arc::new(ProjectionExec::try_new( @@ -158,7 +160,7 @@ mod tests { filenames: vec!["x".to_string()], statistics: Statistics::default(), }], - Schema::empty(), + schema, None, None, 2048, diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index f36171cdb73f4..d67915fc025d6 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -17,23 +17,25 @@ //! Execution plan for reading Parquet files -use std::any::Any; use std::fmt; use std::fs::File; use std::sync::Arc; use std::task::{Context, Poll}; +use std::{any::Any, convert::TryInto}; use crate::{ error::{DataFusionError, Result}, logical_plan::Expr, - physical_optimizer::pruning::PruningPredicateBuilder, + physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, physical_plan::{ common, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, }, + scalar::ScalarValue, }; use arrow::{ + array::ArrayRef, datatypes::{Schema, SchemaRef}, error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, @@ -41,10 +43,12 @@ use arrow::{ use parquet::file::{ metadata::RowGroupMetaData, reader::{FileReader, SerializedFileReader}, + statistics::Statistics as ParquetStatistics, }; use fmt::Debug; use parquet::arrow::{ArrowReader, ParquetFileArrowReader}; + use tokio::{ sync::mpsc::{channel, Receiver, Sender}, task, @@ -69,7 +73,7 @@ pub struct ParquetExec { /// Statistics for the data set (sum of statistics for all partitions) statistics: Statistics, /// Optional predicate builder - predicate_builder: Option, + predicate_builder: Option, /// Optional limit of the number of rows limit: Option, } @@ -220,9 +224,9 @@ impl ParquetExec { schemas.len() ))); } - let schema = schemas[0].clone(); + let schema = Arc::new(schemas.pop().unwrap()); let predicate_builder = predicate.and_then(|predicate_expr| { - PruningPredicateBuilder::try_new(&predicate_expr, schema.clone()).ok() + PruningPredicate::try_new(&predicate_expr, schema.clone()).ok() }); Ok(Self::new( @@ -238,9 +242,9 @@ impl ParquetExec { /// Create a new Parquet reader execution plan with provided partitions and schema pub fn new( partitions: Vec, - schema: Schema, + schema: SchemaRef, projection: Option>, - predicate_builder: Option, + predicate_builder: Option, batch_size: usize, limit: Option, ) -> Self { @@ -457,11 +461,102 @@ fn send_result( Ok(()) } +/// Wraps parquet statistics in a way +/// that implements [`PruningStatistics`] +struct RowGroupPruningStatistics<'a> { + row_group_metadata: &'a [RowGroupMetaData], + parquet_schema: &'a Schema, +} + +/// Extract the min/max statistics from a `ParquetStatistics` 
object +macro_rules! get_statistic { + ($column_statistics:expr, $func:ident, $bytes_func:ident) => {{ + if !$column_statistics.has_min_max_set() { + return None; + } + match $column_statistics { + ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))), + ParquetStatistics::Int32(s) => Some(ScalarValue::Int32(Some(*s.$func()))), + ParquetStatistics::Int64(s) => Some(ScalarValue::Int64(Some(*s.$func()))), + // 96 bit ints not supported + ParquetStatistics::Int96(_) => None, + ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))), + ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))), + ParquetStatistics::ByteArray(s) => { + let s = std::str::from_utf8(s.$bytes_func()) + .map(|s| s.to_string()) + .ok(); + Some(ScalarValue::Utf8(s)) + } + // type not supported yet + ParquetStatistics::FixedLenByteArray(_) => None, + } + }}; +} + +// Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate +macro_rules! get_min_max_values { + ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{ + let (column_index, field) = if let Some((v, f)) = $self.parquet_schema.column_with_name($column) { + (v, f) + } else { + // Named column was not present + return None + }; + + let data_type = field.data_type(); + let null_scalar: ScalarValue = if let Ok(v) = data_type.try_into() { + v + } else { + // DataFusion doesn't have support for ScalarValues of the column type + return None + }; + + let scalar_values : Vec = $self.row_group_metadata + .iter() + .flat_map(|meta| { + meta.column(column_index).statistics() + }) + .map(|stats| { + get_statistic!(stats, $func, $bytes_func) + }) + .map(|maybe_scalar| { + // column either did't have statistics at all or didn't have min/max values + maybe_scalar.unwrap_or_else(|| null_scalar.clone()) + }) + .collect(); + + // ignore errors converting to arrays (e.g. 
different types) + ScalarValue::iter_to_array(scalar_values.iter()).ok() + }} +} + +impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { + fn min_values(&self, column: &str) -> Option { + get_min_max_values!(self, column, min, min_bytes) + } + + fn max_values(&self, column: &str) -> Option { + get_min_max_values!(self, column, max, max_bytes) + } + + fn num_containers(&self) -> usize { + self.row_group_metadata.len() + } +} + fn build_row_group_predicate( - predicate_builder: &PruningPredicateBuilder, + predicate_builder: &PruningPredicate, row_group_metadata: &[RowGroupMetaData], ) -> Box bool> { - let predicate_values = predicate_builder.build_pruning_predicate(row_group_metadata); + let parquet_schema = predicate_builder.schema().as_ref(); + + let pruning_stats = RowGroupPruningStatistics { + row_group_metadata, + parquet_schema, + }; + + let predicate_values = predicate_builder.prune(&pruning_stats); let predicate_values = match predicate_values { Ok(values) => values, @@ -476,7 +571,7 @@ fn build_row_group_predicate( fn read_files( filenames: &[String], projection: &[usize], - predicate_builder: &Option, + predicate_builder: &Option, batch_size: usize, response_tx: Sender>, limit: Option, @@ -651,7 +746,7 @@ mod tests { // int > 1 => c1_max > 1 let expr = col("c1").gt(lit(15)); let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?; + let predicate_builder = PruningPredicate::try_new(&expr, Arc::new(schema))?; let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]); let rgm1 = get_row_group_meta_data( @@ -681,7 +776,7 @@ mod tests { // int > 1 => c1_max > 1 let expr = col("c1").gt(lit(15)); let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?; + let predicate_builder = PruningPredicate::try_new(&expr, Arc::new(schema))?; let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]); let rgm1 = get_row_group_meta_data( @@ -713,11 +808,11 @@ mod tests { // test row group predicate with partially supported expression // int > 1 and int % 2 => c1_max > 1 and true let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2))); - let schema = Schema::new(vec![ + let schema = Arc::new(Schema::new(vec![ Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Int32, false), - ]); - let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema.clone())?; + ])); + let predicate_builder = PruningPredicate::try_new(&expr, schema.clone())?; let schema_descr = get_test_schema_descr(vec![ ("c1", PhysicalType::INT32), @@ -752,7 +847,7 @@ mod tests { // if conditions in predicate are joined with OR and an unsupported expression is used // this bypasses the entire predicate expression and no row groups are filtered out let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); - let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?; + let predicate_builder = PruningPredicate::try_new(&expr, schema)?; let row_group_predicate = build_row_group_predicate(&predicate_builder, &row_group_metadata); let row_group_filter = row_group_metadata @@ -772,11 +867,11 @@ mod tests { // where a null array is generated for some statistics columns // int > 1 and bool = true => c1_max > 1 and null let expr = col("c1").gt(lit(15)).and(col("c2").eq(lit(true))); - let schema = Schema::new(vec![ + let schema = Arc::new(Schema::new(vec![ 
Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), - ]); - let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?; + ])); + let predicate_builder = PruningPredicate::try_new(&expr, schema)?; let schema_descr = get_test_schema_descr(vec![ ("c1", PhysicalType::INT32), From 86f80797041ad08b236ac72a8cb810c0d9bd1c26 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 27 May 2021 06:25:20 -0400 Subject: [PATCH 2/4] avoid a collect --- datafusion/src/physical_plan/parquet.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index d67915fc025d6..91868c0ecfe50 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -512,7 +512,7 @@ macro_rules! get_min_max_values { return None }; - let scalar_values : Vec = $self.row_group_metadata + let scalar_values = $self.row_group_metadata .iter() .flat_map(|meta| { meta.column(column_index).statistics() @@ -523,11 +523,10 @@ macro_rules! get_min_max_values { .map(|maybe_scalar| { // column either did't have statistics at all or didn't have min/max values maybe_scalar.unwrap_or_else(|| null_scalar.clone()) - }) - .collect(); + }); // ignore errors converting to arrays (e.g. different types) - ScalarValue::iter_to_array(scalar_values.iter()).ok() + ScalarValue::iter_to_array(scalar_values).ok() }} } From ae1e57c1480fadb9627504a228a6215688b57163 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 27 May 2021 06:27:46 -0400 Subject: [PATCH 3/4] Revert "avoid a collect" This reverts commit 86f80797041ad08b236ac72a8cb810c0d9bd1c26. --- datafusion/src/physical_plan/parquet.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index 91868c0ecfe50..d67915fc025d6 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -512,7 +512,7 @@ macro_rules! get_min_max_values { return None }; - let scalar_values = $self.row_group_metadata + let scalar_values : Vec = $self.row_group_metadata .iter() .flat_map(|meta| { meta.column(column_index).statistics() @@ -523,10 +523,11 @@ macro_rules! get_min_max_values { .map(|maybe_scalar| { // column either did't have statistics at all or didn't have min/max values maybe_scalar.unwrap_or_else(|| null_scalar.clone()) - }); + }) + .collect(); // ignore errors converting to arrays (e.g. different types) - ScalarValue::iter_to_array(scalar_values).ok() + ScalarValue::iter_to_array(scalar_values.iter()).ok() }} } From ef966f31767f329b6f5295355ecbaff6ed828b3a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 27 May 2021 06:29:19 -0400 Subject: [PATCH 4/4] update for new api --- datafusion/src/physical_plan/parquet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index d67915fc025d6..55a6d96738cb4 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -527,7 +527,7 @@ macro_rules! get_min_max_values { .collect(); // ignore errors converting to arrays (e.g. different types) - ScalarValue::iter_to_array(scalar_values.iter()).ok() + ScalarValue::iter_to_array(scalar_values).ok() }} }