From 93988531d7ab4a4e902949744f80121593ba1f52 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Fri, 2 May 2014 15:43:34 -0500
Subject: [PATCH 1/3] change to have doAs in executor higher up.

---
 .../apache/spark/deploy/SparkHadoopUtil.scala |  9 +++-
 .../CoarseGrainedExecutorBackend.scala        | 46 +++++++++++--------
 .../org/apache/spark/executor/Executor.scala  |  4 +-
 .../spark/executor/MesosExecutorBackend.scala | 16 ++++---
 .../spark/deploy/yarn/ApplicationMaster.scala | 12 +++--
 .../spark/deploy/yarn/ExecutorLauncher.scala  |  7 ++-
 .../spark/deploy/yarn/ApplicationMaster.scala | 14 +++---
 .../spark/deploy/yarn/ExecutorLauncher.scala  |  7 ++-
 8 files changed, 72 insertions(+), 43 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 498fcc520ac5e..ca30933d83359 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,25 +24,30 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
 
 import scala.collection.JavaConversions._
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
-class SparkHadoopUtil {
+class SparkHadoopUtil extends Logging {
   val conf: Configuration = newConfiguration()
   UserGroupInformation.setConfiguration(conf)
 
+  // IMPORTANT NOTE: If this function is going to be called repeatedly in the same process
+  // you need to look at https://issues.apache.org/jira/browse/HDFS-3545 and possibly
+  // do a FileSystem.closeAllForUGI in order to avoid leaking FileSystems
   def runAsUser(user: String)(func: () => Unit) {
     if (user != SparkContext.SPARK_UNKNOWN_USER) {
+      logInfo("running as user: " + user)
       val ugi = UserGroupInformation.createRemoteUser(user)
       transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
       ugi.doAs(new PrivilegedExceptionAction[Unit] {
         def run: Unit = func()
       })
     } else {
+      logInfo("running as SPARK_UNKNOWN_USER")
       func()
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 9ac7365f47f9f..55e93b839ed90 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,8 +22,9 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.{SparkContext, Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}
@@ -94,25 +95,32 @@ private[spark] class CoarseGrainedExecutorBackend(
 
 private[spark] object CoarseGrainedExecutorBackend {
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
-    workerUrl: Option[String]) {
-    // Debug code
-    Utils.checkHost(hostname)
-
-    val conf = new SparkConf
-    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
-    // before getting started with all our system properties, etc
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-      indestructible = true, conf = conf, new SecurityManager(conf))
-    // set it
-    val sparkHostPort = hostname + ":" + boundPort
-    actorSystem.actorOf(
-      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
-        sparkHostPort, cores),
-      name = "Executor")
-    workerUrl.foreach{ url =>
-      actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+    workerUrl: Option[String]) {
+
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+
+      // Debug code
+      Utils.checkHost(hostname)
+
+      val conf = new SparkConf
+      // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
+      // before getting started with all our system properties, etc
+      val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+        indestructible = true, conf = conf, new SecurityManager(conf))
+      // set it
+      val sparkHostPort = hostname + ":" + boundPort
+      actorSystem.actorOf(
+        Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
+          sparkHostPort, cores),
+        name = "Executor")
+      workerUrl.foreach {
+        url =>
+          actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+      }
+      actorSystem.awaitTermination()
+
     }
-    actorSystem.awaitTermination()
   }
 
   def main(args: Array[String]) {

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 272bcda5f8f2f..98e7e0be813be 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -128,8 +128,6 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
-  val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     val tr = new TaskRunner(context, taskId, serializedTask)
     runningTasks.put(taskId, tr)
@@ -172,7 +170,7 @@ private[spark] class Executor(
       }
     }
 
-    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    override def run() {
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)

diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 64e24506e8038..adc667bbb1368 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -23,10 +23,10 @@ import com.google.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 
-import org.apache.spark.Logging
-import org.apache.spark.TaskState
+import org.apache.spark.{SparkContext, Logging, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkHadoopUtil
 
 private[spark] class MesosExecutorBackend
   extends MesosExecutor
@@ -95,9 +95,13 @@ private[spark] class MesosExecutorBackend
  */
 private[spark] object MesosExecutorBackend {
   def main(args: Array[String]) {
-    MesosNativeLibrary.load()
-    // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend()
-    new MesosExecutorDriver(runner).run()
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+
+      MesosNativeLibrary.load()
+      // Create a new Executor and start it running
+      val runner = new MesosExecutorBackend()
+      new MesosExecutorDriver(runner).run()
+    }
   }
 }

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index fc13dbecb4555..dddcedf5468da 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -70,9 +70,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private var registered = false
 
-  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-    SparkContext.SPARK_UNKNOWN_USER)
-
   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
@@ -192,7 +189,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
           false /* initialize */ ,
           Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      override def run() {
+
         var successed = false
         try {
           // Copy
@@ -480,6 +478,10 @@ object ApplicationMaster {
 
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ApplicationMaster(args).run()
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+      SparkContext.SPARK_UNKNOWN_USER)
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      new ApplicationMaster(args).run()
+    }
   }
 }

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 65b7215afbd4c..96a7567e643e8 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -279,6 +280,10 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ExecutorLauncher(args).run()
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+      SparkContext.SPARK_UNKNOWN_USER)
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      new ExecutorLauncher(args).run()
+    }
   }
 }

diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 90e807160d4b6..0281db016841c 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -71,9 +71,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
 
   private var registered = false
-
-  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-    SparkContext.SPARK_UNKNOWN_USER)
 
   def run() {
     // Setup the directories so things go to YARN approved directories rather
@@ -179,8 +176,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
           false /* initialize */ ,
           Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
-        var successed = false
+      override def run() {
+
+        var successed = false
         try {
           // Copy
           var mainArgs: Array[String] = new Array[String](args.userArgs.size)
@@ -462,6 +460,10 @@ object ApplicationMaster {
 
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ApplicationMaster(args).run()
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+      SparkContext.SPARK_UNKNOWN_USER)
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      new ApplicationMaster(args).run()
+    }
   }
 }

diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a14bb377aa133..4b1ec3e3c793e 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -34,6 +34,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -255,6 +256,10 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ExecutorLauncher(args).run()
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+      SparkContext.SPARK_UNKNOWN_USER)
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      new ExecutorLauncher(args).run()
+    }
   }
 }
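
The pattern patch 1 threads through every executor and AM entry point above is the standard Hadoop UGI doAs idiom. A minimal standalone sketch of that idiom, for reference (the object name and the "alice" user are illustrative only, and it assumes the Hadoop 2.x UserGroupInformation API that the YARN builds compile against):

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation

    object RunAsSketch {
      // Run `func` under a proxy identity for `user`, carrying along any
      // credentials (e.g. HDFS delegation tokens) the launching user holds.
      def runAs(user: String)(func: () => Unit) {
        val ugi = UserGroupInformation.createRemoteUser(user)
        ugi.addCredentials(UserGroupInformation.getCurrentUser().getCredentials())
        ugi.doAs(new PrivilegedExceptionAction[Unit] {
          def run: Unit = func()
        })
        // Per the HDFS-3545 note in the patch: if this is called repeatedly in
        // one process, consider FileSystem.closeAllForUGI(ugi) here so FileSystem
        // instances cached against each throwaway UGI are not leaked.
      }

      def main(args: Array[String]) {
        runAs("alice") { () =>
          println("running as: " + UserGroupInformation.getCurrentUser().getShortUserName())
        }
      }
    }

Wrapping the whole main body this way, instead of each TaskRunner.run as before, creates the UGI once per process, which is also what keeps the repeated-doAs FileSystem caching issue from biting.
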
From 44163d498b805c0c59689480a92073adc74df256 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Fri, 2 May 2014 21:30:29 -0500
Subject: [PATCH 2/3] Rework

---
 .../apache/spark/deploy/SparkHadoopUtil.scala | 18 ++++++++++++------
 .../CoarseGrainedExecutorBackend.scala        |  6 ++----
 .../spark/executor/MesosExecutorBackend.scala |  6 ++----
 .../spark/deploy/yarn/ApplicationMaster.scala |  4 +---
 .../spark/deploy/yarn/ExecutorLauncher.scala  |  6 ++----
 .../spark/deploy/yarn/ApplicationMaster.scala |  4 +---
 .../spark/deploy/yarn/ExecutorLauncher.scala  |  6 ++----
 7 files changed, 22 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index ca30933d83359..e2df1b8954124 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -35,19 +35,25 @@ class SparkHadoopUtil extends Logging {
   val conf: Configuration = newConfiguration()
   UserGroupInformation.setConfiguration(conf)
 
-  // IMPORTANT NOTE: If this function is going to be called repeatedly in the same process
-  // you need to look at https://issues.apache.org/jira/browse/HDFS-3545 and possibly
-  // do a FileSystem.closeAllForUGI in order to avoid leaking FileSystems
-  def runAsUser(user: String)(func: () => Unit) {
+  /**
+   * Runs the given function with a Hadoop UserGroupInformation as a thread local variable
+   * (distributed to child threads), used for authenticating HDFS and YARN calls.
+   *
+   * IMPORTANT NOTE: If this function is going to be called repeatedly in the same process
+   * you need to look at https://issues.apache.org/jira/browse/HDFS-3545 and possibly
+   * do a FileSystem.closeAllForUGI in order to avoid leaking FileSystems
+   */
+  def runAsSparkUser(func: () => Unit) {
+    val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
     if (user != SparkContext.SPARK_UNKNOWN_USER) {
-      logInfo("running as user: " + user)
+      logDebug("running as user: " + user)
       val ugi = UserGroupInformation.createRemoteUser(user)
       transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
       ugi.doAs(new PrivilegedExceptionAction[Unit] {
         def run: Unit = func()
       })
     } else {
-      logInfo("running as SPARK_UNKNOWN_USER")
+      logDebug("running as SPARK_UNKNOWN_USER")
       func()
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 55e93b839ed90..f34d72932babd 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{SparkContext, Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
@@ -97,9 +97,7 @@ private[spark] object CoarseGrainedExecutorBackend {
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
     workerUrl: Option[String]) {
 
-    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
-
+    SparkHadoopUtil.get.runAsSparkUser { () =>
       // Debug code
       Utils.checkHost(hostname)
 

diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index adc667bbb1368..9b56f711e0e0b 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -23,7 +23,7 @@ import com.google.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 
-import org.apache.spark.{SparkContext, Logging, TaskState}
+import org.apache.spark.{Logging, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -95,9 +95,7 @@ private[spark] class MesosExecutorBackend
  */
 private[spark] object MesosExecutorBackend {
   def main(args: Array[String]) {
-    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
-
+    SparkHadoopUtil.get.runAsSparkUser { () =>
       MesosNativeLibrary.load()
       // Create a new Executor and start it running
       val runner = new MesosExecutorBackend()

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index dddcedf5468da..8f0ecb855718e 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -478,9 +478,7 @@ object ApplicationMaster {
 
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-      SparkContext.SPARK_UNKNOWN_USER)
-    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    SparkHadoopUtil.get.runAsSparkUser { () =>
       new ApplicationMaster(args).run()
     }
   }

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 96a7567e643e8..a3bd91590fc25 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
 import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
@@ -280,9 +280,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-      SparkContext.SPARK_UNKNOWN_USER)
-    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    SparkHadoopUtil.get.runAsSparkUser { () =>
       new ExecutorLauncher(args).run()
     }
   }

diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 0281db016841c..c1dfe3f53b40b 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -460,9 +460,7 @@ object ApplicationMaster {
 
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-      SparkContext.SPARK_UNKNOWN_USER)
-    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    SparkHadoopUtil.get.runAsSparkUser { () =>
       new ApplicationMaster(args).run()
     }
   }

diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 4b1ec3e3c793e..a4ce8766d347c 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
 import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
@@ -256,9 +256,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-      SparkContext.SPARK_UNKNOWN_USER)
-    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    SparkHadoopUtil.get.runAsSparkUser { () =>
       new ExecutorLauncher(args).run()
     }
   }

From 244d55a6ad9583e3118b40bac350a54669fe2c82 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Fri, 2 May 2014 21:41:16 -0500
Subject: [PATCH 3/3] fix line length

---
 .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index f34d72932babd..e912ae8a5d3c5 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -102,8 +102,8 @@ private[spark] object CoarseGrainedExecutorBackend {
       Utils.checkHost(hostname)
 
       val conf = new SparkConf
-      // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
-      // before getting started with all our system properties, etc
+      // Create a new ActorSystem to run the backend, because we can't create a
+      // SparkEnv / Executor before getting started with all our system properties, etc
       val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
         indestructible = true, conf = conf, new SecurityManager(conf))
       // set it
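
Taken together, the series moves impersonation from per-task (inside Executor's TaskRunner) up to a single wrapper around each backend's main, and patch 2 then hides the SPARK_USER lookup inside runAsSparkUser itself, which is what lets every call site drop its SparkContext import. A minimal sketch of what an entry point looks like after the rework (MyBackend is a made-up name, not a class in these patches):

    import org.apache.spark.deploy.SparkHadoopUtil

    object MyBackend {
      def main(args: Array[String]) {
        // runAsSparkUser resolves SPARK_USER internally (falling back to
        // SPARK_UNKNOWN_USER, i.e. the current JVM user) and wraps the whole
        // process body in one UGI.doAs, so everything below authenticates to
        // HDFS and YARN as that user.
        SparkHadoopUtil.get.runAsSparkUser { () =>
          // ... create the actor system, start the executor, etc.
          println("backend body runs as SPARK_USER")
        }
      }
    }
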