diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 25bb05212d66f..1e27e4b7b706f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming import java.util.UUID import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.hadoop.fs.Path @@ -196,6 +197,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo listenerBus.removeListener(listener) } + /** Only exposed for testing. */ + private[sql] def listListeners(): Array[StreamingQueryListener] = { + listenerBus.listeners.asScala.toArray + } + /** Post a listener event */ private[sql] def postListenerEvent(event: StreamingQueryListener.Event): Unit = { listenerBus.post(event) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala index 2b46233e1a5df..abfc6773e361f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala @@ -108,6 +108,9 @@ class ExecutionListenerManager private extends Logging { listeners.clear() } + /** Only exposed for testing. */ + private[sql] def listListeners(): Array[QueryExecutionListener] = listeners.toArray + /** * Get an identical copy of this listener manager. 
*/ diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 8980bcf885589..7a38ddf713547 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -49,6 +49,11 @@ private[hive] object SparkSQLEnv extends Logging { sparkContext = sparkSession.sparkContext sqlContext = sparkSession.sqlContext + // SPARK-29604: force initialization of the session state with the Spark class loader, + // instead of having it happen during the initialization of the Hive client (which may use a + // different class loader). + sparkSession.sessionState + val metadataHive = sparkSession .sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala new file mode 100644 index 0000000000000..74637469c0eb2 --- /dev/null +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These classes in this package are intentionally placed in the outer package of spark, + * because IsolatedClientLoader leverages Spark classloader for shared classes including + * spark package, and the test should fail if Spark initializes these listeners with + * IsolatedClientLoader. + */ +package test.custom.listener + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.streaming.StreamingQueryListener +import org.apache.spark.sql.util.QueryExecutionListener + +class DummyQueryExecutionListener extends QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, error: Exception): Unit = {} +} + +class DummyStreamingQueryListener extends StreamingQueryListener { + override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {} + override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {} + override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {} +} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnvSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnvSuite.scala new file mode 100644 index 0000000000000..ffd1fc48f19fe --- /dev/null +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnvSuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.thriftserver + +import test.custom.listener.{DummyQueryExecutionListener, DummyStreamingQueryListener} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.hive.HiveUtils.{HIVE_METASTORE_JARS, HIVE_METASTORE_VERSION} +import org.apache.spark.sql.hive.test.TestHiveContext +import org.apache.spark.sql.internal.StaticSQLConf.{QUERY_EXECUTION_LISTENERS, STREAMING_QUERY_LISTENERS, WAREHOUSE_PATH} + +class SparkSQLEnvSuite extends SparkFunSuite { + test("SPARK-29604 external listeners should be initialized with Spark classloader") { + withSystemProperties( + QUERY_EXECUTION_LISTENERS.key -> classOf[DummyQueryExecutionListener].getCanonicalName, + STREAMING_QUERY_LISTENERS.key -> classOf[DummyStreamingQueryListener].getCanonicalName, + WAREHOUSE_PATH.key -> TestHiveContext.makeWarehouseDir().toURI.getPath, + // The issue occurred from "maven" and list of custom jars, but providing list of custom + // jars to initialize HiveClient isn't trivial, so just use "maven". 
+ HIVE_METASTORE_JARS.key -> "maven", + HIVE_METASTORE_VERSION.key -> null, + SparkLauncher.SPARK_MASTER -> "local[2]", + "spark.app.name" -> "testApp") { + + try { + SparkSQLEnv.init() + + val session = SparkSession.getActiveSession + assert(session.isDefined) + assert(session.get.listenerManager.listListeners() + .exists(_.isInstanceOf[DummyQueryExecutionListener])) + assert(session.get.streams.listListeners() + .exists(_.isInstanceOf[DummyStreamingQueryListener])) + } finally { + SparkSQLEnv.stop() + } + } + } + + private def withSystemProperties(pairs: (String, String)*)(f: => Unit): Unit = { + def setProperties(properties: Seq[(String, String)]): Unit = { + properties.foreach { case (key, value) => + if (value != null) { + System.setProperty(key, value) + } else { + System.clearProperty(key) + } + } + } + + val oldValues = pairs.map { kv => kv._1 -> System.getProperty(kv._1) } + try { + setProperties(pairs) + f + } finally { + setProperties(oldValues) + } + } +}