From b2411ba2a2ff2a440d10cb4ed06a1e8b5b95f135 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 3 Nov 2021 16:49:30 -0700 Subject: [PATCH 1/6] wip --- .../org/apache/spark/deploy/yarn/Client.scala | 27 +++++++++++++++++-- .../org/apache/spark/deploy/yarn/config.scala | 14 ++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index ed54044a13ff3..15e94af398fa7 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -34,7 +34,7 @@ import com.google.common.base.Objects import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission -import org.apache.hadoop.io.Text +import org.apache.hadoop.io.{DataOutputBuffer, Text} import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.StringUtils @@ -60,7 +60,7 @@ import org.apache.spark.internal.config.Python._ import org.apache.spark.launcher.{JavaModuleOptions, LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils} import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.RpcEnv -import org.apache.spark.util.{CallerContext, Utils, YarnContainerInfoHelper} +import org.apache.spark.util.{CallerContext, Utils, VersionUtils, YarnContainerInfoHelper} private[spark] class Client( val args: ClientArguments, @@ -340,6 +340,28 @@ private[spark] class Client( amContainer.setTokens(ByteBuffer.wrap(serializedCreds)) } + /** + * Set configurations sent from AM to RM for renewing delegation tokens. 
+ */ + private def setTokenConf(amContainer: ContainerLaunchContext): Unit = { + // SPARK-37205: this regex is used to grep a list of configurations and send them to YARN RM + // for fetching delegation tokens. See YARN-5910 for more details. + // The feature is only supported in Hadoop 3.x and up, hence the check below. + val regex = sparkConf.get(config.AM_SEND_TOKEN_CONF) + if (regex != null && regex.nonEmpty && VersionUtils.isHadoop3) { + val dob = new DataOutputBuffer(); + val copy = new Configuration(false); + copy.clear(); + hadoopConf.asScala.foreach { entry => + if (entry.getKey.matches(regex)) { + copy.set(entry.getKey, entry.getValue) + } + } + copy.write(dob); + amContainer.setTokensConf(ByteBuffer.wrap(dob.getData)) + } + } + /** Get the application report from the ResourceManager for an application we have submitted. */ def getApplicationReport(appId: ApplicationId): ApplicationReport = yarnClient.getApplicationReport(appId) @@ -1084,6 +1106,7 @@ private[spark] class Client( amContainer.setApplicationACLs( YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava) setupSecurityToken(amContainer) + setTokenConf(amContainer) amContainer } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 1270f1e4a6661..a22237a7981d4 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -78,6 +78,20 @@ package object config extends Logging { .booleanConf .createWithDefault(false) + private[spark] val AM_SEND_TOKEN_CONF = + ConfigBuilder("spark.yarn.am.sendTokenConf") + .doc("The value of this config is a regex expression used to grep a list of " + + "config entries from the job's configuration file (e.g., hdfs-site.xml) and send to " + + "RM, which will then use them when renewing delegation tokens. 
A typical use case of " + + "this feature is to support delegation tokens in a multi-cluster environment, where " + + "the RM may not have configs for all the (HDFS) clusters a YARN job wants to talk to, " + + "e.g., dfs.nameservices, dfs.ha.namenodes.x, dfs.namenode.rpc-address.x, and so on. " + + "This config mirrors 'mapreduce.job.send-token-conf'. For more details, please check " + + "YARN-5910.") + .version("3.3.0") + .stringConf + .createWithDefault("") + private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS = ConfigBuilder("spark.yarn.executor.failuresValidityInterval") .doc("Interval after which Executor failures will be considered independent and not " + From 8c6e5b827ca7b3f52f37e79ad57bab32edc1317b Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 17 Nov 2021 09:38:17 -0800 Subject: [PATCH 2/6] logging & rewording --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 2 ++ .../scala/org/apache/spark/deploy/yarn/config.scala | 12 ++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 15e94af398fa7..024ed16b3732e 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -349,12 +349,14 @@ private[spark] class Client( // The feature is only supported in Hadoop 3.x and up, hence the check below. 
val regex = sparkConf.get(config.AM_SEND_TOKEN_CONF) if (regex != null && regex.nonEmpty && VersionUtils.isHadoop3) { + logInfo(s"Processing token conf (spark.yarn.am.sendTokenConf) with regex $regex") val dob = new DataOutputBuffer(); val copy = new Configuration(false); copy.clear(); hadoopConf.asScala.foreach { entry => if (entry.getKey.matches(regex)) { copy.set(entry.getKey, entry.getValue) + logInfo(s"Captured key: ${entry.getKey} -> value: ${entry.getValue}") } } copy.write(dob); diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index a22237a7981d4..27d2e08bed1a0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -82,12 +82,12 @@ package object config extends Logging { ConfigBuilder("spark.yarn.am.sendTokenConf") .doc("The value of this config is a regex expression used to grep a list of " + "config entries from the job's configuration file (e.g., hdfs-site.xml) and send to " + - "RM, which will then use them when renewing delegation tokens. A typical use case of " + - "this feature is to support delegation tokens in a multi-cluster environment, where " + - "the RM may not have configs for all the (HDFS) clusters a YARN job wants to talk to, " + - "e.g., dfs.nameservices, dfs.ha.namenodes.x, dfs.namenode.rpc-address.x, and so on. " + - "This config mirrors 'mapreduce.job.send-token-conf'. For more details, please check " + - "YARN-5910.") + "RM, which uses them when renewing delegation tokens. 
A typical use case of " + + "this feature is to support delegation tokens in an environment where a YARN cluster " + + "needs to talk to multiple downstream HDFS clusters, where the YARN RM may not have " + + "configs (e.g., dfs.nameservices, dfs.ha.namenodes.*, dfs.namenode.rpc-address.*)" + + "to connect to these clusters. This config is very similar to " + + "'mapreduce.job.send-token-conf'. Please check YARN-5910 for more details.") .version("3.3.0") .stringConf .createWithDefault("") From 37c430cb029c81ecb0fadf9305aa7b9271ac5760 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 17 Nov 2021 20:46:53 -0800 Subject: [PATCH 3/6] address comments --- .../org/apache/spark/deploy/yarn/Client.scala | 20 ++++++++++++++--- .../org/apache/spark/deploy/yarn/config.scala | 22 +++++++++---------- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 024ed16b3732e..fee47702a212f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -76,6 +76,10 @@ private[spark] class Client( private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster" + // ContainerLaunchContext.setTokensConf is only available in Hadoop 2.9+ and 3.x, so here we use + // reflection to avoid compilation for Hadoop 2.7 profile. + private val SET_TOKENS_CONF_METHOD = "setTokensConf" + private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode private var appMaster: ApplicationMaster = _ private var stagingDirPath: Path = _ @@ -348,19 +352,29 @@ private[spark] class Client( // for fetching delegation tokens. See YARN-5910 for more details. // The feature is only supported in Hadoop 3.x and up, hence the check below. 
val regex = sparkConf.get(config.AM_SEND_TOKEN_CONF) - if (regex != null && regex.nonEmpty && VersionUtils.isHadoop3) { + if (regex.nonEmpty && VersionUtils.isHadoop3) { logInfo(s"Processing token conf (spark.yarn.am.sendTokenConf) with regex $regex") val dob = new DataOutputBuffer(); val copy = new Configuration(false); copy.clear(); hadoopConf.asScala.foreach { entry => - if (entry.getKey.matches(regex)) { + if (entry.getKey.matches(regex.get)) { copy.set(entry.getKey, entry.getValue) logInfo(s"Captured key: ${entry.getKey} -> value: ${entry.getValue}") } } copy.write(dob); - amContainer.setTokensConf(ByteBuffer.wrap(dob.getData)) + + // since this method was added in Hadoop 2.9 and 3.0, we use reflection here to avoid + // compilation error for Hadoop 2.7 profile. + val setTokensConfMethod = try { + amContainer.getClass.getMethod(SET_TOKENS_CONF_METHOD, classOf[ByteBuffer]) + } catch { + case _: NoSuchMethodException => + throw new SparkException(s"Cannot find setTokensConf method in ${amContainer.getClass}." + + s" Please check YARN version and make sure it is 2.9+ or 3.x") + } + setTokensConfMethod.invoke(amContainer, ByteBuffer.wrap(dob.getData, 0, dob.getLength)) } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 27d2e08bed1a0..39c83e796b276 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -80,17 +80,17 @@ package object config extends Logging { private[spark] val AM_SEND_TOKEN_CONF = ConfigBuilder("spark.yarn.am.sendTokenConf") - .doc("The value of this config is a regex expression used to grep a list of " + - "config entries from the job's configuration file (e.g., hdfs-site.xml) and send to " + - "RM, which uses them when renewing delegation tokens. 
A typical use case of " + - "this feature is to support delegation tokens in an environment where a YARN cluster " + - "needs to talk to multiple downstream HDFS clusters, where the YARN RM may not have " + - "configs (e.g., dfs.nameservices, dfs.ha.namenodes.*, dfs.namenode.rpc-address.*)" + - "to connect to these clusters. This config is very similar to " + - "'mapreduce.job.send-token-conf'. Please check YARN-5910 for more details.") - .version("3.3.0") - .stringConf - .createWithDefault("") + .doc("This config is only supported for Hadoop 3.x profile. The value of this config is a " + + "regex expression used to grep a list of config entries from the job's configuration " + + "file (e.g., hdfs-site.xml) and send to RM, which uses them when renewing delegation " + + "tokens. A typical use case of this feature is to support delegation tokens in an " + + "environment where a YARN cluster needs to talk to multiple downstream HDFS clusters, " + + "where the YARN RM may not have configs (e.g., dfs.nameservices, dfs.ha.namenodes.*, " + + "dfs.namenode.rpc-address.*) to connect to these clusters. This config is very similar " + + "to 'mapreduce.job.send-token-conf'. 
Please check YARN-5910 for more details.") + .version("3.3.0") + .stringConf + .createOptional private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS = ConfigBuilder("spark.yarn.executor.failuresValidityInterval") From 8ff6be1ba3c74352d9dd29cc4c86cbab55329455 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Fri, 19 Nov 2021 11:51:52 -0800 Subject: [PATCH 4/6] address review comments --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 4 ++-- .../scala/org/apache/spark/deploy/yarn/config.scala | 11 +++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index fee47702a212f..b026cab94535d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -351,9 +351,9 @@ private[spark] class Client( // SPARK-37205: this regex is used to grep a list of configurations and send them to YARN RM // for fetching delegation tokens. See YARN-5910 for more details. // The feature is only supported in Hadoop 3.x and up, hence the check below. 
- val regex = sparkConf.get(config.AM_SEND_TOKEN_CONF) + val regex = sparkConf.get(config.AM_TOKEN_CONF_REGEX) if (regex.nonEmpty && VersionUtils.isHadoop3) { - logInfo(s"Processing token conf (spark.yarn.am.sendTokenConf) with regex $regex") + logInfo(s"Processing token conf (spark.yarn.am.tokenConfRegex) with regex $regex") val dob = new DataOutputBuffer(); val copy = new Configuration(false); copy.clear(); diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 39c83e796b276..a49a52b7857d3 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -78,16 +78,19 @@ package object config extends Logging { .booleanConf .createWithDefault(false) - private[spark] val AM_SEND_TOKEN_CONF = - ConfigBuilder("spark.yarn.am.sendTokenConf") + private[spark] val AM_TOKEN_CONF_REGEX = + ConfigBuilder("spark.yarn.am.tokenConfRegex") .doc("This config is only supported for Hadoop 3.x profile. The value of this config is a " + "regex expression used to grep a list of config entries from the job's configuration " + "file (e.g., hdfs-site.xml) and send to RM, which uses them when renewing delegation " + "tokens. A typical use case of this feature is to support delegation tokens in an " + "environment where a YARN cluster needs to talk to multiple downstream HDFS clusters, " + "where the YARN RM may not have configs (e.g., dfs.nameservices, dfs.ha.namenodes.*, " + - "dfs.namenode.rpc-address.*) to connect to these clusters. This config is very similar " + - "to 'mapreduce.job.send-token-conf'. Please check YARN-5910 for more details.") + "dfs.namenode.rpc-address.*) to connect to these clusters. 
In this scenario, Spark " + + "users can specify the config value to be " + + "'^dfs.nameservices$|^dfs.namenode.rpc-address.*$|^dfs.ha.namenodes.*$' to parse " + + "these HDFS configs from the job's local configuration files. This config is very " + + "similar to 'mapreduce.job.send-token-conf'. Please check YARN-5910 for more details.") .version("3.3.0") .stringConf .createOptional From a109ab6dd04e16cc6fcde2c4f89f71626d51cce6 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Mon, 29 Nov 2021 22:30:19 -0800 Subject: [PATCH 5/6] address comments --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b026cab94535d..ce44ed6a85aef 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -38,6 +38,7 @@ import org.apache.hadoop.io.{DataOutputBuffer, Text} import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.StringUtils +import org.apache.hadoop.util.VersionInfo import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.protocolrecords._ @@ -350,9 +351,14 @@ private[spark] class Client( private def setTokenConf(amContainer: ContainerLaunchContext): Unit = { // SPARK-37205: this regex is used to grep a list of configurations and send them to YARN RM // for fetching delegation tokens. See YARN-5910 for more details. - // The feature is only supported in Hadoop 3.x and up, hence the check below. 
val regex = sparkConf.get(config.AM_TOKEN_CONF_REGEX) - if (regex.nonEmpty && VersionUtils.isHadoop3) { + // The feature is only supported in Hadoop 2.9+ and 3.x, hence the check below. + val isSupported = VersionUtils.majorMinorVersion(VersionInfo.getVersion) match { + case (2, n) if n >= 9 => true + case (3, _) => true + case _ => false + } + if (regex.nonEmpty && isSupported) { logInfo(s"Processing token conf (spark.yarn.am.tokenConfRegex) with regex $regex") val dob = new DataOutputBuffer(); val copy = new Configuration(false); From c69cb6e9720e8e1f443aa2c52492c5639928435a Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Mon, 29 Nov 2021 22:33:41 -0800 Subject: [PATCH 6/6] improve documentation --- .../org/apache/spark/deploy/yarn/config.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index a49a52b7857d3..838c42d8e55e6 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -80,14 +80,14 @@ package object config extends Logging { private[spark] val AM_TOKEN_CONF_REGEX = ConfigBuilder("spark.yarn.am.tokenConfRegex") - .doc("This config is only supported for Hadoop 3.x profile. The value of this config is a " + - "regex expression used to grep a list of config entries from the job's configuration " + - "file (e.g., hdfs-site.xml) and send to RM, which uses them when renewing delegation " + - "tokens. A typical use case of this feature is to support delegation tokens in an " + - "environment where a YARN cluster needs to talk to multiple downstream HDFS clusters, " + - "where the YARN RM may not have configs (e.g., dfs.nameservices, dfs.ha.namenodes.*, " + - "dfs.namenode.rpc-address.*) to connect to these clusters. 
In this scenario, Spark " + - "users can specify the config value to be " + + .doc("This config is only supported when Hadoop version is 2.9+ or 3.x (e.g., when using " + + "the Hadoop 3.x profile). The value of this config is a regex expression used to grep a " + + "list of config entries from the job's configuration file (e.g., hdfs-site.xml) and send " + + "to RM, which uses them when renewing delegation tokens. A typical use case of this " + + "feature is to support delegation tokens in an environment where a YARN cluster needs to " + + "talk to multiple downstream HDFS clusters, where the YARN RM may not have configs " + + "(e.g., dfs.nameservices, dfs.ha.namenodes.*, dfs.namenode.rpc-address.*) to connect to " + + "these clusters. In this scenario, Spark users can specify the config value to be " + "'^dfs.nameservices$|^dfs.namenode.rpc-address.*$|^dfs.ha.namenodes.*$' to parse " + "these HDFS configs from the job's local configuration files. This config is very " + "similar to 'mapreduce.job.send-token-conf'. Please check YARN-5910 for more details.")