85 changes: 43 additions & 42 deletions datafusion/src/physical_optimizer/pruning.rs
@@ -15,9 +15,14 @@
// specific language governing permissions and limitations
// under the License.

//! This module contains code to rule out row groups / partitions /
//! etc based on statistics prior in order to skip evaluating entire
//! swaths of rows.
//! This module contains code to prune "containers" of row groups
//! based on statistics prior to execution. This can lead to
//! significant performance improvements by avoiding the need
//! to evaluate a plan on entire containers (e.g. an entire file)
//!
//! For example, it is used to prune (skip) row groups while reading
//! parquet files if it can be determined from the predicate that
//! nothing in the row group can match.
//!
//! This code is currently specific to Parquet, but soon (TM), via
//! https://github.com/apache/arrow-datafusion/issues/363 it will
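
To make the pruning idea described in the new module docs concrete, here is a minimal standalone sketch (hypothetical, not code from this PR): with per-container min/max statistics, a predicate such as `c1 > 100` can only be satisfied by a container whose maximum value of `c1` exceeds 100, so every other container can be skipped without reading a single row.

// Hypothetical sketch of statistics-based pruning for the predicate `c1 > 100`.
struct ContainerStats {
    c1_min: i64,
    c1_max: i64,
}

// true  => the container MAY contain matching rows and must be scanned
// false => the container MUST NOT contain matching rows and can be pruned
fn may_match(stats: &ContainerStats) -> bool {
    stats.c1_max > 100
}

fn main() {
    let containers = vec![
        ContainerStats { c1_min: 1, c1_max: 10 },   // pruned
        ContainerStats { c1_min: 50, c1_max: 200 }, // scanned
    ];
    let keep: Vec<bool> = containers.iter().map(may_match).collect();
    assert_eq!(keep, vec![false, true]);
}
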
@@ -85,24 +90,24 @@ impl PruningPredicateBuilder {
})
}

/// Generate a predicate function used to filter based on
/// statistics
/// For each set of statistics, evaluates the predicate in this
/// builder and returns a `bool` with the following meaning for a
/// container with those statistics:
///
/// `true`: The container MAY contain rows that match the predicate
///
/// This function takes a slice of statistics as parameter, so
/// that DataFusion's physical expressions can be executed once
/// against a single RecordBatch, containing statistics arrays, on
/// which the physical predicate expression is executed to
/// generate a row group filter array.
/// `false`: The container MUST NOT contain rows that match the predicate
///
/// The generated filter function is then used in the returned
/// closure to filter row groups. NOTE this is parquet specific at the moment
/// Note this function takes a slice of statistics as a parameter
/// to amortize the cost of the evaluation of the predicate
/// against a single record batch.
pub fn build_pruning_predicate(
&self,
row_group_metadata: &[RowGroupMetaData],
) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
statistics: &[RowGroupMetaData],
Contributor Author: The main change in this PR is this function's signature. The rest of the changes are fallout from doing that (a sketch of the old vs. new shape follows this hunk).

) -> Result<Vec<bool>> {
// build statistics record batch
let predicate_result = build_statistics_record_batch(
row_group_metadata,
let predicate_array = build_statistics_record_batch(
statistics,
&self.schema,
&self.stat_column_req,
)
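
As the author's comment above notes, the signature change is the heart of this PR. Here is a reconstructed side-by-side of the old and new shapes, using stand-in types so the snippet compiles on its own; it is a hedged sketch, not copied verbatim from either version of the code.

// Stand-in type so this sketch is self-contained.
struct RowGroupMetaData;

// Old shape: build and return the row-group filtering closure directly,
// silently falling back to "keep everything" on failure.
fn old_build_pruning_predicate(
    _row_group_metadata: &[RowGroupMetaData],
) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
    Box::new(|_r, _i| true)
}

// New shape: evaluate the predicate once per container and return one bool
// per container (or an error); turning that into a filter closure is now
// the caller's job (see build_row_group_predicate in parquet.rs below).
fn new_build_pruning_predicate(
    statistics: &[RowGroupMetaData],
) -> Result<Vec<bool>, String> {
    Ok(vec![true; statistics.len()])
}

fn main() {
    let stats = vec![RowGroupMetaData, RowGroupMetaData];
    assert_eq!(new_build_pruning_predicate(&stats), Ok(vec![true, true]));
}
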
@@ -112,49 +117,45 @@ impl PruningPredicateBuilder {
})
.and_then(|v| match v {
ColumnarValue::Array(array) => Ok(array),
ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
"predicate expression didn't return an array".to_string(),
)),
});

let predicate_array = match predicate_result {
Ok(array) => array,
// row group filter array could not be built
// return a closure which will not filter out any row groups
_ => return Box::new(|_r, _i| true),
};
})?;

let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
match predicate_array {
// return row group predicate function
Some(array) => {
// when the result of the predicate expression for a row group is null / undefined,
// e.g. due to missing statistics, this row group can't be filtered out,
// so replace with true
let predicate_values =
array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>();
Box::new(move |_, i| predicate_values[i])
}
// predicate result is not a BooleanArray
// return a closure which will not filter out any row groups
_ => Box::new(|_r, _i| true),
}
let predicate_array = predicate_array
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| {
DataFusionError::Internal(format!(
"Expected pruning predicate evaluation to be BooleanArray, \
but was {:?}",
predicate_array
))
})?;

// when the result of the predicate expression for a row group is null / undefined,
// e.g. due to missing statistics, this row group can't be filtered out,
// so replace with true
Ok(predicate_array
.into_iter()
.map(|x| x.unwrap_or(true))
.collect::<Vec<_>>())
}
}
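
A small worked example of the null-handling rule above (an illustration, not code from the PR): a null predicate result, e.g. caused by missing statistics, must not prune the container, so it is mapped to `true`.

// Null ("unknown") results are treated as "may match", so the container is kept.
use arrow::array::BooleanArray;

fn main() {
    let eval_result = BooleanArray::from(vec![Some(true), Some(false), None]);
    let keep: Vec<bool> = eval_result.iter().map(|x| x.unwrap_or(true)).collect();
    assert_eq!(keep, vec![true, false, true]);
}
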

/// Build a RecordBatch from a list of statistics (currently parquet
/// [`RowGroupMetaData`] structs), creating arrays, one for each
/// statistics column, as requested in the stat_column_req parameter.
fn build_statistics_record_batch(
row_groups: &[RowGroupMetaData],
statistics: &[RowGroupMetaData],
schema: &Schema,
stat_column_req: &[(String, StatisticsType, Field)],
) -> Result<RecordBatch> {
let mut fields = Vec::<Field>::new();
let mut arrays = Vec::<ArrayRef>::new();
for (column_name, statistics_type, stat_field) in stat_column_req {
if let Some((column_index, _)) = schema.column_with_name(column_name) {
let statistics = row_groups
let statistics = statistics
.iter()
.map(|g| g.column(column_index).statistics())
.collect::<Vec<_>>();
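
The function above (shown only partially in this hunk) builds the single statistics RecordBatch that the predicate is evaluated against. A hedged sketch of the idea follows, assuming illustrative "c1_min"/"c1_max" column names (the real naming and schema handling live in build_statistics_record_batch itself): one row per container, one array per requested statistic.

// Hypothetical statistics batch for two containers of a table with column c1.
use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> arrow::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1_min", DataType::Int64, false),
        Field::new("c1_max", DataType::Int64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(vec![1, 50])) as ArrayRef,   // min per container
            Arc::new(Int64Array::from(vec![10, 200])) as ArrayRef, // max per container
        ],
    )?;
    // A predicate such as `c1 > 100` is rewritten against these columns
    // (conceptually `c1_max > 100`) and evaluated once over the whole batch,
    // yielding one boolean per container.
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}
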
37 changes: 29 additions & 8 deletions datafusion/src/physical_plan/parquet.rs
@@ -38,7 +38,10 @@ use arrow::{
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::{
metadata::RowGroupMetaData,
reader::{FileReader, SerializedFileReader},
};

use fmt::Debug;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
@@ -454,6 +457,22 @@ fn send_result(
Ok(())
}

fn build_row_group_predicate(
predicate_builder: &PruningPredicateBuilder,
row_group_metadata: &[RowGroupMetaData],
) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
let predicate_values = predicate_builder.build_pruning_predicate(row_group_metadata);

let predicate_values = match predicate_values {
Ok(values) => values,
// stats filter array could not be built
// return a closure which will not filter out any row groups
_ => return Box::new(|_r, _i| true),
Contributor Author: The existing code also (silently) ignores errors, so we continue that tradition in this PR.

};

Box::new(move |_, i| predicate_values[i])
}

fn read_files(
filenames: &[String],
projection: &[usize],
@@ -467,8 +486,10 @@ fn read_files(
let file = File::open(&filename)?;
let mut file_reader = SerializedFileReader::new(file)?;
if let Some(predicate_builder) = predicate_builder {
let row_group_predicate = predicate_builder
.build_pruning_predicate(file_reader.metadata().row_groups());
let row_group_predicate = build_row_group_predicate(
predicate_builder,
file_reader.metadata().row_groups(),
);
file_reader.filter_row_groups(&row_group_predicate);
}
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
@@ -643,7 +664,7 @@ mod tests {
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate =
predicate_builder.build_pruning_predicate(&row_group_metadata);
build_row_group_predicate(&predicate_builder, &row_group_metadata);
let row_group_filter = row_group_metadata
.iter()
.enumerate()
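
These test hunks are truncated by the diff view right after `.enumerate()`. Given the closure shape `Fn(&RowGroupMetaData, usize) -> bool`, the elided lines presumably map each (index, metadata) pair through the predicate; the following self-contained sketch shows the assumed pattern (hypothetical, not the actual test code).

// Hypothetical, standalone version of the pattern the truncated tests follow:
// enumerate the containers and apply the predicate closure to each one.
fn main() {
    let row_group_metadata = vec!["rgm1", "rgm2"]; // stand-ins for RowGroupMetaData
    let predicate_values = vec![false, true];
    let row_group_predicate = move |_g: &&str, i: usize| predicate_values[i];
    let row_group_filter = row_group_metadata
        .iter()
        .enumerate()
        .map(|(i, g)| row_group_predicate(g, i))
        .collect::<Vec<_>>();
    assert_eq!(row_group_filter, vec![false, true]);
}
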
@@ -673,7 +694,7 @@ mod tests {
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate =
predicate_builder.build_pruning_predicate(&row_group_metadata);
build_row_group_predicate(&predicate_builder, &row_group_metadata);
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -718,7 +739,7 @@ mod tests {
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate =
predicate_builder.build_pruning_predicate(&row_group_metadata);
build_row_group_predicate(&predicate_builder, &row_group_metadata);
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -733,7 +754,7 @@ mod tests {
let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?;
let row_group_predicate =
predicate_builder.build_pruning_predicate(&row_group_metadata);
build_row_group_predicate(&predicate_builder, &row_group_metadata);
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -777,7 +798,7 @@ mod tests {
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate =
predicate_builder.build_pruning_predicate(&row_group_metadata);
build_row_group_predicate(&predicate_builder, &row_group_metadata);
let row_group_filter = row_group_metadata
.iter()
.enumerate()