From 943cbf450d32f49a16091247f2bf7e0679d184ae Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 20 May 2015 17:29:29 -0700
Subject: [PATCH 01/10] [SPARK-5479] [yarn] Handle --py-files correctly in YARN.

The bug description is a little misleading: the actual issue is that .py files are not handled correctly when distributed by YARN. They're added to "spark.submit.pyFiles", which, when processed by context.py, explicitly whitelists certain extensions (see PACKAGE_EXTENSIONS), and that list does not include .py files. On top of that, archives were not handled at all! They made it to the driver's python path, but never made it to executors, since the mechanism used to propagate their location (spark.submit.pyFiles) only works on the driver side.

So, instead, ignore "spark.submit.pyFiles" and just build PYTHONPATH correctly for both the driver and the executors. Individual .py files are placed in a subdirectory of the container's local dir in the cluster, which is then added to the python path. Archives are added directly.

The change, as a side effect, ends up solving the symptom described in the bug: the issue was not that the files were not being distributed, but that they were never made visible to the python application running under Spark.

Also included is a proper unit test for running python on YARN, which broke in several different ways with the previous code.

A short walkthrough of the changes:

- SparkSubmit no longer tries to be smart about how YARN handles python files. It just passes the configs down to the YARN client code.
- The YARN client distributes python files and archives differently, placing plain .py files in a subdirectory.
- The YARN client now sets PYTHONPATH for the processes it launches; to properly handle different locations, it uses YARN's support for embedding env variables. To avoid YARN expanding those at the wrong time, SparkConf is now propagated to the AM using a conf file instead of command line options.
- Because the Client initialization code is a maze of implicit dependencies, some code needed to be moved around to make sure all needed state was available when the code ran.
- The pyspark tests in YarnClusterSuite now actually distribute and try to use both a python file and an archive containing a different python module. Also added a yarn-client test for completeness.
- I cleaned up some of the code around distributing files to YARN, to avoid adding more copied-and-pasted code to handle the new files being distributed.
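For reference, here is a condensed sketch of how Client.scala now assembles PYTHONPATH (simplified from the diff below; buildPath joins path components, and LOCALIZED_PYTHON_DIR is the "__pyfiles__" subdirectory introduced by this change):

    // Split --py-files into plain .py files and archives (.zip, .egg, ...).
    val (pyFiles, pyArchives) = args.pyFiles.partition(_.endsWith(".py"))
    val pythonPath = new ListBuffer[String]()
    // Plain .py files are all localized into one subdirectory, so only that
    // directory needs to be on PYTHONPATH; $PWD is expanded by YARN at launch.
    if (pyFiles.nonEmpty) {
      pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
        LOCALIZED_PYTHON_DIR)
    }
    // Archives are referenced by their localized name in the container's
    // working directory, except "local:" URIs, which are used as-is.
    pyArchives.foreach { path =>
      val uri = new URI(path)
      if (uri.getScheme != LOCAL_SCHEME) {
        pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
          new Path(path).getName())
      } else {
        pythonPath += uri.getPath()
      }
    }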
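Similarly, a sketch of the new SparkConf propagation (condensed from the diffs below): the client writes the config as a properties file into the conf archive, and the AM loads it via the new --properties-file argument, so the values no longer need to be escaped and passed on the command line:

    // Client.createConfArchive(): save the Spark configuration into the archive.
    val props = new Properties()
    sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
    confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE))
    val writer = new OutputStreamWriter(confStream, UTF_8)
    props.store(writer, "Spark configuration.")
    writer.flush()
    confStream.closeEntry()

    // ApplicationMaster: load the localized file and set the entries as system
    // properties, so user code running inside the AM also sees them.
    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
      sys.props(k) = v
    }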
--- .../org/apache/spark/deploy/SparkSubmit.scala | 111 ++++---- .../spark/deploy/yarn/ApplicationMaster.scala | 16 +- .../yarn/ApplicationMasterArguments.scala | 12 +- .../org/apache/spark/deploy/yarn/Client.scala | 264 +++++++++++------- .../spark/deploy/yarn/ClientArguments.scala | 4 +- .../spark/deploy/yarn/ExecutorRunnable.scala | 4 + .../cluster/YarnClientSchedulerBackend.scala | 5 +- .../spark/deploy/yarn/ClientSuite.scala | 2 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 61 ++-- 9 files changed, 274 insertions(+), 205 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 329fa06ba8ba5..b3c042cf73f55 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -315,62 +315,58 @@ object SparkSubmit { case _ => } - // If we're running a python app, set the main class to our specific python runner - if (args.isPython && deployMode == CLIENT) { - if (args.primaryResource == PYSPARK_SHELL) { - args.mainClass = "org.apache.spark.api.python.PythonGatewayServer" - } else { - // If a python file is provided, add it to the child arguments and list of files to deploy. - // Usage: PythonAppRunner
<main python file> <extra python files>
[app arguments] - args.mainClass = "org.apache.spark.deploy.PythonRunner" - args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs - args.files = mergeFileLists(args.files, args.primaryResource) - } - args.files = mergeFileLists(args.files, args.pyFiles) - if (args.pyFiles != null) { - sysProps("spark.submit.pyFiles") = args.pyFiles - } - } - - // In yarn mode for a python app, add pyspark archives to files - // that can be distributed with the job - if (args.isPython && clusterManager == YARN) { - var pyArchives: String = null - val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH") - if (pyArchivesEnvOpt.isDefined) { - pyArchives = pyArchivesEnvOpt.get - } else { - if (!sys.env.contains("SPARK_HOME")) { - printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.") - } - val pythonPath = new ArrayBuffer[String] - for (sparkHome <- sys.env.get("SPARK_HOME")) { - val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator) - val pyArchivesFile = new File(pyLibPath, "pyspark.zip") - if (!pyArchivesFile.exists()) { - printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.") + if (args.isPython) { + // In yarn mode for a python app, add pyspark archives to files + // that can be distributed with the job + if (clusterManager == YARN) { + var pyArchives: String = null + val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH") + if (pyArchivesEnvOpt.isDefined) { + pyArchives = pyArchivesEnvOpt.get + } else { + if (!sys.env.contains("SPARK_HOME")) { + printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.") } - val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip") - if (!py4jFile.exists()) { - printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " + - "in yarn mode.") + val pythonPath = new ArrayBuffer[String] + for (sparkHome <- sys.env.get("SPARK_HOME")) { + val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator) + val pyArchivesFile = new File(pyLibPath, "pyspark.zip") + if (!pyArchivesFile.exists()) { + printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.") + } + val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip") + if (!py4jFile.exists()) { + printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " + + "in yarn mode.") + } + pythonPath += pyArchivesFile.getAbsolutePath() + pythonPath += py4jFile.getAbsolutePath() } - pythonPath += pyArchivesFile.getAbsolutePath() - pythonPath += py4jFile.getAbsolutePath() + pyArchives = pythonPath.mkString(",") } - pyArchives = pythonPath.mkString(",") + args.pyFiles = mergeFileLists(args.pyFiles, pyArchives) } - pyArchives = pyArchives.split(",").map { localPath=> - val localURI = Utils.resolveURI(localPath) - if (localURI.getScheme != "local") { - args.files = mergeFileLists(args.files, localURI.toString) - new Path(localPath).getName + // If we're running a python app, set the main class to our specific python runner + if (deployMode == CLIENT) { + if (args.primaryResource == PYSPARK_SHELL) { + args.mainClass = "org.apache.spark.api.python.PythonGatewayServer" } else { - localURI.getPath + // If a python file is provided, add it to the child arguments and list of files to deploy. + // Usage: PythonAppRunner
<main python file> <extra python files>
[app arguments] + args.mainClass = "org.apache.spark.deploy.PythonRunner" + args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs + if (clusterManager != YARN) { + args.files = mergeFileLists(args.files, args.primaryResource) + } + } + if (clusterManager != YARN) { + args.files = mergeFileLists(args.files, args.pyFiles) + } + if (args.pyFiles != null) { + sysProps("spark.submit.pyFiles") = args.pyFiles } - }.mkString(File.pathSeparator) - sysProps("spark.submit.pyArchives") = pyArchives + } } // If we're running a R app, set the main class to our specific R runner @@ -387,13 +383,6 @@ object SparkSubmit { } if (isYarnCluster) { - // In yarn-cluster mode for a python app, add primary resource and pyFiles to files - // that can be distributed with the job - if (args.isPython) { - args.files = mergeFileLists(args.files, args.primaryResource) - args.files = mergeFileLists(args.files, args.pyFiles) - } - // In yarn-cluster mode for a R app, add primary resource to files // that can be distributed with the job if (args.isR) { @@ -520,13 +509,9 @@ object SparkSubmit { if (isYarnCluster) { childMainClass = "org.apache.spark.deploy.yarn.Client" if (args.isPython) { - val mainPyFile = new Path(args.primaryResource).getName - childArgs += ("--primary-py-file", mainPyFile) + childArgs += ("--primary-py-file", args.primaryResource) if (args.pyFiles != null) { - // These files will be distributed to each machine's working directory, so strip the - // path prefix - val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",") - childArgs += ("--py-files", pyFilesNames) + childArgs += ("--py-files", args.pyFiles) } childArgs += ("--class", "org.apache.spark.deploy.PythonRunner") } else if (args.isR) { @@ -864,7 +849,7 @@ private[spark] object SparkSubmitUtils { md.addDependency(dd) } } - + /** Add exclusion rules for dependencies already included in the spark-assembly */ def addExclusionRules( ivySettings: IvySettings, diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 29752969e6152..28e2d00372c6d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -46,6 +46,14 @@ private[spark] class ApplicationMaster( client: YarnRMClient) extends Logging { + // Load the properties file with the Spark configuration and set entries as system properties, + // so that user code run inside the AM also has access to them. + if (args.propertiesFile != null) { + Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) => + sys.props(k) = v + } + } + // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be // optimal as more containers are available. Might need to handle this better. 
@@ -465,9 +473,9 @@ private[spark] class ApplicationMaster( new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader) } + var userArgs = args.userArgs if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) { - System.setProperty("spark.submit.pyFiles", - PythonRunner.formatPaths(args.pyFiles).mkString(",")) + userArgs = Seq(args.primaryPyFile, "") ++ userArgs } if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) { // TODO(davies): add R dependencies here @@ -478,8 +486,8 @@ private[spark] class ApplicationMaster( val userThread = new Thread { override def run() { try { - val mainArgs = new Array[String](args.userArgs.size) - args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) + val mainArgs = new Array[String](userArgs.size) + userArgs.copyToArray(mainArgs, 0, userArgs.size) mainMethod.invoke(null, mainArgs) finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) logDebug("Done running users class") diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index ae6dc1094d724..68e9f6b4db7f4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -26,11 +26,11 @@ class ApplicationMasterArguments(val args: Array[String]) { var userClass: String = null var primaryPyFile: String = null var primaryRFile: String = null - var pyFiles: String = null - var userArgs: Seq[String] = Seq[String]() + var userArgs: Seq[String] = Nil var executorMemory = 1024 var executorCores = 1 var numExecutors = DEFAULT_NUMBER_EXECUTORS + var propertiesFile: String = null parseArgs(args.toList) @@ -59,10 +59,6 @@ class ApplicationMasterArguments(val args: Array[String]) { primaryRFile = value args = tail - case ("--py-files") :: value :: tail => - pyFiles = value - args = tail - case ("--args" | "--arg") :: value :: tail => userArgsBuffer += value args = tail @@ -79,6 +75,10 @@ class ApplicationMasterArguments(val args: Array[String]) { executorCores = value args = tail + case ("--properties-file") :: value :: tail => + propertiesFile = value + args = tail + case _ => printUsageAndExit(1, args) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 7e023f2d92578..4cf8a54b076ae 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -17,11 +17,12 @@ package org.apache.spark.deploy.yarn -import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException} +import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException, + OutputStreamWriter} import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} import java.nio.ByteBuffer import java.security.PrivilegedExceptionAction -import java.util.UUID +import java.util.{Properties, UUID} import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.JavaConversions._ @@ -29,6 +30,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map} import scala.reflect.runtime.universe import scala.util.{Try, Success, Failure} +import com.google.common.base.Charsets.UTF_8 import com.google.common.base.Objects import com.google.common.io.Files @@ -270,20 +272,6 @@ private[spark] class Client( "for 
alternatives.") } - // If we passed in a keytab, make sure we copy the keytab to the staging directory on - // HDFS, and setup the relevant environment vars, so the AM can login again. - if (loginFromKeytab) { - logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" + - " via the YARN Secure Distributed Cache.") - val localUri = new URI(args.keytab) - val localPath = getQualifiedLocalPath(localUri, hadoopConf) - val destinationPath = copyFileToRemote(dst, localPath, replication) - val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf) - distCacheMgr.addResource( - destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE, - sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true) - } - def addDistributedUri(uri: URI): Boolean = { val uriStr = uri.toString() if (distributedUris.contains(uriStr)) { @@ -295,6 +283,53 @@ private[spark] class Client( } } + /** + * Distribute a file to the cluster. + * + * @param path URI of the file to distribute. + * @param resType Type of resource being distributed. + * @param destName Name of the file in the distributed cache. + * @param targetDir Subdirectory where to place the file. + * @param appMasterOnly Whether to distribute only to the AM. + * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the + * localized path for non-local paths, or the input `path` for local paths. + */ + def distribute( + path: String, + resType: LocalResourceType = LocalResourceType.FILE, + destName: Option[String] = None, + targetDir: Option[String] = None, + appMasterOnly: Boolean = false): (Boolean, String) = { + val localURI = new URI(path.trim()) + if (localURI.getScheme != LOCAL_SCHEME) { + if (addDistributedUri(localURI)) { + val localPath = getQualifiedLocalPath(localURI, hadoopConf) + val linkname = targetDir.map(_ + "/").getOrElse("") + + destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName()) + val destPath = copyFileToRemote(dst, localPath, replication) + distCacheMgr.addResource( + fs, hadoopConf, destPath, localResources, resType, linkname, statCache, + appMasterOnly = appMasterOnly) + (false, linkname) + } else { + (false, null) + } + } else { + (true, path.trim()) + } + } + + // If we passed in a keytab, make sure we copy the keytab to the staging directory on + // HDFS, and setup the relevant environment vars, so the AM can login again. + if (loginFromKeytab) { + logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" + + " via the YARN Secure Distributed Cache.") + val (_, localizedPath) = distribute(args.keytab, + destName = Some(sparkConf.get("spark.yarn.keytab")), + appMasterOnly = true) + require(localizedPath != null) + } + /** * Copy the given main resource to the distributed cache if the scheme is not "local". * Otherwise, set the corresponding key in our SparkConf to handle it downstream. 
@@ -307,33 +342,18 @@ private[spark] class Client( (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR), (APP_JAR, args.userJar, CONF_SPARK_USER_JAR), ("log4j.properties", oldLog4jConf.orNull, null) - ).foreach { case (destName, _localPath, confKey) => - val localPath: String = if (_localPath != null) _localPath.trim() else "" - if (!localPath.isEmpty()) { - val localURI = new URI(localPath) - if (localURI.getScheme != LOCAL_SCHEME) { - if (addDistributedUri(localURI)) { - val src = getQualifiedLocalPath(localURI, hadoopConf) - val destPath = copyFileToRemote(dst, src, replication) - val destFs = FileSystem.get(destPath.toUri(), hadoopConf) - distCacheMgr.addResource(destFs, hadoopConf, destPath, - localResources, LocalResourceType.FILE, destName, statCache) - } - } else if (confKey != null) { + ).foreach { case (destName, path, confKey) => + if (path != null && !path.isEmpty()) { + val (isLocal, localizedPath) = distribute(path, destName=Some(destName)) + if (isLocal && confKey != null) { // If the resource is intended for local use only, handle this downstream // by setting the appropriate property - sparkConf.set(confKey, localPath) + require(localizedPath != null) + sparkConf.set(confKey, localizedPath) } } } - createConfArchive().foreach { file => - require(addDistributedUri(file.toURI())) - val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication) - distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE, - LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true) - } - /** * Do the same for any additional resources passed in through ClientArguments. * Each resource category is represented by a 3-tuple of: @@ -349,21 +369,10 @@ private[spark] class Client( ).foreach { case (flist, resType, addToClasspath) => if (flist != null && !flist.isEmpty()) { flist.split(',').foreach { file => - val localURI = new URI(file.trim()) - if (localURI.getScheme != LOCAL_SCHEME) { - if (addDistributedUri(localURI)) { - val localPath = new Path(localURI) - val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) - val destPath = copyFileToRemote(dst, localPath, replication) - distCacheMgr.addResource( - fs, hadoopConf, destPath, localResources, resType, linkname, statCache) - if (addToClasspath) { - cachedSecondaryJarLinks += linkname - } - } - } else if (addToClasspath) { - // Resource is intended for local use only and should be added to the class path - cachedSecondaryJarLinks += file.trim() + val (isLocal, localizedPath) = distribute(file, resType = resType) + require(localizedPath != null) + if (addToClasspath) { + cachedSecondaryJarLinks += localizedPath } } } @@ -372,11 +381,29 @@ private[spark] class Client( sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(",")) } + if (args.primaryPyFile != null) { + distribute(args.primaryPyFile) + } + + // The python files list needs to be treated especially. All files that are not in an + // archive need to be placed in a subdirectory that will be added to PYTHONPATH. + args.pyFiles.foreach { f => + val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None + distribute(f, targetDir = targetDir) + } + + // Distribute an archive with Hadoop and Spark configuration for the AM. 
+ val (_, confLocalizedPath) = distribute(createConfArchive().getAbsolutePath(), + resType = LocalResourceType.ARCHIVE, + destName = Some(LOCALIZED_CONF_DIR), + appMasterOnly = true) + require(confLocalizedPath != null) + localResources } /** - * Create an archive with the Hadoop config files for distribution. + * Create an archive with the config files for distribution. * * These are only used by the AM, since executors will use the configuration object broadcast by * the driver. The files are zipped and added to the job as an archive, so that YARN will explode @@ -388,8 +415,11 @@ private[spark] class Client( * * Currently this makes a shallow copy of the conf directory. If there are cases where a * Hadoop config directory contains subdirectories, this code will have to be fixed. + * + * The archive also contains some Spark configuration. Namely, it saves the contents of + * SparkConf in a file to be loaded by the AM process. */ - private def createConfArchive(): Option[File] = { + private def createConfArchive(): File = { val hadoopConfFiles = new HashMap[String, File]() Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey => sys.env.get(envKey).foreach { path => @@ -404,28 +434,32 @@ private[spark] class Client( } } - if (!hadoopConfFiles.isEmpty) { - val hadoopConfArchive = File.createTempFile(LOCALIZED_HADOOP_CONF_DIR, ".zip", - new File(Utils.getLocalDir(sparkConf))) + val confArchive = File.createTempFile(LOCALIZED_CONF_DIR, ".zip", + new File(Utils.getLocalDir(sparkConf))) + val confStream = new ZipOutputStream(new FileOutputStream(confArchive)) - val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive)) - try { - hadoopConfStream.setLevel(0) - hadoopConfFiles.foreach { case (name, file) => - if (file.canRead()) { - hadoopConfStream.putNextEntry(new ZipEntry(name)) - Files.copy(file, hadoopConfStream) - hadoopConfStream.closeEntry() - } + try { + confStream.setLevel(0) + hadoopConfFiles.foreach { case (name, file) => + if (file.canRead()) { + confStream.putNextEntry(new ZipEntry(name)) + Files.copy(file, confStream) + confStream.closeEntry() } - } finally { - hadoopConfStream.close() } - Some(hadoopConfArchive) - } else { - None + // Save Spark configuration to a file in the archive. + val props = new Properties() + sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) } + confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE)) + val writer = new OutputStreamWriter(confStream, UTF_8) + props.store(writer, "Spark configuration.") + writer.flush() + confStream.closeEntry() + } finally { + confStream.close() } + confArchive } /** @@ -471,9 +505,6 @@ private[spark] class Client( val renewalInterval = getTokenRenewalInterval(stagingDirPath) sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString) } - // Set the environment variables to be passed on to the executors. - distCacheMgr.setDistFilesEnv(env) - distCacheMgr.setDistArchivesEnv(env) // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.* val amEnvPrefix = "spark.yarn.appMasterEnv." @@ -490,15 +521,32 @@ private[spark] class Client( env("SPARK_YARN_USER_ENV") = userEnvs } - // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH - // that can be passed on to the ApplicationMaster and the executors. 
- if (sparkConf.contains("spark.submit.pyArchives")) { - var pythonPath = sparkConf.get("spark.submit.pyArchives") - if (env.contains("PYTHONPATH")) { - pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator) + // If pyFiles contains any .py files, we need to add LOCALIZED_PYTHON_DIR to the PYTHONPATH + // of the container processes too. Add all non-.py files directly to PYTHONPATH. + // + // NOTE: the code currently does not handle .py files defined with a "local:" scheme. + val pythonPath = new ListBuffer[String]() + val (pyFiles, pyArchives) = args.pyFiles.partition(_.endsWith(".py")) + if (pyFiles.nonEmpty) { + pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), + LOCALIZED_PYTHON_DIR) + } + pyArchives.foreach { path => + val uri = new URI(path) + if (uri.getScheme != LOCAL_SCHEME) { + pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), + new Path(path).getName()) + } else { + pythonPath += uri.getPath() } - env("PYTHONPATH") = pythonPath - sparkConf.setExecutorEnv("PYTHONPATH", pythonPath) + } + + // Finally, update the Spark config to propagate PYTHONPATH to the AM and executors. + if (pythonPath.nonEmpty) { + val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath) + .mkString(YarnSparkHadoopUtil.getClassPathSeparator) + env("PYTHONPATH") = pythonPathStr + sparkConf.set(PYTHON_PATH_CONF_KEY, pythonPathStr) } // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to @@ -548,8 +596,13 @@ private[spark] class Client( logInfo("Setting up container launch context for our AM") val appId = newAppResponse.getApplicationId val appStagingDir = getAppStagingDir(appId) - val localResources = prepareLocalResources(appStagingDir) val launchEnv = setupLaunchEnv(appStagingDir) + val localResources = prepareLocalResources(appStagingDir) + + // Set the environment variables to be passed on to the executors. + distCacheMgr.setDistFilesEnv(launchEnv) + distCacheMgr.setDistArchivesEnv(launchEnv) + val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) amContainer.setLocalResources(localResources) amContainer.setEnvironment(launchEnv) @@ -589,13 +642,6 @@ private[spark] class Client( javaOpts += "-XX:CMSIncrementalDutyCycle=10" } - // Forward the Spark configuration to the application master / executors. - // TODO: it might be nicer to pass these as an internal environment variable rather than - // as Java options, due to complications with string parsing of nested quotes. 
- for ((k, v) <- sparkConf.getAll) { - javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") - } - // Include driver-specific java options if we are launching a driver if (isClusterMode) { val driverOpts = sparkConf.getOption("spark.driver.extraJavaOptions") @@ -648,14 +694,8 @@ private[spark] class Client( Nil } val primaryPyFile = - if (args.primaryPyFile != null) { - Seq("--primary-py-file", args.primaryPyFile) - } else { - Nil - } - val pyFiles = - if (args.pyFiles != null) { - Seq("--py-files", args.pyFiles) + if (isClusterMode && args.primaryPyFile != null) { + Seq("--primary-py-file", new Path(args.primaryPyFile).getName()) } else { Nil } @@ -671,9 +711,6 @@ private[spark] class Client( } else { Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName } - if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) { - args.userArgs = ArrayBuffer(args.primaryPyFile, args.pyFiles) ++ args.userArgs - } if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) { args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs } @@ -681,11 +718,13 @@ private[spark] class Client( Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg)) } val amArgs = - Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ primaryRFile ++ + Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++ Seq( "--executor-memory", args.executorMemory.toString + "m", "--executor-cores", args.executorCores.toString, - "--num-executors ", args.numExecutors.toString) + "--num-executors ", args.numExecutors.toString, + "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), + LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) // Command for the ApplicationMaster val commands = prefixEnv ++ Seq( @@ -899,8 +938,21 @@ object Client extends Logging { // Distribution-defined classpath to add to processes val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH" - // Subdirectory where the user's hadoop config files will be placed. - val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__" + // Subdirectory where the user's Spark and Hadoop config files will be placed. + val LOCALIZED_CONF_DIR = "__spark_conf__" + + // Name fo the file in the conf archive containing Spark configuration. + val SPARK_CONF_FILE = "__spark_conf__.properties" + + // Subdirectory where the user's python files (not archives) will be placed. + val LOCALIZED_PYTHON_DIR = "__pyfiles__" + + // Key in SparkConf where to find the executors' PYTHONPATH. This cannot be set using + // `SparkConf.setExecutorEnv`, because that would cause it to propagate to the python + // code via the configuration, and then override the process's own environment when + // launching workers. Since it contains variables that are expanded by YARN, that cannot + // happen. 
+ val PYTHON_PATH_CONF_KEY = "spark.yarn.pythonPath" /** * Find the user-defined Spark jar if configured, or return the jar containing this @@ -1025,7 +1077,7 @@ object Client extends Logging { if (isAM) { addClasspathEntry( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + - LOCALIZED_HADOOP_CONF_DIR, env) + LOCALIZED_CONF_DIR, env) } if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 5653c9f14dc6d..a8aa3cb7f1b1f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -30,7 +30,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var archives: String = null var userJar: String = null var userClass: String = null - var pyFiles: String = null + var pyFiles: Seq[String] = Nil var primaryPyFile: String = null var primaryRFile: String = null var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]() @@ -222,7 +222,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) args = tail case ("--py-files") :: value :: tail => - pyFiles = value + pyFiles = value.split(",") args = tail case ("--files") :: value :: tail => diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 9d04d241dae9e..e9b147ed7f440 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -285,6 +285,10 @@ class ExecutorRunnable( YarnSparkHadoopUtil.addPathToEnvironment(env, key, value) } + sparkConf.getOption(Client.PYTHON_PATH_CONF_KEY).foreach { path => + YarnSparkHadoopUtil.addPathToEnvironment(env, "PYTHONPATH", path) + } + // Keep this for backwards compatibility but users should move to the config sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 99c05329b4d73..1c8d7ec57635f 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -76,7 +76,8 @@ private[spark] class YarnClientSchedulerBackend( ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"), ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"), ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"), - ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue") + ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"), + ("--py-files", null, "spark.submit.pyFiles") ) // Warn against the following deprecated environment variables: env var -> suggestion val deprecatedEnvVars = Map( @@ -86,7 +87,7 @@ private[spark] class YarnClientSchedulerBackend( optionTuples.foreach { case (optionName, envVar, sparkProp) => if (sc.getConf.contains(sparkProp)) { extraArgs += (optionName, sc.getConf.get(sparkProp)) - } else if (System.getenv(envVar) != null) { + } else if (envVar != null && System.getenv(envVar) != null) { extraArgs += (optionName, System.getenv(envVar)) if 
(deprecatedEnvVars.contains(envVar)) { logWarning(s"NOTE: $envVar is deprecated. Use ${deprecatedEnvVars(envVar)} instead.") diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 508819e242a26..4a6d68d4ebd17 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -113,7 +113,7 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll { Environment.PWD.$() } cp should contain(pwdVar) - cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}") + cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}") cp should not contain (Client.SPARK_JAR) cp should not contain (Client.APP_JAR) } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index d3c606e0ed998..ede9e7044bc68 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -54,6 +54,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit """.stripMargin private val TEST_PYFILE = """ + |import mod1, mod2 |import sys |from operator import add | @@ -65,7 +66,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit | sc = SparkContext(conf=SparkConf()) | status = open(sys.argv[1],'w') | result = "failure" - | rdd = sc.parallelize(range(10)) + | rdd = sc.parallelize(range(10)).map(lambda x: x * mod1.func() * mod2.func()) | cnt = rdd.count() | if cnt == 10: | result = "success" @@ -74,6 +75,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit | sc.stop() """.stripMargin + private val TEST_PYMODULE = """ + |def func(): + | return 42 + """.stripMargin + private var yarnCluster: MiniYARNCluster = _ private var tempDir: File = _ private var fakeSparkJar: File = _ @@ -122,7 +128,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}") fakeSparkJar = File.createTempFile("sparkJar", null, tempDir) - hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR) + hadoopConfDir = new File(tempDir, Client.LOCALIZED_CONF_DIR) assert(hadoopConfDir.mkdir()) File.createTempFile("token", ".txt", hadoopConfDir) } @@ -149,26 +155,12 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit } } - // Enable this once fix SPARK-6700 - test("run Python application in yarn-cluster mode") { - val primaryPyFile = new File(tempDir, "test.py") - Files.write(TEST_PYFILE, primaryPyFile, UTF_8) - val pyFile = new File(tempDir, "test2.py") - Files.write(TEST_PYFILE, pyFile, UTF_8) - var result = File.createTempFile("result", null, tempDir) - - // The sbt assembly does not include pyspark / py4j python dependencies, so we need to - // propagate SPARK_HOME so that those are added to PYTHONPATH. See PythonUtils.scala. 
- val sparkHome = sys.props("spark.test.home") - val extraConf = Map( - "spark.executorEnv.SPARK_HOME" -> sparkHome, - "spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome) + test("run Python application in yarn-client mode") { + testPySpark(true) + } - runSpark(false, primaryPyFile.getAbsolutePath(), - sparkArgs = Seq("--py-files", pyFile.getAbsolutePath()), - appArgs = Seq(result.getAbsolutePath()), - extraConf = extraConf) - checkResult(result) + test("run Python application in yarn-cluster mode") { + testPySpark(false) } test("user class path first in client mode") { @@ -186,6 +178,33 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit checkResult(result) } + private def testPySpark(clientMode: Boolean): Unit = { + val primaryPyFile = new File(tempDir, "test.py") + Files.write(TEST_PYFILE, primaryPyFile, UTF_8) + + val moduleDir = + if (clientMode) { + // In client-mode, .py files added with --py-files are not visible in the driver. + // This is something that the launcher library would have to handle. + tempDir + } else { + val subdir = new File(tempDir, "pyModules") + subdir.mkdir() + subdir + } + val pyModule = new File(moduleDir, "mod1.py") + Files.write(TEST_PYMODULE, pyModule, UTF_8) + + val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir) + val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",") + val result = File.createTempFile("result", null, tempDir) + + runSpark(false, primaryPyFile.getAbsolutePath(), + sparkArgs = Seq("--py-files", pyFiles), + appArgs = Seq(result.getAbsolutePath())) + checkResult(result) + } + private def testUseClassPathFirst(clientMode: Boolean): Unit = { // Create a jar file that contains a different version of "test.resource". val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir) From 09045f1db1f51657a040fec7c27c0ed1ed1463cb Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 22 May 2015 14:19:21 -0700 Subject: [PATCH 02/10] Style. --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b3c042cf73f55..16aa8209e5b9e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -352,7 +352,8 @@ object SparkSubmit { if (args.primaryResource == PYSPARK_SHELL) { args.mainClass = "org.apache.spark.api.python.PythonGatewayServer" } else { - // If a python file is provided, add it to the child arguments and list of files to deploy. + // If a python file is provided, add it to the child arguments and list of files to + // deploy. // Usage: PythonAppRunner
<main python file> <extra python files>
[app arguments] args.mainClass = "org.apache.spark.deploy.PythonRunner" args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs From 7fe3cd4aa603032ded2461339d6326a6c93caa78 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 25 May 2015 14:08:50 -0700 Subject: [PATCH 03/10] No need to distribute primary file to executors. --- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 4cf8a54b076ae..3990907d8cda4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -381,11 +381,11 @@ private[spark] class Client( sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(",")) } - if (args.primaryPyFile != null) { - distribute(args.primaryPyFile) + if (isClusterMode && args.primaryPyFile != null) { + distribute(args.primaryPyFile, appMasterOnly = true) } - // The python files list needs to be treated especially. All files that are not in an + // The python files list needs to be treated especially. All files that are not an // archive need to be placed in a subdirectory that will be added to PYTHONPATH. args.pyFiles.foreach { f => val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None From 220358bc53ff2cc97c8f31d00a7166563f82046d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 2 Jun 2015 08:38:16 -0700 Subject: [PATCH 04/10] Scalastyle. --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index dfd4887f60891..3baabcdb85043 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -344,7 +344,7 @@ private[spark] class Client( ("log4j.properties", oldLog4jConf.orNull, null) ).foreach { case (destName, path, confKey) => if (path != null && !path.isEmpty()) { - val (isLocal, localizedPath) = distribute(path, destName=Some(destName)) + val (isLocal, localizedPath) = distribute(path, destName = Some(destName)) if (isLocal && confKey != null) { // If the resource is intended for local use only, handle this downstream // by setting the appropriate property From 1dd4d0c7c741e4d8f50a122d7886d2d771a1f3c1 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 4 Jun 2015 17:33:02 -0700 Subject: [PATCH 05/10] Review feedback. 
--- .../org/apache/spark/deploy/SparkSubmit.scala | 91 +++++++++---------- .../spark/deploy/yarn/ApplicationMaster.scala | 2 + .../org/apache/spark/deploy/yarn/Client.scala | 23 ++--- .../spark/deploy/yarn/ExecutorRunnable.scala | 4 - 4 files changed, 56 insertions(+), 64 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 725c0497f486c..2c30d269168dd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -315,59 +315,58 @@ object SparkSubmit { case _ => } - if (args.isPython) { + if (args.isPython && clusterManager == YARN) { // In yarn mode for a python app, add pyspark archives to files // that can be distributed with the job - if (clusterManager == YARN) { - var pyArchives: String = null - val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH") - if (pyArchivesEnvOpt.isDefined) { - pyArchives = pyArchivesEnvOpt.get - } else { - if (!sys.env.contains("SPARK_HOME")) { - printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.") + var pyArchives: String = null + val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH") + if (pyArchivesEnvOpt.isDefined) { + pyArchives = pyArchivesEnvOpt.get + } else { + if (!sys.env.contains("SPARK_HOME")) { + printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.") + } + val pythonPath = new ArrayBuffer[String] + for (sparkHome <- sys.env.get("SPARK_HOME")) { + val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator) + val pyArchivesFile = new File(pyLibPath, "pyspark.zip") + if (!pyArchivesFile.exists()) { + printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.") } - val pythonPath = new ArrayBuffer[String] - for (sparkHome <- sys.env.get("SPARK_HOME")) { - val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator) - val pyArchivesFile = new File(pyLibPath, "pyspark.zip") - if (!pyArchivesFile.exists()) { - printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.") - } - val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip") - if (!py4jFile.exists()) { - printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " + - "in yarn mode.") - } - pythonPath += pyArchivesFile.getAbsolutePath() - pythonPath += py4jFile.getAbsolutePath() + val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip") + if (!py4jFile.exists()) { + printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " + + "in yarn mode.") } - pyArchives = pythonPath.mkString(",") + pythonPath += pyArchivesFile.getAbsolutePath() + pythonPath += py4jFile.getAbsolutePath() } - args.pyFiles = mergeFileLists(args.pyFiles, pyArchives) + pyArchives = pythonPath.mkString(",") } + args.pyFiles = mergeFileLists(args.pyFiles, pyArchives) + } - // If we're running a python app, set the main class to our specific python runner - if (deployMode == CLIENT) { - if (args.primaryResource == PYSPARK_SHELL) { - args.mainClass = "org.apache.spark.api.python.PythonGatewayServer" - } else { - // If a python file is provided, add it to the child arguments and list of files to - // deploy. - // Usage: PythonAppRunner
<main python file> <extra python files>
[app arguments] - args.mainClass = "org.apache.spark.deploy.PythonRunner" - args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs - if (clusterManager != YARN) { - args.files = mergeFileLists(args.files, args.primaryResource) - } - } + // If we're running a python app, set the main class to our specific python runner + if (args.isPython && deployMode == CLIENT) { + if (args.primaryResource == PYSPARK_SHELL) { + args.mainClass = "org.apache.spark.api.python.PythonGatewayServer" + } else { + // If a python file is provided, add it to the child arguments and list of files to deploy. + // Usage: PythonAppRunner
<main python file> <extra python files>
[app arguments] + args.mainClass = "org.apache.spark.deploy.PythonRunner" + args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs if (clusterManager != YARN) { - args.files = mergeFileLists(args.files, args.pyFiles) - } - if (args.pyFiles != null) { - sysProps("spark.submit.pyFiles") = args.pyFiles + // The YARN backend distributes the primary file differently, so don't merge it. + args.files = mergeFileLists(args.files, args.primaryResource) } } + if (clusterManager != YARN) { + // The YARN backend handles python files differently, so don't merge the lists. + args.files = mergeFileLists(args.files, args.pyFiles) + } + if (args.pyFiles != null) { + sysProps("spark.submit.pyFiles") = args.pyFiles + } } // If we're running a R app, set the main class to our specific R runner @@ -383,12 +382,10 @@ object SparkSubmit { } } - if (isYarnCluster) { + if (isYarnCluster && args.isR) { // In yarn-cluster mode for a R app, add primary resource to files // that can be distributed with the job - if (args.isR) { - args.files = mergeFileLists(args.files, args.primaryResource) - } + args.files = mergeFileLists(args.files, args.primaryResource) } // Special flag to avoid deprecation warnings at the client diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index c209cf2460ea7..952c86edf75c4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -500,6 +500,8 @@ private[spark] class ApplicationMaster( var userArgs = args.userArgs if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) { + // Second argument is the list of files to add to PYTHONPATH, which Client.scala already + // handles, so it's empty. userArgs = Seq(args.primaryPyFile, "") ++ userArgs } if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 3baabcdb85043..2b358a89d59c3 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -286,6 +286,9 @@ private[spark] class Client( /** * Distribute a file to the cluster. * + * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied + * to HDFS (if not already there) and added to the application's distributed cache. + * * @param path URI of the file to distribute. * @param resType Type of resource being distributed. * @param destName Name of the file in the distributed cache. @@ -293,6 +296,7 @@ private[spark] class Client( * @param appMasterOnly Whether to distribute only to the AM. * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the * localized path for non-local paths, or the input `path` for local paths. + * The localized path will be null if the URI has already been added to the cache. 
*/ def distribute( path: String, @@ -327,7 +331,7 @@ private[spark] class Client( val (_, localizedPath) = distribute(args.keytab, destName = Some(sparkConf.get("spark.yarn.keytab")), appMasterOnly = true) - require(localizedPath != null) + require(localizedPath != null, "Keytab file already distributed.") } /** @@ -343,12 +347,12 @@ private[spark] class Client( (APP_JAR, args.userJar, CONF_SPARK_USER_JAR), ("log4j.properties", oldLog4jConf.orNull, null) ).foreach { case (destName, path, confKey) => - if (path != null && !path.isEmpty()) { + if (path != null && !path.trim().isEmpty()) { val (isLocal, localizedPath) = distribute(path, destName = Some(destName)) if (isLocal && confKey != null) { + require(localizedPath != null, s"Path $path already distributed.") // If the resource is intended for local use only, handle this downstream // by setting the appropriate property - require(localizedPath != null) sparkConf.set(confKey, localizedPath) } } @@ -369,7 +373,7 @@ private[spark] class Client( ).foreach { case (flist, resType, addToClasspath) => if (flist != null && !flist.isEmpty()) { flist.split(',').foreach { file => - val (isLocal, localizedPath) = distribute(file, resType = resType) + val (_, localizedPath) = distribute(file, resType = resType) require(localizedPath != null) if (addToClasspath) { cachedSecondaryJarLinks += localizedPath @@ -546,7 +550,7 @@ private[spark] class Client( val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath) .mkString(YarnSparkHadoopUtil.getClassPathSeparator) env("PYTHONPATH") = pythonPathStr - sparkConf.set(PYTHON_PATH_CONF_KEY, pythonPathStr) + sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr) } // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to @@ -941,19 +945,12 @@ object Client extends Logging { // Subdirectory where the user's Spark and Hadoop config files will be placed. val LOCALIZED_CONF_DIR = "__spark_conf__" - // Name fo the file in the conf archive containing Spark configuration. + // Name of the file in the conf archive containing Spark configuration. val SPARK_CONF_FILE = "__spark_conf__.properties" // Subdirectory where the user's python files (not archives) will be placed. val LOCALIZED_PYTHON_DIR = "__pyfiles__" - // Key in SparkConf where to find the executors' PYTHONPATH. This cannot be set using - // `SparkConf.setExecutorEnv`, because that would cause it to propagate to the python - // code via the configuration, and then override the process's own environment when - // launching workers. Since it contains variables that are expanded by YARN, that cannot - // happen. - val PYTHON_PATH_CONF_KEY = "spark.yarn.pythonPath" - /** * Find the user-defined Spark jar if configured, or return the jar containing this * class if not. 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index e9b147ed7f440..9d04d241dae9e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -285,10 +285,6 @@ class ExecutorRunnable( YarnSparkHadoopUtil.addPathToEnvironment(env, key, value) } - sparkConf.getOption(Client.PYTHON_PATH_CONF_KEY).foreach { path => - YarnSparkHadoopUtil.addPathToEnvironment(env, "PYTHONPATH", path) - } - // Keep this for backwards compatibility but users should move to the config sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) From 705571d6f999f3a8a4681626e79b4bf1cd134eb1 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 4 Jun 2015 17:46:18 -0700 Subject: [PATCH 06/10] Move some code to the YARN module. This gets rid of an ordering issue with the code in SparkSubmit that was easy to miss. --- .../org/apache/spark/deploy/SparkSubmit.scala | 31 ----------------- .../org/apache/spark/deploy/yarn/Client.scala | 33 ++++++++++++++++--- .../spark/deploy/yarn/ClientSuite.scala | 2 +- 3 files changed, 29 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 2c30d269168dd..1c4edfaf98505 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -315,37 +315,6 @@ object SparkSubmit { case _ => } - if (args.isPython && clusterManager == YARN) { - // In yarn mode for a python app, add pyspark archives to files - // that can be distributed with the job - var pyArchives: String = null - val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH") - if (pyArchivesEnvOpt.isDefined) { - pyArchives = pyArchivesEnvOpt.get - } else { - if (!sys.env.contains("SPARK_HOME")) { - printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.") - } - val pythonPath = new ArrayBuffer[String] - for (sparkHome <- sys.env.get("SPARK_HOME")) { - val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator) - val pyArchivesFile = new File(pyLibPath, "pyspark.zip") - if (!pyArchivesFile.exists()) { - printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.") - } - val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip") - if (!py4jFile.exists()) { - printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " + - "in yarn mode.") - } - pythonPath += pyArchivesFile.getAbsolutePath() - pythonPath += py4jFile.getAbsolutePath() - } - pyArchives = pythonPath.mkString(",") - } - args.pyFiles = mergeFileLists(args.pyFiles, pyArchives) - } - // If we're running a python app, set the main class to our specific python runner if (args.isPython && deployMode == CLIENT) { if (args.primaryResource == PYSPARK_SHELL) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 2b358a89d59c3..24a03753904f1 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -242,7 +242,9 @@ private[spark] class Client( * This is used for setting up a container launch context for our ApplicationMaster. * Exposed for testing. 
*/ - def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = { + def prepareLocalResources( + appStagingDir: String, + pySparkArchives: Seq[String]): HashMap[String, LocalResource] = { logInfo("Preparing resources for our AM container") // Upload Spark and the application JAR to the remote file system if necessary, // and add them as local resources to the application master. @@ -389,6 +391,8 @@ private[spark] class Client( distribute(args.primaryPyFile, appMasterOnly = true) } + pySparkArchives.foreach { f => distribute(f) } + // The python files list needs to be treated especially. All files that are not an // archive need to be placed in a subdirectory that will be added to PYTHONPATH. args.pyFiles.foreach { f => @@ -491,7 +495,9 @@ private[spark] class Client( /** * Set up the environment for launching our ApplicationMaster container. */ - private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = { + private def setupLaunchEnv( + stagingDir: String, + pySparkArchives: Seq[String]): HashMap[String, String] = { logInfo("Setting up the launch environment for our AM container") val env = new HashMap[String, String]() val extraCp = sparkConf.getOption("spark.driver.extraClassPath") @@ -535,7 +541,7 @@ private[spark] class Client( pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), LOCALIZED_PYTHON_DIR) } - pyArchives.foreach { path => + (pySparkArchives ++ pyArchives).foreach { path => val uri = new URI(path) if (uri.getScheme != LOCAL_SCHEME) { pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), @@ -600,8 +606,9 @@ private[spark] class Client( logInfo("Setting up container launch context for our AM") val appId = newAppResponse.getApplicationId val appStagingDir = getAppStagingDir(appId) - val launchEnv = setupLaunchEnv(appStagingDir) - val localResources = prepareLocalResources(appStagingDir) + val pySparkArchives = findPySparkArchives() + val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives) + val localResources = prepareLocalResources(appStagingDir, pySparkArchives) // Set the environment variables to be passed on to the executors. 
distCacheMgr.setDistFilesEnv(launchEnv) @@ -892,6 +899,22 @@ private[spark] class Client( } } } + + private def findPySparkArchives(): Seq[String] = { + sys.env.get("PYSPARK_ARCHIVES_PATH") + .map(_.split(",").toSeq) + .getOrElse { + val pyLibPath = Seq(sys.env("SPARK_HOME"), "python", "lib").mkString(File.separator) + val pyArchivesFile = new File(pyLibPath, "pyspark.zip") + require(pyArchivesFile.exists(), + "pyspark.zip not found; cannot run pyspark application in YARN mode.") + val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip") + require(py4jFile.exists(), + "py4j-0.8.2.1-src.zip not found; cannot run pyspark application in YARN mode.") + Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath()) + } + } + } object Client extends Logging { diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index a49e5c69c6a19..4ec976aa31387 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -129,7 +129,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll { val tempDir = Utils.createTempDir() try { - client.prepareLocalResources(tempDir.getAbsolutePath()) + client.prepareLocalResources(tempDir.getAbsolutePath(), Nil) sparkConf.getOption(Client.CONF_SPARK_USER_JAR) should be (Some(USER)) // The non-local path should be propagated by name only, since it will end up in the app's From c8e5a82b9add370b0b03f0af98af7fc167ab22a5 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 4 Jun 2015 19:46:29 -0700 Subject: [PATCH 07/10] Actually run pyspark in client mode. --- .../scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 29ab6994bb4b1..99fc73a56b1d3 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -201,7 +201,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",") val result = File.createTempFile("result", null, tempDir) - runSpark(false, primaryPyFile.getAbsolutePath(), + runSpark(clientMode, primaryPyFile.getAbsolutePath(), sparkArgs = Seq("--py-files", pyFiles), appArgs = Seq(result.getAbsolutePath())) checkResult(result) From c74377873bde48341e4eda9f33c7f79f6cf2cc74 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 4 Jun 2015 20:13:58 -0700 Subject: [PATCH 08/10] Only pyspark cares about python archives. 
--- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 24a03753904f1..9f920e2ab66f6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -606,7 +606,7 @@ private[spark] class Client( logInfo("Setting up container launch context for our AM") val appId = newAppResponse.getApplicationId val appStagingDir = getAppStagingDir(appId) - val pySparkArchives = findPySparkArchives() + val pySparkArchives = if (args.primaryPyFile != null) findPySparkArchives() else Nil val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives) val localResources = prepareLocalResources(appStagingDir, pySparkArchives) From c47501f650b39beeaf35829cd62fd5b2c6a7451e Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 5 Jun 2015 14:49:25 -0700 Subject: [PATCH 09/10] Fix yarn-client mode. Need a different way to tell Client.scala to distribute pyspark libs. --- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 5 +++++ .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 7 ++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index bd40adb309fac..b8978e25a02d2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -471,6 +471,11 @@ object SparkSubmit { } } + // Let YARN know it's a pyspark app, so it distributes needed libraries. + if (clusterManager == YARN && args.isPython) { + sysProps.put("spark.yarn.isPython", "true") + } + // In yarn-cluster mode, use yarn.Client as a wrapper around the user class if (isYarnCluster) { childMainClass = "org.apache.spark.deploy.yarn.Client" diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 9f920e2ab66f6..0aec538059b35 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -606,7 +606,12 @@ private[spark] class Client( logInfo("Setting up container launch context for our AM") val appId = newAppResponse.getApplicationId val appStagingDir = getAppStagingDir(appId) - val pySparkArchives = if (args.primaryPyFile != null) findPySparkArchives() else Nil + val pySparkArchives = + if (sys.props.getOrElse("spark.yarn.isPython", null) == "true") { + findPySparkArchives() + } else { + Nil + } val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives) val localResources = prepareLocalResources(appStagingDir, pySparkArchives) From bcaf7e66f62043286a4dde93a9d29b6e462e5804 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 10 Jun 2015 11:05:59 -0700 Subject: [PATCH 10/10] Feedback. 
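Review feedback addressed below: clarify the pyspark comment in
ApplicationMaster, replace the manual copyToArray with userArgs.toArray,
and parse spark.yarn.isPython with toBoolean instead of comparing the
raw (possibly null) value against "true".

A hedged sketch of why the toBoolean form is preferable (the object
name is made up for illustration):

    object BooleanPropSketch {
      def main(args: Array[String]): Unit = {
        sys.props("spark.yarn.isPython") = "True"
        // Old check: any case variation is silently treated as false.
        println(sys.props.getOrElse("spark.yarn.isPython", null) == "true")    // false
        // New check: case-insensitive, and a bogus value like "yes" fails
        // fast with IllegalArgumentException instead of being ignored.
        println(sys.props.getOrElse("spark.yarn.isPython", "false").toBoolean) // true
      }
    }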
--- .../apache/spark/deploy/yarn/ApplicationMaster.scala | 10 ++++------ .../scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 952c86edf75c4..83dafa4a125d2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.rpc._ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} import org.apache.spark.SparkException -import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -500,8 +500,8 @@ private[spark] class ApplicationMaster( var userArgs = args.userArgs if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) { - // Second argument is the list of files to add to PYTHONPATH, which Client.scala already - // handles, so it's empty. + // When running pyspark, the app is run using PythonRunner. The second argument is the list + // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty. userArgs = Seq(args.primaryPyFile, "") ++ userArgs } if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) { @@ -513,9 +513,7 @@ private[spark] class ApplicationMaster( val userThread = new Thread { override def run() { try { - val mainArgs = new Array[String](userArgs.size) - userArgs.copyToArray(mainArgs, 0, userArgs.size) - mainMethod.invoke(null, mainArgs) + mainMethod.invoke(null, userArgs.toArray) finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) logDebug("Done running users class") } catch { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 0aec538059b35..5fd5cbe86a839 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -607,7 +607,7 @@ private[spark] class Client( val appId = newAppResponse.getApplicationId val appStagingDir = getAppStagingDir(appId) val pySparkArchives = - if (sys.props.getOrElse("spark.yarn.isPython", null) == "true") { + if (sys.props.getOrElse("spark.yarn.isPython", "false").toBoolean) { findPySparkArchives() } else { Nil