From 6c20d37a29a0e5f97ea6becf8a25d16de17044f2 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 22 Mar 2016 17:53:55 +0800 Subject: [PATCH 1/7] Upload metrics.properties automatically with distributed cache --- .../org/apache/spark/deploy/yarn/Client.scala | 14 +++++--- .../deploy/yarn/BaseYarnClusterSuite.scala | 13 +++++++ .../spark/deploy/yarn/YarnClusterSuite.scala | 36 ++++++++++++++----- 3 files changed, 50 insertions(+), 13 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6bbc8c2dfa19a..1af055bf12033 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -577,11 +577,15 @@ private[spark] class Client( // required when user changes log4j.properties directly to set the log configurations. If // configuration file is provided through --files then executors will be taking configurations // from --files instead of $SPARK_CONF_DIR/log4j.properties. - val log4jFileName = "log4j.properties" - Option(Utils.getContextOrSparkClassLoader.getResource(log4jFileName)).foreach { url => - if (url.getProtocol == "file") { - hadoopConfFiles(log4jFileName) = new File(url.getPath) - } + + // Also uploading metrics.properties to distributed cache if exists in classpath. + // If user specify this file using --files then executors will use the one + // from --files instead. + for { prop <- Seq("log4j.properties", "metrics.properties") + url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop)) + if url.getProtocol == "file" + } { + hadoopConfFiles(prop) = new File(url.getPath) } Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey => diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index 2f3a31cb046bd..ecc7bfca2c968 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -56,11 +56,17 @@ abstract class BaseYarnClusterSuite |log4j.logger.org.spark-project.jetty=WARN """.stripMargin + protected val METRICS_CONF = + """ + |*.source.jvm.class=org.apache.spark.metrics.source.JvmSource + """.stripMargin + private var yarnCluster: MiniYARNCluster = _ protected var tempDir: File = _ private var fakeSparkJar: File = _ protected var hadoopConfDir: File = _ private var logConfDir: File = _ + private var metricsConfDir: File = _ var oldSystemProperties: Properties = null @@ -78,6 +84,11 @@ abstract class BaseYarnClusterSuite val logConfFile = new File(logConfDir, "log4j.properties") Files.write(LOG4J_CONF, logConfFile, StandardCharsets.UTF_8) + metricsConfDir = new File(tempDir, "metrics") + metricsConfDir.mkdir() + val metricsConfFile = new File(metricsConfDir, "metrics.properties") + Files.write(METRICS_CONF, metricsConfFile, StandardCharsets.UTF_8) + // Disable the disk utilization check to avoid the test hanging when people's disks are // getting full. 
val yarnConf = newYarnConfig() @@ -210,6 +221,8 @@ abstract class BaseYarnClusterSuite .buildClassPath( logConfDir.getAbsolutePath() + File.pathSeparator + + metricsConfDir.getAbsolutePath() + + File.pathSeparator + extraClassPath.mkString(File.pathSeparator)) .asScala .mkString(File.pathSeparator) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 26520529ecabc..f0985d6d93abe 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -139,6 +139,14 @@ class YarnClusterSuite extends BaseYarnClusterSuite { } } + test("upload metrics.properties to distributed cache in client mode") { + testMetricsConf(true) + } + + test("upload metrics.properties to distributed cache in cluster mode") { + testMetricsConf(false) + } + private def testBasicYarnApp(clientMode: Boolean): Unit = { val result = File.createTempFile("result", null, tempDir) val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass), @@ -187,12 +195,13 @@ class YarnClusterSuite extends BaseYarnClusterSuite { private def testUseClassPathFirst(clientMode: Boolean): Unit = { // Create a jar file that contains a different version of "test.resource". + val resource = "test.resource" val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir) val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "OVERRIDDEN"), tempDir) val driverResult = File.createTempFile("driver", null, tempDir) val executorResult = File.createTempFile("executor", null, tempDir) val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass), - appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()), + appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath(), resource), extraClassPath = Seq(originalJar.getPath()), extraJars = Seq("local:" + userJar.getPath()), extraConf = Map( @@ -202,6 +211,17 @@ class YarnClusterSuite extends BaseYarnClusterSuite { checkResult(finalState, executorResult, "OVERRIDDEN") } + private def testMetricsConf(clientMode: Boolean): Unit = { + // Create a jar file that contains a different version of "test.resource". 
+ val resource = "metrics.properties" + val driverResult = File.createTempFile("driver", null, tempDir) + val executorResult = File.createTempFile("executor", null, tempDir) + val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass), + appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath(), resource)) + checkResult(finalState, driverResult, METRICS_CONF) + checkResult(finalState, executorResult, METRICS_CONF) + } + } private[spark] class SaveExecutorInfo extends SparkListener { @@ -294,36 +314,36 @@ private object YarnClasspathTest extends Logging { } def main(args: Array[String]): Unit = { - if (args.length != 2) { + if (args.length != 3) { error( s""" |Invalid command line: ${args.mkString(" ")} | - |Usage: YarnClasspathTest [driver result file] [executor result file] + |Usage: YarnClasspathTest [driver result file] [executor result file] [resource name] """.stripMargin) // scalastyle:on println } - readResource(args(0)) + readResource(args(0), args(2)) val sc = new SparkContext(new SparkConf()) try { - sc.parallelize(Seq(1)).foreach { x => readResource(args(1)) } + sc.parallelize(Seq(1)).foreach { x => readResource(args(1), args(2)) } } finally { sc.stop() } System.exit(exitCode) } - private def readResource(resultPath: String): Unit = { + private def readResource(resultPath: String, resourceName: String): Unit = { var result = "failure" try { val ccl = Thread.currentThread().getContextClassLoader() - val resource = ccl.getResourceAsStream("test.resource") + val resource = ccl.getResourceAsStream(resourceName) val bytes = ByteStreams.toByteArray(resource) result = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8) } catch { case t: Throwable => - error(s"loading test.resource to $resultPath", t) + error(s"loading $resourceName to $resultPath", t) // set the exit code if not yet set exitCode = 2 } finally { From f9cb06bc5832b4405ad43fb93a8745564abe0849 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 23 Mar 2016 15:13:30 +0800 Subject: [PATCH 2/7] Fix bug and add unit test --- .../org/apache/spark/deploy/yarn/Client.scala | 31 +++++++++---------- .../spark/deploy/yarn/YarnClusterSuite.scala | 21 ++++++++----- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1af055bf12033..cfdc50246f3f7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -549,6 +549,21 @@ private[spark] class Client( appMasterOnly = true) require(confLocalizedPath != null) + // Uploading $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that + // the executors will use the latest configurations instead of the default values. This is + // required when user changes log4j.properties directly to set the log configurations. If + // configuration file is provided through --files then executors will be taking configurations + // from --files instead of $SPARK_CONF_DIR/log4j.properties. + + // Also uploading metrics.properties to distributed cache if exists in classpath. + // If user specify this file using --files then executors will use the one + // from --files instead. 
+ for { prop <- Seq("log4j.properties", "metrics.properties") + url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop)) + if url.getProtocol == "file" } { + distribute(url.getPath, LocalResourceType.FILE) + } + localResources } @@ -572,22 +587,6 @@ private[spark] class Client( private def createConfArchive(): File = { val hadoopConfFiles = new HashMap[String, File]() - // Uploading $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that - // the executors will use the latest configurations instead of the default values. This is - // required when user changes log4j.properties directly to set the log configurations. If - // configuration file is provided through --files then executors will be taking configurations - // from --files instead of $SPARK_CONF_DIR/log4j.properties. - - // Also uploading metrics.properties to distributed cache if exists in classpath. - // If user specify this file using --files then executors will use the one - // from --files instead. - for { prop <- Seq("log4j.properties", "metrics.properties") - url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop)) - if url.getProtocol == "file" - } { - hadoopConfFiles(prop) = new File(url.getPath) - } - Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey => sys.env.get(envKey).foreach { path => val dir = new File(path) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index f0985d6d93abe..302778b22754f 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -140,11 +140,19 @@ class YarnClusterSuite extends BaseYarnClusterSuite { } test("upload metrics.properties to distributed cache in client mode") { - testMetricsConf(true) + testLocalResource(true, "metrics.properties", METRICS_CONF) } test("upload metrics.properties to distributed cache in cluster mode") { - testMetricsConf(false) + testLocalResource(false, "metrics.properties", METRICS_CONF) + } + + test("upload log4j.properties to distributed cache in client mode") { + testLocalResource(true, "log4j.properties", LOG4J_CONF) + } + + test("upload log4j.properties to distributed cache in cluster mode") { + testLocalResource(false, "log4j.properties", LOG4J_CONF) } private def testBasicYarnApp(clientMode: Boolean): Unit = { @@ -211,17 +219,14 @@ class YarnClusterSuite extends BaseYarnClusterSuite { checkResult(finalState, executorResult, "OVERRIDDEN") } - private def testMetricsConf(clientMode: Boolean): Unit = { - // Create a jar file that contains a different version of "test.resource". 
- val resource = "metrics.properties" + private def testLocalResource(clientMode: Boolean, resource: String, result: String): Unit = { val driverResult = File.createTempFile("driver", null, tempDir) val executorResult = File.createTempFile("executor", null, tempDir) val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass), appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath(), resource)) - checkResult(finalState, driverResult, METRICS_CONF) - checkResult(finalState, executorResult, METRICS_CONF) + checkResult(finalState, driverResult, result) + checkResult(finalState, executorResult, result) } - } private[spark] class SaveExecutorInfo extends SparkListener { From 260ff0e093fb01b8821d01db407ff961fcec5920 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 23 Mar 2016 15:55:23 +0800 Subject: [PATCH 3/7] revert the unit test since it cannot fully cover it --- .../deploy/yarn/BaseYarnClusterSuite.scala | 13 ------ .../spark/deploy/yarn/YarnClusterSuite.scala | 41 ++++--------------- 2 files changed, 8 insertions(+), 46 deletions(-) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index ecc7bfca2c968..2f3a31cb046bd 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -56,17 +56,11 @@ abstract class BaseYarnClusterSuite |log4j.logger.org.spark-project.jetty=WARN """.stripMargin - protected val METRICS_CONF = - """ - |*.source.jvm.class=org.apache.spark.metrics.source.JvmSource - """.stripMargin - private var yarnCluster: MiniYARNCluster = _ protected var tempDir: File = _ private var fakeSparkJar: File = _ protected var hadoopConfDir: File = _ private var logConfDir: File = _ - private var metricsConfDir: File = _ var oldSystemProperties: Properties = null @@ -84,11 +78,6 @@ abstract class BaseYarnClusterSuite val logConfFile = new File(logConfDir, "log4j.properties") Files.write(LOG4J_CONF, logConfFile, StandardCharsets.UTF_8) - metricsConfDir = new File(tempDir, "metrics") - metricsConfDir.mkdir() - val metricsConfFile = new File(metricsConfDir, "metrics.properties") - Files.write(METRICS_CONF, metricsConfFile, StandardCharsets.UTF_8) - // Disable the disk utilization check to avoid the test hanging when people's disks are // getting full. 
val yarnConf = newYarnConfig() @@ -221,8 +210,6 @@ abstract class BaseYarnClusterSuite .buildClassPath( logConfDir.getAbsolutePath() + File.pathSeparator + - metricsConfDir.getAbsolutePath() + - File.pathSeparator + extraClassPath.mkString(File.pathSeparator)) .asScala .mkString(File.pathSeparator) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 302778b22754f..26520529ecabc 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -139,22 +139,6 @@ class YarnClusterSuite extends BaseYarnClusterSuite { } } - test("upload metrics.properties to distributed cache in client mode") { - testLocalResource(true, "metrics.properties", METRICS_CONF) - } - - test("upload metrics.properties to distributed cache in cluster mode") { - testLocalResource(false, "metrics.properties", METRICS_CONF) - } - - test("upload log4j.properties to distributed cache in client mode") { - testLocalResource(true, "log4j.properties", LOG4J_CONF) - } - - test("upload log4j.properties to distributed cache in cluster mode") { - testLocalResource(false, "log4j.properties", LOG4J_CONF) - } - private def testBasicYarnApp(clientMode: Boolean): Unit = { val result = File.createTempFile("result", null, tempDir) val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass), @@ -203,13 +187,12 @@ class YarnClusterSuite extends BaseYarnClusterSuite { private def testUseClassPathFirst(clientMode: Boolean): Unit = { // Create a jar file that contains a different version of "test.resource". - val resource = "test.resource" val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir) val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "OVERRIDDEN"), tempDir) val driverResult = File.createTempFile("driver", null, tempDir) val executorResult = File.createTempFile("executor", null, tempDir) val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass), - appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath(), resource), + appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()), extraClassPath = Seq(originalJar.getPath()), extraJars = Seq("local:" + userJar.getPath()), extraConf = Map( @@ -219,14 +202,6 @@ class YarnClusterSuite extends BaseYarnClusterSuite { checkResult(finalState, executorResult, "OVERRIDDEN") } - private def testLocalResource(clientMode: Boolean, resource: String, result: String): Unit = { - val driverResult = File.createTempFile("driver", null, tempDir) - val executorResult = File.createTempFile("executor", null, tempDir) - val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass), - appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath(), resource)) - checkResult(finalState, driverResult, result) - checkResult(finalState, executorResult, result) - } } private[spark] class SaveExecutorInfo extends SparkListener { @@ -319,36 +294,36 @@ private object YarnClasspathTest extends Logging { } def main(args: Array[String]): Unit = { - if (args.length != 3) { + if (args.length != 2) { error( s""" |Invalid command line: ${args.mkString(" ")} | - |Usage: YarnClasspathTest [driver result file] [executor result file] [resource name] + |Usage: YarnClasspathTest [driver result file] [executor result file] """.stripMargin) // scalastyle:on 
println } - readResource(args(0), args(2)) + readResource(args(0)) val sc = new SparkContext(new SparkConf()) try { - sc.parallelize(Seq(1)).foreach { x => readResource(args(1), args(2)) } + sc.parallelize(Seq(1)).foreach { x => readResource(args(1)) } } finally { sc.stop() } System.exit(exitCode) } - private def readResource(resultPath: String, resourceName: String): Unit = { + private def readResource(resultPath: String): Unit = { var result = "failure" try { val ccl = Thread.currentThread().getContextClassLoader() - val resource = ccl.getResourceAsStream(resourceName) + val resource = ccl.getResourceAsStream("test.resource") val bytes = ByteStreams.toByteArray(resource) result = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8) } catch { case t: Throwable => - error(s"loading $resourceName to $resultPath", t) + error(s"loading test.resource to $resultPath", t) // set the exit code if not yet set exitCode = 2 } finally { From 6702927333de8df8ed416cec14cb130a30e9ab05 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 24 Mar 2016 14:25:32 +0800 Subject: [PATCH 4/7] Add log4j and metrics properties back to conf dir, also be visible to executor --- .../org/apache/spark/deploy/yarn/Client.scala | 45 +++++++++---------- .../spark/deploy/yarn/ExecutorRunnable.scala | 3 +- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index cfdc50246f3f7..eb805f7d8f1c6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -545,25 +545,9 @@ private[spark] class Client( // Distribute an archive with Hadoop and Spark configuration for the AM. val (_, confLocalizedPath) = distribute(createConfArchive().toURI().getPath(), resType = LocalResourceType.ARCHIVE, - destName = Some(LOCALIZED_CONF_DIR), - appMasterOnly = true) + destName = Some(LOCALIZED_CONF_DIR)) require(confLocalizedPath != null) - // Uploading $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that - // the executors will use the latest configurations instead of the default values. This is - // required when user changes log4j.properties directly to set the log configurations. If - // configuration file is provided through --files then executors will be taking configurations - // from --files instead of $SPARK_CONF_DIR/log4j.properties. - - // Also uploading metrics.properties to distributed cache if exists in classpath. - // If user specify this file using --files then executors will use the one - // from --files instead. - for { prop <- Seq("log4j.properties", "metrics.properties") - url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop)) - if url.getProtocol == "file" } { - distribute(url.getPath, LocalResourceType.FILE) - } - localResources } @@ -587,6 +571,21 @@ private[spark] class Client( private def createConfArchive(): File = { val hadoopConfFiles = new HashMap[String, File]() + // Uploading $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that + // the executors will use the latest configurations instead of the default values. This is + // required when user changes log4j.properties directly to set the log configurations. If + // configuration file is provided through --files then executors will be taking configurations + // from --files instead of $SPARK_CONF_DIR/log4j.properties. 
+ + // Also uploading metrics.properties to distributed cache if exists in classpath. + // If user specify this file using --files then executors will use the one + // from --files instead. + for { prop <- Seq("log4j.properties", "metrics.properties") + url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop)) + if url.getProtocol == "file" } { + hadoopConfFiles(prop) = new File(url.getPath) + } + Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey => sys.env.get(envKey).foreach { path => val dir = new File(path) @@ -663,7 +662,7 @@ private[spark] class Client( pySparkArchives: Seq[String]): HashMap[String, String] = { logInfo("Setting up the launch environment for our AM container") val env = new HashMap[String, String]() - populateClasspath(args, yarnConf, sparkConf, env, true, sparkConf.get(DRIVER_CLASS_PATH)) + populateClasspath(args, yarnConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH)) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDir env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() @@ -1240,18 +1239,16 @@ object Client extends Logging { conf: Configuration, sparkConf: SparkConf, env: HashMap[String, String], - isAM: Boolean, extraClassPath: Option[String] = None): Unit = { extraClassPath.foreach { cp => addClasspathEntry(getClusterPath(sparkConf, cp), env) } + addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env) - if (isAM) { - addClasspathEntry( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + - LOCALIZED_CONF_DIR, env) - } + addClasspathEntry( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + + LOCALIZED_CONF_DIR, env) if (sparkConf.get(USER_CLASS_PATH_FIRST)) { // in order to properly add the app jar when user classpath is first diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index f956a4d1d5953..7b55d781f86e8 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -289,8 +289,7 @@ private[yarn] class ExecutorRunnable( private def prepareEnvironment(container: Container): HashMap[String, String] = { val env = new HashMap[String, String]() - Client.populateClasspath(null, yarnConf, sparkConf, env, false, - sparkConf.get(EXECUTOR_CLASS_PATH)) + Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH)) sparkConf.getExecutorEnv.foreach { case (key, value) => // This assumes each executor environment variable set here is a path From b1da8e5e530923c078d905b453b3b5c26be0d840 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 24 Mar 2016 14:56:40 +0800 Subject: [PATCH 5/7] Fix test compile failure --- .../scala/org/apache/spark/deploy/yarn/ClientSuite.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 24472e006b875..f6a8855f83a39 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -121,7 +121,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val env = new MutableHashMap[String, String]() val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) - 
populateClasspath(args, conf, sparkConf, env, true) + populateClasspath(args, conf, sparkConf, env) val cp = env("CLASSPATH").split(":|;|") s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => @@ -178,8 +178,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll "/remotePath/1:/remotePath/2") val env = new MutableHashMap[String, String]() - populateClasspath(null, conf, sparkConf, env, false, - extraClassPath = Some("/localPath/my1.jar")) + populateClasspath(null, conf, sparkConf, env, extraClassPath = Some("/localPath/my1.jar")) val cp = classpath(env) cp should contain ("/remotePath/spark.jar") cp should contain ("/remotePath/my1.jar") @@ -355,7 +354,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll private def classpath(client: Client): Array[String] = { val env = new MutableHashMap[String, String]() - populateClasspath(null, client.hadoopConf, client.sparkConf, env, false) + populateClasspath(null, client.hadoopConf, client.sparkConf, env) classpath(env) } From ea17176ebf5030b713a2be363dd9518bdbfd2e5e Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 28 Mar 2016 10:57:00 +0800 Subject: [PATCH 6/7] Remove the support SPARK_LOG4J_CONF --- .../org/apache/spark/deploy/yarn/Client.scala | 33 +++++-------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index eb805f7d8f1c6..e0af5ccedabbb 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -351,14 +351,6 @@ private[spark] class Client( val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF")) - if (oldLog4jConf.isDefined) { - logWarning( - "SPARK_LOG4J_CONF detected in the system environment. This variable has been " + - "deprecated. Please refer to the \"Launching Spark on YARN\" documentation " + - "for alternatives.") - } - def addDistributedUri(uri: URI): Boolean = { val uriStr = uri.toString() if (distributedUris.contains(uriStr)) { @@ -480,25 +472,16 @@ private[spark] class Client( } /** - * Copy a few resources to the distributed cache if their scheme is not "local". + * Copy user jar to the distributed cache if their scheme is not "local". * Otherwise, set the corresponding key in our SparkConf to handle it downstream. 
- * Each resource is represented by a 3-tuple of: - * (1) destination resource name, - * (2) local path to the resource, - * (3) Spark property key to set if the scheme is not local */ - List( - (APP_JAR_NAME, args.userJar, APP_JAR), - ("log4j.properties", oldLog4jConf.orNull, null) - ).foreach { case (destName, path, confKey) => - if (path != null && !path.trim().isEmpty()) { - val (isLocal, localizedPath) = distribute(path, destName = Some(destName)) - if (isLocal && confKey != null) { - require(localizedPath != null, s"Path $path already distributed.") - // If the resource is intended for local use only, handle this downstream - // by setting the appropriate property - sparkConf.set(confKey, localizedPath) - } + Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar => + val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME)) + if (isLocal) { + require(localizedPath != null, s"Path $jar already distributed") + // If the resource is intended for local use only, handle this downstream + // by setting the appropriate property + sparkConf.set(APP_JAR, localizedPath) } } From a619dfdf7dc12a3ecb080fcbf292c0c39a734b98 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 31 Mar 2016 22:03:10 +0800 Subject: [PATCH 7/7] Fix the comment --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index e0af5ccedabbb..b4af73ec4ff96 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -525,7 +525,7 @@ private[spark] class Client( distribute(f, targetDir = targetDir) } - // Distribute an archive with Hadoop and Spark configuration for the AM. + // Distribute an archive with Hadoop and Spark configuration for the AM and executors. val (_, confLocalizedPath) = distribute(createConfArchive().toURI().getPath(), resType = LocalResourceType.ARCHIVE, destName = Some(LOCALIZED_CONF_DIR)) @@ -537,10 +537,10 @@ private[spark] class Client( /** * Create an archive with the config files for distribution. * - * These are only used by the AM, since executors will use the configuration object broadcast by - * the driver. The files are zipped and added to the job as an archive, so that YARN will explode - * it when distributing to the AM. This directory is then added to the classpath of the AM - * process, just to make sure that everybody is using the same default config. + * These will be used by AM and executors. The files are zipped and added to the job as an + * archive, so that YARN will explode it when distributing to AM and executors. This directory + * is then added to the classpath of AM and executor process, just to make sure that everybody + * is using the same default config. * * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR * shows up in the classpath before YARN_CONF_DIR.
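
With this patch series applied, a metrics.properties file found on the client classpath (typically $SPARK_CONF_DIR/metrics.properties) is bundled into the localized configuration archive together with log4j.properties and the Hadoop configuration, and that archive is placed on the classpath of both the AM and the executors, so the file no longer has to be passed explicitly through --files. As a rough illustration (the ConsoleSink entries below are standard Spark metrics settings used only as an example; they are not part of this patch), such a file might look like:

    # Example $SPARK_CONF_DIR/metrics.properties picked up automatically by the patched Client
    *.source.jvm.class=org.apache.spark.metrics.source.JvmSource
    *.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
    *.sink.console.period=10
    *.sink.console.unit=seconds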