From 42e3ad2853297b55acc9b84709c59fd99fdd6af6 Mon Sep 17 00:00:00 2001 From: Luan Date: Fri, 3 Jul 2020 14:30:25 +0800 Subject: [PATCH 01/12] [SPARK-26533][SQL] Support query auto timeout cancel on thriftserver --- .../cli/operation/OperationManager.java | 15 ++++------ .../SparkExecuteStatementOperation.scala | 22 ++++++++++++-- .../server/SparkSQLOperationManager.scala | 5 ++-- .../HiveThriftServer2Suites.scala | 29 +++++++++++++++++++ .../SparkExecuteStatementOperationSuite.scala | 2 +- 5 files changed, 58 insertions(+), 15 deletions(-) diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java index 75edc5763ce44..6cb07287ae3db 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -86,20 +86,15 @@ private void initOperationLogCapture(String loggingMode) { } public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, - String statement, Map confOverlay, boolean runAsync) - throws HiveSQLException { - ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation - .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync, 0); + String statement, Map confOverlay, boolean runAsync, long queryTimeout) + throws HiveSQLException { + ExecuteStatementOperation executeStatementOperation = + ExecuteStatementOperation.newExecuteStatementOperation(parentSession, statement, + confOverlay, runAsync, queryTimeout); addOperation(executeStatementOperation); return executeStatementOperation; } - public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, - String statement, Map confOverlay, boolean runAsync, long queryTimeout) - throws HiveSQLException { - return newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync); - } - public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) { GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession); addOperation(operation); diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index ec2c795e95c83..63e8e2e797137 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import java.security.PrivilegedExceptionAction import java.util.{Arrays, Map => JMap} -import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -45,7 +45,8 @@ private[hive] class SparkExecuteStatementOperation( parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], - runInBackground: Boolean = true) + runInBackground: Boolean = true, + queryTimeout: Long) extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground) with SparkOperation with Logging { @@ -200,6 +201,12 @@ private[hive] class 
SparkExecuteStatementOperation( parentSession.getUsername) setHasResultSet(true) // avoid no resultset for async run + if (queryTimeout > 0) { + Executors.newSingleThreadScheduledExecutor.schedule(new Runnable { + override def run(): Unit = timeoutCancel() + }, queryTimeout, TimeUnit.SECONDS) + } + if (!runInBackground) { execute() } else { @@ -339,6 +346,17 @@ private[hive] class SparkExecuteStatementOperation( } } + private def timeoutCancel(): Unit = { + synchronized { + if (!getStatus.getState.isTerminal) { + setState(OperationState.TIMEDOUT) + cleanup() + logInfo(s"Timeout and Cancel query with $statementId ") + HiveThriftServer2.eventManager.onStatementCanceled(statementId) + } + } + } + override protected def cleanup(): Unit = { if (runInBackground) { val backgroundHandle = getBackgroundHandle() diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index bc9c13eb0d4f8..ba42eefed2a22 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -44,14 +44,15 @@ private[thriftserver] class SparkSQLOperationManager() parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], - async: Boolean): ExecuteStatementOperation = synchronized { + async: Boolean, + queryTimeout: Long): ExecuteStatementOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + s" initialized or had already closed.") val conf = sqlContext.sessionState.conf val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) val operation = new SparkExecuteStatementOperation( - sqlContext, parentSession, statement, confOverlay, runInBackground) + sqlContext, parentSession, statement, confOverlay, runInBackground, queryTimeout) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created Operation for $statement with session=$parentSession, " + s"runInBackground=$runInBackground") diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 75c00000dee47..39d5f7af41427 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -880,6 +880,35 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { assert(rs.getString(1) === expected.toString) } } + + test("SPARK-26533: Support query auto timeout cancel on thriftserver") { + withJdbcStatement() { statement => + if (HiveUtils.isHive23) { + statement.setQueryTimeout(1) + val e = intercept[SQLException] { + statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") + }.getMessage + assert(e.contains("Query timed out after")) + + statement.setQueryTimeout(0) + val rs1 = statement.executeQuery( + "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)") + rs1.next() + assert(rs1.getString(1) == "test") + + statement.setQueryTimeout(-1) + val rs2 = 
statement.executeQuery( + "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)") + rs2.next() + assert(rs2.getString(1) == "test") + } else { + val e = intercept[SQLException] { + statement.setQueryTimeout(1) + }.getMessage + assert(e.contains("Method not supported")) + } + } + } } class SingleSessionSuite extends HiveThriftJdbcTest { diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala index ca1f9a2f74244..7680544a30789 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala @@ -109,7 +109,7 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark signal: Semaphore, finalState: OperationState) extends SparkExecuteStatementOperation(sqlContext, hiveSession, statement, - new util.HashMap, false) { + new util.HashMap, false, 0) { override def cleanup(): Unit = { super.cleanup() From 498272a8700e25cdbce40257bae8fb436717fdd0 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 2 Oct 2020 22:07:40 +0900 Subject: [PATCH 02/12] Minor fixes --- .../SparkExecuteStatementOperation.scala | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 63e8e2e797137..a8f141c283bc6 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -203,7 +203,7 @@ private[hive] class SparkExecuteStatementOperation( if (queryTimeout > 0) { Executors.newSingleThreadScheduledExecutor.schedule(new Runnable { - override def run(): Unit = timeoutCancel() + override def run(): Unit = cancel(OperationState.TIMEDOUT) }, queryTimeout, TimeUnit.SECONDS) } @@ -335,26 +335,19 @@ private[hive] class SparkExecuteStatementOperation( } } - override def cancel(): Unit = { + private def cancel(stateAfterCancel: OperationState): Unit = { synchronized { if (!getStatus.getState.isTerminal) { logInfo(s"Cancel query with $statementId") - setState(OperationState.CANCELED) + setState(stateAfterCancel) cleanup() HiveThriftServer2.eventManager.onStatementCanceled(statementId) } } } - private def timeoutCancel(): Unit = { - synchronized { - if (!getStatus.getState.isTerminal) { - setState(OperationState.TIMEDOUT) - cleanup() - logInfo(s"Timeout and Cancel query with $statementId ") - HiveThriftServer2.eventManager.onStatementCanceled(statementId) - } - } + override def cancel(): Unit = { + cancel(OperationState.CANCELED) } override protected def cleanup(): Unit = { From e6787db6f26f76b1d28b983cf93595c450bf8970 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sun, 4 Oct 2020 09:25:13 +0900 Subject: [PATCH 03/12] Address reviews --- .../hive/thriftserver/SparkExecuteStatementOperation.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index a8f141c283bc6..c880db8c0c2e6 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -203,7 +203,10 @@ private[hive] class SparkExecuteStatementOperation( if (queryTimeout > 0) { Executors.newSingleThreadScheduledExecutor.schedule(new Runnable { - override def run(): Unit = cancel(OperationState.TIMEDOUT) + override def run(): Unit = { + logInfo(s"Query with $statementId timed out after $queryTimeout seconds") + cancel(OperationState.TIMEDOUT) + } }, queryTimeout, TimeUnit.SECONDS) } From ec3f0437cf8b5967e376cf8075885e2398e9eefa Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Tue, 6 Oct 2020 09:18:34 +0900 Subject: [PATCH 04/12] Remove tests for hive-1.2 --- .../HiveThriftServer2Suites.scala | 41 ++++++++----------- 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 39d5f7af41427..fef48c98f6299 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -883,30 +883,23 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { test("SPARK-26533: Support query auto timeout cancel on thriftserver") { withJdbcStatement() { statement => - if (HiveUtils.isHive23) { - statement.setQueryTimeout(1) - val e = intercept[SQLException] { - statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") - }.getMessage - assert(e.contains("Query timed out after")) - - statement.setQueryTimeout(0) - val rs1 = statement.executeQuery( - "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)") - rs1.next() - assert(rs1.getString(1) == "test") - - statement.setQueryTimeout(-1) - val rs2 = statement.executeQuery( - "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)") - rs2.next() - assert(rs2.getString(1) == "test") - } else { - val e = intercept[SQLException] { - statement.setQueryTimeout(1) - }.getMessage - assert(e.contains("Method not supported")) - } + statement.setQueryTimeout(1) + val e = intercept[SQLException] { + statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") + }.getMessage + assert(e.contains("Query timed out after")) + + statement.setQueryTimeout(0) + val rs1 = statement.executeQuery( + "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)") + rs1.next() + assert(rs1.getString(1) == "test") + + statement.setQueryTimeout(-1) + val rs2 = statement.executeQuery( + "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)") + rs2.next() + assert(rs2.getString(1) == "test") } } } From 066d622bb6787ab7227a3c1e0c282a8d65618d90 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 12 Oct 2020 13:37:28 +0900 Subject: [PATCH 05/12] Fix --- .../hive/thriftserver/HiveThriftServer2.scala | 2 +- .../SparkExecuteStatementOperation.scala | 22 +++++++++++-------- 
.../ui/HiveThriftServer2AppStatusStore.scala | 1 + .../ui/HiveThriftServer2EventManager.scala | 7 ++++++ .../ui/HiveThriftServer2Listener.scala | 10 +++++++++ .../ui/HiveThriftServer2ListenerSuite.scala | 1 + 6 files changed, 33 insertions(+), 10 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 4e6729faced43..a1f2d62a0b72c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -116,7 +116,7 @@ object HiveThriftServer2 extends Logging { } private[thriftserver] object ExecutionState extends Enumeration { - val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value + val STARTED, COMPILED, CANCELED, TIMEDOUT, FAILED, FINISHED, CLOSED = Value type ExecutionState = Value } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index c880db8c0c2e6..bbb9c3fe2edeb 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -203,10 +203,7 @@ private[hive] class SparkExecuteStatementOperation( if (queryTimeout > 0) { Executors.newSingleThreadScheduledExecutor.schedule(new Runnable { - override def run(): Unit = { - logInfo(s"Query with $statementId timed out after $queryTimeout seconds") - cancel(OperationState.TIMEDOUT) - } + override def run(): Unit = timeoutCancel() }, queryTimeout, TimeUnit.SECONDS) } @@ -338,19 +335,26 @@ private[hive] class SparkExecuteStatementOperation( } } - private def cancel(stateAfterCancel: OperationState): Unit = { + private def timeoutCancel(): Unit = { synchronized { if (!getStatus.getState.isTerminal) { - logInfo(s"Cancel query with $statementId") - setState(stateAfterCancel) + logInfo(s"Query with $statementId timed out after $queryTimeout seconds") + setState(OperationState.TIMEDOUT) cleanup() - HiveThriftServer2.eventManager.onStatementCanceled(statementId) + HiveThriftServer2.eventManager.onStatementTimeout(statementId) } } } override def cancel(): Unit = { - cancel(OperationState.CANCELED) + synchronized { + if (!getStatus.getState.isTerminal) { + logInfo(s"Cancel query with $statementId") + setState(OperationState.CANCELED) + cleanup() + HiveThriftServer2.eventManager.onStatementCanceled(statementId) + } + } } override protected def cleanup(): Unit = { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala index 5cb78f6e64650..8bd8f29a4b9ec 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala @@ -119,6 +119,7 @@ private[thriftserver] class ExecutionInfo( def isExecutionActive: Boolean = { !(state == ExecutionState.FAILED || state == 
ExecutionState.CANCELED || + state == ExecutionState.TIMEDOUT || state == ExecutionState.CLOSED) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala index fa04c67896a69..202fdf33c0dd9 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala @@ -57,6 +57,10 @@ private[thriftserver] class HiveThriftServer2EventManager(sc: SparkContext) { postLiveListenerBus(SparkListenerThriftServerOperationCanceled(id, System.currentTimeMillis())) } + def onStatementTimeout(id: String): Unit = { + postLiveListenerBus(SparkListenerThriftServerOperationTimeout(id, System.currentTimeMillis())) + } + def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = { postLiveListenerBus(SparkListenerThriftServerOperationError(id, errorMsg, errorTrace, System.currentTimeMillis())) @@ -96,6 +100,9 @@ private[thriftserver] case class SparkListenerThriftServerOperationParsed( private[thriftserver] case class SparkListenerThriftServerOperationCanceled( id: String, finishTime: Long) extends SparkListenerEvent +private[thriftserver] case class SparkListenerThriftServerOperationTimeout( + id: String, finishTime: Long) extends SparkListenerEvent + private[thriftserver] case class SparkListenerThriftServerOperationError( id: String, errorMsg: String, diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala index 6b7e5ee611417..4cf672e3d9d9e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala @@ -119,6 +119,7 @@ private[thriftserver] class HiveThriftServer2Listener( case e: SparkListenerThriftServerOperationStart => onOperationStart(e) case e: SparkListenerThriftServerOperationParsed => onOperationParsed(e) case e: SparkListenerThriftServerOperationCanceled => onOperationCanceled(e) + case e: SparkListenerThriftServerOperationTimeout => onOperationTimeout(e) case e: SparkListenerThriftServerOperationError => onOperationError(e) case e: SparkListenerThriftServerOperationFinish => onOperationFinished(e) case e: SparkListenerThriftServerOperationClosed => onOperationClosed(e) @@ -181,6 +182,15 @@ private[thriftserver] class HiveThriftServer2Listener( case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}") } + private def onOperationTimeout(e: SparkListenerThriftServerOperationTimeout): Unit = + Option(executionList.get(e.id)) match { + case Some(executionData) => + executionData.finishTimestamp = e.finishTime + executionData.state = ExecutionState.TIMEDOUT + updateLiveStore(executionData) + case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}") + } + private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = Option(executionList.get(e.id)) match { case Some(executionData) => diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala index 9a9f574153a0a..3f0538dd1c943 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala @@ -151,6 +151,7 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter { "stmt", "groupId", 0)) listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation, "query")) listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation, 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationTimeout(unknownOperation, 0)) listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation, "msg", "trace", 0)) listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation, 0)) From 3bfc8097a0bbeac4ef9f921ed2da8069304784c0 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 12 Oct 2020 21:04:58 +0900 Subject: [PATCH 06/12] Fix --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 10 ++++++++++ .../thriftserver/SparkExecuteStatementOperation.scala | 8 +++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d4c7dd7f3160c..d5cd268e1544a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -893,6 +893,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val THRIFTSERVER_QUERY_TIMEOUT = + buildConf("spark.sql.thriftServer.queryTimeout") + .doc("Specifies a global timeout value for Thrift Server. A query will be cancelled " + + "automatically after the specified time. If a timeout value is set for each statement " + + "(e.g., via `java.sql.Statement.setQueryTimeout`), the value takes precedence. " + + "If the value is zero or negative, no timeout happens.") + .version("3.1.0") + .timeConf(TimeUnit.SECONDS) + .createWithDefault(0L) + val THRIFTSERVER_UI_STATEMENT_LIMIT = buildConf("spark.sql.thriftserver.ui.retainedStatements") .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.") diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index bbb9c3fe2edeb..27a9fb1936b61 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -51,6 +51,12 @@ private[hive] class SparkExecuteStatementOperation( with SparkOperation with Logging { + private val queryTimeoutValue = if (queryTimeout <= 0) { + sqlContext.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key).toLong + } else { + queryTimeout + } + private var result: DataFrame = _ // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST. 
@@ -201,7 +207,7 @@ private[hive] class SparkExecuteStatementOperation( parentSession.getUsername) setHasResultSet(true) // avoid no resultset for async run - if (queryTimeout > 0) { + if (queryTimeoutValue > 0) { Executors.newSingleThreadScheduledExecutor.schedule(new Runnable { override def run(): Unit = timeoutCancel() }, queryTimeout, TimeUnit.SECONDS) From 3d35de1a98377d2dc0a2d53f1824dd6718be428b Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 12 Oct 2020 21:25:32 +0900 Subject: [PATCH 07/12] Fix --- .../hive/service/cli/operation/OperationManager.java | 1 + .../apache/hive/service/cli/operation/SQLOperation.java | 7 ++++--- .../hive/thriftserver/SparkExecuteStatementOperation.scala | 2 +- .../thriftserver/SparkExecuteStatementOperationSuite.scala | 1 + 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java index 6cb07287ae3db..04d632eefea41 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -202,6 +202,7 @@ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { Operation operation = getOperation(opHandle); OperationState opState = operation.getStatus().getState(); if (opState == OperationState.CANCELED || + opState == OperationState.TIMEDOUT || opState == OperationState.CLOSED || opState == OperationState.FINISHED || opState == OperationState.ERROR || diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java index e2ac1ea78c1ab..894793152f409 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -155,11 +155,12 @@ private void runQuery(HiveConf sqlOperationConf) throws HiveSQLException { throw toSQLException("Error while processing statement", response); } } catch (HiveSQLException e) { - // If the operation was cancelled by another thread, + // If the operation was cancelled by another thread or timed out, // Driver#run will return a non-zero response code. 
- // We will simply return if the operation state is CANCELED, + // We will simply return if the operation state is CANCELED or TIMEDOUT, // otherwise throw an exception - if (getStatus().getState() == OperationState.CANCELED) { + if (getStatus().getState() == OperationState.CANCELED || + getStatus().getState() == OperationState.TIMEDOUT) { return; } else { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 27a9fb1936b61..5d4efff5cb25b 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -341,7 +341,7 @@ private[hive] class SparkExecuteStatementOperation( } } - private def timeoutCancel(): Unit = { + def timeoutCancel(): Unit = { synchronized { if (!getStatus.getState.isTerminal) { logInfo(s"Query with $statementId timed out after $queryTimeout seconds") diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala index 7680544a30789..c8bb6d9ee0821 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala @@ -61,6 +61,7 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark Seq( (OperationState.CANCELED, (_: SparkExecuteStatementOperation).cancel()), + (OperationState.TIMEDOUT, (_: SparkExecuteStatementOperation).timeoutCancel()), (OperationState.CLOSED, (_: SparkExecuteStatementOperation).close()) ).foreach { case (finalState, transition) => test("SPARK-32057 SparkExecuteStatementOperation should not transiently become ERROR " + From a1e2f657c7d47568f7c05e73231e08f9d64d6d04 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 12 Oct 2020 21:39:20 +0900 Subject: [PATCH 08/12] Add tests --- .../SparkExecuteStatementOperation.scala | 6 ++--- .../HiveThriftServer2Suites.scala | 22 ++++++++++++++++--- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 5d4efff5cb25b..a3dc1f6810e42 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -52,7 +52,7 @@ private[hive] class SparkExecuteStatementOperation( with Logging { private val queryTimeoutValue = if (queryTimeout <= 0) { - sqlContext.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key).toLong + sqlContext.conf.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT) } else { queryTimeout } @@ -210,7 +210,7 @@ private[hive] class SparkExecuteStatementOperation( if (queryTimeoutValue > 0) { Executors.newSingleThreadScheduledExecutor.schedule(new Runnable { override def run(): Unit = timeoutCancel() - }, queryTimeout, TimeUnit.SECONDS) + 
}, queryTimeoutValue, TimeUnit.SECONDS) } if (!runInBackground) { @@ -344,7 +344,7 @@ private[hive] class SparkExecuteStatementOperation( def timeoutCancel(): Unit = { synchronized { if (!getStatus.getState.isTerminal) { - logInfo(s"Query with $statementId timed out after $queryTimeout seconds") + logInfo(s"Query with $statementId timed out after $queryTimeoutValue seconds") setState(OperationState.TIMEDOUT) cleanup() HiveThriftServer2.eventManager.onStatementTimeout(statementId) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index fef48c98f6299..04a9b83a48f04 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -46,6 +46,7 @@ import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.HiveTestJars +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.util.{ThreadUtils, Utils} @@ -285,7 +286,6 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("test multiple session") { - import org.apache.spark.sql.internal.SQLConf var defaultV1: String = null var defaultV2: String = null var data: ArrayBuffer[Int] = null @@ -881,11 +881,11 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } } - test("SPARK-26533: Support query auto timeout cancel on thriftserver") { + test("SPARK-26533: Support query auto timeout cancel on thriftserver - setQueryTimeout") { withJdbcStatement() { statement => statement.setQueryTimeout(1) val e = intercept[SQLException] { - statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") + statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)") }.getMessage assert(e.contains("Query timed out after")) @@ -902,6 +902,22 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { assert(rs2.getString(1) == "test") } } + + test("SPARK-26533: Support query auto timeout cancel on thriftserver - SQLConf") { + withJdbcStatement() { statement => + statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=1") + val e = intercept[SQLException] { + statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)") + }.getMessage + assert(e.contains("Query timed out after")) + + statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=0") + val rs = statement.executeQuery( + "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)") + rs.next() + assert(rs.getString(1) == "test") + } + } } class SingleSessionSuite extends HiveThriftJdbcTest { From 5df36e6a20fdc6edca999ddfd91b37c56ebe9222 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Tue, 13 Oct 2020 14:30:21 +0900 Subject: [PATCH 09/12] Address comments --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d5cd268e1544a..e8452a52f955e 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -895,7 +895,7 @@ object SQLConf { val THRIFTSERVER_QUERY_TIMEOUT = buildConf("spark.sql.thriftServer.queryTimeout") - .doc("Specifies a global timeout value for Thrift Server. A query will be cancelled " + + .doc("Specifies timeout in seconds for Thrift Server. A query will be cancelled " + "automatically after the specified time. If a timeout value is set for each statement " + "(e.g., via `java.sql.Statement.setQueryTimeout`), the value takes precedence. " + "If the value is zero or negative, no timeout happens.") From 3bdeb7aaefb87f731b3a7b4172a762b7639abfd1 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Tue, 13 Oct 2020 21:04:42 +0900 Subject: [PATCH 10/12] Update doc --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e8452a52f955e..c962642979ab7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -895,10 +895,10 @@ object SQLConf { val THRIFTSERVER_QUERY_TIMEOUT = buildConf("spark.sql.thriftServer.queryTimeout") - .doc("Specifies timeout in seconds for Thrift Server. A query will be cancelled " + - "automatically after the specified time. If a timeout value is set for each statement " + - "(e.g., via `java.sql.Statement.setQueryTimeout`), the value takes precedence. " + - "If the value is zero or negative, no timeout happens.") + .doc("Set a query duration timeout in seconds in Thrift Server. If the timeout is set to " + + "a positive value, a running query will be cancelled automatically when the timeout is " + + "exceeded, otherwise the query continues to run till completion. 
Timeout values that " + + "are set for each statement via`java.sql.Statement.setQueryTimeout` take precedence.") .version("3.1.0") .timeConf(TimeUnit.SECONDS) .createWithDefault(0L) From 2a32d0d6b31da27d685a522afc4736a5b9789c62 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Tue, 13 Oct 2020 21:39:56 +0900 Subject: [PATCH 11/12] Fix --- .../service/cli/operation/OperationManager.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java index 04d632eefea41..3df842d2b4af9 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -86,15 +86,21 @@ private void initOperationLogCapture(String loggingMode) { } public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, - String statement, Map confOverlay, boolean runAsync, long queryTimeout) - throws HiveSQLException { - ExecuteStatementOperation executeStatementOperation = - ExecuteStatementOperation.newExecuteStatementOperation(parentSession, statement, - confOverlay, runAsync, queryTimeout); + String statement, Map confOverlay, boolean runAsync) + throws HiveSQLException { + ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation + .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync, 0); addOperation(executeStatementOperation); return executeStatementOperation; } + public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, + String statement, Map confOverlay, boolean runAsync, long queryTimeout) + throws HiveSQLException { + return newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync, + queryTimeout); + } + public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) { GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession); addOperation(operation); From 5e3c44060567d2234dd79d222f77bc58983bb805 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Wed, 14 Oct 2020 13:16:22 +0900 Subject: [PATCH 12/12] Address comments --- .../apache/spark/sql/internal/SQLConf.scala | 5 +-- .../SparkExecuteStatementOperation.scala | 35 ++++++++++++++----- .../HiveThriftServer2Suites.scala | 19 ++++++++-- 3 files changed, 46 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c962642979ab7..f0cde5d9b4a8a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -897,8 +897,9 @@ object SQLConf { buildConf("spark.sql.thriftServer.queryTimeout") .doc("Set a query duration timeout in seconds in Thrift Server. If the timeout is set to " + "a positive value, a running query will be cancelled automatically when the timeout is " + - "exceeded, otherwise the query continues to run till completion. Timeout values that " + - "are set for each statement via`java.sql.Statement.setQueryTimeout` take precedence.") + "exceeded, otherwise the query continues to run till completion. 
If timeout values are " + + "set for each statement via `java.sql.Statement.setQueryTimeout` and they are smaller " + + "than this configuration value, they take precedence.") .version("3.1.0") .timeConf(TimeUnit.SECONDS) .createWithDefault(0L) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index a3dc1f6810e42..bc8cc16746a30 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -51,10 +51,16 @@ private[hive] class SparkExecuteStatementOperation( with SparkOperation with Logging { - private val queryTimeoutValue = if (queryTimeout <= 0) { - sqlContext.conf.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT) - } else { - queryTimeout + // If a timeout value `queryTimeout` is specified by users and it is smaller than + // a global timeout value, we use the user-specified value. + // This code follows the Hive timeout behaviour (See #29933 for details). + private val timeout = { + val globalTimeout = sqlContext.conf.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT) + if (globalTimeout > 0 && (queryTimeout <= 0 || globalTimeout < queryTimeout)) { + globalTimeout + } else { + queryTimeout + } } private var result: DataFrame = _ @@ -207,10 +213,21 @@ private[hive] class SparkExecuteStatementOperation( parentSession.getUsername) setHasResultSet(true) // avoid no resultset for async run - if (queryTimeoutValue > 0) { - Executors.newSingleThreadScheduledExecutor.schedule(new Runnable { - override def run(): Unit = timeoutCancel() - }, queryTimeoutValue, TimeUnit.SECONDS) + if (timeout > 0) { + val timeoutExecutor = Executors.newSingleThreadScheduledExecutor() + timeoutExecutor.schedule(new Runnable { + override def run(): Unit = { + try { + timeoutCancel() + } catch { + case NonFatal(e) => + setOperationException(new HiveSQLException(e)) + logError(s"Error cancelling the query after timeout: $timeout seconds") + } finally { + timeoutExecutor.shutdown() + } + } + }, timeout, TimeUnit.SECONDS) } if (!runInBackground) { @@ -344,7 +361,7 @@ private[hive] class SparkExecuteStatementOperation( def timeoutCancel(): Unit = { synchronized { if (!getStatus.getState.isTerminal) { - logInfo(s"Query with $statementId timed out after $queryTimeoutValue seconds") + logInfo(s"Query with $statementId timed out after $timeout seconds") setState(OperationState.TIMEDOUT) cleanup() HiveThriftServer2.eventManager.onStatementTimeout(statementId) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 04a9b83a48f04..7cc60bb505089 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -906,16 +906,31 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { test("SPARK-26533: Support query auto timeout cancel on thriftserver - SQLConf") { withJdbcStatement() { statement => statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=1") - val e = intercept[SQLException] { + val e1 = 
intercept[SQLException] { statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)") }.getMessage - assert(e.contains("Query timed out after")) + assert(e1.contains("Query timed out after")) statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=0") val rs = statement.executeQuery( "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)") rs.next() assert(rs.getString(1) == "test") + + // Uses a smaller timeout value of a config value and an a user-specified one + statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=1") + statement.setQueryTimeout(30) + val e2 = intercept[SQLException] { + statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)") + }.getMessage + assert(e2.contains("Query timed out after")) + + statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=30") + statement.setQueryTimeout(1) + val e3 = intercept[SQLException] { + statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)") + }.getMessage + assert(e3.contains("Query timed out after")) } } }
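
Usage sketch (not part of the patch series above): a minimal JDBC client written against the behaviour this series adds. The connection URL, port, object name, and the presence of the Hive JDBC driver on the classpath are assumptions for illustration only; the config key spark.sql.thriftServer.queryTimeout, the java_method(...) sleep query, and the "Query timed out after" message come from the patches and tests above. Per PATCH 12/12, when both the session config and java.sql.Statement.setQueryTimeout are positive, the smaller value applies; a non-positive value on either side falls back to the other.

  import java.sql.{DriverManager, SQLException}

  object ThriftServerQueryTimeoutExample {
    def main(args: Array[String]): Unit = {
      // Assumes a Spark Thrift Server (3.1.0+, where this config is introduced) listening on
      // localhost:10000 and the Hive JDBC driver available on the classpath.
      val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
      try {
        val stmt = conn.createStatement()

        // Session-wide timeout in seconds via the new config; 0 (the default) disables it.
        stmt.execute("SET spark.sql.thriftServer.queryTimeout=5")

        // Per-statement timeout via plain JDBC; with both set, the smaller value (1s here) wins.
        stmt.setQueryTimeout(1)

        try {
          // Sleeps ~10s on the server, so the operation is cancelled after the timeout
          // and transitions to the TIMEDOUT state added by this series.
          stmt.execute("select java_method('java.lang.Thread', 'sleep', 10000L)")
        } catch {
          case e: SQLException =>
            // The server-side message asserted in the tests above: "Query timed out after ..."
            println(s"Cancelled as expected: ${e.getMessage}")
        }
      } finally {
        conn.close()
      }
    }
  }

Setting the config back to 0 (or leaving setQueryTimeout unset) lets the query run to completion, as exercised by the setQueryTimeout(0)/-1 and SET ...=0 cases in the tests above.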