From f6f5eb75d6be8f3212a197c1284a0558b245c36a Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Wed, 30 Oct 2019 01:06:31 -0700 Subject: [PATCH 1/2] [SPARK-29604][SQL] Force initialize SessionState before initializing HiveClient in SparkSQLEnv ### What changes were proposed in this pull request? This patch fixes the issue that external listeners are not initialized properly when `spark.sql.hive.metastore.jars` is set to either "maven" or custom list of jar. ("builtin" is not a case here - all jars in Spark classloader are also available in separate classloader) The culprit is lazy initialization (lazy val or passing builder function) & thread context classloader. HiveClient leverages IsolatedClientLoader to properly load Hive and relevant libraries without issue - to not mess up with Spark classpath it uses separate classloader with leveraging thread context classloader. But there's a messed-up case - SessionState is being initialized while HiveClient changed the thread context classloader from Spark classloader to Hive isolated one, and streaming query listeners are loaded from changed classloader while initializing SessionState. This patch forces initializing SessionState in SparkSQLEnv to avoid such case. ### Why are the changes needed? ClassNotFoundException could occur in spark-sql with specific configuration, as explained above. ### Does this PR introduce any user-facing change? No, as I don't think end users assume the classloader of external listeners is only containing jars for Hive client. ### How was this patch tested? New UT added which fails on master branch and passes with the patch. 
The error message with master branch when running UT: ``` java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':; org.apache.spark.sql.AnalysisException: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':; at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:109) at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:221) at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:147) at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:137) at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:59) at org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.$anonfun$new$2(SparkSQLEnvSuite.scala:44) at org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.withSystemProperties(SparkSQLEnvSuite.scala:61) at org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.$anonfun$new$1(SparkSQLEnvSuite.scala:43) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149) at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184) at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286) at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196) at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178) at 
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56) at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221) at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214) at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:56) at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229) at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:393) at scala.collection.immutable.List.foreach(List.scala:392) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381) at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:376) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458) at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229) at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228) at org.scalatest.FunSuite.runTests(FunSuite.scala:1560) at org.scalatest.Suite.run(Suite.scala:1124) at org.scalatest.Suite.run$(Suite.scala:1106) at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560) at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233) at org.scalatest.SuperEngine.runImpl(Engine.scala:518) at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233) at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232) at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56) at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213) at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210) at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:56) at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1349) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1343) at scala.collection.immutable.List.foreach(List.scala:392) at 
org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1033) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1011) at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509) at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011) at org.scalatest.tools.Runner$.run(Runner.scala:850) at org.scalatest.tools.Runner.run(Runner.scala) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27) Caused by: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder': at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1054) at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:156) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:154) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:151) at org.apache.spark.sql.SparkSession.$anonfun$new$3(SparkSession.scala:105) at scala.Option.map(Option.scala:230) at org.apache.spark.sql.SparkSession.$anonfun$new$1(SparkSession.scala:105) at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:164) at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:183) at org.apache.spark.sql.hive.client.HiveClientImpl.(HiveClientImpl.scala:127) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at 
java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:300) at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:421) at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:314) at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:68) at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:67) at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:221) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99) ... 58 more Caused by: java.lang.ClassNotFoundException: test.custom.listener.DummyQueryExecutionListener at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.util.Utils$.classForName(Utils.scala:206) at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:2746) at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245) at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108) at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:2744) at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1(QueryExecutionListener.scala:83) at 
org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1$adapted(QueryExecutionListener.scala:82) at scala.Option.foreach(Option.scala:407) at org.apache.spark.sql.util.ExecutionListenerManager.(QueryExecutionListener.scala:82) at org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$listenerManager$2(BaseSessionStateBuilder.scala:293) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.internal.BaseSessionStateBuilder.listenerManager(BaseSessionStateBuilder.scala:293) at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:320) at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1051) ... 80 more ``` Closes #26258 from HeartSaVioR/SPARK-29604. Authored-by: Jungtaek Lim (HeartSaVioR) Signed-off-by: Dongjoon Hyun --- .../sql/util/QueryExecutionListener.scala | 5 ++ .../sql/hive/thriftserver/SparkSQLEnv.scala | 5 ++ .../hive/thriftserver/DummyListeners.scala | 39 ++++++++++ .../hive/thriftserver/SparkSQLEnvSuite.scala | 76 +++++++++++++++++++ 4 files changed, 125 insertions(+) create mode 100644 sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala create mode 100644 sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnvSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala index 2b46233e1a5df..bbdb379cfc023 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala @@ -108,6 +108,11 @@ class ExecutionListenerManager private extends Logging { listeners.clear() } + /** Only exposed for testing. 
*/ + private[sql] def listListeners(): Array[QueryExecutionListener] = { + listenerBus.listeners.asScala.toArray + } + /** * Get an identical copy of this listener manager. */ diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 8980bcf885589..7a38ddf713547 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -49,6 +49,11 @@ private[hive] object SparkSQLEnv extends Logging { sparkContext = sparkSession.sparkContext sqlContext = sparkSession.sqlContext + // SPARK-29604: force initialization of the session state with the Spark class loader, + // instead of having it happen during the initialization of the Hive client (which may use a + // different class loader). + sparkSession.sessionState + val metadataHive = sparkSession .sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala new file mode 100644 index 0000000000000..d056b3b2153cf --- /dev/null +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These classes in this package are intentionally placed to the outer package of spark, + * because IsolatedClientLoader leverages Spark classloader for shared classes including + * spark package, and the test should fail if Spark initializes these listeners with + * IsolatedClientLoader. + */ +package test.custom.listener + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.streaming.StreamingQueryListener +import org.apache.spark.sql.util.QueryExecutionListener + +class DummyQueryExecutionListener extends QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} +} + +class DummyStreamingQueryListener extends StreamingQueryListener { + override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {} + override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {} + override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {} +} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnvSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnvSuite.scala new file mode 100644 index 0000000000000..ffd1fc48f19fe --- /dev/null +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnvSuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.thriftserver + +import test.custom.listener.{DummyQueryExecutionListener, DummyStreamingQueryListener} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.hive.HiveUtils.{HIVE_METASTORE_JARS, HIVE_METASTORE_VERSION} +import org.apache.spark.sql.hive.test.TestHiveContext +import org.apache.spark.sql.internal.StaticSQLConf.{QUERY_EXECUTION_LISTENERS, STREAMING_QUERY_LISTENERS, WAREHOUSE_PATH} + +class SparkSQLEnvSuite extends SparkFunSuite { + test("SPARK-29604 external listeners should be initialized with Spark classloader") { + withSystemProperties( + QUERY_EXECUTION_LISTENERS.key -> classOf[DummyQueryExecutionListener].getCanonicalName, + STREAMING_QUERY_LISTENERS.key -> classOf[DummyStreamingQueryListener].getCanonicalName, + WAREHOUSE_PATH.key -> TestHiveContext.makeWarehouseDir().toURI.getPath, + // The issue occurred from "maven" and list of custom + // jars, but providing list of custom + // jars to initialize HiveClient isn't trivial, so just use "maven". 
+ HIVE_METASTORE_JARS.key -> "maven", + HIVE_METASTORE_VERSION.key -> null, + SparkLauncher.SPARK_MASTER -> "local[2]", + "spark.app.name" -> "testApp") { + + try { + SparkSQLEnv.init() + + val session = SparkSession.getActiveSession + assert(session.isDefined) + assert(session.get.listenerManager.listListeners() + .exists(_.isInstanceOf[DummyQueryExecutionListener])) + assert(session.get.streams.listListeners() + .exists(_.isInstanceOf[DummyStreamingQueryListener])) + } finally { + SparkSQLEnv.stop() + } + } + } + + private def withSystemProperties(pairs: (String, String)*)(f: => Unit): Unit = { + def setProperties(properties: Seq[(String, String)]): Unit = { + properties.foreach { case (key, value) => + if (value != null) { + System.setProperty(key, value) + } else { + System.clearProperty(key) + } + } + } + + val oldValues = pairs.map { kv => kv._1 -> System.getProperty(kv._1) } + try { + setProperties(pairs) + f + } finally { + setProperties(oldValues) + } + } +} From d0560b1ba23ed18c1ff6ece6493ae092e8bd6c4a Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Wed, 30 Oct 2019 17:55:50 +0900 Subject: [PATCH 2/2] Additional fix to make the patch work with Spark 2.4 --- .../apache/spark/sql/streaming/StreamingQueryManager.scala | 6 ++++++ .../org/apache/spark/sql/util/QueryExecutionListener.scala | 4 +--- .../apache/spark/sql/hive/thriftserver/DummyListeners.scala | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 25bb05212d66f..1e27e4b7b706f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming import java.util.UUID import javax.annotation.concurrent.GuardedBy +import 
scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.hadoop.fs.Path @@ -196,6 +197,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo listenerBus.removeListener(listener) } + /** Only exposed for testing. */ + private[sql] def listListeners(): Array[StreamingQueryListener] = { + listenerBus.listeners.asScala.toArray + } + /** Post a listener event */ private[sql] def postListenerEvent(event: StreamingQueryListener.Event): Unit = { listenerBus.post(event) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala index bbdb379cfc023..abfc6773e361f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala @@ -109,9 +109,7 @@ class ExecutionListenerManager private extends Logging { } /** Only exposed for testing. */ - private[sql] def listListeners(): Array[QueryExecutionListener] = { - listenerBus.listeners.asScala.toArray - } + private[sql] def listListeners(): Array[QueryExecutionListener] = listeners.toArray /** * Get an identical copy of this listener manager. 
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala index d056b3b2153cf..74637469c0eb2 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.util.QueryExecutionListener class DummyQueryExecutionListener extends QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {} - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, error: Exception): Unit = {} } class DummyStreamingQueryListener extends StreamingQueryListener {