From 6fd6260c43e9c1de43e99708b892e441191390df Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sun, 28 Jul 2019 01:59:39 +0900 Subject: [PATCH 1/6] Implement doExecuteBroadcast and doExecuteColumnar to enable debugging broadcast or columnar related queries --- .../spark/sql/execution/debug/package.scala | 10 ++++++++ .../sql/execution/debug/DebuggingSuite.scala | 23 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 53b74c7c85594..2891dd4694a94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -21,6 +21,7 @@ import java.util.Collections import scala.collection.JavaConverters._ +import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamingQuery +import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.{AccumulatorV2, LongAccumulator} /** @@ -255,5 +257,13 @@ package object debug { override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { consume(ctx, input) } + + override def doExecuteBroadcast[T](): Broadcast[T] = { + child.executeBroadcast() + } + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + child.executeColumnar() + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index 8251ff159e05f..b448b22e89762 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.debug +import scala.util.{Failure, Success, Try} + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext @@ -48,4 +50,25 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext { assert(res.forall{ case (subtree, code) => subtree.contains("Range") && code.contains("Object[]")}) } + + test("debug() or broadcast and columnar") { + val rightDF = spark.range(10) + val leftDF = spark.range(10) + val joinedDF = leftDF.join(rightDF, leftDF("id") === rightDF("id")) + Try { + joinedDF.debug() + } match { + case Success(_) => + case Failure(e) => fail(e) + } + + val df = spark.range(5) + df.persist() + Try { + df.debug() + } match { + case Success(_) => + case Failure(e) => fail(e) + } + } } From 8ee28f5719a690485e2a2be8a509da4d4ec2601b Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sun, 28 Jul 2019 17:29:19 +0900 Subject: [PATCH 2/6] Change the test cases added to compare expected result. --- .../sql/execution/debug/DebuggingSuite.scala | 48 ++++++++++++++----- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index b448b22e89762..e74dc045b88d2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.debug -import scala.util.{Failure, Success, Try} +import java.io.ByteArrayOutputStream import org.apache.spark.SparkFunSuite import org.apache.spark.sql.functions._ @@ -51,24 +51,48 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext { subtree.contains("Range") && code.contains("Object[]")}) } - test("debug() or broadcast and columnar") { + test("SPARK-28537: DebugExec cannot debug broadcast or columnar related queries") { val rightDF = spark.range(10) val leftDF = spark.range(10) val joinedDF = leftDF.join(rightDF, leftDF("id") === rightDF("id")) - Try { - joinedDF.debug() - } match { - case Success(_) => - case Failure(e) => fail(e) + try { + val captured = new ByteArrayOutputStream() + Console.withOut(captured) { + joinedDF.debug() + } + + val output = captured.toString() + assert(output.contains( + """== BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) == + |Tuples output: 0 + | id LongType: {} + |== WholeStageCodegen == + |Tuples output: 10 + | id LongType: {java.lang.Long} + |== Range (0, 10, step=1, splits=2) == + |Tuples output: 0 + | id LongType: {}""".stripMargin)) + } catch { + case e: Throwable => fail("debug() for broadcast failed with exception", e) } val df = spark.range(5) df.persist() - Try { - df.debug() - } match { - case Success(_) => - case Failure(e) => fail(e) + try { + val captured = new ByteArrayOutputStream() + Console.withOut(captured) { + df.debug() + } + + val exprId = df.queryExecution.executedPlan.output.head.toString + val output = captured.toString() + assert(output.contains( + s"""== InMemoryTableScan [$exprId] == + |Tuples output: 0 + | id LongType: {} + |""".stripMargin)) + } catch { + case e: Throwable => fail("debug() for columnar failed with exception", e) } } } From 2685272b688f893865489616372ca4d97890b978 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 29 Jul 2019 18:01:00 +0900 Subject: [PATCH 3/6] Reflect comments for the test cases --- .../spark/sql/execution/debug/DebuggingSuite.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index e74dc045b88d2..358e0ef60f101 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.debug import java.io.ByteArrayOutputStream +import scala.util.control.NonFatal + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext @@ -51,7 +53,7 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext { subtree.contains("Range") && code.contains("Object[]")}) } - test("SPARK-28537: DebugExec cannot debug broadcast or columnar related queries") { + test("SPARK-28537: DebugExec cannot debug broadcast and columnar related queries") { val rightDF = spark.range(10) val leftDF = spark.range(10) val joinedDF = leftDF.join(rightDF, leftDF("id") === rightDF("id")) @@ -73,7 +75,7 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext { |Tuples output: 0 | id LongType: {}""".stripMargin)) } catch { - case e: Throwable => fail("debug() for broadcast failed with exception", e) + case NonFatal(e) => fail("debug() for broadcast failed with exception", e) } val df = spark.range(5) @@ -84,15 +86,14 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext { df.debug() } - val exprId = df.queryExecution.executedPlan.output.head.toString - val output = captured.toString() + val output = captured.toString()replaceAll ("#\\d+", "#x") assert(output.contains( - s"""== InMemoryTableScan [$exprId] == + s"""== InMemoryTableScan [id#xL] == |Tuples output: 0 | id LongType: {} |""".stripMargin)) } catch { - case e: Throwable => fail("debug() for columnar failed with exception", e) + case NonFatal(e) => fail("debug() for columnar failed with exception", e) } } } From ee1c26f83e6d362f5a1476f9565ae7654fd08004 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 5 Aug 2019 00:15:05 +0900 Subject: [PATCH 4/6] Split added test cases --- .../sql/execution/debug/DebuggingSuite.scala | 60 +++++++++---------- 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index 358e0ef60f101..2e5d12087e495 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -53,47 +53,43 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext { subtree.contains("Range") && code.contains("Object[]")}) } - test("SPARK-28537: DebugExec cannot debug broadcast and columnar related queries") { + test("SPARK-28537: DebugExec cannot debug broadcast related queries") { val rightDF = spark.range(10) val leftDF = spark.range(10) val joinedDF = leftDF.join(rightDF, leftDF("id") === rightDF("id")) - try { - val captured = new ByteArrayOutputStream() - Console.withOut(captured) { - joinedDF.debug() - } - val output = captured.toString() - assert(output.contains( - """== BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) == - |Tuples output: 0 - | id LongType: {} - |== WholeStageCodegen == - |Tuples output: 10 - | id LongType: {java.lang.Long} - |== Range (0, 10, step=1, splits=2) == - |Tuples output: 0 - | id LongType: {}""".stripMargin)) - } catch { - case NonFatal(e) => fail("debug() for broadcast failed with exception", e) + val captured = new ByteArrayOutputStream() + Console.withOut(captured) { + joinedDF.debug() } + val output = captured.toString() + assert(output.contains( + """== BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) == + |Tuples output: 0 + | id LongType: {} + |== WholeStageCodegen == + |Tuples output: 10 + | id LongType: {java.lang.Long} + |== Range (0, 10, step=1, splits=2) == + |Tuples output: 0 + | id LongType: {}""".stripMargin)) + } + + test("SPARK-28537: DebugExec cannot debug columnar related queries") { val df = spark.range(5) df.persist() - try { - val captured = new ByteArrayOutputStream() - Console.withOut(captured) { - df.debug() - } - val output = captured.toString()replaceAll ("#\\d+", "#x") - assert(output.contains( - s"""== InMemoryTableScan [id#xL] == - |Tuples output: 0 - | id LongType: {} - |""".stripMargin)) - } catch { - case NonFatal(e) => fail("debug() for columnar failed with exception", e) + val captured = new ByteArrayOutputStream() + Console.withOut(captured) { + df.debug() } + + val output = captured.toString()replaceAll ("#\\d+", "#x") + assert(output.contains( + s"""== InMemoryTableScan [id#xL] == + |Tuples output: 0 + | id LongType: {} + |""".stripMargin)) } } From e67d004b43e519ae6299d4cadd33abb61ce96449 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 5 Aug 2019 01:06:12 +0900 Subject: [PATCH 5/6] Fix styles --- .../sql/execution/debug/DebuggingSuite.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index 2e5d12087e495..3cc930012acc9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -66,14 +66,14 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext { val output = captured.toString() assert(output.contains( """== BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) == - |Tuples output: 0 - | id LongType: {} - |== WholeStageCodegen == - |Tuples output: 10 - | id LongType: {java.lang.Long} - |== Range (0, 10, step=1, splits=2) == - |Tuples output: 0 - | id LongType: {}""".stripMargin)) + |Tuples output: 0 + | id LongType: {} + |== WholeStageCodegen == + |Tuples output: 10 + | id LongType: {java.lang.Long} + |== Range (0, 10, step=1, splits=2) == + |Tuples output: 0 + | id LongType: {}""".stripMargin)) } test("SPARK-28537: DebugExec cannot debug columnar related queries") { @@ -84,10 +84,11 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext { Console.withOut(captured) { df.debug() } + df.unpersist() - val output = captured.toString()replaceAll ("#\\d+", "#x") + val output = captured.toString().replaceAll("#\\d+", "#x") assert(output.contains( - s"""== InMemoryTableScan [id#xL] == + """== InMemoryTableScan [id#xL] == |Tuples output: 0 | id LongType: {} |""".stripMargin)) From 46c6598cf791191fbf5d0fbaeafb31460d4b9319 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 5 Aug 2019 01:07:50 +0900 Subject: [PATCH 6/6] Removed NonFatal --- .../org/apache/spark/sql/execution/debug/DebuggingSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index 3cc930012acc9..e423420c2914a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.debug import java.io.ByteArrayOutputStream -import scala.util.control.NonFatal - import org.apache.spark.SparkFunSuite import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext