From 8de8eac71b95e5dd696b328857f150378943dda1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 25 Apr 2022 15:10:10 -0600 Subject: [PATCH 1/7] Add Expr::Exists --- .../rust/core/src/serde/logical_plan/mod.rs | 8 +++ .../core/src/datasource/listing/helpers.rs | 1 + datafusion/core/src/logical_plan/builder.rs | 36 +++++++++++++ .../core/src/logical_plan/expr_rewriter.rs | 1 + .../core/src/logical_plan/expr_visitor.rs | 1 + datafusion/core/src/logical_plan/mod.rs | 2 +- datafusion/core/src/logical_plan/plan.rs | 2 +- .../src/optimizer/common_subexpr_eliminate.rs | 4 ++ .../src/optimizer/projection_push_down.rs | 1 + .../src/optimizer/simplify_expressions.rs | 1 + datafusion/core/src/optimizer/utils.rs | 19 ++++++- datafusion/core/src/physical_plan/planner.rs | 4 ++ datafusion/core/src/sql/utils.rs | 7 +-- datafusion/expr/src/expr.rs | 5 ++ datafusion/expr/src/expr_schema.rs | 3 +- datafusion/expr/src/logical_plan/mod.rs | 2 +- datafusion/expr/src/logical_plan/plan.rs | 51 ++++++++++++++++++- 17 files changed, 138 insertions(+), 10 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index b9ab117044542..5307aff653e45 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -708,6 +708,14 @@ impl AsLogicalPlan for LogicalPlanNode { ))), }) } + LogicalPlan::Subquery(_) => { + // note that the ballista and datafusion proto files need refactoring to allow + // LogicalExprNode to reference a LogicalPlanNode + // see https://github.com/apache/arrow-datafusion/issues/2338 + Err(BallistaError::NotImplemented( + "Ballista does not support subqueries".to_string(), + )) + } LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { let input: protobuf::LogicalPlanNode = protobuf::LogicalPlanNode::try_from_logical_plan( diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 362a8545be734..e366169650de0 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -92,6 +92,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> { | Expr::BinaryExpr { .. } | Expr::Between { .. } | Expr::InList { .. } + | Expr::Exists { .. } | Expr::GetIndexedField { .. } | Expr::Case { .. } => Recursion::Continue(self), diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index a0914ae73dc03..469f8109071c6 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -51,6 +51,7 @@ use crate::logical_plan::{ }; use crate::sql::utils::group_window_expr_by_sort_keys; use datafusion_common::ToDFSchema; +use datafusion_expr::logical_plan::Subquery; /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; @@ -531,6 +532,13 @@ impl LogicalPlanBuilder { }))) } + /// Apply an EXISTS subquery expression + pub fn exists(&self, subquery: &LogicalPlan) -> Result { + Ok(Self::from(LogicalPlan::Subquery(Subquery { + subquery: Arc::new(subquery.clone()), + }))) + } + /// Add missing sort columns to all downstream projection fn add_missing_columns( &self, @@ -1204,6 +1212,7 @@ mod tests { use arrow::datatypes::{DataType, Field}; use crate::logical_plan::StringifiedPlan; + use crate::test::test_table_scan_with_name; use super::super::{col, lit, sum}; use super::*; @@ -1339,6 +1348,33 @@ mod tests { Ok(()) } + #[test] + fn exists_subquery() -> Result<()> { + let foo = test_table_scan_with_name("foo")?; + let bar = test_table_scan_with_name("bar")?; + + let subquery = LogicalPlanBuilder::from(foo) + .project(vec![col("a")])? + .filter(col("a").eq(col("bar.a")))? + .build()?; + + let outer_query = LogicalPlanBuilder::from(bar) + .project(vec![col("a")])? + .filter(Expr::Exists(Subquery { + subquery: Arc::new(subquery), + }))? + .build()?; + + let expected = "Filter: EXISTS (\ + Subquery: Filter: #foo.a = #bar.a\ + \n Projection: #foo.a\ + \n TableScan: foo projection=None)\ + \n Projection: #bar.a\n TableScan: bar projection=None"; + assert_eq!(expected, format!("{:?}", outer_query)); + + Ok(()) + } + #[test] fn projection_non_unique_names() -> Result<()> { let plan = LogicalPlanBuilder::scan_empty( diff --git a/datafusion/core/src/logical_plan/expr_rewriter.rs b/datafusion/core/src/logical_plan/expr_rewriter.rs index b8afa8a367bde..e99fc7e66cf83 100644 --- a/datafusion/core/src/logical_plan/expr_rewriter.rs +++ b/datafusion/core/src/logical_plan/expr_rewriter.rs @@ -111,6 +111,7 @@ impl ExprRewritable for Expr { let expr = match self { Expr::Alias(expr, name) => Expr::Alias(rewrite_boxed(expr, rewriter)?, name), Expr::Column(_) => self.clone(), + Expr::Exists(_) => self.clone(), Expr::ScalarVariable(ty, names) => Expr::ScalarVariable(ty, names), Expr::Literal(value) => Expr::Literal(value), Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr { diff --git a/datafusion/core/src/logical_plan/expr_visitor.rs b/datafusion/core/src/logical_plan/expr_visitor.rs index 27fa9d3c79fa4..bfab0ca04c752 100644 --- a/datafusion/core/src/logical_plan/expr_visitor.rs +++ b/datafusion/core/src/logical_plan/expr_visitor.rs @@ -106,6 +106,7 @@ impl ExprVisitable for Expr { Expr::Column(_) | Expr::ScalarVariable(_, _) | Expr::Literal(_) + | Expr::Exists(_) | Expr::Wildcard | Expr::QualifiedWildcard { .. } => Ok(visitor), Expr::BinaryExpr { left, right, .. } => { diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs index f131d3389c528..49e90f65546be 100644 --- a/datafusion/core/src/logical_plan/mod.rs +++ b/datafusion/core/src/logical_plan/mod.rs @@ -60,6 +60,6 @@ pub use plan::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan, - TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values, + Subquery, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values, }; pub use registry::FunctionRegistry; diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index 89047af444097..08d1fa120e30f 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -28,7 +28,7 @@ pub use crate::logical_expr::{ CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort, - StringifiedPlan, SubqueryAlias, TableScan, ToStringifiedPlan, Union, + StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values, Window, }, TableProviderFilterPushDown, TableSource, diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs index 39964df4a6635..4a9bf8e913a0e 100644 --- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs @@ -216,6 +216,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { desc.push_str("InList-"); desc.push_str(&negated.to_string()); } + Expr::Exists(_) => { + desc.push_str("Exists-"); + } Expr::Wildcard => { desc.push_str("Wildcard-"); } diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 10bf5d10f9602..5062082e86433 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -466,6 +466,7 @@ fn optimize_plan( | LogicalPlan::Filter { .. } | LogicalPlan::Repartition(_) | LogicalPlan::EmptyRelation(_) + | LogicalPlan::Subquery(_) | LogicalPlan::Values(_) | LogicalPlan::Sort { .. } | LogicalPlan::CreateExternalTable(_) diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs index 394ab68bd0c9e..93d9fb506177e 100644 --- a/datafusion/core/src/optimizer/simplify_expressions.rs +++ b/datafusion/core/src/optimizer/simplify_expressions.rs @@ -375,6 +375,7 @@ impl<'a> ConstEvaluator<'a> { | Expr::AggregateUDF { .. } | Expr::ScalarVariable(_, _) | Expr::Column(_) + | Expr::Exists(_) | Expr::WindowFunction { .. } | Expr::Sort { .. } | Expr::Wildcard diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index 0dab2d3ed7bcb..2a6f643504c7f 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -19,8 +19,9 @@ use super::optimizer::OptimizerRule; use crate::execution::context::ExecutionProps; -use crate::logical_plan::plan::{ - Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, SubqueryAlias, Window, +use datafusion_expr::logical_plan::{ + Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, Subquery, + SubqueryAlias, Window, }; use crate::logical_plan::{ @@ -84,6 +85,7 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> { | Expr::AggregateFunction { .. } | Expr::AggregateUDF { .. } | Expr::InList { .. } + | Expr::Exists(_) | Expr::Wildcard | Expr::QualifiedWildcard { .. } | Expr::GetIndexedField { .. } => {} @@ -223,6 +225,17 @@ pub fn from_plan( let right = &inputs[1]; LogicalPlanBuilder::from(left).cross_join(right)?.build() } + LogicalPlan::Subquery(Subquery { .. }) => { + // let schema = inputs[0].schema().as_ref().clone().into(); + // let schema = + // DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?); + // Ok(LogicalPlan::Subquery(Subquery { + // subquery + // input: Arc::new(inputs[0].clone()), + // schema, + // })) + Err(DataFusionError::Plan("not implemented".to_string())) + } LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { let schema = inputs[0].schema().as_ref().clone().into(); let schema = @@ -363,6 +376,7 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result> { } Ok(expr_list) } + Expr::Exists(_) => Ok(vec![]), Expr::Wildcard { .. } => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), @@ -497,6 +511,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result { Expr::Column(_) | Expr::Literal(_) | Expr::InList { .. } + | Expr::Exists(_) | Expr::ScalarVariable(_, _) => Ok(expr.clone()), Expr::Sort { asc, nulls_first, .. diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 5b34a65dc9f91..dd95fab0740a0 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -186,6 +186,9 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result { Ok(format!("{} IN ({:?})", expr, list)) } } + Expr::Exists(_) => Err(DataFusionError::NotImplemented( + "EXISTS is not supported in the physical plan".to_string(), + )), Expr::Between { expr, negated, @@ -780,6 +783,7 @@ impl DefaultPhysicalPlanner { let right = self.create_initial_plan(right, session_state).await?; Ok(Arc::new(CrossJoinExec::try_new(left, right)?)) } + LogicalPlan::Subquery(_) => todo!(), LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row, schema, diff --git a/datafusion/core/src/sql/utils.rs b/datafusion/core/src/sql/utils.rs index 5e090c99b56e1..f9242c6aab639 100644 --- a/datafusion/core/src/sql/utils.rs +++ b/datafusion/core/src/sql/utils.rs @@ -368,9 +368,10 @@ where asc: *asc, nulls_first: *nulls_first, }), - Expr::Column { .. } | Expr::Literal(_) | Expr::ScalarVariable(_, _) => { - Ok(expr.clone()) - } + Expr::Column { .. } + | Expr::Literal(_) + | Expr::ScalarVariable(_, _) + | Expr::Exists(_) => Ok(expr.clone()), Expr::Wildcard => Ok(Expr::Wildcard), Expr::QualifiedWildcard { .. } => Ok(expr.clone()), Expr::GetIndexedField { expr, key } => Ok(Expr::GetIndexedField { diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 4bad6e31f39a4..88c489670588a 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -20,6 +20,7 @@ use crate::aggregate_function; use crate::built_in_function; use crate::expr_fn::binary_expr; +use crate::logical_plan::Subquery; use crate::window_frame; use crate::window_function; use crate::AggregateUDF; @@ -226,6 +227,8 @@ pub enum Expr { /// Whether the expression is negated negated: bool, }, + /// EXISTS subquery + Exists(Subquery), /// Represents a reference to all fields in a schema. Wildcard, /// Represents a reference to all fields in a specific schema. @@ -431,6 +434,7 @@ impl fmt::Debug for Expr { Expr::Negative(expr) => write!(f, "(- {:?})", expr), Expr::IsNull(expr) => write!(f, "{:?} IS NULL", expr), Expr::IsNotNull(expr) => write!(f, "{:?} IS NOT NULL", expr), + Expr::Exists(subquery) => write!(f, "EXISTS ({:?})", subquery), Expr::BinaryExpr { left, op, right } => { write!(f, "{:?} {} {:?}", left, op, right) } @@ -618,6 +622,7 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { let expr = create_name(expr, input_schema)?; Ok(format!("{} IS NOT NULL", expr)) } + Expr::Exists(_) => Ok("EXISTS".to_string()), Expr::GetIndexedField { expr, key } => { let expr = create_name(expr, input_schema)?; Ok(format!("{}[{}]", expr, key)) diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index a216281b3c05a..4c6457962fd36 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -100,6 +100,7 @@ impl ExprSchemable for Expr { } Expr::Not(_) | Expr::IsNull(_) + | Expr::Exists(_) | Expr::Between { .. } | Expr::InList { .. } | Expr::IsNotNull(_) => Ok(DataType::Boolean), @@ -172,7 +173,7 @@ impl ExprSchemable for Expr { | Expr::WindowFunction { .. } | Expr::AggregateFunction { .. } | Expr::AggregateUDF { .. } => Ok(true), - Expr::IsNull(_) | Expr::IsNotNull(_) => Ok(false), + Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Exists(_) => Ok(false), Expr::BinaryExpr { ref left, ref right, diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 525740274bdb1..a37729f7dc5f6 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -23,7 +23,7 @@ pub use plan::{ Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, - PlanVisitor, Projection, Repartition, Sort, StringifiedPlan, SubqueryAlias, + PlanVisitor, Projection, Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values, Window, }; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 8fefac67df5cf..dc949e14155e0 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -22,7 +22,8 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::{Column, DFSchemaRef, DataFusionError}; use std::collections::HashSet; ///! Logical plan types -use std::fmt::{self, Display, Formatter}; +use std::fmt::{self, Debug, Display, Formatter}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; /// A LogicalPlan represents the different types of relational @@ -66,6 +67,8 @@ pub enum LogicalPlan { TableScan(TableScan), /// Produces no rows: An empty relation with an empty schema EmptyRelation(EmptyRelation), + /// Subquery + Subquery(Subquery), /// Aliased relation provides, or changes, the name of a relation. SubqueryAlias(SubqueryAlias), /// Produces the first `n` tuples from its input and discards the rest. @@ -112,6 +115,7 @@ impl LogicalPlan { LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema, LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(), LogicalPlan::Limit(Limit { input, .. }) => input.schema(), + LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(), LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema, LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => { schema @@ -161,6 +165,7 @@ impl LogicalPlan { schemas.insert(0, schema); schemas } + LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.all_schemas(), LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => { vec![schema] } @@ -225,6 +230,7 @@ impl LogicalPlan { // plans without expressions LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation(_) + | LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) | LogicalPlan::Limit(_) | LogicalPlan::CreateExternalTable(_) @@ -254,6 +260,7 @@ impl LogicalPlan { LogicalPlan::Join(Join { left, right, .. }) => vec![left, right], LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right], LogicalPlan::Limit(Limit { input, .. }) => vec![input], + LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery], LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input], LogicalPlan::Extension(extension) => extension.node.inputs(), LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(), @@ -392,6 +399,9 @@ impl LogicalPlan { true } LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?, + LogicalPlan::Subquery(Subquery { subquery, .. }) => { + subquery.accept(visitor)? + } LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => { input.accept(visitor)? } @@ -766,6 +776,9 @@ impl LogicalPlan { } }, LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n), + LogicalPlan::Subquery(Subquery { subquery, .. }) => { + write!(f, "Subquery: {:?}", subquery) + } LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => { write!(f, "SubqueryAlias: {}", alias) } @@ -1141,6 +1154,42 @@ pub struct Join { pub null_equals_null: bool, } +/// Subquery +#[derive(Clone)] +pub struct Subquery { + /// The subquery + pub subquery: Arc, +} + +impl Debug for Subquery { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "Subquery: {:?}", self.subquery) + } +} + +impl Hash for Subquery { + fn hash(&self, state: &mut H) { + state.finish(); + } + + fn hash_slice(_data: &[Self], state: &mut H) + where + Self: Sized, + { + state.finish(); + } +} + +impl PartialEq for Subquery { + fn eq(&self, _other: &Self) -> bool { + false + } + + fn ne(&self, _other: &Self) -> bool { + true + } +} + /// Logical partitioning schemes supported by the repartition operator. #[derive(Debug, Clone)] pub enum Partitioning { From b2851fba6f32c90421e34f258aff756e735f1fe7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 25 Apr 2022 15:27:32 -0600 Subject: [PATCH 2/7] code cleanup --- datafusion/core/src/logical_plan/builder.rs | 13 ++----------- datafusion/expr/src/expr_fn.rs | 11 ++++++++++- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 469f8109071c6..e284de9d06100 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -51,7 +51,6 @@ use crate::logical_plan::{ }; use crate::sql::utils::group_window_expr_by_sort_keys; use datafusion_common::ToDFSchema; -use datafusion_expr::logical_plan::Subquery; /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; @@ -532,13 +531,6 @@ impl LogicalPlanBuilder { }))) } - /// Apply an EXISTS subquery expression - pub fn exists(&self, subquery: &LogicalPlan) -> Result { - Ok(Self::from(LogicalPlan::Subquery(Subquery { - subquery: Arc::new(subquery.clone()), - }))) - } - /// Add missing sort columns to all downstream projection fn add_missing_columns( &self, @@ -1210,6 +1202,7 @@ pub(crate) fn expand_qualified_wildcard( #[cfg(test)] mod tests { use arrow::datatypes::{DataType, Field}; + use datafusion_expr::expr_fn::exists; use crate::logical_plan::StringifiedPlan; use crate::test::test_table_scan_with_name; @@ -1360,9 +1353,7 @@ mod tests { let outer_query = LogicalPlanBuilder::from(bar) .project(vec![col("a")])? - .filter(Expr::Exists(Subquery { - subquery: Arc::new(subquery), - }))? + .filter(exists(subquery))? .build()?; let expected = "Filter: EXISTS (\ diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index a723f53062fd6..43e3b71b79186 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -18,7 +18,9 @@ //! Functions for creating logical expressions use crate::conditional_expressions::CaseBuilder; -use crate::{aggregate_function, built_in_function, lit, Expr, Operator}; +use crate::logical_plan::Subquery; +use crate::{aggregate_function, built_in_function, lit, Expr, LogicalPlan, Operator}; +use std::sync::Arc; /// Create a column expression based on a qualified or unqualified column name pub fn col(ident: &str) -> Expr { @@ -180,6 +182,13 @@ pub fn approx_percentile_cont_with_weight( } } +/// Create an EXISTS subquery expression +pub fn exists(subquery: LogicalPlan) -> Expr { + Expr::Exists(Subquery { + subquery: Arc::new(subquery), + }) +} + // TODO(kszucs): this seems buggy, unary_scalar_expr! is used for many // varying arity functions /// Create an convenience function representing a unary scalar function From 99dd3d72191bce438a897d2b0c013590a57fb766 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 25 Apr 2022 16:02:27 -0600 Subject: [PATCH 3/7] Update datafusion/core/src/physical_plan/planner.rs Co-authored-by: Andrew Lamb --- datafusion/core/src/physical_plan/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index dd95fab0740a0..84785777b016c 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -187,7 +187,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result { } } Expr::Exists(_) => Err(DataFusionError::NotImplemented( - "EXISTS is not supported in the physical plan".to_string(), + "EXISTS is not yet supported in the physical plan".to_string(), )), Expr::Between { expr, From 9d2b87eca85fe42b56eb6da291ac8c0ab5eb964f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 25 Apr 2022 16:06:56 -0600 Subject: [PATCH 4/7] Implement from_plan for subquery --- datafusion/core/src/optimizer/utils.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index 2a6f643504c7f..dc4ecf7ddba15 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -225,16 +225,12 @@ pub fn from_plan( let right = &inputs[1]; LogicalPlanBuilder::from(left).cross_join(right)?.build() } - LogicalPlan::Subquery(Subquery { .. }) => { - // let schema = inputs[0].schema().as_ref().clone().into(); - // let schema = - // DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?); - // Ok(LogicalPlan::Subquery(Subquery { - // subquery - // input: Arc::new(inputs[0].clone()), - // schema, - // })) - Err(DataFusionError::Plan("not implemented".to_string())) + LogicalPlan::Subquery(Subquery { subquery }) => { + let subquery = + LogicalPlanBuilder::from(inputs[0].as_ref().clone()).build()?; + Ok(LogicalPlan::Subquery(Subquery { + subquery: Arc::new(subquery), + })) } LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { let schema = inputs[0].schema().as_ref().clone().into(); From b532f4eab669366ae0dd06ee982ab969d13a0bbf Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 25 Apr 2022 16:21:08 -0600 Subject: [PATCH 5/7] address PR feedback --- datafusion/core/src/logical_plan/builder.rs | 2 +- datafusion/core/src/logical_plan/mod.rs | 16 ++++++++-------- datafusion/core/src/optimizer/utils.rs | 5 ++--- datafusion/core/src/prelude.rs | 2 +- datafusion/expr/src/expr_fn.rs | 6 ++---- datafusion/expr/src/lib.rs | 10 +++++----- 6 files changed, 19 insertions(+), 22 deletions(-) diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index e284de9d06100..c303cdec8639f 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -1353,7 +1353,7 @@ mod tests { let outer_query = LogicalPlanBuilder::from(bar) .project(vec![col("a")])? - .filter(exists(subquery))? + .filter(exists(Arc::new(subquery)))? .build()?; let expected = "Filter: EXISTS (\ diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs index 49e90f65546be..d933b0229b1f2 100644 --- a/datafusion/core/src/logical_plan/mod.rs +++ b/datafusion/core/src/logical_plan/mod.rs @@ -40,14 +40,14 @@ pub use expr::{ abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan, avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col, columnize_expr, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos, - count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exp, - exprlist_to_fields, floor, in_list, initcap, left, length, lit, lit_timestamp_nano, - ln, log10, log2, lower, lpad, ltrim, max, md5, min, now, now_expr, nullif, - octet_length, or, random, regexp_match, regexp_replace, repeat, replace, reverse, - right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part, - sqrt, starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros, - to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper, - when, Column, Expr, ExprSchema, Literal, + count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, + exists, exp, exprlist_to_fields, floor, in_list, initcap, left, length, lit, + lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min, now, + now_expr, nullif, octet_length, or, random, regexp_match, regexp_replace, repeat, + replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, + sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex, + to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim, + trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal, }; pub use expr_rewriter::{ normalize_col, normalize_cols, replace_col, rewrite_sort_cols_by_aggs, diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index dc4ecf7ddba15..939af80415627 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -225,9 +225,8 @@ pub fn from_plan( let right = &inputs[1]; LogicalPlanBuilder::from(left).cross_join(right)?.build() } - LogicalPlan::Subquery(Subquery { subquery }) => { - let subquery = - LogicalPlanBuilder::from(inputs[0].as_ref().clone()).build()?; + LogicalPlan::Subquery(_) => { + let subquery = LogicalPlanBuilder::from(inputs[0].clone()).build()?; Ok(LogicalPlan::Subquery(Subquery { subquery: Arc::new(subquery), })) diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs index 18343bce7430d..e0c418417c5c4 100644 --- a/datafusion/core/src/prelude.rs +++ b/datafusion/core/src/prelude.rs @@ -33,7 +33,7 @@ pub use crate::execution::options::{ pub use crate::logical_plan::{ approx_percentile_cont, array, ascii, avg, bit_length, btrim, character_length, chr, coalesce, col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest, - in_list, initcap, left, length, lit, lower, lpad, ltrim, max, md5, min, now, + exists, in_list, initcap, left, length, lit, lower, lpad, ltrim, max, md5, min, now, octet_length, random, regexp_match, regexp_replace, repeat, replace, reverse, right, rpad, rtrim, sha224, sha256, sha384, sha512, split_part, starts_with, strpos, substr, sum, to_hex, translate, trim, upper, Column, JoinType, Partitioning, diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 43e3b71b79186..19c311f4fa45e 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -183,10 +183,8 @@ pub fn approx_percentile_cont_with_weight( } /// Create an EXISTS subquery expression -pub fn exists(subquery: LogicalPlan) -> Expr { - Expr::Exists(Subquery { - subquery: Arc::new(subquery), - }) +pub fn exists(subquery: Arc) -> Expr { + Expr::Exists(Subquery { subquery }) } // TODO(kszucs): this seems buggy, unary_scalar_expr! is used for many diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index b1e8220776051..3dd24600a20cd 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -48,11 +48,11 @@ pub use expr_fn::{ abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan, avg, bit_length, btrim, case, ceil, character_length, chr, coalesce, col, concat, concat_expr, concat_ws, concat_ws_expr, cos, count, count_distinct, date_part, - date_trunc, digest, exp, floor, in_list, initcap, left, length, ln, log10, log2, - lower, lpad, ltrim, max, md5, min, now, now_expr, nullif, octet_length, or, random, - regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, - sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, - substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis, + date_trunc, digest, exists, exp, floor, in_list, initcap, left, length, ln, log10, + log2, lower, lpad, ltrim, max, md5, min, now, now_expr, nullif, octet_length, or, + random, regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, + rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, + strpos, substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, upper, when, }; pub use expr_schema::ExprSchemable; From 88d956b87b3f15aaca6b32c2686690d708dab5ac Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 25 Apr 2022 17:05:02 -0600 Subject: [PATCH 6/7] clippy --- datafusion/expr/src/logical_plan/plan.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index dc949e14155e0..bc97528d7cfbb 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1181,13 +1181,9 @@ impl Hash for Subquery { } impl PartialEq for Subquery { - fn eq(&self, _other: &Self) -> bool { + fn eq(&self, other: &Self) -> bool { false } - - fn ne(&self, _other: &Self) -> bool { - true - } } /// Logical partitioning schemes supported by the repartition operator. From 2ed348ad4d89df7623889114a0f22030e8d3a34e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 25 Apr 2022 17:05:10 -0600 Subject: [PATCH 7/7] clippy --- datafusion/expr/src/logical_plan/plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index bc97528d7cfbb..579898dbe207a 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1181,7 +1181,7 @@ impl Hash for Subquery { } impl PartialEq for Subquery { - fn eq(&self, other: &Self) -> bool { + fn eq(&self, _other: &Self) -> bool { false } }