Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions benchmarks/expected-plans/q18.txt
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
Sort: orders.o_totalprice DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST
Projection: customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, SUM(lineitem.l_quantity)
Aggregate: groupBy=[[customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice]], aggr=[[SUM(lineitem.l_quantity)]]
LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey
Inner Join: orders.o_orderkey = lineitem.l_orderkey
Inner Join: customer.c_custkey = orders.o_custkey
TableScan: customer projection=[c_custkey, c_name]
TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate]
TableScan: lineitem projection=[l_orderkey, l_quantity]
SubqueryAlias: __correlated_sq_1
Projection: lineitem.l_orderkey AS l_orderkey
Filter: SUM(lineitem.l_quantity) > Decimal128(Some(30000),25,2)
Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_quantity)]]
TableScan: lineitem projection=[l_orderkey, l_quantity]
Aggregate: groupBy=[[customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice]], aggr=[[SUM(lineitem.l_quantity)]]
LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey
Inner Join: orders.o_orderkey = lineitem.l_orderkey
Inner Join: customer.c_custkey = orders.o_custkey
TableScan: customer projection=[c_custkey, c_name]
TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate]
TableScan: lineitem projection=[l_orderkey, l_quantity]
SubqueryAlias: __correlated_sq_1
Projection: lineitem.l_orderkey AS l_orderkey
Filter: SUM(lineitem.l_quantity) > Decimal128(Some(30000),25,2)
Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_quantity)]]
TableScan: lineitem projection=[l_orderkey, l_quantity]
5 changes: 2 additions & 3 deletions benchmarks/expected-plans/q21.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS LAST
TableScan: orders projection=[o_orderkey, o_orderstatus]
Filter: nation.n_name = Utf8("SAUDI ARABIA")
TableScan: nation projection=[n_nationkey, n_name]
Projection: l2.l_orderkey, l2.l_suppkey
SubqueryAlias: l2
TableScan: lineitem projection=[l_orderkey, l_suppkey]
SubqueryAlias: l2
TableScan: lineitem projection=[l_orderkey, l_suppkey]
Projection: l3.l_orderkey, l3.l_suppkey
SubqueryAlias: l3
Filter: lineitem.l_receiptdate > lineitem.l_commitdate
Expand Down
3 changes: 1 addition & 2 deletions benchmarks/expected-plans/q22.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ Sort: custsale.cntrycode ASC NULLS LAST
LeftAnti Join: customer.c_custkey = orders.o_custkey
Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
TableScan: customer projection=[c_custkey, c_phone, c_acctbal]
Projection: orders.o_custkey
TableScan: orders projection=[o_custkey]
TableScan: orders projection=[o_custkey]
SubqueryAlias: __scalar_sq_1
Projection: AVG(customer.c_acctbal) AS __value
Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]
Expand Down
13 changes: 5 additions & 8 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,7 @@ mod tests {
let expected = "\
Explain\
\n CreateView: Bare { table: \"xyz\" }\
\n Projection: abc.column1, abc.column2, abc.column3\
\n TableScan: abc projection=[column1, column2, column3]";
\n TableScan: abc projection=[column1, column2, column3]";
assert_eq!(expected, actual);

let dataframe = session_ctx
Expand All @@ -512,9 +511,8 @@ mod tests {
let expected = "\
Explain\
\n CreateView: Bare { table: \"xyz\" }\
\n Projection: abc.column1, abc.column2, abc.column3\
\n Filter: abc.column2 = Int64(5)\
\n TableScan: abc projection=[column1, column2, column3]";
\n Filter: abc.column2 = Int64(5)\
\n TableScan: abc projection=[column1, column2, column3]";
assert_eq!(expected, actual);

let dataframe = session_ctx
Expand All @@ -525,9 +523,8 @@ mod tests {
let expected = "\
Explain\
\n CreateView: Bare { table: \"xyz\" }\
\n Projection: abc.column1, abc.column2\
\n Filter: abc.column2 = Int64(5)\
\n TableScan: abc projection=[column1, column2]";
\n Filter: abc.column2 = Int64(5)\
\n TableScan: abc projection=[column1, column2]";
assert_eq!(expected, actual);

Ok(())
Expand Down
16 changes: 2 additions & 14 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ mod tests {
use crate::config::ConfigOptions;
use crate::datasource::MemTable;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
use crate::prelude::SessionContext;
use crate::test::create_vec_batches;
Expand All @@ -308,12 +307,7 @@ mod tests {

let ctx = SessionContext::with_config(config.into());
let plan = create_physical_plan(ctx).await?;
let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
let coalesce = projection
.input()
.as_any()
.downcast_ref::<CoalesceBatchesExec>()
.unwrap();
let coalesce = plan.as_any().downcast_ref::<CoalesceBatchesExec>().unwrap();
assert_eq!(1234, coalesce.target_batch_size);
Ok(())
}
Expand All @@ -325,13 +319,7 @@ mod tests {

let ctx = SessionContext::with_config(config.into());
let plan = create_physical_plan(ctx).await?;
let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
// projection should directly wrap filter with no coalesce step
let _filter = projection
.input()
.as_any()
.downcast_ref::<FilterExec>()
.unwrap();
let _filter = plan.as_any().downcast_ref::<FilterExec>().unwrap();
Ok(())
}

Expand Down
11 changes: 5 additions & 6 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,14 @@ pub fn with_new_children_if_necessary(
/// let normalized = Path::from_filesystem_path(working_directory).unwrap();
/// let plan_string = plan_string.replace(normalized.as_ref(), "WORKING_DIR");
///
/// assert_eq!("ProjectionExec: expr=[a@0 as a]\
/// \n CoalesceBatchesExec: target_batch_size=8192\
/// \n FilterExec: a@0 < 5\
/// \n RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1\
/// \n CsvExec: files={1 group: [[WORKING_DIR/tests/data/example.csv]]}, has_header=true, limit=None, projection=[a]",
/// assert_eq!("CoalesceBatchesExec: target_batch_size=8192\
/// \n FilterExec: a@0 < 5\
/// \n RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1\
/// \n CsvExec: files={1 group: [[WORKING_DIR/tests/data/example.csv]]}, has_header=true, limit=None, projection=[a]",
/// plan_string.trim());
///
/// let one_line = format!("{}", displayable_plan.one_line());
/// assert_eq!("ProjectionExec: expr=[a@0 as a]", one_line.trim());
/// assert_eq!("CoalesceBatchesExec: target_batch_size=8192", one_line.trim());
/// }
/// ```
///
Expand Down
34 changes: 14 additions & 20 deletions datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow::record_batch::RecordBatch;
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::from_slice::FromSlice;
use datafusion::logical_expr::{
col, Expr, LogicalPlan, LogicalPlanBuilder, Projection, TableScan, UNNAMED_TABLE,
col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE,
};
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
Expand Down Expand Up @@ -214,24 +214,18 @@ async fn custom_source_dataframe() -> Result<()> {

let optimized_plan = state.optimize(&logical_plan)?;
match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScan {
source,
projected_schema,
..
}) => {
assert_eq!(source.schema().fields().len(), 2);
assert_eq!(projected_schema.fields().len(), 1);
}
_ => panic!("input to projection should be TableScan"),
},
_ => panic!("expect optimized_plan to be projection"),
LogicalPlan::TableScan(TableScan {
source,
projected_schema,
..
}) => {
assert_eq!(source.schema().fields().len(), 2);
assert_eq!(projected_schema.fields().len(), 1);
}
_ => panic!("input to projection should be TableScan"),
}

let expected = format!(
"Projection: {UNNAMED_TABLE}.c2\
\n TableScan: {UNNAMED_TABLE} projection=[c2]"
);
let expected = format!("TableScan: {UNNAMED_TABLE} projection=[c2]");
assert_eq!(format!("{optimized_plan:?}"), expected);

let physical_plan = state.create_physical_plan(&optimized_plan).await?;
Expand All @@ -242,7 +236,7 @@ async fn custom_source_dataframe() -> Result<()> {
let batches = collect(physical_plan, state.task_ctx()).await?;
let origin_rec_batch = TEST_CUSTOM_RECORD_BATCH!()?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
assert_eq!(2, batches[0].num_columns());
assert_eq!(origin_rec_batch.num_rows(), batches[0].num_rows());

Ok(())
Expand Down Expand Up @@ -270,8 +264,8 @@ async fn optimizers_catch_all_statistics() {
let expected = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("COUNT(UInt8(1))", DataType::Int64, false),
Field::new("MIN(test.c1)", DataType::Int32, false),
Field::new("MAX(test.c1)", DataType::Int32, false),
Field::new("MIN(c1)", DataType::Int32, false),
Field::new("MAX(c1)", DataType::Int32, false),
])),
vec![
Arc::new(Int64Array::from_slice([4])),
Expand Down
22 changes: 10 additions & 12 deletions datafusion/core/tests/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ async fn select_with_alias_overwrite() -> Result<()> {
)?;

let ctx = SessionContext::new();
ctx.register_batch("t", batch).unwrap();
ctx.register_batch("t", batch)?;

let df = ctx
.table("t")
Expand Down Expand Up @@ -502,12 +502,11 @@ async fn right_semi_with_alias_filter() -> Result<()> {
.select(vec![col("t2.a"), col("t2.b"), col("t2.c")])?;
let optimized_plan = df.clone().into_optimized_plan()?;
let expected = vec![
"Projection: t2.a, t2.b, t2.c [a:UInt32, b:Utf8, c:Int32]",
" RightSemi Join: t1.a = t2.a [a:UInt32, b:Utf8, c:Int32]",
" Filter: t1.c > Int32(1) [a:UInt32, c:Int32]",
" TableScan: t1 projection=[a, c] [a:UInt32, c:Int32]",
" Filter: t2.c > Int32(1) [a:UInt32, b:Utf8, c:Int32]",
" TableScan: t2 projection=[a, b, c] [a:UInt32, b:Utf8, c:Int32]",
"RightSemi Join: t1.a = t2.a [a:UInt32, b:Utf8, c:Int32]",
" Filter: t1.c > Int32(1) [a:UInt32, c:Int32]",
" TableScan: t1 projection=[a, c] [a:UInt32, c:Int32]",
" Filter: t2.c > Int32(1) [a:UInt32, b:Utf8, c:Int32]",
" TableScan: t2 projection=[a, b, c] [a:UInt32, b:Utf8, c:Int32]",
];

let formatted = optimized_plan.display_indent_schema().to_string();
Expand Down Expand Up @@ -547,11 +546,10 @@ async fn right_anti_filter_push_down() -> Result<()> {
.select(vec![col("t2.a"), col("t2.b"), col("t2.c")])?;
let optimized_plan = df.clone().into_optimized_plan()?;
let expected = vec![
"Projection: t2.a, t2.b, t2.c [a:UInt32, b:Utf8, c:Int32]",
" RightAnti Join: t1.a = t2.a Filter: t2.c > Int32(1) [a:UInt32, b:Utf8, c:Int32]",
" Filter: t1.c > Int32(1) [a:UInt32, c:Int32]",
" TableScan: t1 projection=[a, c] [a:UInt32, c:Int32]",
" TableScan: t2 projection=[a, b, c] [a:UInt32, b:Utf8, c:Int32]",
"RightAnti Join: t1.a = t2.a Filter: t2.c > Int32(1) [a:UInt32, b:Utf8, c:Int32]",
" Filter: t1.c > Int32(1) [a:UInt32, c:Int32]",
" TableScan: t1 projection=[a, c] [a:UInt32, c:Int32]",
" TableScan: t2 projection=[a, b, c] [a:UInt32, b:Utf8, c:Int32]",
];

let formatted = optimized_plan.display_indent_schema().to_string();
Expand Down
20 changes: 10 additions & 10 deletions datafusion/core/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ async fn aggregate_timestamps_count() -> Result<()> {
.await;

let expected = vec![
"+----------------+-----------------+-----------------+---------------+",
"| COUNT(t.nanos) | COUNT(t.micros) | COUNT(t.millis) | COUNT(t.secs) |",
"+----------------+-----------------+-----------------+---------------+",
"| 3 | 3 | 3 | 3 |",
"+----------------+-----------------+-----------------+---------------+",
"+--------------+---------------+---------------+-------------+",
"| COUNT(nanos) | COUNT(micros) | COUNT(millis) | COUNT(secs) |",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like an improvement to me

"+--------------+---------------+---------------+-------------+",
"| 3 | 3 | 3 | 3 |",
"+--------------+---------------+---------------+-------------+",
];
assert_batches_sorted_eq!(expected, &results);

Expand Down Expand Up @@ -185,11 +185,11 @@ async fn aggregate_times_count() -> Result<()> {
.await;

let expected = vec![
"+----------------+-----------------+-----------------+---------------+",
"| COUNT(t.nanos) | COUNT(t.micros) | COUNT(t.millis) | COUNT(t.secs) |",
"+----------------+-----------------+-----------------+---------------+",
"| 4 | 4 | 4 | 4 |",
"+----------------+-----------------+-----------------+---------------+",
"+--------------+---------------+---------------+-------------+",
"| COUNT(nanos) | COUNT(micros) | COUNT(millis) | COUNT(secs) |",
"+--------------+---------------+---------------+-------------+",
"| 4 | 4 | 4 | 4 |",
"+--------------+---------------+---------------+-------------+",
];
assert_batches_sorted_eq!(expected, &results);

Expand Down
16 changes: 7 additions & 9 deletions datafusion/core/tests/sql/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,16 @@ async fn avro_explain() {
let expected = vec![
vec![
"logical_plan",
"Projection: COUNT(UInt8(1))\
\n Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
\n TableScan: alltypes_plain projection=[id]",
"Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
\n TableScan: alltypes_plain projection=[id]",
],
vec![
"physical_plan",
"ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\
\n AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
\n CoalescePartitionsExec\
\n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
\n AvroExec: files={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, limit=None\
"AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
\n CoalescePartitionsExec\
\n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
\n AvroExec: files={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, limit=None\
\n",
],
];
Expand Down
12 changes: 5 additions & 7 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,10 +757,9 @@ async fn explain_logical_plan_only() {
let expected = vec![
vec![
"logical_plan",
"Projection: COUNT(UInt8(1))\
\n Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
\n SubqueryAlias: t\
\n Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), Int64(2), Int64(150))",
"Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
\n SubqueryAlias: t\
\n Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), Int64(2), Int64(150))",
]];
assert_eq!(expected, actual);
}
Expand All @@ -776,9 +775,8 @@ async fn explain_physical_plan_only() {

let expected = vec![vec![
"physical_plan",
"ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\
\n ProjectionExec: expr=[2 as COUNT(UInt8(1))]\
\n EmptyExec: produce_one_row=true\
"ProjectionExec: expr=[2 as COUNT(UInt8(1))]\
\n EmptyExec: produce_one_row=true\
\n",
]];
assert_eq!(expected, actual);
Expand Down
Loading