71 changes: 27 additions & 44 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -351,14 +351,6 @@ private[spark] class Client(
 
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
 
-    val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF"))
-    if (oldLog4jConf.isDefined) {
-      logWarning(
-        "SPARK_LOG4J_CONF detected in the system environment. This variable has been " +
-          "deprecated. Please refer to the \"Launching Spark on YARN\" documentation " +
-          "for alternatives.")
-    }
-
     def addDistributedUri(uri: URI): Boolean = {

Contributor (author) comment: Here I removed support for SPARK_LOG4J_CONF. I already did this in #11603, so I can handle the merge conflicts.

       val uriStr = uri.toString()
       if (distributedUris.contains(uriStr)) {
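
The usual replacement for SPARK_LOG4J_CONF is to ship the log4j file with the application itself. A minimal sketch, assuming spark.yarn.dist.files as the distribution mechanism (the path below is a made-up example, not part of this patch):

    import org.apache.spark.SparkConf

    // Hypothetical: distribute a custom log4j.properties with the application;
    // YARN localizes it into each container's working directory, which sits on
    // the classpath ahead of the bundled defaults.
    val conf = new SparkConf()
      .set("spark.yarn.dist.files", "file:/path/to/custom/log4j.properties")
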
@@ -480,25 +472,16 @@ private[spark] class Client(
     }
 
     /**
-     * Copy a few resources to the distributed cache if their scheme is not "local".
+     * Copy the user jar to the distributed cache if its scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
-     * Each resource is represented by a 3-tuple of:
-     *   (1) destination resource name,
-     *   (2) local path to the resource,
-     *   (3) Spark property key to set if the scheme is not local
      */
-    List(
-      (APP_JAR_NAME, args.userJar, APP_JAR),
-      ("log4j.properties", oldLog4jConf.orNull, null)
-    ).foreach { case (destName, path, confKey) =>
-      if (path != null && !path.trim().isEmpty()) {
-        val (isLocal, localizedPath) = distribute(path, destName = Some(destName))
-        if (isLocal && confKey != null) {
-          require(localizedPath != null, s"Path $path already distributed.")
-          // If the resource is intended for local use only, handle this downstream
-          // by setting the appropriate property
-          sparkConf.set(confKey, localizedPath)
-        }
+    Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar =>
+      val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME))
+      if (isLocal) {
+        require(localizedPath != null, s"Path $jar already distributed")
+        // If the resource is intended for local use only, handle this downstream
+        // by setting the appropriate property
+        sparkConf.set(APP_JAR, localizedPath)
       }
     }
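
The new version replaces a null-and-empty-string check with the standard Option idiom. A self-contained illustration of the pattern (placeholder names, not Spark APIs):

    // Option(x) absorbs null, filter drops blank strings, and foreach runs the
    // body only when a usable value remains.
    def register(path: String): Unit = println(s"registering $path")

    val userJar: String = null          // e.g. no jar given on the command line
    Option(userJar)
      .filter(_.trim.nonEmpty)
      .foreach(register)                // skipped here, since userJar is null
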

@@ -542,11 +525,10 @@ private[spark] class Client(
       distribute(f, targetDir = targetDir)
     }
 
-    // Distribute an archive with Hadoop and Spark configuration for the AM.
+    // Distribute an archive with Hadoop and Spark configuration for the AM and executors.
     val (_, confLocalizedPath) = distribute(createConfArchive().toURI().getPath(),
       resType = LocalResourceType.ARCHIVE,
-      destName = Some(LOCALIZED_CONF_DIR),
-      appMasterOnly = true)
+      destName = Some(LOCALIZED_CONF_DIR))
     require(confLocalizedPath != null)
 
     localResources
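
Dropping appMasterOnly = true means the call now falls through to the helper's default, so the archive is localized for every container rather than only the AM. A sketch of the distribute contract as assumed here (paraphrased, not verbatim from Client.scala):

    import org.apache.hadoop.yarn.api.records.LocalResourceType

    // Assumed shape of the private helper used above: returns whether the path
    // kept the "local" scheme plus the localized path, and defaults to
    // distributing the resource to all containers, not just the AM.
    def distribute(
        path: String,
        resType: LocalResourceType = LocalResourceType.FILE,
        destName: Option[String] = None,
        targetDir: Option[String] = None,
        appMasterOnly: Boolean = false): (Boolean, String) = ???
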
@@ -555,10 +537,10 @@
   /**
    * Create an archive with the config files for distribution.
    *
-   * These are only used by the AM, since executors will use the configuration object broadcast by
-   * the driver. The files are zipped and added to the job as an archive, so that YARN will explode
-   * it when distributing to the AM. This directory is then added to the classpath of the AM
-   * process, just to make sure that everybody is using the same default config.
+   * These will be used by both the AM and the executors. The files are zipped and added to the
+   * job as an archive, so that YARN will explode it when distributing to the AM and executors.
+   * This directory is then added to the classpath of the AM and executor processes, just to
+   * make sure that everybody is using the same default config.
    *
    * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
    * shows up in the classpath before YARN_CONF_DIR.
@@ -577,11 +559,14 @@
     // required when user changes log4j.properties directly to set the log configurations. If
     // configuration file is provided through --files then executors will be taking configurations
     // from --files instead of $SPARK_CONF_DIR/log4j.properties.
-    val log4jFileName = "log4j.properties"
-    Option(Utils.getContextOrSparkClassLoader.getResource(log4jFileName)).foreach { url =>
-      if (url.getProtocol == "file") {
-        hadoopConfFiles(log4jFileName) = new File(url.getPath)
-      }
+
+    // Also upload metrics.properties to the distributed cache if it exists in the classpath.
+    // If the user specifies this file through --files then executors will use the one
+    // from --files instead.
+    for { prop <- Seq("log4j.properties", "metrics.properties")
+          url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
+          if url.getProtocol == "file" } {
+      hadoopConfFiles(prop) = new File(url.getPath)
     }
 
     Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
@@ -660,7 +645,7 @@ private[spark] class Client(
       pySparkArchives: Seq[String]): HashMap[String, String] = {
     logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
-    populateClasspath(args, yarnConf, sparkConf, env, true, sparkConf.get(DRIVER_CLASS_PATH))
+    populateClasspath(args, yarnConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -1237,18 +1222,16 @@ object Client extends Logging {
       conf: Configuration,
       sparkConf: SparkConf,
       env: HashMap[String, String],
-      isAM: Boolean,
       extraClassPath: Option[String] = None): Unit = {
     extraClassPath.foreach { cp =>
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
     }
 
     addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env)
 
-    if (isAM) {
-      addClasspathEntry(
-        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
-          LOCALIZED_CONF_DIR, env)
-    }
+    addClasspathEntry(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
+        LOCALIZED_CONF_DIR, env)
 
     if (sparkConf.get(USER_CLASS_PATH_FIRST)) {
       // in order to properly add the app jar when user classpath is first
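
With the isAM branch gone, every container gets the localized conf directory on its classpath. A rough sketch of the entry being built, assuming LOCALIZED_CONF_DIR carries the usual "__spark_conf__" value and that expandEnvironment(Environment.PWD) yields the {{PWD}} placeholder YARN substitutes at container launch:

    // Assumed values, for illustration only.
    val LOCALIZED_CONF_DIR = "__spark_conf__"  // assumed constant value
    val pwd = "{{PWD}}"                        // assumed expansion of Environment.PWD
    val confEntry = pwd + "/" + LOCALIZED_CONF_DIR
    // => "{{PWD}}/__spark_conf__" on the classpath of the AM and every executor
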
yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -289,8 +289,7 @@ private[yarn] class ExecutorRunnable(
 
   private def prepareEnvironment(container: Container): HashMap[String, String] = {
     val env = new HashMap[String, String]()
-    Client.populateClasspath(null, yarnConf, sparkConf, env, false,
-      sparkConf.get(EXECUTOR_CLASS_PATH))
+    Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
       // This assumes each executor environment variable set here is a path
yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -121,7 +121,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val env = new MutableHashMap[String, String]()
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
 
-    populateClasspath(args, conf, sparkConf, env, true)
+    populateClasspath(args, conf, sparkConf, env)
 
     val cp = env("CLASSPATH").split(":|;|<CPS>")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -178,8 +178,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
       "/remotePath/1:/remotePath/2")
 
     val env = new MutableHashMap[String, String]()
-    populateClasspath(null, conf, sparkConf, env, false,
-      extraClassPath = Some("/localPath/my1.jar"))
+    populateClasspath(null, conf, sparkConf, env, extraClassPath = Some("/localPath/my1.jar"))
     val cp = classpath(env)
     cp should contain ("/remotePath/spark.jar")
     cp should contain ("/remotePath/my1.jar")
@@ -355,7 +354,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
 
   private def classpath(client: Client): Array[String] = {
     val env = new MutableHashMap[String, String]()
-    populateClasspath(null, client.hadoopConf, client.sparkConf, env, false)
+    populateClasspath(null, client.hadoopConf, client.sparkConf, env)
     classpath(env)
   }
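
Given the unified signature, a natural follow-up test would assert that the localized conf directory now appears without any AM flag. A sketch against this suite's helpers (populateClasspath and classpath(env)), assuming the suite's existing imports and that LOCALIZED_CONF_DIR is visible from the test package:

    // Hypothetical test, not part of this patch.
    test("conf archive dir is always on the classpath") {
      val sparkConf = new SparkConf()
      val env = new MutableHashMap[String, String]()
      populateClasspath(null, new Configuration(), sparkConf, env)
      classpath(env).exists(_.endsWith(LOCALIZED_CONF_DIR)) should be (true)
    }
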
