From 2a422ea887ff0a913ecf0474ef62b57c509060a6 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 20 Sep 2019 18:28:47 +0800 Subject: [PATCH 1/5] Enable SPARK-28527 --- .../apache/spark/sql/test/SharedSparkSession.scala | 3 +++ .../SparkExecuteStatementOperation.scala | 6 +----- .../thriftserver/ThriftServerQueryTestSuite.scala | 13 ++++++++----- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala index ee29b4b8fb32b..b13bd9b2d62a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala @@ -72,6 +72,9 @@ trait SharedSparkSessionBase // this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) + // Hive Thrift server should not executes SQL queries in an asynchronous way + // because we may set session configuration. + .set("spark.sql.hive.thriftServer.async", "false") conf.set( StaticSQLConf.WAREHOUSE_PATH, conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index f246f43435c75..de0b569b5637c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -276,11 +276,7 @@ private[hive] class SparkExecuteStatementOperation( setState(OperationState.ERROR) HiveThriftServer2.listener.onStatementError( statementId, e.getMessage, SparkUtils.exceptionString(e)) - if (e.isInstanceOf[HiveSQLException]) { - throw e.asInstanceOf[HiveSQLException] - } else { - throw new HiveSQLException("Error running query: " + e.toString, e) - } + e } } finally { synchronized { diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 381b8f2324ca6..989bdb3c670d5 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -26,7 +26,6 @@ import scala.util.control.NonFatal import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.service.cli.HiveSQLException -import org.scalatest.Ignore import org.apache.spark.sql.{AnalysisException, SQLQueryTestSuite} import org.apache.spark.sql.catalyst.util.fileToString @@ -43,12 +42,12 @@ import org.apache.spark.sql.types._ * 2. Support DESC command. * 3. Support SHOW command. */ -@Ignore class ThriftServerQueryTestSuite extends SQLQueryTestSuite { private var hiveServer2: HiveThriftServer2 = _ - override def beforeEach(): Unit = { + override def beforeAll(): Unit = { + super.beforeAll() // Chooses a random port between 10000 and 19999 var listeningPort = 10000 + Random.nextInt(10000) @@ -65,8 +64,12 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite { logInfo("HiveThriftServer2 started successfully") } - override def afterEach(): Unit = { - hiveServer2.stop() + override def afterAll(): Unit = { + try { + hiveServer2.stop() + } finally { + super.afterAll() + } } override val isTestWithConfigSets = false From 0b75d5befa11b9d7c119bb0eabfec0dce9d90e7a Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sun, 22 Sep 2019 13:28:09 +0800 Subject: [PATCH 2/5] fix test error --- .../SparkExecuteStatementOperation.scala | 6 ++- .../ThriftServerQueryTestSuite.scala | 52 ++++++++++++++----- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index de0b569b5637c..f246f43435c75 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -276,7 +276,11 @@ private[hive] class SparkExecuteStatementOperation( setState(OperationState.ERROR) HiveThriftServer2.listener.onStatementError( statementId, e.getMessage, SparkUtils.exceptionString(e)) - e + if (e.isInstanceOf[HiveSQLException]) { + throw e.asInstanceOf[HiveSQLException] + } else { + throw new HiveSQLException("Error running query: " + e.toString, e) + } } } finally { synchronized { diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 989bdb3c670d5..4c6917de34ee1 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -18,16 +18,19 @@ package org.apache.spark.sql.hive.thriftserver import java.io.File -import java.sql.{DriverManager, SQLException, Statement, Timestamp} -import java.util.Locale +import java.sql.{DriverManager, Statement, Timestamp} +import java.util.{Locale, MissingFormatArgumentException} import scala.util.{Random, Try} import scala.util.control.NonFatal +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.service.cli.HiveSQLException -import org.apache.spark.sql.{AnalysisException, SQLQueryTestSuite} +import org.apache.spark.SparkException +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.util.fileToString import org.apache.spark.sql.execution.HiveResult import org.apache.spark.sql.internal.SQLConf @@ -169,19 +172,47 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite { || d.sql.toUpperCase(Locale.ROOT).startsWith("DESC\n") || d.sql.toUpperCase(Locale.ROOT).startsWith("DESCRIBE ") || d.sql.toUpperCase(Locale.ROOT).startsWith("DESCRIBE\n") => + // Skip show command, see HiveResult.hiveResultString case s if s.sql.toUpperCase(Locale.ROOT).startsWith("SHOW ") || s.sql.toUpperCase(Locale.ROOT).startsWith("SHOW\n") => - // AnalysisException should exactly match. - // SQLException should not exactly match. We only assert the result contains Exception. - case _ if output.output.startsWith(classOf[SQLException].getName) => + + case _ if output.output.startsWith(classOf[RuntimeException].getName) => assert(expected.output.contains("Exception"), s"Exception did not match for query #$i\n${expected.sql}, " + s"expected: ${expected.output}, but got: ${output.output}") + + case _ if output.output.startsWith(classOf[NoSuchTableException].getPackage.getName) => + assert(expected.output.startsWith(classOf[NoSuchTableException].getPackage.getName), + s"Exception did not match for query #$i\n${expected.sql}, " + + s"expected: ${expected.output}, but got: ${output.output}") + + case _ if output.output.startsWith(classOf[SparkException].getName) + && output.output.contains("overflow") => + assert(expected.output.contains(classOf[ArithmeticException].getName) + && expected.output.contains("overflow"), + s"Exception did not match for query #$i\n${expected.sql}, " + + s"expected: ${expected.output}, but got: ${output.output}") + + case _ if output.output.startsWith(classOf[ArithmeticException].getName) + && output.output.contains("causes overflow") => + assert(expected.output.contains(classOf[ArithmeticException].getName) + && expected.output.contains("causes overflow"), + s"Exception did not match for query #$i\n${expected.sql}, " + + s"expected: ${expected.output}, but got: ${output.output}") + + case _ if output.output.startsWith(classOf[MissingFormatArgumentException].getName) + && output.output.contains("Format specifier") => + assert(expected.output.contains(classOf[MissingFormatArgumentException].getName) + && expected.output.contains("Format specifier"), + s"Exception did not match for query #$i\n${expected.sql}, " + + s"expected: ${expected.output}, but got: ${output.output}") + // HiveSQLException is usually a feature that our ThriftServer cannot support. // Please add SQL to blackList. case _ if output.output.startsWith(classOf[HiveSQLException].getName) => assert(false, s"${output.output} for query #$i\n${expected.sql}") + case _ => assertResult(expected.output, s"Result did not match for query #$i\n${expected.sql}") { output.output @@ -244,15 +275,10 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite { answer } } catch { - case a: AnalysisException => - // Do not output the logical plan tree which contains expression IDs. - // Also implement a crude way of masking expression IDs in the error message - // with a generic pattern "###". - val msg = if (a.plan.nonEmpty) a.getSimpleMessage else a.getMessage - Seq(a.getClass.getName, msg.replaceAll("#\\d+", "#x")).sorted case NonFatal(e) => + val rootCause = ExceptionUtils.getRootCause(e) // If there is an exception, put the exception class followed by the message. - Seq(e.getClass.getName, e.getMessage) + Seq(rootCause.getClass.getName, rootCause.getMessage) } } From 1fede278faaaadc3135b9cf2bc6307b5db6b7922 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sun, 22 Sep 2019 14:59:06 +0800 Subject: [PATCH 3/5] Enable more tests --- .../hive/thriftserver/ThriftServerQueryTestSuite.scala | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 4c6917de34ee1..bdfb41c05e481 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -85,9 +85,6 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite { "pgSQL/case.sql", // SPARK-28624 "date.sql", - // SPARK-28619 - "pgSQL/aggregates_part1.sql", - "group-by.sql", // SPARK-28620 "pgSQL/float4.sql", // SPARK-28636 @@ -95,12 +92,10 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite { "literals.sql", "subquery/scalar-subquery/scalar-subquery-predicate.sql", "subquery/in-subquery/in-limit.sql", + "subquery/in-subquery/in-group-by.sql", "subquery/in-subquery/simple-in.sql", "subquery/in-subquery/in-order-by.sql", - "subquery/in-subquery/in-set-operations.sql", - // SPARK-28637 - "cast.sql", - "ansi/interval.sql" + "subquery/in-subquery/in-set-operations.sql" ) override def runQueries( From eeda69946426ea70d33e6c3a1f8e1d056b41c8b0 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sun, 22 Sep 2019 23:10:38 +0800 Subject: [PATCH 4/5] Move spark.sql.hive.thriftServer.async --- .../org/apache/spark/sql/test/SharedSparkSession.scala | 3 --- .../hive/thriftserver/ThriftServerQueryTestSuite.scala | 8 +++++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala index b13bd9b2d62a1..ee29b4b8fb32b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala @@ -72,9 +72,6 @@ trait SharedSparkSessionBase // this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) - // Hive Thrift server should not executes SQL queries in an asynchronous way - // because we may set session configuration. - .set("spark.sql.hive.thriftServer.async", "false") conf.set( StaticSQLConf.WAREHOUSE_PATH, conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index bdfb41c05e481..4f1cc58c91389 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -28,11 +28,12 @@ import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.service.cli.HiveSQLException -import org.apache.spark.SparkException +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.SQLQueryTestSuite import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.util.fileToString import org.apache.spark.sql.execution.HiveResult +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -75,6 +76,11 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite { } } + override def sparkConf: SparkConf = super.sparkConf + // Hive Thrift server should not executes SQL queries in an asynchronous way + // because we may set session configuration. + .set(HiveUtils.HIVE_THRIFT_SERVER_ASYNC, false) + override val isTestWithConfigSets = false /** List of test cases to ignore, in lower cases. */ From 2eac612a33320e64a464df7ba9da683f36aa6b05 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 23 Sep 2019 09:03:40 +0800 Subject: [PATCH 5/5] Remove HiveSQLException --- .../ThriftServerQueryTestSuite.scala | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 4f1cc58c91389..fbcf97c2b6686 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -26,10 +26,9 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hive.service.cli.HiveSQLException import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.{AnalysisException, SQLQueryTestSuite} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.util.fileToString import org.apache.spark.sql.execution.HiveResult @@ -178,41 +177,36 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite { case s if s.sql.toUpperCase(Locale.ROOT).startsWith("SHOW ") || s.sql.toUpperCase(Locale.ROOT).startsWith("SHOW\n") => - case _ if output.output.startsWith(classOf[RuntimeException].getName) => - assert(expected.output.contains("Exception"), - s"Exception did not match for query #$i\n${expected.sql}, " + - s"expected: ${expected.output}, but got: ${output.output}") - case _ if output.output.startsWith(classOf[NoSuchTableException].getPackage.getName) => assert(expected.output.startsWith(classOf[NoSuchTableException].getPackage.getName), s"Exception did not match for query #$i\n${expected.sql}, " + s"expected: ${expected.output}, but got: ${output.output}") - case _ if output.output.startsWith(classOf[SparkException].getName) - && output.output.contains("overflow") => - assert(expected.output.contains(classOf[ArithmeticException].getName) - && expected.output.contains("overflow"), + case _ if output.output.startsWith(classOf[SparkException].getName) && + output.output.contains("overflow") => + assert(expected.output.contains(classOf[ArithmeticException].getName) && + expected.output.contains("overflow"), s"Exception did not match for query #$i\n${expected.sql}, " + s"expected: ${expected.output}, but got: ${output.output}") - case _ if output.output.startsWith(classOf[ArithmeticException].getName) - && output.output.contains("causes overflow") => - assert(expected.output.contains(classOf[ArithmeticException].getName) - && expected.output.contains("causes overflow"), + case _ if output.output.startsWith(classOf[RuntimeException].getName) => + assert(expected.output.contains("Exception"), s"Exception did not match for query #$i\n${expected.sql}, " + s"expected: ${expected.output}, but got: ${output.output}") - case _ if output.output.startsWith(classOf[MissingFormatArgumentException].getName) - && output.output.contains("Format specifier") => - assert(expected.output.contains(classOf[MissingFormatArgumentException].getName) - && expected.output.contains("Format specifier"), + case _ if output.output.startsWith(classOf[ArithmeticException].getName) && + output.output.contains("causes overflow") => + assert(expected.output.contains(classOf[ArithmeticException].getName) && + expected.output.contains("causes overflow"), s"Exception did not match for query #$i\n${expected.sql}, " + s"expected: ${expected.output}, but got: ${output.output}") - // HiveSQLException is usually a feature that our ThriftServer cannot support. - // Please add SQL to blackList. - case _ if output.output.startsWith(classOf[HiveSQLException].getName) => - assert(false, s"${output.output} for query #$i\n${expected.sql}") + case _ if output.output.startsWith(classOf[MissingFormatArgumentException].getName) && + output.output.contains("Format specifier") => + assert(expected.output.contains(classOf[MissingFormatArgumentException].getName) && + expected.output.contains("Format specifier"), + s"Exception did not match for query #$i\n${expected.sql}, " + + s"expected: ${expected.output}, but got: ${output.output}") case _ => assertResult(expected.output, s"Result did not match for query #$i\n${expected.sql}") { @@ -276,6 +270,12 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite { answer } } catch { + case a: AnalysisException => + // Do not output the logical plan tree which contains expression IDs. + // Also implement a crude way of masking expression IDs in the error message + // with a generic pattern "###". + val msg = if (a.plan.nonEmpty) a.getSimpleMessage else a.getMessage + Seq(a.getClass.getName, msg.replaceAll("#\\d+", "#x")).sorted case NonFatal(e) => val rootCause = ExceptionUtils.getRootCause(e) // If there is an exception, put the exception class followed by the message.