diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs
index 561c6b3b246ff..8597f28c07b45 100644
--- a/datafusion/core/tests/custom_sources_cases/statistics.rs
+++ b/datafusion/core/tests/custom_sources_cases/statistics.rs
@@ -272,7 +272,7 @@ async fn sql_filter() -> Result<()> {
     let physical_plan = df.create_physical_plan().await.unwrap();
 
     let stats = physical_plan.partition_statistics(None)?;
-    assert_eq!(stats.num_rows, Precision::Inexact(1));
+    assert_eq!(stats.num_rows, Precision::Inexact(7));
 
     Ok(())
 }
diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs
index 11a60afc90a10..ad54d382c95ea 100644
--- a/datafusion/physical-expr/src/analysis.rs
+++ b/datafusion/physical-expr/src/analysis.rs
@@ -167,6 +167,7 @@ pub fn analyze(
     schema: &Schema,
 ) -> Result<AnalysisContext> {
     let initial_boundaries = &context.boundaries;
+
     if initial_boundaries
         .iter()
         .all(|bound| bound.interval.is_none())
@@ -259,6 +260,44 @@ fn shrink_boundaries(
     Ok(AnalysisContext::new(target_boundaries).with_selectivity(selectivity))
 }
 
+/// Returns `Some(1.0 / distinct_count)` when the filter demonstrably collapsed
+/// a non-singleton interval down to a single point, i.e. an equality predicate
+/// was applied. Returns `None` in all other cases, signalling that the caller
+/// should fall back to [`cardinality_ratio`].
+///
+/// The `initial_interval` guard prevents double-counting selectivity when the
+/// column statistics already described a singleton before any filter was
+/// applied: if the initial interval was already the same single point, no
+/// additional selectivity has been gained and the `1 / NDV` shortcut must not
+/// fire.
+fn singleton_selectivity(
+    initial_interval: &Interval,
+    target_interval: &Interval,
+    distinct_count: usize,
+) -> Option<f64> {
+    // The target must have collapsed to a single non-null value.
+    if distinct_count == 0
+        || target_interval.lower().is_null()
+        || target_interval.lower() != target_interval.upper()
+    {
+        return None;
+    }
+
+    // Only treat this as a newly-applied equality filter when the initial
+    // interval was not already that same singleton. If it was, the stats
+    // already encoded this restriction and applying 1/NDV again would
+    // under-estimate the row count.
+    let initial_is_same_singleton = !initial_interval.lower().is_null()
+        && initial_interval.lower() == initial_interval.upper()
+        && initial_interval.lower() == target_interval.lower();
+
+    if initial_is_same_singleton {
+        return None;
+    }
+
+    Some(1.0 / distinct_count as f64)
+}
+
 /// This function calculates the filter predicate's selectivity by comparing
 /// the initial and pruned column boundaries. Selectivity is defined as the
 /// ratio of rows in a table that satisfy the filter's predicate.
@@ -277,8 +316,19 @@ fn calculate_selectivity(
     let mut acc: f64 = 1.0;
     for (initial, target) in initial_boundaries.iter().zip(target_boundaries) {
         match (initial.interval.as_ref(), target.interval.as_ref()) {
-            (Some(initial), Some(target)) => {
-                acc *= cardinality_ratio(initial, target);
+            (Some(initial_interval), Some(target_interval)) => {
+                if let Precision::Exact(distinct_count)
+                | Precision::Inexact(distinct_count) = target.distinct_count
+                    && let Some(s) = singleton_selectivity(
+                        initial_interval,
+                        target_interval,
+                        distinct_count,
+                    )
+                {
+                    acc *= s;
+                    continue;
+                }
+                acc *= cardinality_ratio(initial_interval, target_interval);
             }
             (None, Some(_)) => {
                 return internal_err!(
@@ -297,14 +347,14 @@ mod tests {
     use std::sync::Arc;
 
     use arrow::datatypes::{DataType, Field, Schema};
-    use datafusion_common::{DFSchema, assert_contains};
+    use datafusion_common::{DFSchema, ScalarValue, assert_contains, stats::Precision};
     use datafusion_expr::{
        Expr, col, execution_props::ExecutionProps, interval_arithmetic::Interval, lit,
     };
 
-    use crate::{AnalysisContext, create_physical_expr};
+    use crate::{AnalysisContext, create_physical_expr, expressions::Column};
 
-    use super::{ExprBoundaries, analyze};
+    use super::{ExprBoundaries, analyze, calculate_selectivity, singleton_selectivity};
 
     fn make_field(name: &str, data_type: DataType) -> Field {
         let nullable = false;
@@ -435,4 +485,92 @@
             .unwrap_err();
         assert_contains!(analysis_error.to_string(), expected_error);
     }
+
+    // ---------------------------------------------------------------------------
+    // Unit tests for singleton_selectivity and calculate_selectivity
+    // ---------------------------------------------------------------------------
+
+    fn make_boundary(lower: i32, upper: i32, distinct_count: usize) -> ExprBoundaries {
+        ExprBoundaries {
+            column: Column::new("a", 0),
+            interval: Some(
+                Interval::try_new(
+                    ScalarValue::Int32(Some(lower)),
+                    ScalarValue::Int32(Some(upper)),
+                )
+                .unwrap(),
+            ),
+            distinct_count: Precision::Exact(distinct_count),
+        }
+    }
+
+    /// When the initial interval is already the same singleton as the target,
+    /// `singleton_selectivity` must return `None` so we do not double-apply
+    /// 1/NDV selectivity.
+    #[test]
+    fn test_singleton_selectivity_skipped_when_initial_is_same_singleton() {
+        let singleton =
+            Interval::try_new(ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(5)))
+                .unwrap();
+        // Both initial and target are [5, 5] — no new equality filter was applied.
+        assert_eq!(
+            singleton_selectivity(&singleton, &singleton, 10),
+            None,
+            "shortcut must not fire when initial interval was already the same singleton"
+        );
+    }
+
+    /// When the initial interval is a broader range and the target collapses to
+    /// a singleton, `singleton_selectivity` must return `Some(1/NDV)`.
+    #[test]
+    fn test_singleton_selectivity_applied_when_range_collapses() {
+        let initial =
+            Interval::try_new(ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(100)))
+                .unwrap();
+        let target =
+            Interval::try_new(ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(5)))
+                .unwrap();
+        let result = singleton_selectivity(&initial, &target, 10);
+        assert_eq!(
+            result,
+            Some(0.1),
+            "shortcut must return 1/NDV when a range collapses to a singleton"
+        );
+    }
+
+    /// Regression test: `calculate_selectivity` must not apply the `1/NDV`
+    /// shortcut when the column statistics already describe a singleton interval
+    /// (i.e. before the filter, the column only ever held one value). In that
+    /// case the target and initial intervals are the same singleton, so the
+    /// cardinality ratio is 1.0 and the overall selectivity should remain 1.0.
+    #[test]
+    fn test_calculate_selectivity_already_singleton_initial_interval() {
+        let already_singleton = make_boundary(7, 7, 1);
+
+        let selectivity = calculate_selectivity(
+            std::slice::from_ref(&already_singleton),
+            std::slice::from_ref(&already_singleton),
+        )
+        .unwrap();
+
+        let wide_initial = make_boundary(1, 100, 50);
+        let same_singleton_target = make_boundary(7, 7, 50);
+        let selectivity_new =
+            calculate_selectivity(&[wide_initial], &[same_singleton_target]).unwrap();
+        assert!(
+            (selectivity_new - 0.02).abs() < 1e-10,
+            "expected selectivity 1/NDV = 0.02, got {selectivity_new}"
+        );
+
+        let singleton_initial = make_boundary(7, 7, 50);
+        let singleton_target = make_boundary(7, 7, 50);
+        let selectivity_no_new_filter =
+            calculate_selectivity(&[singleton_initial], &[singleton_target]).unwrap();
+        assert!(
+            (selectivity_no_new_filter - 1.0).abs() < 1e-10,
+            "expected selectivity 1.0 when initial was already the same singleton, got {selectivity_no_new_filter}"
+        );
+
+        assert!((selectivity - 1.0).abs() < 1e-10, "expected selectivity 1.0 for an unchanged singleton, got {selectivity}");
+    }
 }