diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 87c6ccabb2448..bab783a9e4bd4 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -52,6 +52,7 @@ message LogicalPlanNode { CreateCatalogSchemaNode create_catalog_schema = 18; UnionNode union = 19; CreateCatalogNode create_catalog = 20; + SubqueryAliasNode subquery_alias = 21; } } @@ -241,6 +242,11 @@ message SelectionExecNode { datafusion.LogicalExprNode expr = 1; } +message SubqueryAliasNode { + LogicalPlanNode input = 1; + string alias = 2; +} + /////////////////////////////////////////////////////////////////////////////////////////////////// // Ballista Physical Plan /////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 57cb35decede5..a0264271a5eea 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -28,9 +28,8 @@ use datafusion::datasource::file_format::csv::CsvFormat; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; - use datafusion::logical_plan::plan::{ - Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window, + Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window, }; use datafusion::logical_plan::{ Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr, @@ -377,6 +376,14 @@ impl AsLogicalPlan for LogicalPlanNode { .build() .map_err(|e| e.into()) } + LogicalPlanType::SubqueryAlias(aliased_relation) => { + let input: LogicalPlan = + into_logical_plan!(aliased_relation.input, ctx, extension_codec)?; + LogicalPlanBuilder::from(input) + .alias(&aliased_relation.alias)? + .build() + .map_err(|e| e.into()) + } LogicalPlanType::Limit(limit) => { let input: LogicalPlan = into_logical_plan!(limit.input, ctx, extension_codec)?; @@ -700,6 +707,21 @@ impl AsLogicalPlan for LogicalPlanNode { ))), }) } + LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { + let input: protobuf::LogicalPlanNode = + protobuf::LogicalPlanNode::try_from_logical_plan( + input.as_ref(), + extension_codec, + )?; + Ok(protobuf::LogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new( + protobuf::SubqueryAliasNode { + input: Some(Box::new(input)), + alias: alias.clone(), + }, + ))), + }) + } LogicalPlan::Limit(Limit { input, n }) => { let input: protobuf::LogicalPlanNode = protobuf::LogicalPlanNode::try_from_logical_plan( diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index d2f5c04182fb5..c88b25d0a2251 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -26,7 +26,7 @@ use crate::error::{DataFusionError, Result}; use crate::logical_plan::expr_schema::ExprSchemable; use crate::logical_plan::plan::{ Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort, - TableScan, ToStringifiedPlan, Union, Window, + SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window, }; use crate::optimizer::utils; use crate::prelude::*; @@ -518,6 +518,18 @@ impl LogicalPlanBuilder { }))) } + /// Apply an alias + pub fn alias(&self, alias: &str) -> Result { + let schema: Schema = self.schema().as_ref().clone().into(); + let schema = + DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?); + Ok(Self::from(LogicalPlan::SubqueryAlias(SubqueryAlias { + input: Arc::new(self.plan.clone()), + alias: alias.to_string(), + schema, + }))) + } + /// Add missing sort columns to all downstream projection fn add_missing_columns( &self, diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index 88e0a185b1d8c..66307c6aba464 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -87,6 +87,17 @@ pub struct Projection { pub alias: Option, } +/// Aliased subquery +#[derive(Clone)] +pub struct SubqueryAlias { + /// The incoming logical plan + pub input: Arc, + /// The alias for the input relation + pub alias: String, + /// The schema with qualified field names + pub schema: DFSchemaRef, +} + /// Filters rows from its input that do not match an /// expression (essentially a WHERE clause with a predicate /// expression). @@ -372,6 +383,8 @@ pub enum LogicalPlan { TableScan(TableScan), /// Produces no rows: An empty relation with an empty schema EmptyRelation(EmptyRelation), + /// Aliased relation provides, or changes, the name of a relation. + SubqueryAlias(SubqueryAlias), /// Produces the first `n` tuples from its input and discards the rest. Limit(Limit), /// Creates an external table. @@ -416,6 +429,7 @@ impl LogicalPlan { LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema, LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(), LogicalPlan::Limit(Limit { input, .. }) => input.schema(), + LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema, LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => { schema } @@ -464,6 +478,9 @@ impl LogicalPlan { schemas.insert(0, schema); schemas } + LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => { + vec![schema] + } LogicalPlan::Union(Union { schema, .. }) => { vec![schema] } @@ -525,6 +542,7 @@ impl LogicalPlan { // plans without expressions LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation(_) + | LogicalPlan::SubqueryAlias(_) | LogicalPlan::Limit(_) | LogicalPlan::CreateExternalTable(_) | LogicalPlan::CreateMemoryTable(_) @@ -553,6 +571,7 @@ impl LogicalPlan { LogicalPlan::Join(Join { left, right, .. }) => vec![left, right], LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right], LogicalPlan::Limit(Limit { input, .. }) => vec![input], + LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input], LogicalPlan::Extension(extension) => extension.node.inputs(), LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(), LogicalPlan::Explain(explain) => vec![&explain.plan], @@ -701,6 +720,9 @@ impl LogicalPlan { true } LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?, + LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => { + input.accept(visitor)? + } LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => { input.accept(visitor)? } @@ -1072,6 +1094,9 @@ impl LogicalPlan { } }, LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n), + LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => { + write!(f, "SubqueryAlias: {}", alias) + } LogicalPlan::CreateExternalTable(CreateExternalTable { ref name, .. diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs index 3db1ca4d35451..39964df4a6635 100644 --- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs @@ -216,6 +216,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { + match input.as_ref() { + LogicalPlan::TableScan(TableScan { table_name, .. }) => { + let new_required_columns = new_required_columns + .iter() + .map(|c| match &c.relation { + Some(q) if q == alias => Column { + relation: Some(table_name.clone()), + name: c.name.clone(), + }, + _ => c.clone(), + }) + .collect(); + let new_inputs = vec![optimize_plan( + _optimizer, + input, + &new_required_columns, + has_projection, + _execution_props, + )?]; + let expr = vec![]; + utils::from_plan(plan, &expr, &new_inputs) + } + _ => Err(DataFusionError::Plan( + "SubqueryAlias should only wrap TableScan".to_string(), + )), + } + } // all other nodes: Add any additional columns used by // expressions in this node to the list of required columns LogicalPlan::Limit(_) @@ -515,6 +543,24 @@ mod tests { Ok(()) } + #[test] + fn aggregate_group_by_with_table_alias() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .alias("a")? + .aggregate(vec![col("c")], vec![max(col("b"))])? + .build()?; + + let expected = "Aggregate: groupBy=[[#a.c]], aggr=[[MAX(#a.b)]]\ + \n SubqueryAlias: a\ + \n TableScan: test projection=Some([1, 2])"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + #[test] fn aggregate_no_group_by_with_filter() -> Result<()> { let table_scan = test_table_scan()?; diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index c01515d353736..0dab2d3ed7bcb 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -20,7 +20,7 @@ use super::optimizer::OptimizerRule; use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{ - Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, Window, + Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, SubqueryAlias, Window, }; use crate::logical_plan::{ @@ -34,6 +34,7 @@ use crate::{ error::{DataFusionError, Result}, logical_plan::ExpressionVisitor, }; +use datafusion_common::DFSchema; use std::{collections::HashSet, sync::Arc}; const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__"; @@ -222,6 +223,16 @@ pub fn from_plan( let right = &inputs[1]; LogicalPlanBuilder::from(left).cross_join(right)?.build() } + LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { + let schema = inputs[0].schema().as_ref().clone().into(); + let schema = + DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?); + Ok(LogicalPlan::SubqueryAlias(SubqueryAlias { + alias: alias.clone(), + input: Arc::new(inputs[0].clone()), + schema, + })) + } LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit { n: *n, input: Arc::new(inputs[0].clone()), diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 32fa12fb931bf..98076d1365bcd 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -24,7 +24,8 @@ use super::{ }; use crate::execution::context::{ExecutionProps, SessionState}; use crate::logical_plan::plan::{ - Aggregate, EmptyRelation, Filter, Join, Projection, Sort, TableScan, Window, + Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, TableScan, + Window, }; use crate::logical_plan::{ unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator, @@ -785,6 +786,16 @@ impl DefaultPhysicalPlanner { *produce_one_row, SchemaRef::new(schema.as_ref().to_owned().into()), ))), + LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { + match input.as_ref() { + LogicalPlan::TableScan(scan) => { + let mut scan = scan.clone(); + scan.table_name = alias.clone(); + self.create_initial_plan(input, session_state).await + } + _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string())) + } + } LogicalPlan::Limit(Limit { input, n, .. }) => { let limit = *n; let input = self.create_initial_plan(input, session_state).await?; diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index 9a7c5eb445d0c..84ddace9a4a0a 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -644,16 +644,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { self.schema_provider.get_table_provider(name.try_into()?), ) { (Some(cte_plan), _) => Ok(cte_plan.clone()), - (_, Some(provider)) => LogicalPlanBuilder::scan( - // take alias into account to support `JOIN table1 as table2` - alias - .as_ref() - .map(|a| a.name.value.as_str()) - .unwrap_or(&table_name), - provider, - None, - )? - .build(), + (_, Some(provider)) => { + let scan = + LogicalPlanBuilder::scan(&table_name, provider, None); + let scan = match alias { + Some(ref name) => scan?.alias(name.name.value.as_str()), + _ => scan, + }; + scan?.build() + } (None, None) => Err(DataFusionError::Plan(format!( "Table or CTE with name '{}' not found", name @@ -2492,7 +2491,8 @@ mod tests { FROM lineitem l (a, b, c)"; let expected = "Projection: #l.a, #l.b, #l.c\ \n Projection: #l.l_item_id AS a, #l.l_description AS b, #l.price AS c, alias=l\ - \n TableScan: l projection=None"; + \n SubqueryAlias: l\ + \n TableScan: lineitem projection=None"; quick_test(sql, expected); } @@ -3458,7 +3458,8 @@ mod tests { let expected = "Projection: #person.first_name, #person.id\ \n Inner Join: Using #person.id = #person2.id\ \n TableScan: person projection=None\ - \n TableScan: person2 projection=None"; + \n SubqueryAlias: person2\ + \n TableScan: person projection=None"; quick_test(sql, expected); } @@ -3471,7 +3472,8 @@ mod tests { let expected = "Projection: #lineitem.l_item_id, #lineitem.l_description, #lineitem.price, #lineitem2.l_description, #lineitem2.price\ \n Inner Join: Using #lineitem.l_item_id = #lineitem2.l_item_id\ \n TableScan: lineitem projection=None\ - \n TableScan: lineitem2 projection=None"; + \n SubqueryAlias: lineitem2\ + \n TableScan: lineitem projection=None"; quick_test(sql, expected); } @@ -4067,6 +4069,18 @@ mod tests { quick_test(sql, expected); } + #[test] + fn join_with_aliases() { + let sql = "select peeps.id, folks.first_name from person as peeps join person as folks on peeps.id = folks.id"; + let expected = "Projection: #peeps.id, #folks.first_name\ + \n Inner Join: #peeps.id = #folks.id\ + \n SubqueryAlias: peeps\ + \n TableScan: person projection=None\ + \n SubqueryAlias: folks\ + \n TableScan: person projection=None"; + quick_test(sql, expected); + } + #[test] fn cte_use_same_name_multiple_times() { let sql = "with a as (select * from person), a as (select * from orders) select * from a;";