From 8ca3031948b56f5f54dced1b58de35ec27f1e370 Mon Sep 17 00:00:00 2001
From: Carson Wang
Date: Thu, 26 Nov 2015 14:41:10 +0800
Subject: [PATCH 1/4] Fix SQLListenerMemoryLeakSuite test error

---
 .../org/apache/spark/sql/execution/ui/SQLListenerSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index f93d081d0c30e..ff585db5d3586 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -343,6 +343,8 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite {
       .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
     val sc = new SparkContext(conf)
     try {
+      // Clear the sql listener created by a previous test suite.
+      SQLContext.clearSqlListener()
       val sqlContext = new SQLContext(sc)
       import sqlContext.implicits._
       // Run 100 successful executions and 100 failed executions.
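The fix above works because the leaking listener lives in JVM-wide state held by the `SQLContext` companion object, not by any single `SQLContext` instance, so a listener installed by an earlier suite survives into this one. A minimal sketch of that failure mode, assuming a static get-or-create slot (the `SharedSqlListener` object and its members are illustrative, not Spark's actual internals):

```scala
import java.util.concurrent.atomic.AtomicReference

// Illustrative stand-in for the process-wide listener slot kept by the
// SQLContext companion object.
object SharedSqlListener {
  private val listener = new AtomicReference[AnyRef]()

  // Get-or-create: the first suite to run installs its listener, and every
  // later suite in the same JVM silently reuses it.
  def getOrCreate(create: () => AnyRef): AnyRef = {
    if (listener.get() == null) {
      listener.compareAndSet(null, create())
    }
    listener.get()
  }

  // What clearSqlListener() amounts to: reset the slot so the next suite
  // measures a listener it created itself.
  def clear(): Unit = listener.set(null)
}
```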
From 4549f62866f8917451dcc4d775943b5232186c46 Mon Sep 17 00:00:00 2001
From: Carson Wang
Date: Fri, 27 Nov 2015 14:29:52 +0800
Subject: [PATCH 2/4] Add SparkContext stop hook to clear resources in SQLContext

---
 .../main/scala/org/apache/spark/SparkContext.scala   | 12 +++++++++++-
 .../main/scala/org/apache/spark/sql/SQLContext.scala |  6 ++++++
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2c10779f2b893..8e1595bdaf6ba 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -29,7 +29,7 @@ import java.util.UUID.randomUUID
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Set}
 import scala.collection.generic.Growable
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{ListBuffer, HashMap}
 import scala.reflect.{ClassTag, classTag}
 import scala.util.control.NonFatal
 
@@ -241,6 +241,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private var _jars: Seq[String] = _
   private var _files: Seq[String] = _
   private var _shutdownHookRef: AnyRef = _
+  private val _stopHook = new ListBuffer[() => Unit]()
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -1618,6 +1619,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     listenerBus.post(SparkListenerUnpersistRDD(rddId))
   }
 
+  /**
+   * Adds a stop hook which can be used to clean up additional resources. This is called when
+   * the SparkContext is being stopped.
+   */
+  private[spark] def addStopHook(hook: () => Unit): Unit = {
+    _stopHook += hook
+  }
+
   /**
    * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
    * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
@@ -1764,6 +1773,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Unset YARN mode system env variable, to allow switching between cluster types.
     System.clearProperty("SPARK_YARN_MODE")
     SparkContext.clearActiveContext()
+    _stopHook.foreach(_())
     logInfo("Successfully stopped SparkContext")
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1c2ac5f6f11bf..b34b01610d5cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -78,6 +78,12 @@ class SQLContext private[sql](
   }
 
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
+  sparkContext.addStopHook(() => {
+    SQLContext.clearInstantiatedContext()
+    SQLContext.clearActive()
+    SQLContext.clearSqlListener()
+  })
+
   // If spark.sql.allowMultipleContexts is true, we will throw an exception if a user
   // wants to create a new root SQLContext (a SLQContext that is not created by newSession).
   private val allowMultipleContexts =

From b694e27979a2acad3e9653e082c08e8b3f7e41b7 Mon Sep 17 00:00:00 2001
From: Carson Wang
Date: Fri, 27 Nov 2015 14:58:55 +0800
Subject: [PATCH 3/4] Update variable name

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8e1595bdaf6ba..ca4eaf4cebb23 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -241,7 +241,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private var _jars: Seq[String] = _
   private var _files: Seq[String] = _
   private var _shutdownHookRef: AnyRef = _
-  private val _stopHook = new ListBuffer[() => Unit]()
+  private val _stopHooks = new ListBuffer[() => Unit]()
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -1624,7 +1624,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * the SparkContext is being stopped.
    */
   private[spark] def addStopHook(hook: () => Unit): Unit = {
-    _stopHook += hook
+    _stopHooks += hook
   }
 
   /**
@@ -1773,7 +1773,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Unset YARN mode system env variable, to allow switching between cluster types.
     System.clearProperty("SPARK_YARN_MODE")
     SparkContext.clearActiveContext()
-    _stopHook.foreach(_())
+    _stopHooks.foreach(_())
     logInfo("Successfully stopped SparkContext")
   }
 
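Patches 2 and 3 together give `SparkContext` a small callback registry: a component that couples itself to the context (here, `SQLContext`) registers a `() => Unit` hook while the context is alive, and `stop()` fires every hook during teardown. A self-contained sketch of the same pattern, using illustrative class and method names rather than the real Spark API:

```scala
import scala.collection.mutable.ListBuffer

// Illustrative context that lets dependent components register cleanup hooks.
class StoppableContext {
  private val stopHooks = new ListBuffer[() => Unit]()

  // Mirrors SparkContext.addStopHook: remember a callback to run on stop().
  def addStopHook(hook: () => Unit): Unit = stopHooks += hook

  // Mirrors the tail of SparkContext.stop(): run hooks in registration order.
  def stop(): Unit = stopHooks.foreach(_())
}

object StopHookDemo extends App {
  val ctx = new StoppableContext
  ctx.addStopHook(() => println("clearing instantiated context"))
  ctx.addStopHook(() => println("clearing shared SQL listener"))
  ctx.stop() // prints both messages in registration order
}
```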
From 00df329c8af56949e2e38a3a8f0218a9d6cc6c9e Mon Sep 17 00:00:00 2001
From: Carson Wang
Date: Mon, 30 Nov 2015 09:21:52 +0800
Subject: [PATCH 4/4] Wrap calls to the hooks with Utils.tryLogNonFatalError.

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ca4eaf4cebb23..dbb8dd198b6a3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1773,11 +1773,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Unset YARN mode system env variable, to allow switching between cluster types.
     System.clearProperty("SPARK_YARN_MODE")
     SparkContext.clearActiveContext()
-    _stopHooks.foreach(_())
+    _stopHooks.foreach(hook => Utils.tryLogNonFatalError {
+      hook()
+    })
     logInfo("Successfully stopped SparkContext")
   }
 
-
   /**
    * Get Spark's home location from either a value set through the constructor,
    * or the spark.home Java property, or the SPARK_HOME environment variable
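The final patch makes the hook loop defensive: a hook that throws should be logged and skipped, not allowed to abort the rest of `stop()`. A rough standalone approximation of what `Utils.tryLogNonFatalError` does (the real helper lives in `org.apache.spark.util.Utils` and uses Spark's logging; the `println` here is a stand-in):

```scala
import scala.util.control.NonFatal

object StopHookErrorDemo extends App {
  // Approximation of Utils.tryLogNonFatalError: run a block, log and swallow
  // non-fatal exceptions, let fatal errors (e.g. OutOfMemoryError) propagate.
  def tryLogNonFatalError(block: => Unit): Unit = {
    try {
      block
    } catch {
      case NonFatal(e) =>
        println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $e")
    }
  }

  val hooks = Seq[() => Unit](
    () => throw new RuntimeException("broken hook"),
    () => println("later hooks still run")
  )

  // The failing first hook is logged; the second still executes, so stop()
  // can finish cleanly even if one component's cleanup misbehaves.
  hooks.foreach(hook => tryLogNonFatalError { hook() })
}
```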