Merged
6 changes: 6 additions & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -52,6 +52,7 @@ message LogicalPlanNode {
CreateCatalogSchemaNode create_catalog_schema = 18;
UnionNode union = 19;
CreateCatalogNode create_catalog = 20;
SubqueryAliasNode subquery_alias = 21;
}
}

@@ -241,6 +242,11 @@ message SelectionExecNode {
datafusion.LogicalExprNode expr = 1;
}

message SubqueryAliasNode {
LogicalPlanNode input = 1;
string alias = 2;
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// Ballista Physical Plan
///////////////////////////////////////////////////////////////////////////////////////////////////
26 changes: 24 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -28,9 +28,8 @@ use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};

use datafusion::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window,
};
use datafusion::logical_plan::{
Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr,
@@ -377,6 +376,14 @@ impl AsLogicalPlan for LogicalPlanNode {
.build()
.map_err(|e| e.into())
}
LogicalPlanType::SubqueryAlias(aliased_relation) => {
let input: LogicalPlan =
into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
LogicalPlanBuilder::from(input)
.alias(&aliased_relation.alias)?
.build()
.map_err(|e| e.into())
}
LogicalPlanType::Limit(limit) => {
let input: LogicalPlan =
into_logical_plan!(limit.input, ctx, extension_codec)?;
@@ -700,6 +707,21 @@ impl AsLogicalPlan for LogicalPlanNode {
))),
})
}
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
input.as_ref(),
extension_codec,
)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
protobuf::SubqueryAliasNode {
input: Some(Box::new(input)),
alias: alias.clone(),
},
))),
})
}
LogicalPlan::Limit(Limit { input, n }) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
14 changes: 13 additions & 1 deletion datafusion/core/src/logical_plan/builder.rs
@@ -26,7 +26,7 @@ use crate::error::{DataFusionError, Result};
use crate::logical_plan::expr_schema::ExprSchemable;
use crate::logical_plan::plan::{
Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort,
TableScan, ToStringifiedPlan, Union, Window,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window,
};
use crate::optimizer::utils;
use crate::prelude::*;
@@ -518,6 +518,18 @@ impl LogicalPlanBuilder {
})))
}

/// Apply an alias
pub fn alias(&self, alias: &str) -> Result<Self> {
Contributor:
Another way that this same aliasing can be represented is a Projection node

So if the input has schema with columns a, b, c

Then to implement alias("foo") you could build a LogicalPlanNode that was Project([a as foo.a, b as foo.b, c as foo.c])

There may be some good reason to introduce a new type of LogicalPlanNode too that I don't understand, but I wanted to point out this alternative
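
The Projection-based alternative described in this comment can be sketched without any DataFusion types. The helpers below, `alias_as_projection` and `render_projection`, are hypothetical names used only for illustration; they build the expression list as plain strings and are not part of the DataFusion API.

```rust
/// Hypothetical helper (not a DataFusion API): express `alias("foo")` as a
/// list of projection expressions that re-qualify every input column.
fn alias_as_projection(columns: &[&str], alias: &str) -> Vec<String> {
    columns
        .iter()
        .map(|c| format!("{} AS {}.{}", c, alias, c))
        .collect()
}

/// Render the expressions in the `Project([...])` notation used in the comment.
fn render_projection(exprs: &[String]) -> String {
    format!("Project([{}])", exprs.join(", "))
}
```

Calling `alias_as_projection(&["a", "b", "c"], "foo")` and passing the result to `render_projection` yields the projection from the comment. Note that this form only renames columns; nothing in it marks the input as an aliased relation.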

Contributor:
Ah, I see from the description that you don't want to do that :)

I wonder if you could use the DFSchema::relation name for this purpose?

Member Author:
I think I need something to very specifically say that a table is being used as an alias. An easier change might be just to add an additional field to the TableScan to record the original table name. I think I will put up a separate PR for that approach.

To explain why I need this: I want to use DataFusion as a SQL parser and planner, but I want to execute the query in a different engine. I can provide a TableProvider so that DataFusion can get the schema for table person, and I get a logical plan. When I go to translate that plan to a physical plan, it refers to a table called peeps (the alias) and I have no way to know the actual table name.

Member Author:
This is the Spark plan for this use case:

'Project [*]
+- 'SubqueryAlias peeps
   +- 'UnresolvedRelation [person], [], false
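
To make the use case concrete, here is a minimal sketch of how an external engine might recover the underlying table name once the alias is its own plan node. The `Plan` enum and `resolve_tables` function are simplified, hypothetical stand-ins and are not DataFusion's `LogicalPlan` or any of its methods.

```rust
/// Simplified stand-in for a logical plan; these types are illustrative only
/// and are not DataFusion's `LogicalPlan`.
#[derive(Debug)]
enum Plan {
    TableScan { table_name: String },
    SubqueryAlias { alias: String, input: Box<Plan> },
    Projection { input: Box<Plan> },
}

/// Walk the plan and record `(name visible to the query, underlying table)`.
/// `visible` carries the alias of the node directly above, if there is one.
fn resolve_tables(plan: &Plan, visible: Option<&str>, out: &mut Vec<(String, String)>) {
    match plan {
        Plan::TableScan { table_name } => {
            // Without an alias, the table is visible under its own name.
            let name = visible.unwrap_or(table_name.as_str());
            out.push((name.to_string(), table_name.clone()));
        }
        Plan::SubqueryAlias { alias, input } => {
            resolve_tables(input, Some(alias.as_str()), out)
        }
        // An alias does not carry through other operators in this sketch.
        Plan::Projection { input } => resolve_tables(input, None, out),
    }
}
```

For a plan shaped like the Spark output above (a projection over `SubqueryAlias peeps` over a scan of `person`), the walk records the pair `("peeps", "person")`, which is the information that is lost when the scan itself is renamed to the alias.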

Contributor:
I think I prefer the approach taken in this PR, as it could handle subqueries rather than just table scans (as suggested by the name).

I think this is a cleaner, more generic approach.
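
One way to see why the node generalizes beyond table scans: all it has to do is re-qualify the output fields of whatever input it wraps. The sketch below uses a hypothetical `Field` type and `requalify` function as stand-ins for `DFField` and the `DFSchema::try_from_qualified_schema` call in the builder body; it is an illustration under those assumptions, not the real implementation.

```rust
/// Illustrative field type; DataFusion's real counterpart is `DFField`.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    qualifier: Option<String>,
    name: String,
}

impl Field {
    /// Render as `qualifier.name`, or just `name` when unqualified.
    fn qualified_name(&self) -> String {
        match &self.qualifier {
            Some(q) => format!("{}.{}", q, self.name),
            None => self.name.clone(),
        }
    }
}

/// Replace whatever qualifier each field had (if any) with `alias`.
/// The input fields may come from a table scan or from any subquery.
fn requalify(fields: &[Field], alias: &str) -> Vec<Field> {
    fields
        .iter()
        .map(|f| Field {
            qualifier: Some(alias.to_string()),
            name: f.name.clone(),
        })
        .collect()
}
```

Because `requalify` never inspects where the fields came from, the same logic applies whether the input is `person` or an arbitrary subquery.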

let schema: Schema = self.schema().as_ref().clone().into();
let schema =
DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
Ok(Self::from(LogicalPlan::SubqueryAlias(SubqueryAlias {
input: Arc::new(self.plan.clone()),
alias: alias.to_string(),
schema,
})))
}

/// Add missing sort columns to all downstream projection
fn add_missing_columns(
&self,
25 changes: 25 additions & 0 deletions datafusion/core/src/logical_plan/plan.rs
@@ -87,6 +87,17 @@ pub struct Projection {
pub alias: Option<String>,
}

/// Aliased subquery
#[derive(Clone)]
pub struct SubqueryAlias {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// The alias for the input relation
pub alias: String,
/// The schema with qualified field names
pub schema: DFSchemaRef,
}

/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
@@ -372,6 +383,8 @@ pub enum LogicalPlan {
TableScan(TableScan),
/// Produces no rows: An empty relation with an empty schema
EmptyRelation(EmptyRelation),
/// Aliased relation provides, or changes, the name of a relation.
SubqueryAlias(SubqueryAlias),
/// Produces the first `n` tuples from its input and discards the rest.
Limit(Limit),
/// Creates an external table.
@@ -416,6 +429,7 @@ impl LogicalPlan {
LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
schema
}
@@ -464,6 +478,9 @@ impl LogicalPlan {
schemas.insert(0, schema);
schemas
}
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => {
vec![schema]
}
LogicalPlan::Union(Union { schema, .. }) => {
vec![schema]
}
@@ -525,6 +542,7 @@ impl LogicalPlan {
// plans without expressions
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
@@ -553,6 +571,7 @@ impl LogicalPlan {
LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
LogicalPlan::Limit(Limit { input, .. }) => vec![input],
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
LogicalPlan::Extension(extension) => extension.node.inputs(),
LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(),
LogicalPlan::Explain(explain) => vec![&explain.plan],
@@ -701,6 +720,9 @@ impl LogicalPlan {
true
}
LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
input.accept(visitor)?
}
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
input.accept(visitor)?
}
@@ -1072,6 +1094,9 @@ impl LogicalPlan {
}
},
LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n),
LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
write!(f, "SubqueryAlias: {}", alias)
}
LogicalPlan::CreateExternalTable(CreateExternalTable {
ref name,
..
1 change: 1 addition & 0 deletions datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -216,6 +216,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::TableScan { .. }
| LogicalPlan::Values(_)
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::Explain { .. }
48 changes: 47 additions & 1 deletion datafusion/core/src/optimizer/projection_push_down.rs
@@ -21,7 +21,7 @@
use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{
Aggregate, Analyze, Join, Projection, TableScan, Window,
Aggregate, Analyze, Join, Projection, SubqueryAlias, TableScan, Window,
};
use crate::logical_plan::{
build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
@@ -432,6 +432,34 @@ fn optimize_plan(
alias: alias.clone(),
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
Contributor:
Is it worth adding a test for projection pushdown specifically?

Member Author:
Good point. Test added.
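
The core of this match arm is the column rewrite: any required column qualified by the alias is re-qualified with the underlying table name before recursing into the scan. A standalone sketch of that step follows. The `Column` struct here is a simplified stand-in for DataFusion's `Column`, `rewrite_required_columns` is a hypothetical name, and holding the required columns in a `HashSet` is an assumption of this sketch.

```rust
use std::collections::HashSet;

/// Simplified stand-in for DataFusion's `Column` (illustrative only).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Column {
    relation: Option<String>,
    name: String,
}

/// Re-qualify columns that refer to `alias` so they refer to `table_name`.
/// Columns with a different qualifier, or none, pass through unchanged.
fn rewrite_required_columns(
    required: &HashSet<Column>,
    alias: &str,
    table_name: &str,
) -> HashSet<Column> {
    required
        .iter()
        .map(|c| match &c.relation {
            Some(q) if q.as_str() == alias => Column {
                relation: Some(table_name.to_string()),
                name: c.name.clone(),
            },
            _ => c.clone(),
        })
        .collect()
}
```

With alias `a` over table `test`, a required column `a.b` becomes `test.b`, which is what lets the scan below the alias compute its projection indices.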

match input.as_ref() {
LogicalPlan::TableScan(TableScan { table_name, .. }) => {
let new_required_columns = new_required_columns
.iter()
.map(|c| match &c.relation {
Some(q) if q == alias => Column {
relation: Some(table_name.clone()),
name: c.name.clone(),
},
_ => c.clone(),
})
.collect();
let new_inputs = vec![optimize_plan(
_optimizer,
input,
&new_required_columns,
has_projection,
_execution_props,
)?];
let expr = vec![];
utils::from_plan(plan, &expr, &new_inputs)
}
_ => Err(DataFusionError::Plan(
"SubqueryAlias should only wrap TableScan".to_string(),
)),
}
}
// all other nodes: Add any additional columns used by
// expressions in this node to the list of required columns
LogicalPlan::Limit(_)
@@ -515,6 +543,24 @@ mod tests {
Ok(())
}

#[test]
fn aggregate_group_by_with_table_alias() -> Result<()> {
let table_scan = test_table_scan()?;

let plan = LogicalPlanBuilder::from(table_scan)
.alias("a")?
.aggregate(vec![col("c")], vec![max(col("b"))])?
.build()?;

let expected = "Aggregate: groupBy=[[#a.c]], aggr=[[MAX(#a.b)]]\
\n SubqueryAlias: a\
\n TableScan: test projection=Some([1, 2])";

assert_optimized_plan_eq(&plan, expected);

Ok(())
}

#[test]
fn aggregate_no_group_by_with_filter() -> Result<()> {
let table_scan = test_table_scan()?;
13 changes: 12 additions & 1 deletion datafusion/core/src/optimizer/utils.rs
@@ -20,7 +20,7 @@
use super::optimizer::OptimizerRule;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{
Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, Window,
Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, SubqueryAlias, Window,
};

use crate::logical_plan::{
@@ -34,6 +34,7 @@ use crate::{
error::{DataFusionError, Result},
logical_plan::ExpressionVisitor,
};
use datafusion_common::DFSchema;
use std::{collections::HashSet, sync::Arc};

const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__";
@@ -222,6 +223,16 @@ pub fn from_plan(
let right = &inputs[1];
LogicalPlanBuilder::from(left).cross_join(right)?.build()
}
LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
let schema = inputs[0].schema().as_ref().clone().into();
let schema =
DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
alias: alias.clone(),
input: Arc::new(inputs[0].clone()),
schema,
}))
}
LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit {
n: *n,
input: Arc::new(inputs[0].clone()),
13 changes: 12 additions & 1 deletion datafusion/core/src/physical_plan/planner.rs
@@ -24,7 +24,8 @@ use super::{
};
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, TableScan, Window,
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, TableScan,
Window,
};
use crate::logical_plan::{
unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator,
@@ -785,6 +786,16 @@ impl DefaultPhysicalPlanner {
*produce_one_row,
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
match input.as_ref() {
LogicalPlan::TableScan(scan) => {
let mut scan = scan.clone();
scan.table_name = alias.clone();
self.create_initial_plan(input, session_state).await
}
_ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string()))
}
}
LogicalPlan::Limit(Limit { input, n, .. }) => {
let limit = *n;
let input = self.create_initial_plan(input, session_state).await?;
40 changes: 27 additions & 13 deletions datafusion/core/src/sql/planner.rs
@@ -644,16 +644,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
self.schema_provider.get_table_provider(name.try_into()?),
) {
(Some(cte_plan), _) => Ok(cte_plan.clone()),
(_, Some(provider)) => LogicalPlanBuilder::scan(
// take alias into account to support `JOIN table1 as table2`
alias
.as_ref()
.map(|a| a.name.value.as_str())
Contributor:
👍

.unwrap_or(&table_name),
provider,
None,
)?
.build(),
(_, Some(provider)) => {
let scan =
LogicalPlanBuilder::scan(&table_name, provider, None);
let scan = match alias {
Some(ref name) => scan?.alias(name.name.value.as_str()),
_ => scan,
};
scan?.build()
}
(None, None) => Err(DataFusionError::Plan(format!(
"Table or CTE with name '{}' not found",
name
@@ -2492,7 +2491,8 @@ mod tests {
FROM lineitem l (a, b, c)";
let expected = "Projection: #l.a, #l.b, #l.c\
\n Projection: #l.l_item_id AS a, #l.l_description AS b, #l.price AS c, alias=l\
\n TableScan: l projection=None";
\n SubqueryAlias: l\
\n TableScan: lineitem projection=None";
quick_test(sql, expected);
}

@@ -3458,7 +3458,8 @@ mod tests {
let expected = "Projection: #person.first_name, #person.id\
\n Inner Join: Using #person.id = #person2.id\
\n TableScan: person projection=None\
\n TableScan: person2 projection=None";
\n SubqueryAlias: person2\
Contributor:
The appearance of SubqueryAlias in this plan is somewhat strange to me, as there is no subquery in this SQL. Maybe a better name would be RelationAlias or something similar?

Member Author:
I originally named this AliasedRelation and then changed it to SubqueryAlias because that is what Spark uses. I don't have a strong preference either way.

Contributor:
Me neither. Whatever you prefer is fine with me.

Member Author:
Ok, let's leave it as is for now since we will make use of it for subqueries (soon, hopefully).

I filed follow-on issues for using this approach for projection and union:

\n TableScan: person projection=None";
quick_test(sql, expected);
}

@@ -3471,7 +3472,8 @@ mod tests {
let expected = "Projection: #lineitem.l_item_id, #lineitem.l_description, #lineitem.price, #lineitem2.l_description, #lineitem2.price\
\n Inner Join: Using #lineitem.l_item_id = #lineitem2.l_item_id\
\n TableScan: lineitem projection=None\
\n TableScan: lineitem2 projection=None";
\n SubqueryAlias: lineitem2\
\n TableScan: lineitem projection=None";
quick_test(sql, expected);
}

@@ -4067,6 +4069,18 @@ mod tests {
quick_test(sql, expected);
}

#[test]
fn join_with_aliases() {
let sql = "select peeps.id, folks.first_name from person as peeps join person as folks on peeps.id = folks.id";
let expected = "Projection: #peeps.id, #folks.first_name\
\n Inner Join: #peeps.id = #folks.id\
\n SubqueryAlias: peeps\
\n TableScan: person projection=None\
\n SubqueryAlias: folks\
\n TableScan: person projection=None";
quick_test(sql, expected);
}

#[test]
fn cte_use_same_name_multiple_times() {
let sql = "with a as (select * from person), a as (select * from orders) select * from a;";