From fc3bbd0d72d9319885b877490f57ed4f1b870fa2 Mon Sep 17 00:00:00 2001
From: Devaraj K
Date: Thu, 28 Apr 2016 16:16:51 +0530
Subject: [PATCH 1/3] [SPARK-3767] [CORE] Support wildcard in Spark properties

---
 .../apache/spark/deploy/worker/ExecutorRunner.scala   | 10 ++++++++--
 .../cluster/mesos/CoarseMesosSchedulerBackend.scala   |  3 ++-
 .../cluster/mesos/MesosSchedulerBackend.scala         |  3 ++-
 core/src/main/scala/org/apache/spark/util/Utils.scala |  7 +++++++
 docs/configuration.md                                 |  5 +++++
 .../apache/spark/deploy/yarn/ExecutorRunnable.scala   |  3 ++-
 6 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 06066248ea5d0..3a883ba031945 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import com.google.common.io.Files
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.RpcEndpointRef
@@ -142,7 +142,13 @@ private[deploy] class ExecutorRunner(
   private def fetchAndRunExecutor() {
     try {
       // Launch the process
-      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
+      val subsCommand = Command(appDesc.command.mainClass,
+        appDesc.command.arguments, appDesc.command.environment,
+        appDesc.command.classPathEntries,
+        appDesc.command.libraryPathEntries,
+        appDesc.command.javaOpts.map {
+          opt => Utils.substituteExecIdWildCard(opt, execId.toString) })
+      val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
         memory, sparkHome.getAbsolutePath, substituteVariables)
       val command = builder.command()
       val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 50b452c72f8aa..23815cc17eeb2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -166,7 +166,7 @@ private[spark] class CoarseMesosSchedulerBackend(
       environment.addVariables(
         Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
     }
-    val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "")
+    var extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "")
 
     // Set the environment variable through a command prefix
     // to append to the existing value of the variable
@@ -174,6 +174,7 @@ private[spark] class CoarseMesosSchedulerBackend(
       Utils.libraryPathEnvPrefix(Seq(p))
     }.getOrElse("")
 
+    extraJavaOpts = Utils.substituteExecIdWildCard(extraJavaOpts, taskId)
     environment.addVariables(
       Environment.Variable.newBuilder()
         .setName("SPARK_EXECUTOR_OPTS")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 1a94aee2ca30c..f958f76db62ff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -102,12 +102,13 @@ private[spark] class MesosSchedulerBackend(
       environment.addVariables(
         Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
     }
-    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("")
+    var extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("")
 
     val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p =>
       Utils.libraryPathEnvPrefix(Seq(p))
     }.getOrElse("")
 
+    extraJavaOpts = Utils.substituteExecIdWildCard(extraJavaOpts, execId)
     environment.addVariables(
       Environment.Variable.newBuilder()
         .setName("SPARK_EXECUTOR_OPTS")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ea49991493fd7..57b563adce446 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2286,6 +2286,13 @@ private[spark] object Utils extends Logging {
     log.info(s"Started daemon with process name: ${Utils.getProcessName()}")
     SignalUtils.registerLogger(log)
   }
+
+  /**
+   * Replaces all the @execid@ occurrences with the Executor Id.
+   */
+  def substituteExecIdWildCard(opt: String, execId: String): String = {
+    opt.replace("@execid@", execId)
+  }
 }
 
 /**
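The whole series rests on the small `Utils.substituteExecIdWildCard` helper above. A minimal standalone sketch of its behavior (the object name and demo values are mine, not part of the patch):

```scala
object WildcardDemo {
  // Mirrors Utils.substituteExecIdWildCard from the patch: a plain,
  // literal string replacement; no regex and no escaping involved.
  def substituteExecIdWildCard(opt: String, execId: String): String =
    opt.replace("@execid@", execId)

  def main(args: Array[String]): Unit = {
    val opts = "-verbose:gc -Xloggc:/tmp/@execid@.gc -Duser.name=me@host"
    // Prints: -verbose:gc -Xloggc:/tmp/7.gc -Duser.name=me@host
    // (unrelated '@' characters pass through untouched)
    println(substituteExecIdWildCard(opts, "7"))
  }
}
```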
diff --git a/docs/configuration.md b/docs/configuration.md
index 6512e16faf4c1..607966c30a8e9 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -275,6 +275,11 @@ Apart from these, the following properties are also available, and may be useful
     Note that it is illegal to set Spark properties or maximum heap size (-Xmx) settings with this
     option. Spark properties should be set using a SparkConf object or the spark-defaults.conf file
     used with the spark-submit script. Maximum heap size settings can be set with spark.executor.memory.
+
+    The following symbol, if present, will be interpolated: @execid@ is replaced by the executor ID.
+    Any other occurrence of '@' is left unchanged. For example, to enable verbose GC logging to
+    a file named for the executor ID in /tmp, pass a 'value' of:
+    -verbose:gc -Xloggc:/tmp/@execid@.gc
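To make the documented behavior concrete, here is how an application might set the option through SparkConf, which the docs hunk above names as one of the supported routes (a sketch; object and app names are illustrative):

```scala
import org.apache.spark.SparkConf

object PerExecutorGcLogs {
  def main(args: Array[String]): Unit = {
    // With this patch applied, each executor substitutes its own ID,
    // so GC logs land in distinct files: /tmp/0.gc, /tmp/1.gc, ...
    val conf = new SparkConf()
      .setAppName("per-executor-gc-logs")
      .set("spark.executor.extraJavaOptions",
        "-verbose:gc -Xloggc:/tmp/@execid@.gc")
    println(conf.get("spark.executor.extraJavaOptions"))
  }
}
```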
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 3d370e6d71426..da28de41d4d28 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -148,7 +148,8 @@ private[yarn] class ExecutorRunnable(
 
     // Set extra Java options for the executor, if defined
     sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
-      javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
+      val subsOpt = Utils.substituteExecIdWildCard(opts, slaveId.toString)
+      javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)
     }
     sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
       javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)

From fdfae738f0cf557cb778f999c74202cdf14db666 Mon Sep 17 00:00:00 2001
From: Devaraj K
Date: Wed, 4 May 2016 12:18:36 +0530
Subject: [PATCH 2/3] Fixed the review comment

---
 .../cluster/mesos/CoarseMesosSchedulerBackend.scala     | 5 +++--
 .../scheduler/cluster/mesos/MesosSchedulerBackend.scala | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 23815cc17eeb2..d6a7867ceb707 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -166,7 +166,9 @@ private[spark] class CoarseMesosSchedulerBackend(
       environment.addVariables(
         Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
     }
-    var extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "")
+    val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions").map {
+      Utils.substituteExecIdWildCard(_, taskId)
+    }.getOrElse("")
 
     // Set the environment variable through a command prefix
     // to append to the existing value of the variable
@@ -174,7 +176,6 @@ private[spark] class CoarseMesosSchedulerBackend(
       Utils.libraryPathEnvPrefix(Seq(p))
     }.getOrElse("")
 
-    extraJavaOpts = Utils.substituteExecIdWildCard(extraJavaOpts, taskId)
     environment.addVariables(
       Environment.Variable.newBuilder()
         .setName("SPARK_EXECUTOR_OPTS")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index f958f76db62ff..5ec119b422279 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -102,13 +102,14 @@ private[spark] class MesosSchedulerBackend(
       environment.addVariables(
         Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
     }
-    var extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("")
+    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").map {
+      Utils.substituteExecIdWildCard(_, execId)
+    }.getOrElse("")
 
     val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p =>
       Utils.libraryPathEnvPrefix(Seq(p))
     }.getOrElse("")
 
-    extraJavaOpts = Utils.substituteExecIdWildCard(extraJavaOpts, execId)
     environment.addVariables(
       Environment.Variable.newBuilder()
        .setName("SPARK_EXECUTOR_OPTS")
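The review fix above replaces the var-plus-reassignment pattern from patch 1 with a single val built via getOption(...).map. A self-contained sketch of the two shapes, using a plain Map as a stand-in for SparkConf (all names here are mine):

```scala
object RefactorShape {
  // Stand-in for Utils.substituteExecIdWildCard.
  private def substitute(opt: String, execId: String): String =
    opt.replace("@execid@", execId)

  def main(args: Array[String]): Unit = {
    val settings = Map("spark.executor.extraJavaOptions" -> "-Xloggc:/tmp/@execid@.gc")
    val taskId = "3"

    // Patch 1 shape: mutable binding, substitution bolted on afterwards.
    var before = settings.getOrElse("spark.executor.extraJavaOptions", "")
    before = substitute(before, taskId)

    // Patch 2 shape: one immutable binding, substitution fused into the read.
    val after = settings.get("spark.executor.extraJavaOptions")
      .map(substitute(_, taskId))
      .getOrElse("")

    assert(before == after)
    println(after) // -Xloggc:/tmp/3.gc
  }
}
```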
From 23fa1e3763c3a2f9b3a608c69dfd3c0face67d54 Mon Sep 17 00:00:00 2001
From: Devaraj K
Date: Thu, 4 Aug 2016 15:30:37 -0700
Subject: [PATCH 3/3] Merge conflict resolve

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c58ce282b71b3..654fdee87a06d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2418,7 +2418,6 @@ private[spark] object Utils extends Logging {
       sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
     }
   }
-}
 
 /**
  * Replaces all the @execid@ occurrences with the Executor Id.
@@ -2426,6 +2425,7 @@
   def substituteExecIdWildCard(opt: String, execId: String): String = {
     opt.replace("@execid@", execId)
   }
+}
 
 /**
  * A utility class to redirect the child process's stdout or stderr.
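Once the brace fix above lands, the helper is back inside the Utils object. A quick plain-Scala sanity check of the replacement semantics (assertions only, not part of the patch; the object name is mine):

```scala
object SubstituteExecIdWildCardCheck {
  // Same body as the patched Utils.substituteExecIdWildCard.
  def substituteExecIdWildCard(opt: String, execId: String): String =
    opt.replace("@execid@", execId)

  def main(args: Array[String]): Unit = {
    // Every occurrence is replaced, not just the first.
    assert(substituteExecIdWildCard("@execid@-@execid@", "5") == "5-5")
    // Strings without the wildcard pass through untouched.
    assert(substituteExecIdWildCard("-Xmx2g", "5") == "-Xmx2g")
    // A lone '@' is not a wildcard.
    assert(substituteExecIdWildCard("user@host", "5") == "user@host")
    println("all checks passed")
  }
}
```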