From e9009e046e496cab6c1a226756eca09b545c7179 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Wed, 30 Apr 2014 13:51:54 -0700
Subject: [PATCH] SPARK-1676 Cache Hadoop UGIs by default to prevent FileSystem leak

UserGroupInformation objects (UGIs) are used for Hadoop security. A relatively
recent PR (#29) makes Spark always use UGIs when executing tasks. Unfortunately,
this triggers HDFS-3545, which causes the FileSystem cache to keep creating new
FileSystem instances, because the UGIs compare as different even though they are
logically identical. The result is a memory leak, and sometimes a file descriptor
leak, for FileSystems (such as S3N) that maintain open connections.

The solution is to introduce a config option (enabled by default) that reuses a
single Spark user UGI rather than creating a new one for each task. The downside
of this approach is that UGIs cannot be safely cached (see the notes in
HDFS-3545). For example, if a token expires, it is never cleared from the UGI
but may still be used; which token a UGI actually uses is nondeterministic
because the tokens are backed by a Set.

This setting is enabled by default because the leak can become serious very
quickly. In one benchmark, attempting to read 10k files from an S3 directory
caused 45k connections to remain open to S3 after the job completed. These file
descriptors are never cleaned up, nor is the memory used by the associated
FileSystems.
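For illustration only (not part of the patch), here is a minimal sketch of why
per-task UGIs defeat the FileSystem cache. It assumes a plain Hadoop client on
the classpath and uses the local file:// scheme as a stand-in for S3N or HDFS:

    import java.net.URI
    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.security.UserGroupInformation

    object UgiCacheLeakSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()

        // FileSystem.get() caches instances keyed by (scheme, authority, UGI). Two UGIs
        // created for the same user name do not compare equal, so each doAs block below
        // gets a brand-new FileSystem rather than a cached one -- the HDFS-3545 behavior.
        val fileSystems = (1 to 2).map { _ =>
          val ugi = UserGroupInformation.createRemoteUser("spark") // fresh UGI each time
          ugi.doAs(new PrivilegedExceptionAction[FileSystem] {
            // file:// stands in for any cached scheme; S3N behaves the same way.
            def run(): FileSystem = FileSystem.get(new URI("file:///"), conf)
          })
        }

        // Prints "false": the two lookups did not share a cached FileSystem instance.
        println(fileSystems(0) eq fileSystems(1))
      }
    }

Reusing one UGI for every task keeps the cache key stable, which is what the new
spark.user.cacheUserGroupInformation option does by default.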
---
 .../apache/spark/deploy/SparkHadoopUtil.scala | 27 ++++++++++++++-----
 .../org/apache/spark/executor/Executor.scala  | 10 +++++--
 docs/configuration.md                         | 11 ++++++++
 .../spark/deploy/yarn/ApplicationMaster.scala |  3 +--
 .../spark/deploy/yarn/ApplicationMaster.scala |  3 +--
 5 files changed, 41 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 9bdbfb33bf54f..58d686681709c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -35,15 +35,28 @@ class SparkHadoopUtil {
   val conf: Configuration = newConfiguration()
   UserGroupInformation.setConfiguration(conf)
 
-  def runAsUser(user: String)(func: () => Unit) {
+  /** Creates a UserGroupInformation for Spark based on the SPARK_USER environment variable. */
+  def createSparkUser(): Option[UserGroupInformation] = {
+    val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
     if (user != SparkContext.SPARK_UNKNOWN_USER) {
-      val ugi = UserGroupInformation.createRemoteUser(user)
-      transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
-      ugi.doAs(new PrivilegedExceptionAction[Unit] {
-        def run: Unit = func()
-      })
+      Some(UserGroupInformation.createRemoteUser(user))
     } else {
-      func()
+      None
+    }
+  }
+
+  /**
+   * If a user is specified, we will run the function as that user. We additionally transfer
+   * Spark's tokens to the given UGI to ensure it has access to data written by Spark.
+   */
+  def runAsUser(user: Option[UserGroupInformation])(func: () => Unit) {
+    user match {
+      case Some(ugi) => {
+        transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
+        ugi.doAs(new PrivilegedExceptionAction[Unit] {
+          def run: Unit = func()
+        })}
+      case None => func()
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 272bcda5f8f2f..e2a83ba425137 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -128,7 +128,13 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
-  val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+  // NB: Workaround for SPARK-1676. Caching UGIs prevents continuously creating FileSystem
+  // objects with "unique" UGIs, but is not a good solution if real UGIs and tokens are needed,
+  // mainly because expired tokens cannot be removed from the UGI.
+  val cacheUgi = conf.getBoolean("spark.user.cacheUserGroupInformation", true)
+
+  val cachedSparkUser = SparkHadoopUtil.get.createSparkUser()
+  def getSparkUser = if (cacheUgi) cachedSparkUser else SparkHadoopUtil.get.createSparkUser()
 
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     val tr = new TaskRunner(context, taskId, serializedTask)
@@ -172,7 +178,7 @@ private[spark] class Executor(
       }
     }
 
-    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    override def run(): Unit = SparkHadoopUtil.get.runAsUser(getSparkUser) { () =>
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)
diff --git a/docs/configuration.md b/docs/configuration.md
index e7e1dd56cf124..af3c3e32fc98a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -679,6 +679,17 @@ Apart from these, the following properties are also available, and may be useful
     Set a special library path to use when launching executor JVM's.
   </td>
 </tr>
+<tr>
+  <td>spark.user.cacheUserGroupInformation</td>
+  <td>true</td>
+  <td>
+    Caching UGIs is a workaround for [SPARK-1676](https://issues.apache.org/jira/browse/SPARK-1676)
+    for deployments that do not rely heavily on Hadoop security. Caching UGIs can produce
+    security-related exceptions when tokens have an expiry or are shared between users. On the
+    other hand, not caching UGIs means that every FileSystem.get() call can potentially create
+    and cache a new FileSystem object, which leads to leaked memory and file descriptors.
+  </td>
+</tr>
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2f74965900baf..edb2de1fa4b62 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -70,8 +70,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private var registered = false
 
-  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-    SparkContext.SPARK_UNKNOWN_USER)
+  private val sparkUser = SparkHadoopUtil.get.createSparkUser()
 
   def run() {
     // Setup the directories so things go to yarn approved directories rather
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 90e807160d4b6..64fba45ef28a8 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -72,8 +72,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private var registered = false
 
-  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-    SparkContext.SPARK_UNKNOWN_USER)
+  private val sparkUser = SparkHadoopUtil.get.createSparkUser()
 
   def run() {
     // Setup the directories so things go to YARN approved directories rather
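
For reference, a minimal sketch (not part of the patch) of how a job could opt out
of the new default once this change is applied. The app name and master URL below
are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    // UGI caching is on by default with this patch. A deployment that depends on real,
    // expiring delegation tokens can turn it off and accept FileSystem cache growth instead.
    val conf = new SparkConf()
      .setAppName("ugi-cache-example")      // placeholder app name
      .setMaster("local[2]")                // placeholder master URL
      .set("spark.user.cacheUserGroupInformation", "false") // create a fresh UGI per task

    val sc = new SparkContext(conf)

The same property can also be set through any of Spark's usual configuration
mechanisms rather than programmatically.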