diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index a60f8c2f8547..e693d7b209c8 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -49,6 +49,7 @@ use arrow::datatypes::*; use datafusion_expr::{window_function::WindowFunction, BuiltinScalarFunction}; use hashbrown::HashMap; +use datafusion_expr::logical_plan::Subquery; use sqlparser::ast::{ BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg, FunctionArgExpr, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query, @@ -221,9 +222,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } - /// Generate a logic plan from an SQL query + /// Generate a logical plan from an SQL query pub fn query_to_plan(&self, query: Query) -> Result { - self.query_to_plan_with_alias(query, None, &mut HashMap::new()) + self.query_to_plan_with_alias(query, None, &mut HashMap::new(), None) + } + + /// Generate a logical plan from a SQL subquery + pub fn subquery_to_plan( + &self, + query: Query, + outer_query_schema: &DFSchema, + ) -> Result { + self.query_to_plan_with_alias( + query, + None, + &mut HashMap::new(), + Some(outer_query_schema), + ) } /// Generate a logic plan from an SQL query with optional alias @@ -232,6 +247,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { query: Query, alias: Option, ctes: &mut HashMap, + outer_query_schema: Option<&DFSchema>, ) -> Result { let set_expr = query.body; if let Some(with) = query.with { @@ -251,11 +267,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { cte.query, Some(cte.alias.name.value.clone()), &mut ctes.clone(), + outer_query_schema, )?; ctes.insert(cte.alias.name.value, logical_plan); } } - let plan = self.set_expr_to_plan(set_expr, alias, ctes)?; + let plan = self.set_expr_to_plan(set_expr, alias, ctes, outer_query_schema)?; let plan = self.order_by(plan, query.order_by)?; @@ -267,9 +284,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { set_expr: SetExpr, alias: Option, ctes: &mut HashMap, + outer_query_schema: Option<&DFSchema>, ) -> Result { match set_expr { - SetExpr::Select(s) => self.select_to_plan(*s, ctes, alias), + SetExpr::Select(s) => { + self.select_to_plan(*s, ctes, alias, outer_query_schema) + } SetExpr::Values(v) => self.sql_values_to_plan(v), SetExpr::SetOperation { op, @@ -277,8 +297,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { right, all, } => { - let left_plan = self.set_expr_to_plan(*left, None, ctes)?; - let right_plan = self.set_expr_to_plan(*right, None, ctes)?; + let left_plan = + self.set_expr_to_plan(*left, None, ctes, outer_query_schema)?; + let right_plan = + self.set_expr_to_plan(*right, None, ctes, outer_query_schema)?; match (op, all) { (SetOperator::Union, true) => { union_with_alias(left_plan, right_plan, alias) @@ -429,12 +451,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { &self, from: Vec, ctes: &mut HashMap, + outer_query_schema: Option<&DFSchema>, ) -> Result> { match from.len() { 0 => Ok(vec![LogicalPlanBuilder::empty(true).build()?]), _ => from .into_iter() - .map(|t| self.plan_table_with_joins(t, ctes)) + .map(|t| self.plan_table_with_joins(t, ctes, outer_query_schema)) .collect::>>(), } } @@ -443,16 +466,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { &self, t: TableWithJoins, ctes: &mut HashMap, + outer_query_schema: Option<&DFSchema>, ) -> Result { - let left = self.create_relation(t.relation, ctes)?; + let left = self.create_relation(t.relation, ctes, outer_query_schema)?; match t.joins.len() { 0 => Ok(left), _ => { let mut joins = t.joins.into_iter(); - let mut left = - self.parse_relation_join(left, joins.next().unwrap(), ctes)?; + let mut left = self.parse_relation_join( + left, + joins.next().unwrap(), + ctes, + outer_query_schema, + )?; for join in joins { - left = self.parse_relation_join(left, join, ctes)?; + left = + self.parse_relation_join(left, join, ctes, outer_query_schema)?; } Ok(left) } @@ -464,8 +493,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { left: LogicalPlan, join: Join, ctes: &mut HashMap, + outer_query_schema: Option<&DFSchema>, ) -> Result { - let right = self.create_relation(join.relation, ctes)?; + let right = self.create_relation(join.relation, ctes, outer_query_schema)?; match join.join_operator { JoinOperator::LeftOuter(constraint) => { self.parse_join(left, right, constraint, JoinType::Left) @@ -632,6 +662,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { &self, relation: TableFactor, ctes: &mut HashMap, + outer_query_schema: Option<&DFSchema>, ) -> Result { let (plan, alias) = match relation { TableFactor::Table { @@ -675,6 +706,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { *subquery, alias.as_ref().map(|a| a.name.value.to_string()), ctes, + outer_query_schema, )?; ( project_with_alias( @@ -689,9 +721,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { alias, ) } - TableFactor::NestedJoin(table_with_joins) => { - (self.plan_table_with_joins(*table_with_joins, ctes)?, None) - } + TableFactor::NestedJoin(table_with_joins) => ( + self.plan_table_with_joins(*table_with_joins, ctes, outer_query_schema)?, + None, + ), // @todo Support TableFactory::TableFunction? _ => { return Err(DataFusionError::NotImplemented(format!( @@ -734,8 +767,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { &self, selection: Option, plans: Vec, + outer_query_schema: Option<&DFSchema>, ) -> Result { - let plan = match selection { + match selection { Some(predicate_expr) => { // build join schema let mut fields = vec![]; @@ -744,6 +778,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { fields.extend_from_slice(plan.schema().fields()); metadata.extend(plan.schema().metadata().clone()); } + if let Some(outer) = outer_query_schema { + fields.extend_from_slice(outer.fields()); + metadata.extend(outer.metadata().clone()); + } let join_schema = DFSchema::new_with_metadata(fields, metadata)?; let filter_expr = self.sql_to_rex(predicate_expr, &join_schema)?; @@ -853,8 +891,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { Ok(left) } } - }; - plan + } } /// Generate a logic plan from an SQL select @@ -863,17 +900,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { select: Select, ctes: &mut HashMap, alias: Option, + outer_query_schema: Option<&DFSchema>, ) -> Result { // process `from` clause - let plans = self.plan_from_tables(select.from, ctes)?; + let plans = self.plan_from_tables(select.from, ctes, outer_query_schema)?; let empty_from = matches!(plans.first(), Some(LogicalPlan::EmptyRelation(_))); // process `where` clause - let plan = self.plan_selection(select.selection, plans)?; + let plan = self.plan_selection(select.selection, plans, outer_query_schema)?; // process the SELECT expressions, with wildcards expanded. - let select_exprs = - self.prepare_select_exprs(&plan, select.projection, empty_from)?; + let select_exprs = self.prepare_select_exprs( + &plan, + select.projection, + empty_from, + outer_query_schema, + )?; // having and group by clause may reference aliases defined in select projection let projected_plan = self.project(plan.clone(), select_exprs.clone())?; @@ -1010,10 +1052,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { plan: &LogicalPlan, projection: Vec, empty_from: bool, + outer_query_schema: Option<&DFSchema>, ) -> Result> { projection .into_iter() - .map(|expr| self.sql_select_to_rex(expr, plan, empty_from)) + .map(|expr| { + self.sql_select_to_rex(expr, plan, empty_from, outer_query_schema) + }) .flat_map(|result| match result { Ok(vec) => vec.into_iter().map(Ok).collect(), Err(err) => vec![Err(err)], @@ -1208,16 +1253,25 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { sql: SelectItem, plan: &LogicalPlan, empty_from: bool, + outer_query_schema: Option<&DFSchema>, ) -> Result> { - let input_schema = plan.schema(); + let input_schema = match outer_query_schema { + Some(x) => { + let mut input_schema = plan.schema().as_ref().clone(); + input_schema.merge(x); + input_schema + } + _ => plan.schema().as_ref().clone(), + }; + match sql { SelectItem::UnnamedExpr(expr) => { - let expr = self.sql_to_rex(expr, input_schema)?; + let expr = self.sql_to_rex(expr, &input_schema)?; Ok(vec![normalize_col(expr, plan)?]) } SelectItem::ExprWithAlias { expr, alias } => { let expr = Alias( - Box::new(self.sql_to_rex(expr, input_schema)?), + Box::new(self.sql_to_rex(expr, &input_schema)?), normalize_ident(alias), ); Ok(vec![normalize_col(expr, plan)?]) @@ -1228,12 +1282,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { "SELECT * with no tables specified is not valid".to_string(), )); } - expand_wildcard(input_schema, plan) + // do not expand from outer schema + expand_wildcard(plan.schema().as_ref(), plan) } - SelectItem::QualifiedWildcard(ref object_name) => { let qualifier = format!("{}", object_name); - expand_qualified_wildcard(&qualifier, input_schema, plan) + // do not expand from outer schema + expand_qualified_wildcard(&qualifier, plan.schema().as_ref(), plan) } } } @@ -1588,8 +1643,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { right: Box::new(self.sql_expr_to_logical_expr(*right, schema)?), }), - SQLExpr::UnaryOp { op, expr } => { - self.parse_sql_unary_op(op, *expr, schema) + + SQLExpr::UnaryOp { op, expr } => match (&op, expr.as_ref()) { + // The AST for Exists does not support the NOT EXISTS case so it gets + // wrapped in a unary NOT + // https://github.com/sqlparser-rs/sqlparser-rs/issues/472 + (&UnaryOperator::Not, &SQLExpr::Exists(ref subquery)) => self.parse_exists_subquery(subquery, true, schema), + _ => self.parse_sql_unary_op(op, *expr, schema) } SQLExpr::Between { @@ -1820,6 +1880,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(*e, schema), + SQLExpr::Exists(subquery) => self.parse_exists_subquery(&subquery, false, schema), + + SQLExpr::InSubquery { .. } => Err(DataFusionError::NotImplemented( + "IN subqueries are not supported yet".to_owned(), + )), + + SQLExpr::Subquery(_) => Err(DataFusionError::NotImplemented( + "Scalar subqueries are not supported yet".to_owned(), + )), + _ => Err(DataFusionError::NotImplemented(format!( "Unsupported ast node {:?} in sqltorel", sql @@ -1827,6 +1897,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } + fn parse_exists_subquery( + &self, + subquery: &Query, + negated: bool, + input_schema: &DFSchema, + ) -> Result { + Ok(Expr::Exists { + subquery: Subquery { + subquery: Arc::new( + self.subquery_to_plan(subquery.clone(), input_schema)?, + ), + }, + negated, + }) + } + fn function_args_to_expr( &self, args: Vec, @@ -4182,4 +4268,47 @@ mod tests { \n TableScan: test projection=None"; quick_test(sql, expected); } + + #[test] + fn exists_subquery() { + let sql = "SELECT id FROM person p WHERE EXISTS \ + (SELECT first_name FROM person \ + WHERE last_name = p.last_name \ + AND state = p.state)"; + + let subquery_expected = "Subquery: Projection: #person.first_name\ + \n Filter: #person.last_name = #p.last_name AND #person.state = #p.state\ + \n TableScan: person projection=None"; + + let expected = format!( + "Projection: #p.id\ + \n Filter: EXISTS ({})\ + \n SubqueryAlias: p\ + \n TableScan: person projection=None", + subquery_expected + ); + quick_test(sql, &expected); + } + + #[test] + fn exists_subquery_wildcard() { + let sql = "SELECT id FROM person p WHERE EXISTS \ + (SELECT * FROM person \ + WHERE last_name = p.last_name \ + AND state = p.state)"; + + let subquery_expected = "Subquery: Projection: #person.id, #person.first_name, \ + #person.last_name, #person.age, #person.state, #person.salary, #person.birth_date, #person.😀\ + \n Filter: #person.last_name = #p.last_name AND #person.state = #p.state\ + \n TableScan: person projection=None"; + + let expected = format!( + "Projection: #p.id\ + \n Filter: EXISTS ({})\ + \n SubqueryAlias: p\ + \n TableScan: person projection=None", + subquery_expected + ); + quick_test(sql, &expected); + } }