From 163ba19f04bd93247d56e79a6cfa5950427d25e3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 19 Jan 2015 19:18:36 -0800 Subject: [PATCH 01/11] [SPARK-5190] Allow SparkListeners to be registered before SparkContext starts. --- .../scala/org/apache/spark/SparkContext.scala | 51 ++++++++++++++++--- .../spark/api/java/JavaSparkContext.scala | 17 ++++++- .../org/apache/spark/SparkContextSuite.scala | 21 +++++++- 3 files changed, 79 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6a354ed4d1486..95f702796d530 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -63,8 +63,12 @@ import org.apache.spark.util._ * * @param config a Spark Config object describing the application configuration. Any settings in * this config overrides the default configs as well as system properties. + * @param sparkListeners an optional list of [[SparkListener]]s to register. */ -class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient { +class SparkContext( + config: SparkConf, + sparkListeners: Seq[SparkListener] = Nil + ) extends Logging with ExecutorAllocationClient { // The call site where this SparkContext was constructed. private val creationSite: CallSite = Utils.getCallSite() @@ -89,7 +93,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Create a SparkContext that loads settings from system properties (for instance, when * launching with ./bin/spark-submit). */ - def this() = this(new SparkConf()) + def this() = this(new SparkConf(), Nil) + + /** + * Alternative constructor for binary compatibility. + * + * @param config a Spark Config object describing the application configuration. Any settings in + * this config overrides the default configs as well as system properties. + */ + def this(config: SparkConf) = this(config, Nil) /** * :: DeveloperApi :: @@ -124,6 +136,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @param jars Collection of JARs to send to the cluster. These can be paths on the local file * system or HDFS, HTTP, HTTPS, or FTP URLs. * @param environment Environment variables to set on worker nodes. + * @param sparkListeners an optional list of [[SparkListener]]s to register. */ def this( master: String, @@ -131,12 +144,32 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map(), - preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) = - { - this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment)) + preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map(), + sparkListeners: Seq[SparkListener] = Nil) = { + this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment), + sparkListeners) this.preferredNodeLocationData = preferredNodeLocationData } + /** + * Alternative constructor for binary compatibility. + * + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param appName A name for your application, to display on the cluster web UI. + * @param sparkHome Location where Spark is installed on cluster nodes. + * @param jars Collection of JARs to send to the cluster. 
These can be paths on the local file + * system or HDFS, HTTP, HTTPS, or FTP URLs. + * @param environment Environment variables to set on worker nodes. + */ + def this( + master: String, + appName: String, + sparkHome: String, + jars: Seq[String], + environment: Map[String, String], + preferredNodeLocationData: Map[String, Set[SplitInfo]]) = + this(master, appName, sparkHome, jars, environment, preferredNodeLocationData, Nil) + // NOTE: The below constructors could be consolidated using default arguments. Due to // Scala bug SI-8479, however, this causes the compile step to fail when generating docs. // Until we have a good workaround for that bug the constructors remain broken out. @@ -148,7 +181,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @param appName A name for your application, to display on the cluster web UI. */ private[spark] def this(master: String, appName: String) = - this(master, appName, null, Nil, Map(), Map()) + this(master, appName, null, Nil, Map(), Map(), Nil) /** * Alternative constructor that allows setting common Spark properties directly @@ -158,7 +191,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @param sparkHome Location where Spark is installed on cluster nodes. */ private[spark] def this(master: String, appName: String, sparkHome: String) = - this(master, appName, sparkHome, Nil, Map(), Map()) + this(master, appName, sparkHome, Nil, Map(), Map(), Nil) /** * Alternative constructor that allows setting common Spark properties directly @@ -170,7 +203,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * system or HDFS, HTTP, HTTPS, or FTP URLs. */ private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) = - this(master, appName, sparkHome, jars, Map(), Map()) + this(master, appName, sparkHome, jars, Map(), Map(), Nil) // log out Spark Version in Spark driver log logInfo(s"Running Spark version $SPARK_VERSION") @@ -379,6 +412,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } executorAllocationManager.foreach(_.start()) + sparkListeners.foreach(listenerBus.addListener) + // At this point, all relevant SparkListeners have been registered, so begin releasing events listenerBus.start() diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 97f5c9f257e09..4cbc624ad9cc0 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -38,6 +38,7 @@ import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD} +import org.apache.spark.scheduler.SparkListener /** * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns @@ -104,7 +105,21 @@ class JavaSparkContext(val sc: SparkContext) */ def this(master: String, appName: String, sparkHome: String, jars: Array[String], environment: JMap[String, String]) = - this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map())) + this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map(), Nil)) + + /** + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). 
+ * @param appName A name for your application, to display on the cluster web UI + * @param sparkHome The SPARK_HOME directory on the slave nodes + * @param jars Collection of JARs to send to the cluster. These can be paths on the local file + * system or HDFS, HTTP, HTTPS, or FTP URLs. + * @param environment Environment variables to set on worker nodes + * @param sparkListeners an optional list of [[SparkListener]]s to register. + */ + def this(master: String, appName: String, sparkHome: String, jars: Array[String], + environment: JMap[String, String], sparkListeners: Array[SparkListener]) = + this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map(), + sparkListeners)) private[spark] val env = sc.env diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 8b3c6871a7b39..e41610aaf5121 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -17,9 +17,14 @@ package org.apache.spark +import org.apache.hadoop.io.BytesWritable import org.scalatest.FunSuite +import org.scalatest.concurrent.Eventually._ -import org.apache.hadoop.io.BytesWritable +import scala.concurrent.duration._ +import scala.language.{implicitConversions, postfixOps} + +import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate} class SparkContextSuite extends FunSuite with LocalSparkContext { @@ -72,4 +77,18 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { val byteArray2 = converter.convert(bytesWritable) assert(byteArray2.length === 0) } + + test("SparkListeners can be registered via the SparkContext constructor (SPARK-5190)") { + @volatile var gotEnvironmentUpdate: Boolean = false + val listener = new SparkListener { + override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { + gotEnvironmentUpdate = true + } + } + val conf = new SparkConf().setAppName("test").setMaster("local") + sc = new SparkContext(conf, Seq(listener)) + eventually(timeout(10 seconds)) { + assert(gotEnvironmentUpdate === true) + } + } } From 25988f377fc0cbeadbddd630f9564697b07ef877 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 19 Jan 2015 19:22:27 -0800 Subject: [PATCH 02/11] Add addSparkListener to JavaSparkContext --- .../org/apache/spark/api/java/JavaSparkContext.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 4cbc624ad9cc0..2a55edd2db0d4 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark._ import org.apache.spark.AccumulatorParam._ -import org.apache.spark.annotation.Experimental +import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD} @@ -688,6 +688,15 @@ class JavaSparkContext(val sc: SparkContext) sc.clearFiles() } + /** + * :: DeveloperApi :: + * Register a listener to receive up-calls from events that happen during execution. 
+ */ + @DeveloperApi + def addSparkListener(listener: SparkListener): Unit = { + sc.addSparkListener(listener) + } + /** * Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) we reuse. */ From 217ecc0e49285ffee7d9c951793cf2e5024efe9a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 20 Jan 2015 14:20:09 -0800 Subject: [PATCH 03/11] Revert "Add addSparkListener to JavaSparkContext" This reverts commit 25988f377fc0cbeadbddd630f9564697b07ef877. --- .../org/apache/spark/api/java/JavaSparkContext.scala | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 2a55edd2db0d4..4cbc624ad9cc0 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark._ import org.apache.spark.AccumulatorParam._ -import org.apache.spark.annotation.{DeveloperApi, Experimental} +import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD} @@ -688,15 +688,6 @@ class JavaSparkContext(val sc: SparkContext) sc.clearFiles() } - /** - * :: DeveloperApi :: - * Register a listener to receive up-calls from events that happen during execution. - */ - @DeveloperApi - def addSparkListener(listener: SparkListener): Unit = { - sc.addSparkListener(listener) - } - /** * Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) we reuse. */ From 9c0d8f1c754e554059b7cfd70e7ce54abca9aa8a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 20 Jan 2015 14:20:16 -0800 Subject: [PATCH 04/11] Revert "[SPARK-5190] Allow SparkListeners to be registered before SparkContext starts." This reverts commit 163ba19f04bd93247d56e79a6cfa5950427d25e3. --- .../scala/org/apache/spark/SparkContext.scala | 51 +++---------------- .../spark/api/java/JavaSparkContext.scala | 17 +------ .../org/apache/spark/SparkContextSuite.scala | 21 +------- 3 files changed, 10 insertions(+), 79 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 95f702796d530..6a354ed4d1486 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -63,12 +63,8 @@ import org.apache.spark.util._ * * @param config a Spark Config object describing the application configuration. Any settings in * this config overrides the default configs as well as system properties. - * @param sparkListeners an optional list of [[SparkListener]]s to register. */ -class SparkContext( - config: SparkConf, - sparkListeners: Seq[SparkListener] = Nil - ) extends Logging with ExecutorAllocationClient { +class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient { // The call site where this SparkContext was constructed. private val creationSite: CallSite = Utils.getCallSite() @@ -93,15 +89,7 @@ class SparkContext( * Create a SparkContext that loads settings from system properties (for instance, when * launching with ./bin/spark-submit). */ - def this() = this(new SparkConf(), Nil) - - /** - * Alternative constructor for binary compatibility. 
- * - * @param config a Spark Config object describing the application configuration. Any settings in - * this config overrides the default configs as well as system properties. - */ - def this(config: SparkConf) = this(config, Nil) + def this() = this(new SparkConf()) /** * :: DeveloperApi :: @@ -136,7 +124,6 @@ class SparkContext( * @param jars Collection of JARs to send to the cluster. These can be paths on the local file * system or HDFS, HTTP, HTTPS, or FTP URLs. * @param environment Environment variables to set on worker nodes. - * @param sparkListeners an optional list of [[SparkListener]]s to register. */ def this( master: String, @@ -144,32 +131,12 @@ class SparkContext( sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map(), - preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map(), - sparkListeners: Seq[SparkListener] = Nil) = { - this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment), - sparkListeners) + preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) = + { + this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment)) this.preferredNodeLocationData = preferredNodeLocationData } - /** - * Alternative constructor for binary compatibility. - * - * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). - * @param appName A name for your application, to display on the cluster web UI. - * @param sparkHome Location where Spark is installed on cluster nodes. - * @param jars Collection of JARs to send to the cluster. These can be paths on the local file - * system or HDFS, HTTP, HTTPS, or FTP URLs. - * @param environment Environment variables to set on worker nodes. - */ - def this( - master: String, - appName: String, - sparkHome: String, - jars: Seq[String], - environment: Map[String, String], - preferredNodeLocationData: Map[String, Set[SplitInfo]]) = - this(master, appName, sparkHome, jars, environment, preferredNodeLocationData, Nil) - // NOTE: The below constructors could be consolidated using default arguments. Due to // Scala bug SI-8479, however, this causes the compile step to fail when generating docs. // Until we have a good workaround for that bug the constructors remain broken out. @@ -181,7 +148,7 @@ class SparkContext( * @param appName A name for your application, to display on the cluster web UI. */ private[spark] def this(master: String, appName: String) = - this(master, appName, null, Nil, Map(), Map(), Nil) + this(master, appName, null, Nil, Map(), Map()) /** * Alternative constructor that allows setting common Spark properties directly @@ -191,7 +158,7 @@ class SparkContext( * @param sparkHome Location where Spark is installed on cluster nodes. */ private[spark] def this(master: String, appName: String, sparkHome: String) = - this(master, appName, sparkHome, Nil, Map(), Map(), Nil) + this(master, appName, sparkHome, Nil, Map(), Map()) /** * Alternative constructor that allows setting common Spark properties directly @@ -203,7 +170,7 @@ class SparkContext( * system or HDFS, HTTP, HTTPS, or FTP URLs. 
*/ private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) = - this(master, appName, sparkHome, jars, Map(), Map(), Nil) + this(master, appName, sparkHome, jars, Map(), Map()) // log out Spark Version in Spark driver log logInfo(s"Running Spark version $SPARK_VERSION") @@ -412,8 +379,6 @@ class SparkContext( } executorAllocationManager.foreach(_.start()) - sparkListeners.foreach(listenerBus.addListener) - // At this point, all relevant SparkListeners have been registered, so begin releasing events listenerBus.start() diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 4cbc624ad9cc0..97f5c9f257e09 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -38,7 +38,6 @@ import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD} -import org.apache.spark.scheduler.SparkListener /** * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns @@ -105,21 +104,7 @@ class JavaSparkContext(val sc: SparkContext) */ def this(master: String, appName: String, sparkHome: String, jars: Array[String], environment: JMap[String, String]) = - this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map(), Nil)) - - /** - * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). - * @param appName A name for your application, to display on the cluster web UI - * @param sparkHome The SPARK_HOME directory on the slave nodes - * @param jars Collection of JARs to send to the cluster. These can be paths on the local file - * system or HDFS, HTTP, HTTPS, or FTP URLs. - * @param environment Environment variables to set on worker nodes - * @param sparkListeners an optional list of [[SparkListener]]s to register. 
- */ - def this(master: String, appName: String, sparkHome: String, jars: Array[String], - environment: JMap[String, String], sparkListeners: Array[SparkListener]) = - this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map(), - sparkListeners)) + this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map())) private[spark] val env = sc.env diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index e41610aaf5121..8b3c6871a7b39 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -17,14 +17,9 @@ package org.apache.spark -import org.apache.hadoop.io.BytesWritable import org.scalatest.FunSuite -import org.scalatest.concurrent.Eventually._ - -import scala.concurrent.duration._ -import scala.language.{implicitConversions, postfixOps} -import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate} +import org.apache.hadoop.io.BytesWritable class SparkContextSuite extends FunSuite with LocalSparkContext { @@ -77,18 +72,4 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { val byteArray2 = converter.convert(bytesWritable) assert(byteArray2.length === 0) } - - test("SparkListeners can be registered via the SparkContext constructor (SPARK-5190)") { - @volatile var gotEnvironmentUpdate: Boolean = false - val listener = new SparkListener { - override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { - gotEnvironmentUpdate = true - } - } - val conf = new SparkConf().setAppName("test").setMaster("local") - sc = new SparkContext(conf, Seq(listener)) - eventually(timeout(10 seconds)) { - assert(gotEnvironmentUpdate === true) - } - } } From b22b379dcb7dcb24e86fb9ab17f4de6ec2d2e5fe Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 20 Jan 2015 15:42:21 -0800 Subject: [PATCH 05/11] Instantiate SparkListeners from classes listed in configurations. 
--- .../scala/org/apache/spark/SparkContext.scala | 35 +++++++++ .../spark/scheduler/SparkListenerBus.scala | 4 +- .../spark/scheduler/SparkListenerSuite.scala | 71 ++++++++++++++----- docs/configuration.md | 11 +++ 4 files changed, 101 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6a354ed4d1486..79310c28cdbd2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -379,6 +379,41 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } executorAllocationManager.foreach(_.start()) + // Use reflection to instantiate listeners specified via the `spark.extraListeners` configuration + // or the SPARK_EXTRA_LISTENERS environment variable + try { + val listenerClassNames: Seq[String] = { + val fromSparkConf = conf.get("spark.extraListeners", "").split(',') + val fromEnvVar = Option(conf.getenv("SPARK_EXTRA_LISTENERS")).getOrElse("").split(',') + (fromSparkConf ++ fromEnvVar).map(_.trim).filter(_ != "") + } + for (className <- listenerClassNames) { + val listenerClass = Class.forName(className).asInstanceOf[Class[_ <: SparkListener]] + val listener = try { + listenerClass.getConstructor(classOf[SparkConf]).newInstance(conf) + } catch { + case e: NoSuchMethodException => + try { + listenerClass.newInstance() + } catch { + case e: NoSuchMethodException => + throw new SparkException( + s"$listenerClass did not have a zero-argument constructor or a" + + " single-argument constructor that accepts SparkConf (is it a nested Scala class?)") + } + } + listenerBus.addListener(listener) + logInfo(s"Registered listener $listenerClass") + } + } catch { + case e: Exception => + try { + stop() + } finally { + throw new SparkException(s"Exception when registering SparkListener", e) + } + } + // At this point, all relevant SparkListeners have been registered, so begin releasing events listenerBus.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index e700c6af542f4..57840a81ec342 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -28,8 +28,8 @@ import org.apache.spark.util.Utils */ private[spark] trait SparkListenerBus extends Logging { - // SparkListeners attached to this event bus - protected val sparkListeners = new ArrayBuffer[SparkListener] + // SparkListeners attached to this event bus. Marked `protected[spark]` for access in tests. 
+ protected[spark] val sparkListeners = new ArrayBuffer[SparkListener] with mutable.SynchronizedBuffer[SparkListener] def addListener(listener: SparkListener) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 0fb1bdd30d975..ec9edbc3b5986 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -21,25 +21,21 @@ import java.util.concurrent.Semaphore import scala.collection.mutable -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} -import org.scalatest.Matchers - -import org.apache.spark.{LocalSparkContext, SparkContext} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext} import org.apache.spark.executor.TaskMetrics import org.apache.spark.util.ResetSystemProperties -class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers with BeforeAndAfter - with BeforeAndAfterAll with ResetSystemProperties { +import org.mockito.Mockito._ +import org.scalatest.{FunSuite, Matchers} + +class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers + with ResetSystemProperties { /** Length of time to wait while draining listener events. */ val WAIT_TIMEOUT_MILLIS = 10000 val jobCompletionTime = 1421191296660L - before { - sc = new SparkContext("local", "SparkListenerSuite") - } - test("basic creation and shutdown of LiveListenerBus") { val counter = new BasicJobCounter val bus = new LiveListenerBus @@ -127,6 +123,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers } test("basic creation of StageInfo") { + sc = new SparkContext("local", "SparkListenerSuite") val listener = new SaveStageAndTaskInfo sc.addSparkListener(listener) val rdd1 = sc.parallelize(1 to 100, 4) @@ -148,6 +145,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers } test("basic creation of StageInfo with shuffle") { + sc = new SparkContext("local", "SparkListenerSuite") val listener = new SaveStageAndTaskInfo sc.addSparkListener(listener) val rdd1 = sc.parallelize(1 to 100, 4) @@ -185,6 +183,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers } test("StageInfo with fewer tasks than partitions") { + sc = new SparkContext("local", "SparkListenerSuite") val listener = new SaveStageAndTaskInfo sc.addSparkListener(listener) val rdd1 = sc.parallelize(1 to 100, 4) @@ -201,6 +200,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers } test("local metrics") { + sc = new SparkContext("local", "SparkListenerSuite") val listener = new SaveStageAndTaskInfo sc.addSparkListener(listener) sc.addSparkListener(new StatsReportListener) @@ -267,6 +267,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers } test("onTaskGettingResult() called when result fetched remotely") { + sc = new SparkContext("local", "SparkListenerSuite") val listener = new SaveTaskEvents sc.addSparkListener(listener) @@ -287,6 +288,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers } test("onTaskGettingResult() not called when result sent directly") { + sc = new SparkContext("local", "SparkListenerSuite") val listener = new SaveTaskEvents sc.addSparkListener(listener) @@ -302,6 +304,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers } test("onTaskEnd() should be called for all 
started tasks, even after job has been killed") { + sc = new SparkContext("local", "SparkListenerSuite") val WAIT_TIMEOUT_MILLIS = 10000 val listener = new SaveTaskEvents sc.addSparkListener(listener) @@ -356,6 +359,30 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers assert(jobCounter2.count === 5) } + test("registering listeners via spark.extraListeners") { + val conf = new SparkConf().setMaster("local").setAppName("test") + .set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," + + classOf[BasicJobCounter].getName) + sc = new SparkContext(conf) + sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x}.size should be (1) + sc.listenerBus.sparkListeners.collect { + case x: ListenerThatAcceptsSparkConf => x + }.size should be (1) + } + + test("registering listeners via SPARK_EXTRA_LISTENERS") { + val SPARK_EXTRA_LISTENERS = classOf[ListenerThatAcceptsSparkConf].getName + "," + + classOf[BasicJobCounter].getName + val conf = spy(new SparkConf().setMaster("local").setAppName("test")) + when(conf.getenv("SPARK_EXTRA_LISTENERS")).thenReturn(SPARK_EXTRA_LISTENERS) + when(conf.clone).thenReturn(conf) // so that our mock is still used + sc = new SparkContext(conf) + sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x}.size should be (1) + sc.listenerBus.sparkListeners.collect { + case x: ListenerThatAcceptsSparkConf => x + }.size should be (1) + } + /** * Assert that the given list of numbers has an average that is greater than zero. */ @@ -363,14 +390,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers assert(m.sum / m.size.toDouble > 0.0, msg) } - /** - * A simple listener that counts the number of jobs observed. - */ - private class BasicJobCounter extends SparkListener { - var count = 0 - override def onJobEnd(job: SparkListenerJobEnd) = count += 1 - } - /** * A simple listener that saves all task infos and task metrics. */ @@ -423,3 +442,19 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers } } + +// These classes can't be declared inside of the SparkListenerSuite class because we don't want +// their constructors to contain references to SparkListenerSuite: + +/** + * A simple listener that counts the number of jobs observed. + */ +private class BasicJobCounter extends SparkListener { + var count = 0 + override def onJobEnd(job: SparkListenerJobEnd) = count += 1 +} + +private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener { + var count = 0 + override def onJobEnd(job: SparkListenerJobEnd) = count += 1 +} diff --git a/docs/configuration.md b/docs/configuration.md index efbab4085317a..4a4527d38d8ce 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -190,6 +190,17 @@ of the most common options to set are: Logs the effective SparkConf as INFO when a SparkContext is started. + + spark.extraListeners + (none) + + A comma-separated list of classes that implement SparkListener; when initializing + SparkContext, instances of these classes will be created and registered with Spark's listener + bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor + will be called; otherwise, a zero-argument constructor will be called. If no valid constructor + can be found, the SparkContext creation will fail with an exception. 
+ + Apart from these, the following properties are also available, and may be useful in some situations: From d0d276dee992515e3ab2ab3f930cf489defd2b98 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 25 Jan 2015 19:42:03 -0800 Subject: [PATCH 06/11] Move code into setupAndStartListenerBus() method --- .../scala/org/apache/spark/SparkContext.scala | 86 +++++++++++-------- .../spark/scheduler/LiveListenerBus.scala | 2 + 2 files changed, 50 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 79310c28cdbd2..45ddc587c1b41 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -379,44 +379,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } executorAllocationManager.foreach(_.start()) - // Use reflection to instantiate listeners specified via the `spark.extraListeners` configuration - // or the SPARK_EXTRA_LISTENERS environment variable - try { - val listenerClassNames: Seq[String] = { - val fromSparkConf = conf.get("spark.extraListeners", "").split(',') - val fromEnvVar = Option(conf.getenv("SPARK_EXTRA_LISTENERS")).getOrElse("").split(',') - (fromSparkConf ++ fromEnvVar).map(_.trim).filter(_ != "") - } - for (className <- listenerClassNames) { - val listenerClass = Class.forName(className).asInstanceOf[Class[_ <: SparkListener]] - val listener = try { - listenerClass.getConstructor(classOf[SparkConf]).newInstance(conf) - } catch { - case e: NoSuchMethodException => - try { - listenerClass.newInstance() - } catch { - case e: NoSuchMethodException => - throw new SparkException( - s"$listenerClass did not have a zero-argument constructor or a" + - " single-argument constructor that accepts SparkConf (is it a nested Scala class?)") - } - } - listenerBus.addListener(listener) - logInfo(s"Registered listener $listenerClass") - } - } catch { - case e: Exception => - try { - stop() - } finally { - throw new SparkException(s"Exception when registering SparkListener", e) - } - } - - // At this point, all relevant SparkListeners have been registered, so begin releasing events - listenerBus.start() - private[spark] val cleaner: Option[ContextCleaner] = { if (conf.getBoolean("spark.cleaner.referenceTracking", true)) { Some(new ContextCleaner(this)) @@ -426,6 +388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } cleaner.foreach(_.start()) + setupAndStartListenerBus() postEnvironmentUpdate() postApplicationStart() @@ -1521,6 +1484,53 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** Register a new RDD, returning its RDD ID */ private[spark] def newRddId(): Int = nextRddId.getAndIncrement() + /** + * Registers listeners specified in spark.extraListeners, then starts the listener bus. + * This should be called after all internal listeners have been registered with the listener bus + * (e.g. after the web UI and event logging listeners have been registered). 
+ */ + private def setupAndStartListenerBus(): Unit = { + if (listenerBus.hasBeenStarted) { + throw new IllegalStateException("listener bus has already been started") + } + // Use reflection to instantiate listeners specified via the `spark.extraListeners` + // configuration or the SPARK_EXTRA_LISTENERS environment variable + try { + val listenerClassNames: Seq[String] = { + val fromSparkConf = conf.get("spark.extraListeners", "").split(',') + val fromEnvVar = Option(conf.getenv("SPARK_EXTRA_LISTENERS")).getOrElse("").split(',') + (fromSparkConf ++ fromEnvVar).map(_.trim).filter(_ != "") + } + for (className <- listenerClassNames) { + val listenerClass = Class.forName(className).asInstanceOf[Class[_ <: SparkListener]] + val listener = try { + listenerClass.getConstructor(classOf[SparkConf]).newInstance(conf) + } catch { + case e: NoSuchMethodException => + try { + listenerClass.newInstance() + } catch { + case e: NoSuchMethodException => + throw new SparkException( + s"$listenerClass did not have a zero-argument constructor or a" + + " single-argument constructor that accepts SparkConf (is it a nested Scala class?)") + } + } + listenerBus.addListener(listener) + logInfo(s"Registered listener $listenerClass") + } + } catch { + case e: Exception => + try { + stop() + } finally { + throw new SparkException(s"Exception when registering SparkListener", e) + } + } + + listenerBus.start() + } + /** Post the application start event */ private def postApplicationStart() { // Note: this code assumes that the task scheduler has been initialized and has contacted diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 36a6e6338faa6..c4df779928604 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -38,6 +38,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { private var queueFullErrorMessageLogged = false private var started = false + private[spark] def hasBeenStarted: Boolean = started + // A counter that represents the number of events produced and consumed in the queue private val eventLock = new Semaphore(0) From d6f3113c890c721f50b0944671e7eabc906262be Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 25 Jan 2015 20:21:39 -0800 Subject: [PATCH 07/11] Use getConstructors() instead of try-catch to find right constructor. 
--- .../scala/org/apache/spark/SparkContext.scala | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 45ddc587c1b41..50dd260fb23ef 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -20,6 +20,7 @@ package org.apache.spark import scala.language.implicitConversions import java.io._ +import java.lang.reflect.Constructor import java.net.URI import java.util.{Arrays, Properties, UUID} import java.util.concurrent.atomic.AtomicInteger @@ -1502,22 +1503,33 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli (fromSparkConf ++ fromEnvVar).map(_.trim).filter(_ != "") } for (className <- listenerClassNames) { - val listenerClass = Class.forName(className).asInstanceOf[Class[_ <: SparkListener]] - val listener = try { - listenerClass.getConstructor(classOf[SparkConf]).newInstance(conf) - } catch { - case e: NoSuchMethodException => - try { - listenerClass.newInstance() - } catch { - case e: NoSuchMethodException => - throw new SparkException( - s"$listenerClass did not have a zero-argument constructor or a" + - " single-argument constructor that accepts SparkConf (is it a nested Scala class?)") - } + val constructors = { + val listenerClass = Class.forName(className) + listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]] + } + val constructorTakingSparkConf = constructors.find { c => + c.getParameterTypes.sameElements(Array(classOf[SparkConf])) + } + lazy val zeroArgumentConstructor = constructors.find { c => + c.getParameterTypes.isEmpty + } + val listener: SparkListener = { + if (constructorTakingSparkConf.isDefined) { + constructorTakingSparkConf.get.newInstance(conf) + } else if (zeroArgumentConstructor.isDefined) { + zeroArgumentConstructor.get.newInstance() + } else { + throw new SparkException( + s"$className did not have a zero-argument constructor or a" + + " single-argument constructor that accepts SparkConf. Note: if the class is" + + " defined inside of another Scala class, then its constructors may accept an" + + " implicit parameter that references the enclosing class; in this case, you must" + + " define the listener as a top-level class in order to prevent this extra" + + " parameter from breaking Spark's ability to find a valid constructor.") + } } listenerBus.addListener(listener) - logInfo(s"Registered listener $listenerClass") + logInfo(s"Registered listener $className") } } catch { case e: Exception => From b9973dac04c76a51e7bc163087483a4f43a88ec7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 25 Jan 2015 20:25:25 -0800 Subject: [PATCH 08/11] Add test to ensure that conf and env var settings are merged, not overriden. 
--- .../spark/scheduler/SparkListenerSuite.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index ec9edbc3b5986..bca4808b3c0b8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -383,6 +383,21 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers }.size should be (1) } + test("spark.extraListeners and SPARK_EXTRA_LISTENERS configurations are merged") { + // This test ensures that we don't accidentally change the behavior such that one setting + // overrides the other: + val SPARK_EXTRA_LISTENERS = classOf[ListenerThatAcceptsSparkConf].getName + val conf = spy(new SparkConf().setMaster("local").setAppName("test") + .set("spark.extraListeners", classOf[BasicJobCounter].getName)) + when(conf.getenv("SPARK_EXTRA_LISTENERS")).thenReturn(SPARK_EXTRA_LISTENERS) + when(conf.clone).thenReturn(conf) // so that our mock is still used + sc = new SparkContext(conf) + sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x}.size should be (1) + sc.listenerBus.sparkListeners.collect { + case x: ListenerThatAcceptsSparkConf => x + }.size should be (1) + } + /** * Assert that the given list of numbers has an average that is greater than zero. */ From 2daff9b3387f1d924bc5dbd847b9439338bd5afb Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 25 Jan 2015 20:31:18 -0800 Subject: [PATCH 09/11] Add a couple of explanatory comments for SPARK_EXTRA_LISTENERS. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 50dd260fb23ef..5a4891aff2b3a 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1495,14 +1495,20 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli throw new IllegalStateException("listener bus has already been started") } // Use reflection to instantiate listeners specified via the `spark.extraListeners` - // configuration or the SPARK_EXTRA_LISTENERS environment variable + // configuration or the SPARK_EXTRA_LISTENERS environment variable. The purpose of + // SPARK_EXTRA_LISTENERS is to allow the execution environment to inject custom listeners + // without having to worry about them being overridden by user settings in SparkConf. try { val listenerClassNames: Seq[String] = { + // Merge configurations from both sources val fromSparkConf = conf.get("spark.extraListeners", "").split(',') val fromEnvVar = Option(conf.getenv("SPARK_EXTRA_LISTENERS")).getOrElse("").split(',') + // Filter out empty entries, which could occur when overriding environment variables + // (e.g. 
`export SPARK_EXTRA_LISTENERS="foo,$SPARK_EXTRA_LISTENERS`) (fromSparkConf ++ fromEnvVar).map(_.trim).filter(_ != "") } for (className <- listenerClassNames) { + // Use reflection to find the right constructor val constructors = { val listenerClass = Class.forName(className) listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]] From 1a5b9a011ebd3fc0d2e210b536d3c40e42a3689a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 26 Jan 2015 13:43:46 -0800 Subject: [PATCH 10/11] Remove SPARK_EXTRA_LISTENERS environment variable. --- .../scala/org/apache/spark/SparkContext.scala | 15 ++------ .../spark/scheduler/SparkListenerSuite.scala | 35 ++----------------- 2 files changed, 6 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5a4891aff2b3a..444b7c905a2c9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1494,19 +1494,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli if (listenerBus.hasBeenStarted) { throw new IllegalStateException("listener bus has already been started") } - // Use reflection to instantiate listeners specified via the `spark.extraListeners` - // configuration or the SPARK_EXTRA_LISTENERS environment variable. The purpose of - // SPARK_EXTRA_LISTENERS is to allow the execution environment to inject custom listeners - // without having to worry about them being overridden by user settings in SparkConf. + // Use reflection to instantiate listeners specified via `spark.extraListeners` try { - val listenerClassNames: Seq[String] = { - // Merge configurations from both sources - val fromSparkConf = conf.get("spark.extraListeners", "").split(',') - val fromEnvVar = Option(conf.getenv("SPARK_EXTRA_LISTENERS")).getOrElse("").split(',') - // Filter out empty entries, which could occur when overriding environment variables - // (e.g. 
`export SPARK_EXTRA_LISTENERS="foo,$SPARK_EXTRA_LISTENERS`) - (fromSparkConf ++ fromEnvVar).map(_.trim).filter(_ != "") - } + val listenerClassNames: Seq[String] = + conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "") for (className <- listenerClassNames) { // Use reflection to find the right constructor val constructors = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index bca4808b3c0b8..47ab8bd0717b0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -21,12 +21,11 @@ import java.util.concurrent.Semaphore import scala.collection.mutable -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext} +import org.scalatest.{FunSuite, Matchers} + import org.apache.spark.executor.TaskMetrics import org.apache.spark.util.ResetSystemProperties - -import org.mockito.Mockito._ -import org.scalatest.{FunSuite, Matchers} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext} class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers with ResetSystemProperties { @@ -370,34 +369,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers }.size should be (1) } - test("registering listeners via SPARK_EXTRA_LISTENERS") { - val SPARK_EXTRA_LISTENERS = classOf[ListenerThatAcceptsSparkConf].getName + "," + - classOf[BasicJobCounter].getName - val conf = spy(new SparkConf().setMaster("local").setAppName("test")) - when(conf.getenv("SPARK_EXTRA_LISTENERS")).thenReturn(SPARK_EXTRA_LISTENERS) - when(conf.clone).thenReturn(conf) // so that our mock is still used - sc = new SparkContext(conf) - sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x}.size should be (1) - sc.listenerBus.sparkListeners.collect { - case x: ListenerThatAcceptsSparkConf => x - }.size should be (1) - } - - test("spark.extraListeners and SPARK_EXTRA_LISTENERS configurations are merged") { - // This test ensures that we don't accidentally change the behavior such that one setting - // overrides the other: - val SPARK_EXTRA_LISTENERS = classOf[ListenerThatAcceptsSparkConf].getName - val conf = spy(new SparkConf().setMaster("local").setAppName("test") - .set("spark.extraListeners", classOf[BasicJobCounter].getName)) - when(conf.getenv("SPARK_EXTRA_LISTENERS")).thenReturn(SPARK_EXTRA_LISTENERS) - when(conf.clone).thenReturn(conf) // so that our mock is still used - sc = new SparkContext(conf) - sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x}.size should be (1) - sc.listenerBus.sparkListeners.collect { - case x: ListenerThatAcceptsSparkConf => x - }.size should be (1) - } - /** * Assert that the given list of numbers has an average that is greater than zero. 
*/ From 8370839914a6f28af5c3c2d17364ff5d5e994b70 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 2 Feb 2015 08:58:40 -0800 Subject: [PATCH 11/11] Two minor fixes after merging with master --- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 --- core/src/main/scala/org/apache/spark/util/ListenerBus.scala | 3 ++- .../org/apache/spark/scheduler/SparkListenerSuite.scala | 5 +++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 08597a96fcb81..f454df48f127c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1535,9 +1535,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * (e.g. after the web UI and event logging listeners have been registered). */ private def setupAndStartListenerBus(): Unit = { - if (listenerBus.hasBeenStarted) { - throw new IllegalStateException("listener bus has already been started") - } // Use reflection to instantiate listeners specified via `spark.extraListeners` try { val listenerClassNames: Seq[String] = diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index bd0aa4dc4650f..d60b8b9a31a9b 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -28,7 +28,8 @@ import org.apache.spark.Logging */ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { - private val listeners = new CopyOnWriteArrayList[L] + // Marked `private[spark]` for access in tests. + private[spark] val listeners = new CopyOnWriteArrayList[L] /** * Add a listener to listen events. This method is thread-safe and can be called in any thread. diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 47ab8bd0717b0..3a41ee8d4ae0c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler import java.util.concurrent.Semaphore import scala.collection.mutable +import scala.collection.JavaConversions._ import org.scalatest.{FunSuite, Matchers} @@ -363,8 +364,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers .set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," + classOf[BasicJobCounter].getName) sc = new SparkContext(conf) - sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x}.size should be (1) - sc.listenerBus.sparkListeners.collect { + sc.listenerBus.listeners.collect { case x: BasicJobCounter => x}.size should be (1) + sc.listenerBus.listeners.collect { case x: ListenerThatAcceptsSparkConf => x }.size should be (1) }