From 64b81fc5c5becf15485238859f51c59efc724df6 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 28 Jul 2016 21:41:20 -0700
Subject: [PATCH 1/6] [SPARK-16771][SQL] WITH clause should not fall into infinite loop.

---
 .../sql/catalyst/analysis/Analyzer.scala      | 24 +++++++-------
 .../sql/catalyst/parser/AstBuilder.scala      |  2 +-
 .../plans/logical/basicLogicalOperators.scala |  6 ++--
 .../sql/catalyst/parser/PlanParserSuite.scala |  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 31 +++++++++++++++++++
 5 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 25202b521ac5d..5f144d6f8d322 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -125,21 +125,23 @@ class Analyzer(
   object CTESubstitution extends Rule[LogicalPlan] {
     // TODO allow subquery to define CTE
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-      case With(child, relations) => substituteCTE(child, relations)
+      case With(child, relations) if relations.nonEmpty =>
+        var resolvedRelations = Seq(relations.head._1 -> ResolveRelations(relations.head._2))
+        relations.tail.foreach { case (name, relation) =>
+          resolvedRelations = resolvedRelations ++
+            Seq(name -> ResolveRelations(substituteCTE(relation, resolvedRelations)))
+        }
+        substituteCTE(child, resolvedRelations)
       case other => other
     }

-    def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
-      plan transform {
-        // In hive, if there is same table name in database and CTE definition,
-        // hive will use the table in database, not the CTE one.
-        // Taking into account the reasonableness and the implementation complexity,
-        // here use the CTE definition first, check table name only and ignore database name
-        // see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
+    def substituteCTE(plan: LogicalPlan, cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = {
+      plan transformDown {
         case u : UnresolvedRelation =>
-          val substituted = cteRelations.get(u.tableIdentifier.table).map { relation =>
-            val withAlias = u.alias.map(SubqueryAlias(_, relation))
-            withAlias.getOrElse(relation)
+          val substituted = cteRelations.find(_._1 == u.tableIdentifier.table).map(_._2).map {
+            relation =>
+              val withAlias = u.alias.map(SubqueryAlias(_, relation))
+              withAlias.getOrElse(relation)
           }
           substituted.getOrElse(u)
         case other =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index c7fdc287d1995..25c8445b4d33f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -97,7 +97,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       }
       // Check for duplicate names.
       checkDuplicateKeys(ctes, ctx)
-      With(query, ctes.toMap)
+      With(query, ctes)
     }
   }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index eb612c4c12c75..f7e683ba7dd69 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -392,11 +392,9 @@ case class InsertIntoTable(
  * This operator will be removed during analysis and the relations will be substituted into child.
  *
  * @param child The final query of this CTE.
- * @param cteRelations Queries that this CTE defined,
- *                     key is the alias of the CTE definition,
- *                     value is the CTE definition.
+ * @param cteRelations A sequence of pair (alias, the CTE definition) that this CTE defined
  */
-case class With(child: LogicalPlan, cteRelations: Map[String, SubqueryAlias]) extends UnaryNode {
+case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 }

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 00a37cf6360ae..34d52c75e0af2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -81,7 +81,7 @@ class PlanParserSuite extends PlanTest {
       val ctes = namedPlans.map {
         case (name, cte) =>
           name -> SubqueryAlias(name, cte)
-      }.toMap
+      }
       With(plan, ctes)
     }
     assertEqual(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index c3f27f80f8ad6..2c2206803a69c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2888,4 +2888,35 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       data.selectExpr("`part.col1`", "`col.1`"))
     }
   }
+
+  test("SPARK-16771: WITH clause should not fall into infinite loop") {
+    val m = intercept[AnalysisException] {
+      sql("WITH t AS (SELECT 1 FROM t) SELECT * FROM t")
+    }.getMessage
+    assert(m.contains("Table or view not found: t"))
+
+    spark.range(3).createOrReplaceTempView("t")
+    checkAnswer(sql("WITH t AS (SELECT 1 FROM t) SELECT * FROM t"),
+      Row(1) :: Row(1) :: Row(1) :: Nil)
+  }
+
+  test("SPARK-16771: WITH clauses should not fall into infinite loop") {
+    val m = intercept[AnalysisException] {
+      sql("WITH t1 AS (SELECT 1 FROM t2), t2 AS (SELECT 1 FROM t1) SELECT * FROM t1, t2")
+    }.getMessage
+    assert(m.contains("Table or view not found: t2"))
+
+    spark.range(2).createOrReplaceTempView("t2")
+    checkAnswer(sql("WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2"),
+      Row(0, 2) :: Row(1, 2) :: Row(0, 2) :: Row(1, 2) :: Nil)
+  }
+
+  test("current_date and current_timestamp literals") {
+    // NOTE that I am comparing the result of the literal with the result of the function call.
+    // This is done to prevent the test from failing because we are comparing a result to an out
+    // dated timestamp (quite likely) or date (very unlikely - but equally annoying).
+    checkAnswer(
+      sql("select current_date = current_date(), current_timestamp = current_timestamp()"),
+      Seq(Row(true, true)))
+  }
 }

From 788e136ebf2240046c8ccb8eea61d9b9585c90b5 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Wed, 10 Aug 2016 03:27:14 -0700
Subject: [PATCH 2/6] Address comments.

---
 .../spark/sql/catalyst/analysis/Analyzer.scala | 14 +++++---------
 .../plans/logical/basicLogicalOperators.scala  |  1 +
 2 files changed, 6 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5f144d6f8d322..c459413036043 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -126,23 +126,19 @@ class Analyzer(
     // TODO allow subquery to define CTE
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case With(child, relations) if relations.nonEmpty =>
-        var resolvedRelations = Seq(relations.head._1 -> ResolveRelations(relations.head._2))
-        relations.tail.foreach { case (name, relation) =>
-          resolvedRelations = resolvedRelations ++
-            Seq(name -> ResolveRelations(substituteCTE(relation, resolvedRelations)))
-        }
-        substituteCTE(child, resolvedRelations)
+        substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)])((resolved, r) =>
+          resolved ++ Seq(r._1 -> ResolveRelations(substituteCTE(r._2, resolved)))))
       case other => other
     }

     def substituteCTE(plan: LogicalPlan, cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = {
       plan transformDown {
         case u : UnresolvedRelation =>
-          val substituted = cteRelations.find(_._1 == u.tableIdentifier.table).map(_._2).map {
-            relation =>
+          val substituted = cteRelations.find(x => resolver(x._1, u.tableIdentifier.table))
+            .map(_._2).map { relation =>
               val withAlias = u.alias.map(SubqueryAlias(_, relation))
               withAlias.getOrElse(relation)
-          }
+            }
           substituted.getOrElse(u)
         case other =>
           // This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index f7e683ba7dd69..2917d8d2a97aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -393,6 +393,7 @@ case class InsertIntoTable(
  *
  * @param child The final query of this CTE.
  * @param cteRelations A sequence of pair (alias, the CTE definition) that this CTE defined
+ *                     Each CTE can see the base tables and the previously defined CTEs only.
  */
 case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
   override def output: Seq[Attribute] = child.output

From fd27b04fc45924fcd41d8af6bfeda4c2cb3bd9d3 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Wed, 10 Aug 2016 04:29:43 -0700
Subject: [PATCH 3/6] WITH clause should be removed.

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c459413036043..f738ebcd10bec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -125,9 +125,11 @@ class Analyzer(
   object CTESubstitution extends Rule[LogicalPlan] {
     // TODO allow subquery to define CTE
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-      case With(child, relations) if relations.nonEmpty =>
-        substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)])((resolved, r) =>
-          resolved ++ Seq(r._1 -> ResolveRelations(substituteCTE(r._2, resolved)))))
+      case With(child, relations) =>
+        substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
+          case (resolved, (name, relation)) =>
+            resolved :+ name -> ResolveRelations(substituteCTE(relation, resolved))
+        })
       case other => other
     }


From ba45b1bd6ac80de8e30defd565af40ce617548e5 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Wed, 10 Aug 2016 21:13:52 -0700
Subject: [PATCH 4/6] Remove testcase removed from master.

---
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 2c2206803a69c..f8058963e53be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2910,13 +2910,4 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     checkAnswer(sql("WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2"),
       Row(0, 2) :: Row(1, 2) :: Row(0, 2) :: Row(1, 2) :: Nil)
   }
-
-  test("current_date and current_timestamp literals") {
-    // NOTE that I am comparing the result of the literal with the result of the function call.
-    // This is done to prevent the test from failing because we are comparing a result to an out
-    // dated timestamp (quite likely) or date (very unlikely - but equally annoying).
-    checkAnswer(
-      sql("select current_date = current_date(), current_timestamp = current_timestamp()"),
-      Seq(Row(true, true)))
-  }
 }

From 7f74ec7745534b40da073c399a6439c9b03a5086 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 11 Aug 2016 01:02:38 -0700
Subject: [PATCH 5/6] Move testcase into `sql-tests`

---
 .../test/resources/sql-tests/inputs/with.sql  | 14 +++++
 .../resources/sql-tests/results/with.sql.out  | 57 +++++++++++++++++++
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 22 -------
 3 files changed, 71 insertions(+), 22 deletions(-)
 create mode 100644 sql/core/src/test/resources/sql-tests/inputs/with.sql
 create mode 100644 sql/core/src/test/resources/sql-tests/results/with.sql.out

diff --git a/sql/core/src/test/resources/sql-tests/inputs/with.sql b/sql/core/src/test/resources/sql-tests/inputs/with.sql
new file mode 100644
index 0000000000000..10d34deff4ee3
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/with.sql
@@ -0,0 +1,14 @@
+create temporary view t as select * from values 0, 1, 2 as t(id);
+create temporary view t2 as select * from values 0, 1 as t(id);
+
+-- WITH clause should not fall into infinite loop by referencing self
+WITH s AS (SELECT 1 FROM s) SELECT * FROM s;
+
+-- WITH clause should reference the base table
+WITH t AS (SELECT 1 FROM t) SELECT * FROM t;
+
+-- WITH clause should not allow cross reference
+WITH s1 AS (SELECT 1 FROM s2), s2 AS (SELECT 1 FROM s1) SELECT * FROM s1, s2;
+
+-- WITH clause should reference the previous CTE
+WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2;
diff --git a/sql/core/src/test/resources/sql-tests/results/with.sql.out b/sql/core/src/test/resources/sql-tests/results/with.sql.out
new file mode 100644
index 0000000000000..ddee5bf2d473b
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/with.sql.out
@@ -0,0 +1,57 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 6
+
+
+-- !query 0
+create temporary view t as select * from values 0, 1, 2 as t(id)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+create temporary view t2 as select * from values 0, 1 as t(id)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+WITH s AS (SELECT 1 FROM s) SELECT * FROM s
+-- !query 2 schema
+struct<>
+-- !query 2 output
+org.apache.spark.sql.AnalysisException
+Table or view not found: s; line 1 pos 25
+
+
+-- !query 3
+WITH t AS (SELECT 1 FROM t) SELECT * FROM t
+-- !query 3 schema
+struct<1:int>
+-- !query 3 output
+1
+1
+1
+
+
+-- !query 4
+WITH s1 AS (SELECT 1 FROM s2), s2 AS (SELECT 1 FROM s1) SELECT * FROM s1, s2
+-- !query 4 schema
+struct<>
+-- !query 4 output
+org.apache.spark.sql.AnalysisException
+Table or view not found: s2; line 1 pos 26
+
+
+-- !query 5
+WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2
+-- !query 5 schema
+struct
+-- !query 5 output
+0	2
+0	2
+1	2
+1	2
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index f8058963e53be..c3f27f80f8ad6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2888,26 +2888,4 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       data.selectExpr("`part.col1`", "`col.1`"))
     }
   }
-
-  test("SPARK-16771: WITH clause should not fall into infinite loop") {
-    val m = intercept[AnalysisException] {
-      sql("WITH t AS (SELECT 1 FROM t) SELECT * FROM t")
-    }.getMessage
-    assert(m.contains("Table or view not found: t"))
-
-    spark.range(3).createOrReplaceTempView("t")
-    checkAnswer(sql("WITH t AS (SELECT 1 FROM t) SELECT * FROM t"),
-      Row(1) :: Row(1) :: Row(1) :: Nil)
-  }
-
-  test("SPARK-16771: WITH clauses should not fall into infinite loop") {
-    val m = intercept[AnalysisException] {
-      sql("WITH t1 AS (SELECT 1 FROM t2), t2 AS (SELECT 1 FROM t1) SELECT * FROM t1, t2")
-    }.getMessage
-    assert(m.contains("Table or view not found: t2"))
-
-    spark.range(2).createOrReplaceTempView("t2")
-    checkAnswer(sql("WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2"),
-      Row(0, 2) :: Row(1, 2) :: Row(0, 2) :: Row(1, 2) :: Nil)
-  }
 }

From f1f4a833276c5b78309546beca9dd7a5d86590f8 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 11 Aug 2016 14:19:25 -0700
Subject: [PATCH 6/6] Rename files.

---
 .../src/test/resources/sql-tests/inputs/{with.sql => cte.sql}     | 0
 .../resources/sql-tests/results/{with.sql.out => cte.sql.out}     | 0
 2 files changed, 0 insertions(+), 0 deletions(-)
 rename sql/core/src/test/resources/sql-tests/inputs/{with.sql => cte.sql} (100%)
 rename sql/core/src/test/resources/sql-tests/results/{with.sql.out => cte.sql.out} (100%)

diff --git a/sql/core/src/test/resources/sql-tests/inputs/with.sql b/sql/core/src/test/resources/sql-tests/inputs/cte.sql
similarity index 100%
rename from sql/core/src/test/resources/sql-tests/inputs/with.sql
rename to sql/core/src/test/resources/sql-tests/inputs/cte.sql
diff --git a/sql/core/src/test/resources/sql-tests/results/with.sql.out b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
similarity index 100%
rename from sql/core/src/test/resources/sql-tests/results/with.sql.out
rename to sql/core/src/test/resources/sql-tests/results/cte.sql.out
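
The series converges on one idea: keep the CTE definitions as an ordered sequence and resolve them left to right, so that each definition sees only the previously defined CTEs plus the base tables. The Scala sketch below is only an illustration of that ordering argument; Plan, Ref, Table and Select are made-up stand-ins, not Spark's logical plan or analyzer classes.

// Standalone sketch (not part of the patches above): a toy model of the
// sequential CTE substitution used by the final CTESubstitution rule.
// Plan, Ref, Table and Select are invented stand-ins, not Spark APIs.
object CteSubstitutionSketch extends App {

  sealed trait Plan
  case class Ref(name: String) extends Plan      // an unresolved relation reference
  case class Table(name: String) extends Plan    // a reference resolved to a base table
  case class Select(source: Plan) extends Plan   // a trivial single-source query

  // Replace references using only the CTE definitions already in scope;
  // anything else falls back to a base table.
  def substituteCTE(plan: Plan, inScope: Seq[(String, Plan)]): Plan = plan match {
    case Ref(name)      => inScope.find(_._1 == name).map(_._2).getOrElse(Table(name))
    case Select(source) => Select(substituteCTE(source, inScope))
    case other          => other
  }

  // Resolve the definitions left to right: each CTE sees only the previously
  // defined CTEs, so a self-reference or forward reference resolves to a base
  // table (or fails later in analysis) instead of looping forever.
  def resolveWith(child: Plan, ctes: Seq[(String, Plan)]): Plan = {
    val resolved = ctes.foldLeft(Seq.empty[(String, Plan)]) {
      case (inScope, (name, definition)) =>
        inScope :+ (name -> substituteCTE(definition, inScope))
    }
    substituteCTE(child, resolved)
  }

  // WITH t AS (SELECT 1 FROM t) SELECT * FROM t
  // The inner `t` resolves to the base table, the outer `t` to the CTE.
  val plan = resolveWith(
    child = Select(Ref("t")),
    ctes  = Seq("t" -> Select(Ref("t"))))

  println(plan)  // Select(Select(Table(t)))
}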
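
As far as the diffs themselves show, the switch of cteRelations from Map[String, SubqueryAlias] to Seq[(String, SubqueryAlias)] is what makes the left-to-right fold meaningful, since a Map does not guarantee definition order; the Seq also lets substituteCTE pick the first matching name with find, and patch 2 routes that name comparison through the analyzer's resolver so CTE lookup follows the session's case-sensitivity setting.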