From ace2944e342408044825c7bbd4e55648d5158f69 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 13 Dec 2017 17:09:51 +0000 Subject: [PATCH 1/5] initial commit --- .../spark/sql/execution/debug/package.scala | 34 ++++++++++++++----- .../apache/spark/sql/TPCDSQuerySuite.scala | 16 ++++++--- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index a717cbd4a7df9..26f5aa945864c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.util.{AccumulatorV2, LongAccumulator} @@ -68,13 +68,8 @@ package object debug { output } - /** - * Get WholeStageCodegenExec subtrees and the codegen in a query plan - * - * @param plan the query plan for codegen - * @return Sequence of WholeStageCodegen subtrees and corresponding codegen - */ - def codegenStringSeq(plan: SparkPlan): Seq[(String, String)] = { + private def codegenSubtreeSourceSeq(plan: SparkPlan): + Seq[(WholeStageCodegenExec, CodeAndComment)] = { val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]() plan transform { case s: WholeStageCodegenExec => @@ -84,10 +79,31 @@ package object debug { } codegenSubtrees.toSeq.map { subtree => val (_, source) = subtree.doCodeGen() - (subtree.toString, CodeFormatter.format(source)) + (subtree, source) } } + /** + * Get WholeStageCodegenExec subtrees and the codegen in a query plan + * + * @param plan the query plan for codegen + * @return Sequence of WholeStageCodegen subtrees and corresponding codegen + */ + def codegenStringSeq(plan: SparkPlan): Seq[(String, String)] = { + codegenSubtreeSourceSeq(plan).map(s => (s._1.toString, CodeFormatter.format(s._2))) + } + + + /** + * Get WholeStageCodegenExec subtrees' CodeAndComment in a query plan + * + * @param plan the query plan for CodeAndComment + * @return Sequence of WholeStageCodegen subtrees' `CodeAndComment` + */ + def codegenCodeAndCommentSeq(plan: SparkPlan): Seq[CodeAndComment] = { + codegenSubtreeSourceSeq(plan).map(_._2) + } + /** * Augments [[Dataset]]s with debug methods. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala index a58000da1543d..e3bbbd4858ab6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala @@ -19,8 +19,10 @@ package org.apache.spark.sql import org.scalatest.BeforeAndAfterAll +import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.util.resourceToString +import org.apache.spark.sql.execution.debug import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -353,8 +355,11 @@ class TPCDSQuerySuite extends QueryTest with SharedSQLContext with BeforeAndAfte classLoader = Thread.currentThread().getContextClassLoader) test(name) { withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { - // Just check the plans can be properly generated - sql(queryString).queryExecution.executedPlan + // check the plans can be properly generated + val p = sql(queryString).queryExecution.executedPlan + // check the generated code can be properly compiled + val codes = debug.codegenCodeAndCommentSeq(p) + codes.map(c => CodeGenerator.compile(c)) } } } @@ -368,8 +373,11 @@ class TPCDSQuerySuite extends QueryTest with SharedSQLContext with BeforeAndAfte val queryString = resourceToString(s"tpcds-modifiedQueries/$name.sql", classLoader = Thread.currentThread().getContextClassLoader) test(s"modified-$name") { - // Just check the plans can be properly generated - sql(queryString).queryExecution.executedPlan + // check the plans can be properly generated + val p = sql(queryString).queryExecution.executedPlan + // check the generated code can be properly compiled + val codes = debug.codegenCodeAndCommentSeq(p) + codes.map(c => CodeGenerator.compile(c)) } } } From 1480ae31ea28ddf0d5ea3beffb99ae9995932be2 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 14 Dec 2017 05:06:28 +0000 Subject: [PATCH 2/5] address review comments --- .../spark/sql/execution/debug/package.scala | 34 ++++----------- .../apache/spark/sql/TPCDSQuerySuite.scala | 43 ++++++++++++++----- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 26f5aa945864c..a717cbd4a7df9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.util.{AccumulatorV2, LongAccumulator} @@ -68,8 +68,13 @@ package object debug { output } - private def codegenSubtreeSourceSeq(plan: SparkPlan): - Seq[(WholeStageCodegenExec, CodeAndComment)] = { + /** + * Get WholeStageCodegenExec subtrees and the codegen in a query plan + * + * @param plan the query plan for codegen + * @return Sequence of WholeStageCodegen subtrees and corresponding codegen + */ + def codegenStringSeq(plan: SparkPlan): Seq[(String, String)] = { val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]() plan transform { case s: WholeStageCodegenExec => @@ -79,31 +84,10 @@ package object debug { } codegenSubtrees.toSeq.map { subtree => val (_, source) = subtree.doCodeGen() - (subtree, source) + (subtree.toString, CodeFormatter.format(source)) } } - /** - * Get WholeStageCodegenExec subtrees and the codegen in a query plan - * - * @param plan the query plan for codegen - * @return Sequence of WholeStageCodegen subtrees and corresponding codegen - */ - def codegenStringSeq(plan: SparkPlan): Seq[(String, String)] = { - codegenSubtreeSourceSeq(plan).map(s => (s._1.toString, CodeFormatter.format(s._2))) - } - - - /** - * Get WholeStageCodegenExec subtrees' CodeAndComment in a query plan - * - * @param plan the query plan for CodeAndComment - * @return Sequence of WholeStageCodegen subtrees' `CodeAndComment` - */ - def codegenCodeAndCommentSeq(plan: SparkPlan): Seq[CodeAndComment] = { - codegenSubtreeSourceSeq(plan).map(_._2) - } - /** * Augments [[Dataset]]s with debug methods. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala index e3bbbd4858ab6..6f9d462a5aba7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala @@ -19,10 +19,11 @@ package org.apache.spark.sql import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodeGenerator} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.util.resourceToString -import org.apache.spark.sql.execution.debug +import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -31,7 +32,7 @@ import org.apache.spark.util.Utils * This test suite ensures all the TPC-DS queries can be successfully analyzed and optimized * without hitting the max iteration threshold. */ -class TPCDSQuerySuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll { +class TPCDSQuerySuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll with Logging { // When Utils.isTesting is true, the RuleExecutor will issue an exception when hitting // the max iteration of analyzer/optimizer batches. @@ -350,16 +351,38 @@ class TPCDSQuerySuite extends QueryTest with SharedSQLContext with BeforeAndAfte "q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90", "q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99") + private def checkGeneratedCode(plan: SparkPlan): Unit = { + val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]() + plan foreach { + case s: WholeStageCodegenExec => + codegenSubtrees += s + case s => s + } + codegenSubtrees.toSeq.map { subtree => + val code = subtree.doCodeGen()._2 + try { + // Just check the generated code can be properly compiled + CodeGenerator.compile(code) + } catch { + case e: Exception => + logError(s"failed to compile: $e", e) + val msg = + s"Subtree:\n$subtree\n" + + s"Generated code:\n${CodeFormatter.format(code)}\n" + logDebug(msg) + throw e + } + } + } + tpcdsQueries.foreach { name => val queryString = resourceToString(s"tpcds/$name.sql", classLoader = Thread.currentThread().getContextClassLoader) test(name) { withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { // check the plans can be properly generated - val p = sql(queryString).queryExecution.executedPlan - // check the generated code can be properly compiled - val codes = debug.codegenCodeAndCommentSeq(p) - codes.map(c => CodeGenerator.compile(c)) + val plan = sql(queryString).queryExecution.executedPlan + checkGeneratedCode(plan) } } } @@ -374,10 +397,8 @@ class TPCDSQuerySuite extends QueryTest with SharedSQLContext with BeforeAndAfte classLoader = Thread.currentThread().getContextClassLoader) test(s"modified-$name") { // check the plans can be properly generated - val p = sql(queryString).queryExecution.executedPlan - // check the generated code can be properly compiled - val codes = debug.codegenCodeAndCommentSeq(p) - codes.map(c => CodeGenerator.compile(c)) + val plan = sql(queryString).queryExecution.executedPlan + checkGeneratedCode(plan) } } } From 7d2c5995070dd0ef56d3d75bca30d3fbfc8443d3 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 14 Dec 2017 09:33:13 +0000 Subject: [PATCH 3/5] address review comments --- .../test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala index 6f9d462a5aba7..df2390c98be7a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala @@ -358,7 +358,7 @@ class TPCDSQuerySuite extends QueryTest with SharedSQLContext with BeforeAndAfte codegenSubtrees += s case s => s } - codegenSubtrees.toSeq.map { subtree => + codegenSubtrees.toSeq.foreach { subtree => val code = subtree.doCodeGen()._2 try { // Just check the generated code can be properly compiled @@ -369,8 +369,7 @@ class TPCDSQuerySuite extends QueryTest with SharedSQLContext with BeforeAndAfte val msg = s"Subtree:\n$subtree\n" + s"Generated code:\n${CodeFormatter.format(code)}\n" - logDebug(msg) - throw e + throw new Exception(msg, e) } } } From dacb79039c2abb09a082b817d7c52f2e0902de56 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 14 Dec 2017 13:36:59 +0000 Subject: [PATCH 4/5] address review comment --- .../scala/org/apache/spark/sql/TPCDSQuerySuite.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala index df2390c98be7a..bd0a20812907e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala @@ -365,10 +365,14 @@ class TPCDSQuerySuite extends QueryTest with SharedSQLContext with BeforeAndAfte CodeGenerator.compile(code) } catch { case e: Exception => - logError(s"failed to compile: $e", e) val msg = - s"Subtree:\n$subtree\n" + - s"Generated code:\n${CodeFormatter.format(code)}\n" + s""" + |failed to compile: + |Subtree: + |$subtree + |Generated code: + |${CodeFormatter.format(code)} + """ throw new Exception(msg, e) } } From 44abd2f541701a1ff3cc4e10e686f0de56400a98 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 14 Dec 2017 15:18:10 +0000 Subject: [PATCH 5/5] address review comments --- .../test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala index bd0a20812907e..dd427a5c52edf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql import org.scalatest.BeforeAndAfterAll -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodeGenerator} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.util.resourceToString @@ -32,7 +31,7 @@ import org.apache.spark.util.Utils * This test suite ensures all the TPC-DS queries can be successfully analyzed and optimized * without hitting the max iteration threshold. */ -class TPCDSQuerySuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll with Logging { +class TPCDSQuerySuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll { // When Utils.isTesting is true, the RuleExecutor will issue an exception when hitting // the max iteration of analyzer/optimizer batches. @@ -372,7 +371,7 @@ class TPCDSQuerySuite extends QueryTest with SharedSQLContext with BeforeAndAfte |$subtree |Generated code: |${CodeFormatter.format(code)} - """ + """.stripMargin throw new Exception(msg, e) } }