From 8854dabc0e51b809b939edd2f88000c13024ccce Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Fri, 13 Dec 2013 09:22:56 -0800 Subject: [PATCH 01/12] Print out the class that was not found --- .../org/apache/spark/scheduler/cluster/TaskResultGetter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala index 2064d97b49cc0..a0b63a4ef1f13 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala @@ -70,7 +70,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterSche } catch { case cnf: ClassNotFoundException => val loader = Thread.currentThread.getContextClassLoader - taskSetManager.abort("ClassNotFound with classloader: " + loader) + taskSetManager.abort("ClassNotFound [" + cnf.getMessage + "] with classloader: " + loader) case ex => taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex)) } From ae1c199a88cb0ba5c12e49cfe200b0cee97ab1e1 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Fri, 13 Dec 2013 15:40:14 -0800 Subject: [PATCH 02/12] Fix bug where classes in jars added via addJar() are not available to the driver Adds a driver-specific class loader that knows about the jars added via addJar(), and initialize it first thing in SparkEnv, so that all other ThreadPool/ActorSystem initialization will take advantage of the classloader, and user-added jars are made available to all Spark subsystems. --- .../scala/org/apache/spark/SparkContext.scala | 23 ++++++++++++++++++- .../scala/org/apache/spark/SparkEnv.scala | 21 +++++++++++++++-- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3da13133dacde..b0de2f7aa0d6d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -18,7 +18,7 @@ package org.apache.spark import java.io._ -import java.net.URI +import java.net.{URI, URL} import java.util.Properties import java.util.concurrent.atomic.AtomicInteger @@ -610,6 +610,7 @@ class SparkContext( /** * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. + * This also makes the JAR available to this driver process. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. */ @@ -648,12 +649,32 @@ class SparkContext( case _ => path } + + // Add jar to driver class loader so it is available for driver, even if it is not on the classpath + uri.getScheme match { + case null | "file" | "local" => + // Assume file exists on current (driver) node as well. Unlike executors, driver doesn't need to + // download the jar since it's local. 
+ addUrlToDriverLoader(new URL("file:" + uri.getPath)) + case "http" | "https" | "ftp" => + // Should be handled by the URLClassLoader, pass along entire URL + addUrlToDriverLoader(new URL(path)) + case other => + logWarning("This URI scheme for URI " + path + " is not supported by the driver class loader") + } } addedJars(key) = System.currentTimeMillis logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key)) } } + private def addUrlToDriverLoader(url: URL) { + if (!env.classLoader.getURLs.contains(url)) { + logInfo("Adding JAR " + url + " to driver class loader") + env.classLoader.addURL(url) + } + } + /** * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to * any new nodes. diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index ff2df8fb6a2fc..8e55d3c0a2687 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -17,6 +17,8 @@ package org.apache.spark +import java.net.URL + import collection.mutable import serializer.Serializer @@ -24,6 +26,7 @@ import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem} import akka.remote.RemoteActorRefProvider import org.apache.spark.broadcast.BroadcastManager +import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.metrics.MetricsSystem import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster} import org.apache.spark.network.ConnectionManager @@ -54,7 +57,8 @@ class SparkEnv ( val connectionManager: ConnectionManager, val httpFileServer: HttpFileServer, val sparkFilesDir: String, - val metricsSystem: MetricsSystem) { + val metricsSystem: MetricsSystem, + val classLoader: ExecutorURLClassLoader) { private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() @@ -116,6 +120,12 @@ object SparkEnv extends Logging { isDriver: Boolean, isLocal: Boolean): SparkEnv = { + // Create a classLoader for use by the driver so that jars added via addJar are available to the driver + // Do this before all other initialization so that any thread pools created for this SparkContext + // uses the class loader + val driverLoader = getDriverClassLoader() + Thread.currentThread.setContextClassLoader(driverLoader) + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port) // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port), @@ -230,6 +240,13 @@ object SparkEnv extends Logging { connectionManager, httpFileServer, sparkFilesDir, - metricsSystem) + metricsSystem, + driverLoader) + } + + private def getDriverClassLoader(): ExecutorURLClassLoader = { + // Initially there are no jars + val parentLoader = this.getClass.getClassLoader + new ExecutorURLClassLoader(Array.empty[URL], parentLoader) } } From d3df2415eaed4da50fc1ffeef42e0140f6b49d88 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Sun, 29 Dec 2013 00:15:17 -0800 Subject: [PATCH 03/12] Move classLoader initialization to SparkContext SparkEnv is used by Executor as well; we want this change to affect driver only. 
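The change in patches 02 and 03 hinges on a URLClassLoader subclass that exposes the protected addURL method and is installed as the driver thread's context class loader before anything else is initialized. A minimal standalone sketch of that pattern (the class name, jar path and loaded class below are illustrative placeholders, not taken from the patches):

    import java.io.File
    import java.net.{URL, URLClassLoader}

    // URLClassLoader.addURL is protected; a thin subclass makes it callable so that
    // jars can be appended to the running driver's classpath.
    class DynamicURLClassLoader(urls: Array[URL], parent: ClassLoader)
      extends URLClassLoader(urls, parent) {
      override def addURL(url: URL): Unit = super.addURL(url)
    }

    object DriverLoaderSketch {
      def main(args: Array[String]): Unit = {
        // Install the loader first, so thread pools created afterwards inherit it as
        // their context class loader.
        val loader = new DynamicURLClassLoader(Array.empty[URL], getClass.getClassLoader)
        Thread.currentThread.setContextClassLoader(loader)

        // What addJar("/tmp/extra.jar") boils down to on the driver side:
        loader.addURL(new File("/tmp/extra.jar").toURI.toURL)

        // Classes packaged in that jar are now resolvable via the context class loader
        // (this throws ClassNotFoundException if the jar or class does not exist).
        val clazz = Class.forName("com.example.Helper", true,
          Thread.currentThread.getContextClassLoader)
        println("Loaded " + clazz.getName)
      }
    }

Because the loader is registered as the context class loader, any component that resolves classes through Thread.currentThread.getContextClassLoader (serializers, actor systems, thread pools created later) can see the dynamically added jars.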
--- .../scala/org/apache/spark/SparkContext.scala | 11 +++++++++-- .../scala/org/apache/spark/SparkEnv.scala | 19 ++----------------- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 75185f0cbfc97..1345abc8380ce 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -52,6 +52,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.mesos.MesosNativeLibrary import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ @@ -101,6 +102,12 @@ class SparkContext( val isLocal = (master == "local" || master.startsWith("local[")) + // Create a classLoader for use by the driver so that jars added via addJar are available to the driver + // Do this before all other initialization so that any thread pools created for this SparkContext + // uses the class loader + private[spark] val classLoader = new ExecutorURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) + Thread.currentThread.setContextClassLoader(classLoader) + // Create the Spark execution environment (cache, map output tracker, etc) private[spark] val env = SparkEnv.createFromSystemProperties( "", @@ -667,9 +674,9 @@ class SparkContext( } private def addUrlToDriverLoader(url: URL) { - if (!env.classLoader.getURLs.contains(url)) { + if (!classLoader.getURLs.contains(url)) { logInfo("Adding JAR " + url + " to driver class loader") - env.classLoader.addURL(url) + classLoader.addURL(url) } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index ed47440323ec1..94ad6b9f4046b 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -26,7 +26,6 @@ import akka.actor._ import akka.remote.RemoteActorRefProvider import org.apache.spark.broadcast.BroadcastManager -import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.metrics.MetricsSystem import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster} import org.apache.spark.network.ConnectionManager @@ -57,8 +56,7 @@ class SparkEnv ( val connectionManager: ConnectionManager, val httpFileServer: HttpFileServer, val sparkFilesDir: String, - val metricsSystem: MetricsSystem, - val classLoader: ExecutorURLClassLoader) { + val metricsSystem: MetricsSystem) { private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() @@ -121,12 +119,6 @@ object SparkEnv extends Logging { isDriver: Boolean, isLocal: Boolean): SparkEnv = { - // Create a classLoader for use by the driver so that jars added via addJar are available to the driver - // Do this before all other initialization so that any thread pools created for this SparkContext - // uses the class loader - val driverLoader = getDriverClassLoader() - Thread.currentThread.setContextClassLoader(driverLoader) - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port) // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port), @@ -241,13 +233,6 @@ object SparkEnv extends Logging { connectionManager, httpFileServer, 
sparkFilesDir, - metricsSystem, - driverLoader) - } - - private def getDriverClassLoader(): ExecutorURLClassLoader = { - // Initially there are no jars - val parentLoader = this.getClass.getClassLoader - new ExecutorURLClassLoader(Array.empty[URL], parentLoader) + metricsSystem) } } From 5c24b0bc541a61cb3551fee9386602a2472b571a Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Sun, 29 Dec 2013 00:25:53 -0800 Subject: [PATCH 04/12] Pass the current class loader when creating Akka threadpool --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 7 ++++--- core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 8 +++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 94ad6b9f4046b..c2df0ef1f6228 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -119,7 +119,10 @@ object SparkEnv extends Logging { isDriver: Boolean, isLocal: Boolean): SparkEnv = { - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port) + val classLoader = Thread.currentThread.getContextClassLoader + + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, + classLoader = classLoader) // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port), // figure out which port number Akka actually bound to and set spark.driver.port to it. @@ -137,8 +140,6 @@ object SparkEnv extends Logging { System.setProperty("spark.hostPort", hostname + ":" + boundPort) } - val classLoader = Thread.currentThread.getContextClassLoader - // Create an instance of the class named by the given Java system property, or by // defaultClassName if the property is not set, and return it as a T def instantiateClass[T](propertyName: String, defaultClassName: String): T = { diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 1c8b51b8bc1e1..fb09b7ed44f68 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -37,7 +37,9 @@ private[spark] object AkkaUtils { * If indestructible is set to true, the Actor System will continue running in the event * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]]. 
*/ - def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false) + def createActorSystem(name: String, host: String, port: Int, + indestructible: Boolean = false, + classLoader: ClassLoader = this.getClass.getClassLoader) : (ActorSystem, Int) = { val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt @@ -76,9 +78,9 @@ private[spark] object AkkaUtils { """.stripMargin) val actorSystem = if (indestructible) { - IndestructibleActorSystem(name, akkaConf) + IndestructibleActorSystem(name, akkaConf, classLoader) } else { - ActorSystem(name, akkaConf) + ActorSystem(name, akkaConf, classLoader) } val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider From cbabe809baf1f02c7c9097734b84a3c94ce364db Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Sun, 12 Jan 2014 07:59:24 -0800 Subject: [PATCH 05/12] CR from pwendell --- core/src/main/scala/org/apache/spark/SparkContext.scala | 9 +++++---- .../src/main/scala/org/apache/spark/util/AkkaUtils.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1345abc8380ce..d2f915fc248f6 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -102,10 +102,11 @@ class SparkContext( val isLocal = (master == "local" || master.startsWith("local[")) - // Create a classLoader for use by the driver so that jars added via addJar are available to the driver - // Do this before all other initialization so that any thread pools created for this SparkContext - // uses the class loader - private[spark] val classLoader = new ExecutorURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) + // Create a classLoader for use by the driver so that jars added via addJar are available to the + // driver. Do this before all other initialization so that any thread pools created for this + // SparkContext uses the class loader. 
+ private[spark] val classLoader = new ExecutorURLClassLoader(Array.empty[URL], + this.getClass.getClassLoader) Thread.currentThread.setContextClassLoader(classLoader) // Create the Spark execution environment (cache, map output tracker, etc) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index fb09b7ed44f68..c114046e84d88 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -39,7 +39,7 @@ private[spark] object AkkaUtils { */ def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false, - classLoader: ClassLoader = this.getClass.getClassLoader) + classLoader: ClassLoader = Thread.currentThread.getContextClassLoader) : (ActorSystem, Int) = { val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt From 97a19b5f38a84376176aef8b6e35ef578b29d3c7 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Thu, 16 Jan 2014 22:19:27 -0800 Subject: [PATCH 06/12] CR: Rename ExecutorURLClassLoader -> SparkURLClassLoader --- core/src/main/scala/org/apache/spark/SparkContext.scala | 5 ++--- .../src/main/scala/org/apache/spark/executor/Executor.scala | 6 +++--- .../SparkURLClassLoader.scala} | 4 ++-- 3 files changed, 7 insertions(+), 8 deletions(-) rename core/src/main/scala/org/apache/spark/{executor/ExecutorURLClassLoader.scala => util/SparkURLClassLoader.scala} (89%) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d27a6320c41dd..64d5a2dcf8a5f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -39,7 +39,6 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.mesos.MesosNativeLibrary import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} -import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ @@ -50,7 +49,7 @@ import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils} import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, MetadataCleanerType, - ClosureCleaner} + ClosureCleaner, SparkURLClassLoader} /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -139,7 +138,7 @@ class SparkContext( // Create a classLoader for use by the driver so that jars added via addJar are available to the // driver. Do this before all other initialization so that any thread pools created for this // SparkContext uses the class loader. 
- private[spark] val classLoader = new ExecutorURLClassLoader(Array.empty[URL], + private[spark] val classLoader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) Thread.currentThread.setContextClassLoader(classLoader) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index c1b57f74d7e9a..6713a082b3bf8 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -29,7 +29,7 @@ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler._ import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} -import org.apache.spark.util.Utils +import org.apache.spark.util.{Utils, SparkURLClassLoader} /** * Spark executor used with Mesos and the standalone scheduler. @@ -293,7 +293,7 @@ private[spark] class Executor( * Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes * created by the interpreter to the search path */ - private def createClassLoader(): ExecutorURLClassLoader = { + private def createClassLoader(): SparkURLClassLoader = { val loader = this.getClass.getClassLoader // For each of the jars in the jarSet, add them to the class loader. @@ -301,7 +301,7 @@ private[spark] class Executor( val urls = currentJars.keySet.map { uri => new File(uri.split("/").last).toURI.toURL }.toArray - new ExecutorURLClassLoader(urls, loader) + new SparkURLClassLoader(urls, loader) } /** diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/org/apache/spark/util/SparkURLClassLoader.scala similarity index 89% rename from core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala rename to core/src/main/scala/org/apache/spark/util/SparkURLClassLoader.scala index f9bfe8ed2f5ba..19134aca496f4 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala +++ b/core/src/main/scala/org/apache/spark/util/SparkURLClassLoader.scala @@ -15,14 +15,14 @@ * limitations under the License. */ -package org.apache.spark.executor +package org.apache.spark.util import java.net.{URLClassLoader, URL} /** * The addURL method in URLClassLoader is protected. We subclass it to make this accessible. */ -private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader) +private[spark] class SparkURLClassLoader(urls: Array[URL], parent: ClassLoader) extends URLClassLoader(urls, parent) { override def addURL(url: URL) { From 9757c6ffbc9e0e1354c19ad5fafcd7083634cb32 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Thu, 16 Jan 2014 22:34:34 -0800 Subject: [PATCH 07/12] Use SparkURLClassLoader for driver only if spark.driver.add-dynamic-jars is set --- .../scala/org/apache/spark/SparkContext.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 64d5a2dcf8a5f..ddee17d43ac6e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -138,9 +138,11 @@ class SparkContext( // Create a classLoader for use by the driver so that jars added via addJar are available to the // driver. Do this before all other initialization so that any thread pools created for this // SparkContext uses the class loader. 
- private[spark] val classLoader = new SparkURLClassLoader(Array.empty[URL], - this.getClass.getClassLoader) - Thread.currentThread.setContextClassLoader(classLoader) + private[spark] val classLoader = if (conf.getBoolean("spark.driver.add-dynamic-jars", false)) { + val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) + Thread.currentThread.setContextClassLoader(loader) + Some(loader) + } else None // Create the Spark execution environment (cache, map output tracker, etc) private[spark] val env = SparkEnv.create( @@ -749,9 +751,11 @@ class SparkContext( } private def addUrlToDriverLoader(url: URL) { - if (!classLoader.getURLs.contains(url)) { - logInfo("Adding JAR " + url + " to driver class loader") - classLoader.addURL(url) + classLoader.foreach { loader => + if (!loader.getURLs.contains(url)) { + logInfo("Adding JAR " + url + " to driver class loader") + loader.addURL(url) + } } } From 3e52b0c4984cecdaea67412a7259c3a8ebbbfdf6 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Thu, 16 Jan 2014 23:54:58 -0800 Subject: [PATCH 08/12] Document new config option and effect on addJar --- .../main/scala/org/apache/spark/SparkContext.scala | 4 +++- docs/cluster-overview.md | 3 ++- docs/configuration.md | 14 ++++++++++++-- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ddee17d43ac6e..39c5ad08e77e3 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -138,6 +138,7 @@ class SparkContext( // Create a classLoader for use by the driver so that jars added via addJar are available to the // driver. Do this before all other initialization so that any thread pools created for this // SparkContext uses the class loader. + // Note that this is config-enabled as classloaders can introduce subtle side effects private[spark] val classLoader = if (conf.getBoolean("spark.driver.add-dynamic-jars", false)) { val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) Thread.currentThread.setContextClassLoader(loader) @@ -692,9 +693,10 @@ class SparkContext( /** * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. - * This also makes the JAR available to this driver process. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. + * NOTE: If you enable spark.driver.add-dynamic-jars, then the JAR will also be made available + * to this SparkContext. local: JARs must be available on the driver node. */ def addJar(path: String) { if (path == null) { diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md index e16703292cc22..bfeccf5fc8e05 100644 --- a/docs/cluster-overview.md +++ b/docs/cluster-overview.md @@ -64,7 +64,8 @@ and `addFile`. - **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected - **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node. This means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker, - or shared via NFS, GlusterFS, etc. + or shared via NFS, GlusterFS, etc. Note that if `spark.driver.add-dynamic-jars` is set, then the file + must be visible to the node running the SparkContext as well. 
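Here, "set" means the SparkContext built the optional driver-side loader at startup; reduced to its core (names as in patch 07, with surrounding SparkContext members such as conf assumed rather than defined here), the gate looks roughly like this:

    import java.net.URL

    // Only create and install the loader when the flag is on, since swapping the
    // context class loader can have subtle side effects; None disables the feature.
    private[spark] val classLoader: Option[SparkURLClassLoader] =
      if (conf.getBoolean("spark.driver.add-dynamic-jars", false)) {
        val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader)
        Thread.currentThread.setContextClassLoader(loader)
        Some(loader)
      } else None

    // Adding URLs from addJar is a no-op when the flag is off.
    private def addUrlToDriverLoader(url: URL): Unit = classLoader.foreach { loader =>
      if (!loader.getURLs.contains(url)) loader.addURL(url)
    }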
Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes. Over time this can use up a significant amount of space and will need to be cleaned up. diff --git a/docs/configuration.md b/docs/configuration.md index da70cabba2d9b..7ecb10194ca44 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -111,6 +111,7 @@ Apart from these, the following properties are also available, and may be useful it if you configure your own old generation size. + spark.shuffle.memoryFraction 0.3 @@ -329,7 +330,7 @@ Apart from these, the following properties are also available, and may be useful spark.akka.heartbeat.interval 1000 - This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. + This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. @@ -346,6 +347,15 @@ Apart from these, the following properties are also available, and may be useful Port for the driver to listen on. + + spark.driver.add-dynamic-jars + false + + If true, the SparkContext uses a class loader to make jars added via `addJar` available to the SparkContext. + The default behavior is that jars added via `addJar` are only made available to executors, and Spark apps + must include all its jars in the application CLASSPATH even if `addJar` is used. + + spark.cleaner.ttl (infinite) @@ -375,7 +385,7 @@ Apart from these, the following properties are also available, and may be useful spark.broadcast.blockSize 4096 - Size of each piece of a block in kilobytes for TorrentBroadcastFactory. + Size of each piece of a block in kilobytes for TorrentBroadcastFactory. Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, BlockManager might take a performance hit. 
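The URI schemes called out in the cluster overview above come down to a single match on the parsed scheme inside addJar. A self-contained sketch of that dispatch (mirroring the match introduced earlier in the series; hdfs: and any other scheme fall through to the unsupported case):

    import java.net.{URI, URL}

    // Map an addJar argument to a URL that a plain URLClassLoader on the driver can
    // read, or None for schemes it cannot handle.
    def jarUrlForDriver(path: String): Option[URL] = {
      val uri = new URI(path)
      uri.getScheme match {
        // No scheme, file: or local: -- the jar is assumed to already exist on the driver node.
        case null | "file" | "local"  => Some(new URL("file:" + uri.getPath))
        // Remote schemes that URLClassLoader can fetch itself -- pass the whole URL along.
        case "http" | "https" | "ftp" => Some(new URL(path))
        case _                        => None
      }
    }

In the patches, an unsupported scheme only skips the driver-loader step (with a warning); the jar is still distributed to executors through the usual mechanism.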
From b132d7bd61f8ba4bd4994b8fee94099125d8ade7 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 10 Mar 2014 21:56:49 -0700 Subject: [PATCH 09/12] Minor changes --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- core/src/main/scala/org/apache/spark/SparkEnv.scala | 2 +- .../src/main/scala/org/apache/spark/util/AkkaUtils.scala | 8 +++----- docs/cluster-overview.md | 4 ++-- docs/configuration.md | 9 +++++---- 5 files changed, 12 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d72ee6fcae1fe..4b82e6b10079e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -134,7 +134,7 @@ class SparkContext( // driver. Do this before all other initialization so that any thread pools created for this // SparkContext uses the class loader. // Note that this is config-enabled as classloaders can introduce subtle side effects - private[spark] val classLoader = if (conf.getBoolean("spark.driver.add-dynamic-jars", false)) { + private[spark] val classLoader = if (conf.getBoolean("spark.driver.loadAddedJars", false)) { val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) Thread.currentThread.setContextClassLoader(loader) Some(loader) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index df9d8b5b9fbb5..57cf2ab502473 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -127,7 +127,7 @@ object SparkEnv extends Logging { val securityManager = new SecurityManager(conf) val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf, - securityManager = securityManager, classLoader = classLoader) + securityManager = securityManager) // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port), // figure out which port number Akka actually bound to and set spark.driver.port to it. diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 4fc2fdf8f3fc8..a6c9a9aaba8eb 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -42,9 +42,7 @@ private[spark] object AkkaUtils extends Logging { * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]]. 
*/ def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false, - conf: SparkConf, securityManager: SecurityManager, - classLoader: ClassLoader = Thread.currentThread.getContextClassLoader): - (ActorSystem, Int) = { + conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { val akkaThreads = conf.getInt("spark.akka.threads", 4) val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) @@ -104,9 +102,9 @@ private[spark] object AkkaUtils extends Logging { """.stripMargin)) val actorSystem = if (indestructible) { - IndestructibleActorSystem(name, akkaConf, classLoader) + IndestructibleActorSystem(name, akkaConf) } else { - ActorSystem(name, akkaConf, classLoader) + ActorSystem(name, akkaConf) } val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md index bfeccf5fc8e05..e4af0e5c4ff6c 100644 --- a/docs/cluster-overview.md +++ b/docs/cluster-overview.md @@ -64,8 +64,8 @@ and `addFile`. - **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected - **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node. This means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker, - or shared via NFS, GlusterFS, etc. Note that if `spark.driver.add-dynamic-jars` is set, then the file - must be visible to the node running the SparkContext as well. + or shared via NFS, GlusterFS, etc. Note that if `spark.driver.loadAddedJars` is set, + then the file must be visible to the node running the SparkContext as well. Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes. Over time this can use up a significant amount of space and will need to be cleaned up. diff --git a/docs/configuration.md b/docs/configuration.md index fb05d49e13448..34cb564f03c09 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -394,12 +394,13 @@ Apart from these, the following properties are also available, and may be useful - spark.driver.add-dynamic-jars + spark.driver.loadAddedJars false - If true, the SparkContext uses a class loader to make jars added via `addJar` available to the SparkContext. - The default behavior is that jars added via `addJar` are only made available to executors, and Spark apps - must include all its jars in the application CLASSPATH even if `addJar` is used. + If true, the SparkContext uses a class loader to make jars added via `addJar` available to + the SparkContext. The default behavior is that jars added via `addJar` are only made + available to executors, and Spark apps must include all its jars in the driver's + CLASSPATH even if `addJar` is used. From 95b24f21b8efb11a84529f0a4698773b9cd5fb18 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 11 Mar 2014 10:55:32 -0700 Subject: [PATCH 10/12] Adding unit tests. 
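The suites added below probe driver-side visibility by attempting a reflective load through the current thread's context class loader, before and after addJar. The predicate they are built around is essentially:

    import scala.util.Try

    // True when className resolves through the calling thread's context class loader,
    // i.e. once a jar containing it has been added on the driver.
    def canLoadClass(className: String): Boolean =
      Try(Class.forName(className, true, Thread.currentThread.getContextClassLoader)).isSuccess

The new TestUtils helpers supply the throwaway compiled class and jar that make this check meaningful without checked-in fixture files.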
--- .../org/apache/spark/FileServerSuite.scala | 24 +----- .../scala/org/apache/spark/FileSuite.scala | 23 ++++++ .../scala/org/apache/spark/TestUtils.scala | 80 +++++++++++++++++++ 3 files changed, 107 insertions(+), 20 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/TestUtils.scala diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index aee9ab9091dac..9fb74341df0ff 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -33,11 +33,12 @@ class FileServerSuite extends FunSuite with LocalSparkContext { override def beforeEach() { super.beforeEach() resetSparkContext() - System.setProperty("spark.authenticate", "false") } override def beforeAll() { super.beforeAll() + System.setProperty("spark.authenticate", "false") + val tmpDir = new File(Files.createTempDir(), "test") tmpDir.mkdir() @@ -47,27 +48,10 @@ class FileServerSuite extends FunSuite with LocalSparkContext { pw.close() val jarFile = new File(tmpDir, "test.jar") - val jarStream = new FileOutputStream(jarFile) - val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest()) - System.setProperty("spark.authenticate", "false") - - val jarEntry = new JarEntry(textFile.getName) - jar.putNextEntry(jarEntry) - - val in = new FileInputStream(textFile) - val buffer = new Array[Byte](10240) - var nRead = 0 - while (nRead <= 0) { - nRead = in.read(buffer, 0, buffer.length) - jar.write(buffer, 0, nRead) - } - - in.close() - jar.close() - jarStream.close() + val jarUrl = TestUtils.createJar(Seq(textFile), jarFile) tmpFile = textFile - tmpJarUrl = jarFile.toURI.toURL.toString + tmpJarUrl = jarUrl.toString } test("Distributing files locally") { diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 76173608e9f70..afaaa549e8242 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -29,8 +29,31 @@ import org.scalatest.FunSuite import org.apache.spark.SparkContext._ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat +import scala.util.Try class FileSuite extends FunSuite with LocalSparkContext { + test("adding jars to classpath at the driver") { + val tmpDir = Files.createTempDir() + val classFile = TestUtils.createCompiledClass("HelloSpark", tmpDir) + val jarFile = new File(tmpDir, "test.jar") + TestUtils.createJar(Seq(classFile), jarFile) + + def canLoadClass(clazz: String) = + Try(Class.forName(clazz, true, Thread.currentThread().getContextClassLoader)).isSuccess + + val driverLoadedBefore = canLoadClass("HelloSpark") + + val conf = new SparkConf().setMaster("local-cluster[1,1,512]").setAppName("test") + .set("spark.driver.loadAddedJars", "true") + + val sc = new SparkContext(conf) + sc.addJar(jarFile.getAbsolutePath) + + val driverLoadedAfter = canLoadClass("HelloSpark") + + assert(false === driverLoadedBefore, "Class visible before being added") + assert(true === driverLoadedAfter, "Class was not visible after being added") + } test("text files") { sc = new SparkContext("local", "test") diff --git a/core/src/test/scala/org/apache/spark/TestUtils.scala b/core/src/test/scala/org/apache/spark/TestUtils.scala new file mode 100644 index 0000000000000..6b72bacac47a2 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/TestUtils.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.collection.JavaConversions._ + +import java.io.{FileInputStream, FileOutputStream, File} +import java.util.jar.{JarEntry, JarOutputStream} +import java.net.{URL, URI} +import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} + +object TestUtils { + + /** Create a jar file that contains this set of files. All files will be located at the root + * of the jar. */ + def createJar(files: Seq[File], jarFile: File): URL = { + val jarFileStream = new FileOutputStream(jarFile) + val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest()) + + for (file <- files) { + val jarEntry = new JarEntry(file.getName) + jarStream.putNextEntry(jarEntry) + + val in = new FileInputStream(file) + val buffer = new Array[Byte](10240) + var nRead = 0 + while (nRead <= 0) { + nRead = in.read(buffer, 0, buffer.length) + jarStream.write(buffer, 0, nRead) + } + in.close() + } + jarStream.close() + jarFileStream.close() + + jarFile.toURI.toURL + } + + // Adapted from the JavaCompiler.java doc examples + private val SOURCE = JavaFileObject.Kind.SOURCE + private def createURI(name: String) = { + URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}") + } + private class JavaSourceFromString(val name: String, val code: String) + extends SimpleJavaFileObject(createURI(name), SOURCE) { + override def getCharContent(ignoreEncodingErrors: Boolean) = code + } + + /** Creates a compiled class with the given name. Class file will be placed in destDir. */ + def createCompiledClass(className: String, destDir: File): File = { + val compiler = ToolProvider.getSystemJavaCompiler + val sourceFile = new JavaSourceFromString(className, s"public class $className {}") + + // Calling this outputs a class file in pwd. It's easier to just rename the file than + // build a custom FileManager that controls the output location. 
+ compiler.getTask(null, null, null, null, null, Seq(sourceFile)).call() + + val fileName = className + ".class" + val result = new File(fileName) + if (!result.exists()) throw new Exception("Compiled file not found: " + fileName) + val out = new File(destDir, fileName) + result.renameTo(out) + out + } +} From fbcb4a06d77adaa65ed020b6c54b12576b153938 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 12 Mar 2014 10:00:56 -0700 Subject: [PATCH 11/12] Review feedback --- .../scala/org/apache/spark/SparkContext.scala | 6 ++- .../scala/org/apache/spark/FileSuite.scala | 43 ++++++++++++++++--- docs/configuration.md | 9 ++-- 3 files changed, 44 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4b82e6b10079e..3dcb5204b6d42 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -133,6 +133,8 @@ class SparkContext( // Create a classLoader for use by the driver so that jars added via addJar are available to the // driver. Do this before all other initialization so that any thread pools created for this // SparkContext uses the class loader. + // In the future it might make sense to expose this to users so they can assign it as the + // context class loader for other threads. // Note that this is config-enabled as classloaders can introduce subtle side effects private[spark] val classLoader = if (conf.getBoolean("spark.driver.loadAddedJars", false)) { val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) @@ -736,8 +738,8 @@ class SparkContext( * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. - * NOTE: If you enable spark.driver.add-dynamic-jars, then the JAR will also be made available - * to this SparkContext. local: JARs must be available on the driver node. + * NOTE: If you enable spark.driver.loadAddedJars, then the JAR will also be made available + * to this SparkContext and chld threads. local: JARs must be available on the driver node. 
*/ def addJar(path: String) { if (path == null) { diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index afaaa549e8242..b487cecfeb669 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark import java.io.{File, FileWriter} +import java.util.concurrent.Semaphore import scala.io.Source @@ -41,18 +42,46 @@ class FileSuite extends FunSuite with LocalSparkContext { def canLoadClass(clazz: String) = Try(Class.forName(clazz, true, Thread.currentThread().getContextClassLoader)).isSuccess - val driverLoadedBefore = canLoadClass("HelloSpark") + val loadedBefore = canLoadClass("HelloSpark") val conf = new SparkConf().setMaster("local-cluster[1,1,512]").setAppName("test") .set("spark.driver.loadAddedJars", "true") - val sc = new SparkContext(conf) - sc.addJar(jarFile.getAbsolutePath) - - val driverLoadedAfter = canLoadClass("HelloSpark") - - assert(false === driverLoadedBefore, "Class visible before being added") + var driverLoadedAfter = false + var childLoadedAfter = false + + val sem = new Semaphore(1) + sem.acquire() + + new Thread() { + override def run() { + val sc = new SparkContext(conf) + sc.addJar(jarFile.getAbsolutePath) + driverLoadedAfter = canLoadClass("HelloSpark") + + // Test visibility in a child thread + val childSem = new Semaphore(1) + childSem.acquire() + new Thread() { + override def run() { + childLoadedAfter = canLoadClass("HelloSpark") + childSem.release() + } + }.start() + + childSem.acquire() + sem.release() + } + }.start() + sem.acquire() + + // Test visibility in a parent thread + val parentLoadedAfter = canLoadClass("HelloSpark") + + assert(false === loadedBefore, "Class visible before being added") assert(true === driverLoadedAfter, "Class was not visible after being added") + assert(true === childLoadedAfter, "Class was not visible to child thread after being added") + assert(false === parentLoadedAfter, "Class was visible to parent thread after being added") } test("text files") { diff --git a/docs/configuration.md b/docs/configuration.md index 34cb564f03c09..f9ecb1cd53c11 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -111,7 +111,6 @@ Apart from these, the following properties are also available, and may be useful it if you configure your own old generation size. - spark.shuffle.memoryFraction 0.3 @@ -376,7 +375,7 @@ Apart from these, the following properties are also available, and may be useful spark.akka.heartbeat.interval 1000 - This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. + This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). 
A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. @@ -398,9 +397,9 @@ Apart from these, the following properties are also available, and may be useful false If true, the SparkContext uses a class loader to make jars added via `addJar` available to - the SparkContext. The default behavior is that jars added via `addJar` are only made - available to executors, and Spark apps must include all its jars in the driver's - CLASSPATH even if `addJar` is used. + the SparkContext. The default behavior is that jars added via `addJar` must already be on + the classpath. Jar contents will be visible to the thread that created the SparkContext + and all of its child threads. From 9637d2110b4566e6f5db1ee67feb8c8ce3837e7a Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 12 Mar 2014 10:51:45 -0700 Subject: [PATCH 12/12] Style fixes --- core/src/main/scala/org/apache/spark/SparkContext.scala | 9 +++++---- .../org/apache/spark/scheduler/TaskResultGetter.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3dcb5204b6d42..5ec949edf090e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -782,17 +782,18 @@ class SparkContext( path } - // Add jar to driver class loader so it is available for driver, even if it is not on the classpath + // Add jar to driver class loader so it is available for driver, + // even if it is not on the classpath uri.getScheme match { case null | "file" | "local" => - // Assume file exists on current (driver) node as well. Unlike executors, driver doesn't need to - // download the jar since it's local. + // Assume file exists on current (driver) node as well. Unlike executors, driver + // doesn't need to download the jar since it's local. 
addUrlToDriverLoader(new URL("file:" + uri.getPath)) case "http" | "https" | "ftp" => // Should be handled by the URLClassLoader, pass along entire URL addUrlToDriverLoader(new URL(path)) case other => - logWarning("This URI scheme for URI " + path + " is not supported by the driver class loader") + logWarning(s"This URI scheme for URI $path is not supported by the driver class loader") } } if (key != null) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 75f3d789eb42f..1747add15dd9e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -69,7 +69,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul } catch { case cnf: ClassNotFoundException => val loader = Thread.currentThread.getContextClassLoader - taskSetManager.abort("ClassNotFound [" + cnf.getMessage + "] with classloader: " + loader) + taskSetManager.abort(s"ClassNotFound [${cnf.getMessage}] with classloader: " + loader) case ex: Throwable => taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex)) }
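Taken together, the series keeps the behavior opt-in: with spark.driver.loadAddedJars enabled, addJar also feeds the driver's class loader, and classes from added jars become visible to the thread that created the SparkContext and to its child threads. A rough end-to-end usage sketch (the master URL, jar paths and class name below are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    object DynamicJarsDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("dynamic-jars-demo")
          .set("spark.driver.loadAddedJars", "true")  // opt in to the driver-side loader

        val sc = new SparkContext(conf)

        // Shipped to executors as before; with the flag set, also appended to the
        // driver's class loader (plain local paths must exist on the driver node).
        sc.addJar("/opt/jobs/extra-udfs.jar")
        sc.addJar("http://repo.example.com/jars/extra-udfs.jar")

        // Classes from the added jars can now be resolved reflectively on the driver.
        val helper = Class.forName("com.example.Helper", true,
          Thread.currentThread.getContextClassLoader)
        println("Loaded " + helper.getName)

        sc.stop()
      }
    }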