From 5b21d42e79ec15aab54c808118dd3e0874bc3f69 Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 20 Nov 2022 17:45:25 +0800 Subject: [PATCH 1/6] subqueryAlias replace alias-projection --- datafusion/expr/src/logical_plan/builder.rs | 31 +++++++++++++-------- datafusion/optimizer/src/limit_push_down.rs | 3 +- datafusion/sql/src/planner.rs | 28 +++++++------------ 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index e9ef0f0cf8e76..ebbaf58860929 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -980,19 +980,26 @@ pub fn project_with_alias( exprlist_to_fields(&projected_expr, &plan)?, plan.schema().metadata().clone(), )?; - let schema = match alias { - Some(ref alias) => input_schema.replace_qualifier(alias.as_str()), - None => input_schema, - }; - Ok(LogicalPlan::Projection( - Projection::try_new_with_schema_alias( - projected_expr, - Arc::new(plan.clone()), - DFSchemaRef::new(schema), - alias, - )?, - )) + let projection = LogicalPlan::Projection(Projection::try_new_with_schema( + projected_expr, + Arc::new(plan.clone()), + DFSchemaRef::new(input_schema), + )?); + match alias { + Some(alias) => Ok(with_alias(projection, alias)), + None => Ok(projection), + } +} + +pub fn with_alias(plan: LogicalPlan, alias: String) -> LogicalPlan { + let plan_schema = &**plan.schema(); + let schema = (plan_schema.clone()).replace_qualifier(alias.as_str()); + LogicalPlan::SubqueryAlias(SubqueryAlias { + input: Arc::new(plan), + alias, + schema: Arc::new(schema), + }) } /// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema. diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs index fd29015d5bb0a..9dbda46537638 100644 --- a/datafusion/optimizer/src/limit_push_down.rs +++ b/datafusion/optimizer/src/limit_push_down.rs @@ -141,11 +141,10 @@ impl OptimizerRule for LimitPushDown { input: Arc::new((*projection.input).clone()), }); // Push down limit directly (projection doesn't change number of rows) - LogicalPlan::Projection(Projection::try_new_with_schema_alias( + LogicalPlan::Projection(Projection::try_new_with_schema( projection.expr.clone(), Arc::new(new_input), projection.schema.clone(), - projection.alias.clone(), )?) } diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index dda8474241be2..c82a9080cc8dd 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -45,7 +45,7 @@ use datafusion_common::{ use datafusion_expr::expr::{Between, BinaryExpr, Case, Cast, GroupingSet, Like}; use datafusion_expr::expr_rewriter::normalize_col; use datafusion_expr::expr_rewriter::normalize_col_with_schemas; -use datafusion_expr::logical_plan::builder::project_with_alias; +use datafusion_expr::logical_plan::builder::{project_with_alias, with_alias}; use datafusion_expr::logical_plan::Join as HashJoin; use datafusion_expr::logical_plan::JoinConstraint as HashJoinConstraint; use datafusion_expr::logical_plan::{ @@ -857,11 +857,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ( match (cte, self.schema_provider.get_table_provider(table_ref)) { (Some(cte_plan), _) => match table_alias { - Some(cte_alias) => project_with_alias( - cte_plan.clone(), - vec![Expr::Wildcard], - Some(cte_alias), - ), + Some(cte_alias) => { + Ok(with_alias(cte_plan.clone(), cte_alias)) + } _ => Ok(cte_plan.clone()), }, (_, Ok(provider)) => { @@ -888,18 +886,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ctes, outer_query_schema, )?; - ( - project_with_alias( - logical_plan.clone(), - logical_plan - .schema() - .fields() - .iter() - .map(|field| col(field.name())), - normalized_alias, - )?, - alias, - ) + + let plan = match normalized_alias { + Some(alias) => with_alias(logical_plan.clone(), alias), + _ => logical_plan.clone(), + }; + (plan, alias) } TableFactor::NestedJoin { table_with_joins, From 28c98f51266d1667575f4582936ffab05f9966f1 Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 20 Nov 2022 18:10:44 +0800 Subject: [PATCH 2/6] remove alias in projection. --- datafusion/expr/src/logical_plan/builder.rs | 27 ++++++------ datafusion/expr/src/logical_plan/plan.rs | 21 +--------- datafusion/expr/src/utils.rs | 9 ++-- .../optimizer/src/common_subexpr_eliminate.rs | 14 +++---- datafusion/optimizer/src/filter_push_down.rs | 1 - .../optimizer/src/projection_push_down.rs | 20 +++------ datafusion/proto/src/logical_plan.rs | 41 +++++++++---------- datafusion/sql/src/planner.rs | 4 +- 8 files changed, 49 insertions(+), 88 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index ebbaf58860929..af9f66ca98c23 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -328,7 +328,6 @@ impl LogicalPlanBuilder { input, mut expr, schema: _, - alias, }) if missing_cols .iter() .all(|c| input.schema().field_from_column(c).is_ok()) => @@ -343,7 +342,7 @@ impl LogicalPlanBuilder { // projected alias. missing_exprs.retain(|e| !expr.contains(e)); expr.extend(missing_exprs); - Ok(project_with_alias((*input).clone(), expr, alias)?) + Ok(project_with_alias((*input).clone(), expr, None)?) } _ => { let new_inputs = curr_plan @@ -858,11 +857,10 @@ pub(crate) fn validate_unique_names<'a>( }) } -pub fn project_with_column_index_alias( +pub fn project_with_column_index( expr: Vec, input: Arc, schema: DFSchemaRef, - alias: Option, ) -> Result { let alias_expr = expr .into_iter() @@ -873,9 +871,9 @@ pub fn project_with_column_index_alias( x => x.alias(schema.field(i).name()), }) .collect::>(); - Ok(LogicalPlan::Projection( - Projection::try_new_with_schema_alias(alias_expr, input, schema, alias)?, - )) + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + alias_expr, input, schema, + )?)) } /// Union two logical plans. @@ -928,14 +926,13 @@ pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result Ok(Arc::new(project_with_column_index_alias( - expr.to_vec(), - input, - Arc::new(union_schema.clone()), - alias, - )?)), + LogicalPlan::Projection(Projection { expr, input, .. }) => { + Ok(Arc::new(project_with_column_index( + expr.to_vec(), + input, + Arc::new(union_schema.clone()), + )?)) + } x => Ok(Arc::new(x)), } }) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index d41ad797c3337..82e44986a7b1d 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -781,9 +781,7 @@ impl LogicalPlan { Ok(()) } - LogicalPlan::Projection(Projection { - ref expr, alias, .. - }) => { + LogicalPlan::Projection(Projection { ref expr, .. }) => { write!(f, "Projection: ")?; for (i, expr_item) in expr.iter().enumerate() { if i > 0 { @@ -791,9 +789,6 @@ impl LogicalPlan { } write!(f, "{:?}", expr_item)?; } - if let Some(a) = alias { - write!(f, ", alias={}", a)?; - } Ok(()) } LogicalPlan::Filter(Filter { @@ -1115,8 +1110,6 @@ pub struct Projection { pub input: Arc, /// The schema description of the output pub schema: DFSchemaRef, - /// Projection output relation alias - pub alias: Option, } impl Projection { @@ -1137,16 +1130,6 @@ impl Projection { expr: Vec, input: Arc, schema: DFSchemaRef, - ) -> Result { - Self::try_new_with_schema_alias(expr, input, schema, None) - } - - /// Create a new Projection using the specified output schema - pub fn try_new_with_schema_alias( - expr: Vec, - input: Arc, - schema: DFSchemaRef, - alias: Option, ) -> Result { if expr.len() != schema.fields().len() { return Err(DataFusionError::Plan(format!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len()))); @@ -1155,7 +1138,6 @@ impl Projection { expr, input, schema, - alias, }) } @@ -1171,7 +1153,6 @@ impl Projection { expr, input, schema, - alias: None, } } diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 341ae928bdad4..c84f4b8d709c0 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -366,14 +366,13 @@ pub fn from_plan( inputs: &[LogicalPlan], ) -> Result { match plan { - LogicalPlan::Projection(Projection { schema, alias, .. }) => Ok( - LogicalPlan::Projection(Projection::try_new_with_schema_alias( + LogicalPlan::Projection(Projection { schema, .. }) => { + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( expr.to_vec(), Arc::new(inputs[0].clone()), schema.clone(), - alias.clone(), - )?), - ), + )?)) + } LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { schema: schema.clone(), values: expr diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index fdbda47f2a4b1..11ed5cdbebb0a 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -101,7 +101,6 @@ impl OptimizerRule for CommonSubexprEliminate { expr, input, schema, - alias, }) => { let input_schema = Arc::clone(input.schema()); let arrays = to_arrays(expr, input_schema, &mut expr_set)?; @@ -114,14 +113,11 @@ impl OptimizerRule for CommonSubexprEliminate { optimizer_config, )?; - Ok(LogicalPlan::Projection( - Projection::try_new_with_schema_alias( - pop_expr(&mut new_expr)?, - Arc::new(new_input), - schema.clone(), - alias.clone(), - )?, - )) + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + pop_expr(&mut new_expr)?, + Arc::new(new_input), + schema.clone(), + )?)) } LogicalPlan::Filter(filter) => { let input = filter.input(); diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs index 9ba30fae5d25c..a50124cb26275 100644 --- a/datafusion/optimizer/src/filter_push_down.rs +++ b/datafusion/optimizer/src/filter_push_down.rs @@ -550,7 +550,6 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { input, expr, schema, - alias: _, }) => { // A projection is filter-commutable, but re-writes all predicate expressions // collect projection. diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 916071bd56752..1e54d7184cccc 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -87,7 +87,6 @@ fn optimize_plan( input, expr, schema, - alias, }) => { // projection: // * remove any expression that is not required @@ -139,16 +138,11 @@ fn optimize_plan( Ok(new_input) } else { let metadata = new_input.schema().metadata().clone(); - Ok(LogicalPlan::Projection( - Projection::try_new_with_schema_alias( - new_expr, - Arc::new(new_input), - DFSchemaRef::new(DFSchema::new_with_metadata( - new_fields, metadata, - )?), - alias.clone(), - )?, - )) + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + new_expr, + Arc::new(new_input), + DFSchemaRef::new(DFSchema::new_with_metadata(new_fields, metadata)?), + )?)) } } LogicalPlan::Join(Join { @@ -438,9 +432,7 @@ fn optimize_plan( } fn projection_equal(p: &Projection, p2: &Projection) -> bool { - p.expr.len() == p2.expr.len() - && p.alias == p2.alias - && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r) + p.expr.len() == p2.expr.len() && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r) } fn replace_alias( diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index 42d3401eff5d4..c63258858202a 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -969,28 +969,25 @@ impl AsLogicalPlan for LogicalPlanNode { Ok(node) } } - LogicalPlan::Projection(Projection { - expr, input, alias, .. - }) => Ok(protobuf::LogicalPlanNode { - logical_plan_type: Some(LogicalPlanType::Projection(Box::new( - protobuf::ProjectionNode { - input: Some(Box::new( - protobuf::LogicalPlanNode::try_from_logical_plan( - input.as_ref(), - extension_codec, - )?, - )), - expr: expr.iter().map(|expr| expr.try_into()).collect::, - to_proto::Error, - >>( - )?, - optional_alias: alias - .clone() - .map(protobuf::projection_node::OptionalAlias::Alias), - }, - ))), - }), + LogicalPlan::Projection(Projection { expr, input, .. }) => { + Ok(protobuf::LogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::Projection(Box::new( + protobuf::ProjectionNode { + input: Some(Box::new( + protobuf::LogicalPlanNode::try_from_logical_plan( + input.as_ref(), + extension_codec, + )?, + )), + expr: expr + .iter() + .map(|expr| expr.try_into()) + .collect::, to_proto::Error>>()?, + optional_alias: None, + }, + ))), + }) + } LogicalPlan::Filter(filter) => { let input: protobuf::LogicalPlanNode = protobuf::LogicalPlanNode::try_from_logical_plan( diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index c82a9080cc8dd..718d601891382 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -888,8 +888,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { )?; let plan = match normalized_alias { - Some(alias) => with_alias(logical_plan.clone(), alias), - _ => logical_plan.clone(), + Some(alias) => with_alias(logical_plan, alias), + _ => logical_plan, }; (plan, alias) } From 4b210a674c6857ee50cd23d368c0f3164aac01db Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 20 Nov 2022 17:45:42 +0800 Subject: [PATCH 3/6] correct plan in test --- datafusion/core/src/dataframe.rs | 6 +- datafusion/core/src/datasource/view.rs | 5 +- datafusion/core/tests/sql/explain_analyze.rs | 24 +-- datafusion/core/tests/sql/subqueries.rs | 207 ++++++++++--------- datafusion/core/tests/sql/window.rs | 52 ++--- 5 files changed, 150 insertions(+), 144 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 3d1c8d009418c..4a35332274ab4 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -1342,10 +1342,10 @@ mod tests { \n Limit: skip=0, fetch=1\ \n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\ \n Inner Join: t1.c1 = t2.c1\ - \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t1\ + \n SubqueryAlias: t1\ \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ \n TableScan: aggregate_test_100 projection=[c1, c2, c3]\ - \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t2\ + \n SubqueryAlias: t2\ \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ \n TableScan: aggregate_test_100 projection=[c1, c2, c3]", format!("{:?}", df_renamed.to_logical_plan()?) @@ -1388,7 +1388,7 @@ mod tests { let plan = df.explain(false, false)?.collect().await?; // Filters all the way to Parquet let formatted = pretty::pretty_format_batches(&plan).unwrap().to_string(); - assert!(formatted.contains("predicate=id_min@0 <= 1 AND 1 <= id_max@1")); + assert!(formatted.contains("FilterExec: id@0 = 1")); Ok(()) } diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index 5238d47339ed9..34a427030ae42 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -445,7 +445,7 @@ mod tests { let formatted = arrow::util::pretty::pretty_format_batches(&plan) .unwrap() .to_string(); - assert!(formatted.contains("predicate=id_min@0 <= 1 AND 1 <= id_max@1")); + assert!(formatted.contains("FilterExec: id@0 = 1")); Ok(()) } @@ -474,7 +474,8 @@ mod tests { let formatted = arrow::util::pretty::pretty_format_batches(&plan) .unwrap() .to_string(); - assert!(formatted.contains("ParquetExec: limit=Some(10)")); + // TODO: limit_push_down support SubqueryAlias + assert!(formatted.contains("GlobalLimitExec: skip=0, fetch=10")); Ok(()) } diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index da290daa9f401..c4802b43f6d1c 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -663,7 +663,7 @@ order by \n Filter: lineitem.l_returnflag = Utf8(\"R\")\ \n TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag], partial_filters=[lineitem.l_returnflag = Utf8(\"R\")]\ \n TableScan: nation projection=[n_nationkey, n_name]"; - assert_eq!(expected, format!("{:?}", plan.unwrap()),); + assert_eq!(expected, format!("{:?}", plan.unwrap())); Ok(()) } @@ -738,15 +738,13 @@ async fn test_physical_plan_display_indent_multi_children() { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)", " ProjectionExec: expr=[c1@0 as c1]", - " ProjectionExec: expr=[c1@0 as c1]", - " RepartitionExec: partitioning=RoundRobinBatch(9000)", - " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1]", + " RepartitionExec: partitioning=RoundRobinBatch(9000)", + " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1]", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000)", - " ProjectionExec: expr=[c2@0 as c2]", - " ProjectionExec: expr=[c1@0 as c2]", - " RepartitionExec: partitioning=RoundRobinBatch(9000)", - " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c2]", + " ProjectionExec: expr=[c1@0 as c2]", + " RepartitionExec: partitioning=RoundRobinBatch(9000)", + " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c2]", ]; let normalizer = ExplainNormalizer::new(); @@ -782,15 +780,15 @@ async fn csv_explain() { "logical_plan", "Projection: aggregate_test_100.c1\ \n Filter: aggregate_test_100.c2 > Int8(10)\ - \n TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]" + \n TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]", ], vec!["physical_plan", - "ProjectionExec: expr=[c1@0 as c1]\ + "ProjectionExec: expr=[c1@0 as c1]\ \n CoalesceBatchesExec: target_batch_size=4096\ \n FilterExec: c2@1 > 10\ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\ \n CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c2]\ - \n" + \n", ]]; assert_eq!(expected, actual); @@ -885,7 +883,9 @@ async fn explain_logical_plan_only() { "logical_plan", "Projection: COUNT(UInt8(1))\ \n Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\ - \n Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), Int64(2), Int64(150))", + \n SubqueryAlias: t\ + \n SubqueryAlias: t\ + \n Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), Int64(2), Int64(150))", ]]; assert_eq!(expected, actual); } diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs index 98bf56a02ccf2..719c0c3d7a258 100644 --- a/datafusion/core/tests/sql/subqueries.rs +++ b/datafusion/core/tests/sql/subqueries.rs @@ -50,20 +50,21 @@ where c_acctbal < ( let plan = ctx.optimize(&plan).unwrap(); let actual = format!("{}", plan.display_indent()); - let expected = r#"Sort: customer.c_custkey ASC NULLS LAST - Projection: customer.c_custkey - Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __sq_2.__value - Inner Join: customer.c_custkey = __sq_2.o_custkey - TableScan: customer projection=[c_custkey, c_acctbal] - Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value, alias=__sq_2 - Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]] - Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __sq_1.__value - Inner Join: orders.o_orderkey = __sq_1.l_orderkey - TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] - Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value, alias=__sq_1 - Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]] - TableScan: lineitem projection=[l_orderkey, l_extendedprice]"# - .to_string(); + let expected = "Sort: customer.c_custkey ASC NULLS LAST\ + \n Projection: customer.c_custkey\ + \n Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __sq_2.__value\ + \n Inner Join: customer.c_custkey = __sq_2.o_custkey\ + \n TableScan: customer projection=[c_custkey, c_acctbal]\ + \n SubqueryAlias: __sq_2\ + \n Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value\ + \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]\ + \n Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __sq_1.__value\ + \n Inner Join: orders.o_orderkey = __sq_1.l_orderkey\ + \n TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]\ + \n SubqueryAlias: __sq_1\ + \n Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value\ + \n Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]\ + \n TableScan: lineitem projection=[l_orderkey, l_extendedprice]"; assert_eq!(actual, expected); Ok(()) @@ -93,12 +94,12 @@ where o_orderstatus in ( let plan = ctx.create_logical_plan(sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); let actual = format!("{}", plan.display_indent()); - let expected = r#"Projection: orders.o_orderkey - LeftSemi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey = __sq_1.l_orderkey - TableScan: orders projection=[o_orderkey, o_orderstatus] - Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey, alias=__sq_1 - TableScan: lineitem projection=[l_orderkey, l_linestatus]"# - .to_string(); + let expected = "Projection: orders.o_orderkey\ + \n LeftSemi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey = __sq_1.l_orderkey\ + \n TableScan: orders projection=[o_orderkey, o_orderstatus]\ + \n SubqueryAlias: __sq_1\ + \n Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey\ + \n TableScan: lineitem projection=[l_orderkey, l_linestatus]"; assert_eq!(actual, expected); // assert data @@ -139,32 +140,32 @@ order by s_acctbal desc, n_name, s_name, p_partkey;"#; let plan = ctx.create_logical_plan(sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); let actual = format!("{}", plan.display_indent()); - let expected = r#"Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST - Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment - Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name - Inner Join: part.p_partkey = __sq_1.ps_partkey, partsupp.ps_supplycost = __sq_1.__value - Inner Join: nation.n_regionkey = region.r_regionkey - Inner Join: supplier.s_nationkey = nation.n_nationkey - Inner Join: partsupp.ps_suppkey = supplier.s_suppkey - Inner Join: part.p_partkey = partsupp.ps_partkey - Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8("%BRASS") - TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8("%BRASS")] - TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost] - TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment] - TableScan: nation projection=[n_nationkey, n_name, n_regionkey] - Filter: region.r_name = Utf8("EUROPE") - TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")] - Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value, alias=__sq_1 - Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]] - Inner Join: nation.n_regionkey = region.r_regionkey - Inner Join: supplier.s_nationkey = nation.n_nationkey - Inner Join: partsupp.ps_suppkey = supplier.s_suppkey - TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost] - TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment] - TableScan: nation projection=[n_nationkey, n_name, n_regionkey] - Filter: region.r_name = Utf8("EUROPE") - TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]"# - .to_string(); + let expected = "Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST\ + \n Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment\ + \n Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name\ + \n Inner Join: part.p_partkey = __sq_1.ps_partkey, partsupp.ps_supplycost = __sq_1.__value\ + \n Inner Join: nation.n_regionkey = region.r_regionkey\ + \n Inner Join: supplier.s_nationkey = nation.n_nationkey\ + \n Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\ + \n Inner Join: part.p_partkey = partsupp.ps_partkey\ + \n Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8(\"%BRASS\")\ + \n TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8(\"%BRASS\")]\ + \n TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]\ + \n TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]\ + \n TableScan: nation projection=[n_nationkey, n_name, n_regionkey]\ + \n Filter: region.r_name = Utf8(\"EUROPE\")\ + \n TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8(\"EUROPE\")]\ + \n SubqueryAlias: __sq_1\ + \n Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value\ + \n Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]\ + \n Inner Join: nation.n_regionkey = region.r_regionkey\ + \n Inner Join: supplier.s_nationkey = nation.n_nationkey\ + \n Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\ + \n TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]\ + \n TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]\ + \n TableScan: nation projection=[n_nationkey, n_name, n_regionkey]\ + \n Filter: region.r_name = Utf8(\"EUROPE\")\ + \n TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8(\"EUROPE\")]"; assert_eq!(actual, expected); // assert data @@ -321,26 +322,28 @@ order by s_name; .map_err(|e| format!("{:?} at {}", e, "error")) .unwrap(); let actual = format!("{}", plan.display_indent()); - let expected = r#"Sort: supplier.s_name ASC NULLS LAST - Projection: supplier.s_name, supplier.s_address - LeftSemi Join: supplier.s_suppkey = __sq_2.ps_suppkey - Inner Join: supplier.s_nationkey = nation.n_nationkey - TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey] - Filter: nation.n_name = Utf8("CANADA") - TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("CANADA")] - Projection: partsupp.ps_suppkey AS ps_suppkey, alias=__sq_2 - Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value - Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey - LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey - TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty] - Projection: part.p_partkey AS p_partkey, alias=__sq_1 - Filter: part.p_name LIKE Utf8("forest%") - TableScan: part projection=[p_partkey, p_name], partial_filters=[part.p_name LIKE Utf8("forest%")] - Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value, alias=__sq_3 - Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]] - Filter: lineitem.l_shipdate >= Date32("8766") - TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32("8766")]"# - .to_string(); + let expected = "Sort: supplier.s_name ASC NULLS LAST\ + \n Projection: supplier.s_name, supplier.s_address\ + \n LeftSemi Join: supplier.s_suppkey = __sq_2.ps_suppkey\ + \n Inner Join: supplier.s_nationkey = nation.n_nationkey\ + \n TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]\ + \n Filter: nation.n_name = Utf8(\"CANADA\")\ + \n TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"CANADA\")]\ + \n SubqueryAlias: __sq_2\ + \n Projection: partsupp.ps_suppkey AS ps_suppkey\ + \n Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value\ + \n Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey\ + \n LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey\ + \n TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]\ + \n SubqueryAlias: __sq_1\ + \n Projection: part.p_partkey AS p_partkey\ + \n Filter: part.p_name LIKE Utf8(\"forest%\")\ + \n TableScan: part projection=[p_partkey, p_name], partial_filters=[part.p_name LIKE Utf8(\"forest%\")]\ + \n SubqueryAlias: __sq_3\ + \n Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value\ + \n Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]\ + \n Filter: lineitem.l_shipdate >= Date32(\"8766\")\ + \n TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32(\"8766\")]"; assert_eq!(actual, expected); // assert data @@ -380,22 +383,22 @@ order by cntrycode;"#; .map_err(|e| format!("{:?} at {}", e, "error")) .unwrap(); let actual = format!("{}", plan.display_indent()); - let expected = r#"Sort: custsale.cntrycode ASC NULLS LAST - Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal - Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]] - Projection: cntrycode, customer.c_acctbal, alias=custsale - Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal - Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value - CrossJoin: - LeftAnti Join: customer.c_custkey = orders.o_custkey - Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) - TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])] - TableScan: orders projection=[o_custkey] - Projection: AVG(customer.c_acctbal) AS __value, alias=__sq_1 - Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]] - Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) - TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[CAST(customer.c_acctbal AS Decimal128(30, 15)) > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]), customer.c_acctbal > Decimal128(Some(0),15,2)]"# - .to_string(); + let expected = "Sort: custsale.cntrycode ASC NULLS LAST\ + \n Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal\ + \n Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]\ + \n SubqueryAlias: custsale\ + \n Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal\ + \n Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value\ + \n CrossJoin:\ + \n LeftAnti Join: customer.c_custkey = orders.o_custkey\ + \n Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])\ + \n TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])]\ + \n TableScan: orders projection=[o_custkey]\ + \n SubqueryAlias: __sq_1\ + \n Projection: AVG(customer.c_acctbal) AS __value\ + \n Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]\ + \n Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])\ + \n TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[CAST(customer.c_acctbal AS Decimal128(30, 15)) > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")]), customer.c_acctbal > Decimal128(Some(0),15,2)]"; assert_eq!(expected, actual); // assert data @@ -442,26 +445,26 @@ order by value desc; .map_err(|e| format!("{:?} at {}", e, "error")) .unwrap(); let actual = format!("{}", plan.display_indent()); - let expected = r#"Sort: value DESC NULLS FIRST - Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value - Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__sq_1.__value AS Decimal128(38, 15)) - CrossJoin: - Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]] - Inner Join: supplier.s_nationkey = nation.n_nationkey - Inner Join: partsupp.ps_suppkey = supplier.s_suppkey - TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost] - TableScan: supplier projection=[s_suppkey, s_nationkey] - Filter: nation.n_name = Utf8("GERMANY") - TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")] - Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value, alias=__sq_1 - Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]] - Inner Join: supplier.s_nationkey = nation.n_nationkey - Inner Join: partsupp.ps_suppkey = supplier.s_suppkey - TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost] - TableScan: supplier projection=[s_suppkey, s_nationkey] - Filter: nation.n_name = Utf8("GERMANY") - TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]"# - .to_string(); + let expected = "Sort: value DESC NULLS FIRST\ + \n Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value\ + \n Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__sq_1.__value AS Decimal128(38, 15))\ + \n CrossJoin:\ + \n Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]\ + \n Inner Join: supplier.s_nationkey = nation.n_nationkey\ + \n Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\ + \n TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]\ + \n TableScan: supplier projection=[s_suppkey, s_nationkey]\ + \n Filter: nation.n_name = Utf8(\"GERMANY\")\ + \n TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"GERMANY\")]\ + \n SubqueryAlias: __sq_1\ + \n Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value\ + \n Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]\ + \n Inner Join: supplier.s_nationkey = nation.n_nationkey\ + \n Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\ + \n TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]\ + \n TableScan: supplier projection=[s_suppkey, s_nationkey]\ + \n Filter: nation.n_name = Utf8(\"GERMANY\")\ + \n TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"GERMANY\")]"; assert_eq!(actual, expected); // assert data diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index 9367ee63b5f71..95d0ed92944c4 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -335,18 +335,19 @@ async fn window_expr_eliminate() -> Result<()> { " Sort: d.b ASC NULLS LAST [b:Utf8, max_a:Int64;N]", " Projection: d.b, MAX(d.a) AS max_a [b:Utf8, max_a:Int64;N]", " Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a)]] [b:Utf8, MAX(d.a):Int64;N]", - " Projection: _data2.a, _data2.b, alias=d [a:Int64, b:Utf8]", - " Projection: s.a, s.b, alias=_data2 [a:Int64, b:Utf8]", - " Projection: a, b, alias=s [a:Int64, b:Utf8]", - " Union [a:Int64, b:Utf8]", - " Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", - " Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", - " Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", - " Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", + " SubqueryAlias: d [a:Int64, b:Utf8]", + " SubqueryAlias: _data2 [a:Int64, b:Utf8]", + " Projection: s.a, s.b [a:Int64, b:Utf8]", + " SubqueryAlias: s [a:Int64, b:Utf8]", + " Union [a:Int64, b:Utf8]", + " Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", + " Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", + " Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", + " Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); @@ -398,19 +399,20 @@ async fn window_expr_eliminate() -> Result<()> { " Sort: d.b ASC NULLS LAST [b:Utf8, max_a:Int64;N, MAX(d.seq):UInt64;N]", " Projection: d.b, MAX(d.a) AS max_a, MAX(d.seq) [b:Utf8, max_a:Int64;N, MAX(d.seq):UInt64;N]", " Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a), MAX(d.seq)]] [b:Utf8, MAX(d.a):Int64;N, MAX(d.seq):UInt64;N]", - " Projection: _data2.seq, _data2.a, _data2.b, alias=d [seq:UInt64;N, a:Int64, b:Utf8]", - " Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] AS seq, s.a, s.b, alias=_data2 [seq:UInt64;N, a:Int64, b:Utf8]", - " WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]]] [ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]:UInt64;N, a:Int64, b:Utf8]", - " Projection: a, b, alias=s [a:Int64, b:Utf8]", - " Union [a:Int64, b:Utf8]", - " Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", - " Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", - " Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", - " Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", + " SubqueryAlias: d [seq:UInt64;N, a:Int64, b:Utf8]", + " SubqueryAlias: _data2 [seq:UInt64;N, a:Int64, b:Utf8]", + " Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] AS seq, s.a, s.b [seq:UInt64;N, a:Int64, b:Utf8]", + " WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]]] [ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]:UInt64;N, a:Int64, b:Utf8]", + " SubqueryAlias: s [a:Int64, b:Utf8]", + " Union [a:Int64, b:Utf8]", + " Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", + " Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", + " Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", + " Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); From 1644879c8263cfd1a02b124786e63c828b9a4340 Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 20 Nov 2022 17:52:47 +0800 Subject: [PATCH 4/6] correct tpch plan --- benchmarks/expected-plans/q11.txt | 17 +++++++++-------- benchmarks/expected-plans/q13.txt | 17 +++++++++-------- benchmarks/expected-plans/q15.txt | 25 ++++++++++++++----------- benchmarks/expected-plans/q16.txt | 7 ++++--- benchmarks/expected-plans/q18.txt | 9 +++++---- benchmarks/expected-plans/q2.txt | 21 +++++++++++---------- benchmarks/expected-plans/q20.txt | 27 +++++++++++++++------------ benchmarks/expected-plans/q22.txt | 11 ++++++----- benchmarks/expected-plans/q7.txt | 4 ++-- benchmarks/expected-plans/q8.txt | 2 +- benchmarks/expected-plans/q9.txt | 2 +- 11 files changed, 77 insertions(+), 65 deletions(-) diff --git a/benchmarks/expected-plans/q11.txt b/benchmarks/expected-plans/q11.txt index 0e886e2e74b70..e0ccdfce87c67 100644 --- a/benchmarks/expected-plans/q11.txt +++ b/benchmarks/expected-plans/q11.txt @@ -9,11 +9,12 @@ Sort: value DESC NULLS FIRST TableScan: supplier projection=[s_suppkey, s_nationkey] Filter: nation.n_name = Utf8("GERMANY") TableScan: nation projection=[n_nationkey, n_name] - Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value, alias=__sq_1 - Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]] - Inner Join: supplier.s_nationkey = nation.n_nationkey - Inner Join: partsupp.ps_suppkey = supplier.s_suppkey - TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost] - TableScan: supplier projection=[s_suppkey, s_nationkey] - Filter: nation.n_name = Utf8("GERMANY") - TableScan: nation projection=[n_nationkey, n_name] \ No newline at end of file + SubqueryAlias: __sq_1 + Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value + Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]] + Inner Join: supplier.s_nationkey = nation.n_nationkey + Inner Join: partsupp.ps_suppkey = supplier.s_suppkey + TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost] + TableScan: supplier projection=[s_suppkey, s_nationkey] + Filter: nation.n_name = Utf8("GERMANY") + TableScan: nation projection=[n_nationkey, n_name] \ No newline at end of file diff --git a/benchmarks/expected-plans/q13.txt b/benchmarks/expected-plans/q13.txt index b6106a97111ef..9eea8a1616598 100644 --- a/benchmarks/expected-plans/q13.txt +++ b/benchmarks/expected-plans/q13.txt @@ -1,11 +1,12 @@ Sort: custdist DESC NULLS FIRST, c_orders.c_count DESC NULLS FIRST Projection: c_orders.c_count, COUNT(UInt8(1)) AS custdist Aggregate: groupBy=[[c_orders.c_count]], aggr=[[COUNT(UInt8(1))]] - Projection: c_orders.COUNT(orders.o_orderkey) AS c_count, alias=c_orders - Projection: COUNT(orders.o_orderkey), alias=c_orders - Projection: COUNT(orders.o_orderkey) - Aggregate: groupBy=[[customer.c_custkey]], aggr=[[COUNT(orders.o_orderkey)]] - Left Join: customer.c_custkey = orders.o_custkey - TableScan: customer projection=[c_custkey] - Filter: orders.o_comment NOT LIKE Utf8("%special%requests%") - TableScan: orders projection=[o_orderkey, o_custkey, o_comment] \ No newline at end of file + SubqueryAlias: c_orders + Projection: c_orders.COUNT(orders.o_orderkey) AS c_count + SubqueryAlias: c_orders + Projection: COUNT(orders.o_orderkey) + Aggregate: groupBy=[[customer.c_custkey]], aggr=[[COUNT(orders.o_orderkey)]] + Left Join: customer.c_custkey = orders.o_custkey + TableScan: customer projection=[c_custkey] + Filter: orders.o_comment NOT LIKE Utf8("%special%requests%") + TableScan: orders projection=[o_orderkey, o_custkey, o_comment] \ No newline at end of file diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt index e78f8e0d98876..1100d17b617d9 100644 --- a/benchmarks/expected-plans/q15.txt +++ b/benchmarks/expected-plans/q15.txt @@ -4,18 +4,21 @@ Sort: supplier.s_suppkey ASC NULLS LAST Inner Join: revenue0.total_revenue = __sq_1.__value Inner Join: supplier.s_suppkey = revenue0.supplier_no TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone] - Projection: supplier_no, total_revenue, alias=revenue0 - Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue - Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) - Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] - Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") - TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] - Projection: MAX(revenue0.total_revenue) AS __value, alias=__sq_1 - Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]] - Projection: total_revenue, alias=revenue0 - Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue - Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) + SubqueryAlias: revenue0 + Projection: supplier_no, total_revenue + Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue + Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] + SubqueryAlias: __sq_1 + Projection: MAX(revenue0.total_revenue) AS __value + Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]] + SubqueryAlias: revenue0 + Projection: total_revenue + Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue + Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) + Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] + Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") + TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] EmptyRelation \ No newline at end of file diff --git a/benchmarks/expected-plans/q16.txt b/benchmarks/expected-plans/q16.txt index 5268aee703beb..60ef269334b58 100644 --- a/benchmarks/expected-plans/q16.txt +++ b/benchmarks/expected-plans/q16.txt @@ -8,6 +8,7 @@ Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS LAST, part.p_type AS TableScan: partsupp projection=[ps_partkey, ps_suppkey] Filter: part.p_brand != Utf8("Brand#45") AND part.p_type NOT LIKE Utf8("MEDIUM POLISHED%") AND part.p_size IN ([Int32(49), Int32(14), Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)]) TableScan: part projection=[p_partkey, p_brand, p_type, p_size] - Projection: supplier.s_suppkey AS s_suppkey, alias=__sq_1 - Filter: supplier.s_comment LIKE Utf8("%Customer%Complaints%") - TableScan: supplier projection=[s_suppkey, s_comment] + SubqueryAlias: __sq_1 + Projection: supplier.s_suppkey AS s_suppkey + Filter: supplier.s_comment LIKE Utf8("%Customer%Complaints%") + TableScan: supplier projection=[s_suppkey, s_comment] diff --git a/benchmarks/expected-plans/q18.txt b/benchmarks/expected-plans/q18.txt index ce0b20c201e38..4017722c505a2 100644 --- a/benchmarks/expected-plans/q18.txt +++ b/benchmarks/expected-plans/q18.txt @@ -7,7 +7,8 @@ Sort: orders.o_totalprice DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST TableScan: customer projection=[c_custkey, c_name] TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate] TableScan: lineitem projection=[l_orderkey, l_quantity] - Projection: lineitem.l_orderkey AS l_orderkey, alias=__sq_1 - Filter: SUM(lineitem.l_quantity) > Decimal128(Some(30000),25,2) - Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_quantity)]] - TableScan: lineitem projection=[l_orderkey, l_quantity] \ No newline at end of file + SubqueryAlias: __sq_1 + Projection: lineitem.l_orderkey AS l_orderkey + Filter: SUM(lineitem.l_quantity) > Decimal128(Some(30000),25,2) + Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_quantity)]] + TableScan: lineitem projection=[l_orderkey, l_quantity] \ No newline at end of file diff --git a/benchmarks/expected-plans/q2.txt b/benchmarks/expected-plans/q2.txt index e9730550939dd..845d79263d891 100644 --- a/benchmarks/expected-plans/q2.txt +++ b/benchmarks/expected-plans/q2.txt @@ -13,13 +13,14 @@ Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplie TableScan: nation projection=[n_nationkey, n_name, n_regionkey] Filter: region.r_name = Utf8("EUROPE") TableScan: region projection=[r_regionkey, r_name] - Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value, alias=__sq_1 - Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]] - Inner Join: nation.n_regionkey = region.r_regionkey - Inner Join: supplier.s_nationkey = nation.n_nationkey - Inner Join: partsupp.ps_suppkey = supplier.s_suppkey - TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost] - TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment] - TableScan: nation projection=[n_nationkey, n_name, n_regionkey] - Filter: region.r_name = Utf8("EUROPE") - TableScan: region projection=[r_regionkey, r_name] \ No newline at end of file + SubqueryAlias: __sq_1 + Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value + Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]] + Inner Join: nation.n_regionkey = region.r_regionkey + Inner Join: supplier.s_nationkey = nation.n_nationkey + Inner Join: partsupp.ps_suppkey = supplier.s_suppkey + TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost] + TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment] + TableScan: nation projection=[n_nationkey, n_name, n_regionkey] + Filter: region.r_name = Utf8("EUROPE") + TableScan: region projection=[r_regionkey, r_name] \ No newline at end of file diff --git a/benchmarks/expected-plans/q20.txt b/benchmarks/expected-plans/q20.txt index 0d095a735c299..1266622ea6c4d 100644 --- a/benchmarks/expected-plans/q20.txt +++ b/benchmarks/expected-plans/q20.txt @@ -5,15 +5,18 @@ Sort: supplier.s_name ASC NULLS LAST TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey] Filter: nation.n_name = Utf8("CANADA") TableScan: nation projection=[n_nationkey, n_name] - Projection: partsupp.ps_suppkey AS ps_suppkey, alias=__sq_2 - Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value - Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey - LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey - TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty] - Projection: part.p_partkey AS p_partkey, alias=__sq_1 - Filter: part.p_name LIKE Utf8("forest%") - TableScan: part projection=[p_partkey, p_name] - Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value, alias=__sq_3 - Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]] - Filter: lineitem.l_shipdate >= Date32("8766") AND lineitem.l_shipdate < Date32("9131") - TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate] \ No newline at end of file + SubqueryAlias: __sq_2 + Projection: partsupp.ps_suppkey AS ps_suppkey + Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value + Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey + LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey + TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty] + SubqueryAlias: __sq_1 + Projection: part.p_partkey AS p_partkey + Filter: part.p_name LIKE Utf8("forest%") + TableScan: part projection=[p_partkey, p_name] + SubqueryAlias: __sq_3 + Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value + Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]] + Filter: lineitem.l_shipdate >= Date32("8766") AND lineitem.l_shipdate < Date32("9131") + TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate] \ No newline at end of file diff --git a/benchmarks/expected-plans/q22.txt b/benchmarks/expected-plans/q22.txt index 81c12ba91a878..82060bd59d8f0 100644 --- a/benchmarks/expected-plans/q22.txt +++ b/benchmarks/expected-plans/q22.txt @@ -1,7 +1,7 @@ Sort: custsale.cntrycode ASC NULLS LAST Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]] - Projection: cntrycode, customer.c_acctbal, alias=custsale + SubqueryAlias: custsale Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value CrossJoin: @@ -9,7 +9,8 @@ Sort: custsale.cntrycode ASC NULLS LAST Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) TableScan: customer projection=[c_custkey, c_phone, c_acctbal] TableScan: orders projection=[o_custkey] - Projection: AVG(customer.c_acctbal) AS __value, alias=__sq_1 - Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]] - Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) - TableScan: customer projection=[c_phone, c_acctbal] \ No newline at end of file + SubqueryAlias: __sq_1 + Projection: AVG(customer.c_acctbal) AS __value + Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]] + Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) + TableScan: customer projection=[c_phone, c_acctbal] \ No newline at end of file diff --git a/benchmarks/expected-plans/q7.txt b/benchmarks/expected-plans/q7.txt index 3a5d88965abe1..74857c6f94ace 100644 --- a/benchmarks/expected-plans/q7.txt +++ b/benchmarks/expected-plans/q7.txt @@ -1,7 +1,7 @@ Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation ASC NULLS LAST, shipping.l_year ASC NULLS LAST Projection: shipping.supp_nation, shipping.cust_nation, shipping.l_year, SUM(shipping.volume) AS revenue Aggregate: groupBy=[[shipping.supp_nation, shipping.cust_nation, shipping.l_year]], aggr=[[SUM(shipping.volume)]] - Projection: supp_nation, cust_nation, l_year, volume, alias=shipping + SubqueryAlias: shipping Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, datepart(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume Filter: (n1.n_name = Utf8("FRANCE") OR n2.n_name = Utf8("FRANCE")) AND (n2.n_name = Utf8("GERMANY") OR n1.n_name = Utf8("GERMANY")) Inner Join: customer.c_nationkey = n2.n_nationkey @@ -19,4 +19,4 @@ Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation ASC NULLS LAST, TableScan: nation projection=[n_nationkey, n_name] Filter: n2.n_name = Utf8("GERMANY") OR n2.n_name = Utf8("FRANCE") SubqueryAlias: n2 - TableScan: nation projection=[n_nationkey, n_name] + TableScan: nation projection=[n_nationkey, n_name] \ No newline at end of file diff --git a/benchmarks/expected-plans/q8.txt b/benchmarks/expected-plans/q8.txt index 1b8d08ef875a6..75e65d835bb4c 100644 --- a/benchmarks/expected-plans/q8.txt +++ b/benchmarks/expected-plans/q8.txt @@ -1,7 +1,7 @@ Sort: all_nations.o_year ASC NULLS LAST Projection: all_nations.o_year, SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END) / SUM(all_nations.volume) AS mkt_share Aggregate: groupBy=[[all_nations.o_year]], aggr=[[SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Decimal128(Some(0),38,4) END) AS SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), SUM(all_nations.volume)]] - Projection: o_year, volume, nation, alias=all_nations + SubqueryAlias: all_nations Projection: datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume, n2.n_name AS nation Projection: lineitem.l_extendedprice, lineitem.l_discount, orders.o_orderdate, n2.n_name Inner Join: n1.n_regionkey = region.r_regionkey diff --git a/benchmarks/expected-plans/q9.txt b/benchmarks/expected-plans/q9.txt index ae7d4f194a8ce..166c98d97106a 100644 --- a/benchmarks/expected-plans/q9.txt +++ b/benchmarks/expected-plans/q9.txt @@ -1,7 +1,7 @@ Sort: profit.nation ASC NULLS LAST, profit.o_year DESC NULLS FIRST Projection: profit.nation, profit.o_year, SUM(profit.amount) AS sum_profit Aggregate: groupBy=[[profit.nation, profit.o_year]], aggr=[[SUM(profit.amount)]] - Projection: nation, o_year, amount, alias=profit + SubqueryAlias: profit Projection: nation.n_name AS nation, datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) - CAST(partsupp.ps_supplycost * lineitem.l_quantity AS Decimal128(38, 4)) AS amount Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, partsupp.ps_supplycost, orders.o_orderdate, nation.n_name Inner Join: supplier.s_nationkey = nation.n_nationkey From 133d5eeb186fcec8e1feeb2ae55bc26616600e61 Mon Sep 17 00:00:00 2001 From: jackwener Date: Tue, 22 Nov 2022 20:51:49 +0800 Subject: [PATCH 5/6] correct plan in UT. --- datafusion/core/tests/sql/joins.rs | 14 +- .../optimizer/src/decorrelate_where_in.rs | 131 +++++++------ datafusion/optimizer/src/filter_push_down.rs | 41 ++-- datafusion/optimizer/src/inline_table_scan.rs | 10 +- .../optimizer/src/scalar_subquery_to_join.rs | 178 ++++++++++-------- .../optimizer/src/subquery_filter_to_join.rs | 14 +- .../optimizer/tests/integration-test.rs | 42 +++-- datafusion/sql/src/planner.rs | 135 ++++++------- 8 files changed, 302 insertions(+), 263 deletions(-) diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index 4831cbe4af911..87fb594c79b3b 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -1635,16 +1635,16 @@ async fn reduce_left_join_3() -> Result<()> { let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t3.t1_id, t3.t1_name, t3.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", - " Left Join: t3.t1_int = t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", - " Projection: t1.t1_id, t1.t1_name, t1.t1_int, alias=t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", - " Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", + " Left Join: t3.t1_int = t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", + " Filter: t3.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", + " SubqueryAlias: t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", " Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", - " Filter: t1.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", - " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", - " Filter: t2.t2_int < UInt32(3) AND t2.t2_id < UInt32(100) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", + " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", + " Filter: t2.t2_int < UInt32(3) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", " TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", " TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", - ]; + ] + ; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); assert_eq!( diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index fa0367c454d76..ba8fba543102e 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -258,14 +258,15 @@ mod tests { .build()?; debug!("plan to optimize:\n{}", plan.display_indent()); - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8] - LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] - Projection: orders.o_custkey AS o_custkey, alias=__sq_2 [o_custkey:Int64] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [o_custkey:Int64]\ + \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ + \n SubqueryAlias: __sq_2 [o_custkey:Int64]\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected); Ok(()) } @@ -296,14 +297,16 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: orders.o_custkey AS o_custkey, alias=__sq_2 [o_custkey:Int64] - LeftSemi Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] - Projection: lineitem.l_orderkey AS l_orderkey, alias=__sq_1 [l_orderkey:Int64] - TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_2 [o_custkey:Int64]\ + \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ + \n LeftSemi Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ + \n SubqueryAlias: __sq_1 [l_orderkey:Int64]\ + \n Projection: lineitem.l_orderkey AS l_orderkey [l_orderkey:Int64]\ + \n TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]"; assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected); Ok(()) @@ -328,12 +331,13 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64] - Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [o_custkey:Int64]\ + \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ + \n Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected); Ok(()) @@ -355,12 +359,13 @@ mod tests { .build()?; // Query will fail, but we can still transform the plan - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64] - Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [o_custkey:Int64]\ + \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ + \n Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected); Ok(()) @@ -381,12 +386,13 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64] - Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [o_custkey:Int64]\ + \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ + \n Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected); Ok(()) @@ -407,11 +413,12 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - LeftSemi Join: customer.c_custkey = __sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [o_custkey:Int64]\ + \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected); Ok(()) @@ -583,12 +590,13 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8] - LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]\ + \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [o_custkey:Int64]\ + \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected); Ok(()) @@ -640,11 +648,12 @@ mod tests { .project(vec![col("test.b")])? .build()?; - let expected = r#"Projection: test.b [b:UInt32] - LeftSemi Join: test.c = __sq_1.c, test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32] - TableScan: test [a:UInt32, b:UInt32, c:UInt32] - Projection: sq.c AS c, sq.a AS a, alias=__sq_1 [c:UInt32, a:UInt32] - TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#; + let expected = "Projection: test.b [b:UInt32]\ + \n LeftSemi Join: test.c = __sq_1.c, test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n SubqueryAlias: __sq_1 [c:UInt32, a:UInt32]\ + \n Projection: sq.c AS c, sq.a AS a [c:UInt32, a:UInt32]\ + \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected); Ok(()) @@ -659,11 +668,12 @@ mod tests { .project(vec![col("test.b")])? .build()?; - let expected = r#"Projection: test.b [b:UInt32] - LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32] - TableScan: test [a:UInt32, b:UInt32, c:UInt32] - Projection: sq.c AS c, alias=__sq_1 [c:UInt32] - TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#; + let expected = "Projection: test.b [b:UInt32]\ + \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n SubqueryAlias: __sq_1 [c:UInt32]\ + \n Projection: sq.c AS c [c:UInt32]\ + \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected); Ok(()) @@ -678,11 +688,12 @@ mod tests { .project(vec![col("test.b")])? .build()?; - let expected = r#"Projection: test.b [b:UInt32] - LeftAnti Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32] - TableScan: test [a:UInt32, b:UInt32, c:UInt32] - Projection: sq.c AS c, alias=__sq_1 [c:UInt32] - TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#; + let expected = "Projection: test.b [b:UInt32]\ + \n LeftAnti Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n SubqueryAlias: __sq_1 [c:UInt32]\ + \n Projection: sq.c AS c [c:UInt32]\ + \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected); Ok(()) diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs index a50124cb26275..2f8a8a8b4d881 100644 --- a/datafusion/optimizer/src/filter_push_down.rs +++ b/datafusion/optimizer/src/filter_push_down.rs @@ -1193,14 +1193,15 @@ mod tests { .build()?; // filter appears below Union - let expected = "\ - Union\ - \n Projection: test.a AS b, alias=test2\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test\ - \n Projection: test.a AS b, alias=test2\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + let expected = "Union\ + \n SubqueryAlias: test2\ + \n Projection: test.a AS b\ + \n Filter: test.a = Int64(1)\ + \n TableScan: test\ + \n SubqueryAlias: test2\ + \n Projection: test.a AS b\ + \n Filter: test.a = Int64(1)\ + \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); Ok(()) } @@ -2299,22 +2300,24 @@ mod tests { .project(vec![col("b.a")])? .build()?; - let expected_before = "\ - Projection: b.a\ + let expected_before = "Projection: b.a\ \n Filter: b.a = Int64(1)\ - \n Projection: b.a, alias=b\ - \n Projection: Int64(0) AS a, alias=b\ - \n EmptyRelation"; + \n SubqueryAlias: b\ + \n Projection: b.a\ + \n SubqueryAlias: b\ + \n Projection: Int64(0) AS a\ + \n EmptyRelation"; assert_eq!(format!("{:?}", plan), expected_before); // Ensure that the predicate without any columns (0 = 1) is // still there. - let expected_after = "\ - Projection: b.a\ - \n Projection: b.a, alias=b\ - \n Projection: Int64(0) AS a, alias=b\ - \n Filter: Int64(0) = Int64(1)\ - \n EmptyRelation"; + let expected_after = "Projection: b.a\ + \n Filter: b.a = Int64(1)\ + \n SubqueryAlias: b\ + \n Projection: b.a\ + \n SubqueryAlias: b\ + \n Projection: Int64(0) AS a\ + \n EmptyRelation"; assert_optimized_plan_eq(&plan, expected_after); Ok(()) diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index e790ce0b723a2..4004a2b3da6fe 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -110,6 +110,7 @@ mod tests { pub struct CustomSource { plan: LogicalPlan, } + impl CustomSource { fn new() -> Self { Self { @@ -120,6 +121,7 @@ mod tests { } } } + impl TableSource for CustomSource { fn as_any(&self) -> &dyn std::any::Any { self @@ -156,10 +158,10 @@ mod tests { .optimize(&plan, &mut OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); - let expected = "\ - Filter: x.a = Int32(1)\ - \n Projection: y.a, alias=x\ - \n TableScan: y"; + let expected = "Filter: x.a = Int32(1)\ + \n SubqueryAlias: x\ + \n Projection: y.a\ + \n TableScan: y"; assert_eq!(formatted_plan, expected); assert_eq!(plan.schema(), optimized_plan.schema()); diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 0e53ecfae6a31..5e5167ab4e160 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -393,18 +393,20 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Filter: Int32(1) < __sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N] - Inner Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N] - Filter: Int32(1) < __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N] - Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value, alias=__sq_1 [o_custkey:Int64, __value:Int64;N] - Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] - Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value, alias=__sq_2 [o_custkey:Int64, __value:Int64;N] - Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n Filter: Int32(1) < __sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\ + \n Inner Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\ + \n Filter: Int32(1) < __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\ + \n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\ + \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ + \n SubqueryAlias: __sq_2 [o_custkey:Int64, __value:Int64;N]\ + \n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\ + \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected); Ok(()) } @@ -440,18 +442,20 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Filter: customer.c_acctbal < __sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N] - Inner Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value, alias=__sq_2 [o_custkey:Int64, __value:Float64;N] - Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]] [o_custkey:Int64, SUM(orders.o_totalprice):Float64;N] - Filter: orders.o_totalprice < __sq_1.__value [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N] - Inner Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] - Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS __value, alias=__sq_1 [l_orderkey:Int64, __value:Float64;N] - Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]] [l_orderkey:Int64, SUM(lineitem.l_extendedprice):Float64;N] - TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n Filter: customer.c_acctbal < __sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\ + \n Inner Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_2 [o_custkey:Int64, __value:Float64;N]\ + \n Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value [o_custkey:Int64, __value:Float64;N]\ + \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]] [o_custkey:Int64, SUM(orders.o_totalprice):Float64;N]\ + \n Filter: orders.o_totalprice < __sq_1.__value [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\ + \n Inner Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ + \n SubqueryAlias: __sq_1 [l_orderkey:Int64, __value:Float64;N]\ + \n Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS __value [l_orderkey:Int64, __value:Float64;N]\ + \n Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]] [l_orderkey:Int64, SUM(lineitem.l_extendedprice):Float64;N]\ + \n TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]"; assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected); Ok(()) } @@ -476,13 +480,14 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value, alias=__sq_1 [o_custkey:Int64, __value:Int64;N] - Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N] - Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\ + \n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\ + \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\ + \n Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected); Ok(()) @@ -505,14 +510,15 @@ mod tests { .build()?; // it will optimize, but fail for the same reason the unoptimized query would - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N] - CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: MAX(orders.o_custkey) AS __value, alias=__sq_1 [__value:Int64;N] - Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N] - Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ + \n CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [__value:Int64;N]\ + \n Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\ + \n Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\ + \n Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected); Ok(()) } @@ -533,14 +539,15 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N] - CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: MAX(orders.o_custkey) AS __value, alias=__sq_1 [__value:Int64;N] - Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N] - Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ + \n CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [__value:Int64;N]\ + \n Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\ + \n Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\ + \n Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected); Ok(()) @@ -710,14 +717,15 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N] - Filter: customer.c_custkey >= __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N] - Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value, alias=__sq_1 [o_custkey:Int64, __value:Int64;N] - Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n Filter: customer.c_custkey >= __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\ + \n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\ + \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected); Ok(()) @@ -742,13 +750,14 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N] - Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value, alias=__sq_1 [o_custkey:Int64, __value:Int64;N] - Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\ + \n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\ + \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected); Ok(()) @@ -803,13 +812,14 @@ mod tests { .project(vec![col("test.c")])? .build()?; - let expected = r#"Projection: test.c [c:UInt32] - Filter: test.c < __sq_1.__value [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N] - Inner Join: test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N] - TableScan: test [a:UInt32, b:UInt32, c:UInt32] - Projection: sq.a, MIN(sq.c) AS __value, alias=__sq_1 [a:UInt32, __value:UInt32;N] - Aggregate: groupBy=[[sq.a]], aggr=[[MIN(sq.c)]] [a:UInt32, MIN(sq.c):UInt32;N] - TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#; + let expected = "Projection: test.c [c:UInt32]\ + \n Filter: test.c < __sq_1.__value [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\ + \n Inner Join: test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n SubqueryAlias: __sq_1 [a:UInt32, __value:UInt32;N]\ + \n Projection: sq.a, MIN(sq.c) AS __value [a:UInt32, __value:UInt32;N]\ + \n Aggregate: groupBy=[[sq.a]], aggr=[[MIN(sq.c)]] [a:UInt32, MIN(sq.c):UInt32;N]\ + \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected); Ok(()) @@ -830,13 +840,14 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Filter: customer.c_custkey < __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N] - CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: MAX(orders.o_custkey) AS __value, alias=__sq_1 [__value:Int64;N] - Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n Filter: customer.c_custkey < __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ + \n CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [__value:Int64;N]\ + \n Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\ + \n Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected); Ok(()) @@ -856,13 +867,14 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N] - CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - Projection: MAX(orders.o_custkey) AS __value, alias=__sq_1 [__value:Int64;N] - Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; + let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ + \n Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ + \n CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ + \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ + \n SubqueryAlias: __sq_1 [__value:Int64;N]\ + \n Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\ + \n Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\ + \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected); Ok(()) diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs index c1717da0dce75..504167d21fe20 100644 --- a/datafusion/optimizer/src/subquery_filter_to_join.rs +++ b/datafusion/optimizer/src/subquery_filter_to_join.rs @@ -410,13 +410,15 @@ mod tests { let expected = "Projection: wrapped.b [b:UInt32]\ \n Filter: wrapped.b < UInt32(30) OR wrapped.c IN () [b:UInt32, c:UInt32]\ - \n Subquery: [c:UInt32]\n Projection: sq_outer.c [c:UInt32]\ + \n Subquery: [c:UInt32]\ + \n Projection: sq_outer.c [c:UInt32]\ \n TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: test.b, test.c, alias=wrapped [b:UInt32, c:UInt32]\ - \n LeftSemi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq_inner.c [c:UInt32]\ - \n TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]"; + \n SubqueryAlias: wrapped [b:UInt32, c:UInt32]\ + \n Projection: test.b, test.c [b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: sq_inner.c [c:UInt32]\ + \n TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(()) diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs index fb27ed5edf443..c4911439cd3c3 100644 --- a/datafusion/optimizer/tests/integration-test.rs +++ b/datafusion/optimizer/tests/integration-test.rs @@ -63,14 +63,15 @@ fn subquery_filter_with_cast() -> Result<()> { AND (cast('2002-05-08' as date) + interval '5 days')\ )"; let plan = test_sql(sql)?; - let expected = - "Projection: test.col_int32\n Filter: CAST(test.col_int32 AS Float64) > __sq_1.__value\ - \n CrossJoin:\ - \n TableScan: test projection=[col_int32]\ - \n Projection: AVG(test.col_int32) AS __value, alias=__sq_1\ - \n Aggregate: groupBy=[[]], aggr=[[AVG(test.col_int32)]]\ - \n Filter: test.col_utf8 >= Utf8(\"2002-05-08\") AND test.col_utf8 <= Utf8(\"2002-05-13\")\ - \n TableScan: test projection=[col_int32, col_utf8]"; + let expected = "Projection: test.col_int32\ + \n Filter: CAST(test.col_int32 AS Float64) > __sq_1.__value\ + \n CrossJoin:\ + \n TableScan: test projection=[col_int32]\ + \n SubqueryAlias: __sq_1\ + \n Projection: AVG(test.col_int32) AS __value\ + \n Aggregate: groupBy=[[]], aggr=[[AVG(test.col_int32)]]\ + \n Filter: test.col_utf8 >= Utf8(\"2002-05-08\") AND test.col_utf8 <= Utf8(\"2002-05-13\")\ + \n TableScan: test projection=[col_int32, col_utf8]"; assert_eq!(expected, format!("{:?}", plan)); Ok(()) } @@ -271,14 +272,14 @@ fn propagate_empty_relation() { fn join_keys_in_subquery_alias() { let sql = "SELECT * FROM test AS A, ( SELECT col_int32 as key FROM test ) AS B where A.col_int32 = B.key;"; let plan = test_sql(sql).unwrap(); - let expected = "Projection: a.col_int32, a.col_uint32, a.col_utf8, a.col_date32, a.col_date64, a.col_ts_nano_none, a.col_ts_nano_utc, b.key\ + let expected = "Projection: a.col_int32, a.col_uint32, a.col_utf8, a.col_date32, a.col_date64, a.col_ts_nano_none, a.col_ts_nano_utc, b.key\ \n Inner Join: a.col_int32 = b.key\ \n Filter: a.col_int32 IS NOT NULL\ \n SubqueryAlias: a\ \n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\ - \n Projection: key, alias=b\ - \n Projection: test.col_int32 AS key\ - \n Filter: test.col_int32 IS NOT NULL\ + \n Filter: b.key IS NOT NULL\ + \n SubqueryAlias: b\ + \n Projection: test.col_int32 AS key\ \n TableScan: test projection=[col_int32]"; assert_eq!(expected, format!("{:?}", plan)); } @@ -292,14 +293,15 @@ fn join_keys_in_subquery_alias_1() { \n Filter: a.col_int32 IS NOT NULL\ \n SubqueryAlias: a\ \n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\ - \n Projection: key, alias=b\ - \n Projection: test.col_int32 AS key\ - \n Inner Join: test.col_int32 = c.col_int32\ - \n Filter: test.col_int32 IS NOT NULL\ - \n TableScan: test projection=[col_int32]\ - \n Filter: c.col_int32 IS NOT NULL\ - \n SubqueryAlias: c\ - \n TableScan: test projection=[col_int32]"; + \n Filter: b.key IS NOT NULL\ + \n SubqueryAlias: b\ + \n Projection: test.col_int32 AS key\ + \n Inner Join: test.col_int32 = c.col_int32\ + \n Filter: test.col_int32 IS NOT NULL\ + \n TableScan: test projection=[col_int32]\ + \n Filter: c.col_int32 IS NOT NULL\ + \n SubqueryAlias: c\ + \n TableScan: test projection=[col_int32]"; assert_eq!(expected, format!("{:?}", plan)); } diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 718d601891382..36b45390e47af 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -3201,9 +3201,8 @@ mod tests { quick_test( "SELECT CAST (a AS FLOAT) FROM (SELECT 1 AS a)", "Projection: CAST(a AS Float32)\ - \n Projection: a\ - \n Projection: Int64(1) AS a\ - \n EmptyRelation", + \n Projection: Int64(1) AS a\ + \n EmptyRelation", ); } @@ -3403,11 +3402,11 @@ mod tests { ) AS a ) AS b"; let expected = "Projection: b.fn2, b.last_name\ - \n Projection: fn2, a.last_name, a.birth_date, alias=b\ - \n Projection: a.fn1 AS fn2, a.last_name, a.birth_date\ - \n Projection: fn1, person.last_name, person.birth_date, person.age, alias=a\ - \n Projection: person.first_name AS fn1, person.last_name, person.birth_date, person.age\ - \n TableScan: person"; + \n SubqueryAlias: b\ + \n Projection: a.fn1 AS fn2, a.last_name, a.birth_date\ + \n SubqueryAlias: a\ + \n Projection: person.first_name AS fn1, person.last_name, person.birth_date, person.age\ + \n TableScan: person"; quick_test(sql, expected); } @@ -3422,11 +3421,11 @@ mod tests { WHERE fn1 = 'X' AND age < 30"; let expected = "Projection: a.fn1, a.age\ - \n Filter: a.fn1 = Utf8(\"X\") AND a.age < Int64(30)\ - \n Projection: fn1, person.age, alias=a\ - \n Projection: person.first_name AS fn1, person.age\ - \n Filter: person.age > Int64(20)\ - \n TableScan: person"; + \n Filter: a.fn1 = Utf8(\"X\") AND a.age < Int64(30)\ + \n SubqueryAlias: a\ + \n Projection: person.first_name AS fn1, person.age\ + \n Filter: person.age > Int64(20)\ + \n TableScan: person"; quick_test(sql, expected); } @@ -3436,9 +3435,10 @@ mod tests { let sql = "SELECT a, b, c FROM lineitem l (a, b, c)"; let expected = "Projection: l.a, l.b, l.c\ - \n Projection: l.l_item_id AS a, l.l_description AS b, l.price AS c, alias=l\ - \n SubqueryAlias: l\ - \n TableScan: lineitem"; + \n SubqueryAlias: l\ + \n Projection: l.l_item_id AS a, l.l_description AS b, l.price AS c\ + \n SubqueryAlias: l\ + \n TableScan: lineitem"; quick_test(sql, expected); } @@ -3796,10 +3796,10 @@ mod tests { quick_test( "SELECT * FROM (SELECT first_name, last_name FROM person) AS a GROUP BY first_name, last_name", "Projection: a.first_name, a.last_name\ - \n Aggregate: groupBy=[[a.first_name, a.last_name]], aggr=[[]]\ - \n Projection: person.first_name, person.last_name, alias=a\ - \n Projection: person.first_name, person.last_name\ - \n TableScan: person", + \n Aggregate: groupBy=[[a.first_name, a.last_name]], aggr=[[]]\ + \n SubqueryAlias: a\ + \n Projection: person.first_name, person.last_name\ + \n TableScan: person", ); } @@ -3865,9 +3865,10 @@ mod tests { quick_test( "SELECT col1, col2 FROM (VALUES (TIMESTAMP '2021-06-10 17:01:00Z', DATE '2004-04-09')) as t (col1, col2)", "Projection: t.col1, t.col2\ - \n Projection: t.column1 AS col1, t.column2 AS col2, alias=t\ - \n Projection: column1, column2, alias=t\ - \n Values: (CAST(Utf8(\"2021-06-10 17:01:00Z\") AS Timestamp(Nanosecond, None)), CAST(Utf8(\"2004-04-09\") AS Date32))", + \n SubqueryAlias: t\ + \n Projection: t.column1 AS col1, t.column2 AS col2\ + \n SubqueryAlias: t\ + \n Values: (CAST(Utf8(\"2021-06-10 17:01:00Z\") AS Timestamp(Nanosecond, None)), CAST(Utf8(\"2004-04-09\") AS Date32))", ); } @@ -4703,17 +4704,17 @@ mod tests { fn sorted_union_with_different_types_and_group_by() { let sql = "SELECT a FROM (select 1 a) x GROUP BY 1 UNION ALL (SELECT a FROM (select 1.1 a) x GROUP BY 1) ORDER BY 1"; let expected = "Sort: a ASC NULLS LAST\ - \n Union\ - \n Projection: CAST(x.a AS Float64) AS a\ - \n Aggregate: groupBy=[[x.a]], aggr=[[]]\ - \n Projection: a, alias=x\ - \n Projection: Int64(1) AS a\ - \n EmptyRelation\ - \n Projection: x.a\ - \n Aggregate: groupBy=[[x.a]], aggr=[[]]\ - \n Projection: a, alias=x\ - \n Projection: Float64(1.1) AS a\ - \n EmptyRelation"; + \n Union\ + \n Projection: CAST(x.a AS Float64) AS a\ + \n Aggregate: groupBy=[[x.a]], aggr=[[]]\ + \n SubqueryAlias: x\ + \n Projection: Int64(1) AS a\ + \n EmptyRelation\ + \n Projection: x.a\ + \n Aggregate: groupBy=[[x.a]], aggr=[[]]\ + \n SubqueryAlias: x\ + \n Projection: Float64(1.1) AS a\ + \n EmptyRelation"; quick_test(sql, expected); } @@ -4721,16 +4722,16 @@ mod tests { fn union_with_binary_expr_and_cast() { let sql = "SELECT cast(0.0 + a as integer) FROM (select 1 a) x GROUP BY 1 UNION ALL (SELECT 2.1 + a FROM (select 1 a) x GROUP BY 1)"; let expected = "Union\ - \n Projection: CAST(Float64(0) + x.a AS Float64) AS Float64(0) + x.a\ - \n Aggregate: groupBy=[[CAST(Float64(0) + x.a AS Int32)]], aggr=[[]]\ - \n Projection: a, alias=x\ - \n Projection: Int64(1) AS a\ - \n EmptyRelation\ - \n Projection: Float64(2.1) + x.a\ - \n Aggregate: groupBy=[[Float64(2.1) + x.a]], aggr=[[]]\ - \n Projection: a, alias=x\ - \n Projection: Int64(1) AS a\ - \n EmptyRelation"; + \n Projection: CAST(Float64(0) + x.a AS Float64) AS Float64(0) + x.a\ + \n Aggregate: groupBy=[[CAST(Float64(0) + x.a AS Int32)]], aggr=[[]]\ + \n SubqueryAlias: x\ + \n Projection: Int64(1) AS a\ + \n EmptyRelation\ + \n Projection: Float64(2.1) + x.a\ + \n Aggregate: groupBy=[[Float64(2.1) + x.a]], aggr=[[]]\ + \n SubqueryAlias: x\ + \n Projection: Int64(1) AS a\ + \n EmptyRelation"; quick_test(sql, expected); } @@ -4738,16 +4739,16 @@ mod tests { fn union_with_aliases() { let sql = "SELECT a as a1 FROM (select 1 a) x GROUP BY 1 UNION ALL (SELECT a as a1 FROM (select 1.1 a) x GROUP BY 1)"; let expected = "Union\ - \n Projection: CAST(x.a AS Float64) AS a1\ - \n Aggregate: groupBy=[[x.a]], aggr=[[]]\ - \n Projection: a, alias=x\ - \n Projection: Int64(1) AS a\ - \n EmptyRelation\ - \n Projection: x.a AS a1\ - \n Aggregate: groupBy=[[x.a]], aggr=[[]]\ - \n Projection: a, alias=x\ - \n Projection: Float64(1.1) AS a\ - \n EmptyRelation"; + \n Projection: CAST(x.a AS Float64) AS a1\ + \n Aggregate: groupBy=[[x.a]], aggr=[[]]\ + \n SubqueryAlias: x\ + \n Projection: Int64(1) AS a\ + \n EmptyRelation\ + \n Projection: x.a AS a1\ + \n Aggregate: groupBy=[[x.a]], aggr=[[]]\ + \n SubqueryAlias: x\ + \n Projection: Float64(1.1) AS a\ + \n EmptyRelation"; quick_test(sql, expected); } @@ -5495,8 +5496,9 @@ mod tests { \n Subquery:\ \n Projection: cte.id, cte.first_name, cte.last_name, cte.age, cte.state, cte.salary, cte.birth_date, cte.😀\ \n Filter: cte.id = person.id\ - \n Projection: person.id, person.first_name, person.last_name, person.age, person.state, person.salary, person.birth_date, person.😀, alias=cte\ - \n TableScan: person\ + \n SubqueryAlias: cte\ + \n Projection: person.id, person.first_name, person.last_name, person.age, person.state, person.salary, person.birth_date, person.😀\ + \n TableScan: person\ \n TableScan: person"; quick_test(sql, expected) @@ -5511,8 +5513,9 @@ mod tests { SELECT * FROM numbers;"; let expected = "Projection: numbers.a, numbers.b, numbers.c\ - \n Projection: Int64(1) AS a, Int64(2) AS b, Int64(3) AS c, alias=numbers\ - \n EmptyRelation"; + \n SubqueryAlias: numbers\ + \n Projection: Int64(1) AS a, Int64(2) AS b, Int64(3) AS c\ + \n EmptyRelation"; quick_test(sql, expected) } @@ -5526,9 +5529,11 @@ mod tests { SELECT * FROM numbers;"; let expected = "Projection: numbers.a, numbers.b, numbers.c\ - \n Projection: numbers.Int64(1) AS a, numbers.Int64(2) AS b, numbers.Int64(3) AS c, alias=numbers\ - \n Projection: Int64(1), Int64(2), Int64(3), alias=numbers\ - \n EmptyRelation"; + \n SubqueryAlias: numbers\ + \n Projection: numbers.Int64(1) AS a, numbers.Int64(2) AS b, numbers.Int64(3) AS c\ + \n SubqueryAlias: numbers\ + \n Projection: Int64(1), Int64(2), Int64(3)\ + \n EmptyRelation"; quick_test(sql, expected) } @@ -5543,9 +5548,11 @@ mod tests { SELECT * FROM numbers;"; let expected = "Projection: numbers.a, numbers.b, numbers.c\ - \n Projection: numbers.x AS a, numbers.y AS b, numbers.z AS c, alias=numbers\ - \n Projection: Int64(1) AS x, Int64(2) AS y, Int64(3) AS z, alias=numbers\ - \n EmptyRelation"; + \n SubqueryAlias: numbers\ + \n Projection: numbers.x AS a, numbers.y AS b, numbers.z AS c\ + \n SubqueryAlias: numbers\ + \n Projection: Int64(1) AS x, Int64(2) AS y, Int64(3) AS z\ + \n EmptyRelation"; quick_test(sql, expected) } From 48b49b27f0a4d4185effbe1ea1c0857beb986245 Mon Sep 17 00:00:00 2001 From: jackwener Date: Sat, 26 Nov 2022 21:33:55 +0800 Subject: [PATCH 6/6] fix conflict and add doc for `with_alias` --- datafusion/expr/src/logical_plan/builder.rs | 1 + datafusion/optimizer/src/optimizer.rs | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index af9f66ca98c23..71097f5a61c6a 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -989,6 +989,7 @@ pub fn project_with_alias( } } +/// Create a SubqueryAlias to wrap a LogicalPlan. pub fn with_alias(plan: LogicalPlan, alias: String) -> LogicalPlan { let plan_schema = &**plan.schema(); let schema = (plan_schema.clone()).replace_qualifier(alias.as_str()); diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 2ac3fda23bd40..5f442bb69fcfa 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -366,7 +366,6 @@ mod tests { expr: vec![col("a"), col("b"), col("c")], input, schema: add_metadata_to_fields(input_schema.as_ref()), - alias: None, }); // optimizing should be ok, but the schema will have changed (no metadata)