From 773eacdb3f2275863e3e521a6aa9a94178c5ce7c Mon Sep 17 00:00:00 2001
From: LantaoJin
Date: Wed, 5 Aug 2020 17:31:14 +0800
Subject: [PATCH 1/5] [SPARK-32537][SQL][TEST] Add a CTEHintSuite for test coverage

---
Review note: the "Resolve join hint in CTE" test creates tables a, b and c,
so its withTable call lists exactly those names. It previously listed "t" and
"s", which the test never creates, leaving a, b and c behind for later tests.

 .../org/apache/spark/sql/CTEHintSuite.scala   | 158 ++++++++++++++++++
 1 file changed, 158 insertions(+)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
new file mode 100644
index 0000000000000..f3f68215b684c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.log4j.Level
+
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint, Repartition, RepartitionByExpression, ResolvedHint, SHUFFLE_HASH, SHUFFLE_MERGE, SHUFFLE_REPLICATE_NL}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.test.SharedSparkSession
+
+class CTEHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkPlanHelper {
+
+  def verifyCoalesceHint(df: DataFrame): Unit = {
+    val optimized = df.queryExecution.optimizedPlan
+    val repartitions = optimized collect {
+      case r: Repartition => r
+      case r: RepartitionByExpression => r
+      case _: ResolvedHint => fail("ResolvedHint should not appear after optimize.")
+    }
+    assert(repartitions.nonEmpty)
+  }
+
+  def verifyJoinHint(df: DataFrame, expectedHints: Seq[JoinHint]): Unit = {
+    val optimized = df.queryExecution.optimizedPlan
+    val joinHints = optimized collect {
+      case Join(_, _, _, _, hint) => hint
+      case _: ResolvedHint => fail("ResolvedHint should not appear after optimize.")
+    }
+    assert(joinHints == expectedHints)
+  }
+
+  def verifyJoinHintWithWarnings(
+      df: => DataFrame,
+      expectedHints: Seq[JoinHint],
+      warnings: Seq[String]): Unit = {
+    val logAppender = new LogAppender("join hints")
+    withLogAppender(logAppender) {
+      verifyJoinHint(df, expectedHints)
+    }
+    val warningMessages = logAppender.loggingEvents
+      .filter(_.getLevel == Level.WARN)
+      .map(_.getRenderedMessage)
+      .filter(_.contains("hint"))
+    assert(warningMessages.size == warnings.size)
+    warnings.foreach { w =>
+      assert(warningMessages.contains(w))
+    }
+  }
+
+  def msgNoJoinForJoinHint(strategy: String): String =
+    s"A join hint (strategy=$strategy) is specified but it is not part of a join relation."
+
+  def msgJoinHintOverridden(strategy: String): String =
+    s"Hint (strategy=$strategy) is overridden by another hint and will not take effect."
+
+  test("Resolve coalesce hint in CTE") {
+    // COALESCE,
+    // REPARTITION,
+    // REPARTITION_BY_RANGE
+    withTable("t") {
+      sql("CREATE TABLE t USING PARQUET AS SELECT 1 AS id")
+      verifyCoalesceHint(
+        sql("WITH cte AS (SELECT /*+ COALESCE(1) */ * FROM t) SELECT * FROM cte"))
+      verifyCoalesceHint(
+        sql("WITH cte AS (SELECT /*+ REPARTITION(3) */ * FROM t) SELECT * FROM cte"))
+      verifyCoalesceHint(
+        sql("WITH cte AS (SELECT /*+ REPARTITION(id) */ * FROM t) SELECT * FROM cte"))
+      verifyCoalesceHint(
+        sql("WITH cte AS (SELECT /*+ REPARTITION(3, id) */ * FROM t) SELECT * FROM cte"))
+      verifyCoalesceHint(
+        sql("WITH cte AS (SELECT /*+ REPARTITION_BY_RANGE(id) */ * FROM t) SELECT * FROM cte"))
+      verifyCoalesceHint(
+        sql("WITH cte AS (SELECT /*+ REPARTITION_BY_RANGE(3, id) */ * FROM t) SELECT * FROM cte"))
+    }
+  }
+
+  test("Resolve join hint in CTE") {
+    // BROADCAST,
+    // SHUFFLE_MERGE,
+    // SHUFFLE_HASH,
+    // SHUFFLE_REPLICATE_NL
+    withTable("a", "b", "c") {
+      sql("CREATE TABLE a USING PARQUET AS SELECT 1 AS a1")
+      sql("CREATE TABLE b USING PARQUET AS SELECT 1 AS b1")
+      sql("CREATE TABLE c USING PARQUET AS SELECT 1 AS c1")
+      verifyJoinHint(
+        sql(
+          """
+            |WITH cte AS (
+            |  SELECT /*+ BROADCAST(a) */ * FROM a JOIN b ON a.a1 = b.b1
+            |)
+            |SELECT * FROM cte
+            |""".stripMargin),
+        JoinHint(
+          Some(HintInfo(strategy = Some(BROADCAST))),
+          None) :: Nil
+      )
+      verifyJoinHint(
+        sql(
+          """
+            |WITH cte AS (
+            |  SELECT /*+ SHUFFLE_HASH(a) */ * FROM a JOIN b ON a.a1 = b.b1
+            |)
+            |SELECT * FROM cte
+            |""".stripMargin),
+        JoinHint(
+          Some(HintInfo(strategy = Some(SHUFFLE_HASH))),
+          None) :: Nil
+      )
+      verifyJoinHintWithWarnings(
+        sql(
+          """
+            |WITH cte AS (
+            |  SELECT /*+ SHUFFLE_HASH MERGE(a, c) BROADCAST(a, b)*/ * FROM a, b, c
+            |  WHERE a.a1 = b.b1 AND b.b1 = c.c1
+            |)
+            |SELECT * FROM cte
+            |""".stripMargin),
+        JoinHint(
+          None,
+          Some(HintInfo(strategy = Some(SHUFFLE_MERGE)))) ::
+          JoinHint(
+            Some(HintInfo(strategy = Some(SHUFFLE_MERGE))),
+            Some(HintInfo(strategy = Some(BROADCAST)))) :: Nil,
+        msgNoJoinForJoinHint("shuffle_hash") ::
+          msgJoinHintOverridden("broadcast") :: Nil
+      )
+      verifyJoinHint(
+        sql(
+          """
+            |WITH cte AS (
+            |  SELECT /*+ SHUFFLE_REPLICATE_NL(a) SHUFFLE_HASH(b) */ * FROM a JOIN b ON a.a1 = b.b1
+            |)
+            |SELECT * FROM cte
+            |""".stripMargin),
+        JoinHint(
+          Some(HintInfo(strategy = Some(SHUFFLE_REPLICATE_NL))),
+          Some(HintInfo(strategy = Some(SHUFFLE_HASH)))) :: Nil
+      )
+    }
+  }
+}

From 6f8a2cc74b91bbf8a15ff60a6e36e5281caaa4ca Mon Sep 17 00:00:00 2001
From: LantaoJin
Date: Thu, 6 Aug 2020 16:39:39 +0800
Subject: [PATCH 2/5] address comment

---
 .../org/apache/spark/sql/CTEHintSuite.scala   | 27 ++++++++++++-------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
index f3f68215b684c..adfba3f311c3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
@@ -20,23 +20,32 @@ package org.apache.spark.sql
 import org.apache.log4j.Level
 
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint, Repartition, RepartitionByExpression, ResolvedHint, SHUFFLE_HASH, SHUFFLE_MERGE, SHUFFLE_REPLICATE_NL}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.test.SharedSparkSession
 
-class CTEHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkPlanHelper {
+class CTEHintSuite extends PlanTest with SharedSparkSession {
 
   def verifyCoalesceHint(df: DataFrame): Unit = {
-    val optimized = df.queryExecution.optimizedPlan
-    val repartitions = optimized collect {
-      case r: Repartition => r
-      case r: RepartitionByExpression => r
-      case _: ResolvedHint => fail("ResolvedHint should not appear after optimize.")
+    def checkPlanContainsRepartition(plan: LogicalPlan): Unit = {
+      val repartitions = plan collect {
+        case r: Repartition => r
+        case r: RepartitionByExpression => r
+        case _: ResolvedHint => fail("ResolvedHint should not appear after optimize.")
+      }
+      assert(repartitions.nonEmpty)
     }
-    assert(repartitions.nonEmpty)
+    val analyzed = df.queryExecution.analyzed
+    val optimized = df.queryExecution.optimizedPlan
+    checkPlanContainsRepartition(analyzed)
+    checkPlanContainsRepartition(optimized)
   }
 
   def verifyJoinHint(df: DataFrame, expectedHints: Seq[JoinHint]): Unit = {
+    val analyzed = df.queryExecution.analyzed
+    val resolvedHints = analyzed collect {
+      case r: ResolvedHint => r
+    }
+    assert(resolvedHints.nonEmpty)
     val optimized = df.queryExecution.optimizedPlan
     val joinHints = optimized collect {
       case Join(_, _, _, _, hint) => hint

From ce941270ccb3a2080c22d037f940bb8ed4191df2 Mon Sep 17 00:00:00 2001
From: LantaoJin
Date: Thu, 6 Aug 2020 16:44:43 +0800
Subject: [PATCH 3/5] move SPARK-32237: Hint in CTE to new suite

---
 .../scala/org/apache/spark/sql/CTEHintSuite.scala | 15 +++++++++++++--
 .../org/apache/spark/sql/SQLQuerySuite.scala      | 12 ------------
 2 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
index adfba3f311c3b..311988e36591b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
@@ -19,11 +19,10 @@ package org.apache.spark.sql
 
 import org.apache.log4j.Level
 
-import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.test.SharedSparkSession
 
-class CTEHintSuite extends PlanTest with SharedSparkSession {
+class CTEHintSuite extends QueryTest with SharedSparkSession {
 
   def verifyCoalesceHint(df: DataFrame): Unit = {
     def checkPlanContainsRepartition(plan: LogicalPlan): Unit = {
@@ -164,4 +163,16 @@ class CTEHintSuite extends PlanTest with SharedSparkSession {
       )
     }
   }
+
+  test("SPARK-32237: Hint in CTE") {
+    withTable("t") {
+      sql("CREATE TABLE t USING PARQUET AS SELECT 1 AS id")
+      checkAnswer(
+        sql(s"""
+          |WITH cte AS (SELECT /*+ REPARTITION(3) */ * FROM t)
+          |SELECT * FROM cte
+        """.stripMargin),
+        Row(1) :: Nil)
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 64d2567f84f1b..72f2e62e7e439 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3561,18 +3561,6 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
   }
 
-  test("SPARK-32237: Hint in CTE") {
-    withTable("t") {
-      sql("CREATE TABLE t USING PARQUET AS SELECT 1 AS id")
-      checkAnswer(
-        sql(s"""
-          |WITH cte AS (SELECT /*+ REPARTITION(3) */ * FROM t)
-          |SELECT * FROM cte
-        """.stripMargin),
-        Row(1) :: Nil)
-    }
-  }
-
   test("SPARK-32372: ResolveReferences.dedupRight should only rewrite attributes for ancestor " +
     "plans of the conflict plan") {
     sql("SELECT name, avg(age) as avg_age FROM person GROUP BY name")

From 42563390f9b34c87c9aea1038e047b59c68dcaba Mon Sep 17 00:00:00 2001
From: LantaoJin
Date: Fri, 7 Aug 2020 13:23:28 +0800
Subject: [PATCH 4/5] nit

---
 .../org/apache/spark/sql/CTEHintSuite.scala   | 33 ++++++++++---------
 1 file changed, 18 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
index 311988e36591b..64d52607448fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
@@ -24,19 +24,21 @@ import org.apache.spark.sql.test.SharedSparkSession
 
 class CTEHintSuite extends QueryTest with SharedSparkSession {
 
-  def verifyCoalesceHint(df: DataFrame): Unit = {
-    def checkPlanContainsRepartition(plan: LogicalPlan): Unit = {
+  def verifyCoalesceOrRepartitionHint(df: DataFrame): Unit = {
+    def checkContainsRepartition(plan: LogicalPlan): Unit = {
       val repartitions = plan collect {
         case r: Repartition => r
         case r: RepartitionByExpression => r
-        case _: ResolvedHint => fail("ResolvedHint should not appear after optimize.")
       }
       assert(repartitions.nonEmpty)
     }
     val analyzed = df.queryExecution.analyzed
     val optimized = df.queryExecution.optimizedPlan
-    checkPlanContainsRepartition(analyzed)
-    checkPlanContainsRepartition(optimized)
+    checkContainsRepartition(analyzed)
+    checkContainsRepartition(optimized)
+    optimized collect {
+      case _: ResolvedHint => fail("ResolvedHint should not appear after optimize.")
+    }
   }
 
   def verifyJoinHint(df: DataFrame, expectedHints: Seq[JoinHint]): Unit = {
@@ -83,17 +85,17 @@ class CTEHintSuite extends QueryTest with SharedSparkSession {
     // REPARTITION_BY_RANGE
     withTable("t") {
       sql("CREATE TABLE t USING PARQUET AS SELECT 1 AS id")
-      verifyCoalesceHint(
+      verifyCoalesceOrRepartitionHint(
         sql("WITH cte AS (SELECT /*+ COALESCE(1) */ * FROM t) SELECT * FROM cte"))
-      verifyCoalesceHint(
+      verifyCoalesceOrRepartitionHint(
         sql("WITH cte AS (SELECT /*+ REPARTITION(3) */ * FROM t) SELECT * FROM cte"))
-      verifyCoalesceHint(
+      verifyCoalesceOrRepartitionHint(
         sql("WITH cte AS (SELECT /*+ REPARTITION(id) */ * FROM t) SELECT * FROM cte"))
-      verifyCoalesceHint(
+      verifyCoalesceOrRepartitionHint(
         sql("WITH cte AS (SELECT /*+ REPARTITION(3, id) */ * FROM t) SELECT * FROM cte"))
-      verifyCoalesceHint(
+      verifyCoalesceOrRepartitionHint(
         sql("WITH cte AS (SELECT /*+ REPARTITION_BY_RANGE(id) */ * FROM t) SELECT * FROM cte"))
-      verifyCoalesceHint(
+      verifyCoalesceOrRepartitionHint(
         sql("WITH cte AS (SELECT /*+ REPARTITION_BY_RANGE(3, id) */ * FROM t) SELECT * FROM cte"))
     }
   }
@@ -168,10 +170,11 @@ class CTEHintSuite extends QueryTest with SharedSparkSession {
     withTable("t") {
       sql("CREATE TABLE t USING PARQUET AS SELECT 1 AS id")
       checkAnswer(
-        sql(s"""
-          |WITH cte AS (SELECT /*+ REPARTITION(3) */ * FROM t)
-          |SELECT * FROM cte
-        """.stripMargin),
+        sql(
+          """
+            |WITH cte AS (SELECT /*+ REPARTITION(3) */ * FROM t)
+            |SELECT * FROM cte
+            |""".stripMargin),
         Row(1) :: Nil)
     }
   }

From a7c4d32894d75696ee9a891219d9c99314ef05ed Mon Sep 17 00:00:00 2001
From: LantaoJin
Date: Mon, 10 Aug 2020 09:53:35 +0800
Subject: [PATCH 5/5] remove duplicated test

---
 .../scala/org/apache/spark/sql/CTEHintSuite.scala | 13 -------------
 1 file changed, 13 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
index 64d52607448fd..13039bbbf6bd2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEHintSuite.scala
@@ -165,17 +165,4 @@ class CTEHintSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
-
-  test("SPARK-32237: Hint in CTE") {
-    withTable("t") {
-      sql("CREATE TABLE t USING PARQUET AS SELECT 1 AS id")
-      checkAnswer(
-        sql(
-          """
-            |WITH cte AS (SELECT /*+ REPARTITION(3) */ * FROM t)
-            |SELECT * FROM cte
-            |""".stripMargin),
-        Row(1) :: Nil)
-    }
-  }
 }