diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 0b3ddbf1de650..ed24033d69b38 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, ready}; @@ -58,7 +60,7 @@ use datafusion_common::{ use datafusion_execution::TaskContext; use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::ProjectionMapping; -use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit}; +use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit}; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; use datafusion_physical_expr::{ @@ -308,44 +310,77 @@ impl FilterExec { &self.projection } - /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. + /// Calculates `Statistics` for `FilterExec`, by applying selectivity + /// (either default, or estimated) to input statistics. + /// + /// Equality predicates (`col = literal`) set NDV to `Exact(1)`, or + /// `Exact(0)` when the predicate is contradictory (e.g. `a = 1 AND a = 2`). pub(crate) fn statistics_helper( schema: &SchemaRef, input_stats: Statistics, predicate: &Arc, default_selectivity: u8, ) -> Result { - if !check_support(predicate, schema) { + let (eq_columns, is_infeasible) = collect_equality_columns(predicate); + + let input_num_rows = input_stats.num_rows; + let input_total_byte_size = input_stats.total_byte_size; + + let (selectivity, num_rows, column_statistics) = if is_infeasible { + // Contradictory predicate: zero rows, and null/min/max are + // undefined on an empty column. + let mut cs = input_stats.to_inexact().column_statistics; + for col_stat in &mut cs { + col_stat.distinct_count = Precision::Exact(0); + col_stat.null_count = Precision::Exact(0); + col_stat.min_value = Precision::Absent; + col_stat.max_value = Precision::Absent; + col_stat.sum_value = Precision::Absent; + col_stat.byte_size = Precision::Exact(0); + } + (0.0, Precision::Exact(0), cs) + } else if !check_support(predicate, schema) { + // Interval analysis is not applicable; fall back to the default + // selectivity but still pin NDV=1 for every `col = literal` column. let selectivity = default_selectivity as f64 / 100.0; - let mut stats = input_stats.to_inexact(); - stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); - stats.total_byte_size = stats - .total_byte_size - .with_estimated_selectivity(selectivity); - return Ok(stats); - } - - let num_rows = input_stats.num_rows; - let total_byte_size = input_stats.total_byte_size; - let input_analysis_ctx = - AnalysisContext::try_from_statistics(schema, &input_stats.column_statistics)?; - - let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?; + let mut cs = input_stats.to_inexact().column_statistics; + for &idx in &eq_columns { + if idx < cs.len() && cs[idx].distinct_count != Precision::Exact(0) { + cs[idx].distinct_count = Precision::Exact(1); + } + } + ( + selectivity, + input_num_rows.with_estimated_selectivity(selectivity), + cs, + ) + } else { + // Interval-analysis path. `collect_new_statistics` already sets + // distinct_count = Exact(1) when an interval collapses to a single + // value, so no post-fix is needed here. + let input_analysis_ctx = AnalysisContext::try_from_statistics( + schema, + &input_stats.column_statistics, + )?; + let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?; + let selectivity = analysis_ctx.selectivity.unwrap_or(1.0); + let filtered_num_rows = + input_num_rows.with_estimated_selectivity(selectivity); + let cs = collect_new_statistics( + schema, + &input_stats.column_statistics, + analysis_ctx.boundaries, + match &filtered_num_rows { + Precision::Absent => None, + p => Some(*p), + }, + ); + (selectivity, filtered_num_rows, cs) + }; - // Estimate (inexact) selectivity of predicate - let selectivity = analysis_ctx.selectivity.unwrap_or(1.0); - let num_rows = num_rows.with_estimated_selectivity(selectivity); - let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity); + let total_byte_size = + input_total_byte_size.with_estimated_selectivity(selectivity); - let column_statistics = collect_new_statistics( - schema, - &input_stats.column_statistics, - analysis_ctx.boundaries, - match &num_rows { - Precision::Absent => None, - p => Some(*p), - }, - ); Ok(Statistics { num_rows, total_byte_size, @@ -762,6 +797,62 @@ impl EmbeddedProjection for FilterExec { } } +/// Collects column equality information from `col = literal` predicates in a +/// conjunction. +/// +/// Returns `(eq_columns, is_infeasible)`: +/// - `eq_columns`: set of column indices constrained to a single literal value. +/// - `is_infeasible`: `true` when the same column is equated to two different +/// non-null literals (e.g. `name = 'alice' AND name = 'bob'`), which is +/// always unsatisfiable. +/// +/// Only AND conjunctions are traversed; OR is intentionally skipped +/// since `a = 1 OR a = 2` does not pin NDV to 1. +fn collect_equality_columns(predicate: &Arc) -> (HashSet, bool) { + let mut eq_values: HashMap = HashMap::new(); + let mut infeasible = false; + + for expr in split_conjunction(predicate) { + let Some(binary) = expr.downcast_ref::() else { + continue; + }; + if *binary.op() != Operator::Eq { + continue; + } + let left = binary.left(); + let right = binary.right(); + let pair = if let Some(col) = left.downcast_ref::() + && let Some(lit) = right.downcast_ref::() + && !lit.value().is_null() + { + Some((col.index(), lit.value().clone())) + } else if let Some(col) = right.downcast_ref::() + && let Some(lit) = left.downcast_ref::() + && !lit.value().is_null() + { + Some((col.index(), lit.value().clone())) + } else { + None + }; + + if let Some((idx, value)) = pair { + match eq_values.entry(idx) { + Entry::Occupied(prev) => { + if *prev.get() != value { + infeasible = true; + break; + } + } + Entry::Vacant(slot) => { + slot.insert(value); + } + } + } + } + + (eq_values.into_keys().collect(), infeasible) +} + /// Converts an interval bound to a [`Precision`] value. NULL bounds (which /// represent "unbounded" in the interval type) map to [`Precision::Absent`]. fn interval_bound_to_precision( @@ -2264,78 +2355,291 @@ mod tests { } #[tokio::test] - async fn test_filter_statistics_equality_sets_ndv_to_one() -> Result<()> { - // a: min=1, max=100, ndv=80 - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - let input = Arc::new(StatisticsExec::new( - Statistics { - num_rows: Precision::Inexact(100), - total_byte_size: Precision::Inexact(400), - column_statistics: vec![ColumnStatistics { - min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), - max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), - distinct_count: Precision::Inexact(80), + async fn test_filter_statistics_equality_ndv() -> Result<()> { + #[expect(clippy::type_complexity)] + let cases: Vec<( + &str, + Vec, + Vec, + Arc, + Vec>, + )> = vec![ + ( + "utf8 equality", + vec![Field::new("name", DataType::Utf8, false)], + vec![ColumnStatistics { + distinct_count: Precision::Inexact(50), ..Default::default() }], - }, - schema.clone(), - )); - - // a = 42 collapses interval to a single value - let predicate = Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 0)), - Operator::Eq, - Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), - )); - let filter: Arc = - Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.partition_statistics(None)?; - assert_eq!( - statistics.column_statistics[0].distinct_count, - Precision::Exact(1) - ); - Ok(()) - } - - #[tokio::test] - async fn test_filter_statistics_or_equality_preserves_ndv() -> Result<()> { - // a: min=1, max=100, ndv=80 - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - let input = Arc::new(StatisticsExec::new( - Statistics { - num_rows: Precision::Inexact(100), - total_byte_size: Precision::Inexact(400), - column_statistics: vec![ColumnStatistics { + Arc::new(BinaryExpr::new( + Arc::new(Column::new("name", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Utf8(Some("hello".to_string())))), + )), + vec![Precision::Exact(1)], + ), + ( + "utf8view equality", + vec![Field::new("name", DataType::Utf8View, false)], + vec![ColumnStatistics { + distinct_count: Precision::Inexact(50), + ..Default::default() + }], + Arc::new(BinaryExpr::new( + Arc::new(Column::new("name", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Utf8View(Some( + "hello".to_string(), + )))), + )), + vec![Precision::Exact(1)], + ), + ( + "largeutf8 equality", + vec![Field::new("name", DataType::LargeUtf8, false)], + vec![ColumnStatistics { + distinct_count: Precision::Inexact(50), + ..Default::default() + }], + Arc::new(BinaryExpr::new( + Arc::new(Column::new("name", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::LargeUtf8(Some( + "hello".to_string(), + )))), + )), + vec![Precision::Exact(1)], + ), + ( + "utf8 reversed (literal = column)", + vec![Field::new("name", DataType::Utf8, false)], + vec![ColumnStatistics { + distinct_count: Precision::Inexact(50), + ..Default::default() + }], + Arc::new(BinaryExpr::new( + Arc::new(Literal::new(ScalarValue::Utf8(Some("hello".to_string())))), + Operator::Eq, + Arc::new(Column::new("name", 0)), + )), + vec![Precision::Exact(1)], + ), + ( + "OR preserves original NDV", + vec![Field::new("name", DataType::Utf8, false)], + vec![ColumnStatistics { + distinct_count: Precision::Inexact(50), + ..Default::default() + }], + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("name", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Utf8(Some("a".to_string())))), + )), + Operator::Or, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("name", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Utf8(Some("b".to_string())))), + )), + )), + vec![Precision::Inexact(50)], + ), + ( + "AND with mixed types (Utf8 + Int32)", + vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int32, false), + ], + vec![ + ColumnStatistics { + distinct_count: Precision::Inexact(50), + ..Default::default() + }, + ColumnStatistics { + distinct_count: Precision::Inexact(80), + ..Default::default() + }, + ], + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("name", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Utf8(Some( + "hello".to_string(), + )))), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("age", 1)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )), + )), + vec![Precision::Exact(1), Precision::Exact(1)], + ), + ( + "numeric equality with min/max bounds (interval analysis path)", + vec![Field::new("a", DataType::Int32, false)], + vec![ColumnStatistics { min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), distinct_count: Precision::Inexact(80), ..Default::default() }], - }, - schema.clone(), - )); + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )), + vec![Precision::Exact(1)], + ), + ( + "timestamp equality", + vec![Field::new( + "ts", + DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None), + false, + )], + vec![ColumnStatistics { + distinct_count: Precision::Inexact(500), + ..Default::default() + }], + Arc::new(BinaryExpr::new( + Arc::new(Column::new("ts", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::TimestampNanosecond( + Some(1_609_459_200_000_000_000), + None, + ))), + )), + vec![Precision::Exact(1)], + ), + ( + "contradictory numeric equality (infeasible)", + vec![Field::new("a", DataType::Int32, false)], + vec![ColumnStatistics { + distinct_count: Precision::Inexact(50), + ..Default::default() + }], + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(99)))), + )), + )), + vec![Precision::Exact(0)], + ), + ( + "utf8 equality with absent input NDV", + vec![Field::new("name", DataType::Utf8, false)], + vec![ColumnStatistics { + distinct_count: Precision::Absent, + ..Default::default() + }], + Arc::new(BinaryExpr::new( + Arc::new(Column::new("name", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Utf8(Some("hello".to_string())))), + )), + vec![Precision::Exact(1)], + ), + ( + "contradictory utf8 equality (infeasible)", + vec![Field::new("name", DataType::Utf8, false)], + vec![ColumnStatistics { + distinct_count: Precision::Inexact(100), + ..Default::default() + }], + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("name", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Utf8(Some( + "alice".to_string(), + )))), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("name", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Utf8(Some( + "bob".to_string(), + )))), + )), + )), + vec![Precision::Exact(0)], + ), + ( + "redundant same-value equality combined with another column", + vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ], + vec![ + ColumnStatistics { + distinct_count: Precision::Inexact(80), + ..Default::default() + }, + ColumnStatistics { + distinct_count: Precision::Inexact(40), + ..Default::default() + }, + ], + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + )), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 1)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(2)))), + )), + )), + vec![Precision::Exact(1), Precision::Exact(1)], + ), + ]; - // a = 42 OR a = 22: interval stays [1, 100], not a single value - let predicate = Arc::new(BinaryExpr::new( - Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 0)), - Operator::Eq, - Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), - )), - Operator::Or, - Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 0)), - Operator::Eq, - Arc::new(Literal::new(ScalarValue::Int32(Some(22)))), - )), - )); - let filter: Arc = - Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.partition_statistics(None)?; - assert_eq!( - statistics.column_statistics[0].distinct_count, - Precision::Inexact(80) - ); + for (desc, fields, col_stats, predicate, expected_ndvs) in cases { + let schema = Schema::new(fields); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(1000), + column_statistics: col_stats, + }, + schema.clone(), + )); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + + for (i, expected) in expected_ndvs.iter().enumerate() { + assert_eq!( + statistics.column_statistics[i].distinct_count, *expected, + "case '{desc}': column {i} NDV mismatch" + ); + } + } Ok(()) } @@ -2588,6 +2892,211 @@ mod tests { Ok(()) } + #[test] + fn test_collect_equality_columns() { + use std::collections::HashSet; + // (description, predicate, expected_column_indices, expected_infeasible) + #[expect(clippy::type_complexity)] + let cases: Vec<(&str, Arc, Vec, bool)> = vec![ + ( + "simple col = literal", + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )), + vec![0], + false, + ), + ( + "reversed literal = col", + Arc::new(BinaryExpr::new( + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + Operator::Eq, + Arc::new(Column::new("a", 0)), + )), + vec![0], + false, + ), + ( + "AND with two equalities", + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 1)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Utf8(Some( + "hello".to_string(), + )))), + )), + )), + vec![0, 1], + false, + ), + ( + "OR produces empty set", + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )), + Operator::Or, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(99)))), + )), + )), + vec![], + false, + ), + ( + "greater-than produces empty set", + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )), + vec![], + false, + ), + ( + "col = col produces empty set", + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Column::new("b", 1)), + )), + vec![], + false, + ), + ( + "nested AND with three equalities", + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 1)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(2)))), + )), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 2)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(3)))), + )), + )), + vec![0, 1, 2], + false, + ), + ( + "AND with mixed equality and non-equality", + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 1)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), + )), + )), + vec![0], + false, + ), + ( + "col = NULL is excluded", + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(None))), + )), + vec![], + false, + ), + ( + "NULL = col is excluded", + Arc::new(BinaryExpr::new( + Arc::new(Literal::new(ScalarValue::Utf8(None))), + Operator::Eq, + Arc::new(Column::new("a", 0)), + )), + vec![], + false, + ), + ( + "contradictory: same col, different literals", + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Utf8(Some( + "alice".to_string(), + )))), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Utf8(Some( + "bob".to_string(), + )))), + )), + )), + vec![0], + true, + ), + ( + "same col, same literal is not contradictory", + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )), + )), + vec![0], + false, + ), + ]; + + for (desc, expr, expected_cols, expected_infeasible) in cases { + let (result, infeasible) = collect_equality_columns(&expr); + let expected: HashSet = expected_cols.into_iter().collect(); + if expected_infeasible { + // When infeasible, the scan is short-circuited, so we only + // assert the infeasibility flag — the partial column set + // contents are an implementation detail. + assert!(infeasible, "case '{desc}': expected infeasible"); + } else { + assert_eq!(result, expected, "case '{desc}': columns mismatch"); + assert!(!infeasible, "case '{desc}': expected feasible"); + } + } + } + /// Regression test: ProjectionExec on top of a FilterExec that already has /// an explicit projection must not panic when `try_swapping_with_projection` /// attempts to swap the two nodes.