Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions datafusion/physical-plan/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,16 @@ impl DisplayAs for StreamingTableExec {
Ok(())
}
DisplayFormatType::TreeRender => {
// TODO: collect info
write!(f, "")
if self.infinite {
writeln!(f, "infinite={}", self.infinite)?;
}
if let Some(limit) = self.limit {
write!(f, "limit={limit}")?;
} else {
write!(f, "limit=None")?;
}

Ok(())
}
}
}
Expand Down
229 changes: 227 additions & 2 deletions datafusion/sqllogictest/test_files/explain_tree.slt
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,10 @@ physical_plan
08)└─────────────┬─────────────┘
09)┌─────────────┴─────────────┐
10)│ StreamingTableExec │
11)└───────────────────────────┘
11)│ -------------------- │
12)│ infinite: true │
13)│ limit: None │
14)└───────────────────────────┘

query TT
EXPLAIN SELECT *
Expand All @@ -779,7 +782,10 @@ physical_plan
10)└─────────────┬─────────────┘
11)┌─────────────┴─────────────┐
12)│ StreamingTableExec │
13)└───────────────────────────┘
13)│ -------------------- │
14)│ infinite: true │
15)│ limit: None │
16)└───────────────────────────┘

# Query with hash join.
query TT
Expand Down Expand Up @@ -912,3 +918,222 @@ drop table table4;

statement ok
drop table table5;

# Test on StreamingTableExec
# prepare table
statement ok
CREATE UNBOUNDED EXTERNAL TABLE data (
"date" DATE,
"ticker" VARCHAR,
"time" TIMESTAMP,
) STORED AS CSV
WITH ORDER ("date", "ticker", "time")
LOCATION './a.parquet';


# query
query TT
explain SELECT * FROM data
WHERE ticker = 'A'
ORDER BY "date", "time";
----
logical_plan
01)Sort: data.date ASC NULLS LAST, data.time ASC NULLS LAST
02)--Filter: data.ticker = Utf8("A")
03)----TableScan: data projection=[date, ticker, time]
physical_plan
01)┌───────────────────────────┐
02)│ SortPreservingMergeExec │
03)└─────────────┬─────────────┘
04)┌─────────────┴─────────────┐
05)│ CoalesceBatchesExec │
06)└─────────────┬─────────────┘
07)┌─────────────┴─────────────┐
08)│ FilterExec │
09)│ -------------------- │
10)│ predicate: │
11)│ ticker@1 = A │
12)└─────────────┬─────────────┘
13)┌─────────────┴─────────────┐
14)│ RepartitionExec │
15)└─────────────┬─────────────┘
16)┌─────────────┴─────────────┐
17)│ StreamingTableExec │
18)│ -------------------- │
19)│ infinite: true │
20)│ limit: None │
21)└───────────────────────────┘


# constant ticker, CAST(time AS DATE) = time, order by time
query TT
explain SELECT * FROM data
WHERE ticker = 'A' AND CAST(time AS DATE) = date
ORDER BY "time"
----
logical_plan
01)Sort: data.time ASC NULLS LAST
02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
03)----TableScan: data projection=[date, ticker, time]
physical_plan
01)┌───────────────────────────┐
02)│ SortPreservingMergeExec │
03)└─────────────┬─────────────┘
04)┌─────────────┴─────────────┐
05)│ CoalesceBatchesExec │
06)└─────────────┬─────────────┘
07)┌─────────────┴─────────────┐
08)│ FilterExec │
09)│ -------------------- │
10)│ predicate: │
11)│ ticker@1 = A AND CAST(time│
12)│ @2 AS Date32) = date@0 │
13)└─────────────┬─────────────┘
14)┌─────────────┴─────────────┐
15)│ RepartitionExec │
16)└─────────────┬─────────────┘
17)┌─────────────┴─────────────┐
18)│ StreamingTableExec │
19)│ -------------------- │
20)│ infinite: true │
21)│ limit: None │
22)└───────────────────────────┘

# same thing but order by date
query TT
explain SELECT * FROM data
WHERE ticker = 'A' AND CAST(time AS DATE) = date
ORDER BY "date"
----
logical_plan
01)Sort: data.date ASC NULLS LAST
02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
03)----TableScan: data projection=[date, ticker, time]
physical_plan
01)┌───────────────────────────┐
02)│ SortPreservingMergeExec │
03)└─────────────┬─────────────┘
04)┌─────────────┴─────────────┐
05)│ CoalesceBatchesExec │
06)└─────────────┬─────────────┘
07)┌─────────────┴─────────────┐
08)│ FilterExec │
09)│ -------------------- │
10)│ predicate: │
11)│ ticker@1 = A AND CAST(time│
12)│ @2 AS Date32) = date@0 │
13)└─────────────┬─────────────┘
14)┌─────────────┴─────────────┐
15)│ RepartitionExec │
16)└─────────────┬─────────────┘
17)┌─────────────┴─────────────┐
18)│ StreamingTableExec │
19)│ -------------------- │
20)│ infinite: true │
21)│ limit: None │
22)└───────────────────────────┘

# same thing but order by ticker
query TT
explain SELECT * FROM data
WHERE ticker = 'A' AND CAST(time AS DATE) = date
ORDER BY "ticker"
----
logical_plan
01)Sort: data.ticker ASC NULLS LAST
02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
03)----TableScan: data projection=[date, ticker, time]
physical_plan
01)┌───────────────────────────┐
02)│ CoalescePartitionsExec │
03)└─────────────┬─────────────┘
04)┌─────────────┴─────────────┐
05)│ CoalesceBatchesExec │
06)└─────────────┬─────────────┘
07)┌─────────────┴─────────────┐
08)│ FilterExec │
09)│ -------------------- │
10)│ predicate: │
11)│ ticker@1 = A AND CAST(time│
12)│ @2 AS Date32) = date@0 │
13)└─────────────┬─────────────┘
14)┌─────────────┴─────────────┐
15)│ RepartitionExec │
16)└─────────────┬─────────────┘
17)┌─────────────┴─────────────┐
18)│ StreamingTableExec │
19)│ -------------------- │
20)│ infinite: true │
21)│ limit: None │
22)└───────────────────────────┘


# same thing but order by time, date
query TT
explain SELECT * FROM data
WHERE ticker = 'A' AND CAST(time AS DATE) = date
ORDER BY "time", "date";
----
logical_plan
01)Sort: data.time ASC NULLS LAST, data.date ASC NULLS LAST
02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
03)----TableScan: data projection=[date, ticker, time]
physical_plan
01)┌───────────────────────────┐
02)│ SortPreservingMergeExec │
03)└─────────────┬─────────────┘
04)┌─────────────┴─────────────┐
05)│ CoalesceBatchesExec │
06)└─────────────┬─────────────┘
07)┌─────────────┴─────────────┐
08)│ FilterExec │
09)│ -------------------- │
10)│ predicate: │
11)│ ticker@1 = A AND CAST(time│
12)│ @2 AS Date32) = date@0 │
13)└─────────────┬─────────────┘
14)┌─────────────┴─────────────┐
15)│ RepartitionExec │
16)└─────────────┬─────────────┘
17)┌─────────────┴─────────────┐
18)│ StreamingTableExec │
19)│ -------------------- │
20)│ infinite: true │
21)│ limit: None │
22)└───────────────────────────┘




# query
query TT
explain SELECT * FROM data
WHERE date = '2006-01-02'
ORDER BY "ticker", "time";
----
logical_plan
01)Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST
02)--Filter: data.date = Date32("2006-01-02")
03)----TableScan: data projection=[date, ticker, time]
physical_plan
01)┌───────────────────────────┐
02)│ SortPreservingMergeExec │
03)└─────────────┬─────────────┘
04)┌─────────────┴─────────────┐
05)│ CoalesceBatchesExec │
06)└─────────────┬─────────────┘
07)┌─────────────┴─────────────┐
08)│ FilterExec │
09)│ -------------------- │
10)│ predicate: │
11)│ date@0 = 2006-01-02 │
12)└─────────────┬─────────────┘
13)┌─────────────┴─────────────┐
14)│ RepartitionExec │
15)└─────────────┬─────────────┘
16)┌─────────────┴─────────────┐
17)│ StreamingTableExec │
18)│ -------------------- │
19)│ infinite: true │
20)│ limit: None │
21)└───────────────────────────┘