From f75c7dc2c61a85baede4698c5fc456803d46a925 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Mon, 31 Mar 2025 19:12:25 +0800 Subject: [PATCH 01/21] save --- datafusion/datasource/src/source.rs | 17 +++++++++++++++++ datafusion/physical-plan/src/execution_plan.rs | 10 ++++++++++ datafusion/physical-plan/src/repartition/mod.rs | 4 ++++ datafusion/physical-plan/src/sorts/sort.rs | 4 ++++ datafusion/physical-plan/src/union.rs | 11 +++++++++++ 5 files changed, 46 insertions(+) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 2d6ea1a8b3915..f0ed08708ba31 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -188,6 +188,23 @@ impl ExecutionPlan for DataSourceExec { self.data_source.statistics() } + fn statistics_by_partition(&self) -> datafusion_common::Result> { + let mut statistics = vec![ + Statistics::new_unknown(&self.schema()); + self.properties().partitioning.partition_count() + ]; + if let Some(file_config) = + self.data_source.as_any().downcast_ref::() + { + for (idx, file_group) in file_config.file_groups.iter().enumerate() { + if let Some(stat) = file_group.statistics() { + statistics[idx] = stat.clone(); + } + } + } + Ok(statistics) + } + fn with_fetch(&self, limit: Option) -> Option> { let data_source = self.data_source.with_fetch(limit)?; let cache = self.cache.clone(); diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 2b6eac7be0675..ab69ac9af32a2 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -430,6 +430,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { Ok(Statistics::new_unknown(&self.schema())) } + /// Returns statistics for each partition of this `ExecutionPlan` node. + /// If statistics are not available, returns an array of + /// [`Statistics::new_unknown`] for each partition. + fn statistics_by_partition(&self) -> Result> { + Ok(vec![ + Statistics::new_unknown(&self.schema()); + self.properties().partitioning.partition_count() + ]) + } + /// Returns `true` if a limit can be safely pushed down through this /// `ExecutionPlan` node. 
/// diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index c480fc2abaa1a..2bbaddd8fca07 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -691,6 +691,10 @@ impl ExecutionPlan for RepartitionExec { self.input.statistics() } + fn statistics_by_partition(&self) -> Result> { + todo!() + } + fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 9d0f34cc7f0fd..3bcd7050da486 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1300,6 +1300,10 @@ impl ExecutionPlan for SortExec { Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1) } + fn statistics_by_partition(&self) -> Result> { + todo!() + } + fn with_fetch(&self, limit: Option) -> Option> { Some(Arc::new(SortExec::with_fetch(self, limit))) } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 2b666093f29e0..7225b1eee4dc6 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -270,6 +270,17 @@ impl ExecutionPlan for UnionExec { .unwrap_or_else(|| Statistics::new_unknown(&self.schema()))) } + fn statistics_by_partition(&self) -> Result> { + Ok(self + .inputs + .iter() + .map(|child| child.statistics_by_partition()) + .collect::>>()? + .into_iter() + .flatten() + .collect()) + } + fn benefits_from_input_partitioning(&self) -> Vec { vec![false; self.children().len()] } From 2b4a14feb773e1efbc126461a22b8cdbdebd25b3 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Tue, 1 Apr 2025 19:14:49 +0800 Subject: [PATCH 02/21] save --- .../datasource/physical_plan/arrow_file.rs | 5 +++ datafusion/datasource-avro/src/source.rs | 4 ++ datafusion/datasource-csv/src/source.rs | 4 ++ .../physical-plan/src/coalesce_batches.rs | 4 ++ .../physical-plan/src/coalesce_partitions.rs | 4 ++ datafusion/physical-plan/src/filter.rs | 32 ++++++++++++--- .../physical-plan/src/joins/cross_join.rs | 25 ++++++++++++ .../physical-plan/src/joins/hash_join.rs | 4 +- .../src/joins/nested_loop_join.rs | 4 +- .../src/joins/sort_merge_join.rs | 4 +- datafusion/physical-plan/src/joins/utils.rs | 7 +--- datafusion/physical-plan/src/limit.rs | 4 ++ datafusion/physical-plan/src/sorts/sort.rs | 13 +++++- .../src/windows/bounded_window_agg_exec.rs | 40 +++++++++++++------ 14 files changed, 123 insertions(+), 31 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index f0a1f94d87e1f..ba23cf4095398 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -190,6 +190,11 @@ impl ExecutionPlan for ArrowExec { fn statistics(&self) -> Result { self.inner.statistics() } + + fn statistics_by_partition(&self) -> Result> { + self.inner.statistics_by_partition() + } + fn fetch(&self) -> Option { self.inner.fetch() } diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index ce3722e7b11ee..5c662511929c7 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -141,6 +141,10 @@ impl ExecutionPlan for AvroExec { self.inner.statistics() } + fn statistics_by_partition(&self) -> Result> { + 
self.inner.statistics_by_partition() + } + fn metrics(&self) -> Option { self.inner.metrics() } diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index f5d45cd3fc881..22fbb288c0396 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -381,6 +381,10 @@ impl ExecutionPlan for CsvExec { self.inner.statistics() } + fn statistics_by_partition(&self) -> Result> { + self.inner.statistics_by_partition() + } + fn metrics(&self) -> Option { self.inner.metrics() } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index faab5fdc5eb6c..314e5548b36a9 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -199,6 +199,10 @@ impl ExecutionPlan for CoalesceBatchesExec { Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1) } + fn statistics_by_partition(&self) -> Result> { + Ok(vec![self.statistics()?]) + } + fn with_fetch(&self, limit: Option) -> Option> { Some(Arc::new(CoalesceBatchesExec { input: Arc::clone(&self.input), diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 715dd159e7e8c..1de838e830c23 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -199,6 +199,10 @@ impl ExecutionPlan for CoalescePartitionsExec { Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1) } + fn statistics_by_partition(&self) -> Result> { + Ok(vec![self.statistics()?]) + } + fn supports_limit_pushdown(&self) -> bool { true } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 95fa67025e90d..f179caddc0785 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -174,12 +174,11 @@ impl FilterExec { /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. fn statistics_helper( - input: &Arc, + schema: SchemaRef, + input_stats: Statistics, predicate: &Arc, default_selectivity: u8, ) -> Result { - let input_stats = input.statistics()?; - let schema = input.schema(); if !check_support(predicate, &schema) { let selectivity = default_selectivity as f64 / 100.0; let mut stats = input_stats.to_inexact(); @@ -193,7 +192,7 @@ impl FilterExec { let num_rows = input_stats.num_rows; let total_byte_size = input_stats.total_byte_size; let input_analysis_ctx = AnalysisContext::try_from_statistics( - &input.schema(), + &schema, &input_stats.column_statistics, )?; @@ -260,7 +259,12 @@ impl FilterExec { ) -> Result { // Combine the equal predicates with the input equivalence properties // to construct the equivalence properties: - let stats = Self::statistics_helper(input, predicate, default_selectivity)?; + let stats = Self::statistics_helper( + input.schema(), + input.statistics()?, + predicate, + default_selectivity, + )?; let mut eq_properties = input.equivalence_properties().clone(); let (equal_pairs, _) = collect_columns_from_predicate(predicate); for (lhs, rhs) in equal_pairs { @@ -401,13 +405,29 @@ impl ExecutionPlan for FilterExec { /// predicate's selectivity value can be determined for the incoming data. 
fn statistics(&self) -> Result { let stats = Self::statistics_helper( - &self.input, + self.schema(), + self.input().statistics()?, self.predicate(), self.default_selectivity, )?; Ok(stats.project(self.projection.as_ref())) } + fn statistics_by_partition(&self) -> Result> { + let input_stats = self.input.statistics_by_partition()?; + let mut stats = Vec::with_capacity(input_stats.len()); + for input_stat in input_stats { + let stat = Self::statistics_helper( + self.schema(), + input_stat, + self.predicate(), + self.default_selectivity, + )?; + stats.push(stat.project(self.projection.as_ref())); + } + Ok(stats) + } + fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::LowerEqual } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 8dd1addff15ce..f8e101f61dd61 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -343,6 +343,31 @@ impl ExecutionPlan for CrossJoinExec { )) } + fn statistics_by_partition(&self) -> Result> { + todo!() + /* + let left_stats = self.left.statistics_by_partition()?; + // Summarize the `left_stats` + let statistics = + compute_summary_statistics(file_group.iter(), &file_schema, |file| { + file.statistics.as_ref().map(|stats| stats.as_ref()) + }); + let right_stats = self.right.statistics_by_partition()?; + + if left_stats.is_empty() || right_stats.is_empty() { + return Ok(vec![]); + } + + let mut stats = Vec::new(); + for left in left_stats.iter() { + for right in right_stats.iter() { + stats.push(stats_cartesian_product(left.clone(), right.clone())); + } + } + Ok(stats) + */ + } + /// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done, /// it returns the new swapped version having the [`CrossJoinExec`] as the top plan. /// Otherwise, it returns None. 
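// Hedged sketch of what the commented-out draft above is aiming for, assuming the
// `compute_summary_statistics` helper that a later commit in this series introduces:
// fold all left-side partition statistics into one summary, then take its cartesian
// product with each right-side partition (the right side drives the output partitioning).
let left_summary = compute_summary_statistics(
    left_stats,                      // Vec<Statistics>, one entry per left partition
    self.schema.fields().len(),
    |stats| Some(stats),
);
let per_partition: Vec<Statistics> = right_stats
    .into_iter()
    .map(|right| stats_cartesian_product(left_summary.clone(), right))
    .collect();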
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index e8904db0f3eaf..50282e056777d 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -883,8 +883,8 @@ impl ExecutionPlan for HashJoinExec { // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` let stats = estimate_join_statistics( - Arc::clone(&self.left), - Arc::clone(&self.right), + self.left.statistics()?, + self.right.statistics()?, self.on.clone(), &self.join_type, &self.join_schema, diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index b902795950966..94b6725bb19cf 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -568,8 +568,8 @@ impl ExecutionPlan for NestedLoopJoinExec { fn statistics(&self) -> Result { estimate_join_statistics( - Arc::clone(&self.left), - Arc::clone(&self.right), + self.left.statistics()?, + self.right.statistics()?, vec![], &self.join_type, &self.join_schema, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 89f2e3c911f89..28612540bce5b 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -518,8 +518,8 @@ impl ExecutionPlan for SortMergeJoinExec { // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` estimate_join_statistics( - Arc::clone(&self.left), - Arc::clone(&self.right), + self.left.statistics()?, + self.right.statistics()?, self.on.clone(), &self.join_type, &self.schema, diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 5516f172d5101..ce1860d20565c 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -403,15 +403,12 @@ struct PartialJoinStatistics { /// Estimate the statistics for the given join's output. 
pub(crate) fn estimate_join_statistics( - left: Arc, - right: Arc, + left_stats: Statistics, + right_stats: Statistics, on: JoinOn, join_type: &JoinType, schema: &Schema, ) -> Result { - let left_stats = left.statistics()?; - let right_stats = right.statistics()?; - let join_stats = estimate_join_cardinality(join_type, left_stats, right_stats, &on); let (num_rows, column_statistics) = match join_stats { Some(stats) => (Precision::Inexact(stats.num_rows), stats.column_statistics), diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 89cf47a6d6508..78bca3c794413 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -202,6 +202,10 @@ impl ExecutionPlan for GlobalLimitExec { ) } + fn statistics_by_partition(&self) -> Result> { + Ok(vec![self.statistics()?]) + } + fn fetch(&self) -> Option { self.fetch } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 3bcd7050da486..1334f3b091ecb 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1301,7 +1301,18 @@ impl ExecutionPlan for SortExec { } fn statistics_by_partition(&self) -> Result> { - todo!() + let input_stats = self.input.statistics_by_partition()?; + let mut stats = Vec::with_capacity(input_stats.len()); + for stat in input_stats { + stats.push(Statistics::with_fetch( + stat, + self.schema(), + self.fetch, + 0, + 1, + )?); + } + Ok(stats) } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 92138bf6a7a1a..af6c5f29653ee 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -226,6 +226,23 @@ impl BoundedWindowAggExec { .unwrap_or_else(Vec::new) } } + + fn statistics_helper(&self, statistics: Statistics) -> Result { + let win_cols = self.window_expr.len(); + let input_cols = self.input.schema().fields().len(); + // TODO stats: some windowing function will maintain invariants such as min, max... + let mut column_statistics = Vec::with_capacity(win_cols + input_cols); + // copy stats of the input to the beginning of the schema. + column_statistics.extend(statistics.column_statistics); + for _ in 0..win_cols { + column_statistics.push(ColumnStatistics::new_unknown()) + } + Ok(Statistics { + num_rows: statistics.num_rows, + column_statistics, + total_byte_size: Precision::Absent, + }) + } } impl DisplayAs for BoundedWindowAggExec { @@ -344,20 +361,17 @@ impl ExecutionPlan for BoundedWindowAggExec { fn statistics(&self) -> Result { let input_stat = self.input.statistics()?; - let win_cols = self.window_expr.len(); - let input_cols = self.input.schema().fields().len(); - // TODO stats: some windowing function will maintain invariants such as min, max... - let mut column_statistics = Vec::with_capacity(win_cols + input_cols); - // copy stats of the input to the beginning of the schema. 
- column_statistics.extend(input_stat.column_statistics); - for _ in 0..win_cols { - column_statistics.push(ColumnStatistics::new_unknown()) + self.statistics_helper(input_stat) + } + + fn statistics_by_partition(&self) -> Result> { + let input_stats = self.input.statistics_by_partition()?; + let mut output_stats = Vec::with_capacity(input_stats.len()); + for stat in input_stats { + let output_stat = self.statistics_helper(stat.clone())?; + output_stats.push(output_stat); } - Ok(Statistics { - num_rows: input_stat.num_rows, - column_statistics, - total_byte_size: Precision::Absent, - }) + Ok(output_stats) } } From 992baaf556b731711c373ae476de6513dde3d660 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 2 Apr 2025 11:33:47 +0800 Subject: [PATCH 03/21] save --- .../core/src/datasource/listing/table.rs | 1 + datafusion/datasource/src/statistics.rs | 3 + .../physical-plan/src/joins/cross_join.rs | 22 ++- datafusion/physical-plan/src/lib.rs | 1 + datafusion/physical-plan/src/limit.rs | 16 ++ datafusion/physical-plan/src/projection.rs | 14 ++ .../src/sorts/sort_preserving_merge.rs | 4 + datafusion/physical-plan/src/statistics.rs | 151 ++++++++++++++++++ 8 files changed, 200 insertions(+), 12 deletions(-) create mode 100644 datafusion/physical-plan/src/statistics.rs diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index a9834da92e5a4..1b5ad566549e6 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -57,6 +57,7 @@ use datafusion_common::stats::Precision; use datafusion_datasource::compute_all_files_statistics; use datafusion_datasource::file_groups::FileGroup; use datafusion_physical_expr_common::sort_expr::LexRequirement; +use datafusion_physical_plan::statistics::add_row_stats; use futures::{future, stream, Stream, StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::ObjectStore; diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index 8a04d77b273d4..5b44860091256 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -36,6 +36,9 @@ use datafusion_common::stats::Precision; use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result}; use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_plan::statistics::{ + add_row_stats, compute_summary_statistics, set_max_if_greater, set_min_if_lesser, +}; use datafusion_physical_plan::{ColumnStatistics, Statistics}; /// A normalized representation of file min/max statistics that allows for efficient sorting & comparison. 
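// Minimal usage sketch (assumed local names) for the summary helper imported above:
// it folds a collection of per-partition `Statistics` back into a single summary,
// which is how the cross join change in this commit summarizes its left side.
let summary = compute_summary_statistics(
    partition_stats,            // e.g. Vec<Statistics>, one entry per partition
    schema.fields().len(),      // number of columns the statistics describe
    |stats| Some(stats),        // each item already is a `Statistics`
);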
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index f8e101f61dd61..93fce93fff1d3 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -46,6 +46,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::join_equivalence_properties; +use crate::statistics::compute_summary_statistics; use async_trait::async_trait; use futures::{ready, Stream, StreamExt, TryStreamExt}; @@ -344,28 +345,25 @@ impl ExecutionPlan for CrossJoinExec { } fn statistics_by_partition(&self) -> Result> { - todo!() - /* let left_stats = self.left.statistics_by_partition()?; - // Summarize the `left_stats` - let statistics = - compute_summary_statistics(file_group.iter(), &file_schema, |file| { - file.statistics.as_ref().map(|stats| stats.as_ref()) - }); let right_stats = self.right.statistics_by_partition()?; if left_stats.is_empty() || right_stats.is_empty() { return Ok(vec![]); } + // Summarize the `left_stats` + let statistics = compute_summary_statistics( + left_stats.iter(), + self.schema.fields().len(), + |stats| Some(stats), + ); + let mut stats = Vec::new(); - for left in left_stats.iter() { - for right in right_stats.iter() { - stats.push(stats_cartesian_product(left.clone(), right.clone())); - } + for right in right_stats.iter() { + stats.push(stats_cartesian_product(statistics.clone(), right.clone())); } Ok(stats) - */ } /// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done, diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index a1862554b303e..4493569ae6ffd 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -92,5 +92,6 @@ pub mod udaf { } pub mod coalesce; +pub mod statistics; #[cfg(test)] pub mod test; diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 78bca3c794413..b66e1c53f2041 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -347,6 +347,22 @@ impl ExecutionPlan for LocalLimitExec { ) } + fn statistics_by_partition(&self) -> Result> { + let input_stats = self.input.statistics_by_partition()?; + let mut stats = Vec::with_capacity(input_stats.len()); + for input_stat in input_stats { + let stat = Statistics::with_fetch( + input_stat, + self.schema(), + Some(self.fetch), + 0, + 1, + )?; + stats.push(stat); + } + Ok(stats) + } + fn fetch(&self) -> Option { Some(self.fetch) } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 72934c74446eb..da67766078de4 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -251,6 +251,20 @@ impl ExecutionPlan for ProjectionExec { )) } + fn statistics_by_partition(&self) -> Result> { + let input_stats = self.input.statistics_by_partition()?; + let mut stats = Vec::with_capacity(input_stats.len()); + for input_stat in input_stats { + let stat = stats_projection( + input_stat.clone(), + self.expr.iter().map(|(e, _)| Arc::clone(e)), + Arc::clone(&self.schema), + ); + stats.push(stat); + } + Ok(stats) + } + fn supports_limit_pushdown(&self) -> bool { true } diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index b987dff36441d..a1cfb3233751f 100644 --- 
a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -346,6 +346,10 @@ impl ExecutionPlan for SortPreservingMergeExec { self.input.statistics() } + fn statistics_by_partition(&self) -> Result> { + Ok(vec![self.statistics()?]) + } + fn supports_limit_pushdown(&self) -> bool { true } diff --git a/datafusion/physical-plan/src/statistics.rs b/datafusion/physical-plan/src/statistics.rs new file mode 100644 index 0000000000000..3d70d907219d4 --- /dev/null +++ b/datafusion/physical-plan/src/statistics.rs @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the cross join plan for loading the left side of the cross join +//! and producing batches in parallel for the right partitions + +use datafusion_common::stats::Precision; +use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; +use std::mem; + +/// Generic function to compute statistics across multiple items that have statistics +pub fn compute_summary_statistics( + items: I, + column_count: usize, + stats_extractor: impl Fn(&T) -> Option<&Statistics>, +) -> Statistics +where + I: IntoIterator, +{ + let mut col_stats_set = vec![ColumnStatistics::default(); column_count]; + let mut num_rows = Precision::::Absent; + let mut total_byte_size = Precision::::Absent; + + for (idx, item) in items.into_iter().enumerate() { + if let Some(item_stats) = stats_extractor(&item) { + if idx == 0 { + // First item, set values directly + num_rows = item_stats.num_rows; + total_byte_size = item_stats.total_byte_size; + for (index, column_stats) in + item_stats.column_statistics.iter().enumerate() + { + col_stats_set[index].null_count = column_stats.null_count; + col_stats_set[index].max_value = column_stats.max_value.clone(); + col_stats_set[index].min_value = column_stats.min_value.clone(); + col_stats_set[index].sum_value = column_stats.sum_value.clone(); + } + continue; + } + + // Accumulate statistics for subsequent items + num_rows = add_row_stats(item_stats.num_rows, num_rows); + total_byte_size = add_row_stats(item_stats.total_byte_size, total_byte_size); + + for (item_col_stats, col_stats) in item_stats + .column_statistics + .iter() + .zip(col_stats_set.iter_mut()) + { + col_stats.null_count = + add_row_stats(item_col_stats.null_count, col_stats.null_count); + set_max_if_greater(&item_col_stats.max_value, &mut col_stats.max_value); + set_min_if_lesser(&item_col_stats.min_value, &mut col_stats.min_value); + col_stats.sum_value = item_col_stats.sum_value.add(&col_stats.sum_value); + } + } + } + + Statistics { + num_rows, + total_byte_size, + column_statistics: col_stats_set, + } +} + +/// If the given value is numerically greater than the original maximum value, +/// return the new maximum 
value with appropriate exactness information. +pub fn set_max_if_greater( + max_nominee: &Precision, + max_value: &mut Precision, +) { + match (&max_value, max_nominee) { + (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => { + *max_value = max_nominee.clone(); + } + (Precision::Exact(val1), Precision::Inexact(val2)) + | (Precision::Inexact(val1), Precision::Inexact(val2)) + | (Precision::Inexact(val1), Precision::Exact(val2)) + if val1 < val2 => + { + *max_value = max_nominee.clone().to_inexact(); + } + (Precision::Exact(_), Precision::Absent) => { + let exact_max = mem::take(max_value); + *max_value = exact_max.to_inexact(); + } + (Precision::Absent, Precision::Exact(_)) => { + *max_value = max_nominee.clone().to_inexact(); + } + (Precision::Absent, Precision::Inexact(_)) => { + *max_value = max_nominee.clone(); + } + _ => {} + } +} + +/// If the given value is numerically lesser than the original minimum value, +/// return the new minimum value with appropriate exactness information. +pub fn set_min_if_lesser( + min_nominee: &Precision, + min_value: &mut Precision, +) { + match (&min_value, min_nominee) { + (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => { + *min_value = min_nominee.clone(); + } + (Precision::Exact(val1), Precision::Inexact(val2)) + | (Precision::Inexact(val1), Precision::Inexact(val2)) + | (Precision::Inexact(val1), Precision::Exact(val2)) + if val1 > val2 => + { + *min_value = min_nominee.clone().to_inexact(); + } + (Precision::Exact(_), Precision::Absent) => { + let exact_min = mem::take(min_value); + *min_value = exact_min.to_inexact(); + } + (Precision::Absent, Precision::Exact(_)) => { + *min_value = min_nominee.clone().to_inexact(); + } + (Precision::Absent, Precision::Inexact(_)) => { + *min_value = min_nominee.clone(); + } + _ => {} + } +} + +pub fn add_row_stats( + file_num_rows: Precision, + num_rows: Precision, +) -> Precision { + match (file_num_rows, &num_rows) { + (Precision::Absent, _) => num_rows.to_inexact(), + (lhs, Precision::Absent) => lhs.to_inexact(), + (lhs, rhs) => lhs.add(rhs), + } +} From 5854095068c66341e63740ecf65dfb216b30e185 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 2 Apr 2025 11:48:38 +0800 Subject: [PATCH 04/21] functional way --- datafusion/physical-plan/src/filter.rs | 25 ++++++++++--------- .../physical-plan/src/joins/cross_join.rs | 11 ++++---- datafusion/physical-plan/src/limit.rs | 20 ++++++--------- datafusion/physical-plan/src/projection.rs | 23 +++++++++-------- .../physical-plan/src/repartition/mod.rs | 4 --- datafusion/physical-plan/src/sorts/sort.rs | 17 ++++--------- .../src/windows/bounded_window_agg_exec.rs | 12 ++++----- 7 files changed, 47 insertions(+), 65 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index f179caddc0785..8378974be5d9b 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -414,18 +414,19 @@ impl ExecutionPlan for FilterExec { } fn statistics_by_partition(&self) -> Result> { - let input_stats = self.input.statistics_by_partition()?; - let mut stats = Vec::with_capacity(input_stats.len()); - for input_stat in input_stats { - let stat = Self::statistics_helper( - self.schema(), - input_stat, - self.predicate(), - self.default_selectivity, - )?; - stats.push(stat.project(self.projection.as_ref())); - } - Ok(stats) + self.input + .statistics_by_partition()? 
+ .into_iter() + .map(|input_stat| { + Self::statistics_helper( + self.schema(), + input_stat, + self.predicate(), + self.default_selectivity, + ) + .map(|stat| stat.project(self.projection.as_ref())) + }) + .collect() } fn cardinality_effect(&self) -> CardinalityEffect { diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 93fce93fff1d3..22f67d5baf627 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -354,16 +354,15 @@ impl ExecutionPlan for CrossJoinExec { // Summarize the `left_stats` let statistics = compute_summary_statistics( - left_stats.iter(), + left_stats.into_iter(), self.schema.fields().len(), |stats| Some(stats), ); - let mut stats = Vec::new(); - for right in right_stats.iter() { - stats.push(stats_cartesian_product(statistics.clone(), right.clone())); - } - Ok(stats) + Ok(right_stats + .into_iter() + .map(|right| stats_cartesian_product(statistics.clone(), right)) + .collect()) } /// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done, diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index b66e1c53f2041..436b937f7086b 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -348,19 +348,13 @@ impl ExecutionPlan for LocalLimitExec { } fn statistics_by_partition(&self) -> Result> { - let input_stats = self.input.statistics_by_partition()?; - let mut stats = Vec::with_capacity(input_stats.len()); - for input_stat in input_stats { - let stat = Statistics::with_fetch( - input_stat, - self.schema(), - Some(self.fetch), - 0, - 1, - )?; - stats.push(stat); - } - Ok(stats) + self.input + .statistics_by_partition()? + .into_iter() + .map(|input_stat| { + Statistics::with_fetch(input_stat, self.schema(), Some(self.fetch), 0, 1) + }) + .collect() } fn fetch(&self) -> Option { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index da67766078de4..629d9a2802af7 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -252,17 +252,18 @@ impl ExecutionPlan for ProjectionExec { } fn statistics_by_partition(&self) -> Result> { - let input_stats = self.input.statistics_by_partition()?; - let mut stats = Vec::with_capacity(input_stats.len()); - for input_stat in input_stats { - let stat = stats_projection( - input_stat.clone(), - self.expr.iter().map(|(e, _)| Arc::clone(e)), - Arc::clone(&self.schema), - ); - stats.push(stat); - } - Ok(stats) + Ok(self + .input + .statistics_by_partition()? 
+ .into_iter() + .map(|input_stats| { + stats_projection( + input_stats, + self.expr.iter().map(|(e, _)| Arc::clone(e)), + Arc::clone(&self.schema), + ) + }) + .collect()) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 2bbaddd8fca07..c480fc2abaa1a 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -691,10 +691,6 @@ impl ExecutionPlan for RepartitionExec { self.input.statistics() } - fn statistics_by_partition(&self) -> Result> { - todo!() - } - fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 1334f3b091ecb..bc729b9c18d16 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1301,18 +1301,11 @@ impl ExecutionPlan for SortExec { } fn statistics_by_partition(&self) -> Result> { - let input_stats = self.input.statistics_by_partition()?; - let mut stats = Vec::with_capacity(input_stats.len()); - for stat in input_stats { - stats.push(Statistics::with_fetch( - stat, - self.schema(), - self.fetch, - 0, - 1, - )?); - } - Ok(stats) + self.input + .statistics_by_partition()? + .into_iter() + .map(|stat| Statistics::with_fetch(stat, self.schema(), self.fetch, 0, 1)) + .collect() } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index af6c5f29653ee..139b1e3414a7e 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -365,13 +365,11 @@ impl ExecutionPlan for BoundedWindowAggExec { } fn statistics_by_partition(&self) -> Result> { - let input_stats = self.input.statistics_by_partition()?; - let mut output_stats = Vec::with_capacity(input_stats.len()); - for stat in input_stats { - let output_stat = self.statistics_helper(stat.clone())?; - output_stats.push(output_stat); - } - Ok(output_stats) + self.input + .statistics_by_partition()? 
+ .into_iter() + .map(|stat| self.statistics_helper(stat)) + .collect() } } From 2fdb663b9df8e70ecf35cfe4c24f484733e94f4e Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 2 Apr 2025 13:00:56 +0800 Subject: [PATCH 05/21] fix sort --- datafusion/physical-plan/src/joins/cross_join.rs | 9 ++++----- datafusion/physical-plan/src/sorts/sort.rs | 3 +++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 22f67d5baf627..3ad5276a547c6 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -353,11 +353,10 @@ impl ExecutionPlan for CrossJoinExec { } // Summarize the `left_stats` - let statistics = compute_summary_statistics( - left_stats.into_iter(), - self.schema.fields().len(), - |stats| Some(stats), - ); + let statistics = + compute_summary_statistics(left_stats, self.schema.fields().len(), |stats| { + Some(stats) + }); Ok(right_stats .into_iter() diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index bc729b9c18d16..e6f52496d3110 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1301,6 +1301,9 @@ impl ExecutionPlan for SortExec { } fn statistics_by_partition(&self) -> Result> { + if !self.preserve_partitioning() { + return Ok(vec![self.statistics()?]); + } self.input .statistics_by_partition()? .into_iter() From b3d16e67196481affece34906b8ebeba7a0800f5 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 2 Apr 2025 16:45:50 +0800 Subject: [PATCH 06/21] adding test --- .../core/tests/physical_optimizer/mod.rs | 1 + .../partition_statistics.rs | 115 ++++++++++++++++++ 2 files changed, 116 insertions(+) create mode 100644 datafusion/core/tests/physical_optimizer/partition_statistics.rs diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index 6643e7fd59b7a..43cec6d00d85c 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -29,3 +29,4 @@ mod push_down_filter; mod replace_with_order_preserving_variants; mod sanity_checker; mod test_utils; +mod partition_statistics; diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs new file mode 100644 index 0000000000000..8440976cbc68b --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
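// The tests below build a date-partitioned Parquet ListingTable and exercise
// statistics_by_partition() across several operators (data source scan, projection,
// sort, filter, union, ...); later commits in this series fill in the per-partition
// assertions on row counts and min/max values.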
+ +#[cfg(test)] +mod test { + use std::sync::Arc; + use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; + use datafusion::execution::SessionStateBuilder; + use datafusion::prelude::SessionContext; + use datafusion_catalog::TableProvider; + use datafusion_common::config::ConfigOptions; + use datafusion_datasource::ListingTableUrl; + use datafusion_datasource::source::DataSourceExec; + use datafusion_datasource_parquet::ParquetFormat; + use datafusion_execution::config::SessionConfig; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; + use datafusion_physical_plan::ExecutionPlan; + + async fn generate_listing_table_with_statistics() -> Arc { + let testdata = datafusion::test_util::parquet_test_data(); + let filename = format!("{}/{}", testdata, "alltypes_tiny_pages.parquet"); + let table_path = ListingTableUrl::parse(filename).unwrap(); + let opt = ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true); + let rt = RuntimeEnvBuilder::new() + .build_arc() + .expect("could not build runtime environment"); + + let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state(); + let schema = opt + .infer_schema( + &SessionStateBuilder::new().with_default_features().build(), + &table_path, + ) + .await + .unwrap(); + let config = ListingTableConfig::new(table_path.clone()) + .with_listing_options(opt.clone()) + .with_schema(schema); + let table = ListingTable::try_new(config).unwrap(); + let res= table.scan(&state, None, &[], None).await.unwrap(); + dbg!(&res.statistics().unwrap()); + dbg!(&res.statistics_by_partition().unwrap()); + let mut config = ConfigOptions::new(); + config.set("datafusion.optimizer.repartition_file_min_size", "10").unwrap(); + let res = res.repartitioned(5, &config).unwrap().unwrap(); + dbg!(&res.statistics_by_partition().unwrap()); + res + } + + #[tokio::test] + async fn test_statistics_by_partition_of_data_source() -> datafusion_common::Result<()> { + generate_listing_table_with_statistics().await; + Ok(()) + } + + #[test] + fn test_statistics_by_partition_of_projection() -> datafusion_common::Result<()> { + Ok(()) + } + + #[test] + fn test_statistics_by_partition_of_sort() -> datafusion_common::Result<()> { + Ok(()) + } + + #[test] + fn test_statistics_by_partition_of_filter() -> datafusion_common::Result<()> { + Ok(()) + } + + #[test] + fn test_statistics_by_partition_of_aggregate() -> datafusion_common::Result<()> { + Ok(()) + } + + #[test] + fn test_statistic_by_partition_of_cross_join() -> datafusion_common::Result<()> { + Ok(()) + } + + #[test] + fn test_statistic_by_partition_of_union() -> datafusion_common::Result<()> { + Ok(()) + } + + #[test] + fn test_statistic_by_partition_of_smp() -> datafusion_common::Result<()> { + Ok(()) + } + + #[test] + fn test_statistic_by_partition_of_limit() -> datafusion_common::Result<()> { + Ok(()) + } + + #[test] + fn test_statistic_by_partition_of_coalesce() -> datafusion_common::Result<()> { + Ok(()) + } + +} \ No newline at end of file From 8a8f3508472cae60810aaa9cb95d46aaf333d5cc Mon Sep 17 00:00:00 2001 From: xudong963 Date: Thu, 3 Apr 2025 15:56:04 +0800 Subject: [PATCH 07/21] add tests --- .../core/src/datasource/listing/table.rs | 2 +- .../core/tests/physical_optimizer/mod.rs | 2 +- .../partition_statistics.rs | 347 ++++++++++++++---- 3 files changed, 282 insertions(+), 69 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 1b5ad566549e6..7082589ffffba 
100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -713,7 +713,7 @@ impl ListingOptions { /// # Ok(()) /// # } /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ListingTable { table_paths: Vec, /// `file_schema` contains only the columns physically stored in the data files themselves. diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index 43cec6d00d85c..c115a5253adcc 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -24,9 +24,9 @@ mod enforce_sorting; mod join_selection; mod limit_pushdown; mod limited_distinct_aggregation; +mod partition_statistics; mod projection_pushdown; mod push_down_filter; mod replace_with_order_preserving_variants; mod sanity_checker; mod test_utils; -mod partition_statistics; diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 8440976cbc68b..f794a086764b8 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -17,99 +17,312 @@ #[cfg(test)] mod test { - use std::sync::Arc; - use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; - use datafusion::execution::SessionStateBuilder; + use arrow_schema::{DataType, Field, Schema, SortOptions}; + use datafusion::datasource::listing::ListingTable; use datafusion::prelude::SessionContext; use datafusion_catalog::TableProvider; - use datafusion_common::config::ConfigOptions; - use datafusion_datasource::ListingTableUrl; - use datafusion_datasource::source::DataSourceExec; - use datafusion_datasource_parquet::ParquetFormat; + use datafusion_common::stats::Precision; + use datafusion_common::{ScalarValue, Statistics}; use datafusion_execution::config::SessionConfig; - use datafusion_execution::runtime_env::RuntimeEnvBuilder; + use datafusion_expr_common::operator::Operator; + use datafusion_physical_expr::expressions::{binary, lit, Column}; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; + use datafusion_physical_plan::filter::FilterExec; + use datafusion_physical_plan::projection::ProjectionExec; + use datafusion_physical_plan::sorts::sort::SortExec; + use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::ExecutionPlan; + use std::sync::Arc; - async fn generate_listing_table_with_statistics() -> Arc { - let testdata = datafusion::test_util::parquet_test_data(); - let filename = format!("{}/{}", testdata, "alltypes_tiny_pages.parquet"); - let table_path = ListingTableUrl::parse(filename).unwrap(); - let opt = ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true); - let rt = RuntimeEnvBuilder::new() - .build_arc() - .expect("could not build runtime environment"); - - let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state(); - let schema = opt - .infer_schema( - &SessionStateBuilder::new().with_default_features().build(), - &table_path, - ) + async fn generate_listing_table_with_statistics( + target_partition: Option, + ) -> Arc { + // Delete the existing data directory if it exists + let data_dir = "./data/"; + let _ = std::fs::remove_dir_all(data_dir); + let mut session_config = 
SessionConfig::new().with_collect_statistics(true); + if let Some(partition) = target_partition { + session_config = session_config.with_target_partitions(partition); + } + let ctx = SessionContext::new_with_config(session_config); + // Create table with partition + let create_table_sql = "CREATE EXTERNAL TABLE t1 (id INT not null, date DATE) STORED AS PARQUET LOCATION './data/' PARTITIONED BY (date) WITH ORDER (id ASC);"; + ctx.sql(create_table_sql) + .await + .unwrap() + .collect() .await .unwrap(); - let config = ListingTableConfig::new(table_path.clone()) - .with_listing_options(opt.clone()) - .with_schema(schema); - let table = ListingTable::try_new(config).unwrap(); - let res= table.scan(&state, None, &[], None).await.unwrap(); - dbg!(&res.statistics().unwrap()); - dbg!(&res.statistics_by_partition().unwrap()); - let mut config = ConfigOptions::new(); - config.set("datafusion.optimizer.repartition_file_min_size", "10").unwrap(); - let res = res.repartitioned(5, &config).unwrap().unwrap(); - dbg!(&res.statistics_by_partition().unwrap()); - res + // Insert data into the table, will generate partition files with parquet format + let insert_data = "INSERT INTO t1 VALUES (4, '2025-03-01'), (3, '2025-3-02'), (2, '2025-03-03'), (1, '2025-03-04');"; + ctx.sql(insert_data).await.unwrap().collect().await.unwrap(); + let table = ctx.table_provider("t1").await.unwrap(); + let listing_table = table + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + listing_table + .scan(&ctx.state(), None, &[], None) + .await + .unwrap() } - #[tokio::test] - async fn test_statistics_by_partition_of_data_source() -> datafusion_common::Result<()> { - generate_listing_table_with_statistics().await; - Ok(()) - } + fn check_unchanged_statistics(statistics: Vec) { + // Check the statistics of each partition + for stat in &statistics { + assert_eq!(stat.num_rows, Precision::Exact(1)); + // First column (id) should have non-null values + assert_eq!(stat.column_statistics[0].null_count, Precision::Exact(0)); + } - #[test] - fn test_statistics_by_partition_of_projection() -> datafusion_common::Result<()> { - Ok(()) + // Verify specific id values for each partition + assert_eq!( + statistics[0].column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int32(Some(4))) + ); + assert_eq!( + statistics[1].column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int32(Some(3))) + ); + assert_eq!( + statistics[2].column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int32(Some(2))) + ); + assert_eq!( + statistics[3].column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int32(Some(1))) + ); } - #[test] - fn test_statistics_by_partition_of_sort() -> datafusion_common::Result<()> { + #[tokio::test] + async fn test_statistics_by_partition_of_data_source() -> datafusion_common::Result<()> + { + let scan = generate_listing_table_with_statistics(None).await; + let statistics = scan.statistics_by_partition()?; + // Check the statistics of each partition + assert_eq!(statistics.len(), 4); + for stat in &statistics { + assert_eq!(stat.column_statistics.len(), 2); + assert_eq!(stat.total_byte_size, Precision::Exact(55)); + } + check_unchanged_statistics(statistics); Ok(()) } - #[test] - fn test_statistics_by_partition_of_filter() -> datafusion_common::Result<()> { + #[tokio::test] + async fn test_statistics_by_partition_of_projection() -> datafusion_common::Result<()> + { + let scan = generate_listing_table_with_statistics(None).await; + // Add projection execution plan + let exprs: Vec<(Arc, String)> 
= + vec![(Arc::new(Column::new("id", 0)), "id".to_string())]; + let projection = ProjectionExec::try_new(exprs, scan)?; + let statistics = projection.statistics_by_partition()?; + for stat in &statistics { + assert_eq!(stat.column_statistics.len(), 1); + assert_eq!(stat.total_byte_size, Precision::Exact(4)); + } + check_unchanged_statistics(statistics); Ok(()) } - #[test] - fn test_statistics_by_partition_of_aggregate() -> datafusion_common::Result<()> { + #[tokio::test] + async fn test_statistics_by_partition_of_sort() -> datafusion_common::Result<()> { + let scan = generate_listing_table_with_statistics(Some(2)).await; + // Add sort execution plan + let sort = SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr { + expr: Arc::new(Column::new("id", 0)), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]), + scan, + ); + let mut sort_exec = Arc::new(sort.clone()); + let statistics = sort_exec.statistics_by_partition()?; + assert_eq!(statistics.len(), 1); + assert_eq!(statistics[0].num_rows, Precision::Exact(4)); + assert_eq!(statistics[0].column_statistics.len(), 2); + assert_eq!(statistics[0].total_byte_size, Precision::Exact(220)); + assert_eq!( + statistics[0].column_statistics[0].null_count, + Precision::Exact(0) + ); + assert_eq!( + statistics[0].column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int32(Some(4))) + ); + assert_eq!( + statistics[0].column_statistics[0].min_value, + Precision::Exact(ScalarValue::Int32(Some(1))) + ); + sort_exec = Arc::new(sort.with_preserve_partitioning(true)); + let statistics = sort_exec.statistics_by_partition()?; + dbg!(&statistics); + assert_eq!(statistics.len(), 2); + assert_eq!(statistics[0].num_rows, Precision::Exact(2)); + assert_eq!(statistics[1].num_rows, Precision::Exact(2)); + assert_eq!(statistics[0].column_statistics.len(), 2); + assert_eq!(statistics[1].column_statistics.len(), 2); + assert_eq!(statistics[0].total_byte_size, Precision::Exact(110)); + assert_eq!(statistics[1].total_byte_size, Precision::Exact(110)); + assert_eq!( + statistics[0].column_statistics[0].null_count, + Precision::Exact(0) + ); + assert_eq!( + statistics[0].column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int32(Some(4))) + ); + assert_eq!( + statistics[0].column_statistics[0].min_value, + Precision::Exact(ScalarValue::Int32(Some(3))) + ); + assert_eq!( + statistics[1].column_statistics[0].null_count, + Precision::Exact(0) + ); + assert_eq!( + statistics[1].column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int32(Some(2))) + ); + assert_eq!( + statistics[1].column_statistics[0].min_value, + Precision::Exact(ScalarValue::Int32(Some(1))) + ); Ok(()) } - #[test] - fn test_statistic_by_partition_of_cross_join() -> datafusion_common::Result<()> { + #[tokio::test] + async fn test_statistics_by_partition_of_filter() -> datafusion_common::Result<()> { + let scan = generate_listing_table_with_statistics(None).await; + let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); + let predicate = binary( + Arc::new(Column::new("id", 0)), + Operator::Lt, + lit(1i32), + &schema, + )?; + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, scan)?); + let _full_statistics = filter.statistics()?; + // The full statistics is invalid, at least, we can improve the selectivity estimation of the filter + /* + Statistics { + num_rows: Inexact(0), + total_byte_size: Inexact(0), + column_statistics: [ + ColumnStatistics { + null_count: Exact(0), + max_value: Exact(NULL), + min_value: Exact(NULL), 
+ sum_value: Exact(NULL), + distinct_count: Exact(0), + }, + ColumnStatistics { + null_count: Exact(0), + max_value: Exact(NULL), + min_value: Exact(NULL), + sum_value: Exact(NULL), + distinct_count: Exact(0), + }, + ], + } + */ + let statistics = filter.statistics_by_partition()?; + // Also the statistics of each partition is also invalid due to above + // But we can ensure the current behavior by tests + assert_eq!(statistics.len(), 4); + for stat in &statistics { + assert_eq!(stat.column_statistics.len(), 2); + assert_eq!(stat.total_byte_size, Precision::Inexact(0)); + assert_eq!(stat.num_rows, Precision::Inexact(0)); + assert_eq!(stat.column_statistics[0].null_count, Precision::Exact(0)); + assert_eq!( + stat.column_statistics[0].max_value, + Precision::Exact(ScalarValue::Null) + ); + assert_eq!( + stat.column_statistics[0].min_value, + Precision::Exact(ScalarValue::Null) + ); + } Ok(()) } - #[test] - fn test_statistic_by_partition_of_union() -> datafusion_common::Result<()> { - Ok(()) - } + #[tokio::test] + async fn test_statistic_by_partition_of_union() -> datafusion_common::Result<()> { + let scan = generate_listing_table_with_statistics(Some(2)).await; + let union_exec = Arc::new(UnionExec::new(vec![scan.clone(), scan])); + let statistics = union_exec.statistics_by_partition()?; + // Check that we have 4 partitions (2 from each scan) + assert_eq!(statistics.len(), 4); - #[test] - fn test_statistic_by_partition_of_smp() -> datafusion_common::Result<()> { - Ok(()) - } + // Verify first partition (from first scan) + assert_eq!(statistics[0].num_rows, Precision::Exact(2)); + assert_eq!(statistics[0].total_byte_size, Precision::Exact(110)); + assert_eq!(statistics[0].column_statistics.len(), 2); + assert_eq!( + statistics[0].column_statistics[0].null_count, + Precision::Exact(0) + ); + assert_eq!( + statistics[0].column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int32(Some(4))) + ); + assert_eq!( + statistics[0].column_statistics[0].min_value, + Precision::Exact(ScalarValue::Int32(Some(3))) + ); - #[test] - fn test_statistic_by_partition_of_limit() -> datafusion_common::Result<()> { - Ok(()) - } + // Verify second partition (from first scan) + assert_eq!(statistics[1].num_rows, Precision::Exact(2)); + assert_eq!(statistics[1].total_byte_size, Precision::Exact(110)); + assert_eq!( + statistics[1].column_statistics[0].null_count, + Precision::Exact(0) + ); + assert_eq!( + statistics[1].column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int32(Some(2))) + ); + assert_eq!( + statistics[1].column_statistics[0].min_value, + Precision::Exact(ScalarValue::Int32(Some(1))) + ); + + // Verify third partition (from second scan - same as first partition) + assert_eq!(statistics[2].num_rows, Precision::Exact(2)); + assert_eq!(statistics[2].total_byte_size, Precision::Exact(110)); + assert_eq!( + statistics[2].column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int32(Some(4))) + ); + assert_eq!( + statistics[2].column_statistics[0].min_value, + Precision::Exact(ScalarValue::Int32(Some(3))) + ); + + // Verify fourth partition (from second scan - same as second partition) + assert_eq!(statistics[3].num_rows, Precision::Exact(2)); + assert_eq!(statistics[3].total_byte_size, Precision::Exact(110)); + assert_eq!( + statistics[3].column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int32(Some(2))) + ); + assert_eq!( + statistics[3].column_statistics[0].min_value, + Precision::Exact(ScalarValue::Int32(Some(1))) + ); + + // Delete the existing data directory if 
it exists + let data_dir = "./data/"; + let _ = std::fs::remove_dir_all(data_dir); - #[test] - fn test_statistic_by_partition_of_coalesce() -> datafusion_common::Result<()> { Ok(()) } - -} \ No newline at end of file +} From 491046a4d5b1db6e1d6b30b39aee02e802ee629e Mon Sep 17 00:00:00 2001 From: xudong963 Date: Thu, 3 Apr 2025 16:07:43 +0800 Subject: [PATCH 08/21] save --- .../date=2025-03-01/j5fUeSDQo22oPyPU.parquet | Bin 0 -> 461 bytes .../date=2025-03-02/j5fUeSDQo22oPyPU.parquet | Bin 0 -> 461 bytes .../date=2025-03-03/j5fUeSDQo22oPyPU.parquet | Bin 0 -> 461 bytes .../date=2025-03-04/j5fUeSDQo22oPyPU.parquet | Bin 0 -> 461 bytes .../physical_optimizer/partition_statistics.rs | 13 +------------ 5 files changed, 1 insertion(+), 12 deletions(-) create mode 100644 datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet create mode 100644 datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet create mode 100644 datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet create mode 100644 datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet diff --git a/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet new file mode 100644 index 0000000000000000000000000000000000000000..ec164c6df7b5e0b73774c8e0e9f4ddb1cb84bea5 GIT binary patch literal 461 zcmZWm%Syvg5S?5~$RdkII+t9z3tbqxD6K{9La;E3XnkNTlqv`k+FFa+LaOFJT>1@e z{Re+aa3+bka299goHLJ05>vm874&fR3>Cr(K>f4z4Oi;`6#zhBQ#aVEqKmHWK)oV* zm}3xdS@cV#OrT8RlnIs50LJPr-f#iT8^?Z0qKlG{JzZmWoH^UF7C9yRn zj!OZKQ=*IxrgS0_XQKUFXUd$YCyb@v%xVg w71p1h?!4P;v1ul=(IS>9=fTVcys%b&IH0Cf3B{s11@e z{Re+aa3+bka299goHLJ05>vl}6|Caw87hPofaYiW8?M#?6aXNwsatH-(M8vGp;-~# z&(ZU^Ec&HV##5$n%7n^j0AqCzZ@2*Ft>apfri<)$Il7-bC8${30AJ0D1QxqfqkUQo5g_5$_4x9Bei8~WYB5O_odRUT`|eZNNi{>5;&xaog5ZHcWZ zaa;;;oDyYpFr^cbI1}yXI#cFEkua8iGqXdH1_E3S%Ko@q8I}B6{Cr3XibjIG?>xVg w71p1h?Am$(W11gqR-2eap literal 0 HcmV?d00001 diff --git a/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet new file mode 100644 index 0000000000000000000000000000000000000000..6398cc43a2f5dd659c2054929eb7a3600ccfb678 GIT binary patch literal 461 zcmZWm%Syvg5S?5~=%R~8I+t9z3tbqxD5*v5La;E3XnbHSlqv`k+FFa+LaOFJT>1@e z{Re+aa3+bka299goHLJ*8&ki76|Caw87hPofaYiW8?M#?8~{LIQ@7Zvql>QXLbD>e zpQGn-S@cV#jHgWDlnIs50LJPb-f#iTTgSB|O&8hia&$j=N>H)50lu1zK`Ud84Zw2S z4!CtAD%P~pbN%K7y`XA~?FH(8-=e=9Z0L6fL*Nk=RC%l+_x&36`xnFA;->%Mv?aEt z#BnLWaY~fY!IVx!;!L!k>r9ywMZ#G6&CCu(8VGPTDEs4bWmNKO@$(@mC>ja!zVrM} wR#<<2zLQAqA3Z)W?tyQ{$#Zh|vYajEYp;_u&)eN_Gwc{^`L_=FNn(D;A0B~M)Bpeg literal 0 HcmV?d00001 diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index f794a086764b8..d633c62899915 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -38,25 +38,19 @@ mod test { async fn generate_listing_table_with_statistics( target_partition: Option, ) -> Arc { - // Delete the existing data directory if it exists - let data_dir = "./data/"; - let _ = std::fs::remove_dir_all(data_dir); let mut session_config = 
SessionConfig::new().with_collect_statistics(true); if let Some(partition) = target_partition { session_config = session_config.with_target_partitions(partition); } let ctx = SessionContext::new_with_config(session_config); // Create table with partition - let create_table_sql = "CREATE EXTERNAL TABLE t1 (id INT not null, date DATE) STORED AS PARQUET LOCATION './data/' PARTITIONED BY (date) WITH ORDER (id ASC);"; + let create_table_sql = "CREATE EXTERNAL TABLE t1 (id INT not null, date DATE) STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition' PARTITIONED BY (date) WITH ORDER (id ASC);"; ctx.sql(create_table_sql) .await .unwrap() .collect() .await .unwrap(); - // Insert data into the table, will generate partition files with parquet format - let insert_data = "INSERT INTO t1 VALUES (4, '2025-03-01'), (3, '2025-3-02'), (2, '2025-03-03'), (1, '2025-03-04');"; - ctx.sql(insert_data).await.unwrap().collect().await.unwrap(); let table = ctx.table_provider("t1").await.unwrap(); let listing_table = table .as_any() @@ -162,7 +156,6 @@ mod test { ); sort_exec = Arc::new(sort.with_preserve_partitioning(true)); let statistics = sort_exec.statistics_by_partition()?; - dbg!(&statistics); assert_eq!(statistics.len(), 2); assert_eq!(statistics[0].num_rows, Precision::Exact(2)); assert_eq!(statistics[1].num_rows, Precision::Exact(2)); @@ -319,10 +312,6 @@ mod test { Precision::Exact(ScalarValue::Int32(Some(1))) ); - // Delete the existing data directory if it exists - let data_dir = "./data/"; - let _ = std::fs::remove_dir_all(data_dir); - Ok(()) } } From 9d1beb9f1630404822b5828f1a7bb3ea8418dc5b Mon Sep 17 00:00:00 2001 From: xudong963 Date: Thu, 3 Apr 2025 16:20:54 +0800 Subject: [PATCH 09/21] update --- .../partition_statistics.rs | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index d633c62899915..b98c499e1b240 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -66,7 +66,7 @@ mod test { fn check_unchanged_statistics(statistics: Vec) { // Check the statistics of each partition for stat in &statistics { - assert_eq!(stat.num_rows, Precision::Exact(1)); + assert_eq!(stat.num_rows, Precision::Exact(2)); // First column (id) should have non-null values assert_eq!(stat.column_statistics[0].null_count, Precision::Exact(0)); } @@ -77,15 +77,15 @@ mod test { Precision::Exact(ScalarValue::Int32(Some(4))) ); assert_eq!( - statistics[1].column_statistics[0].max_value, + statistics[0].column_statistics[0].min_value, Precision::Exact(ScalarValue::Int32(Some(3))) ); assert_eq!( - statistics[2].column_statistics[0].max_value, + statistics[1].column_statistics[0].max_value, Precision::Exact(ScalarValue::Int32(Some(2))) ); assert_eq!( - statistics[3].column_statistics[0].max_value, + statistics[1].column_statistics[0].min_value, Precision::Exact(ScalarValue::Int32(Some(1))) ); } @@ -93,13 +93,13 @@ mod test { #[tokio::test] async fn test_statistics_by_partition_of_data_source() -> datafusion_common::Result<()> { - let scan = generate_listing_table_with_statistics(None).await; + let scan = generate_listing_table_with_statistics(Some(2)).await; let statistics = scan.statistics_by_partition()?; // Check the statistics of each partition - assert_eq!(statistics.len(), 4); + assert_eq!(statistics.len(), 2); for stat in &statistics { 
             assert_eq!(stat.column_statistics.len(), 2);
-            assert_eq!(stat.total_byte_size, Precision::Exact(55));
+            assert_eq!(stat.total_byte_size, Precision::Exact(110));
         }
         check_unchanged_statistics(statistics);
         Ok(())
@@ -108,7 +108,7 @@ mod test {
     #[tokio::test]
     async fn test_statistics_by_partition_of_projection() -> datafusion_common::Result<()>
     {
-        let scan = generate_listing_table_with_statistics(None).await;
+        let scan = generate_listing_table_with_statistics(Some(2)).await;
         // Add projection execution plan
         let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
             vec![(Arc::new(Column::new("id", 0)), "id".to_string())];
@@ -116,7 +116,7 @@ mod test {
         let statistics = projection.statistics_by_partition()?;
         for stat in &statistics {
             assert_eq!(stat.column_statistics.len(), 1);
-            assert_eq!(stat.total_byte_size, Precision::Exact(4));
+            assert_eq!(stat.total_byte_size, Precision::Exact(8));
         }
         check_unchanged_statistics(statistics);
         Ok(())
@@ -192,7 +192,7 @@ mod test {
     #[tokio::test]
     async fn test_statistics_by_partition_of_filter() -> datafusion_common::Result<()>
     {
-        let scan = generate_listing_table_with_statistics(None).await;
+        let scan = generate_listing_table_with_statistics(Some(2)).await;
         let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
         let predicate = binary(
             Arc::new(Column::new("id", 0)),
@@ -229,7 +229,7 @@ mod test {
         let statistics = filter.statistics_by_partition()?;
         // Also the statistics of each partition is also invalid due to above
        // But we can ensure the current behavior by tests
-        assert_eq!(statistics.len(), 4);
+        assert_eq!(statistics.len(), 2);
         for stat in &statistics {
             assert_eq!(stat.column_statistics.len(), 2);
             assert_eq!(stat.total_byte_size, Precision::Inexact(0));

From fc73cd1d7f10f9869626a1c9f75194aff2870b2f Mon Sep 17 00:00:00 2001
From: xudong963
Date: Thu, 3 Apr 2025 22:54:23 +0800
Subject: [PATCH 10/21] add PartitionedStatistics structure

---
 datafusion/physical-plan/src/statistics.rs | 45 ++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/datafusion/physical-plan/src/statistics.rs b/datafusion/physical-plan/src/statistics.rs
index 3d70d907219d4..3e5c1006a84a7 100644
--- a/datafusion/physical-plan/src/statistics.rs
+++ b/datafusion/physical-plan/src/statistics.rs
@@ -22,6 +22,51 @@ use datafusion_common::stats::Precision;
 use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
 use std::mem;
 
+/// Represents statistics data grouped by partition.
+///
+/// This structure maintains a collection of statistics, one for each partition
+/// of a distributed dataset, allowing access to statistics by partition index.
+#[derive(Debug, Clone)]
+pub struct PartitionedStatistics {
+    inner: Vec<Statistics>,
+}
+
+impl PartitionedStatistics {
+    /// Creates a new PartitionedStatistics instance from a vector of Statistics.
+    pub fn new(statistics: Vec<Statistics>) -> Self {
+        Self { inner: statistics }
+    }
+
+    /// Returns the number of partitions.
+    pub fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    /// Returns true if there are no partitions.
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    /// Returns the statistics for the specified partition.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `partition_idx` is out of bounds.
+    pub fn statistics(&self, partition_idx: usize) -> &Statistics {
+        &self.inner[partition_idx]
+    }
+
+    /// Returns the statistics for the specified partition, or None if the index is out of bounds.
+    pub fn get_statistics(&self, partition_idx: usize) -> Option<&Statistics> {
+        self.inner.get(partition_idx)
+    }
+
+    /// Returns an iterator over the statistics.
+    pub fn iter(&self) -> impl Iterator<Item = &Statistics> {
+        self.inner.iter()
+    }
+}
+
 /// Generic function to compute statistics across multiple items that have statistics
 pub fn compute_summary_statistics<T, I>(
     items: I,

From fd73b97d55157d8f130043dec9c0b90bdc1d0ef1 Mon Sep 17 00:00:00 2001
From: xudong963
Date: Thu, 3 Apr 2025 23:08:05 +0800
Subject: [PATCH 11/21] use Arc

---
 datafusion/physical-plan/src/statistics.rs | 27 ++++------------------
 1 file changed, 5 insertions(+), 22 deletions(-)

diff --git a/datafusion/physical-plan/src/statistics.rs b/datafusion/physical-plan/src/statistics.rs
index 3e5c1006a84a7..0d8f15f0947df 100644
--- a/datafusion/physical-plan/src/statistics.rs
+++ b/datafusion/physical-plan/src/statistics.rs
@@ -21,6 +21,7 @@
 use datafusion_common::stats::Precision;
 use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
 use std::mem;
+use std::sync::Arc;
 
 /// Represents statistics data grouped by partition.
 ///
@@ -28,42 +29,24 @@ use std::mem;
 /// of a distributed dataset, allowing access to statistics by partition index.
 #[derive(Debug, Clone)]
 pub struct PartitionedStatistics {
-    inner: Vec<Statistics>,
+    inner: Vec<Arc<Statistics>>,
 }
 
 impl PartitionedStatistics {
-    /// Creates a new PartitionedStatistics instance from a vector of Statistics.
-    pub fn new(statistics: Vec<Statistics>) -> Self {
+    pub fn new(statistics: Vec<Arc<Statistics>>) -> Self {
         Self { inner: statistics }
     }
 
-    /// Returns the number of partitions.
-    pub fn len(&self) -> usize {
-        self.inner.len()
-    }
-
-    /// Returns true if there are no partitions.
-    pub fn is_empty(&self) -> bool {
-        self.inner.is_empty()
-    }
-
-    /// Returns the statistics for the specified partition.
-    ///
-    /// # Panics
-    ///
-    /// Panics if `partition_idx` is out of bounds.
     pub fn statistics(&self, partition_idx: usize) -> &Statistics {
         &self.inner[partition_idx]
     }
 
-    /// Returns the statistics for the specified partition, or None if the index is out of bounds.
     pub fn get_statistics(&self, partition_idx: usize) -> Option<&Statistics> {
-        self.inner.get(partition_idx)
+        self.inner.get(partition_idx).map(|arc| arc.as_ref())
     }
 
-    /// Returns an iterator over the statistics.
pub fn iter(&self) -> impl Iterator { - self.inner.iter() + self.inner.iter().map(|arc| arc.as_ref()) } } From 48eebaab86e2f4045f47c79824dfdd73758adc9f Mon Sep 17 00:00:00 2001 From: xudong963 Date: Fri, 4 Apr 2025 00:22:49 +0800 Subject: [PATCH 12/21] refine tests --- .../partition_statistics.rs | 271 +++++++----------- 1 file changed, 99 insertions(+), 172 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index b98c499e1b240..3da9488a9dfe8 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -22,7 +22,7 @@ mod test { use datafusion::prelude::SessionContext; use datafusion_catalog::TableProvider; use datafusion_common::stats::Precision; - use datafusion_common::{ScalarValue, Statistics}; + use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; use datafusion_execution::config::SessionConfig; use datafusion_expr_common::operator::Operator; use datafusion_physical_expr::expressions::{binary, lit, Column}; @@ -35,6 +35,18 @@ mod test { use datafusion_physical_plan::ExecutionPlan; use std::sync::Arc; + /// Creates a test table with statistics from the test data directory. + /// + /// This function: + /// - Creates an external table from './tests/data/test_statistics_per_partition' + /// - If we set the `target_partition` to `2, the data contains 2 partitions, each with 2 rows + /// - Each partition has an "id" column (INT) with the following values: + /// - First partition: [3, 4] + /// - Second partition: [1, 2] + /// - Each row is 110 bytes in size + /// + /// @param target_partition Optional parameter to set the target partitions + /// @return ExecutionPlan representing the scan of the table with statistics async fn generate_listing_table_with_statistics( target_partition: Option, ) -> Arc { @@ -63,31 +75,37 @@ mod test { .unwrap() } - fn check_unchanged_statistics(statistics: Vec) { - // Check the statistics of each partition - for stat in &statistics { - assert_eq!(stat.num_rows, Precision::Exact(2)); - // First column (id) should have non-null values - assert_eq!(stat.column_statistics[0].null_count, Precision::Exact(0)); + /// Helper function to create expected statistics for a partition with Int32 column + fn create_partition_statistics( + num_rows: usize, + total_byte_size: usize, + min_value: i32, + max_value: i32, + include_date_column: bool, + ) -> Statistics { + let mut column_stats = vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(max_value))), + min_value: Precision::Exact(ScalarValue::Int32(Some(min_value))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }]; + + if include_date_column { + column_stats.push(ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }); } - // Verify specific id values for each partition - assert_eq!( - statistics[0].column_statistics[0].max_value, - Precision::Exact(ScalarValue::Int32(Some(4))) - ); - assert_eq!( - statistics[0].column_statistics[0].min_value, - Precision::Exact(ScalarValue::Int32(Some(3))) - ); - assert_eq!( - statistics[1].column_statistics[0].max_value, - Precision::Exact(ScalarValue::Int32(Some(2))) - ); - assert_eq!( - statistics[1].column_statistics[0].min_value, - 
Precision::Exact(ScalarValue::Int32(Some(1))) - ); + Statistics { + num_rows: Precision::Exact(num_rows), + total_byte_size: Precision::Exact(total_byte_size), + column_statistics: column_stats, + } } #[tokio::test] @@ -95,13 +113,14 @@ mod test { { let scan = generate_listing_table_with_statistics(Some(2)).await; let statistics = scan.statistics_by_partition()?; + let expected_statistic_partition_1 = + create_partition_statistics(2, 110, 3, 4, true); + let expected_statistic_partition_2 = + create_partition_statistics(2, 110, 1, 2, true); // Check the statistics of each partition assert_eq!(statistics.len(), 2); - for stat in &statistics { - assert_eq!(stat.column_statistics.len(), 2); - assert_eq!(stat.total_byte_size, Precision::Exact(110)); - } - check_unchanged_statistics(statistics); + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); Ok(()) } @@ -114,11 +133,14 @@ mod test { vec![(Arc::new(Column::new("id", 0)), "id".to_string())]; let projection = ProjectionExec::try_new(exprs, scan)?; let statistics = projection.statistics_by_partition()?; - for stat in &statistics { - assert_eq!(stat.column_statistics.len(), 1); - assert_eq!(stat.total_byte_size, Precision::Exact(8)); - } - check_unchanged_statistics(statistics); + let expected_statistic_partition_1 = + create_partition_statistics(2, 8, 3, 4, false); + let expected_statistic_partition_2 = + create_partition_statistics(2, 8, 1, 2, false); + // Check the statistics of each partition + assert_eq!(statistics.len(), 2); + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); Ok(()) } @@ -138,55 +160,20 @@ mod test { ); let mut sort_exec = Arc::new(sort.clone()); let statistics = sort_exec.statistics_by_partition()?; + let expected_statistic_partition = + create_partition_statistics(4, 220, 1, 4, true); assert_eq!(statistics.len(), 1); - assert_eq!(statistics[0].num_rows, Precision::Exact(4)); - assert_eq!(statistics[0].column_statistics.len(), 2); - assert_eq!(statistics[0].total_byte_size, Precision::Exact(220)); - assert_eq!( - statistics[0].column_statistics[0].null_count, - Precision::Exact(0) - ); - assert_eq!( - statistics[0].column_statistics[0].max_value, - Precision::Exact(ScalarValue::Int32(Some(4))) - ); - assert_eq!( - statistics[0].column_statistics[0].min_value, - Precision::Exact(ScalarValue::Int32(Some(1))) - ); + assert_eq!(statistics[0], expected_statistic_partition); + sort_exec = Arc::new(sort.with_preserve_partitioning(true)); + let expected_statistic_partition_1 = + create_partition_statistics(2, 110, 3, 4, true); + let expected_statistic_partition_2 = + create_partition_statistics(2, 110, 1, 2, true); let statistics = sort_exec.statistics_by_partition()?; assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0].num_rows, Precision::Exact(2)); - assert_eq!(statistics[1].num_rows, Precision::Exact(2)); - assert_eq!(statistics[0].column_statistics.len(), 2); - assert_eq!(statistics[1].column_statistics.len(), 2); - assert_eq!(statistics[0].total_byte_size, Precision::Exact(110)); - assert_eq!(statistics[1].total_byte_size, Precision::Exact(110)); - assert_eq!( - statistics[0].column_statistics[0].null_count, - Precision::Exact(0) - ); - assert_eq!( - statistics[0].column_statistics[0].max_value, - Precision::Exact(ScalarValue::Int32(Some(4))) - ); - assert_eq!( - statistics[0].column_statistics[0].min_value, - Precision::Exact(ScalarValue::Int32(Some(3))) - ); - assert_eq!( - 
statistics[1].column_statistics[0].null_count, - Precision::Exact(0) - ); - assert_eq!( - statistics[1].column_statistics[0].max_value, - Precision::Exact(ScalarValue::Int32(Some(2))) - ); - assert_eq!( - statistics[1].column_statistics[0].min_value, - Precision::Exact(ScalarValue::Int32(Some(1))) - ); + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); Ok(()) } @@ -202,48 +189,33 @@ mod test { )?; let filter: Arc = Arc::new(FilterExec::try_new(predicate, scan)?); - let _full_statistics = filter.statistics()?; - // The full statistics is invalid, at least, we can improve the selectivity estimation of the filter - /* - Statistics { - num_rows: Inexact(0), - total_byte_size: Inexact(0), - column_statistics: [ - ColumnStatistics { - null_count: Exact(0), - max_value: Exact(NULL), - min_value: Exact(NULL), - sum_value: Exact(NULL), - distinct_count: Exact(0), - }, - ColumnStatistics { - null_count: Exact(0), - max_value: Exact(NULL), - min_value: Exact(NULL), - sum_value: Exact(NULL), - distinct_count: Exact(0), - }, - ], - } - */ + let full_statistics = filter.statistics()?; + let expected_full_statistic = Statistics { + num_rows: Precision::Inexact(0), + total_byte_size: Precision::Inexact(0), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Null), + min_value: Precision::Exact(ScalarValue::Null), + sum_value: Precision::Exact(ScalarValue::Null), + distinct_count: Precision::Exact(0), + }, + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Null), + min_value: Precision::Exact(ScalarValue::Null), + sum_value: Precision::Exact(ScalarValue::Null), + distinct_count: Precision::Exact(0), + }, + ], + }; + assert_eq!(full_statistics, expected_full_statistic); + let statistics = filter.statistics_by_partition()?; - // Also the statistics of each partition is also invalid due to above - // But we can ensure the current behavior by tests assert_eq!(statistics.len(), 2); - for stat in &statistics { - assert_eq!(stat.column_statistics.len(), 2); - assert_eq!(stat.total_byte_size, Precision::Inexact(0)); - assert_eq!(stat.num_rows, Precision::Inexact(0)); - assert_eq!(stat.column_statistics[0].null_count, Precision::Exact(0)); - assert_eq!( - stat.column_statistics[0].max_value, - Precision::Exact(ScalarValue::Null) - ); - assert_eq!( - stat.column_statistics[0].min_value, - Precision::Exact(ScalarValue::Null) - ); - } + assert_eq!(statistics[0], expected_full_statistic); + assert_eq!(statistics[1], expected_full_statistic); Ok(()) } @@ -254,63 +226,18 @@ mod test { let statistics = union_exec.statistics_by_partition()?; // Check that we have 4 partitions (2 from each scan) assert_eq!(statistics.len(), 4); - + let expected_statistic_partition_1 = + create_partition_statistics(2, 110, 3, 4, true); + let expected_statistic_partition_2 = + create_partition_statistics(2, 110, 1, 2, true); // Verify first partition (from first scan) - assert_eq!(statistics[0].num_rows, Precision::Exact(2)); - assert_eq!(statistics[0].total_byte_size, Precision::Exact(110)); - assert_eq!(statistics[0].column_statistics.len(), 2); - assert_eq!( - statistics[0].column_statistics[0].null_count, - Precision::Exact(0) - ); - assert_eq!( - statistics[0].column_statistics[0].max_value, - Precision::Exact(ScalarValue::Int32(Some(4))) - ); - assert_eq!( - statistics[0].column_statistics[0].min_value, - 
Precision::Exact(ScalarValue::Int32(Some(3))) - ); - + assert_eq!(statistics[0], expected_statistic_partition_1); // Verify second partition (from first scan) - assert_eq!(statistics[1].num_rows, Precision::Exact(2)); - assert_eq!(statistics[1].total_byte_size, Precision::Exact(110)); - assert_eq!( - statistics[1].column_statistics[0].null_count, - Precision::Exact(0) - ); - assert_eq!( - statistics[1].column_statistics[0].max_value, - Precision::Exact(ScalarValue::Int32(Some(2))) - ); - assert_eq!( - statistics[1].column_statistics[0].min_value, - Precision::Exact(ScalarValue::Int32(Some(1))) - ); - + assert_eq!(statistics[1], expected_statistic_partition_2); // Verify third partition (from second scan - same as first partition) - assert_eq!(statistics[2].num_rows, Precision::Exact(2)); - assert_eq!(statistics[2].total_byte_size, Precision::Exact(110)); - assert_eq!( - statistics[2].column_statistics[0].max_value, - Precision::Exact(ScalarValue::Int32(Some(4))) - ); - assert_eq!( - statistics[2].column_statistics[0].min_value, - Precision::Exact(ScalarValue::Int32(Some(3))) - ); - + assert_eq!(statistics[2], expected_statistic_partition_1); // Verify fourth partition (from second scan - same as second partition) - assert_eq!(statistics[3].num_rows, Precision::Exact(2)); - assert_eq!(statistics[3].total_byte_size, Precision::Exact(110)); - assert_eq!( - statistics[3].column_statistics[0].max_value, - Precision::Exact(ScalarValue::Int32(Some(2))) - ); - assert_eq!( - statistics[3].column_statistics[0].min_value, - Precision::Exact(ScalarValue::Int32(Some(1))) - ); + assert_eq!(statistics[3], expected_statistic_partition_2); Ok(()) } From 7c144967e09b0a90ae1edbcf114569feb3a4e542 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Mon, 7 Apr 2025 16:50:41 +0800 Subject: [PATCH 13/21] save --- .../src/datasource/physical_plan/arrow_file.rs | 2 +- .../physical_optimizer/partition_statistics.rs | 14 +++++++------- datafusion/datasource-avro/src/source.rs | 2 +- datafusion/datasource-csv/src/source.rs | 2 +- datafusion/physical-plan/src/coalesce_batches.rs | 8 ++++++-- .../physical-plan/src/coalesce_partitions.rs | 2 +- datafusion/physical-plan/src/execution_plan.rs | 9 +++++---- datafusion/physical-plan/src/filter.rs | 2 +- datafusion/physical-plan/src/joins/cross_join.rs | 2 +- datafusion/physical-plan/src/limit.rs | 4 ++-- datafusion/physical-plan/src/projection.rs | 2 +- datafusion/physical-plan/src/sorts/sort.rs | 2 +- .../src/sorts/sort_preserving_merge.rs | 2 +- datafusion/physical-plan/src/union.rs | 2 +- .../src/windows/bounded_window_agg_exec.rs | 2 +- 15 files changed, 31 insertions(+), 26 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index ba23cf4095398..72723b8f20fb2 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -191,7 +191,7 @@ impl ExecutionPlan for ArrowExec { self.inner.statistics() } - fn statistics_by_partition(&self) -> Result> { + fn statistics_by_partition(&self) -> Result { self.inner.statistics_by_partition() } diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 3da9488a9dfe8..4292b9b3bb753 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -39,7 +39,7 @@ mod test { /// /// This function: 
/// - Creates an external table from './tests/data/test_statistics_per_partition' - /// - If we set the `target_partition` to `2, the data contains 2 partitions, each with 2 rows + /// - If we set the `target_partition` to 2, the data contains 2 partitions, each with 2 rows /// - Each partition has an "id" column (INT) with the following values: /// - First partition: [3, 4] /// - Second partition: [1, 2] @@ -47,7 +47,7 @@ mod test { /// /// @param target_partition Optional parameter to set the target partitions /// @return ExecutionPlan representing the scan of the table with statistics - async fn generate_listing_table_with_statistics( + async fn create_scan_exec_with_statistics( target_partition: Option, ) -> Arc { let mut session_config = SessionConfig::new().with_collect_statistics(true); @@ -111,7 +111,7 @@ mod test { #[tokio::test] async fn test_statistics_by_partition_of_data_source() -> datafusion_common::Result<()> { - let scan = generate_listing_table_with_statistics(Some(2)).await; + let scan = create_scan_exec_with_statistics(Some(2)).await; let statistics = scan.statistics_by_partition()?; let expected_statistic_partition_1 = create_partition_statistics(2, 110, 3, 4, true); @@ -127,7 +127,7 @@ mod test { #[tokio::test] async fn test_statistics_by_partition_of_projection() -> datafusion_common::Result<()> { - let scan = generate_listing_table_with_statistics(Some(2)).await; + let scan = create_scan_exec_with_statistics(Some(2)).await; // Add projection execution plan let exprs: Vec<(Arc, String)> = vec![(Arc::new(Column::new("id", 0)), "id".to_string())]; @@ -146,7 +146,7 @@ mod test { #[tokio::test] async fn test_statistics_by_partition_of_sort() -> datafusion_common::Result<()> { - let scan = generate_listing_table_with_statistics(Some(2)).await; + let scan = create_scan_exec_with_statistics(Some(2)).await; // Add sort execution plan let sort = SortExec::new( LexOrdering::new(vec![PhysicalSortExpr { @@ -179,7 +179,7 @@ mod test { #[tokio::test] async fn test_statistics_by_partition_of_filter() -> datafusion_common::Result<()> { - let scan = generate_listing_table_with_statistics(Some(2)).await; + let scan = create_scan_exec_with_statistics(Some(2)).await; let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); let predicate = binary( Arc::new(Column::new("id", 0)), @@ -221,7 +221,7 @@ mod test { #[tokio::test] async fn test_statistic_by_partition_of_union() -> datafusion_common::Result<()> { - let scan = generate_listing_table_with_statistics(Some(2)).await; + let scan = create_scan_exec_with_statistics(Some(2)).await; let union_exec = Arc::new(UnionExec::new(vec![scan.clone(), scan])); let statistics = union_exec.statistics_by_partition()?; // Check that we have 4 partitions (2 from each scan) diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index 5c662511929c7..4a81798619c34 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -141,7 +141,7 @@ impl ExecutionPlan for AvroExec { self.inner.statistics() } - fn statistics_by_partition(&self) -> Result> { + fn statistics_by_partition(&self) -> Result { self.inner.statistics_by_partition() } diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 22fbb288c0396..5ca086d03af36 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -381,7 +381,7 @@ impl ExecutionPlan for CsvExec { self.inner.statistics() } - fn 
statistics_by_partition(&self) -> Result> { + fn statistics_by_partition(&self) -> Result { self.inner.statistics_by_partition() } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 314e5548b36a9..2bdd1e58c2d1a 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -199,8 +199,12 @@ impl ExecutionPlan for CoalesceBatchesExec { Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1) } - fn statistics_by_partition(&self) -> Result> { - Ok(vec![self.statistics()?]) + fn statistics_by_partition(&self) -> Result { + self.input + .statistics_by_partition()? + .into_iter() + .map(|stat| Statistics::with_fetch(stat, self.schema(), self.fetch, 0, 1)) + .collect() } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 1de838e830c23..7835b97244737 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -199,7 +199,7 @@ impl ExecutionPlan for CoalescePartitionsExec { Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1) } - fn statistics_by_partition(&self) -> Result> { + fn statistics_by_partition(&self) -> Result { Ok(vec![self.statistics()?]) } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index ab69ac9af32a2..49aa46b59b577 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -55,6 +55,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::stream::{StreamExt, TryStreamExt}; +use crate::statistics::PartitionedStatistics; /// Represent nodes in the DataFusion Physical Plan. /// @@ -433,11 +434,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// Returns statistics for each partition of this `ExecutionPlan` node. /// If statistics are not available, returns an array of /// [`Statistics::new_unknown`] for each partition. - fn statistics_by_partition(&self) -> Result> { - Ok(vec![ - Statistics::new_unknown(&self.schema()); + fn statistics_by_partition(&self) -> Result { + Ok(PartitionedStatistics::new(vec![ + Arc::new(Statistics::new_unknown(&self.schema())); self.properties().partitioning.partition_count() - ]) + ])) } /// Returns `true` if a limit can be safely pushed down through this diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 8378974be5d9b..a0c49347f0fb3 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -413,7 +413,7 @@ impl ExecutionPlan for FilterExec { Ok(stats.project(self.projection.as_ref())) } - fn statistics_by_partition(&self) -> Result> { + fn statistics_by_partition(&self) -> Result { self.input .statistics_by_partition()? 
.into_iter() diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 3ad5276a547c6..a7c2f20c253b5 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -344,7 +344,7 @@ impl ExecutionPlan for CrossJoinExec { )) } - fn statistics_by_partition(&self) -> Result> { + fn statistics_by_partition(&self) -> Result { let left_stats = self.left.statistics_by_partition()?; let right_stats = self.right.statistics_by_partition()?; diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 436b937f7086b..00b4a68499839 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -202,7 +202,7 @@ impl ExecutionPlan for GlobalLimitExec { ) } - fn statistics_by_partition(&self) -> Result> { + fn statistics_by_partition(&self) -> Result { Ok(vec![self.statistics()?]) } @@ -347,7 +347,7 @@ impl ExecutionPlan for LocalLimitExec { ) } - fn statistics_by_partition(&self) -> Result> { + fn statistics_by_partition(&self) -> Result { self.input .statistics_by_partition()? .into_iter() diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 629d9a2802af7..789e80dea672f 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -251,7 +251,7 @@ impl ExecutionPlan for ProjectionExec { )) } - fn statistics_by_partition(&self) -> Result> { + fn statistics_by_partition(&self) -> Result { Ok(self .input .statistics_by_partition()? diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index e6f52496d3110..4d35f98b081c6 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1300,7 +1300,7 @@ impl ExecutionPlan for SortExec { Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1) } - fn statistics_by_partition(&self) -> Result> { + fn statistics_by_partition(&self) -> Result { if !self.preserve_partitioning() { return Ok(vec![self.statistics()?]); } diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index a1cfb3233751f..b4825a1f59247 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -346,7 +346,7 @@ impl ExecutionPlan for SortPreservingMergeExec { self.input.statistics() } - fn statistics_by_partition(&self) -> Result> { + fn statistics_by_partition(&self) -> Result { Ok(vec![self.statistics()?]) } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 7225b1eee4dc6..45533c4cdfd81 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -270,7 +270,7 @@ impl ExecutionPlan for UnionExec { .unwrap_or_else(|| Statistics::new_unknown(&self.schema()))) } - fn statistics_by_partition(&self) -> Result> { + fn statistics_by_partition(&self) -> Result { Ok(self .inputs .iter() diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 139b1e3414a7e..3653ee949c744 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -364,7 +364,7 @@ impl ExecutionPlan for 
BoundedWindowAggExec { self.statistics_helper(input_stat) } - fn statistics_by_partition(&self) -> Result> { + fn statistics_by_partition(&self) -> Result { self.input .statistics_by_partition()? .into_iter() From a69b24170252daeb425df3b8d260c05a7e558c74 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Mon, 7 Apr 2025 16:52:28 +0800 Subject: [PATCH 14/21] resolve conflicts --- datafusion/datasource/src/statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index 5b44860091256..f26099a815995 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -36,7 +36,7 @@ use datafusion_common::stats::Precision; use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result}; use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_physical_plan::statistics::{ +pub(crate) use datafusion_physical_plan::statistics::{ add_row_stats, compute_summary_statistics, set_max_if_greater, set_min_if_lesser, }; use datafusion_physical_plan::{ColumnStatistics, Statistics}; From 7e3d1e689909e973fc7677766bfd7fac7a6a41ea Mon Sep 17 00:00:00 2001 From: xudong963 Date: Mon, 7 Apr 2025 19:03:46 +0800 Subject: [PATCH 15/21] use PartitionedStatistics --- .../datasource/physical_plan/arrow_file.rs | 1 + datafusion/datasource-avro/src/source.rs | 1 + datafusion/datasource-csv/src/source.rs | 1 + datafusion/datasource/src/file_groups.rs | 7 ++++- datafusion/datasource/src/source.rs | 8 +++--- .../physical-plan/src/coalesce_batches.rs | 22 +++++++++++---- .../physical-plan/src/coalesce_partitions.rs | 5 +++- .../physical-plan/src/execution_plan.rs | 10 ++++--- datafusion/physical-plan/src/filter.rs | 21 +++++++++------ .../physical-plan/src/joins/cross_join.rs | 27 +++++++++++-------- datafusion/physical-plan/src/limit.rs | 27 ++++++++++++++----- datafusion/physical-plan/src/projection.rs | 26 +++++++++--------- datafusion/physical-plan/src/sorts/sort.rs | 27 ++++++++++++++----- .../src/sorts/sort_preserving_merge.rs | 5 +++- datafusion/physical-plan/src/statistics.rs | 4 +++ datafusion/physical-plan/src/union.rs | 17 +++++++----- .../src/windows/bounded_window_agg_exec.rs | 15 +++++++---- 17 files changed, 155 insertions(+), 69 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 72723b8f20fb2..5a237c29372f2 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -42,6 +42,7 @@ use datafusion_physical_plan::{ }; use datafusion_datasource::file_groups::FileGroup; +use datafusion_physical_plan::statistics::PartitionedStatistics; use futures::StreamExt; use itertools::Itertools; use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index 4a81798619c34..1a0feea3f1535 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -40,6 +40,7 @@ use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, }; +use datafusion_physical_plan::statistics::PartitionedStatistics; use object_store::ObjectStore; /// Execution plan for scanning Avro data source diff --git a/datafusion/datasource-csv/src/source.rs 
b/datafusion/datasource-csv/src/source.rs index 5ca086d03af36..28aad5e43f147 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -51,6 +51,7 @@ use datafusion_physical_plan::{ use crate::file_format::CsvDecoder; use datafusion_datasource::file_groups::FileGroup; +use datafusion_physical_plan::statistics::PartitionedStatistics; use futures::{StreamExt, TryStreamExt}; use object_store::buffered::BufWriter; use object_store::{GetOptions, GetResultPayload, ObjectStore}; diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs index 15c86427ed00a..907ea62e4e7e3 100644 --- a/datafusion/datasource/src/file_groups.rs +++ b/datafusion/datasource/src/file_groups.rs @@ -421,7 +421,12 @@ impl FileGroup { } /// Get the statistics for this group - pub fn statistics(&self) -> Option<&Statistics> { + pub fn statistics(&self) -> &Option> { + &self.statistics + } + + /// Get the statistics for this group + pub fn statistics_ref(&self) -> Option<&Statistics> { self.statistics.as_deref() } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index f0ed08708ba31..ea02653d0bbc4 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -188,9 +188,11 @@ impl ExecutionPlan for DataSourceExec { self.data_source.statistics() } - fn statistics_by_partition(&self) -> datafusion_common::Result> { + fn statistics_by_partition( + &self, + ) -> datafusion_common::Result { let mut statistics = vec![ - Statistics::new_unknown(&self.schema()); + Arc::new(Statistics::new_unknown(&self.schema())); self.properties().partitioning.partition_count() ]; if let Some(file_config) = @@ -202,7 +204,7 @@ impl ExecutionPlan for DataSourceExec { } } } - Ok(statistics) + Ok(PartitionedStatistics::new(statistics)) } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 2bdd1e58c2d1a..3cebd53921a21 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -200,11 +200,23 @@ impl ExecutionPlan for CoalesceBatchesExec { } fn statistics_by_partition(&self) -> Result { - self.input - .statistics_by_partition()? 
- .into_iter() - .map(|stat| Statistics::with_fetch(stat, self.schema(), self.fetch, 0, 1)) - .collect() + let input_stats = self.input.statistics_by_partition()?; + + let stats: Result>> = input_stats + .iter() + .map(|stat| { + let fetched_stat = Statistics::with_fetch( + stat.clone(), + self.schema(), + self.fetch, + 0, + 1, + )?; + Ok(Arc::new(fetched_stat)) + }) + .collect(); + + Ok(PartitionedStatistics::new(stats?)) } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 7835b97244737..8906aa1b04b69 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -31,6 +31,7 @@ use crate::execution_plan::CardinalityEffect; use crate::projection::{make_with_child, ProjectionExec}; use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; +use crate::statistics::PartitionedStatistics; use datafusion_common::{internal_err, Result}; use datafusion_execution::TaskContext; @@ -200,7 +201,9 @@ impl ExecutionPlan for CoalescePartitionsExec { } fn statistics_by_partition(&self) -> Result { - Ok(vec![self.statistics()?]) + Ok(PartitionedStatistics::new(vec![Arc::new( + self.statistics()?, + )])) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 49aa46b59b577..d5878e12cf594 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -54,8 +54,8 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use datafusion_physical_expr_common::sort_expr::LexRequirement; -use futures::stream::{StreamExt, TryStreamExt}; use crate::statistics::PartitionedStatistics; +use futures::stream::{StreamExt, TryStreamExt}; /// Represent nodes in the DataFusion Physical Plan. /// @@ -436,8 +436,12 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// [`Statistics::new_unknown`] for each partition. fn statistics_by_partition(&self) -> Result { Ok(PartitionedStatistics::new(vec![ - Arc::new(Statistics::new_unknown(&self.schema())); - self.properties().partitioning.partition_count() + Arc::new( + Statistics::new_unknown(&self.schema()) + ); + self.properties() + .partitioning + .partition_count() ])) } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index a0c49347f0fb3..59df4a5f3f861 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -58,6 +58,7 @@ use datafusion_physical_expr::{ ExprBoundaries, PhysicalExpr, }; +use crate::statistics::PartitionedStatistics; use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -414,19 +415,23 @@ impl ExecutionPlan for FilterExec { } fn statistics_by_partition(&self) -> Result { - self.input - .statistics_by_partition()? 
- .into_iter() - .map(|input_stat| { - Self::statistics_helper( + let input_stats = self.input.statistics_by_partition()?; + + let stats: Result>> = input_stats + .iter() + .map(|stat| { + let stat = Self::statistics_helper( self.schema(), - input_stat, + stat.clone(), self.predicate(), self.default_selectivity, ) - .map(|stat| stat.project(self.projection.as_ref())) + .map(|stat| stat.project(self.projection.as_ref()))?; + Ok(Arc::new(stat)) }) - .collect() + .collect(); + + Ok(PartitionedStatistics::new(stats?)) } fn cardinality_effect(&self) -> CardinalityEffect { diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index a7c2f20c253b5..56c3f301fc20f 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -46,7 +46,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::join_equivalence_properties; -use crate::statistics::compute_summary_statistics; +use crate::statistics::{compute_summary_statistics, PartitionedStatistics}; use async_trait::async_trait; use futures::{ready, Stream, StreamExt, TryStreamExt}; @@ -349,19 +349,24 @@ impl ExecutionPlan for CrossJoinExec { let right_stats = self.right.statistics_by_partition()?; if left_stats.is_empty() || right_stats.is_empty() { - return Ok(vec![]); + return Ok(PartitionedStatistics::new(vec![])); } // Summarize the `left_stats` - let statistics = - compute_summary_statistics(left_stats, self.schema.fields().len(), |stats| { - Some(stats) - }); - - Ok(right_stats - .into_iter() - .map(|right| stats_cartesian_product(statistics.clone(), right)) - .collect()) + let statistics = compute_summary_statistics( + left_stats.iter(), + self.schema.fields().len(), + |stats| Some(stats), + ); + + Ok(PartitionedStatistics::new( + right_stats + .iter() + .map(|right| { + Arc::new(stats_cartesian_product(statistics.clone(), right.clone())) + }) + .collect(), + )) } /// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done, diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 00b4a68499839..b2eedff1a63b1 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -35,6 +35,7 @@ use arrow::record_batch::RecordBatch; use datafusion_common::{internal_err, Result}; use datafusion_execution::TaskContext; +use crate::statistics::PartitionedStatistics; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -203,7 +204,9 @@ impl ExecutionPlan for GlobalLimitExec { } fn statistics_by_partition(&self) -> Result { - Ok(vec![self.statistics()?]) + Ok(PartitionedStatistics::new(vec![Arc::new( + self.statistics()?, + )])) } fn fetch(&self) -> Option { @@ -348,13 +351,23 @@ impl ExecutionPlan for LocalLimitExec { } fn statistics_by_partition(&self) -> Result { - self.input - .statistics_by_partition()? 
- .into_iter() - .map(|input_stat| { - Statistics::with_fetch(input_stat, self.schema(), Some(self.fetch), 0, 1) + let input_stats = self.input.statistics_by_partition()?; + + let stats: Result>> = input_stats + .iter() + .map(|stat| { + let fetched_stat = Statistics::with_fetch( + stat.clone(), + self.schema(), + Some(self.fetch), + 0, + 1, + )?; + Ok(Arc::new(fetched_stat)) }) - .collect() + .collect(); + + Ok(PartitionedStatistics::new(stats?)) } fn fetch(&self) -> Option { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 789e80dea672f..831fb46ed3e86 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -48,6 +48,7 @@ use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::PhysicalExprRef; +use crate::statistics::PartitionedStatistics; use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::stream::{Stream, StreamExt}; use itertools::Itertools; @@ -252,18 +253,19 @@ impl ExecutionPlan for ProjectionExec { } fn statistics_by_partition(&self) -> Result { - Ok(self - .input - .statistics_by_partition()? - .into_iter() - .map(|input_stats| { - stats_projection( - input_stats, - self.expr.iter().map(|(e, _)| Arc::clone(e)), - Arc::clone(&self.schema), - ) - }) - .collect()) + Ok(PartitionedStatistics::new( + self.input + .statistics_by_partition()? + .iter() + .map(|input_stats| { + Arc::new(stats_projection( + input_stats.clone(), + self.expr.iter().map(|(e, _)| Arc::clone(e)), + Arc::clone(&self.schema), + )) + }) + .collect(), + )) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 4d35f98b081c6..62b6e040031a9 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -60,6 +60,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr_common::sort_expr::LexRequirement; +use crate::statistics::PartitionedStatistics; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; @@ -1302,13 +1303,27 @@ impl ExecutionPlan for SortExec { fn statistics_by_partition(&self) -> Result { if !self.preserve_partitioning() { - return Ok(vec![self.statistics()?]); + return Ok(PartitionedStatistics::new(vec![Arc::new( + self.statistics()?, + )])); } - self.input - .statistics_by_partition()? 
- .into_iter() - .map(|stat| Statistics::with_fetch(stat, self.schema(), self.fetch, 0, 1)) - .collect() + let input_stats = self.input.statistics_by_partition()?; + + let stats: Result>> = input_stats + .iter() + .map(|stat| { + let fetched_stat = Statistics::with_fetch( + stat.clone(), + self.schema(), + self.fetch, + 0, + 1, + )?; + Ok(Arc::new(fetched_stat)) + }) + .collect(); + + Ok(PartitionedStatistics::new(stats?)) } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index b4825a1f59247..8d44962cdb33c 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -36,6 +36,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; +use crate::statistics::PartitionedStatistics; use log::{debug, trace}; /// Sort preserving merge execution plan @@ -347,7 +348,9 @@ impl ExecutionPlan for SortPreservingMergeExec { } fn statistics_by_partition(&self) -> Result { - Ok(vec![self.statistics()?]) + Ok(PartitionedStatistics::new(vec![Arc::new( + self.statistics()?, + )])) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/statistics.rs b/datafusion/physical-plan/src/statistics.rs index 0d8f15f0947df..855e007e8709d 100644 --- a/datafusion/physical-plan/src/statistics.rs +++ b/datafusion/physical-plan/src/statistics.rs @@ -48,6 +48,10 @@ impl PartitionedStatistics { pub fn iter(&self) -> impl Iterator { self.inner.iter().map(|arc| arc.as_ref()) } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } } /// Generic function to compute statistics across multiple items that have statistics diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 45533c4cdfd81..c310ff5692e8d 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -46,6 +46,7 @@ use datafusion_common::{exec_err, internal_err, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{calculate_union, EquivalenceProperties}; +use crate::statistics::PartitionedStatistics; use futures::Stream; use itertools::Itertools; use log::{debug, trace, warn}; @@ -271,14 +272,18 @@ impl ExecutionPlan for UnionExec { } fn statistics_by_partition(&self) -> Result { - Ok(self + let input_stats_vec = self .inputs .iter() - .map(|child| child.statistics_by_partition()) - .collect::>>()? 
- .into_iter() - .flatten() - .collect()) + .map(|input| input.statistics_by_partition()) + .collect::>>()?; + + let all_stats: Vec<_> = input_stats_vec + .iter() + .flat_map(|input_stats| input_stats.iter().map(|stat| Arc::new(stat.clone()))) + .collect(); + + Ok(PartitionedStatistics::new(all_stats)) } fn benefits_from_input_partitioning(&self) -> Vec { diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 3653ee949c744..36dc2a3799c8d 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -63,6 +63,7 @@ use datafusion_physical_expr::window::{ use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; +use crate::statistics::PartitionedStatistics; use futures::stream::Stream; use futures::{ready, StreamExt}; use hashbrown::hash_table::HashTable; @@ -365,11 +366,15 @@ impl ExecutionPlan for BoundedWindowAggExec { } fn statistics_by_partition(&self) -> Result { - self.input - .statistics_by_partition()? - .into_iter() - .map(|stat| self.statistics_helper(stat)) - .collect() + let input_stats = self.input.statistics_by_partition()?; + let stats: Result>> = input_stats + .iter() + .map(|stat| { + let stat = self.statistics_helper(stat.clone())?; + Ok(Arc::new(stat)) + }) + .collect(); + Ok(PartitionedStatistics::new(stats?)) } } From 0cac2df951fcf83149512585f1adb1a57e041219 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Mon, 7 Apr 2025 19:35:36 +0800 Subject: [PATCH 16/21] impl index and len for PartitionedStatistics --- datafusion/physical-plan/src/statistics.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/datafusion/physical-plan/src/statistics.rs b/datafusion/physical-plan/src/statistics.rs index 855e007e8709d..d005d861c9321 100644 --- a/datafusion/physical-plan/src/statistics.rs +++ b/datafusion/physical-plan/src/statistics.rs @@ -21,6 +21,7 @@ use datafusion_common::stats::Precision; use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; use std::mem; +use std::ops::Index; use std::sync::Arc; /// Represents statistics data grouped by partition. 
@@ -52,6 +53,18 @@ impl PartitionedStatistics { pub fn is_empty(&self) -> bool { self.inner.is_empty() } + + pub fn len(&self) -> usize { + self.inner.len() + } +} + +impl Index for PartitionedStatistics { + type Output = Statistics; + + fn index(&self, partition_idx: usize) -> &Self::Output { + self.statistics(partition_idx) + } } /// Generic function to compute statistics across multiple items that have statistics From 495d73ee661c964cb9b045c17cdebe5f0245176d Mon Sep 17 00:00:00 2001 From: xudong963 Date: Mon, 7 Apr 2025 20:12:44 +0800 Subject: [PATCH 17/21] add test for cross join --- .../partition_statistics.rs | 67 +++++++++++++++++-- .../physical-plan/src/joins/cross_join.rs | 2 +- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 4292b9b3bb753..672dcf246b40e 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -29,6 +29,7 @@ mod test { use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_plan::filter::FilterExec; + use datafusion_physical_plan::joins::CrossJoinExec; use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::union::UnionExec; @@ -48,6 +49,7 @@ mod test { /// @param target_partition Optional parameter to set the target partitions /// @return ExecutionPlan representing the scan of the table with statistics async fn create_scan_exec_with_statistics( + create_table_sql: Option<&str>, target_partition: Option, ) -> Arc { let mut session_config = SessionConfig::new().with_collect_statistics(true); @@ -56,14 +58,25 @@ mod test { } let ctx = SessionContext::new_with_config(session_config); // Create table with partition - let create_table_sql = "CREATE EXTERNAL TABLE t1 (id INT not null, date DATE) STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition' PARTITIONED BY (date) WITH ORDER (id ASC);"; + let create_table_sql = create_table_sql.unwrap_or( + "CREATE EXTERNAL TABLE t1 (id INT NOT NULL, date DATE) \ + STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition'\ + PARTITIONED BY (date) \ + WITH ORDER (id ASC);", + ); + // Get table name from `create_table_sql` + let table_name = create_table_sql + .split_whitespace() + .nth(3) + .unwrap_or("t1") + .to_string(); ctx.sql(create_table_sql) .await .unwrap() .collect() .await .unwrap(); - let table = ctx.table_provider("t1").await.unwrap(); + let table = ctx.table_provider(table_name.as_str()).await.unwrap(); let listing_table = table .as_any() .downcast_ref::() @@ -111,7 +124,7 @@ mod test { #[tokio::test] async fn test_statistics_by_partition_of_data_source() -> datafusion_common::Result<()> { - let scan = create_scan_exec_with_statistics(Some(2)).await; + let scan = create_scan_exec_with_statistics(None, Some(2)).await; let statistics = scan.statistics_by_partition()?; let expected_statistic_partition_1 = create_partition_statistics(2, 110, 3, 4, true); @@ -127,7 +140,7 @@ mod test { #[tokio::test] async fn test_statistics_by_partition_of_projection() -> datafusion_common::Result<()> { - let scan = create_scan_exec_with_statistics(Some(2)).await; + let scan = create_scan_exec_with_statistics(None, Some(2)).await; // Add projection execution plan let 
exprs: Vec<(Arc, String)> = vec![(Arc::new(Column::new("id", 0)), "id".to_string())]; @@ -146,7 +159,7 @@ mod test { #[tokio::test] async fn test_statistics_by_partition_of_sort() -> datafusion_common::Result<()> { - let scan = create_scan_exec_with_statistics(Some(2)).await; + let scan = create_scan_exec_with_statistics(None, Some(2)).await; // Add sort execution plan let sort = SortExec::new( LexOrdering::new(vec![PhysicalSortExpr { @@ -179,7 +192,7 @@ mod test { #[tokio::test] async fn test_statistics_by_partition_of_filter() -> datafusion_common::Result<()> { - let scan = create_scan_exec_with_statistics(Some(2)).await; + let scan = create_scan_exec_with_statistics(None, Some(2)).await; let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); let predicate = binary( Arc::new(Column::new("id", 0)), @@ -221,7 +234,7 @@ mod test { #[tokio::test] async fn test_statistic_by_partition_of_union() -> datafusion_common::Result<()> { - let scan = create_scan_exec_with_statistics(Some(2)).await; + let scan = create_scan_exec_with_statistics(None, Some(2)).await; let union_exec = Arc::new(UnionExec::new(vec![scan.clone(), scan])); let statistics = union_exec.statistics_by_partition()?; // Check that we have 4 partitions (2 from each scan) @@ -241,4 +254,44 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_statistic_by_partition_of_cross_join() -> datafusion_common::Result<()> + { + let left_scan = create_scan_exec_with_statistics(None, Some(2)).await; + let right_create_table_sql = "CREATE EXTERNAL TABLE t2 (id INT NOT NULL) \ + STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition'\ + WITH ORDER (id ASC);"; + let right_scan = + create_scan_exec_with_statistics(Some(right_create_table_sql), Some(2)).await; + let cross_join = CrossJoinExec::new(left_scan, right_scan); + let statistics = cross_join.statistics_by_partition()?; + // Check that we have 2 partitions + assert_eq!(statistics.len(), 2); + let mut expected_statistic_partition_1 = + create_partition_statistics(8, 48400, 1, 4, true); + expected_statistic_partition_1 + .column_statistics + .push(ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(4))), + min_value: Precision::Exact(ScalarValue::Int32(Some(3))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }); + let mut expected_statistic_partition_2 = + create_partition_statistics(8, 48400, 1, 4, true); + expected_statistic_partition_2 + .column_statistics + .push(ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(2))), + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }); + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); + Ok(()) + } } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 56c3f301fc20f..4e14e41178b58 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -355,7 +355,7 @@ impl ExecutionPlan for CrossJoinExec { // Summarize the `left_stats` let statistics = compute_summary_statistics( left_stats.iter(), - self.schema.fields().len(), + self.left.schema().fields().len(), |stats| Some(stats), ); From 33cf80a8cbb3f19a2a94da81303df463223e3f77 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Mon, 7 Apr 2025 20:27:12 +0800 Subject: 
[PATCH 18/21] fix clippy --- datafusion/datasource/src/source.rs | 13 ++++++++----- datafusion/physical-plan/src/execution_plan.rs | 15 +++++++-------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index ea02653d0bbc4..2c65fb6957ee5 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -191,16 +191,19 @@ impl ExecutionPlan for DataSourceExec { fn statistics_by_partition( &self, ) -> datafusion_common::Result { - let mut statistics = vec![ - Arc::new(Statistics::new_unknown(&self.schema())); - self.properties().partitioning.partition_count() - ]; + let mut statistics = { + let mut v = + Vec::with_capacity(self.properties().partitioning.partition_count()); + (0..self.properties().partitioning.partition_count()) + .for_each(|_| v.push(Arc::new(Statistics::new_unknown(&self.schema())))); + v + }; if let Some(file_config) = self.data_source.as_any().downcast_ref::() { for (idx, file_group) in file_config.file_groups.iter().enumerate() { if let Some(stat) = file_group.statistics() { - statistics[idx] = stat.clone(); + statistics[idx] = Arc::clone(stat); } } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index d5878e12cf594..cc17ebef26e89 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -435,14 +435,13 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// If statistics are not available, returns an array of /// [`Statistics::new_unknown`] for each partition. fn statistics_by_partition(&self) -> Result { - Ok(PartitionedStatistics::new(vec![ - Arc::new( - Statistics::new_unknown(&self.schema()) - ); - self.properties() - .partitioning - .partition_count() - ])) + Ok(PartitionedStatistics::new({ + let mut v = + Vec::with_capacity(self.properties().partitioning.partition_count()); + (0..self.properties().partitioning.partition_count()) + .for_each(|_| v.push(Arc::new(Statistics::new_unknown(&self.schema())))); + v + })) } /// Returns `true` if a limit can be safely pushed down through this From de08c3becb1b56738b1c5a051691997393202580 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Tue, 8 Apr 2025 15:44:59 +0800 Subject: [PATCH 19/21] Check the statistics_by_partition with real results --- .../partition_statistics.rs | 191 ++++++++++++++++-- 1 file changed, 177 insertions(+), 14 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 672dcf246b40e..38b8e86c4638c 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -17,23 +17,30 @@ #[cfg(test)] mod test { + use arrow::array::{Int32Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema, SortOptions}; use datafusion::datasource::listing::ListingTable; use datafusion::prelude::SessionContext; use datafusion_catalog::TableProvider; use datafusion_common::stats::Precision; + use datafusion_common::Result; use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; use datafusion_execution::config::SessionConfig; + use datafusion_execution::TaskContext; use datafusion_expr_common::operator::Operator; use datafusion_physical_expr::expressions::{binary, lit, Column}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use 
datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; + use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; + use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::joins::CrossJoinExec; + use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::union::UnionExec; - use datafusion_physical_plan::ExecutionPlan; + use datafusion_physical_plan::{execute_stream_partitioned, ExecutionPlan}; + use futures::TryStreamExt; use std::sync::Arc; /// Creates a test table with statistics from the test data directory. @@ -121,9 +128,63 @@ mod test { } } + /// Helper function to validate that statistics from statistics_by_partition match the actual data + async fn validate_statistics_with_data( + plan: Arc, + expected_stats: Vec<(i32, i32, usize)>, // (min_id, max_id, row_count) + id_column_index: usize, + ) -> Result<()> { + let ctx = TaskContext::default(); + let partitions = execute_stream_partitioned(plan, Arc::new(ctx))?; + + let mut actual_stats = Vec::new(); + for partition_stream in partitions.into_iter() { + let result: Vec = partition_stream.try_collect().await?; + + let mut min_id = i32::MAX; + let mut max_id = i32::MIN; + let mut row_count = 0; + + for batch in result { + if batch.num_columns() > id_column_index { + let id_array = batch + .column(id_column_index) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + let id_value = id_array.value(i); + min_id = min_id.min(id_value); + max_id = max_id.max(id_value); + row_count += 1; + } + } + } + + if row_count > 0 { + actual_stats.push((min_id, max_id, row_count)); + } + } + + // Compare actual data with expected statistics + assert_eq!( + actual_stats.len(), + expected_stats.len(), + "Number of partitions with data doesn't match expected" + ); + for i in 0..actual_stats.len() { + assert_eq!( + actual_stats[i], expected_stats[i], + "Partition {} data doesn't match statistics", + i + ); + } + + Ok(()) + } + #[tokio::test] - async fn test_statistics_by_partition_of_data_source() -> datafusion_common::Result<()> - { + async fn test_statistics_by_partition_of_data_source() -> Result<()> { let scan = create_scan_exec_with_statistics(None, Some(2)).await; let statistics = scan.statistics_by_partition()?; let expected_statistic_partition_1 = @@ -134,12 +195,19 @@ mod test { assert_eq!(statistics.len(), 2); assert_eq!(statistics[0], expected_statistic_partition_1); assert_eq!(statistics[1], expected_statistic_partition_2); + + // Check the statistics_by_partition with real results + let expected_stats = vec![ + (3, 4, 2), // (min_id, max_id, row_count) for first partition + (1, 2, 2), // (min_id, max_id, row_count) for second partition + ]; + validate_statistics_with_data(scan, expected_stats, 0).await?; + Ok(()) } #[tokio::test] - async fn test_statistics_by_partition_of_projection() -> datafusion_common::Result<()> - { + async fn test_statistics_by_partition_of_projection() -> Result<()> { let scan = create_scan_exec_with_statistics(None, Some(2)).await; // Add projection execution plan let exprs: Vec<(Arc, String)> = @@ -154,12 +222,16 @@ mod test { assert_eq!(statistics.len(), 2); assert_eq!(statistics[0], expected_statistic_partition_1); assert_eq!(statistics[1], expected_statistic_partition_2); + + // Check the 
statistics_by_partition with real results + let expected_stats = vec![(3, 4, 2), (1, 2, 2)]; + validate_statistics_with_data(Arc::new(projection), expected_stats, 0).await?; Ok(()) } #[tokio::test] - async fn test_statistics_by_partition_of_sort() -> datafusion_common::Result<()> { - let scan = create_scan_exec_with_statistics(None, Some(2)).await; + async fn test_statistics_by_partition_of_sort() -> Result<()> { + let scan_1 = create_scan_exec_with_statistics(None, Some(1)).await; // Add sort execution plan let sort = SortExec::new( LexOrdering::new(vec![PhysicalSortExpr { @@ -169,16 +241,34 @@ mod test { nulls_first: false, }, }]), - scan, + scan_1, ); - let mut sort_exec = Arc::new(sort.clone()); + let sort_exec = Arc::new(sort.clone()); let statistics = sort_exec.statistics_by_partition()?; let expected_statistic_partition = create_partition_statistics(4, 220, 1, 4, true); assert_eq!(statistics.len(), 1); assert_eq!(statistics[0], expected_statistic_partition); + // Check the statistics_by_partition with real results + let expected_stats = vec![(1, 4, 4)]; + validate_statistics_with_data(sort_exec.clone(), expected_stats, 0).await?; - sort_exec = Arc::new(sort.with_preserve_partitioning(true)); + // Sort with preserve_partitioning + let scan_2 = create_scan_exec_with_statistics(None, Some(2)).await; + // Add sort execution plan + let sort_exec = Arc::new( + SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr { + expr: Arc::new(Column::new("id", 0)), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]), + scan_2, + ) + .with_preserve_partitioning(true), + ); let expected_statistic_partition_1 = create_partition_statistics(2, 110, 3, 4, true); let expected_statistic_partition_2 = @@ -187,11 +277,15 @@ mod test { assert_eq!(statistics.len(), 2); assert_eq!(statistics[0], expected_statistic_partition_1); assert_eq!(statistics[1], expected_statistic_partition_2); + + // Check the statistics_by_partition with real results + let expected_stats = vec![(3, 4, 2), (1, 2, 2)]; + validate_statistics_with_data(sort_exec, expected_stats, 0).await?; Ok(()) } #[tokio::test] - async fn test_statistics_by_partition_of_filter() -> datafusion_common::Result<()> { + async fn test_statistics_by_partition_of_filter() -> Result<()> { let scan = create_scan_exec_with_statistics(None, Some(2)).await; let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); let predicate = binary( @@ -233,7 +327,7 @@ mod test { } #[tokio::test] - async fn test_statistic_by_partition_of_union() -> datafusion_common::Result<()> { + async fn test_statistic_by_partition_of_union() -> Result<()> { let scan = create_scan_exec_with_statistics(None, Some(2)).await; let union_exec = Arc::new(UnionExec::new(vec![scan.clone(), scan])); let statistics = union_exec.statistics_by_partition()?; @@ -252,12 +346,14 @@ mod test { // Verify fourth partition (from second scan - same as second partition) assert_eq!(statistics[3], expected_statistic_partition_2); + // Check the statistics_by_partition with real results + let expected_stats = vec![(3, 4, 2), (1, 2, 2), (3, 4, 2), (1, 2, 2)]; + validate_statistics_with_data(union_exec, expected_stats, 0).await?; Ok(()) } #[tokio::test] - async fn test_statistic_by_partition_of_cross_join() -> datafusion_common::Result<()> - { + async fn test_statistic_by_partition_of_cross_join() -> Result<()> { let left_scan = create_scan_exec_with_statistics(None, Some(2)).await; let right_create_table_sql = "CREATE EXTERNAL TABLE t2 (id INT NOT NULL) \ STORED AS PARQUET 
LOCATION './tests/data/test_statistics_per_partition'\ @@ -292,6 +388,73 @@ mod test { }); assert_eq!(statistics[0], expected_statistic_partition_1); assert_eq!(statistics[1], expected_statistic_partition_2); + + // Check the statistics_by_partition with real results + let expected_stats = vec![(1, 4, 8), (1, 4, 8)]; + validate_statistics_with_data(Arc::new(cross_join), expected_stats, 0).await?; + Ok(()) + } + + #[tokio::test] + async fn test_statistic_by_partition_of_coalesce_batches() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + let coalesce_batches = CoalesceBatchesExec::new(scan, 2); + let expected_statistic_partition_1 = + create_partition_statistics(2, 110, 3, 4, true); + let expected_statistic_partition_2 = + create_partition_statistics(2, 110, 1, 2, true); + let statistics = coalesce_batches.statistics_by_partition()?; + assert_eq!(statistics.len(), 2); + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); + + // Check the statistics_by_partition with real results + let expected_stats = vec![(3, 4, 2), (1, 2, 2)]; + validate_statistics_with_data(Arc::new(coalesce_batches), expected_stats, 0) + .await?; + Ok(()) + } + + #[tokio::test] + async fn test_statistic_by_partition_of_coalesce_partitions() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + let coalesce_partitions = CoalescePartitionsExec::new(scan); + let expected_statistic_partition = + create_partition_statistics(4, 220, 1, 4, true); + let statistics = coalesce_partitions.statistics_by_partition()?; + assert_eq!(statistics.len(), 1); + assert_eq!(statistics[0], expected_statistic_partition); + + // Check the statistics_by_partition with real results + let expected_stats = vec![(1, 4, 4)]; + validate_statistics_with_data(Arc::new(coalesce_partitions), expected_stats, 0) + .await?; + Ok(()) + } + + #[tokio::test] + async fn test_statistic_by_partition_of_local_limit() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + let local_limit = LocalLimitExec::new(scan.clone(), 1); + let statistics = local_limit.statistics_by_partition()?; + assert_eq!(statistics.len(), 2); + let schema = scan.schema(); + let mut expected_statistic_partition = Statistics::new_unknown(&schema); + expected_statistic_partition.num_rows = Precision::Exact(1); + assert_eq!(statistics[0], expected_statistic_partition); + assert_eq!(statistics[1], expected_statistic_partition); + Ok(()) + } + + #[tokio::test] + async fn test_statistic_by_partition_of_global_limit_partitions() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + let global_limit = GlobalLimitExec::new(scan.clone(), 0, Some(2)); + let statistics = global_limit.statistics_by_partition()?; + assert_eq!(statistics.len(), 1); + let mut expected_statistic_partition = Statistics::new_unknown(&scan.schema()); + expected_statistic_partition.num_rows = Precision::Exact(2); + assert_eq!(statistics[0], expected_statistic_partition); Ok(()) } } From 5e402700dbfcdcbc6422fab26a3438020fb9f5c8 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Tue, 8 Apr 2025 16:08:24 +0800 Subject: [PATCH 20/21] rebase main and fix cross join test --- .../core/tests/physical_optimizer/partition_statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 
38b8e86c4638c..22f1404b94f16 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -354,7 +354,7 @@ mod test { #[tokio::test] async fn test_statistic_by_partition_of_cross_join() -> Result<()> { - let left_scan = create_scan_exec_with_statistics(None, Some(2)).await; + let left_scan = create_scan_exec_with_statistics(None, Some(1)).await; let right_create_table_sql = "CREATE EXTERNAL TABLE t2 (id INT NOT NULL) \ STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition'\ WITH ORDER (id ASC);"; From 57269548d409ff4af7a803792e2f450bec242652 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Fri, 25 Apr 2025 13:58:08 +0800 Subject: [PATCH 21/21] resolve conflicts --- datafusion/core/src/datasource/listing/table.rs | 1 - datafusion/datasource/src/source.rs | 5 ++--- datafusion/datasource/src/statistics.rs | 5 +---- .../replace_with_order_preserving_variants.rs | 7 +++---- datafusion/physical-plan/src/coalesce_batches.rs | 1 + 5 files changed, 7 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 7082589ffffba..cbff518946955 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -57,7 +57,6 @@ use datafusion_common::stats::Precision; use datafusion_datasource::compute_all_files_statistics; use datafusion_datasource::file_groups::FileGroup; use datafusion_physical_expr_common::sort_expr::LexRequirement; -use datafusion_physical_plan::statistics::add_row_stats; use futures::{future, stream, Stream, StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::ObjectStore; diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 2c65fb6957ee5..deff4cb205f0a 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::statistics::PartitionedStatistics; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, }; @@ -188,9 +189,7 @@ impl ExecutionPlan for DataSourceExec { self.data_source.statistics() } - fn statistics_by_partition( - &self, - ) -> datafusion_common::Result { + fn statistics_by_partition(&self) -> Result { let mut statistics = { let mut v = Vec::with_capacity(self.properties().partitioning.partition_count()); diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index f26099a815995..192bb43c85953 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -36,9 +36,6 @@ use datafusion_common::stats::Precision; use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result}; use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; -pub(crate) use datafusion_physical_plan::statistics::{ - add_row_stats, compute_summary_statistics, set_max_if_greater, set_min_if_lesser, -}; use datafusion_physical_plan::{ColumnStatistics, Statistics}; /// A normalized representation of file min/max statistics that allows for efficient sorting & comparison. 
@@ -479,7 +476,7 @@ pub fn compute_all_files_statistics( // Then summary statistics across all file groups let file_groups_statistics = file_groups_with_stats .iter() - .filter_map(|file_group| file_group.statistics()); + .filter_map(|file_group| file_group.statistics().as_deref()); let mut statistics = Statistics::try_merge_iter(file_groups_statistics, &table_schema)?; diff --git a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs index 65a6198040bea..3c2b66af8539d 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs @@ -34,7 +34,7 @@ use datafusion_physical_plan::execution_plan::EmissionType; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::tree_node::PlanContext; -use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; +use datafusion_physical_plan::ExecutionPlanProperties; use itertools::izip; @@ -205,9 +205,8 @@ pub fn plan_with_order_breaking_variants( // Replace `SortPreservingMergeExec` with a `CoalescePartitionsExec` // SPM may have `fetch`, so pass it to the `CoalescePartitionsExec` let child = Arc::clone(&sort_input.children[0].plan); - let coalesce = CoalescePartitionsExec::new(child) - .with_fetch(plan.fetch()) - .unwrap(); + let coalesce = + Arc::new(CoalescePartitionsExec::new(child).with_fetch(plan.fetch())); sort_input.plan = coalesce; } else { return sort_input.update_plan_from_children(); diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 3cebd53921a21..888cea6d9cdb0 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -24,6 +24,7 @@ use std::task::{Context, Poll}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics}; +use crate::statistics::PartitionedStatistics; use crate::{ DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, };
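
Usage sketch (illustrative only, not part of the patch): how a caller might consume the new statistics_by_partition API once this series lands. The helper name report_partition_row_counts is hypothetical, and the len()/index access on PartitionedStatistics simply mirrors what the tests above exercise.

use std::sync::Arc;

use datafusion_common::stats::Precision;
use datafusion_common::Result;
use datafusion_physical_plan::ExecutionPlan;

/// Hypothetical helper: report the per-partition row-count estimates that
/// `statistics_by_partition` exposes for a physical plan, next to the
/// existing plan-level `statistics()` estimate.
fn report_partition_row_counts(plan: &Arc<dyn ExecutionPlan>) -> Result<()> {
    // New trait method from this series; the default implementation reports
    // `Statistics::new_unknown` for every partition, while sources such as
    // `DataSourceExec` surface per-file-group statistics.
    let per_partition = plan.statistics_by_partition()?;
    let plan_level = plan.statistics()?;

    for idx in 0..per_partition.len() {
        // Indexing yields the `Statistics` for one output partition, as in
        // the tests above.
        match &per_partition[idx].num_rows {
            Precision::Exact(n) => println!("partition {idx}: exactly {n} rows"),
            Precision::Inexact(n) => println!("partition {idx}: about {n} rows"),
            Precision::Absent => println!("partition {idx}: row count unknown"),
        }
    }
    println!("plan-level row estimate: {:?}", plan_level.num_rows);
    Ok(())
}

For operators that cannot split their plan-level estimate (the default case), each entry is simply Statistics::new_unknown for the plan's schema, so callers should be prepared for Precision::Absent values.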