diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 8104e8acf1f6e..18c472a7e1874 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -209,8 +209,16 @@ impl DisplayAs for StreamingTableExec { Ok(()) } DisplayFormatType::TreeRender => { - // TODO: collect info - write!(f, "") + if self.infinite { + writeln!(f, "infinite={}", self.infinite)?; + } + if let Some(limit) = self.limit { + write!(f, "limit={limit}")?; + } else { + write!(f, "limit=None")?; + } + + Ok(()) } } } diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 465b09e653b50..b4fd7467acbf6 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -755,7 +755,10 @@ physical_plan 08)└─────────────┬─────────────┘ 09)┌─────────────┴─────────────┐ 10)│ StreamingTableExec │ -11)└───────────────────────────┘ +11)│ -------------------- │ +12)│ infinite: true │ +13)│ limit: None │ +14)└───────────────────────────┘ query TT EXPLAIN SELECT * @@ -779,7 +782,10 @@ physical_plan 10)└─────────────┬─────────────┘ 11)┌─────────────┴─────────────┐ 12)│ StreamingTableExec │ -13)└───────────────────────────┘ +13)│ -------------------- │ +14)│ infinite: true │ +15)│ limit: None │ +16)└───────────────────────────┘ # Query with hash join. query TT @@ -912,3 +918,222 @@ drop table table4; statement ok drop table table5; + +# Test on StreamingTableExec +# prepare table +statement ok +CREATE UNBOUNDED EXTERNAL TABLE data ( + "date" DATE, + "ticker" VARCHAR, + "time" TIMESTAMP, +) STORED AS CSV +WITH ORDER ("date", "ticker", "time") +LOCATION './a.parquet'; + + +# query +query TT +explain SELECT * FROM data +WHERE ticker = 'A' +ORDER BY "date", "time"; +---- +logical_plan +01)Sort: data.date ASC NULLS LAST, data.time ASC NULLS LAST +02)--Filter: data.ticker = Utf8("A") +03)----TableScan: data projection=[date, ticker, time] +physical_plan +01)┌───────────────────────────┐ +02)│ SortPreservingMergeExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ CoalesceBatchesExec │ +06)└─────────────┬─────────────┘ +07)┌─────────────┴─────────────┐ +08)│ FilterExec │ +09)│ -------------------- │ +10)│ predicate: │ +11)│ ticker@1 = A │ +12)└─────────────┬─────────────┘ +13)┌─────────────┴─────────────┐ +14)│ RepartitionExec │ +15)└─────────────┬─────────────┘ +16)┌─────────────┴─────────────┐ +17)│ StreamingTableExec │ +18)│ -------------------- │ +19)│ infinite: true │ +20)│ limit: None │ +21)└───────────────────────────┘ + + +# constant ticker, CAST(time AS DATE) = time, order by time +query TT +explain SELECT * FROM data +WHERE ticker = 'A' AND CAST(time AS DATE) = date +ORDER BY "time" +---- +logical_plan +01)Sort: data.time ASC NULLS LAST +02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date +03)----TableScan: data projection=[date, ticker, time] +physical_plan +01)┌───────────────────────────┐ +02)│ SortPreservingMergeExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ CoalesceBatchesExec │ +06)└─────────────┬─────────────┘ +07)┌─────────────┴─────────────┐ +08)│ FilterExec │ +09)│ -------------------- │ +10)│ predicate: │ +11)│ ticker@1 = A AND CAST(time│ +12)│ @2 AS Date32) = date@0 │ +13)└─────────────┬─────────────┘ +14)┌─────────────┴─────────────┐ +15)│ RepartitionExec │ +16)└─────────────┬─────────────┘ +17)┌─────────────┴─────────────┐ +18)│ StreamingTableExec │ +19)│ -------------------- │ +20)│ infinite: true │ +21)│ limit: None │ +22)└───────────────────────────┘ + +# same thing but order by date +query TT +explain SELECT * FROM data +WHERE ticker = 'A' AND CAST(time AS DATE) = date +ORDER BY "date" +---- +logical_plan +01)Sort: data.date ASC NULLS LAST +02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date +03)----TableScan: data projection=[date, ticker, time] +physical_plan +01)┌───────────────────────────┐ +02)│ SortPreservingMergeExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ CoalesceBatchesExec │ +06)└─────────────┬─────────────┘ +07)┌─────────────┴─────────────┐ +08)│ FilterExec │ +09)│ -------------------- │ +10)│ predicate: │ +11)│ ticker@1 = A AND CAST(time│ +12)│ @2 AS Date32) = date@0 │ +13)└─────────────┬─────────────┘ +14)┌─────────────┴─────────────┐ +15)│ RepartitionExec │ +16)└─────────────┬─────────────┘ +17)┌─────────────┴─────────────┐ +18)│ StreamingTableExec │ +19)│ -------------------- │ +20)│ infinite: true │ +21)│ limit: None │ +22)└───────────────────────────┘ + +# same thing but order by ticker +query TT +explain SELECT * FROM data +WHERE ticker = 'A' AND CAST(time AS DATE) = date +ORDER BY "ticker" +---- +logical_plan +01)Sort: data.ticker ASC NULLS LAST +02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date +03)----TableScan: data projection=[date, ticker, time] +physical_plan +01)┌───────────────────────────┐ +02)│ CoalescePartitionsExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ CoalesceBatchesExec │ +06)└─────────────┬─────────────┘ +07)┌─────────────┴─────────────┐ +08)│ FilterExec │ +09)│ -------------------- │ +10)│ predicate: │ +11)│ ticker@1 = A AND CAST(time│ +12)│ @2 AS Date32) = date@0 │ +13)└─────────────┬─────────────┘ +14)┌─────────────┴─────────────┐ +15)│ RepartitionExec │ +16)└─────────────┬─────────────┘ +17)┌─────────────┴─────────────┐ +18)│ StreamingTableExec │ +19)│ -------------------- │ +20)│ infinite: true │ +21)│ limit: None │ +22)└───────────────────────────┘ + + +# same thing but order by time, date +query TT +explain SELECT * FROM data +WHERE ticker = 'A' AND CAST(time AS DATE) = date +ORDER BY "time", "date"; +---- +logical_plan +01)Sort: data.time ASC NULLS LAST, data.date ASC NULLS LAST +02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date +03)----TableScan: data projection=[date, ticker, time] +physical_plan +01)┌───────────────────────────┐ +02)│ SortPreservingMergeExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ CoalesceBatchesExec │ +06)└─────────────┬─────────────┘ +07)┌─────────────┴─────────────┐ +08)│ FilterExec │ +09)│ -------------------- │ +10)│ predicate: │ +11)│ ticker@1 = A AND CAST(time│ +12)│ @2 AS Date32) = date@0 │ +13)└─────────────┬─────────────┘ +14)┌─────────────┴─────────────┐ +15)│ RepartitionExec │ +16)└─────────────┬─────────────┘ +17)┌─────────────┴─────────────┐ +18)│ StreamingTableExec │ +19)│ -------------------- │ +20)│ infinite: true │ +21)│ limit: None │ +22)└───────────────────────────┘ + + + + +# query +query TT +explain SELECT * FROM data +WHERE date = '2006-01-02' +ORDER BY "ticker", "time"; +---- +logical_plan +01)Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST +02)--Filter: data.date = Date32("2006-01-02") +03)----TableScan: data projection=[date, ticker, time] +physical_plan +01)┌───────────────────────────┐ +02)│ SortPreservingMergeExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ CoalesceBatchesExec │ +06)└─────────────┬─────────────┘ +07)┌─────────────┴─────────────┐ +08)│ FilterExec │ +09)│ -------------------- │ +10)│ predicate: │ +11)│ date@0 = 2006-01-02 │ +12)└─────────────┬─────────────┘ +13)┌─────────────┴─────────────┐ +14)│ RepartitionExec │ +15)└─────────────┬─────────────┘ +16)┌─────────────┴─────────────┐ +17)│ StreamingTableExec │ +18)│ -------------------- │ +19)│ infinite: true │ +20)│ limit: None │ +21)└───────────────────────────┘