From 7d74efe8ff5ef907f2b770096229fe0080e8eb52 Mon Sep 17 00:00:00 2001 From: Kirill Zaborsky Date: Sat, 17 Jun 2023 19:02:20 +0300 Subject: [PATCH] Display all partitions and files in EXPLAIN VERBOSE Adds DisplayAs trait for structs which could show more details when formatted in the verbose mode Resolves https://github.com/apache/arrow-datafusion/issues/6383 --- benchmarks/src/bin/tpch.rs | 4 +- .../core/src/datasource/file_format/csv.rs | 25 ++- datafusion/core/src/datasource/memory.rs | 16 +- .../datasource/physical_plan/arrow_file.rs | 9 +- .../core/src/datasource/physical_plan/avro.rs | 9 +- .../core/src/datasource/physical_plan/csv.rs | 14 +- .../core/src/datasource/physical_plan/json.rs | 9 +- .../core/src/datasource/physical_plan/mod.rs | 185 ++++++++++++++---- .../src/datasource/physical_plan/parquet.rs | 17 +- .../combine_partial_final_agg.rs | 2 +- .../physical_optimizer/dist_enforcement.rs | 4 +- .../src/physical_optimizer/join_selection.rs | 2 +- .../src/physical_optimizer/repartition.rs | 2 +- .../physical_optimizer/sort_enforcement.rs | 4 +- .../core/src/physical_plan/aggregates/mod.rs | 2 +- datafusion/core/src/physical_plan/analyze.rs | 6 +- .../src/physical_plan/coalesce_batches.rs | 2 +- .../src/physical_plan/coalesce_partitions.rs | 2 +- datafusion/core/src/physical_plan/display.rs | 55 +++++- datafusion/core/src/physical_plan/empty.rs | 2 +- datafusion/core/src/physical_plan/explain.rs | 2 +- datafusion/core/src/physical_plan/filter.rs | 2 +- datafusion/core/src/physical_plan/insert.rs | 12 +- .../src/physical_plan/joins/cross_join.rs | 2 +- .../core/src/physical_plan/joins/hash_join.rs | 2 +- .../physical_plan/joins/nested_loop_join.rs | 2 +- .../physical_plan/joins/sort_merge_join.rs | 2 +- .../joins/symmetric_hash_join.rs | 6 +- datafusion/core/src/physical_plan/limit.rs | 4 +- datafusion/core/src/physical_plan/memory.rs | 2 +- datafusion/core/src/physical_plan/mod.rs | 6 +- .../core/src/physical_plan/projection.rs | 2 +- .../core/src/physical_plan/repartition/mod.rs | 2 +- .../core/src/physical_plan/sorts/sort.rs | 2 +- .../sorts/sort_preserving_merge.rs | 2 +- datafusion/core/src/physical_plan/union.rs | 4 +- datafusion/core/src/physical_plan/unnest.rs | 2 +- datafusion/core/src/physical_plan/values.rs | 2 +- .../windows/bounded_window_agg_exec.rs | 2 +- .../physical_plan/windows/window_agg_exec.rs | 2 +- datafusion/core/src/physical_planner.rs | 22 ++- datafusion/core/src/test/exec.rs | 12 +- datafusion/core/src/test_util/mod.rs | 2 +- datafusion/core/tests/custom_sources.rs | 2 +- .../provider_filter_pushdown.rs | 2 +- .../tests/custom_sources_cases/statistics.rs | 2 +- datafusion/core/tests/sql/explain_analyze.rs | 4 +- datafusion/core/tests/sql/order.rs | 2 +- datafusion/core/tests/user_defined_plan.rs | 2 +- .../tests/cases/roundtrip_physical_plan.rs | 7 +- 50 files changed, 319 insertions(+), 171 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 4ba8b26bba335..32359dc1f8d1a 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -303,14 +303,14 @@ async fn execute_query( if debug { println!( "=== Physical plan ===\n{}\n", - displayable(physical_plan.as_ref()).indent() + displayable(physical_plan.as_ref()).indent(true) ); } let result = collect(physical_plan.clone(), state.task_ctx()).await?; if debug { println!( "=== Physical plan with metrics ===\n{}\n", - DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent() + DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(true) ); if !result.is_empty() { // do not call print_batches if there are no batches as the result is confusing diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 01bf76ccf48d1..dd6d3cd7f73d2 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -20,7 +20,7 @@ use std::any::Any; use std::collections::HashSet; use std::fmt; -use std::fmt::{Debug, Display}; +use std::fmt::Debug; use std::sync::Arc; use arrow::csv::WriterBuilder; @@ -51,7 +51,7 @@ use crate::datasource::physical_plan::{ use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::insert::{DataSink, InsertExec}; -use crate::physical_plan::Statistics; +use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; /// The default file extension of csv files @@ -443,14 +443,19 @@ impl Debug for CsvSink { } } -impl Display for CsvSink { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "CsvSink(writer_mode={:?}, file_groups={})", - self.config.writer_mode, - FileGroupDisplay(&self.config.file_groups), - ) +impl DisplayAs for CsvSink { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "CsvSink(writer_mode={:?}, file_groups=", + self.config.writer_mode + )?; + FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; + write!(f, ")") + } + } } } diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 3f6316d28ab56..b7cb013eba4c0 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -20,7 +20,7 @@ use futures::StreamExt; use log::debug; use std::any::Any; -use std::fmt::{self, Debug, Display}; +use std::fmt::{self, Debug}; use std::sync::Arc; use arrow::datatypes::SchemaRef; @@ -36,9 +36,9 @@ use crate::logical_expr::Expr; use crate::physical_plan::common::AbortOnDropSingle; use crate::physical_plan::insert::{DataSink, InsertExec}; use crate::physical_plan::memory::MemoryExec; -use crate::physical_plan::ExecutionPlan; use crate::physical_plan::{common, SendableRecordBatchStream}; use crate::physical_plan::{repartition::RepartitionExec, Partitioning}; +use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; /// Type alias for partition data pub type PartitionData = Arc>>; @@ -213,10 +213,14 @@ impl Debug for MemSink { } } -impl Display for MemSink { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let partition_count = self.batches.len(); - write!(f, "MemoryTable (partitions={partition_count})") +impl DisplayAs for MemSink { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let partition_count = self.batches.len(); + write!(f, "MemoryTable (partitions={partition_count})") + } + } } } diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 43074ccb77c1d..0003ffea8c675 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -22,7 +22,7 @@ use crate::datasource::physical_plan::{ use crate::error::Result; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ - ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan, + ordering_equivalence_properties_helper, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, }; use arrow_schema::SchemaRef; @@ -137,11 +137,8 @@ impl ExecutionPlan for ArrowExec { t: DisplayFormatType, f: &mut std::fmt::Formatter, ) -> std::fmt::Result { - match t { - DisplayFormatType::Default => { - write!(f, "ArrowExec: {}", self.base_config) - } - } + write!(f, "ArrowExec: ")?; + self.base_config.fmt_as(t, f) } fn statistics(&self) -> Statistics { diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 0c286ba19c619..df82ccca29201 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -20,7 +20,7 @@ use crate::error::Result; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ - ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan, + ordering_equivalence_properties_helper, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; use datafusion_execution::TaskContext; @@ -146,11 +146,8 @@ impl ExecutionPlan for AvroExec { t: DisplayFormatType, f: &mut std::fmt::Formatter, ) -> std::fmt::Result { - match t { - DisplayFormatType::Default => { - write!(f, "AvroExec: {}", self.base_config) - } - } + write!(f, "AvroExec: ")?; + self.base_config.fmt_as(t, f) } fn statistics(&self) -> Statistics { diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index d2c76ecaf5eac..027bd1945be60 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -27,7 +27,7 @@ use crate::physical_plan::common::AbortOnDropSingle; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ - ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan, + ordering_equivalence_properties_helper, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; use arrow::csv; @@ -177,15 +177,9 @@ impl ExecutionPlan for CsvExec { t: DisplayFormatType, f: &mut std::fmt::Formatter, ) -> std::fmt::Result { - match t { - DisplayFormatType::Default => { - write!( - f, - "CsvExec: {}, has_header={}", - self.base_config, self.has_header, - ) - } - } + write!(f, "CsvExec: ")?; + self.base_config.fmt_as(t, f)?; + write!(f, ", has_header={}", self.has_header) } fn statistics(&self) -> Statistics { diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 8340c282a01ee..b736fd7839993 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -26,7 +26,7 @@ use crate::physical_plan::common::AbortOnDropSingle; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ - ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan, + ordering_equivalence_properties_helper, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; use datafusion_execution::TaskContext; @@ -155,11 +155,8 @@ impl ExecutionPlan for NdJsonExec { t: DisplayFormatType, f: &mut std::fmt::Formatter, ) -> std::fmt::Result { - match t { - DisplayFormatType::Default => { - write!(f, "JsonExec: {}", self.base_config) - } - } + write!(f, "JsonExec: ")?; + self.base_config.fmt_as(t, f) } fn statistics(&self) -> Statistics { diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index ad22880a43dbe..610b5ee42419b 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -44,12 +44,15 @@ pub use file_stream::{FileOpenFuture, FileOpener, FileStream, OnError}; pub(crate) use json::plan_to_json; pub use json::{JsonOpener, NdJsonExec}; -use crate::datasource::file_format::FileWriterMode; use crate::datasource::{ listing::{FileRange, PartitionedFile}, object_store::ObjectStoreUrl, }; use crate::physical_plan::ExecutionPlan; +use crate::{ + datasource::file_format::FileWriterMode, + physical_plan::{DisplayAs, DisplayFormatType}, +}; use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, @@ -269,15 +272,16 @@ impl Debug for FileScanConfig { write!(f, "statistics={:?}, ", self.statistics)?; - Display::fmt(self, f) + DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f) } } -impl Display for FileScanConfig { - fn fmt(&self, f: &mut Formatter) -> FmtResult { +impl DisplayAs for FileScanConfig { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { let (schema, _, orderings) = self.project(); - write!(f, "file_groups={}", FileGroupsDisplay(&self.file_groups))?; + write!(f, "file_groups=")?; + FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; if !schema.fields().is_empty() { write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; @@ -310,16 +314,25 @@ impl Display for FileScanConfig { #[derive(Debug)] struct FileGroupsDisplay<'a>(&'a [Vec]); -impl<'a> Display for FileGroupsDisplay<'a> { - fn fmt(&self, f: &mut Formatter) -> FmtResult { +impl<'a> DisplayAs for FileGroupsDisplay<'a> { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { let n_groups = self.0.len(); let groups = if n_groups == 1 { "group" } else { "groups" }; write!(f, "{{{n_groups} {groups}: [")?; - // To avoid showing too many partitions - let max_groups = 5; - fmt_up_to_n_elements(self.0, max_groups, f, |group, f| { - write!(f, "{}", FileGroupDisplay(group)) - })?; + match t { + DisplayFormatType::Default => { + // To avoid showing too many partitions + let max_groups = 5; + fmt_up_to_n_elements(self.0, max_groups, f, |group, f| { + FileGroupDisplay(group).fmt_as(t, f) + })?; + } + DisplayFormatType::Verbose => { + fmt_elements_split_by_commas(self.0.iter(), f, |group, f| { + FileGroupDisplay(group).fmt_as(t, f) + })? + } + } write!(f, "]}}") } } @@ -333,18 +346,31 @@ impl<'a> Display for FileGroupsDisplay<'a> { #[derive(Debug)] pub(crate) struct FileGroupDisplay<'a>(pub &'a [PartitionedFile]); -impl<'a> Display for FileGroupDisplay<'a> { - fn fmt(&self, f: &mut Formatter) -> FmtResult { +impl<'a> DisplayAs for FileGroupDisplay<'a> { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { write!(f, "[")?; - // To avoid showing too many files - let max_files = 5; - fmt_up_to_n_elements(self.0, max_files, f, |pf, f| { - write!(f, "{}", pf.object_meta.location.as_ref())?; - if let Some(range) = pf.range.as_ref() { - write!(f, ":{}..{}", range.start, range.end)?; + match t { + DisplayFormatType::Default => { + // To avoid showing too many files + let max_files = 5; + fmt_up_to_n_elements(self.0, max_files, f, |pf, f| { + write!(f, "{}", pf.object_meta.location.as_ref())?; + if let Some(range) = pf.range.as_ref() { + write!(f, ":{}..{}", range.start, range.end)?; + } + Ok(()) + })? + } + DisplayFormatType::Verbose => { + fmt_elements_split_by_commas(self.0.iter(), f, |pf, f| { + write!(f, "{}", pf.object_meta.location.as_ref())?; + if let Some(range) = pf.range.as_ref() { + write!(f, ":{}..{}", range.start, range.end)?; + } + Ok(()) + })? } - Ok(()) - })?; + } write!(f, "]") } } @@ -360,16 +386,32 @@ where F: Fn(&E, &mut Formatter) -> FmtResult, { let len = elements.len(); - for (idx, element) in elements.iter().take(n).enumerate() { + fmt_elements_split_by_commas(elements.iter().take(n), f, |element, f| { + format_element(element, f) + })?; + // Remaining elements are showed as `...` (to indicate there is more) + if len > n { + write!(f, ", ...")?; + } + Ok(()) +} + +/// helper formatting array elements with a comma and a space between them +fn fmt_elements_split_by_commas( + iter: I, + f: &mut Formatter, + format_element: F, +) -> FmtResult +where + I: Iterator, + F: Fn(E, &mut Formatter) -> FmtResult, +{ + for (idx, element) in iter.enumerate() { if idx > 0 { write!(f, ", ")?; } format_element(element, f)?; } - // Remaining elements are showed as `...` (to indicate there is more) - if len > n { - write!(f, ", ...")?; - } Ok(()) } @@ -888,6 +930,7 @@ mod tests { }; use chrono::Utc; + use crate::physical_plan::{DefaultDisplay, VerboseDisplay}; use crate::{ test::{build_table_i32, columns}, test_util::aggr_test_schema, @@ -1289,7 +1332,7 @@ mod tests { #[test] fn file_groups_display_empty() { let expected = "{0 groups: []}"; - assert_eq!(&FileGroupsDisplay(&[]).to_string(), expected); + assert_eq!(DefaultDisplay(FileGroupsDisplay(&[])).to_string(), expected); } #[test] @@ -1297,11 +1340,29 @@ mod tests { let files = [vec![partitioned_file("foo"), partitioned_file("bar")]]; let expected = "{1 group: [[foo, bar]]}"; - assert_eq!(&FileGroupsDisplay(&files).to_string(), expected); + assert_eq!( + DefaultDisplay(FileGroupsDisplay(&files)).to_string(), + expected + ); + } + + #[test] + fn file_groups_display_many_default() { + let files = [ + vec![partitioned_file("foo"), partitioned_file("bar")], + vec![partitioned_file("baz")], + vec![], + ]; + + let expected = "{3 groups: [[foo, bar], [baz], []]}"; + assert_eq!( + DefaultDisplay(FileGroupsDisplay(&files)).to_string(), + expected + ); } #[test] - fn file_groups_display_many() { + fn file_groups_display_many_verbose() { let files = [ vec![partitioned_file("foo"), partitioned_file("bar")], vec![partitioned_file("baz")], @@ -1309,11 +1370,14 @@ mod tests { ]; let expected = "{3 groups: [[foo, bar], [baz], []]}"; - assert_eq!(&FileGroupsDisplay(&files).to_string(), expected); + assert_eq!( + VerboseDisplay(FileGroupsDisplay(&files)).to_string(), + expected + ); } #[test] - fn file_groups_display_too_many() { + fn file_groups_display_too_many_default() { let files = [ vec![partitioned_file("foo"), partitioned_file("bar")], vec![partitioned_file("baz")], @@ -1325,19 +1389,45 @@ mod tests { ]; let expected = "{7 groups: [[foo, bar], [baz], [qux], [quux], [quuux], ...]}"; - assert_eq!(&FileGroupsDisplay(&files).to_string(), expected); + assert_eq!( + DefaultDisplay(FileGroupsDisplay(&files)).to_string(), + expected + ); + } + + #[test] + fn file_groups_display_too_many_verbose() { + let files = [ + vec![partitioned_file("foo"), partitioned_file("bar")], + vec![partitioned_file("baz")], + vec![partitioned_file("qux")], + vec![partitioned_file("quux")], + vec![partitioned_file("quuux")], + vec![partitioned_file("quuuux")], + vec![], + ]; + + let expected = + "{7 groups: [[foo, bar], [baz], [qux], [quux], [quuux], [quuuux], []]}"; + assert_eq!( + VerboseDisplay(FileGroupsDisplay(&files)).to_string(), + expected + ); } #[test] - fn file_group_display_many() { + fn file_group_display_many_default() { let files = vec![partitioned_file("foo"), partitioned_file("bar")]; let expected = "[foo, bar]"; - assert_eq!(&FileGroupDisplay(&files).to_string(), expected); + assert_eq!( + DefaultDisplay(FileGroupDisplay(&files)).to_string(), + expected + ); } #[test] - fn file_group_display_too_many() { + fn file_group_display_too_many_default() { let files = vec![ partitioned_file("foo"), partitioned_file("bar"), @@ -1348,7 +1438,28 @@ mod tests { ]; let expected = "[foo, bar, baz, qux, quux, ...]"; - assert_eq!(&FileGroupDisplay(&files).to_string(), expected); + assert_eq!( + DefaultDisplay(FileGroupDisplay(&files)).to_string(), + expected + ); + } + + #[test] + fn file_group_display_too_many_verbose() { + let files = vec![ + partitioned_file("foo"), + partitioned_file("bar"), + partitioned_file("baz"), + partitioned_file("qux"), + partitioned_file("quux"), + partitioned_file("quuux"), + ]; + + let expected = "[foo, bar, baz, qux, quux, quuux]"; + assert_eq!( + VerboseDisplay(FileGroupDisplay(&files)).to_string(), + expected + ); } /// create a PartitionedFile for testing diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 48e4d49371704..f538255bc20d0 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -21,7 +21,8 @@ use crate::datasource::physical_plan::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; use crate::datasource::physical_plan::{ - parquet::page_filter::PagePruningPredicate, FileMeta, FileScanConfig, SchemaAdapter, + parquet::page_filter::PagePruningPredicate, DisplayAs, FileMeta, FileScanConfig, + SchemaAdapter, }; use crate::{ config::ConfigOptions, @@ -418,7 +419,7 @@ impl ExecutionPlan for ParquetExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { let predicate_string = self .predicate .as_ref() @@ -431,11 +432,9 @@ impl ExecutionPlan for ParquetExec { .map(|pre| format!(", pruning_predicate={}", pre.predicate_expr())) .unwrap_or_default(); - write!( - f, - "ParquetExec: {}{}{}", - self.base_config, predicate_string, pruning_predicate_string, - ) + write!(f, "ParquetExec: ")?; + self.base_config.fmt_as(t, f)?; + write!(f, "{}{}", predicate_string, pruning_predicate_string,) } } } @@ -1869,7 +1868,9 @@ mod tests { assert!(pruning_predicate.is_some()); // convert to explain plan form - let display = displayable(rt.parquet_exec.as_ref()).indent().to_string(); + let display = displayable(rt.parquet_exec.as_ref()) + .indent(true) + .to_string(); assert_contains!( &display, diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 3ec9e9bbd0481..05b0dc4f19933 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -225,7 +225,7 @@ mod tests { let config = ConfigOptions::new(); let optimized = optimizer.optimize($PLAN, &config)?; // Now format correctly - let plan = displayable(optimized.as_ref()).indent().to_string(); + let plan = displayable(optimized.as_ref()).indent(true).to_string(); let actual_lines = trim_plan_display(&plan); assert_eq!( diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs index 4e456450bcb2f..89107ac989df5 100644 --- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -1174,7 +1174,7 @@ mod tests { let optimized = optimizer.optimize(optimized, &config)?; // Now format correctly - let plan = displayable(optimized.as_ref()).indent().to_string(); + let plan = displayable(optimized.as_ref()).indent(true).to_string(); let actual_lines = trim_plan_display(&plan); assert_eq!( @@ -1189,7 +1189,7 @@ mod tests { ($EXPECTED_LINES: expr, $PLAN: expr) => { let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); // Now format correctly - let plan = displayable($PLAN.as_ref()).indent().to_string(); + let plan = displayable($PLAN.as_ref()).indent(true).to_string(); let actual_lines = trim_plan_display(&plan); assert_eq!( diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index a97ef6a3f9d30..accf2d5643a10 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -632,7 +632,7 @@ mod tests { .optimize(Arc::new($PLAN), &ConfigOptions::new()) .unwrap(); - let plan = displayable(optimized.as_ref()).indent().to_string(); + let plan = displayable(optimized.as_ref()).indent(true).to_string(); let actual_lines = plan.split("\n").collect::>(); assert_eq!( diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index fb867ff36c62d..33ebcec6a6def 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -557,7 +557,7 @@ mod tests { }); // Now format correctly - let plan = displayable(optimized.as_ref()).indent().to_string(); + let plan = displayable(optimized.as_ref()).indent(true).to_string(); let actual_lines = trim_plan_display(&plan); assert_eq!( diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index a79552de49de2..de8eba9fd0fba 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -1024,7 +1024,7 @@ mod tests { // Util function to get string representation of a physical plan fn get_plan_string(plan: &Arc) -> Vec { - let formatted = displayable(plan.as_ref()).indent().to_string(); + let formatted = displayable(plan.as_ref()).indent(true).to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); actual.iter().map(|elem| elem.to_string()).collect() } @@ -1400,7 +1400,7 @@ mod tests { let state = session_ctx.state(); let physical_plan = $PLAN; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index a2ae41de1f746..8e68450078cf7 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -827,7 +827,7 @@ impl ExecutionPlan for AggregateExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "AggregateExec: mode={:?}", self.mode)?; let g: Vec = if self.group_by.groups.len() == 1 { self.group_by diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs index 2e4441e307abb..90b7f4c02faaa 100644 --- a/datafusion/core/src/physical_plan/analyze.rs +++ b/datafusion/core/src/physical_plan/analyze.rs @@ -164,7 +164,7 @@ impl ExecutionPlan for AnalyzeExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "AnalyzeExec verbose={}", self.verbose) } } @@ -191,7 +191,7 @@ fn create_output_batch( type_builder.append_value("Plan with Metrics"); let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref()) - .indent() + .indent(verbose) .to_string(); plan_builder.append_value(annotated_plan); @@ -201,7 +201,7 @@ fn create_output_batch( type_builder.append_value("Plan with Full Metrics"); let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref()) - .indent() + .indent(verbose) .to_string(); plan_builder.append_value(annotated_plan); diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index 0ca01aacfa19c..dd658e75f3009 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -143,7 +143,7 @@ impl ExecutionPlan for CoalesceBatchesExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( f, "CoalesceBatchesExec: target_batch_size={}", diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs index d05c413caf4f9..d04ffa576a734 100644 --- a/datafusion/core/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs @@ -152,7 +152,7 @@ impl ExecutionPlan for CoalescePartitionsExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "CoalescePartitionsExec") } } diff --git a/datafusion/core/src/physical_plan/display.rs b/datafusion/core/src/physical_plan/display.rs index 2fba06ed29c14..674cbcf2478bb 100644 --- a/datafusion/core/src/physical_plan/display.rs +++ b/datafusion/core/src/physical_plan/display.rs @@ -21,7 +21,7 @@ use std::fmt; -use datafusion_common::display::{StringifiedPlan, ToStringifiedPlan}; +use datafusion_common::display::StringifiedPlan; use super::{accept, ExecutionPlan, ExecutionPlanVisitor}; @@ -30,6 +30,8 @@ use super::{accept, ExecutionPlan, ExecutionPlanVisitor}; pub enum DisplayFormatType { /// Default, compact format. Example: `FilterExec: c12 < 10.0` Default, + /// Verbose, showing all available details + Verbose, } /// Wraps an `ExecutionPlan` with various ways to display this plan @@ -79,16 +81,21 @@ impl<'a> DisplayableExecutionPlan<'a> { /// RepartitionExec: partitioning=RoundRobinBatch(16) /// CsvExec: source=...", /// ``` - pub fn indent(&self) -> impl fmt::Display + 'a { + pub fn indent(&self, verbose: bool) -> impl fmt::Display + 'a { + let format_type = if verbose { + DisplayFormatType::Verbose + } else { + DisplayFormatType::Default + }; struct Wrapper<'a> { + format_type: DisplayFormatType, plan: &'a dyn ExecutionPlan, show_metrics: ShowMetrics, } impl<'a> fmt::Display for Wrapper<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let t = DisplayFormatType::Default; let mut visitor = IndentVisitor { - t, + t: self.format_type, f, indent: 0, show_metrics: self.show_metrics, @@ -97,6 +104,7 @@ impl<'a> DisplayableExecutionPlan<'a> { } } Wrapper { + format_type, plan: self.inner, show_metrics: self.show_metrics, } @@ -128,6 +136,15 @@ impl<'a> DisplayableExecutionPlan<'a> { show_metrics: self.show_metrics, } } + + /// format as a `StringifiedPlan` + pub fn to_stringified( + &self, + verbose: bool, + plan_type: crate::logical_expr::PlanType, + ) -> StringifiedPlan { + StringifiedPlan::new(plan_type, self.indent(verbose).to_string()) + } } #[derive(Debug, Clone, Copy)] @@ -192,11 +209,29 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> { } } -impl<'a> ToStringifiedPlan for DisplayableExecutionPlan<'a> { - fn to_stringified( - &self, - plan_type: crate::logical_expr::PlanType, - ) -> StringifiedPlan { - StringifiedPlan::new(plan_type, self.indent().to_string()) +/// Trait for types which could have additional details when formatted in `Verbose` mode +pub trait DisplayAs { + /// Format according to `DisplayFormatType`, used when verbose representation looks + /// different from the default one + /// + /// Should not include a newline + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result; +} + +/// A newtype wrapper to display `T` implementing`DisplayAs` using the `Default` mode +pub struct DefaultDisplay(pub T); + +impl fmt::Display for DefaultDisplay { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt_as(DisplayFormatType::Default, f) + } +} + +/// A newtype wrapper to display `T` implementing `DisplayAs` using the `Verbose` mode +pub struct VerboseDisplay(pub T); + +impl fmt::Display for VerboseDisplay { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt_as(DisplayFormatType::Verbose, f) } } diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs index 627444ffd94d9..4d1b1cffc078b 100644 --- a/datafusion/core/src/physical_plan/empty.rs +++ b/datafusion/core/src/physical_plan/empty.rs @@ -154,7 +154,7 @@ impl ExecutionPlan for EmptyExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "EmptyExec: produce_one_row={}", self.produce_one_row) } } diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs index e40512f1b142b..22b0817c33f85 100644 --- a/datafusion/core/src/physical_plan/explain.rs +++ b/datafusion/core/src/physical_plan/explain.rs @@ -162,7 +162,7 @@ impl ExecutionPlan for ExplainExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "ExplainExec") } } diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs index a6f00846de8ff..d56f5820289f5 100644 --- a/datafusion/core/src/physical_plan/filter.rs +++ b/datafusion/core/src/physical_plan/filter.rs @@ -163,7 +163,7 @@ impl ExecutionPlan for FilterExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "FilterExec: {}", self.predicate) } } diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs index f3bd701a565de..4742e1617e192 100644 --- a/datafusion/core/src/physical_plan/insert.rs +++ b/datafusion/core/src/physical_plan/insert.rs @@ -19,7 +19,8 @@ use super::expressions::PhysicalSortExpr; use super::{ - DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, + Statistics, }; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -31,7 +32,7 @@ use datafusion_common::Result; use datafusion_physical_expr::PhysicalSortRequirement; use futures::StreamExt; use std::any::Any; -use std::fmt::{Debug, Display}; +use std::fmt::Debug; use std::sync::Arc; use crate::physical_plan::stream::RecordBatchStreamAdapter; @@ -45,7 +46,7 @@ use datafusion_execution::TaskContext; /// The `Display` impl is used to format the sink for explain plan /// output. #[async_trait] -pub trait DataSink: Display + Debug + Send + Sync { +pub trait DataSink: DisplayAs + Debug + Send + Sync { // TODO add desired input ordering // How does this sink want its input ordered? @@ -185,8 +186,9 @@ impl ExecutionPlan for InsertExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { - write!(f, "InsertExec: sink={}", self.sink) + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "InsertExec: sink=")?; + self.sink.fmt_as(t, f) } } } diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs index eb567ee130cad..f752d134a1f14 100644 --- a/datafusion/core/src/physical_plan/joins/cross_join.rs +++ b/datafusion/core/src/physical_plan/joins/cross_join.rs @@ -249,7 +249,7 @@ impl ExecutionPlan for CrossJoinExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "CrossJoinExec") } } diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 0e62540d6d559..0897929f0b51a 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -419,7 +419,7 @@ impl ExecutionPlan for HashJoinExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { let display_filter = self.filter.as_ref().map_or_else( || "".to_string(), |f| format!(", filter={}", f.expression()), diff --git a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs index 82e677f7205d8..8de5c76e514f7 100644 --- a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs +++ b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs @@ -251,7 +251,7 @@ impl ExecutionPlan for NestedLoopJoinExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { let display_filter = self.filter.as_ref().map_or_else( || "".to_string(), |f| format!(", filter={:?}", f.expression()), diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index aa6a77925e0a4..d87023093f7e5 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -363,7 +363,7 @@ impl ExecutionPlan for SortMergeJoinExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( f, "SortMergeJoin: join_type={:?}, on={:?}", diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index 7eac619687b28..586eb3678ef2b 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -455,7 +455,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { let display_filter = self.filter.as_ref().map_or_else( || "".to_string(), |f| format!(", filter={}", f.expression()), @@ -2446,7 +2446,7 @@ mod tests { let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10"; let dataframe = ctx.sql(sql).await?; let physical_plan = dataframe.create_physical_plan().await?; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); let expected = { [ "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10", @@ -2499,7 +2499,7 @@ mod tests { let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10"; let dataframe = ctx.sql(sql).await?; let physical_plan = dataframe.create_physical_plan().await?; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); let expected = { [ "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10", diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index 132bae6141466..d1948cc288ce7 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -170,7 +170,7 @@ impl ExecutionPlan for GlobalLimitExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( f, "GlobalLimitExec: skip={}, fetch={}", @@ -337,7 +337,7 @@ impl ExecutionPlan for LocalLimitExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "LocalLimitExec: fetch={}", self.fetch) } } diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs index 38fa5d549cba4..fa456d15eebfe 100644 --- a/datafusion/core/src/physical_plan/memory.rs +++ b/datafusion/core/src/physical_plan/memory.rs @@ -108,7 +108,7 @@ impl ExecutionPlan for MemoryExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { let partitions: Vec<_> = self.partitions.iter().map(|b| b.len()).collect(); write!( diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 11385d994b260..e3e9fec24780f 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -34,7 +34,7 @@ pub use datafusion_expr::Accumulator; pub use datafusion_expr::ColumnarValue; pub use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator; use datafusion_physical_expr::equivalence::OrderingEquivalenceProperties; -pub use display::DisplayFormatType; +pub use display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; use futures::stream::{Stream, TryStreamExt}; use std::fmt; use std::fmt::Debug; @@ -312,9 +312,9 @@ pub fn with_new_children_if_necessary( /// let dataframe = ctx.sql("SELECT a FROM example WHERE a < 5").await.unwrap(); /// let physical_plan = dataframe.create_physical_plan().await.unwrap(); /// -/// // Format using display string +/// // Format using display string in verbose mode /// let displayable_plan = displayable(physical_plan.as_ref()); -/// let plan_string = format!("{}", displayable_plan.indent()); +/// let plan_string = format!("{}", displayable_plan.indent(true)); /// /// let working_directory = std::env::current_dir().unwrap(); /// let normalized = Path::from_filesystem_path(working_directory).unwrap(); diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index 5eb578334ed2e..d1e1a4f9db1e3 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -266,7 +266,7 @@ impl ExecutionPlan for ProjectionExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { let expr: Vec = self .expr .iter() diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index d7dc54afd6e14..e18ba302e0e80 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -450,7 +450,7 @@ impl ExecutionPlan for RepartitionExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( f, "RepartitionExec: partitioning={:?}, input_partitions={}", diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 58c257c97f996..4983b0ea83e5e 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -625,7 +625,7 @@ impl ExecutionPlan for SortExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); match self.fetch { Some(fetch) => { diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index 1195959a89b6a..4db1fea2a4f1e 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -207,7 +207,7 @@ impl ExecutionPlan for SortPreservingMergeExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); write!(f, "SortPreservingMergeExec: [{}]", expr.join(",")) } diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index a81c43398cde6..ffd3f06ee8bcf 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -253,7 +253,7 @@ impl ExecutionPlan for UnionExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "UnionExec") } } @@ -427,7 +427,7 @@ impl ExecutionPlan for InterleaveExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "InterleaveExec") } } diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs index cd42c3305f2d0..2e5c92872fc05 100644 --- a/datafusion/core/src/physical_plan/unnest.rs +++ b/datafusion/core/src/physical_plan/unnest.rs @@ -132,7 +132,7 @@ impl ExecutionPlan for UnnestExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "UnnestExec") } } diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs index d1cf6927a2e30..4099cfdb364d6 100644 --- a/datafusion/core/src/physical_plan/values.rs +++ b/datafusion/core/src/physical_plan/values.rs @@ -151,7 +151,7 @@ impl ExecutionPlan for ValuesExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "ValuesExec") } } diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs index 3a95308503e42..2512776e8dd45 100644 --- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs @@ -305,7 +305,7 @@ impl ExecutionPlan for BoundedWindowAggExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "BoundedWindowAggExec: ")?; let g: Vec = self .window_expr diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index f57dfbc0b68b2..4a0648c8f1b31 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -231,7 +231,7 @@ impl ExecutionPlan for WindowAggExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "WindowAggExec: ")?; let g: Vec = self .window_expr diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 3cb6efc6541aa..d695bcd99ae9c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1815,7 +1815,7 @@ impl DefaultPhysicalPlanner { Ok(input) => { stringified_plans.push( displayable(input.as_ref()) - .to_stringified(InitialPhysicalPlan), + .to_stringified(e.verbose, InitialPhysicalPlan), ); match self.optimize_internal( @@ -1824,13 +1824,15 @@ impl DefaultPhysicalPlanner { |plan, optimizer| { let optimizer_name = optimizer.name().to_string(); let plan_type = OptimizedPhysicalPlan { optimizer_name }; - stringified_plans - .push(displayable(plan).to_stringified(plan_type)); + stringified_plans.push( + displayable(plan) + .to_stringified(e.verbose, plan_type), + ); }, ) { Ok(input) => stringified_plans.push( displayable(input.as_ref()) - .to_stringified(FinalPhysicalPlan), + .to_stringified(e.verbose, FinalPhysicalPlan), ), Err(DataFusionError::Context(optimizer_name, e)) => { let plan_type = OptimizedPhysicalPlan { optimizer_name }; @@ -1873,11 +1875,11 @@ impl DefaultPhysicalPlanner { let optimizers = session_state.physical_optimizers(); debug!( "Input physical plan:\n{}\n", - displayable(plan.as_ref()).indent() + displayable(plan.as_ref()).indent(false) ); trace!( "Detailed input physical plan:\n{}", - displayable(plan.as_ref()).indent() + displayable(plan.as_ref()).indent(true) ); let mut new_plan = plan; @@ -1903,13 +1905,13 @@ impl DefaultPhysicalPlanner { trace!( "Optimized physical plan by {}:\n{}\n", optimizer.name(), - displayable(new_plan.as_ref()).indent() + displayable(new_plan.as_ref()).indent(false) ); observer(new_plan.as_ref(), optimizer.as_ref()) } debug!( "Optimized physical plan:\n{}\n", - displayable(new_plan.as_ref()).indent() + displayable(new_plan.as_ref()).indent(false) ); trace!("Detailed optimized physical plan:\n{:?}", new_plan); Ok(new_plan) @@ -2420,7 +2422,7 @@ mod tests { } else { panic!( "Plan was not an explain plan: {}", - displayable(plan.as_ref()).indent() + displayable(plan.as_ref()).indent(true) ); } } @@ -2536,7 +2538,7 @@ mod tests { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "NoOpExecutionPlan") } } diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs index 41a0a1b4d084a..32e7d4bcfef2e 100644 --- a/datafusion/core/src/test/exec.rs +++ b/datafusion/core/src/test/exec.rs @@ -231,7 +231,7 @@ impl ExecutionPlan for MockExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "MockExec") } } @@ -358,7 +358,7 @@ impl ExecutionPlan for BarrierExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "BarrierExec") } } @@ -437,7 +437,7 @@ impl ExecutionPlan for ErrorExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "ErrorExec") } } @@ -516,7 +516,7 @@ impl ExecutionPlan for StatisticsExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( f, "StatisticsExec: col_count={}, row_count={:?}", @@ -611,7 +611,7 @@ impl ExecutionPlan for BlockingExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "BlockingExec",) } } @@ -749,7 +749,7 @@ impl ExecutionPlan for PanicExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "PanickingExec",) } } diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 01d502f29f51e..6715a6ba9857e 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -412,7 +412,7 @@ impl ExecutionPlan for UnboundedExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( f, "UnboundableExec: unbounded={}", diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs index a8871639f7d3a..7374a056d69dd 100644 --- a/datafusion/core/tests/custom_sources.rs +++ b/datafusion/core/tests/custom_sources.rs @@ -144,7 +144,7 @@ impl ExecutionPlan for CustomExecutionPlan { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "CustomExecutionPlan: projection={:#?}", self.projection) } } diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index a809832387332..36b15f435943b 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -103,7 +103,7 @@ impl ExecutionPlan for CustomPlan { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "CustomPlan: batch_size={}", self.batches.len(),) } } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index ca83ab1cf64bf..73237fa8574b0 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -157,7 +157,7 @@ impl ExecutionPlan for StatisticsValidation { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( f, "StatisticsValidation: col_count={}, row_count={:?}", diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 971dea81283fe..974721144f092 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -613,7 +613,7 @@ async fn test_physical_plan_display_indent() { ]; let normalizer = ExplainNormalizer::new(); - let actual = format!("{}", displayable(physical_plan.as_ref()).indent()) + let actual = format!("{}", displayable(physical_plan.as_ref()).indent(true)) .trim() .lines() // normalize paths @@ -659,7 +659,7 @@ async fn test_physical_plan_display_indent_multi_children() { ]; let normalizer = ExplainNormalizer::new(); - let actual = format!("{}", displayable(physical_plan.as_ref()).indent()) + let actual = format!("{}", displayable(physical_plan.as_ref()).indent(true)) .trim() .lines() // normalize paths diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs index e1a8221ecc6cf..2054a9937c989 100644 --- a/datafusion/core/tests/sql/order.rs +++ b/datafusion/core/tests/sql/order.rs @@ -220,7 +220,7 @@ ORDER BY 1, 2; " ProjectionExec: expr=[column1@0 as t]", " ValuesExec", ]; - let formatted = displayable(plan.as_ref()).indent().to_string(); + let formatted = displayable(plan.as_ref()).indent(true).to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); assert_eq!( expected, actual, diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs index c237d8e632c97..493d90d91a81c 100644 --- a/datafusion/core/tests/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined_plan.rs @@ -484,7 +484,7 @@ impl ExecutionPlan for TopKExec { f: &mut std::fmt::Formatter, ) -> std::fmt::Result { match t { - DisplayFormatType::Default => { + DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "TopKExec: k={}", self.k) } } diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs index dd1504679ab03..25d60471a9cd2 100644 --- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs @@ -66,8 +66,11 @@ async fn parquet_exec() -> Result<()> { consumer::from_substrait_rel(&mut ctx, substrait_rel.as_ref(), &HashMap::new()) .await?; - let expected = format!("{}", displayable(parquet_exec.as_ref()).indent()); - let actual = format!("{}", displayable(parquet_exec_roundtrip.as_ref()).indent()); + let expected = format!("{}", displayable(parquet_exec.as_ref()).indent(true)); + let actual = format!( + "{}", + displayable(parquet_exec_roundtrip.as_ref()).indent(true) + ); assert_eq!(expected, actual); Ok(())