From 68bc5cd01cd7a09623bf02ef89f7d7d3cb268597 Mon Sep 17 00:00:00 2001 From: jackwener Date: Wed, 16 Nov 2022 13:44:21 +0800 Subject: [PATCH] separate `projection` and `alias-projection` --- datafusion/core/tests/sql/explain_analyze.rs | 7 +- datafusion/core/tests/sql/subqueries.rs | 65 ++++++------ datafusion/core/tests/sql/window.rs | 52 +++++----- datafusion/expr/src/logical_plan/builder.rs | 98 +++++++++++++------ .../optimizer/src/decorrelate_where_in.rs | 2 +- datafusion/optimizer/src/filter_push_down.rs | 6 +- datafusion/optimizer/src/inline_table_scan.rs | 2 +- .../optimizer/src/projection_push_down.rs | 1 + .../optimizer/src/scalar_subquery_to_join.rs | 2 +- .../optimizer/src/subquery_filter_to_join.rs | 2 +- datafusion/proto/src/logical_plan.rs | 24 +++-- datafusion/sql/src/planner.rs | 49 +++++++--- 12 files changed, 190 insertions(+), 120 deletions(-) diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 693a99984ce54..9647d9e8ccc9d 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -744,9 +744,10 @@ async fn test_physical_plan_display_indent_multi_children() { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000)", " ProjectionExec: expr=[c2@0 as c2]", - " ProjectionExec: expr=[c1@0 as c2]", - " RepartitionExec: partitioning=RoundRobinBatch(9000)", - " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c2]", + " ProjectionExec: expr=[c2@0 as c2]", + " ProjectionExec: expr=[c1@0 as c2]", + " RepartitionExec: partitioning=RoundRobinBatch(9000)", + " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c2]", ]; let normalizer = ExplainNormalizer::new(); diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs index 064ef3a35edfe..fbd6aec1b0760 100644 --- a/datafusion/core/tests/sql/subqueries.rs +++ b/datafusion/core/tests/sql/subqueries.rs @@ -56,13 +56,15 @@ where c_acctbal < ( Inner Join: customer.c_custkey = __sq_2.o_custkey TableScan: customer projection=[c_custkey, c_acctbal] Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value, alias=__sq_2 - Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]] - Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __sq_1.__value - Inner Join: orders.o_orderkey = __sq_1.l_orderkey - TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] - Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value, alias=__sq_1 - Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]] - TableScan: lineitem projection=[l_orderkey, l_extendedprice]"# + Projection: orders.o_custkey + Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]] + Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __sq_1.__value + Inner Join: orders.o_orderkey = __sq_1.l_orderkey + TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] + Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value, alias=__sq_1 + Projection: lineitem.l_orderkey + Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]] + TableScan: lineitem projection=[l_orderkey, l_extendedprice]"# .to_string(); assert_eq!(actual, expected); @@ -154,15 +156,16 @@ order by s_acctbal desc, n_name, s_name, p_partkey;"#; Filter: region.r_name = Utf8("EUROPE") TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")] Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value, alias=__sq_1 - Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]] - Inner Join: nation.n_regionkey = region.r_regionkey - Inner Join: supplier.s_nationkey = nation.n_nationkey - Inner Join: partsupp.ps_suppkey = supplier.s_suppkey - TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost] - TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment] - TableScan: nation projection=[n_nationkey, n_name, n_regionkey] - Filter: region.r_name = Utf8("EUROPE") - TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]"# + Projection: partsupp.ps_partkey + Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]] + Inner Join: nation.n_regionkey = region.r_regionkey + Inner Join: supplier.s_nationkey = nation.n_nationkey + Inner Join: partsupp.ps_suppkey = supplier.s_suppkey + TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost] + TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment] + TableScan: nation projection=[n_nationkey, n_name, n_regionkey] + Filter: region.r_name = Utf8("EUROPE") + TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]"# .to_string(); assert_eq!(actual, expected); @@ -336,9 +339,10 @@ order by s_name; Filter: part.p_name LIKE Utf8("forest%") TableScan: part projection=[p_partkey, p_name], partial_filters=[part.p_name LIKE Utf8("forest%")] Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value, alias=__sq_3 - Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]] - Filter: lineitem.l_shipdate >= Date32("8766") - TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32("8766")]"# + Projection: lineitem.l_partkey, lineitem.l_suppkey + Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]] + Filter: lineitem.l_shipdate >= Date32("8766") + TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32("8766")]"# .to_string(); assert_eq!(actual, expected); @@ -383,17 +387,18 @@ order by cntrycode;"#; Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]] Projection: cntrycode, customer.c_acctbal, alias=custsale - Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal - Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value - CrossJoin: - LeftAnti Join: customer.c_custkey = orders.o_custkey - Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) - TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])] - TableScan: orders projection=[o_custkey] - Projection: AVG(customer.c_acctbal) AS __value, alias=__sq_1 - Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]] - Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) - TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[CAST(customer.c_acctbal AS Decimal128(30, 15)) > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]), customer.c_acctbal > Decimal128(Some(0),15,2)]"# + Projection: cntrycode, customer.c_acctbal + Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal + Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value + CrossJoin: + LeftAnti Join: customer.c_custkey = orders.o_custkey + Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) + TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])] + TableScan: orders projection=[o_custkey] + Projection: AVG(customer.c_acctbal) AS __value, alias=__sq_1 + Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]] + Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) + TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[CAST(customer.c_acctbal AS Decimal128(30, 15)) > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]), customer.c_acctbal > Decimal128(Some(0),15,2)]"# .to_string(); assert_eq!(expected, actual); diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index 9367ee63b5f71..db9a680269ee0 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -336,17 +336,20 @@ async fn window_expr_eliminate() -> Result<()> { " Projection: d.b, MAX(d.a) AS max_a [b:Utf8, max_a:Int64;N]", " Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a)]] [b:Utf8, MAX(d.a):Int64;N]", " Projection: _data2.a, _data2.b, alias=d [a:Int64, b:Utf8]", - " Projection: s.a, s.b, alias=_data2 [a:Int64, b:Utf8]", - " Projection: a, b, alias=s [a:Int64, b:Utf8]", - " Union [a:Int64, b:Utf8]", - " Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", - " Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", - " Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", - " Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", + " Projection: _data2.a, _data2.b [a:Int64, b:Utf8]", + " Projection: s.a, s.b, alias=_data2 [a:Int64, b:Utf8]", + " Projection: s.a, s.b [a:Int64, b:Utf8]", + " Projection: a, b, alias=s [a:Int64, b:Utf8]", + " Projection: a, b [a:Int64, b:Utf8]", + " Union [a:Int64, b:Utf8]", + " Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", + " Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", + " Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", + " Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); @@ -399,18 +402,21 @@ async fn window_expr_eliminate() -> Result<()> { " Projection: d.b, MAX(d.a) AS max_a, MAX(d.seq) [b:Utf8, max_a:Int64;N, MAX(d.seq):UInt64;N]", " Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a), MAX(d.seq)]] [b:Utf8, MAX(d.a):Int64;N, MAX(d.seq):UInt64;N]", " Projection: _data2.seq, _data2.a, _data2.b, alias=d [seq:UInt64;N, a:Int64, b:Utf8]", - " Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] AS seq, s.a, s.b, alias=_data2 [seq:UInt64;N, a:Int64, b:Utf8]", - " WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]]] [ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]:UInt64;N, a:Int64, b:Utf8]", - " Projection: a, b, alias=s [a:Int64, b:Utf8]", - " Union [a:Int64, b:Utf8]", - " Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", - " Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", - " Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", - " Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", - " EmptyRelation []", + " Projection: _data2.seq, _data2.a, _data2.b [seq:UInt64;N, a:Int64, b:Utf8]", + " Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] AS seq, s.a, s.b, alias=_data2 [seq:UInt64;N, a:Int64, b:Utf8]", + " Projection: s.a, s.b [a:Int64, b:Utf8]", + " WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]]] [ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]:UInt64;N, a:Int64, b:Utf8]", + " Projection: a, b, alias=s [a:Int64, b:Utf8]", + " Projection: a, b [a:Int64, b:Utf8]", + " Union [a:Int64, b:Utf8]", + " Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", + " Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", + " Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", + " Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]", + " EmptyRelation []", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 166b6c1d2d15a..18118b079a533 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -267,14 +267,31 @@ impl LogicalPlanBuilder { &self, expr: impl IntoIterator>, ) -> Result { - self.project_with_alias(expr, None) + Ok(Self::from(project( + self.plan.clone(), + expr, + )?)) + } + + /// Apply a projection with alias + pub fn subquery_alias( + &self, + alias: String, + ) -> Result { + let input_schema = (**self.plan.schema()).clone(); + let new_schema = input_schema.replace_qualifier(alias.as_str()); + Ok(Self::from(LogicalPlan::SubqueryAlias(SubqueryAlias { + input: Arc::new(self.plan.clone()), + alias, + schema: Arc::new(new_schema), + }))) } /// Apply a projection with alias pub fn project_with_alias( &self, expr: impl IntoIterator>, - alias: Option, + alias: String, ) -> Result { Ok(Self::from(project_with_alias( self.plan.clone(), @@ -343,7 +360,10 @@ impl LogicalPlanBuilder { // projected alias. missing_exprs.retain(|e| !expr.contains(e)); expr.extend(missing_exprs); - Ok(project_with_alias((*input).clone(), expr, alias)?) + match alias { + Some(alias) => Ok(project_with_alias((*input).clone(), expr, alias)?), + None => Ok(project((*input).clone(), expr)?) + } } _ => { let new_inputs = curr_plan @@ -845,14 +865,14 @@ pub(crate) fn validate_unique_names<'a>( None => { unique_names.insert(name, (position, expr)); Ok(()) - }, + } Some((existing_position, existing_expr)) => { Err(DataFusionError::Plan( format!("{} require unique expression names \ but the expression \"{:?}\" at position {} and \"{:?}\" \ at position {} have the same name. Consider aliasing (\"AS\") one of them.", - node_name, existing_expr, existing_position, expr, position, - ) + node_name, existing_expr, existing_position, expr, position, + ) )) } } @@ -906,12 +926,12 @@ pub fn union_with_alias( comparison_coercion(left_field.data_type(), right_field.data_type()) .ok_or_else(|| { DataFusionError::Plan(format!( - "UNION Column {} (type: {}) is not compatible with column {} (type: {})", - right_field.name(), - right_field.data_type(), - left_field.name(), - left_field.data_type() - )) + "UNION Column {} (type: {}) is not compatible with column {} (type: {})", + right_field.name(), + right_field.data_type(), + left_field.name(), + left_field.data_type() + )) })?; Ok(DFField::new( @@ -961,15 +981,14 @@ pub fn union_with_alias( })) } -/// Project with optional alias +/// Project without alias /// # Errors /// This function errors under any of the following conditions: /// * Two or more expressions have the same name /// * An invalid expression is used (e.g. a `sort` expression) -pub fn project_with_alias( +pub fn project( plan: LogicalPlan, expr: impl IntoIterator>, - alias: Option, ) -> Result { let input_schema = plan.schema(); let mut projected_expr = vec![]; @@ -986,20 +1005,37 @@ pub fn project_with_alias( } } validate_unique_names("Projections", projected_expr.iter())?; - let input_schema = DFSchema::new_with_metadata( + let schema = DFSchema::new_with_metadata( exprlist_to_fields(&projected_expr, &plan)?, plan.schema().metadata().clone(), )?; - let schema = match alias { - Some(ref alias) => input_schema.replace_qualifier(alias.as_str()), - None => input_schema, - }; Ok(LogicalPlan::Projection(Projection::try_new_with_schema( projected_expr, Arc::new(plan.clone()), DFSchemaRef::new(schema), - alias, + None, + )?)) +} + +/// Project with optional alias +/// # Errors +/// This function errors under any of the following conditions: +/// * Two or more expressions have the same name +/// * An invalid expression is used (e.g. a `sort` expression) +pub fn project_with_alias( + plan: LogicalPlan, + expr: impl IntoIterator>, + alias: String, +) -> Result { + let project = project(plan, expr)?; + let schema = (**project.schema()).clone().replace_qualifier(alias.as_str()); + + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + project.expressions(), + Arc::new(project), + DFSchemaRef::new(schema), + Some(alias), )?)) } @@ -1319,14 +1355,14 @@ mod tests { // project id and first_name by column index Some(vec![0, 1]), )? - // two columns with the same name => error - .project(vec![col("id"), col("first_name").alias("id")]); + // two columns with the same name => error + .project(vec![col("id"), col("first_name").alias("id")]); match plan { Err(DataFusionError::SchemaError(SchemaError::AmbiguousReference { - qualifier, - name, - })) => { + qualifier, + name, + })) => { assert_eq!("employee_csv", qualifier.unwrap().as_str()); assert_eq!("id", &name); Ok(()) @@ -1345,14 +1381,14 @@ mod tests { // project state and salary by column index Some(vec![3, 4]), )? - // two columns with the same name => error - .aggregate(vec![col("state")], vec![sum(col("salary")).alias("state")]); + // two columns with the same name => error + .aggregate(vec![col("state")], vec![sum(col("salary")).alias("state")]); match plan { Err(DataFusionError::SchemaError(SchemaError::AmbiguousReference { - qualifier, - name, - })) => { + qualifier, + name, + })) => { assert_eq!("employee_csv", qualifier.unwrap().as_str()); assert_eq!("state", &name); Ok(()) diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index fa0367c454d76..ebac90d591f62 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -179,7 +179,7 @@ fn optimize_where_in( } let projection = alias_cols(&subqry_cols); let subqry_plan = subqry_plan - .project_with_alias(projection, Some(subqry_alias.clone()))? + .project_with_alias(projection, subqry_alias.clone())? .build()?; debug!("subquery plan:\n{}", subqry_plan.display_indent()); diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs index eed833cf2aaf4..e5796c27fb486 100644 --- a/datafusion/optimizer/src/filter_push_down.rs +++ b/datafusion/optimizer/src/filter_push_down.rs @@ -1208,7 +1208,7 @@ mod tests { fn union_all_on_projection() -> Result<()> { let table_scan = test_table_scan()?; let table = LogicalPlanBuilder::from(table_scan) - .project_with_alias(vec![col("a").alias("b")], Some("test2".to_string()))?; + .project_with_alias(vec![col("a").alias("b")], "test2".to_string())?; let plan = table .union(table.build()?)? @@ -2316,8 +2316,8 @@ mod tests { fn test_propagation_of_optimized_inner_filters_with_projections() -> Result<()> { // SELECT a FROM (SELECT 1 AS a) b WHERE b.a = 1 let plan = LogicalPlanBuilder::empty(true) - .project_with_alias(vec![lit(0i64).alias("a")], Some("b".to_owned()))? - .project_with_alias(vec![col("b.a")], Some("b".to_owned()))? + .project_with_alias(vec![lit(0i64).alias("a")], "b".to_owned())? + .project_with_alias(vec![col("b.a")], "b".to_owned())? .filter(col("b.a").eq(lit(1i64)))? .project(vec![col("b.a")])? .build()?; diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index e790ce0b723a2..617c43efc9cdd 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -56,7 +56,7 @@ impl OptimizerRule for InlineTableScan { utils::optimize_children(self, sub_plan, _optimizer_config)?; let plan = LogicalPlanBuilder::from(plan).project_with_alias( vec![Expr::Wildcard], - Some(table_name.to_string()), + table_name.to_string(), )?; plan.build() } else { diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 5244bbcdbd798..d59a2f3318b7f 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -50,6 +50,7 @@ impl OptimizerRule for ProjectionPushDown { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> Result { + let str = format!("{:?}", plan); // set of all columns referred by the plan (and thus considered required by the root) let required_columns = plan .schema() diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 0e53ecfae6a31..64d633525c3dd 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -271,7 +271,7 @@ fn optimize_scalar( .collect(); let subqry_plan = subqry_plan .aggregate(group_by, aggr.aggr_expr.clone())? - .project_with_alias(proj, Some(subqry_alias.clone()))? + .project_with_alias(proj, subqry_alias.clone())? .build()?; // qualify the join columns for outside the subquery diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs index c1717da0dce75..14f6f4ad273de 100644 --- a/datafusion/optimizer/src/subquery_filter_to_join.rs +++ b/datafusion/optimizer/src/subquery_filter_to_join.rs @@ -400,7 +400,7 @@ mod tests { let table_scan = test_table_scan()?; let plan = LogicalPlanBuilder::from(table_scan) .filter(in_subquery(col("c"), test_subquery_with_name("sq_inner")?))? - .project_with_alias(vec![col("b"), col("c")], Some("wrapped".to_string()))? + .project_with_alias(vec![col("b"), col("c")], "wrapped".to_string())? .filter(or( binary_expr(col("b"), Operator::Lt, lit(30_u32)), in_subquery(col("c"), test_subquery_with_name("sq_outer")?), diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index 9ab754c921363..1f66f66404ddd 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -345,16 +345,20 @@ impl AsLogicalPlan for LogicalPlanNode { .iter() .map(|expr| parse_expr(expr, ctx)) .collect::, _>>()?; - LogicalPlanBuilder::from(input) - .project_with_alias( - x, - projection.optional_alias.as_ref().map(|a| match a { - protobuf::projection_node::OptionalAlias::Alias(alias) => { - alias.clone() - } - }), - )? - .build() + + let alias_opt = projection.optional_alias.as_ref().map(|a| match a { + protobuf::projection_node::OptionalAlias::Alias(alias) => { + alias.clone() + }}); + + match alias_opt { + Some(alias) => LogicalPlanBuilder::from(input) + .project_with_alias(x, alias)? + .build(), + None => LogicalPlanBuilder::from(input) + .project(x)? + .build(), + } } LogicalPlanType::Selection(selection) => { let input: LogicalPlan = diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 58d221e07bf06..bf4e4c80c72b7 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -51,7 +51,7 @@ use datafusion_common::{ field_not_found, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, }; use datafusion_expr::expr::{Between, BinaryExpr, Case, Cast, GroupingSet, Like}; -use datafusion_expr::logical_plan::builder::project_with_alias; +use datafusion_expr::logical_plan::builder::{project, project_with_alias}; use datafusion_expr::logical_plan::{Filter, Subquery}; use datafusion_expr::Expr::Alias; @@ -772,7 +772,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { Some(cte_alias) => project_with_alias( cte_plan.clone(), vec![Expr::Wildcard], - Some(cte_alias), + cte_alias, ), _ => Ok(cte_plan.clone()), }, @@ -800,18 +800,32 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ctes, outer_query_schema, )?; - ( - project_with_alias( - logical_plan.clone(), - logical_plan - .schema() - .fields() - .iter() - .map(|field| col(field.name())), - normalized_alias, - )?, - alias, - ) + match normalized_alias { + Some(normalized_alias) => ( + project_with_alias( + logical_plan.clone(), + logical_plan + .schema() + .fields() + .iter() + .map(|field| col(field.name())), + normalized_alias, + )?, + alias, + ), + None => ( + project( + logical_plan.clone(), + logical_plan + .schema() + .fields() + .iter() + .map(|field| col(field.name())), + )?, + alias + ) + } + } TableFactor::NestedJoin { table_with_joins, @@ -857,7 +871,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { plan.schema().fields().iter().zip(columns_alias.iter()).map( |(field, ident)| col(field.name()).alias(normalize_ident(ident)), ), - Some(normalize_ident(&alias.name)), + normalize_ident(&alias.name), )? .build()?) } @@ -1199,7 +1213,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; // final projection - let plan = project_with_alias(plan, select_exprs_post_aggr, alias)?; + let plan = match alias { + Some(alias) => project_with_alias(plan, select_exprs_post_aggr, alias)?, + None => project(plan, select_exprs_post_aggr)? + }; // process distinct clause let plan = if select.distinct {