From be7d8e7fb8439e3bb3238269263a37556e6bf9b1 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Sun, 2 Sep 2018 19:56:18 +0200 Subject: [PATCH 1/8] [SPARK-25150][SQL] Fix attribute deduplication in join --- .../sql/catalyst/analysis/Analyzer.scala | 29 ++++++++++++++----- .../catalyst/expressions/AttributeMap.scala | 2 ++ .../apache/spark/sql/DataFrameJoinSuite.scala | 13 +++++++++ 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 580133dd971b1..0276afa98e44c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -758,7 +758,9 @@ class Analyzer( * Generate a new logical plan for the right child with different expression IDs * for all conflicting attributes. */ - private def dedupRight (left: LogicalPlan, right: LogicalPlan): LogicalPlan = { + private def dedupRight( + left: LogicalPlan, + right: LogicalPlan): (LogicalPlan, AttributeMap[Attribute]) = { val conflictingAttributes = left.outputSet.intersect(right.outputSet) logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} " + s"between $left and $right") @@ -805,10 +807,10 @@ class Analyzer( * that this rule cannot handle. When that is the case, there must be another rule * that resolves these conflicts. Otherwise, the analysis will fail. */ - right + (right, AttributeMap.empty[Attribute]) case Some((oldRelation, newRelation)) => val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) - right transformUp { + (right transformUp { case r if r == oldRelation => newRelation } transformUp { case other => other transformExpressions { @@ -817,7 +819,7 @@ class Analyzer( case s: SubqueryExpression => s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites)) } - } + }, attributeRewrites) } } @@ -895,6 +897,13 @@ class Analyzer( case _ => e.mapChildren(resolve(_, q)) } + private def rewriteJoinCondition( + e: Expression, + attributeRewrites: AttributeMap[Attribute]): Expression = e match { + case a: Attribute => attributeRewrites.getOrElse(a, a) + case _ => e.mapChildren(rewriteJoinCondition(_, attributeRewrites)) + } + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case p: LogicalPlan if !p.childrenResolved => p @@ -921,12 +930,16 @@ class Analyzer( failAnalysis("Invalid usage of '*' in explode/json_tuple/UDTF") // To resolve duplicate expression IDs for Join and Intersect - case j @ Join(left, right, _, _) if !j.duplicateResolved => - j.copy(right = dedupRight(left, right)) + case j @ Join(left, right, _, condition) if !j.duplicateResolved => + val (dedupedRight, attributeRewrites) = dedupRight(left, right) + val changedCondition = condition.map(rewriteJoinCondition(_, attributeRewrites)) + j.copy(right = dedupedRight, condition = changedCondition) case i @ Intersect(left, right, _) if !i.duplicateResolved => - i.copy(right = dedupRight(left, right)) + val (dedupedRight, _) = dedupRight(left, right) + i.copy(right = dedupedRight) case e @ Except(left, right, _) if !e.duplicateResolved => - e.copy(right = dedupRight(left, right)) + val (dedupedRight, _) = dedupRight(left, right) + e.copy(right = dedupedRight) // When resolve `SortOrder`s in Sort based on child, don't report errors as // we still have chance to resolve it based on its descendants case s @ Sort(ordering, global, child) if child.resolved && !s.resolved => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala index 9f4a0f2b7017a..63266c17496f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala @@ -26,6 +26,8 @@ object AttributeMap { def apply[A](kvs: Seq[(Attribute, A)]): AttributeMap[A] = { new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap) } + + def empty[A](): AttributeMap[A] = new AttributeMap(Map.empty) } class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index e6b30f9956daf..78bd33c4d44b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -295,4 +295,17 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan } } + + test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") { + withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") { + val a = spark.range(1, 5) + val b = spark.range(10) + val c = b.filter($"id" % 2 === 0) + + val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner") + + checkAnswer(r, Row(2, 2, 2) :: Row(4, 4, 4) :: Nil) + } + } + } From 8e58345671aadc4ae1e2598423511db6bbaabc91 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Mon, 3 Sep 2018 13:45:36 +0200 Subject: [PATCH 2/8] [SPARK-25150][SQL] Fix review findings Change-Id: I52ac5aa76e821f5d74cfa108e0b8269665eb081d --- .../org/apache/spark/sql/DataFrameJoinSuite.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 78bd33c4d44b7..970e00c50a3fe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -297,15 +297,12 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { } test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") { - withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") { - val a = spark.range(1, 5) - val b = spark.range(10) - val c = b.filter($"id" % 2 === 0) + val a = spark.range(1, 5) + val b = spark.range(10) + val c = b.filter($"id" % 2 === 0) - val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner") + val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner") - checkAnswer(r, Row(2, 2, 2) :: Row(4, 4, 4) :: Nil) - } + checkAnswer(r, Row(2, 2, 2) :: Row(4, 4, 4) :: Nil) } - } From d6e316a92cc4283f52f9cf141fe57bcece2cdf6b Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Mon, 3 Sep 2018 18:27:14 +0200 Subject: [PATCH 3/8] [SPARK-25150][SQL] Fix review findings 2 --- .../sql/catalyst/analysis/Analyzer.scala | 21 +++++++++---------- .../catalyst/expressions/AttributeMap.scala | 2 -- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0276afa98e44c..c049d93755ffb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -754,6 +754,9 @@ class Analyzer( * a logical plan node's children. */ object ResolveReferences extends Rule[LogicalPlan] { + + private val emptyAttrMap = new AttributeMap[Attribute](Map.empty) + /** * Generate a new logical plan for the right child with different expression IDs * for all conflicting attributes. @@ -807,10 +810,10 @@ class Analyzer( * that this rule cannot handle. When that is the case, there must be another rule * that resolves these conflicts. Otherwise, the analysis will fail. */ - (right, AttributeMap.empty[Attribute]) + (right, emptyAttrMap) case Some((oldRelation, newRelation)) => val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) - (right transformUp { + val newRight = right transformUp { case r if r == oldRelation => newRelation } transformUp { case other => other transformExpressions { @@ -819,7 +822,8 @@ class Analyzer( case s: SubqueryExpression => s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites)) } - }, attributeRewrites) + } + (newRight, attributeRewrites) } } @@ -897,13 +901,6 @@ class Analyzer( case _ => e.mapChildren(resolve(_, q)) } - private def rewriteJoinCondition( - e: Expression, - attributeRewrites: AttributeMap[Attribute]): Expression = e match { - case a: Attribute => attributeRewrites.getOrElse(a, a) - case _ => e.mapChildren(rewriteJoinCondition(_, attributeRewrites)) - } - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case p: LogicalPlan if !p.childrenResolved => p @@ -932,7 +929,9 @@ class Analyzer( // To resolve duplicate expression IDs for Join and Intersect case j @ Join(left, right, _, condition) if !j.duplicateResolved => val (dedupedRight, attributeRewrites) = dedupRight(left, right) - val changedCondition = condition.map(rewriteJoinCondition(_, attributeRewrites)) + val changedCondition = condition.map(_.transform { + case attr: Attribute if attr.resolved => attributeRewrites.getOrElse(attr, attr) + }) j.copy(right = dedupedRight, condition = changedCondition) case i @ Intersect(left, right, _) if !i.duplicateResolved => val (dedupedRight, _) = dedupRight(left, right) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala index 63266c17496f4..9f4a0f2b7017a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala @@ -26,8 +26,6 @@ object AttributeMap { def apply[A](kvs: Seq[(Attribute, A)]): AttributeMap[A] = { new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap) } - - def empty[A](): AttributeMap[A] = new AttributeMap(Map.empty) } class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)]) From d56f33cea56f5ad000e6638c7645a1d13e85eb55 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 5 Sep 2018 14:47:37 +0200 Subject: [PATCH 4/8] [SPARK-25150][SQL] Fix review findings 3 --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 +--- .../apache/spark/sql/catalyst/expressions/AttributeMap.scala | 4 +++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c049d93755ffb..51e099f3604e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -755,8 +755,6 @@ class Analyzer( */ object ResolveReferences extends Rule[LogicalPlan] { - private val emptyAttrMap = new AttributeMap[Attribute](Map.empty) - /** * Generate a new logical plan for the right child with different expression IDs * for all conflicting attributes. @@ -810,7 +808,7 @@ class Analyzer( * that this rule cannot handle. When that is the case, there must be another rule * that resolves these conflicts. Otherwise, the analysis will fail. */ - (right, emptyAttrMap) + (right, AttributeMap.empty) case Some((oldRelation, newRelation)) => val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) val newRight = right transformUp { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala index 9f4a0f2b7017a..f770b1d5670e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala @@ -23,12 +23,14 @@ package org.apache.spark.sql.catalyst.expressions * of the name, or the expected nullability). */ object AttributeMap { + var empty = new AttributeMap(Map.empty) + def apply[A](kvs: Seq[(Attribute, A)]): AttributeMap[A] = { new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap) } } -class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)]) +class AttributeMap[+A](val baseMap: Map[ExprId, (Attribute, A)]) extends Map[Attribute, A] with Serializable { override def get(k: Attribute): Option[A] = baseMap.get(k.exprId).map(_._2) From 809b8a83b7ec3d62ba6d65f6aff6a7d3175bd3e3 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 5 Sep 2018 15:41:26 +0200 Subject: [PATCH 5/8] [SPARK-25150][SQL] Fix review findings 4 Change-Id: I928e9a56410c00c4fbbd11c8e91b7993a4bc1878 --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 1 - .../apache/spark/sql/catalyst/expressions/AttributeMap.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 51e099f3604e0..2f3e486d46245 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -754,7 +754,6 @@ class Analyzer( * a logical plan node's children. */ object ResolveReferences extends Rule[LogicalPlan] { - /** * Generate a new logical plan for the right child with different expression IDs * for all conflicting attributes. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala index f770b1d5670e8..e9ed5f0dbe064 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala @@ -23,7 +23,7 @@ package org.apache.spark.sql.catalyst.expressions * of the name, or the expected nullability). */ object AttributeMap { - var empty = new AttributeMap(Map.empty) + val empty = new AttributeMap(Map.empty) def apply[A](kvs: Seq[(Attribute, A)]): AttributeMap[A] = { new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap) From 8da937ba788dcef393eac8a2c038a4d76dd406f8 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 6 Sep 2018 12:03:10 +0200 Subject: [PATCH 6/8] Revert "[SPARK-25150][SQL] Fix review findings 4" This reverts commit 809b8a83b7ec3d62ba6d65f6aff6a7d3175bd3e3. --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 1 + .../apache/spark/sql/catalyst/expressions/AttributeMap.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2f3e486d46245..51e099f3604e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -754,6 +754,7 @@ class Analyzer( * a logical plan node's children. */ object ResolveReferences extends Rule[LogicalPlan] { + /** * Generate a new logical plan for the right child with different expression IDs * for all conflicting attributes. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala index e9ed5f0dbe064..f770b1d5670e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala @@ -23,7 +23,7 @@ package org.apache.spark.sql.catalyst.expressions * of the name, or the expected nullability). */ object AttributeMap { - val empty = new AttributeMap(Map.empty) + var empty = new AttributeMap(Map.empty) def apply[A](kvs: Seq[(Attribute, A)]): AttributeMap[A] = { new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap) From 3f15aab0dcb5024f120004fe0adac4d12875d356 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 6 Sep 2018 12:03:16 +0200 Subject: [PATCH 7/8] Revert "[SPARK-25150][SQL] Fix review findings 3" This reverts commit d56f33cea56f5ad000e6638c7645a1d13e85eb55. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 +++- .../apache/spark/sql/catalyst/expressions/AttributeMap.scala | 4 +--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 51e099f3604e0..c049d93755ffb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -755,6 +755,8 @@ class Analyzer( */ object ResolveReferences extends Rule[LogicalPlan] { + private val emptyAttrMap = new AttributeMap[Attribute](Map.empty) + /** * Generate a new logical plan for the right child with different expression IDs * for all conflicting attributes. @@ -808,7 +810,7 @@ class Analyzer( * that this rule cannot handle. When that is the case, there must be another rule * that resolves these conflicts. Otherwise, the analysis will fail. */ - (right, AttributeMap.empty) + (right, emptyAttrMap) case Some((oldRelation, newRelation)) => val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) val newRight = right transformUp { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala index f770b1d5670e8..9f4a0f2b7017a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala @@ -23,14 +23,12 @@ package org.apache.spark.sql.catalyst.expressions * of the name, or the expected nullability). */ object AttributeMap { - var empty = new AttributeMap(Map.empty) - def apply[A](kvs: Seq[(Attribute, A)]): AttributeMap[A] = { new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap) } } -class AttributeMap[+A](val baseMap: Map[ExprId, (Attribute, A)]) +class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)]) extends Map[Attribute, A] with Serializable { override def get(k: Attribute): Option[A] = baseMap.get(k.exprId).map(_._2) From 938bd7ffab2796a298548f8e9a1a8d2b3f1aa901 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 6 Sep 2018 12:05:24 +0200 Subject: [PATCH 8/8] [SPARK-25150][SQL] Fix review findings 5 --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c049d93755ffb..2c12524ab6cf1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -754,7 +754,6 @@ class Analyzer( * a logical plan node's children. */ object ResolveReferences extends Rule[LogicalPlan] { - private val emptyAttrMap = new AttributeMap[Attribute](Map.empty) /** @@ -930,7 +929,7 @@ class Analyzer( case j @ Join(left, right, _, condition) if !j.duplicateResolved => val (dedupedRight, attributeRewrites) = dedupRight(left, right) val changedCondition = condition.map(_.transform { - case attr: Attribute if attr.resolved => attributeRewrites.getOrElse(attr, attr) + case attr: Attribute if attr.resolved => dedupAttr(attr, attributeRewrites) }) j.copy(right = dedupedRight, condition = changedCondition) case i @ Intersect(left, right, _) if !i.duplicateResolved =>