Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use datafusion::prelude::*;
use futures::stream::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -296,8 +296,4 @@ impl ExecutionPlan for BufferingExecutionPlan {
}),
)))
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema))
}
}
4 changes: 0 additions & 4 deletions datafusion/core/tests/custom_sources_cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,6 @@ impl ExecutionPlan for CustomExecutionPlan {
Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if partition.is_some() {
return Ok(Statistics::new_unknown(&self.schema()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion::logical_expr::TableProviderFilterPushDown;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics,
SendableRecordBatchStream,
};
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
Expand Down Expand Up @@ -149,12 +149,6 @@ impl ExecutionPlan for CustomPlan {
})),
)))
}

fn statistics(&self) -> Result<Statistics> {
// here we could provide more accurate statistics
// but we want to test the filter pushdown not the CBOs
Ok(Statistics::new_unknown(&self.schema()))
}
}

#[derive(Clone, Debug)]
Expand Down
4 changes: 0 additions & 4 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,6 @@ impl ExecutionPlan for StatisticsValidation {
unimplemented!("This plan only serves for testing statistics")
}

fn statistics(&self) -> Result<Statistics> {
Ok(self.stats.clone())
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if partition.is_some() {
Ok(Statistics::new_unknown(&self.schema))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties, Statistics,
displayable,
DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties, displayable,
};
use insta::Settings;

Expand Down Expand Up @@ -210,10 +209,6 @@ impl ExecutionPlan for SortRequiredExec {
) -> Result<datafusion_physical_plan::SendableRecordBatchStream> {
unreachable!();
}

fn statistics(&self) -> Result<Statistics> {
self.input.partition_statistics(None)
}
}

fn parquet_exec() -> Arc<DataSourceExec> {
Expand Down
4 changes: 0 additions & 4 deletions datafusion/core/tests/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,10 +1176,6 @@ impl ExecutionPlan for StatisticsExec {
unimplemented!("This plan only serves for testing statistics")
}

fn statistics(&self) -> Result<Statistics> {
Ok(self.stats.clone())
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
Ok(if partition.is_some() {
Statistics::new_unknown(&self.schema)
Expand Down
8 changes: 1 addition & 7 deletions datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ use datafusion::{
physical_expr::EquivalenceProperties,
physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
PlanProperties, RecordBatchStream, SendableRecordBatchStream,
},
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
prelude::{SessionConfig, SessionContext},
Expand Down Expand Up @@ -742,12 +742,6 @@ impl ExecutionPlan for TopKExec {
state: BTreeMap::new(),
}))
}

fn statistics(&self) -> Result<Statistics> {
// to improve the optimizability of this plan
// better statistics inference could be provided
Ok(Statistics::new_unknown(&self.schema()))
}
}

// A very specialized TopK implementation
Expand Down
10 changes: 0 additions & 10 deletions datafusion/datasource/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,6 @@ pub trait DataSource: Send + Sync + Debug {
/// across all partitions if `partition` is `None`.
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;

/// Returns aggregate statistics across all partitions.
///
/// # Deprecated
/// Use [`Self::partition_statistics`] instead, which provides more fine-grained
/// control over statistics retrieval (per-partition or aggregate).
#[deprecated(since = "51.0.0", note = "Use partition_statistics instead")]
fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

/// Return a copy of this DataSource with a new fetch limit
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
fn fetch(&self) -> Option<usize>;
Expand Down
4 changes: 0 additions & 4 deletions datafusion/ffi/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,6 @@ pub(crate) mod tests {
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}

fn statistics(&self) -> Result<datafusion::common::Statistics> {
unimplemented!()
}
}

#[test]
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-optimizer/src/output_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,6 @@ impl ExecutionPlan for OutputRequirementExec {
unreachable!();
}

fn statistics(&self) -> Result<Statistics> {
self.input.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
self.input.partition_statistics(partition)
}
Expand Down
8 changes: 0 additions & 8 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1403,10 +1403,6 @@ impl ExecutionPlan for AggregateExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
let child_statistics = self.input().partition_statistics(partition)?;
self.statistics_inner(&child_statistics)
Expand Down Expand Up @@ -2487,10 +2483,6 @@ mod tests {
Ok(Box::pin(stream))
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if partition.is_some() {
return Ok(Statistics::new_unknown(self.schema().as_ref()));
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,6 @@ impl ExecutionPlan for CoalesceBatchesExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
self.input
.partition_statistics(partition)?
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
self.input
.partition_statistics(None)?
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,10 +1176,6 @@ mod tests {
todo!()
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if partition.is_some() {
return Ok(Statistics::new_unknown(self.schema().as_ref()));
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,6 @@ impl ExecutionPlan for EmptyExec {
)?))
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if let Some(partition) = partition {
assert_or_internal_err!(
Expand Down
19 changes: 0 additions & 19 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,17 +472,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
None
}

/// Returns statistics for this `ExecutionPlan` node. If statistics are not
/// available, should return [`Statistics::new_unknown`] (the default), not
/// an error.
///
/// For TableScan executors, which supports filter pushdown, special attention
/// needs to be paid to whether the stats returned by this method are exact or not
#[deprecated(since = "48.0.0", note = "Use `partition_statistics` method instead")]
fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}

/// Returns statistics for a specific partition of this `ExecutionPlan` node.
/// If statistics are not available, should return [`Statistics::new_unknown`]
/// (the default), not an error.
Expand Down Expand Up @@ -1508,10 +1497,6 @@ mod tests {
unimplemented!()
}

fn statistics(&self) -> Result<Statistics> {
unimplemented!()
}

fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
unimplemented!()
}
Expand Down Expand Up @@ -1575,10 +1560,6 @@ mod tests {
unimplemented!()
}

fn statistics(&self) -> Result<Statistics> {
unimplemented!()
}

fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
unimplemented!()
}
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,10 +542,6 @@ impl ExecutionPlan for FilterExec {

/// The output statistics of a filtering operation can be estimated if the
/// predicate's selectivity value can be determined for the incoming data.
fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
let input_stats = self.input.partition_statistics(partition)?;
let stats = Self::statistics_helper(
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,6 @@ impl ExecutionPlan for CrossJoinExec {
}
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
// Get the all partitions statistics of the left
let left_stats = self.left.partition_statistics(None)?;
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1336,10 +1336,6 @@ impl ExecutionPlan for HashJoinExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if partition.is_some() {
return Ok(Statistics::new_unknown(&self.schema()));
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,10 +611,6 @@ impl ExecutionPlan for NestedLoopJoinExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
// NestedLoopJoinExec is designed for joins without equijoin keys in the
// ON clause (e.g., `t1 JOIN t2 ON (t1.v1 + t2.v1) % 2 = 0`). Any join
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,10 +519,6 @@ impl ExecutionPlan for SortMergeJoinExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
// SortMergeJoinExec uses symmetric hash partitioning where both left and right
// inputs are hash-partitioned on the join keys. This means partition `i` of the
Expand Down
7 changes: 1 addition & 6 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use crate::projection::{
};
use crate::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
PlanProperties, RecordBatchStream, SendableRecordBatchStream,
joins::StreamJoinPartitionMode,
metrics::{ExecutionPlanMetricsSet, MetricsSet},
};
Expand Down Expand Up @@ -470,11 +470,6 @@ impl ExecutionPlan for SymmetricHashJoinExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
// TODO stats: it is not possible in general to know the output size of joins
Ok(Statistics::new_unknown(&self.schema()))
}

fn execute(
&self,
partition: usize,
Expand Down
8 changes: 0 additions & 8 deletions datafusion/physical-plan/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,6 @@ impl ExecutionPlan for GlobalLimitExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
self.input
.partition_statistics(partition)?
Expand Down Expand Up @@ -369,10 +365,6 @@ impl ExecutionPlan for LocalLimitExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
self.input
.partition_statistics(partition)?
Expand Down
6 changes: 1 addition & 5 deletions datafusion/physical-plan/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
RecordBatchStream, SendableRecordBatchStream,
};

use arrow::array::RecordBatch;
Expand Down Expand Up @@ -352,10 +352,6 @@ impl ExecutionPlan for LazyMemoryExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema))
}

fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
let generators = self
.generators()
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/placeholder_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,6 @@ impl ExecutionPlan for PlaceholderRowExec {
Ok(Box::pin(cooperative(ms)))
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
let batches = self
.data()
Expand Down
Loading