From 9c25e51e3b28b632a2675496fc4deebaab65ff4c Mon Sep 17 00:00:00 2001 From: Alan Tang Date: Sat, 8 Mar 2025 15:06:48 +0800 Subject: [PATCH 1/2] feat: support tree rendering for StreamingTableExec Signed-off-by: Alan Tang --- datafusion/physical-plan/src/streaming.rs | 23 +- .../sqllogictest/test_files/explain_tree.slt | 259 ++++++++++++++++++ 2 files changed, 280 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 8104e8acf1f6e..5f93f3979df0c 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -209,8 +209,27 @@ impl DisplayAs for StreamingTableExec { Ok(()) } DisplayFormatType::TreeRender => { - // TODO: collect info - write!(f, "") + write!( + f, + "StreamingTableExec: partition_sizes={:?}", + self.partitions.len(), + )?; + if !self.projected_schema.fields().is_empty() { + write!( + f, + ", projection={}", + ProjectSchemaDisplay(&self.projected_schema) + )?; + } + if self.infinite { + write!(f, ", infinite_source=true")?; + } + if let Some(fetch) = self.limit { + write!(f, ", fetch={fetch}")?; + } + + display_orderings(f, &self.projected_output_ordering)?; + Ok(()) } } } diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 465b09e653b50..62cc9b79d769a 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -912,3 +912,262 @@ drop table table4; statement ok drop table table5; + +# Test on StreamingTableExec +# prepare table +statement ok +CREATE UNBOUNDED EXTERNAL TABLE data ( + "date" DATE, + "ticker" VARCHAR, + "time" TIMESTAMP, +) STORED AS CSV +WITH ORDER ("date", "ticker", "time") +LOCATION './a.parquet'; + + +# query +query TT +explain SELECT * FROM data +WHERE ticker = 'A' +ORDER BY "date", "time"; +---- +logical_plan +01)Sort: data.date ASC NULLS LAST, data.time ASC NULLS LAST +02)--Filter: data.ticker = Utf8("A") +03)----TableScan: data projection=[date, ticker, time] +physical_plan +01)┌───────────────────────────┐ +02)│ SortPreservingMergeExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ CoalesceBatchesExec │ +06)└─────────────┬─────────────┘ +07)┌─────────────┴─────────────┐ +08)│ FilterExec │ +09)│ -------------------- │ +10)│ predicate: │ +11)│ ticker@1 = A │ +12)└─────────────┬─────────────┘ +13)┌─────────────┴─────────────┐ +14)│ RepartitionExec │ +15)└─────────────┬─────────────┘ +16)┌─────────────┴─────────────┐ +17)│ StreamingTableExec │ +18)│ -------------------- │ +19)│ StreamingTableExec: │ +20)│ partition_sizes: │ +21)│ 1, projection=[date, │ +22)│ ticker, time], │ +23)│ infinite_source=true │ +24)│ , output_ordering=[date@0 │ +25)│ ASC NULLS LAST, ticker@1 │ +26)│ ASC NULLS LAST, time@2 │ +27)│ ASC NULLS LAST] │ +28)└───────────────────────────┘ + + +# constant ticker, CAST(time AS DATE) = time, order by time +query TT +explain SELECT * FROM data +WHERE ticker = 'A' AND CAST(time AS DATE) = date +ORDER BY "time" +---- +logical_plan +01)Sort: data.time ASC NULLS LAST +02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date +03)----TableScan: data projection=[date, ticker, time] +physical_plan +01)┌───────────────────────────┐ +02)│ SortPreservingMergeExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ CoalesceBatchesExec │ +06)└─────────────┬─────────────┘ +07)┌─────────────┴─────────────┐ +08)│ FilterExec │ +09)│ -------------------- │ +10)│ predicate: │ +11)│ ticker@1 = A AND CAST(time│ +12)│ @2 AS Date32) = date@0 │ +13)└─────────────┬─────────────┘ +14)┌─────────────┴─────────────┐ +15)│ RepartitionExec │ +16)└─────────────┬─────────────┘ +17)┌─────────────┴─────────────┐ +18)│ StreamingTableExec │ +19)│ -------------------- │ +20)│ StreamingTableExec: │ +21)│ partition_sizes: │ +22)│ 1, projection=[date, │ +23)│ ticker, time], │ +24)│ infinite_source=true │ +25)│ , output_ordering=[date@0 │ +26)│ ASC NULLS LAST, ticker@1 │ +27)│ ASC NULLS LAST, time@2 │ +28)│ ASC NULLS LAST] │ +29)└───────────────────────────┘ + +# same thing but order by date +query TT +explain SELECT * FROM data +WHERE ticker = 'A' AND CAST(time AS DATE) = date +ORDER BY "date" +---- +logical_plan +01)Sort: data.date ASC NULLS LAST +02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date +03)----TableScan: data projection=[date, ticker, time] +physical_plan +01)┌───────────────────────────┐ +02)│ SortPreservingMergeExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ CoalesceBatchesExec │ +06)└─────────────┬─────────────┘ +07)┌─────────────┴─────────────┐ +08)│ FilterExec │ +09)│ -------------------- │ +10)│ predicate: │ +11)│ ticker@1 = A AND CAST(time│ +12)│ @2 AS Date32) = date@0 │ +13)└─────────────┬─────────────┘ +14)┌─────────────┴─────────────┐ +15)│ RepartitionExec │ +16)└─────────────┬─────────────┘ +17)┌─────────────┴─────────────┐ +18)│ StreamingTableExec │ +19)│ -------------------- │ +20)│ StreamingTableExec: │ +21)│ partition_sizes: │ +22)│ 1, projection=[date, │ +23)│ ticker, time], │ +24)│ infinite_source=true │ +25)│ , output_ordering=[date@0 │ +26)│ ASC NULLS LAST, ticker@1 │ +27)│ ASC NULLS LAST, time@2 │ +28)│ ASC NULLS LAST] │ +29)└───────────────────────────┘ + +# same thing but order by ticker +query TT +explain SELECT * FROM data +WHERE ticker = 'A' AND CAST(time AS DATE) = date +ORDER BY "ticker" +---- +logical_plan +01)Sort: data.ticker ASC NULLS LAST +02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date +03)----TableScan: data projection=[date, ticker, time] +physical_plan +01)┌───────────────────────────┐ +02)│ CoalescePartitionsExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ CoalesceBatchesExec │ +06)└─────────────┬─────────────┘ +07)┌─────────────┴─────────────┐ +08)│ FilterExec │ +09)│ -------------------- │ +10)│ predicate: │ +11)│ ticker@1 = A AND CAST(time│ +12)│ @2 AS Date32) = date@0 │ +13)└─────────────┬─────────────┘ +14)┌─────────────┴─────────────┐ +15)│ RepartitionExec │ +16)└─────────────┬─────────────┘ +17)┌─────────────┴─────────────┐ +18)│ StreamingTableExec │ +19)│ -------------------- │ +20)│ StreamingTableExec: │ +21)│ partition_sizes: │ +22)│ 1, projection=[date, │ +23)│ ticker, time], │ +24)│ infinite_source=true │ +25)│ , output_ordering=[date@0 │ +26)│ ASC NULLS LAST, ticker@1 │ +27)│ ASC NULLS LAST, time@2 │ +28)│ ASC NULLS LAST] │ +29)└───────────────────────────┘ + +# same thing but order by time, date +query TT +explain SELECT * FROM data +WHERE ticker = 'A' AND CAST(time AS DATE) = date +ORDER BY "time", "date"; +---- +logical_plan +01)Sort: data.time ASC NULLS LAST, data.date ASC NULLS LAST +02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date +03)----TableScan: data projection=[date, ticker, time] +physical_plan +01)┌───────────────────────────┐ +02)│ SortPreservingMergeExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ CoalesceBatchesExec │ +06)└─────────────┬─────────────┘ +07)┌─────────────┴─────────────┐ +08)│ FilterExec │ +09)│ -------------------- │ +10)│ predicate: │ +11)│ ticker@1 = A AND CAST(time│ +12)│ @2 AS Date32) = date@0 │ +13)└─────────────┬─────────────┘ +14)┌─────────────┴─────────────┐ +15)│ RepartitionExec │ +16)└─────────────┬─────────────┘ +17)┌─────────────┴─────────────┐ +18)│ StreamingTableExec │ +19)│ -------------------- │ +20)│ StreamingTableExec: │ +21)│ partition_sizes: │ +22)│ 1, projection=[date, │ +23)│ ticker, time], │ +24)│ infinite_source=true │ +25)│ , output_ordering=[date@0 │ +26)│ ASC NULLS LAST, ticker@1 │ +27)│ ASC NULLS LAST, time@2 │ +28)│ ASC NULLS LAST] │ +29)└───────────────────────────┘ + + + +# query +query TT +explain SELECT * FROM data +WHERE date = '2006-01-02' +ORDER BY "ticker", "time"; +---- +logical_plan +01)Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST +02)--Filter: data.date = Date32("2006-01-02") +03)----TableScan: data projection=[date, ticker, time] +physical_plan +01)┌───────────────────────────┐ +02)│ SortPreservingMergeExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ CoalesceBatchesExec │ +06)└─────────────┬─────────────┘ +07)┌─────────────┴─────────────┐ +08)│ FilterExec │ +09)│ -------------------- │ +10)│ predicate: │ +11)│ date@0 = 2006-01-02 │ +12)└─────────────┬─────────────┘ +13)┌─────────────┴─────────────┐ +14)│ RepartitionExec │ +15)└─────────────┬─────────────┘ +16)┌─────────────┴─────────────┐ +17)│ StreamingTableExec │ +18)│ -------------------- │ +19)│ StreamingTableExec: │ +20)│ partition_sizes: │ +21)│ 1, projection=[date, │ +22)│ ticker, time], │ +23)│ infinite_source=true │ +24)│ , output_ordering=[date@0 │ +25)│ ASC NULLS LAST, ticker@1 │ +26)│ ASC NULLS LAST, time@2 │ +27)│ ASC NULLS LAST] │ +28)└───────────────────────────┘ From 40397bb5abd4b9723e7683168012226db364b4e9 Mon Sep 17 00:00:00 2001 From: Alan Tang Date: Mon, 10 Mar 2025 09:17:27 +0800 Subject: [PATCH 2/2] feat: simpler expr for streamingExec Signed-off-by: Alan Tang chore: Describe more precisely Signed-off-by: Alan Tang --- datafusion/physical-plan/src/streaming.rs | 21 ++--- .../sqllogictest/test_files/explain_tree.slt | 90 ++++++------------- 2 files changed, 33 insertions(+), 78 deletions(-) diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 5f93f3979df0c..18c472a7e1874 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -209,26 +209,15 @@ impl DisplayAs for StreamingTableExec { Ok(()) } DisplayFormatType::TreeRender => { - write!( - f, - "StreamingTableExec: partition_sizes={:?}", - self.partitions.len(), - )?; - if !self.projected_schema.fields().is_empty() { - write!( - f, - ", projection={}", - ProjectSchemaDisplay(&self.projected_schema) - )?; - } if self.infinite { - write!(f, ", infinite_source=true")?; + writeln!(f, "infinite={}", self.infinite)?; } - if let Some(fetch) = self.limit { - write!(f, ", fetch={fetch}")?; + if let Some(limit) = self.limit { + write!(f, "limit={limit}")?; + } else { + write!(f, "limit=None")?; } - display_orderings(f, &self.projected_output_ordering)?; Ok(()) } } diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 62cc9b79d769a..b4fd7467acbf6 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -755,7 +755,10 @@ physical_plan 08)└─────────────┬─────────────┘ 09)┌─────────────┴─────────────┐ 10)│ StreamingTableExec │ -11)└───────────────────────────┘ +11)│ -------------------- │ +12)│ infinite: true │ +13)│ limit: None │ +14)└───────────────────────────┘ query TT EXPLAIN SELECT * @@ -779,7 +782,10 @@ physical_plan 10)└─────────────┬─────────────┘ 11)┌─────────────┴─────────────┐ 12)│ StreamingTableExec │ -13)└───────────────────────────┘ +13)│ -------------------- │ +14)│ infinite: true │ +15)│ limit: None │ +16)└───────────────────────────┘ # Query with hash join. query TT @@ -954,16 +960,9 @@ physical_plan 16)┌─────────────┴─────────────┐ 17)│ StreamingTableExec │ 18)│ -------------------- │ -19)│ StreamingTableExec: │ -20)│ partition_sizes: │ -21)│ 1, projection=[date, │ -22)│ ticker, time], │ -23)│ infinite_source=true │ -24)│ , output_ordering=[date@0 │ -25)│ ASC NULLS LAST, ticker@1 │ -26)│ ASC NULLS LAST, time@2 │ -27)│ ASC NULLS LAST] │ -28)└───────────────────────────┘ +19)│ infinite: true │ +20)│ limit: None │ +21)└───────────────────────────┘ # constant ticker, CAST(time AS DATE) = time, order by time @@ -996,16 +995,9 @@ physical_plan 17)┌─────────────┴─────────────┐ 18)│ StreamingTableExec │ 19)│ -------------------- │ -20)│ StreamingTableExec: │ -21)│ partition_sizes: │ -22)│ 1, projection=[date, │ -23)│ ticker, time], │ -24)│ infinite_source=true │ -25)│ , output_ordering=[date@0 │ -26)│ ASC NULLS LAST, ticker@1 │ -27)│ ASC NULLS LAST, time@2 │ -28)│ ASC NULLS LAST] │ -29)└───────────────────────────┘ +20)│ infinite: true │ +21)│ limit: None │ +22)└───────────────────────────┘ # same thing but order by date query TT @@ -1037,16 +1029,9 @@ physical_plan 17)┌─────────────┴─────────────┐ 18)│ StreamingTableExec │ 19)│ -------------------- │ -20)│ StreamingTableExec: │ -21)│ partition_sizes: │ -22)│ 1, projection=[date, │ -23)│ ticker, time], │ -24)│ infinite_source=true │ -25)│ , output_ordering=[date@0 │ -26)│ ASC NULLS LAST, ticker@1 │ -27)│ ASC NULLS LAST, time@2 │ -28)│ ASC NULLS LAST] │ -29)└───────────────────────────┘ +20)│ infinite: true │ +21)│ limit: None │ +22)└───────────────────────────┘ # same thing but order by ticker query TT @@ -1078,16 +1063,10 @@ physical_plan 17)┌─────────────┴─────────────┐ 18)│ StreamingTableExec │ 19)│ -------------------- │ -20)│ StreamingTableExec: │ -21)│ partition_sizes: │ -22)│ 1, projection=[date, │ -23)│ ticker, time], │ -24)│ infinite_source=true │ -25)│ , output_ordering=[date@0 │ -26)│ ASC NULLS LAST, ticker@1 │ -27)│ ASC NULLS LAST, time@2 │ -28)│ ASC NULLS LAST] │ -29)└───────────────────────────┘ +20)│ infinite: true │ +21)│ limit: None │ +22)└───────────────────────────┘ + # same thing but order by time, date query TT @@ -1119,16 +1098,10 @@ physical_plan 17)┌─────────────┴─────────────┐ 18)│ StreamingTableExec │ 19)│ -------------------- │ -20)│ StreamingTableExec: │ -21)│ partition_sizes: │ -22)│ 1, projection=[date, │ -23)│ ticker, time], │ -24)│ infinite_source=true │ -25)│ , output_ordering=[date@0 │ -26)│ ASC NULLS LAST, ticker@1 │ -27)│ ASC NULLS LAST, time@2 │ -28)│ ASC NULLS LAST] │ -29)└───────────────────────────┘ +20)│ infinite: true │ +21)│ limit: None │ +22)└───────────────────────────┘ + @@ -1161,13 +1134,6 @@ physical_plan 16)┌─────────────┴─────────────┐ 17)│ StreamingTableExec │ 18)│ -------------------- │ -19)│ StreamingTableExec: │ -20)│ partition_sizes: │ -21)│ 1, projection=[date, │ -22)│ ticker, time], │ -23)│ infinite_source=true │ -24)│ , output_ordering=[date@0 │ -25)│ ASC NULLS LAST, ticker@1 │ -26)│ ASC NULLS LAST, time@2 │ -27)│ ASC NULLS LAST] │ -28)└───────────────────────────┘ +19)│ infinite: true │ +20)│ limit: None │ +21)└───────────────────────────┘