From 9d926575ce5016b3f12a3e93a07caaa8be66be19 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Sat, 7 Mar 2026 16:45:59 -0600 Subject: [PATCH 1/6] feat: Use NDV for equality filter selectivity calculation --- datafusion/physical-expr/src/analysis.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index 11a60afc90a10..89bdbefa60790 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -167,6 +167,7 @@ pub fn analyze( schema: &Schema, ) -> Result { let initial_boundaries = &context.boundaries; + if initial_boundaries .iter() .all(|bound| bound.interval.is_none()) @@ -277,8 +278,21 @@ fn calculate_selectivity( let mut acc: f64 = 1.0; for (initial, target) in initial_boundaries.iter().zip(target_boundaries) { match (initial.interval.as_ref(), target.interval.as_ref()) { - (Some(initial), Some(target)) => { - acc *= cardinality_ratio(initial, target); + (Some(initial_interval), Some(target_interval)) => { + // If it is equality predicate, calculate selectivity as `1 / distinct_count` + if !target_interval.lower().is_null() + && target_interval.lower() == target_interval.upper() + { + if let Precision::Exact(distinct_count) + | Precision::Inexact(distinct_count) = target.distinct_count + { + if distinct_count > 0 { + acc *= 1.0 / distinct_count as f64; + continue; + } + } + } + acc *= cardinality_ratio(initial_interval, target_interval); } (None, Some(_)) => { return internal_err!( From 8faf016b550d1f1ebe2c7cef691b1925083465e0 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Sat, 7 Mar 2026 17:01:59 -0600 Subject: [PATCH 2/6] fmt clippy --- datafusion/physical-expr/src/analysis.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index 89bdbefa60790..8859b349963f6 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -280,17 +280,14 @@ fn calculate_selectivity( match (initial.interval.as_ref(), target.interval.as_ref()) { (Some(initial_interval), Some(target_interval)) => { // If it is equality predicate, calculate selectivity as `1 / distinct_count` - if !target_interval.lower().is_null() + if let Precision::Exact(distinct_count) + | Precision::Inexact(distinct_count) = target.distinct_count + && distinct_count > 0 + && !target_interval.lower().is_null() && target_interval.lower() == target_interval.upper() { - if let Precision::Exact(distinct_count) - | Precision::Inexact(distinct_count) = target.distinct_count - { - if distinct_count > 0 { - acc *= 1.0 / distinct_count as f64; - continue; - } - } + acc *= 1.0 / distinct_count as f64; + continue; } acc *= cardinality_ratio(initial_interval, target_interval); } From 0c571c78abef034dcda94e95181ebc9dd12a3db8 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Sat, 7 Mar 2026 17:14:46 -0600 Subject: [PATCH 3/6] update --- datafusion/core/tests/custom_sources_cases/statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index eb81c9f24dfd6..34fd94bf7c74e 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -272,7 +272,7 @@ async fn sql_filter() -> Result<()> { let physical_plan = df.create_physical_plan().await.unwrap(); let stats = physical_plan.partition_statistics(None)?; - assert_eq!(stats.num_rows, Precision::Inexact(1)); + assert_eq!(stats.num_rows, Precision::Inexact(7)); Ok(()) } From 090f62c61ff372804bef7df78327242748a85ec3 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Mon, 23 Mar 2026 21:58:07 -0500 Subject: [PATCH 4/6] apply fixes --- datafusion/physical-expr/src/analysis.rs | 139 +++++++++++++++++++++-- 1 file changed, 130 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index 8859b349963f6..26bb203b0827c 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -260,6 +260,44 @@ fn shrink_boundaries( Ok(AnalysisContext::new(target_boundaries).with_selectivity(selectivity)) } +/// Returns `Some(1.0 / distinct_count)` when the filter demonstrably collapsed +/// a non-singleton interval down to a single point, i.e. an equality predicate +/// was applied. Returns `None` in all other cases, signalling that the caller +/// should fall back to [`cardinality_ratio`]. +/// +/// The `initial_interval` guard prevents double-counting selectivity when the +/// column statistics already described a singleton before any filter was +/// applied: if the initial interval was already the same single point, no +/// additional selectivity has been gained and the `1 / NDV` shortcut must not +/// fire. +fn singleton_selectivity( + initial_interval: &Interval, + target_interval: &Interval, + distinct_count: usize, +) -> Option { + // The target must have collapsed to a single non-null value. + if distinct_count == 0 + || target_interval.lower().is_null() + || target_interval.lower() != target_interval.upper() + { + return None; + } + + // Only treat this as a newly-applied equality filter when the initial + // interval was not already that same singleton. If it was, the stats + // already encoded this restriction and applying 1/NDV again would + // under-estimate the row count. + let initial_is_same_singleton = !initial_interval.lower().is_null() + && initial_interval.lower() == initial_interval.upper() + && initial_interval.lower() == target_interval.lower(); + + if initial_is_same_singleton { + return None; + } + + Some(1.0 / distinct_count as f64) +} + /// This function calculates the filter predicate's selectivity by comparing /// the initial and pruned column boundaries. Selectivity is defined as the /// ratio of rows in a table that satisfy the filter's predicate. @@ -279,15 +317,17 @@ fn calculate_selectivity( for (initial, target) in initial_boundaries.iter().zip(target_boundaries) { match (initial.interval.as_ref(), target.interval.as_ref()) { (Some(initial_interval), Some(target_interval)) => { - // If it is equality predicate, calculate selectivity as `1 / distinct_count` if let Precision::Exact(distinct_count) | Precision::Inexact(distinct_count) = target.distinct_count - && distinct_count > 0 - && !target_interval.lower().is_null() - && target_interval.lower() == target_interval.upper() { - acc *= 1.0 / distinct_count as f64; - continue; + if let Some(s) = singleton_selectivity( + initial_interval, + target_interval, + distinct_count, + ) { + acc *= s; + continue; + } } acc *= cardinality_ratio(initial_interval, target_interval); } @@ -308,14 +348,14 @@ mod tests { use std::sync::Arc; use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_common::{DFSchema, assert_contains}; + use datafusion_common::{DFSchema, ScalarValue, assert_contains, stats::Precision}; use datafusion_expr::{ Expr, col, execution_props::ExecutionProps, interval_arithmetic::Interval, lit, }; - use crate::{AnalysisContext, create_physical_expr}; + use crate::{AnalysisContext, create_physical_expr, expressions::Column}; - use super::{ExprBoundaries, analyze}; + use super::{ExprBoundaries, analyze, calculate_selectivity, singleton_selectivity}; fn make_field(name: &str, data_type: DataType) -> Field { let nullable = false; @@ -446,4 +486,85 @@ mod tests { .unwrap_err(); assert_contains!(analysis_error.to_string(), expected_error); } + fn make_boundary(lower: i32, upper: i32, distinct_count: usize) -> ExprBoundaries { + ExprBoundaries { + column: Column::new("a", 0), + interval: Some( + Interval::try_new( + ScalarValue::Int32(Some(lower)), + ScalarValue::Int32(Some(upper)), + ) + .unwrap(), + ), + distinct_count: Precision::Exact(distinct_count), + } + } + + /// When the initial interval is already the same singleton as the target, + /// `singleton_selectivity` must return `None` so we do not double-apply + /// 1/NDV selectivity. + #[test] + fn test_singleton_selectivity_skipped_when_initial_is_same_singleton() { + let singleton = + Interval::try_new(ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(5))) + .unwrap(); + // Both initial and target are [5, 5] — no new equality filter was applied. + assert_eq!( + singleton_selectivity(&singleton, &singleton, 10), + None, + "shortcut must not fire when initial interval was already the same singleton" + ); + } + + /// When the initial interval is a broader range and the target collapses to + /// a singleton, `singleton_selectivity` must return `Some(1/NDV)`. + #[test] + fn test_singleton_selectivity_applied_when_range_collapses() { + let initial = + Interval::try_new(ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(100))) + .unwrap(); + let target = + Interval::try_new(ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(5))) + .unwrap(); + let result = singleton_selectivity(&initial, &target, 10); + assert_eq!( + result, + Some(0.1), + "shortcut must return 1/NDV when a range collapses to a singleton" + ); + } + + /// Regression test: `calculate_selectivity` must not apply the `1/NDV` + /// shortcut when the column statistics already describe a singleton interval + /// (i.e. before the filter, the column only ever held one value). In that + /// case the target and initial intervals are the same singleton, so the + /// cardinality ratio is 1.0 and the overall selectivity should remain 1.0. + #[test] + fn test_calculate_selectivity_already_singleton_initial_interval() { + let already_singleton = make_boundary(7, 7, 1); + + let selectivity = + calculate_selectivity(&[already_singleton.clone()], &[already_singleton]) + .unwrap(); + + let wide_initial = make_boundary(1, 100, 50); + let same_singleton_target = make_boundary(7, 7, 50); + let selectivity_new = + calculate_selectivity(&[same_singleton_target], &[wide_initial]).unwrap(); + assert!( + (selectivity_new - 0.02).abs() < 1e-10, + "expected selectivity 1/NDV = 0.02, got {selectivity_new}" + ); + + let singleton_initial = make_boundary(7, 7, 50); + let singleton_target = make_boundary(7, 7, 50); + let selectivity_no_new_filter = + calculate_selectivity(&[singleton_target], &[singleton_initial]).unwrap(); + assert!( + (selectivity_no_new_filter - 1.0).abs() < 1e-10, + "expected selectivity 1.0 when initial was already the same singleton, got {selectivity_no_new_filter}" + ); + + let _ = selectivity; // silence unused warning + } } From cc00095e3c10e819be7e36c5599757cc3ad0831d Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 24 Mar 2026 10:46:10 -0500 Subject: [PATCH 5/6] clippy --- datafusion/physical-expr/src/analysis.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index 26bb203b0827c..b057cae1dc482 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -319,15 +319,14 @@ fn calculate_selectivity( (Some(initial_interval), Some(target_interval)) => { if let Precision::Exact(distinct_count) | Precision::Inexact(distinct_count) = target.distinct_count - { - if let Some(s) = singleton_selectivity( + && let Some(s) = singleton_selectivity( initial_interval, target_interval, distinct_count, - ) { - acc *= s; - continue; - } + ) + { + acc *= s; + continue; } acc *= cardinality_ratio(initial_interval, target_interval); } From 481394b472f105319b5d2c767ec85998a6fc0c37 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 24 Mar 2026 20:00:48 -0500 Subject: [PATCH 6/6] clippy --- datafusion/physical-expr/src/analysis.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index b057cae1dc482..ad54d382c95ea 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -485,6 +485,11 @@ mod tests { .unwrap_err(); assert_contains!(analysis_error.to_string(), expected_error); } + + // --------------------------------------------------------------------------- + // Unit tests for singleton_selectivity and calculate_selectivity + // --------------------------------------------------------------------------- + fn make_boundary(lower: i32, upper: i32, distinct_count: usize) -> ExprBoundaries { ExprBoundaries { column: Column::new("a", 0), @@ -542,9 +547,11 @@ mod tests { fn test_calculate_selectivity_already_singleton_initial_interval() { let already_singleton = make_boundary(7, 7, 1); - let selectivity = - calculate_selectivity(&[already_singleton.clone()], &[already_singleton]) - .unwrap(); + let selectivity = calculate_selectivity( + std::slice::from_ref(&already_singleton), + std::slice::from_ref(&already_singleton), + ) + .unwrap(); let wide_initial = make_boundary(1, 100, 50); let same_singleton_target = make_boundary(7, 7, 50);