From 510960b97e4739d43e5155d33051e09773494b3a Mon Sep 17 00:00:00 2001 From: yangzhong Date: Mon, 16 Jan 2023 18:05:30 +0800 Subject: [PATCH 1/2] Refine the statistics estimation for the limit and aggregate operator --- .../core/src/physical_plan/aggregates/mod.rs | 7 +- datafusion/core/src/physical_plan/limit.rs | 81 +++++++++++++++++-- 2 files changed, 79 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 8044f4c15781f..7556eadfbb631 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -515,7 +515,12 @@ impl ExecutionPlan for AggregateExec { ..Default::default() } } - _ => Statistics::default(), + _ => Statistics { + // the output row count is surely not larger than its input row count + num_rows: self.input.statistics().num_rows, + is_exact: false, + ..Default::default() + }, } } } diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index eec66eb70a41f..c7299fea3ee07 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -189,6 +189,17 @@ impl ExecutionPlan for GlobalLimitExec { fn statistics(&self) -> Statistics { let input_stats = self.input.statistics(); let skip = self.skip; + // the maximum row number needs to be fetched + let max_row_num = self + .fetch + .map(|fetch| { + if fetch >= usize::MAX - skip { + usize::MAX + } else { + fetch + skip + } + }) + .unwrap_or(usize::MAX); match input_stats { Statistics { num_rows: Some(nr), .. 
@@ -200,22 +211,25 @@ impl ExecutionPlan for GlobalLimitExec { is_exact: input_stats.is_exact, ..Default::default() } - } else if nr - skip <= self.fetch.unwrap_or(usize::MAX) { + } else if nr <= max_row_num { // if the input does not reach the "fetch" globally, return input stats input_stats - } else if nr - skip > self.fetch.unwrap_or(usize::MAX) { + } else { // if the input is greater than the "fetch", the num_row will be the "fetch", // but we won't be able to predict the other statistics Statistics { - num_rows: self.fetch, + num_rows: Some(max_row_num), is_exact: input_stats.is_exact, ..Default::default() } - } else { - Statistics::default() } } - _ => Statistics::default(), + _ => Statistics { + // the result output row number will always be no greater than the limit number + num_rows: Some(max_row_num), + is_exact: false, + ..Default::default() + }, } } } @@ -353,8 +367,12 @@ impl ExecutionPlan for LocalLimitExec { is_exact: input_stats.is_exact, ..Default::default() }, - // if we don't know the input size, we can't predict the limit's behaviour - _ => Statistics::default(), + _ => Statistics { + // the result output row number will always be no greater than the limit number + num_rows: Some(self.fetch * self.output_partitioning().partition_count()), + is_exact: false, + ..Default::default() + }, } } } @@ -623,4 +641,51 @@ mod tests { assert_eq!(row_count, 0); Ok(()) } + + #[tokio::test] + async fn test_row_number_statistics_for_global_limit() -> Result<()> { + let row_count = row_number_statistics_for_global_limit(0, Some(10)).await?; + assert_eq!(row_count, Some(10)); + + let row_count = row_number_statistics_for_global_limit(5, Some(10)).await?; + assert_eq!(row_count, Some(15)); + + Ok(()) + } + + #[tokio::test] + async fn test_row_number_statistics_for_local_limit() -> Result<()> { + let row_count = row_number_statistics_for_local_limit(4, 10).await?; + assert_eq!(row_count, Some(40)); + + Ok(()) + } + + async fn row_number_statistics_for_global_limit( 
+ skip: usize, + fetch: Option<usize>, + ) -> Result<Option<usize>> { + let num_partitions = 4; + let csv = test::scan_partitioned_csv(num_partitions)?; + + assert_eq!(csv.output_partitioning().partition_count(), num_partitions); + + let offset = + GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch); + + Ok(offset.statistics().num_rows) + } + + async fn row_number_statistics_for_local_limit( + num_partitions: usize, + fetch: usize, + ) -> Result<Option<usize>> { + let csv = test::scan_partitioned_csv(num_partitions)?; + + assert_eq!(csv.output_partitioning().partition_count(), num_partitions); + + let offset = LocalLimitExec::new(csv, fetch); + + Ok(offset.statistics().num_rows) + } } From e9b819f7ea9b52fdb50c0a3f96fd841d7bd3e5a2 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Mon, 16 Jan 2023 18:07:47 +0800 Subject: [PATCH 2/2] Fix cargo clippy --- datafusion/optimizer/src/push_down_filter.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index e53c35b85a3ca..92a11bd7e6b64 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -550,9 +550,9 @@ impl OptimizerRule for PushDownFilter { ) .map(|e| (*e).clone()) .collect::<Vec<_>>(); - let new_predicate = conjunction(new_predicates).ok_or( - DataFusionError::Plan("at least one expression exists".to_string()), - )?; + let new_predicate = conjunction(new_predicates).ok_or_else(|| { + DataFusionError::Plan("at least one expression exists".to_string()) + })?; let new_plan = LogicalPlan::Filter(Filter::try_new( new_predicate, child_filter.input.clone(),