From d9e6ba97e09bbb2a2075775c10eccd1c38801425 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 5 Dec 2019 22:02:58 +0800 Subject: [PATCH 1/2] Fix QueryTest.checkAnswer usage --- .../org/apache/spark/sql/SQLQuerySuite.scala | 5 ++++- .../org/apache/spark/sql/SessionStateSuite.scala | 5 ++++- .../binaryfile/BinaryFileFormatSuite.scala | 15 ++++++++++++--- .../apache/spark/sql/streaming/StreamSuite.scala | 5 ++++- 4 files changed, 24 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 37d98f7c87420..e3c6a1208b93a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2030,7 +2030,10 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession { def verifyCallCount(df: DataFrame, expectedResult: Row, expectedCount: Int): Unit = { countAcc.setValue(0) QueryTest.checkAnswer( - df, Seq(expectedResult), checkToRDD = false /* avoid duplicate exec */) + df, Seq(expectedResult), checkToRDD = false /* avoid duplicate exec */) match { + case Some(errorMessage) => fail(errorMessage) + case None => + } assert(countAcc.value == expectedCount) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala index 31957a99e15af..5004689bac100 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala @@ -193,7 +193,10 @@ class SessionStateSuite extends SparkFunSuite { |FROM df x JOIN df y ON x.str = y.str |GROUP BY x.str """.stripMargin), - Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil) + Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil) match { + case Some(errorMessage) => fail(errorMessage) + case None => + } } val spark = activeSession diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala index 70ec9bbf4819d..2dac58a64c4b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala @@ -352,15 +352,24 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession { .select(CONTENT) } val expected = Seq(Row(content)) - QueryTest.checkAnswer(readContent(), expected) + QueryTest.checkAnswer(readContent(), expected) match { + case Some(errorMessage) => fail(errorMessage) + case None => + } withSQLConf(SOURCES_BINARY_FILE_MAX_LENGTH.key -> content.length.toString) { - QueryTest.checkAnswer(readContent(), expected) + QueryTest.checkAnswer(readContent(), expected) match { + case Some(errorMessage) => fail(errorMessage) + case None => + } } // Disable read. If the implementation attempts to read, the exception would be different. file.setReadable(false) val caught = intercept[SparkException] { withSQLConf(SOURCES_BINARY_FILE_MAX_LENGTH.key -> (content.length - 1).toString) { - QueryTest.checkAnswer(readContent(), expected) + QueryTest.checkAnswer(readContent(), expected) match { + case Some(errorMessage) => fail(errorMessage) + case None => + } } } assert(caught.getMessage.contains("exceeds the max length allowed")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 75ad041ccb801..7f2ed52c98029 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -757,7 +757,10 @@ class StreamSuite extends StreamTest { QueryTest.checkAnswer(spark.table("counts").toDF(), Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) :: - Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil) + Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil) match { + case Some(errorMessage) => fail(errorMessage) + case None => + } } finally { if (streamingQuery ne null) { streamingQuery.stop() From 61eacc940381ef717f102c809ef80afd9b0facf7 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 6 Dec 2019 10:10:15 +0800 Subject: [PATCH 2/2] Fix data type --- .../scala/org/apache/spark/sql/streaming/StreamSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 7f2ed52c98029..69862c651fc0a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -756,8 +756,8 @@ class StreamSuite extends StreamTest { streamingQuery.processAllAvailable() QueryTest.checkAnswer(spark.table("counts").toDF(), - Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) :: - Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil) match { + Row(1, 1L) :: Row(2, 1L) :: Row(3, 2L) :: Row(4, 2L) :: + Row(5, 2L) :: Row(6, 2L) :: Row(7, 1L) :: Row(8, 1L) :: Row(9, 1L) :: Nil) match { case Some(errorMessage) => fail(errorMessage) case None => }