From d5d19a1f2b81549932d15e311590930c4f7b07c9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 5 Apr 2022 18:18:27 -0600 Subject: [PATCH 1/8] Add LogicalPlan::AliasedRelation --- ballista/rust/core/proto/ballista.proto | 6 +++++ .../rust/core/src/serde/logical_plan/mod.rs | 26 +++++++++++++++++-- datafusion/core/src/logical_plan/builder.rs | 12 +++++++-- datafusion/core/src/logical_plan/plan.rs | 21 +++++++++++++++ .../src/optimizer/common_subexpr_eliminate.rs | 1 + .../src/optimizer/projection_push_down.rs | 1 + datafusion/core/src/optimizer/utils.rs | 9 ++++++- datafusion/core/src/physical_plan/planner.rs | 7 ++++- 8 files changed, 77 insertions(+), 6 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 228c8bee23067..4bc9360640912 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -51,6 +51,7 @@ message LogicalPlanNode { LogicalExtensionNode extension = 17; CreateCatalogSchemaNode create_catalog_schema = 18; UnionNode union = 19; + AliasedRelationNode aliased_relation = 20; } } @@ -233,6 +234,11 @@ message SelectionExecNode { datafusion.LogicalExprNode expr = 1; } +message AliasedRelationNode { + LogicalPlanNode input = 1; + string alias = 2; +} + /////////////////////////////////////////////////////////////////////////////////////////////////// // Ballista Physical Plan /////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 490b2ebecf814..1c048c7354100 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -28,9 +28,8 @@ use datafusion::datasource::file_format::csv::CsvFormat; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; - use datafusion::logical_plan::plan::{ - Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window, + Aggregate, AliasedRelation, EmptyRelation, Filter, Join, Projection, Sort, Window, }; use datafusion::logical_plan::{ Column, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr, JoinConstraint, @@ -360,6 +359,14 @@ impl AsLogicalPlan for LogicalPlanNode { .build() .map_err(|e| e.into()) } + LogicalPlanType::AliasedRelation(aliased_relation) => { + let input: LogicalPlan = + into_logical_plan!(aliased_relation.input, ctx, extension_codec)?; + LogicalPlanBuilder::from(input) + .alias(&aliased_relation.alias)? + .build() + .map_err(|e| e.into()) + } LogicalPlanType::Limit(limit) => { let input: LogicalPlan = into_logical_plan!(limit.input, ctx, extension_codec)?; @@ -683,6 +690,21 @@ impl AsLogicalPlan for LogicalPlanNode { ))), }) } + LogicalPlan::AliasedRelation(AliasedRelation { input, alias }) => { + let input: protobuf::LogicalPlanNode = + protobuf::LogicalPlanNode::try_from_logical_plan( + input.as_ref(), + extension_codec, + )?; + Ok(protobuf::LogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::AliasedRelation(Box::new( + protobuf::AliasedRelationNode { + input: Some(Box::new(input)), + alias: alias.clone(), + }, + ))), + }) + } LogicalPlan::Limit(Limit { input, n }) => { let input: protobuf::LogicalPlanNode = protobuf::LogicalPlanNode::try_from_logical_plan( diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index d2f5c04182fb5..5f593c2acbe54 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -25,8 +25,8 @@ use crate::datasource::{ use crate::error::{DataFusionError, Result}; use crate::logical_plan::expr_schema::ExprSchemable; use crate::logical_plan::plan::{ - Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort, - TableScan, ToStringifiedPlan, Union, Window, + Aggregate, AliasedRelation, Analyze, EmptyRelation, Explain, Filter, Join, + Projection, Sort, TableScan, ToStringifiedPlan, Union, Window, }; use crate::optimizer::utils; use crate::prelude::*; @@ -518,6 +518,14 @@ impl LogicalPlanBuilder { }))) } + /// Apply an alias + pub fn alias(&self, alias: &str) -> Result { + Ok(Self::from(LogicalPlan::AliasedRelation(AliasedRelation { + input: Arc::new(self.plan.clone()), + alias: alias.to_string(), + }))) + } + /// Add missing sort columns to all downstream projection fn add_missing_columns( &self, diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index 327f4d764aa47..cfb15c279ec7d 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -87,6 +87,15 @@ pub struct Projection { pub alias: Option, } +/// Aliased relation +#[derive(Clone)] +pub struct AliasedRelation { + /// The incoming logical plan + pub input: Arc, + /// The alias for the input relation + pub alias: String, +} + /// Filters rows from its input that do not match an /// expression (essentially a WHERE clause with a predicate /// expression). @@ -359,6 +368,8 @@ pub enum LogicalPlan { TableScan(TableScan), /// Produces no rows: An empty relation with an empty schema EmptyRelation(EmptyRelation), + /// Aliased relation provides, or changes, the name of a relation. + AliasedRelation(AliasedRelation), /// Produces the first `n` tuples from its input and discards the rest. Limit(Limit), /// Creates an external table. @@ -401,6 +412,7 @@ impl LogicalPlan { LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema, LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(), LogicalPlan::Limit(Limit { input, .. }) => input.schema(), + LogicalPlan::AliasedRelation(AliasedRelation { input, .. }) => input.schema(), LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => { schema } @@ -463,6 +475,7 @@ impl LogicalPlan { | LogicalPlan::Repartition(Repartition { input, .. }) | LogicalPlan::Sort(Sort { input, .. }) | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) + | LogicalPlan::AliasedRelation(AliasedRelation { input, .. }) | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(), LogicalPlan::DropTable(_) => vec![], } @@ -508,6 +521,7 @@ impl LogicalPlan { // plans without expressions LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation(_) + | LogicalPlan::AliasedRelation(_) | LogicalPlan::Limit(_) | LogicalPlan::CreateExternalTable(_) | LogicalPlan::CreateMemoryTable(_) @@ -535,6 +549,7 @@ impl LogicalPlan { LogicalPlan::Join(Join { left, right, .. }) => vec![left, right], LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right], LogicalPlan::Limit(Limit { input, .. }) => vec![input], + LogicalPlan::AliasedRelation(AliasedRelation { input, .. }) => vec![input], LogicalPlan::Extension(extension) => extension.node.inputs(), LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(), LogicalPlan::Explain(explain) => vec![&explain.plan], @@ -682,6 +697,9 @@ impl LogicalPlan { true } LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?, + LogicalPlan::AliasedRelation(AliasedRelation { input, .. }) => { + input.accept(visitor)? + } LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => { input.accept(visitor)? } @@ -1052,6 +1070,9 @@ impl LogicalPlan { } }, LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n), + LogicalPlan::AliasedRelation(AliasedRelation { + ref alias, .. + }) => write!(f, "AliasedRelation: {}", alias), LogicalPlan::CreateExternalTable(CreateExternalTable { ref name, .. diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs index b2884ea969d53..b934668b0374e 100644 --- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs @@ -216,6 +216,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { + Ok(LogicalPlan::AliasedRelation(AliasedRelation { + alias: alias.clone(), + input: Arc::new(inputs[0].clone()), + })) + } LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit { n: *n, input: Arc::new(inputs[0].clone()), diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 4dbaca203751e..ad70f10ebb587 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -24,7 +24,8 @@ use super::{ }; use crate::execution::context::{ExecutionProps, SessionState}; use crate::logical_plan::plan::{ - Aggregate, EmptyRelation, Filter, Join, Projection, Sort, TableScan, Window, + Aggregate, AliasedRelation, EmptyRelation, Filter, Join, Projection, Sort, TableScan, + Window, }; use crate::logical_plan::{ unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator, @@ -785,6 +786,10 @@ impl DefaultPhysicalPlanner { *produce_one_row, SchemaRef::new(schema.as_ref().to_owned().into()), ))), + LogicalPlan::AliasedRelation(AliasedRelation { input, .. }) => { + // there is no physical plan for an aliased relation + self.create_initial_plan(input, session_state).await + } LogicalPlan::Limit(Limit { input, n, .. }) => { let limit = *n; let input = self.create_initial_plan(input, session_state).await?; From 0ca7b690c07620c60fc673bc73971703f4d5b485 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 5 Apr 2022 18:34:51 -0600 Subject: [PATCH 2/8] unit test --- datafusion/core/src/sql/planner.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index 770d168727e9f..fcbfe4229e6dd 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -4055,6 +4055,18 @@ mod tests { quick_test(sql, expected); } + #[test] + fn join_with_aliases() { + let sql = "select peeps.id, folks.first_name from person as peeps join person as folks on peeps.id = folks.id"; + let expected = "Projection: #peeps.id, #folks.first_name\ + \n InnerJoin: #peeps.id = #folks.id\\ + \n AliasedRelation: peeps\ + \n TableScan: person projection=None + \n AliasedRelation: folks\ + \n TableScan: person projection=None"; + quick_test(sql, expected); + } + #[test] fn cte_use_same_name_multiple_times() { let sql = "with a as (select * from person), a as (select * from orders) select * from a;"; From 332444b885b3c6da2ba049be0dccf03bd55014e7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 6 Apr 2022 08:51:10 -0600 Subject: [PATCH 3/8] unit test passes --- .../rust/core/src/serde/logical_plan/mod.rs | 2 +- datafusion/core/src/logical_plan/builder.rs | 4 +++ datafusion/core/src/logical_plan/plan.rs | 4 ++- datafusion/core/src/optimizer/utils.rs | 3 +- datafusion/core/src/sql/planner.rs | 29 +++++++++---------- 5 files changed, 24 insertions(+), 18 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 1c048c7354100..473bf05573fb1 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -690,7 +690,7 @@ impl AsLogicalPlan for LogicalPlanNode { ))), }) } - LogicalPlan::AliasedRelation(AliasedRelation { input, alias }) => { + LogicalPlan::AliasedRelation(AliasedRelation { input, alias, .. }) => { let input: protobuf::LogicalPlanNode = protobuf::LogicalPlanNode::try_from_logical_plan( input.as_ref(), diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 5f593c2acbe54..c88537508d80e 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -520,9 +520,13 @@ impl LogicalPlanBuilder { /// Apply an alias pub fn alias(&self, alias: &str) -> Result { + let schema: Schema = self.schema().as_ref().clone().into(); + let schema = + DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?); Ok(Self::from(LogicalPlan::AliasedRelation(AliasedRelation { input: Arc::new(self.plan.clone()), alias: alias.to_string(), + schema, }))) } diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index cfb15c279ec7d..2c05eb2d49975 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -94,6 +94,8 @@ pub struct AliasedRelation { pub input: Arc, /// The alias for the input relation pub alias: String, + /// The schema with qualified field names + pub schema: DFSchemaRef, } /// Filters rows from its input that do not match an @@ -412,7 +414,7 @@ impl LogicalPlan { LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema, LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(), LogicalPlan::Limit(Limit { input, .. }) => input.schema(), - LogicalPlan::AliasedRelation(AliasedRelation { input, .. }) => input.schema(), + LogicalPlan::AliasedRelation(AliasedRelation { schema, .. }) => schema, LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => { schema } diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index ce5065c229d63..a26afe47ab3c2 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -223,10 +223,11 @@ pub fn from_plan( let right = &inputs[1]; LogicalPlanBuilder::from(left).cross_join(right)?.build() } - LogicalPlan::AliasedRelation(AliasedRelation { alias, .. }) => { + LogicalPlan::AliasedRelation(AliasedRelation { alias, schema, .. }) => { Ok(LogicalPlan::AliasedRelation(AliasedRelation { alias: alias.clone(), input: Arc::new(inputs[0].clone()), + schema: schema.clone(), })) } LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit { diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index fcbfe4229e6dd..ba74d40827968 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -633,16 +633,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { self.schema_provider.get_table_provider(name.try_into()?), ) { (Some(cte_plan), _) => Ok(cte_plan.clone()), - (_, Some(provider)) => LogicalPlanBuilder::scan( - // take alias into account to support `JOIN table1 as table2` - alias - .as_ref() - .map(|a| a.name.value.as_str()) - .unwrap_or(&table_name), - provider, - None, - )? - .build(), + (_, Some(provider)) => { + let scan = + LogicalPlanBuilder::scan(&table_name, provider, None); + let scan = match alias { + Some(ref name) => scan?.alias(name.name.value.as_str()), + _ => scan, + }; + scan?.build() + } (None, None) => Err(DataFusionError::Plan(format!( "Table or CTE with name '{}' not found", name @@ -4059,11 +4058,11 @@ mod tests { fn join_with_aliases() { let sql = "select peeps.id, folks.first_name from person as peeps join person as folks on peeps.id = folks.id"; let expected = "Projection: #peeps.id, #folks.first_name\ - \n InnerJoin: #peeps.id = #folks.id\\ - \n AliasedRelation: peeps\ - \n TableScan: person projection=None - \n AliasedRelation: folks\ - \n TableScan: person projection=None"; + \n Inner Join: #peeps.id = #folks.id\ + \n AliasedRelation: peeps\ + \n TableScan: person projection=None\ + \n AliasedRelation: folks\ + \n TableScan: person projection=None"; quick_test(sql, expected); } From fccd8d0be9dd800478dbb9f608c4a621f46aa70c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 6 Apr 2022 09:25:26 -0600 Subject: [PATCH 4/8] fix some regressions --- .../src/optimizer/projection_push_down.rs | 31 +++++++++++++++++-- datafusion/core/src/physical_plan/planner.rs | 12 +++++-- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 6a670db8ddf5a..4c8d35343f28e 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -21,7 +21,7 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{ - Aggregate, Analyze, Join, Projection, TableScan, Window, + Aggregate, AliasedRelation, Analyze, Join, Projection, TableScan, Window, }; use crate::logical_plan::{ build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, @@ -432,11 +432,38 @@ fn optimize_plan( alias: alias.clone(), })) } + LogicalPlan::AliasedRelation(AliasedRelation { input, alias, .. }) => { + match input.as_ref() { + LogicalPlan::TableScan(TableScan { table_name, .. }) => { + let new_required_columns = new_required_columns + .iter() + .map(|c| match &c.relation { + Some(q) if q == alias => Column { + relation: Some(table_name.clone()), + name: c.name.clone(), + }, + _ => c.clone(), + }) + .collect(); + let new_inputs = vec![optimize_plan( + optimizer, + input, + &new_required_columns, + has_projection, + execution_props, + )?]; + let expr = vec![]; + utils::from_plan(plan, &expr, &new_inputs) + } + _ => Err(DataFusionError::Plan( + "AliasedRelation should only wrap TableScan".to_string(), + )), + } + } // all other nodes: Add any additional columns used by // expressions in this node to the list of required columns LogicalPlan::Limit(_) | LogicalPlan::Filter { .. } - | LogicalPlan::AliasedRelation { .. } | LogicalPlan::Repartition(_) | LogicalPlan::EmptyRelation(_) | LogicalPlan::Values(_) diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index ad70f10ebb587..1984d7afd8b6a 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -786,9 +786,15 @@ impl DefaultPhysicalPlanner { *produce_one_row, SchemaRef::new(schema.as_ref().to_owned().into()), ))), - LogicalPlan::AliasedRelation(AliasedRelation { input, .. }) => { - // there is no physical plan for an aliased relation - self.create_initial_plan(input, session_state).await + LogicalPlan::AliasedRelation(AliasedRelation { input, alias, .. }) => { + match input.as_ref() { + LogicalPlan::TableScan(scan) => { + let mut scan = scan.clone(); + scan.table_name = alias.clone(); + self.create_initial_plan(input, session_state).await + } + _ => Err(DataFusionError::Plan("AliasedRelation should only wrap TableScan".to_string())) + } } LogicalPlan::Limit(Limit { input, n, .. }) => { let limit = *n; From 8c60e6ae1b9ec74254ea579603921ac84b7a0725 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 10 Apr 2022 14:05:34 -0600 Subject: [PATCH 5/8] fix regressions --- datafusion/core/src/logical_plan/plan.rs | 4 +++- datafusion/core/src/optimizer/utils.rs | 8 ++++++-- datafusion/core/src/sql/planner.rs | 9 ++++++--- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index 2c05eb2d49975..a4b461925a97a 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -462,6 +462,9 @@ impl LogicalPlan { schemas.insert(0, schema); schemas } + LogicalPlan::AliasedRelation(AliasedRelation { schema, .. }) => { + vec![schema] + } LogicalPlan::Union(Union { schema, .. }) => { vec![schema] } @@ -477,7 +480,6 @@ impl LogicalPlan { | LogicalPlan::Repartition(Repartition { input, .. }) | LogicalPlan::Sort(Sort { input, .. }) | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) - | LogicalPlan::AliasedRelation(AliasedRelation { input, .. }) | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(), LogicalPlan::DropTable(_) => vec![], } diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index a26afe47ab3c2..16ca1d0506d42 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -35,6 +35,7 @@ use crate::{ error::{DataFusionError, Result}, logical_plan::ExpressionVisitor, }; +use datafusion_common::DFSchema; use std::{collections::HashSet, sync::Arc}; const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__"; @@ -223,11 +224,14 @@ pub fn from_plan( let right = &inputs[1]; LogicalPlanBuilder::from(left).cross_join(right)?.build() } - LogicalPlan::AliasedRelation(AliasedRelation { alias, schema, .. }) => { + LogicalPlan::AliasedRelation(AliasedRelation { alias, .. }) => { + let schema = inputs[0].schema().as_ref().clone().into(); + let schema = + DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?); Ok(LogicalPlan::AliasedRelation(AliasedRelation { alias: alias.clone(), input: Arc::new(inputs[0].clone()), - schema: schema.clone(), + schema, })) } LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit { diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index ba74d40827968..588ef61b43d91 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -2479,7 +2479,8 @@ mod tests { FROM lineitem l (a, b, c)"; let expected = "Projection: #l.a, #l.b, #l.c\ \n Projection: #l.l_item_id AS a, #l.l_description AS b, #l.price AS c, alias=l\ - \n TableScan: l projection=None"; + \n AliasedRelation: l\ + \n TableScan: lineitem projection=None"; quick_test(sql, expected); } @@ -3445,7 +3446,8 @@ mod tests { let expected = "Projection: #person.first_name, #person.id\ \n Inner Join: Using #person.id = #person2.id\ \n TableScan: person projection=None\ - \n TableScan: person2 projection=None"; + \n AliasedRelation: person2\ + \n TableScan: person projection=None"; quick_test(sql, expected); } @@ -3458,7 +3460,8 @@ mod tests { let expected = "Projection: #lineitem.l_item_id, #lineitem.l_description, #lineitem.price, #lineitem2.l_description, #lineitem2.price\ \n Inner Join: Using #lineitem.l_item_id = #lineitem2.l_item_id\ \n TableScan: lineitem projection=None\ - \n TableScan: lineitem2 projection=None"; + \n AliasedRelation: lineitem2\ + \n TableScan: lineitem projection=None"; quick_test(sql, expected); } From 8d2d9647295acad1d900b4ee2b78fa65cf63ab1b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 10 Apr 2022 14:11:31 -0600 Subject: [PATCH 6/8] rename AliasedRelation to SubqueryAlias --- ballista/rust/core/proto/ballista.proto | 4 ++-- .../rust/core/src/serde/logical_plan/mod.rs | 10 +++++----- datafusion/core/src/logical_plan/builder.rs | 4 ++-- datafusion/core/src/logical_plan/plan.rs | 20 +++++++++---------- .../src/optimizer/common_subexpr_eliminate.rs | 2 +- .../src/optimizer/projection_push_down.rs | 6 +++--- datafusion/core/src/optimizer/utils.rs | 6 +++--- datafusion/core/src/physical_plan/planner.rs | 6 +++--- datafusion/core/src/sql/planner.rs | 10 +++++----- 9 files changed, 34 insertions(+), 34 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 4bc9360640912..9434da75e4c34 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -51,7 +51,7 @@ message LogicalPlanNode { LogicalExtensionNode extension = 17; CreateCatalogSchemaNode create_catalog_schema = 18; UnionNode union = 19; - AliasedRelationNode aliased_relation = 20; + SubqueryAliasNode subquery_alias = 20; } } @@ -234,7 +234,7 @@ message SelectionExecNode { datafusion.LogicalExprNode expr = 1; } -message AliasedRelationNode { +message SubqueryAliasNode { LogicalPlanNode input = 1; string alias = 2; } diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 473bf05573fb1..88ab38e296c57 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -29,7 +29,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; use datafusion::logical_plan::plan::{ - Aggregate, AliasedRelation, EmptyRelation, Filter, Join, Projection, Sort, Window, + Aggregate, SubqueryAlias, EmptyRelation, Filter, Join, Projection, Sort, Window, }; use datafusion::logical_plan::{ Column, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr, JoinConstraint, @@ -359,7 +359,7 @@ impl AsLogicalPlan for LogicalPlanNode { .build() .map_err(|e| e.into()) } - LogicalPlanType::AliasedRelation(aliased_relation) => { + LogicalPlanType::SubqueryAlias(aliased_relation) => { let input: LogicalPlan = into_logical_plan!(aliased_relation.input, ctx, extension_codec)?; LogicalPlanBuilder::from(input) @@ -690,15 +690,15 @@ impl AsLogicalPlan for LogicalPlanNode { ))), }) } - LogicalPlan::AliasedRelation(AliasedRelation { input, alias, .. }) => { + LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { let input: protobuf::LogicalPlanNode = protobuf::LogicalPlanNode::try_from_logical_plan( input.as_ref(), extension_codec, )?; Ok(protobuf::LogicalPlanNode { - logical_plan_type: Some(LogicalPlanType::AliasedRelation(Box::new( - protobuf::AliasedRelationNode { + logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new( + protobuf::SubqueryAliasNode { input: Some(Box::new(input)), alias: alias.clone(), }, diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index c88537508d80e..50345db8f0741 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -25,7 +25,7 @@ use crate::datasource::{ use crate::error::{DataFusionError, Result}; use crate::logical_plan::expr_schema::ExprSchemable; use crate::logical_plan::plan::{ - Aggregate, AliasedRelation, Analyze, EmptyRelation, Explain, Filter, Join, + Aggregate, SubqueryAlias, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort, TableScan, ToStringifiedPlan, Union, Window, }; use crate::optimizer::utils; @@ -523,7 +523,7 @@ impl LogicalPlanBuilder { let schema: Schema = self.schema().as_ref().clone().into(); let schema = DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?); - Ok(Self::from(LogicalPlan::AliasedRelation(AliasedRelation { + Ok(Self::from(LogicalPlan::SubqueryAlias(SubqueryAlias { input: Arc::new(self.plan.clone()), alias: alias.to_string(), schema, diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index a4b461925a97a..a1fcd09c53825 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -87,9 +87,9 @@ pub struct Projection { pub alias: Option, } -/// Aliased relation +/// Aliased subquery #[derive(Clone)] -pub struct AliasedRelation { +pub struct SubqueryAlias { /// The incoming logical plan pub input: Arc, /// The alias for the input relation @@ -371,7 +371,7 @@ pub enum LogicalPlan { /// Produces no rows: An empty relation with an empty schema EmptyRelation(EmptyRelation), /// Aliased relation provides, or changes, the name of a relation. - AliasedRelation(AliasedRelation), + SubqueryAlias(SubqueryAlias), /// Produces the first `n` tuples from its input and discards the rest. Limit(Limit), /// Creates an external table. @@ -414,7 +414,7 @@ impl LogicalPlan { LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema, LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(), LogicalPlan::Limit(Limit { input, .. }) => input.schema(), - LogicalPlan::AliasedRelation(AliasedRelation { schema, .. }) => schema, + LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema, LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => { schema } @@ -462,7 +462,7 @@ impl LogicalPlan { schemas.insert(0, schema); schemas } - LogicalPlan::AliasedRelation(AliasedRelation { schema, .. }) => { + LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => { vec![schema] } LogicalPlan::Union(Union { schema, .. }) => { @@ -525,7 +525,7 @@ impl LogicalPlan { // plans without expressions LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation(_) - | LogicalPlan::AliasedRelation(_) + | LogicalPlan::SubqueryAlias(_) | LogicalPlan::Limit(_) | LogicalPlan::CreateExternalTable(_) | LogicalPlan::CreateMemoryTable(_) @@ -553,7 +553,7 @@ impl LogicalPlan { LogicalPlan::Join(Join { left, right, .. }) => vec![left, right], LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right], LogicalPlan::Limit(Limit { input, .. }) => vec![input], - LogicalPlan::AliasedRelation(AliasedRelation { input, .. }) => vec![input], + LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input], LogicalPlan::Extension(extension) => extension.node.inputs(), LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(), LogicalPlan::Explain(explain) => vec![&explain.plan], @@ -701,7 +701,7 @@ impl LogicalPlan { true } LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?, - LogicalPlan::AliasedRelation(AliasedRelation { input, .. }) => { + LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => { input.accept(visitor)? } LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => { @@ -1074,9 +1074,9 @@ impl LogicalPlan { } }, LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n), - LogicalPlan::AliasedRelation(AliasedRelation { + LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. - }) => write!(f, "AliasedRelation: {}", alias), + }) => write!(f, "SubqueryAlias: {}", alias), LogicalPlan::CreateExternalTable(CreateExternalTable { ref name, .. diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs index b934668b0374e..bbed1cc74be1d 100644 --- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs @@ -216,7 +216,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { + LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { match input.as_ref() { LogicalPlan::TableScan(TableScan { table_name, .. }) => { let new_required_columns = new_required_columns @@ -456,7 +456,7 @@ fn optimize_plan( utils::from_plan(plan, &expr, &new_inputs) } _ => Err(DataFusionError::Plan( - "AliasedRelation should only wrap TableScan".to_string(), + "SubqueryAlias should only wrap TableScan".to_string(), )), } } diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index 16ca1d0506d42..bcfe9a6895164 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -20,7 +20,7 @@ use super::optimizer::OptimizerRule; use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{ - Aggregate, AliasedRelation, Analyze, Extension, Filter, Join, Projection, Sort, + Aggregate, SubqueryAlias, Analyze, Extension, Filter, Join, Projection, Sort, Window, }; @@ -224,11 +224,11 @@ pub fn from_plan( let right = &inputs[1]; LogicalPlanBuilder::from(left).cross_join(right)?.build() } - LogicalPlan::AliasedRelation(AliasedRelation { alias, .. }) => { + LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { let schema = inputs[0].schema().as_ref().clone().into(); let schema = DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?); - Ok(LogicalPlan::AliasedRelation(AliasedRelation { + Ok(LogicalPlan::SubqueryAlias(SubqueryAlias { alias: alias.clone(), input: Arc::new(inputs[0].clone()), schema, diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 1984d7afd8b6a..e7eada9cfcd30 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -24,7 +24,7 @@ use super::{ }; use crate::execution::context::{ExecutionProps, SessionState}; use crate::logical_plan::plan::{ - Aggregate, AliasedRelation, EmptyRelation, Filter, Join, Projection, Sort, TableScan, + Aggregate, SubqueryAlias, EmptyRelation, Filter, Join, Projection, Sort, TableScan, Window, }; use crate::logical_plan::{ @@ -786,14 +786,14 @@ impl DefaultPhysicalPlanner { *produce_one_row, SchemaRef::new(schema.as_ref().to_owned().into()), ))), - LogicalPlan::AliasedRelation(AliasedRelation { input, alias, .. }) => { + LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { match input.as_ref() { LogicalPlan::TableScan(scan) => { let mut scan = scan.clone(); scan.table_name = alias.clone(); self.create_initial_plan(input, session_state).await } - _ => Err(DataFusionError::Plan("AliasedRelation should only wrap TableScan".to_string())) + _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string())) } } LogicalPlan::Limit(Limit { input, n, .. }) => { diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index 588ef61b43d91..4525514c369da 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -2479,7 +2479,7 @@ mod tests { FROM lineitem l (a, b, c)"; let expected = "Projection: #l.a, #l.b, #l.c\ \n Projection: #l.l_item_id AS a, #l.l_description AS b, #l.price AS c, alias=l\ - \n AliasedRelation: l\ + \n SubqueryAlias: l\ \n TableScan: lineitem projection=None"; quick_test(sql, expected); } @@ -3446,7 +3446,7 @@ mod tests { let expected = "Projection: #person.first_name, #person.id\ \n Inner Join: Using #person.id = #person2.id\ \n TableScan: person projection=None\ - \n AliasedRelation: person2\ + \n SubqueryAlias: person2\ \n TableScan: person projection=None"; quick_test(sql, expected); } @@ -3460,7 +3460,7 @@ mod tests { let expected = "Projection: #lineitem.l_item_id, #lineitem.l_description, #lineitem.price, #lineitem2.l_description, #lineitem2.price\ \n Inner Join: Using #lineitem.l_item_id = #lineitem2.l_item_id\ \n TableScan: lineitem projection=None\ - \n AliasedRelation: lineitem2\ + \n SubqueryAlias: lineitem2\ \n TableScan: lineitem projection=None"; quick_test(sql, expected); } @@ -4062,9 +4062,9 @@ mod tests { let sql = "select peeps.id, folks.first_name from person as peeps join person as folks on peeps.id = folks.id"; let expected = "Projection: #peeps.id, #folks.first_name\ \n Inner Join: #peeps.id = #folks.id\ - \n AliasedRelation: peeps\ + \n SubqueryAlias: peeps\ \n TableScan: person projection=None\ - \n AliasedRelation: folks\ + \n SubqueryAlias: folks\ \n TableScan: person projection=None"; quick_test(sql, expected); } From 2716f69dfd0379bbae4237a3a6b910988b01fcee Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 10 Apr 2022 19:19:44 -0600 Subject: [PATCH 7/8] fmt --- ballista/rust/core/src/serde/logical_plan/mod.rs | 2 +- datafusion/core/src/logical_plan/builder.rs | 4 ++-- datafusion/core/src/logical_plan/plan.rs | 6 +++--- datafusion/core/src/optimizer/projection_push_down.rs | 2 +- datafusion/core/src/optimizer/utils.rs | 3 +-- datafusion/core/src/physical_plan/planner.rs | 2 +- 6 files changed, 9 insertions(+), 10 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index f8a2a75065155..a0264271a5eea 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -29,7 +29,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; use datafusion::logical_plan::plan::{ - Aggregate, SubqueryAlias, EmptyRelation, Filter, Join, Projection, Sort, Window, + Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window, }; use datafusion::logical_plan::{ Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr, diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 50345db8f0741..c88b25d0a2251 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -25,8 +25,8 @@ use crate::datasource::{ use crate::error::{DataFusionError, Result}; use crate::logical_plan::expr_schema::ExprSchemable; use crate::logical_plan::plan::{ - Aggregate, SubqueryAlias, Analyze, EmptyRelation, Explain, Filter, Join, - Projection, Sort, TableScan, ToStringifiedPlan, Union, Window, + Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort, + SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window, }; use crate::optimizer::utils; use crate::prelude::*; diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index 37b381fb5ae68..66307c6aba464 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -1094,9 +1094,9 @@ impl LogicalPlan { } }, LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n), - LogicalPlan::SubqueryAlias(SubqueryAlias { - ref alias, .. - }) => write!(f, "SubqueryAlias: {}", alias), + LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => { + write!(f, "SubqueryAlias: {}", alias) + } LogicalPlan::CreateExternalTable(CreateExternalTable { ref name, .. diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 2a7047c989425..5841c2ef8f402 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -21,7 +21,7 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{ - Aggregate, SubqueryAlias, Analyze, Join, Projection, TableScan, Window, + Aggregate, Analyze, Join, Projection, SubqueryAlias, TableScan, Window, }; use crate::logical_plan::{ build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index 799737a2c25ec..0dab2d3ed7bcb 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -20,8 +20,7 @@ use super::optimizer::OptimizerRule; use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{ - Aggregate, SubqueryAlias, Analyze, Extension, Filter, Join, Projection, Sort, - Window, + Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, SubqueryAlias, Window, }; use crate::logical_plan::{ diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index a2b75533103f9..98076d1365bcd 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -24,7 +24,7 @@ use super::{ }; use crate::execution::context::{ExecutionProps, SessionState}; use crate::logical_plan::plan::{ - Aggregate, SubqueryAlias, EmptyRelation, Filter, Join, Projection, Sort, TableScan, + Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, TableScan, Window, }; use crate::logical_plan::{ From 1f043b884f185e57d53ffe55da4640199e93e65a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Apr 2022 08:53:17 -0600 Subject: [PATCH 8/8] Add unit test for projection push down with table alias --- .../core/src/optimizer/projection_push_down.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 5841c2ef8f402..10bf5d10f9602 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -543,6 +543,24 @@ mod tests { Ok(()) } + #[test] + fn aggregate_group_by_with_table_alias() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .alias("a")? + .aggregate(vec![col("c")], vec![max(col("b"))])? + .build()?; + + let expected = "Aggregate: groupBy=[[#a.c]], aggr=[[MAX(#a.b)]]\ + \n SubqueryAlias: a\ + \n TableScan: test projection=Some([1, 2])"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + #[test] fn aggregate_no_group_by_with_filter() -> Result<()> { let table_scan = test_table_scan()?;