Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 53 additions & 28 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,10 @@
// specific language governing permissions and limitations
// under the License.

//! This module contains code to prune "containers" of row groups
//! based on statistics prior to execution. This can lead to
//! significant performance improvements by avoiding the need
//! to evaluate a plan on entire containers (e.g. an entire file)
//! [`PruningPredicate`] to apply filter [`Expr`] to prune "containers"
//! based on statistics (e.g. Parquet Row Groups)
//!
//! For example, DataFusion uses this code to prune (skip) row groups
//! while reading parquet files if it can be determined from the
//! predicate that nothing in the row group can match.
//!
//! This code can also be used by other systems to prune other
//! entities (e.g. entire files) if the statistics are known via some
//! other source (e.g. a catalog)

//! [`Expr`]: crate::prelude::Expr
use std::collections::HashSet;
use std::convert::TryFrom;
use std::sync::Arc;
Expand All @@ -53,18 +44,21 @@ use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
use log::trace;

/// Interface to pass statistics information to [`PruningPredicate`]
/// Interface to pass statistics (min/max/nulls) information to [`PruningPredicate`].
///
/// Returns statistics for containers / files of data in Arrays.
/// Returns statistics for containers / files as Arrow [`ArrayRef`], so the
/// evaluation happens once on a single `RecordBatch`, amortizing the overhead
/// of evaluating of the predicate. This is important when pruning 1000s of
/// containers which often happens in analytic systems.
///
/// For example, for the following three files with a single column
/// For example, for the following three files with a single column `a`:
/// ```text
/// file1: column a: min=5, max=10
/// file2: column a: No stats
/// file2: column a: min=20, max=30
/// ```
///
/// PruningStatistics should return:
/// PruningStatistics would return:
///
/// ```text
/// min_values("a") -> Some([5, Null, 20])
Expand All @@ -91,10 +85,44 @@ pub trait PruningStatistics {
fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
}

/// Evaluates filter expressions on statistics in order to
/// prune data containers (e.g. parquet row group)
/// Evaluates filter expressions on statistics, rather than the actual data. If
/// no rows could possibly pass the filter entire containers can be "pruned"
/// (skipped), without reading any actual data, leading to significant
/// performance improvements.
///
/// [`PruningPredicate`]s are used to prune (avoid scanning) Parquet Row Groups
/// based on the min/max values found in the Parquet metadata. If the
/// `PruningPredicate` can guarantee that no rows in the Row Group match the
/// filter, the entire Row Group is skipped during query execution.
///
/// Note that this API is designed to be general, as it works:
///
/// 1. Arbitrary expressions expressions (including user defined functions)
///
/// 2. Anything that implements the [`PruningStatistics`] trait, not just
/// Parquet metadata, allowing it to be used by other systems to prune entities
/// (e.g. entire files) if the statistics are known via some other source, such
/// as a catalog.
///
/// # Example
///
/// Given an expression like `x = 5` and statistics for 3 containers (Row
/// Groups, files, etc) `A`, `B`, and `C`:
///
/// ```text
/// A: {x_min = 0, x_max = 4}
/// B: {x_min = 2, x_max = 10}
/// C: {x_min = 5, x_max = 8}
/// ```
///
/// Applying the `PruningPredicate` will concludes that `A` can be pruned:
///
/// See [`PruningPredicate::try_new`] for more information.
/// ```text
/// A: false (no rows could possibly match x = 5)
/// B: true (rows might match x = 5)
/// C: true (rows might match x = 5)
/// ```
/// See [`PruningPredicate::try_new`] and [`PruningPredicate::prune`] for more information.
#[derive(Debug, Clone)]
pub struct PruningPredicate {
/// The input schema against which the predicate will be evaluated
Expand Down Expand Up @@ -146,17 +174,14 @@ impl PruningPredicate {
///
/// `true`: There MAY be rows that match the predicate
///
/// `false`: There are no rows that could match the predicate
/// `false`: There are no rows that could possibly match the predicate
///
/// Note this function takes a slice of statistics as a parameter
/// to amortize the cost of the evaluation of the predicate
/// against a single record batch.
///
/// Note: the predicate passed to `prune` should be simplified as
/// Note: the predicate passed to `prune` should already be simplified as
/// much as possible (e.g. this pass doesn't handle some
/// expressions like `b = false`, but it does handle the
/// simplified version `b`. The predicates are simplified via the
/// ConstantFolding optimizer pass
/// simplified version `b`. See [`ExprSimplifier`] to simplify expressions.
///
/// [`ExprSimplifier`]: crate::optimizer::simplify_expressions::ExprSimplifier
pub fn prune<S: PruningStatistics>(&self, statistics: &S) -> Result<Vec<bool>> {
// build a RecordBatch that contains the min/max values in the
// appropriate statistics columns
Expand Down Expand Up @@ -909,7 +934,7 @@ fn build_statistics_expr(
_ => {
return plan_err!(
"expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
)
);
}
};
Ok(statistics_expr)
Expand Down