diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index c426d9611c608..a24573c860bb4 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -27,13 +27,14 @@ use datafusion::execution::context::{SessionState, TaskContext}; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::{ - project_schema, ExecutionPlan, SendableRecordBatchStream, Statistics, + project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, + SendableRecordBatchStream, Statistics, }; use datafusion::prelude::*; use datafusion_expr::{Expr, LogicalPlanBuilder}; use std::any::Any; use std::collections::{BTreeMap, HashMap}; -use std::fmt::{Debug, Formatter}; +use std::fmt::{self, Debug, Formatter}; use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::time::timeout; @@ -204,6 +205,12 @@ impl CustomExec { } } +impl DisplayAs for CustomExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result { + write!(f, "CustomExec") + } +} + impl ExecutionPlan for CustomExec { fn as_any(&self) -> &dyn Any { self diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 0003ffea8c675..b497ef7f2b59e 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -68,6 +68,17 @@ impl ArrowExec { } } +impl DisplayAs for ArrowExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "ArrowExec: ")?; + self.base_config.fmt_as(t, f) + } +} + impl ExecutionPlan for ArrowExec { fn as_any(&self) -> &dyn Any { self @@ -132,15 +143,6 @@ impl ExecutionPlan for ArrowExec { Some(self.metrics.clone_inner()) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - write!(f, "ArrowExec: ")?; - self.base_config.fmt_as(t, f) - } - fn statistics(&self) -> Statistics { self.projected_statistics.clone() } diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index df82ccca29201..ec2f4db263892 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -65,6 +65,17 @@ impl AvroExec { } } +impl DisplayAs for AvroExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "AvroExec: ")?; + self.base_config.fmt_as(t, f) + } +} + impl ExecutionPlan for AvroExec { fn as_any(&self) -> &dyn Any { self @@ -141,15 +152,6 @@ impl ExecutionPlan for AvroExec { Ok(Box::pin(stream)) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - write!(f, "AvroExec: ")?; - self.base_config.fmt_as(t, f) - } - fn statistics(&self) -> Statistics { self.projected_statistics.clone() } diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 027bd1945be60..c5485b0969e6a 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -99,6 +99,18 @@ impl CsvExec { } } +impl DisplayAs for CsvExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "CsvExec: ")?; + self.base_config.fmt_as(t, f)?; + write!(f, ", has_header={}", self.has_header) + } +} + impl ExecutionPlan for CsvExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -172,16 +184,6 @@ impl ExecutionPlan for CsvExec { Ok(Box::pin(stream) as SendableRecordBatchStream) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - write!(f, "CsvExec: ")?; - self.base_config.fmt_as(t, f)?; - write!(f, ", has_header={}", self.has_header) - } - fn statistics(&self) -> Statistics { self.projected_statistics.clone() } diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index b736fd7839993..b33a3f542c280 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -85,6 +85,17 @@ impl NdJsonExec { } } +impl DisplayAs for NdJsonExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "JsonExec: ")?; + self.base_config.fmt_as(t, f) + } +} + impl ExecutionPlan for NdJsonExec { fn as_any(&self) -> &dyn Any { self @@ -150,15 +161,6 @@ impl ExecutionPlan for NdJsonExec { Ok(Box::pin(stream) as SendableRecordBatchStream) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - write!(f, "JsonExec: ")?; - self.base_config.fmt_as(t, f) - } - fn statistics(&self) -> Statistics { self.projected_statistics.clone() } diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index f538255bc20d0..233a8bf1fc2ec 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -325,6 +325,34 @@ impl ParquetExec { } } +impl DisplayAs for ParquetExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let predicate_string = self + .predicate + .as_ref() + .map(|p| format!(", predicate={p}")) + .unwrap_or_default(); + + let pruning_predicate_string = self + .pruning_predicate + .as_ref() + .map(|pre| format!(", pruning_predicate={}", pre.predicate_expr())) + .unwrap_or_default(); + + write!(f, "ParquetExec: ")?; + self.base_config.fmt_as(t, f)?; + write!(f, "{}{}", predicate_string, pruning_predicate_string,) + } + } + } +} + impl ExecutionPlan for ParquetExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -413,32 +441,6 @@ impl ExecutionPlan for ParquetExec { Ok(Box::pin(stream)) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let predicate_string = self - .predicate - .as_ref() - .map(|p| format!(", predicate={p}")) - .unwrap_or_default(); - - let pruning_predicate_string = self - .pruning_predicate - .as_ref() - .map(|pre| format!(", pruning_predicate={}", pre.predicate_expr())) - .unwrap_or_default(); - - write!(f, "ParquetExec: ")?; - self.base_config.fmt_as(t, f)?; - write!(f, "{}{}", predicate_string, pruning_predicate_string,) - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 33ebcec6a6def..f94191258634f 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -338,7 +338,7 @@ mod tests { use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; - use crate::physical_plan::{displayable, DisplayFormatType, Statistics}; + use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics}; use datafusion_physical_expr::PhysicalSortRequirement; fn schema() -> SchemaRef { @@ -1136,6 +1136,16 @@ mod tests { } } + impl DisplayAs for SortRequiredExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "SortRequiredExec") + } + } + impl ExecutionPlan for SortRequiredExec { fn as_any(&self) -> &dyn std::any::Any { self @@ -1184,13 +1194,5 @@ mod tests { fn statistics(&self) -> Statistics { self.input.statistics() } - - fn fmt_as( - &self, - _t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - write!(f, "SortRequiredExec") - } } } diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 343f7628b71e6..4bf5f664450e4 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -58,6 +58,8 @@ use datafusion_physical_expr::utils::{ get_finer_ordering, ordering_satisfy_requirement_concrete, }; +use super::DisplayAs; + /// Hash aggregate modes #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum AggregateMode { @@ -719,6 +721,80 @@ impl AggregateExec { } } +impl DisplayAs for AggregateExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "AggregateExec: mode={:?}", self.mode)?; + let g: Vec = if self.group_by.groups.len() == 1 { + self.group_by + .expr + .iter() + .map(|(e, alias)| { + let e = e.to_string(); + if &e != alias { + format!("{e} as {alias}") + } else { + e + } + }) + .collect() + } else { + self.group_by + .groups + .iter() + .map(|group| { + let terms = group + .iter() + .enumerate() + .map(|(idx, is_null)| { + if *is_null { + let (e, alias) = &self.group_by.null_expr[idx]; + let e = e.to_string(); + if &e != alias { + format!("{e} as {alias}") + } else { + e + } + } else { + let (e, alias) = &self.group_by.expr[idx]; + let e = e.to_string(); + if &e != alias { + format!("{e} as {alias}") + } else { + e + } + } + }) + .collect::>() + .join(", "); + format!("({terms})") + }) + .collect() + }; + + write!(f, ", gby=[{}]", g.join(", "))?; + + let a: Vec = self + .aggr_expr + .iter() + .map(|agg| agg.name().to_string()) + .collect(); + write!(f, ", aggr=[{}]", a.join(", "))?; + + if let Some(aggregation_ordering) = &self.aggregation_ordering { + write!(f, ", ordering_mode={:?}", aggregation_ordering.mode)?; + } + } + } + Ok(()) + } +} + impl ExecutionPlan for AggregateExec { /// Return a reference to Any that can be used for down-casting fn as_any(&self) -> &dyn Any { @@ -838,78 +914,6 @@ impl ExecutionPlan for AggregateExec { Some(self.metrics.clone_inner()) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "AggregateExec: mode={:?}", self.mode)?; - let g: Vec = if self.group_by.groups.len() == 1 { - self.group_by - .expr - .iter() - .map(|(e, alias)| { - let e = e.to_string(); - if &e != alias { - format!("{e} as {alias}") - } else { - e - } - }) - .collect() - } else { - self.group_by - .groups - .iter() - .map(|group| { - let terms = group - .iter() - .enumerate() - .map(|(idx, is_null)| { - if *is_null { - let (e, alias) = &self.group_by.null_expr[idx]; - let e = e.to_string(); - if &e != alias { - format!("{e} as {alias}") - } else { - e - } - } else { - let (e, alias) = &self.group_by.expr[idx]; - let e = e.to_string(); - if &e != alias { - format!("{e} as {alias}") - } else { - e - } - } - }) - .collect::>() - .join(", "); - format!("({terms})") - }) - .collect() - }; - - write!(f, ", gby=[{}]", g.join(", "))?; - - let a: Vec = self - .aggr_expr - .iter() - .map(|agg| agg.name().to_string()) - .collect(); - write!(f, ", aggr=[{}]", a.join(", "))?; - - if let Some(aggregation_ordering) = &self.aggregation_ordering { - write!(f, ", ordering_mode={:?}", aggregation_ordering.mode)?; - } - } - } - Ok(()) - } - fn statistics(&self) -> Statistics { // TODO stats: group expressions: // - once expressions will be able to compute their own stats, use it here @@ -1245,8 +1249,8 @@ mod tests { }; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::{ - ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, - Statistics, + DisplayAs, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, Statistics, }; use crate::prelude::SessionContext; @@ -1579,6 +1583,20 @@ mod tests { pub yield_first: bool, } + impl DisplayAs for TestYieldingExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "TestYieldingExec") + } + } + } + } + impl ExecutionPlan for TestYieldingExec { fn as_any(&self) -> &dyn Any { self diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs index 90b7f4c02faaa..bba1c37e6d61e 100644 --- a/datafusion/core/src/physical_plan/analyze.rs +++ b/datafusion/core/src/physical_plan/analyze.rs @@ -30,7 +30,7 @@ use futures::StreamExt; use super::expressions::PhysicalSortExpr; use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; -use super::{Distribution, SendableRecordBatchStream}; +use super::{DisplayAs, Distribution, SendableRecordBatchStream}; use datafusion_execution::TaskContext; /// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input, @@ -56,6 +56,20 @@ impl AnalyzeExec { } } +impl DisplayAs for AnalyzeExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "AnalyzeExec verbose={}", self.verbose) + } + } + } +} + impl ExecutionPlan for AnalyzeExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -158,18 +172,6 @@ impl ExecutionPlan for AnalyzeExec { ))) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "AnalyzeExec verbose={}", self.verbose) - } - } - } - fn statistics(&self) -> Statistics { // Statistics an an ANALYZE plan are not relevant Statistics::default() diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index 0064f353e9d1b..9454a36af775c 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -38,6 +38,7 @@ use log::trace; use super::expressions::PhysicalSortExpr; use super::metrics::{BaselineMetrics, MetricsSet}; +use super::DisplayAs; use super::{metrics::ExecutionPlanMetricsSet, Statistics}; /// CoalesceBatchesExec combines small batches into larger batches for more efficient use of @@ -73,6 +74,24 @@ impl CoalesceBatchesExec { } } +impl DisplayAs for CoalesceBatchesExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "CoalesceBatchesExec: target_batch_size={}", + self.target_batch_size + ) + } + } + } +} + impl ExecutionPlan for CoalesceBatchesExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -141,22 +160,6 @@ impl ExecutionPlan for CoalesceBatchesExec { })) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "CoalesceBatchesExec: target_batch_size={}", - self.target_batch_size - ) - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs index d04ffa576a734..82f74a62bbc5f 100644 --- a/datafusion/core/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs @@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef; use super::expressions::PhysicalSortExpr; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::stream::{ObservedStream, RecordBatchReceiverStream}; -use super::Statistics; +use super::{DisplayAs, Statistics}; use crate::physical_plan::{ DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, }; @@ -60,6 +60,20 @@ impl CoalescePartitionsExec { } } +impl DisplayAs for CoalescePartitionsExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "CoalescePartitionsExec") + } + } + } +} + impl ExecutionPlan for CoalescePartitionsExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -146,18 +160,6 @@ impl ExecutionPlan for CoalescePartitionsExec { } } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "CoalescePartitionsExec") - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs index 4d1b1cffc078b..17bfa0af3a872 100644 --- a/datafusion/core/src/physical_plan/empty.rs +++ b/datafusion/core/src/physical_plan/empty.rs @@ -30,7 +30,7 @@ use datafusion_common::{DataFusionError, Result}; use log::trace; use super::expressions::PhysicalSortExpr; -use super::{common, SendableRecordBatchStream, Statistics}; +use super::{common, DisplayAs, SendableRecordBatchStream, Statistics}; use datafusion_execution::TaskContext; @@ -94,6 +94,20 @@ impl EmptyExec { } } +impl DisplayAs for EmptyExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "EmptyExec: produce_one_row={}", self.produce_one_row) + } + } + } +} + impl ExecutionPlan for EmptyExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -148,18 +162,6 @@ impl ExecutionPlan for EmptyExec { )?)) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "EmptyExec: produce_one_row={}", self.produce_one_row) - } - } - } - fn statistics(&self) -> Statistics { let batch = self .data() diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs index 22b0817c33f85..d732e70f8a52a 100644 --- a/datafusion/core/src/physical_plan/explain.rs +++ b/datafusion/core/src/physical_plan/explain.rs @@ -28,6 +28,7 @@ use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Stati use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use log::trace; +use super::DisplayAs; use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream}; use crate::physical_plan::stream::RecordBatchStreamAdapter; use datafusion_execution::TaskContext; @@ -70,6 +71,20 @@ impl ExplainExec { } } +impl DisplayAs for ExplainExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "ExplainExec") + } + } + } +} + impl ExecutionPlan for ExplainExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -156,18 +171,6 @@ impl ExecutionPlan for ExplainExec { ))) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "ExplainExec") - } - } - } - fn statistics(&self) -> Statistics { // Statistics an EXPLAIN plan are not relevant Statistics::default() diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs index d56f5820289f5..d2cfc3ce8ac93 100644 --- a/datafusion/core/src/physical_plan/filter.rs +++ b/datafusion/core/src/physical_plan/filter.rs @@ -24,7 +24,9 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::expressions::PhysicalSortExpr; -use super::{ColumnStatistics, RecordBatchStream, SendableRecordBatchStream, Statistics}; +use super::{ + ColumnStatistics, DisplayAs, RecordBatchStream, SendableRecordBatchStream, Statistics, +}; use crate::physical_plan::{ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, Column, DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, @@ -85,6 +87,20 @@ impl FilterExec { } } +impl DisplayAs for FilterExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "FilterExec: {}", self.predicate) + } + } + } +} + impl ExecutionPlan for FilterExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -157,18 +173,6 @@ impl ExecutionPlan for FilterExec { })) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "FilterExec: {}", self.predicate) - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs index 8bcb9bab83669..8766b62e9a9e2 100644 --- a/datafusion/core/src/physical_plan/insert.rs +++ b/datafusion/core/src/physical_plan/insert.rs @@ -138,6 +138,21 @@ impl InsertExec { } } +impl DisplayAs for InsertExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "InsertExec: sink=")?; + self.sink.fmt_as(t, f) + } + } + } +} + impl ExecutionPlan for InsertExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -232,19 +247,6 @@ impl ExecutionPlan for InsertExec { ))) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "InsertExec: sink=")?; - self.sink.fmt_as(t, f) - } - } - } - fn statistics(&self) -> Statistics { Statistics::default() } diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs index f752d134a1f14..1ecbfbea95ca9 100644 --- a/datafusion/core/src/physical_plan/joins/cross_join.rs +++ b/datafusion/core/src/physical_plan/joins/cross_join.rs @@ -26,6 +26,7 @@ use arrow::datatypes::{Fields, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use crate::physical_plan::DisplayAs; use crate::physical_plan::{ coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties, @@ -139,6 +140,20 @@ async fn load_left_input( Ok((merged_batch, reservation)) } +impl DisplayAs for CrossJoinExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "CrossJoinExec") + } + } + } +} + impl ExecutionPlan for CrossJoinExec { fn as_any(&self) -> &dyn Any { self @@ -243,18 +258,6 @@ impl ExecutionPlan for CrossJoinExec { })) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "CrossJoinExec") - } - } - } - fn statistics(&self) -> Statistics { stats_cartesian_product( self.left.statistics(), diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index a3c553c9b3789..d258e7529a8eb 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -60,6 +60,7 @@ use crate::physical_plan::joins::utils::{ adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices, get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide, }; +use crate::physical_plan::DisplayAs; use crate::physical_plan::{ coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, @@ -204,6 +205,30 @@ impl HashJoinExec { } } +impl DisplayAs for HashJoinExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let display_filter = self.filter.as_ref().map_or_else( + || "".to_string(), + |f| format!(", filter={}", f.expression()), + ); + let on = self + .on + .iter() + .map(|(c1, c2)| format!("({}, {})", c1, c2)) + .collect::>() + .join(", "); + write!( + f, + "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}", + self.mode, self.join_type, on, display_filter + ) + } + } + } +} + impl ExecutionPlan for HashJoinExec { fn as_any(&self) -> &dyn Any { self @@ -420,28 +445,6 @@ impl ExecutionPlan for HashJoinExec { })) } - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let display_filter = self.filter.as_ref().map_or_else( - || "".to_string(), - |f| format!(", filter={}", f.expression()), - ); - let on = self - .on - .iter() - .map(|(c1, c2)| format!("({}, {})", c1, c2)) - .collect::>() - .join(", "); - write!( - f, - "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}", - self.mode, self.join_type, on, display_filter - ) - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs index 6586456fd276e..d5de88d9339e7 100644 --- a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs +++ b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs @@ -29,8 +29,8 @@ use crate::physical_plan::joins::utils::{ }; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ - DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + RecordBatchStream, SendableRecordBatchStream, }; use arrow::array::{ BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, UInt64Builder, @@ -120,6 +120,24 @@ impl NestedLoopJoinExec { } } +impl DisplayAs for NestedLoopJoinExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let display_filter = self.filter.as_ref().map_or_else( + || "".to_string(), + |f| format!(", filter={}", f.expression()), + ); + write!( + f, + "NestedLoopJoinExec: join_type={:?}{}", + self.join_type, display_filter + ) + } + } + } +} + impl ExecutionPlan for NestedLoopJoinExec { fn as_any(&self) -> &dyn Any { self @@ -249,22 +267,6 @@ impl ExecutionPlan for NestedLoopJoinExec { })) } - fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let display_filter = self.filter.as_ref().map_or_else( - || "".to_string(), - |f| format!(", filter={}", f.expression()), - ); - write!( - f, - "NestedLoopJoinExec: join_type={:?}{}", - self.join_type, display_filter - ) - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index bc8c686670ea9..2a628a6146a76 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -46,8 +46,9 @@ use crate::physical_plan::joins::utils::{ }; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use crate::physical_plan::{ - metrics, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, - Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, + metrics, DisplayAs, DisplayFormatType, Distribution, EquivalenceProperties, + ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, + SendableRecordBatchStream, Statistics, }; use datafusion_common::DataFusionError; use datafusion_common::JoinType; @@ -200,6 +201,26 @@ impl SortMergeJoinExec { } } +impl DisplayAs for SortMergeJoinExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let on = self + .on + .iter() + .map(|(c1, c2)| format!("({}, {})", c1, c2)) + .collect::>() + .join(", "); + write!( + f, + "SortMergeJoin: join_type={:?}, on=[{}]", + self.join_type, on + ) + } + } + } +} + impl ExecutionPlan for SortMergeJoinExec { fn as_any(&self) -> &dyn Any { self @@ -361,24 +382,6 @@ impl ExecutionPlan for SortMergeJoinExec { Some(self.metrics.clone_inner()) } - fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let on = self - .on - .iter() - .map(|(c1, c2)| format!("({}, {})", c1, c2)) - .collect::>() - .join(", "); - write!( - f, - "SortMergeJoin: join_type={:?}, on=[{}]", - self.join_type, on - ) - } - } - } - fn statistics(&self) -> Statistics { // TODO stats: it is not possible in general to know the output size of joins // There are some special cases though, for example: diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index b46aba2fb5bc5..dfd38a20c087f 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -57,6 +57,7 @@ use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalB use crate::physical_plan::common::SharedMemoryReservation; use crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema; +use crate::physical_plan::DisplayAs; use crate::physical_plan::{ expressions::Column, expressions::PhysicalSortExpr, @@ -385,6 +386,30 @@ impl SymmetricHashJoinExec { } } +impl DisplayAs for SymmetricHashJoinExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let display_filter = self.filter.as_ref().map_or_else( + || "".to_string(), + |f| format!(", filter={}", f.expression()), + ); + let on = self + .on + .iter() + .map(|(c1, c2)| format!("({}, {})", c1, c2)) + .collect::>() + .join(", "); + write!( + f, + "SymmetricHashJoinExec: join_type={:?}, on=[{}]{}", + self.join_type, on, display_filter + ) + } + } + } +} + impl ExecutionPlan for SymmetricHashJoinExec { fn as_any(&self) -> &dyn Any { self @@ -460,28 +485,6 @@ impl ExecutionPlan for SymmetricHashJoinExec { )?)) } - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let display_filter = self.filter.as_ref().map_or_else( - || "".to_string(), - |f| format!(", filter={}", f.expression()), - ); - let on = self - .on - .iter() - .map(|(c1, c2)| format!("({}, {})", c1, c2)) - .collect::>() - .join(", "); - write!( - f, - "SymmetricHashJoinExec: join_type={:?}, on=[{}]{}", - self.join_type, on, display_filter - ) - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index d1948cc288ce7..572cea006cfc4 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -34,6 +34,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use datafusion_common::{DataFusionError, Result}; use super::expressions::PhysicalSortExpr; +use super::DisplayAs; use super::{ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -82,6 +83,25 @@ impl GlobalLimitExec { } } +impl DisplayAs for GlobalLimitExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "GlobalLimitExec: skip={}, fetch={}", + self.skip, + self.fetch.map_or("None".to_string(), |x| x.to_string()) + ) + } + } + } +} + impl ExecutionPlan for GlobalLimitExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -164,23 +184,6 @@ impl ExecutionPlan for GlobalLimitExec { ))) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "GlobalLimitExec: skip={}, fetch={}", - self.skip, - self.fetch.map_or("None".to_string(), |x| x.to_string()) - ) - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } @@ -265,6 +268,20 @@ impl LocalLimitExec { } } +impl DisplayAs for LocalLimitExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "LocalLimitExec: fetch={}", self.fetch) + } + } + } +} + impl ExecutionPlan for LocalLimitExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -331,18 +348,6 @@ impl ExecutionPlan for LocalLimitExec { ))) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "LocalLimitExec: fetch={}", self.fetch) - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs index fa456d15eebfe..16c22a3b2defb 100644 --- a/datafusion/core/src/physical_plan/memory.rs +++ b/datafusion/core/src/physical_plan/memory.rs @@ -19,7 +19,7 @@ use super::expressions::PhysicalSortExpr; use super::{ - common, project_schema, DisplayFormatType, ExecutionPlan, Partitioning, + common, project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use arrow::datatypes::SchemaRef; @@ -56,6 +56,27 @@ impl fmt::Debug for MemoryExec { } } +impl DisplayAs for MemoryExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let partitions: Vec<_> = + self.partitions.iter().map(|b| b.len()).collect(); + write!( + f, + "MemoryExec: partitions={}, partition_sizes={:?}", + partitions.len(), + partitions + ) + } + } + } +} + impl ExecutionPlan for MemoryExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -102,25 +123,6 @@ impl ExecutionPlan for MemoryExec { )?)) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let partitions: Vec<_> = - self.partitions.iter().map(|b| b.len()).collect(); - write!( - f, - "MemoryExec: partitions={}, partition_sizes={:?}", - partitions.len(), - partitions - ) - } - } - } - /// We recompute the statistics dynamically from the arrow metadata as it is pretty cheap to do so fn statistics(&self) -> Statistics { common::compute_record_batch_statistics( diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 5abecf6b167c2..e8db10e0a89e5 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -98,7 +98,7 @@ impl Stream for EmptyRecordBatchStream { /// [`ExecutionPlan`] can be displayed in a simplified form using the /// return value from [`displayable`] in addition to the (normally /// quite verbose) `Debug` output. -pub trait ExecutionPlan: Debug + Send + Sync { +pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// Returns the execution plan as [`Any`](std::any::Any) so that it can be /// downcast to a specific implementation. fn as_any(&self) -> &dyn Any; @@ -224,16 +224,6 @@ pub trait ExecutionPlan: Debug + Send + Sync { None } - /// Format this `ExecutionPlan` to `f` in the specified type. - /// - /// Should not include a newline - /// - /// Note this function prints a placeholder by default to preserve - /// backwards compatibility. - fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "ExecutionPlan(PlaceHolder)") - } - /// Returns the global output statistics for this `ExecutionPlan` node. fn statistics(&self) -> Statistics; } diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index d1e1a4f9db1e3..c6845c7afe681 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -39,7 +39,7 @@ use log::trace; use super::expressions::{Column, PhysicalSortExpr}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; -use super::{RecordBatchStream, SendableRecordBatchStream, Statistics}; +use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream, Statistics}; use datafusion_physical_expr::{ normalize_out_expr_with_columns_map, project_equivalence_properties, @@ -156,6 +156,33 @@ impl ProjectionExec { } } +impl DisplayAs for ProjectionExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let expr: Vec = self + .expr + .iter() + .map(|(e, alias)| { + let e = e.to_string(); + if &e != alias { + format!("{e} as {alias}") + } else { + e + } + }) + .collect(); + + write!(f, "ProjectionExec: expr=[{}]", expr.join(", ")) + } + } + } +} + impl ExecutionPlan for ProjectionExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -260,31 +287,6 @@ impl ExecutionPlan for ProjectionExec { })) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let expr: Vec = self - .expr - .iter() - .map(|(e, alias)| { - let e = e.to_string(); - if &e != alias { - format!("{e} as {alias}") - } else { - e - } - }) - .collect(); - - write!(f, "ProjectionExec: expr=[{}]", expr.join(", ")) - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 85225eb471760..d92fd51cca7d9 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -43,7 +43,7 @@ use self::distributor_channels::{DistributionReceiver, DistributionSender}; use super::common::{AbortOnDropMany, AbortOnDropSingle, SharedMemoryReservation}; use super::expressions::PhysicalSortExpr; use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; -use super::{RecordBatchStream, SendableRecordBatchStream}; +use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream}; use crate::physical_plan::common::transpose; use crate::physical_plan::metrics::BaselineMetrics; @@ -320,6 +320,26 @@ impl RepartitionExec { } } +impl DisplayAs for RepartitionExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "{}: partitioning={}, input_partitions={}", + self.name(), + self.partitioning, + self.input.output_partitioning().partition_count() + ) + } + } + } +} + impl ExecutionPlan for RepartitionExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -515,24 +535,6 @@ impl ExecutionPlan for RepartitionExec { Some(self.metrics.clone_inner()) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "{}: partitioning={}, input_partitions={}", - self.name(), - self.partitioning, - self.input.output_partitioning().partition_count() - ) - } - } - } - fn statistics(&self) -> Statistics { self.input.statistics() } diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 205ec706b5dcd..f660f0acf89af 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -27,8 +27,8 @@ use crate::physical_plan::metrics::{ use crate::physical_plan::sorts::merge::streaming_merge; use crate::physical_plan::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; use crate::physical_plan::{ - DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning, - SendableRecordBatchStream, Statistics, + DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, + Partitioning, SendableRecordBatchStream, Statistics, }; pub use arrow::compute::SortOptions; use arrow::compute::{concat_batches, lexsort_to_indices, take}; @@ -513,6 +513,26 @@ impl SortExec { } } +impl DisplayAs for SortExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); + match self.fetch { + Some(fetch) => { + write!(f, "SortExec: fetch={fetch}, expr=[{}]", expr.join(",")) + } + None => write!(f, "SortExec: expr=[{}]", expr.join(",")), + } + } + } + } +} + impl ExecutionPlan for SortExec { fn as_any(&self) -> &dyn Any { self @@ -619,24 +639,6 @@ impl ExecutionPlan for SortExec { Some(self.metrics_set.clone_inner()) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); - match self.fetch { - Some(fetch) => { - write!(f, "SortExec: fetch={fetch}, expr=[{}]", expr.join(",")) - } - None => write!(f, "SortExec: expr=[{}]", expr.join(",")), - } - } - } - } - fn statistics(&self) -> Statistics { self.input.statistics() } diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index 397d254162c72..8262a18f2331c 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -28,6 +28,7 @@ use crate::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, }; use crate::physical_plan::sorts::streaming_merge; +use crate::physical_plan::DisplayAs; use crate::physical_plan::{ expressions::PhysicalSortExpr, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, @@ -107,6 +108,26 @@ impl SortPreservingMergeExec { } } +impl DisplayAs for SortPreservingMergeExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); + write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))?; + if let Some(fetch) = self.fetch { + write!(f, ", fetch={fetch}")?; + }; + + Ok(()) + } + } + } +} + impl ExecutionPlan for SortPreservingMergeExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -215,24 +236,6 @@ impl ExecutionPlan for SortPreservingMergeExec { } } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); - write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))?; - if let Some(fetch) = self.fetch { - write!(f, ", fetch={fetch}")?; - }; - - Ok(()) - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/core/src/physical_plan/streaming.rs b/datafusion/core/src/physical_plan/streaming.rs index 48f1409e147fe..887592f2956f4 100644 --- a/datafusion/core/src/physical_plan/streaming.rs +++ b/datafusion/core/src/physical_plan/streaming.rs @@ -32,6 +32,8 @@ use crate::physical_plan::stream::RecordBatchStreamAdapter; use crate::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; use datafusion_execution::TaskContext; +use super::{DisplayAs, DisplayFormatType}; + /// A partition that can be converted into a [`SendableRecordBatchStream`] pub trait PartitionStream: Send + Sync { /// Returns the schema of this partition @@ -90,6 +92,20 @@ impl std::fmt::Debug for StreamingTableExec { } } +impl DisplayAs for StreamingTableExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "StreamingTableExec") + } + } + } +} + #[async_trait] impl ExecutionPlan for StreamingTableExec { fn as_any(&self) -> &dyn Any { diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index ffd3f06ee8bcf..e1f8072085f3c 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -34,6 +34,7 @@ use futures::Stream; use itertools::Itertools; use log::{debug, trace, warn}; +use super::DisplayAs; use super::{ expressions::PhysicalSortExpr, metrics::{ExecutionPlanMetricsSet, MetricsSet}, @@ -144,6 +145,20 @@ impl UnionExec { } } +impl DisplayAs for UnionExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "UnionExec") + } + } + } +} + impl ExecutionPlan for UnionExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -247,18 +262,6 @@ impl ExecutionPlan for UnionExec { ))) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "UnionExec") - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } @@ -342,6 +345,20 @@ impl InterleaveExec { } } +impl DisplayAs for InterleaveExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "InterleaveExec") + } + } + } +} + impl ExecutionPlan for InterleaveExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -421,18 +438,6 @@ impl ExecutionPlan for InterleaveExec { ))) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "InterleaveExec") - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs index 2e5c92872fc05..7f8d56847e593 100644 --- a/datafusion/core/src/physical_plan/unnest.rs +++ b/datafusion/core/src/physical_plan/unnest.rs @@ -38,6 +38,8 @@ use crate::physical_plan::{ }; use datafusion_common::{DataFusionError, Result, ScalarValue}; +use super::DisplayAs; + /// Unnest the given column by joining the row with each value in the nested type. #[derive(Debug)] pub struct UnnestExec { @@ -60,6 +62,20 @@ impl UnnestExec { } } +impl DisplayAs for UnnestExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "UnnestExec") + } + } + } +} + impl ExecutionPlan for UnnestExec { fn as_any(&self) -> &dyn Any { self @@ -126,18 +142,6 @@ impl ExecutionPlan for UnnestExec { })) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "UnnestExec") - } - } - } - fn statistics(&self) -> Statistics { Default::default() } diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs index 4099cfdb364d6..70e00ed0340e7 100644 --- a/datafusion/core/src/physical_plan/values.rs +++ b/datafusion/core/src/physical_plan/values.rs @@ -18,7 +18,7 @@ //! Values execution plan use super::expressions::PhysicalSortExpr; -use super::{common, SendableRecordBatchStream, Statistics}; +use super::{common, DisplayAs, SendableRecordBatchStream, Statistics}; use crate::physical_plan::{ memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, @@ -94,6 +94,20 @@ impl ValuesExec { } } +impl DisplayAs for ValuesExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "ValuesExec") + } + } + } +} + impl ExecutionPlan for ValuesExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -145,18 +159,6 @@ impl ExecutionPlan for ValuesExec { )?)) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "ValuesExec") - } - } - } - fn statistics(&self) -> Statistics { let batch = self.data(); common::compute_record_batch_statistics(&[batch], &self.schema, None) diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs index 9c86abec8165b..1f0da4a8e6abd 100644 --- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs @@ -28,8 +28,8 @@ use crate::physical_plan::windows::{ calc_requirements, get_ordered_partition_by_indices, window_ordering_equivalence, }; use crate::physical_plan::{ - ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, - RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr, + ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, + Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr, }; use datafusion_common::Result; use datafusion_execution::TaskContext; @@ -201,6 +201,35 @@ impl BoundedWindowAggExec { } } +impl DisplayAs for BoundedWindowAggExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "BoundedWindowAggExec: ")?; + let g: Vec = self + .window_expr + .iter() + .map(|e| { + format!( + "{}: {:?}, frame: {:?}", + e.name().to_owned(), + e.field(), + e.get_window_frame() + ) + }) + .collect(); + let mode = &self.partition_search_mode; + write!(f, "wdw=[{}], mode=[{:?}]", g.join(", "), mode)?; + } + } + Ok(()) + } +} + impl ExecutionPlan for BoundedWindowAggExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -299,33 +328,6 @@ impl ExecutionPlan for BoundedWindowAggExec { Ok(stream) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "BoundedWindowAggExec: ")?; - let g: Vec = self - .window_expr - .iter() - .map(|e| { - format!( - "{}: {:?}, frame: {:?}", - e.name().to_owned(), - e.field(), - e.get_window_frame() - ) - }) - .collect(); - let mode = &self.partition_search_mode; - write!(f, "wdw=[{}], mode=[{:?}]", g.join(", "), mode)?; - } - } - Ok(()) - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index 4a0648c8f1b31..7e7fc22965d72 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -26,7 +26,7 @@ use crate::physical_plan::windows::{ calc_requirements, get_ordered_partition_by_indices, window_ordering_equivalence, }; use crate::physical_plan::{ - ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties, + ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr, }; @@ -121,6 +121,34 @@ impl WindowAggExec { } } +impl DisplayAs for WindowAggExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "WindowAggExec: ")?; + let g: Vec = self + .window_expr + .iter() + .map(|e| { + format!( + "{}: {:?}, frame: {:?}", + e.name().to_owned(), + e.field(), + e.get_window_frame() + ) + }) + .collect(); + write!(f, "wdw=[{}]", g.join(", "))?; + } + } + Ok(()) + } +} + impl ExecutionPlan for WindowAggExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -225,32 +253,6 @@ impl ExecutionPlan for WindowAggExec { Ok(stream) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "WindowAggExec: ")?; - let g: Vec = self - .window_expr - .iter() - .map(|e| { - format!( - "{}: {:?}, frame: {:?}", - e.name().to_owned(), - e.field(), - e.get_window_frame() - ) - }) - .collect(); - write!(f, "wdw=[{}]", g.join(", "))?; - } - } - Ok(()) - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index a4aab95635c86..2eb458992fdbd 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1934,10 +1934,10 @@ mod tests { use super::*; use crate::datasource::file_format::options::CsvReadOptions; use crate::datasource::MemTable; - use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{ expressions, DisplayFormatType, Partitioning, Statistics, }; + use crate::physical_plan::{DisplayAs, SendableRecordBatchStream}; use crate::physical_planner::PhysicalPlanner; use crate::prelude::{SessionConfig, SessionContext}; use crate::scalar::ScalarValue; @@ -2500,6 +2500,16 @@ mod tests { schema: SchemaRef, } + impl DisplayAs for NoOpExecutionPlan { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "NoOpExecutionPlan") + } + } + } + } + impl ExecutionPlan for NoOpExecutionPlan { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -2537,14 +2547,6 @@ mod tests { unimplemented!("NoOpExecutionPlan::execute"); } - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "NoOpExecutionPlan") - } - } - } - fn statistics(&self) -> Statistics { unimplemented!("NoOpExecutionPlan::statistics"); } diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs index 32e7d4bcfef2e..63c86084ea342 100644 --- a/datafusion/core/src/test/exec.rs +++ b/datafusion/core/src/test/exec.rs @@ -31,11 +31,11 @@ use arrow::{ }; use futures::Stream; -use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::{ common, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use crate::physical_plan::{expressions::PhysicalSortExpr, DisplayAs}; use crate::{ error::{DataFusionError, Result}, physical_plan::stream::RecordBatchReceiverStream, @@ -153,6 +153,20 @@ impl MockExec { } } +impl DisplayAs for MockExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "MockExec") + } + } + } +} + impl ExecutionPlan for MockExec { fn as_any(&self) -> &dyn Any { self @@ -225,18 +239,6 @@ impl ExecutionPlan for MockExec { } } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "MockExec") - } - } - } - // Panics if one of the batches is an error fn statistics(&self) -> Statistics { let data: Result> = self @@ -295,6 +297,20 @@ impl BarrierExec { } } +impl DisplayAs for BarrierExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "BarrierExec") + } + } + } +} + impl ExecutionPlan for BarrierExec { fn as_any(&self) -> &dyn Any { self @@ -352,18 +368,6 @@ impl ExecutionPlan for BarrierExec { Ok(builder.build()) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "BarrierExec") - } - } - } - fn statistics(&self) -> Statistics { common::compute_record_batch_statistics(&self.data, &self.schema, None) } @@ -392,6 +396,20 @@ impl ErrorExec { } } +impl DisplayAs for ErrorExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "ErrorExec") + } + } + } +} + impl ExecutionPlan for ErrorExec { fn as_any(&self) -> &dyn Any { self @@ -431,18 +449,6 @@ impl ExecutionPlan for ErrorExec { ))) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "ErrorExec") - } - } - } - fn statistics(&self) -> Statistics { Statistics::default() } @@ -470,6 +476,26 @@ impl StatisticsExec { } } } + +impl DisplayAs for StatisticsExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "StatisticsExec: col_count={}, row_count={:?}", + self.schema.fields().len(), + self.stats.num_rows, + ) + } + } + } +} + impl ExecutionPlan for StatisticsExec { fn as_any(&self) -> &dyn Any { self @@ -509,23 +535,6 @@ impl ExecutionPlan for StatisticsExec { fn statistics(&self) -> Statistics { self.stats.clone() } - - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "StatisticsExec: col_count={}, row_count={:?}", - self.schema.fields().len(), - self.stats.num_rows, - ) - } - } - } } /// Execution plan that emits streams that block forever. @@ -563,6 +572,20 @@ impl BlockingExec { } } +impl DisplayAs for BlockingExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "BlockingExec",) + } + } + } +} + impl ExecutionPlan for BlockingExec { fn as_any(&self) -> &dyn Any { self @@ -605,18 +628,6 @@ impl ExecutionPlan for BlockingExec { })) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "BlockingExec",) - } - } - } - fn statistics(&self) -> Statistics { unimplemented!() } @@ -697,6 +708,20 @@ impl PanicExec { } } +impl DisplayAs for PanicExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "PanickingExec",) + } + } + } +} + impl ExecutionPlan for PanicExec { fn as_any(&self) -> &dyn Any { self @@ -743,18 +768,6 @@ impl ExecutionPlan for PanicExec { })) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "PanickingExec",) - } - } - } - fn statistics(&self) -> Statistics { unimplemented!() } diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 6715a6ba9857e..7f73e00dca576 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -33,7 +33,7 @@ use crate::execution::context::{SessionState, TaskContext}; use crate::execution::options::ReadOptions; use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE}; use crate::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, }; use crate::prelude::{CsvReadOptions, SessionContext}; @@ -363,6 +363,25 @@ impl UnboundedExec { } } } + +impl DisplayAs for UnboundedExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "UnboundableExec: unbounded={}", + self.batch_produce.is_none(), + ) + } + } + } +} + impl ExecutionPlan for UnboundedExec { fn as_any(&self) -> &dyn Any { self @@ -406,22 +425,6 @@ impl ExecutionPlan for UnboundedExec { })) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "UnboundableExec: unbounded={}", - self.batch_produce.is_none(), - ) - } - } - } - fn statistics(&self) -> Statistics { Statistics::default() } diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs index 7374a056d69dd..0f742f7b9b24d 100644 --- a/datafusion/core/tests/custom_sources.rs +++ b/datafusion/core/tests/custom_sources.rs @@ -26,8 +26,8 @@ use datafusion::logical_expr::{ use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::{ - project_schema, ColumnStatistics, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, Statistics, + project_schema, ColumnStatistics, DisplayAs, ExecutionPlan, Partitioning, + RecordBatchStream, SendableRecordBatchStream, Statistics, }; use datafusion::scalar::ScalarValue; use datafusion::{ @@ -101,6 +101,20 @@ impl Stream for TestCustomRecordBatchStream { } } +impl DisplayAs for CustomExecutionPlan { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "CustomExecutionPlan: projection={:#?}", self.projection) + } + } + } +} + impl ExecutionPlan for CustomExecutionPlan { fn as_any(&self) -> &dyn Any { self @@ -138,18 +152,6 @@ impl ExecutionPlan for CustomExecutionPlan { Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 })) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "CustomExecutionPlan: projection={:#?}", self.projection) - } - } - } - fn statistics(&self) -> Statistics { let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap(); Statistics { diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index 36b15f435943b..b471121d1b585 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -26,7 +26,8 @@ use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, + Statistics, }; use datafusion::prelude::*; use datafusion::scalar::ScalarValue; @@ -58,6 +59,20 @@ struct CustomPlan { batches: Vec, } +impl DisplayAs for CustomPlan { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "CustomPlan: batch_size={}", self.batches.len(),) + } + } + } +} + impl ExecutionPlan for CustomPlan { fn as_any(&self) -> &dyn std::any::Any { self @@ -97,18 +112,6 @@ impl ExecutionPlan for CustomPlan { ))) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "CustomPlan: batch_size={}", self.batches.len(),) - } - } - } - fn statistics(&self) -> Statistics { // here we could provide more accurate statistics // but we want to test the filter pushdown not the CBOs diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 73237fa8574b0..167cf0e7f3512 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -25,7 +25,7 @@ use datafusion::{ error::Result, logical_expr::Expr, physical_plan::{ - expressions::PhysicalSortExpr, project_schema, ColumnStatistics, + expressions::PhysicalSortExpr, project_schema, ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }, @@ -111,6 +111,25 @@ impl TableProvider for StatisticsValidation { } } +impl DisplayAs for StatisticsValidation { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "StatisticsValidation: col_count={}, row_count={:?}", + self.schema.fields().len(), + self.stats.num_rows, + ) + } + } + } +} + impl ExecutionPlan for StatisticsValidation { fn as_any(&self) -> &dyn Any { self @@ -150,23 +169,6 @@ impl ExecutionPlan for StatisticsValidation { fn statistics(&self) -> Statistics { self.stats.clone() } - - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "StatisticsValidation: col_count={}, row_count={:?}", - self.schema.fields().len(), - self.stats.num_rows, - ) - } - } - } } fn init_ctx(stats: Statistics, schema: Schema) -> Result { diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index 493d90d91a81c..0771ef7f7997b 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -80,8 +80,9 @@ use datafusion::{ }, optimizer::{optimize_children, OptimizerConfig, OptimizerRule}, physical_plan::{ - expressions::PhysicalSortExpr, DisplayFormatType, Distribution, ExecutionPlan, - Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, + expressions::PhysicalSortExpr, DisplayAs, DisplayFormatType, Distribution, + ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, + Statistics, }, physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}, prelude::{SessionConfig, SessionContext}, @@ -421,6 +422,20 @@ impl Debug for TopKExec { } } +impl DisplayAs for TopKExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "TopKExec: k={}", self.k) + } + } + } +} + #[async_trait] impl ExecutionPlan for TopKExec { /// Return a reference to Any that can be used for downcasting @@ -478,18 +493,6 @@ impl ExecutionPlan for TopKExec { })) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "TopKExec: k={}", self.k) - } - } - } - fn statistics(&self) -> Statistics { // to improve the optimizability of this plan // better statistics inference could be provided