From 7fa03ec74cc6d586f9db927b558b9cc9e95d025d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 7 Jan 2019 16:29:31 -0800 Subject: [PATCH 1/5] [SPARK-26595][core] Allow credential renewal based on kerberos ticket cache. This change addes a new mode for credential renewal that does not require a keytab; it uses the local ticket cache instead, so it works while the user keeps the cache valid. This can be useful for, e.g., people running long spark-shell sessions where their kerberos login is kept up-to-date. The main change to enable this behavior is in HadoopDelegationTokenManager, with a small change in the HDFS token provider. The other changes are to avoid creating duplicate tokens when submitting the application to YARN; they allow the tokens from the scheduler to be sent to the YARN AM, reducing the round trips to HDFS. For that, the scheduler initialization code was changed a little bit so that the tokens are available when the YARN client is initialized. That basically takes care of a long-standing TODO that was in the code to clean up configuration propagation to the driver's RPC endpoint (in CoarseGrainedSchedulerBackend). Tested with an app designed to stress this functionality, with both keytab and cache-based logins. Some basic kerberos tests on k8s also. --- .../HadoopDelegationTokenManager.scala | 57 ++++++++++++++----- .../HadoopFSDelegationTokenProvider.scala | 10 +++- .../spark/internal/config/package.scala | 10 ++++ .../CoarseGrainedSchedulerBackend.scala | 34 +++++------ docs/security.md | 32 ++++++++--- .../KubernetesClusterSchedulerBackend.scala | 7 +-- .../org/apache/spark/deploy/yarn/Client.scala | 25 ++++---- .../cluster/YarnClientSchedulerBackend.scala | 8 +-- .../cluster/YarnClusterSchedulerBackend.scala | 2 + .../cluster/YarnSchedulerBackend.scala | 10 ++-- 10 files changed, 121 insertions(+), 74 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index d97857a39fc21..ab33c3791779a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -38,7 +38,7 @@ import org.apache.spark.util.ThreadUtils /** * Manager for delegation tokens in a Spark application. * - * When configured with a principal and a keytab, this manager will make sure long-running apps can + * When delegation token renewal is enabled, this manager will make sure long-running apps can * run without interruption while accessing secured services. It periodically logs in to the KDC * with user-provided credentials, and contacts all the configured secure services to obtain * delegation tokens to be distributed to the rest of the application. @@ -47,6 +47,11 @@ import org.apache.spark.util.ThreadUtils * elapsed. The new tokens are sent to the Spark driver endpoint. The driver is tasked with * distributing the tokens to other processes that might need them. * + * Renewal can be enabled in two different ways: by providing a principal and keytab to Spark, or by + * enabling renewal based on the local credential cache. The latter has the drawback that Spark + * can't create new TGTs by itself, so the user has to manually update the Kerberos ticket cache + * externally. + * * This class can also be used just to create delegation tokens, by calling the * `obtainDelegationTokens` method. This option does not require calling the `start` method nor * providing a driver reference, but leaves it up to the caller to distribute the tokens that were @@ -78,7 +83,11 @@ private[spark] class HadoopDelegationTokenManager( private var renewalExecutor: ScheduledExecutorService = _ /** @return Whether delegation token renewal is enabled. */ - def renewalEnabled: Boolean = principal != null + def renewalEnabled: Boolean = sparkConf.get(KERBEROS_RENEWAL_CREDENTIALS) match { + case "keytab" => principal != null + case "ccache" => UserGroupInformation.getCurrentUser().hasKerberosCredentials() + case _ => false + } /** * Start the token renewer. Requires a principal and keytab. Upon start, the renewer will @@ -97,28 +106,37 @@ private[spark] class HadoopDelegationTokenManager( ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") val ugi = UserGroupInformation.getCurrentUser() - if (ugi.isFromKeytab()) { + val tgtRenewalTask = if (ugi.isFromKeytab()) { // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop 3.x, // it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added in // HADOOP-9567). This task will make sure that the user stays logged in regardless of that // configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if the TGT does // not need to be renewed yet. - val tgtRenewalTask = new Runnable() { + new Runnable() { override def run(): Unit = { ugi.checkTGTAndReloginFromKeytab() } } - val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD) - renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod, - TimeUnit.SECONDS) + } else { + // This seems to be automatically handled by the Hadoop library code, but is here as a + // "just in case" check. As with the above, it's a no-op if the TGT is still valid. + new Runnable() { + override def run(): Unit = { + ugi.reloginFromTicketCache() + } + } } + val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD) + renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod, + TimeUnit.SECONDS) + updateTokensTask() } def stop(): Unit = { if (renewalExecutor != null) { - renewalExecutor.shutdown() + renewalExecutor.shutdownNow() } } @@ -179,7 +197,7 @@ private[spark] class HadoopDelegationTokenManager( private def scheduleRenewal(delay: Long): Unit = { val _delay = math.max(0, delay) - logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.") + logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.") val renewalTask = new Runnable() { override def run(): Unit = { @@ -203,6 +221,9 @@ private[spark] class HadoopDelegationTokenManager( schedulerRef.send(UpdateDelegationTokens(tokens)) tokens } catch { + case _: InterruptedException => + // Ignore, may happen if shutting down. + null case e: Exception => val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT)) logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" + @@ -236,11 +257,19 @@ private[spark] class HadoopDelegationTokenManager( } private def doLogin(): UserGroupInformation = { - logInfo(s"Attempting to login to KDC using principal: $principal") - require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.") - val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) - logInfo("Successfully logged into KDC.") - ugi + if (principal != null) { + logInfo(s"Attempting to login to KDC using principal: $principal") + require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.") + val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) + logInfo("Successfully logged into KDC.") + ugi + } else { + logInfo(s"Attempting to load user's ticket cache.") + val ccache = sparkConf.getenv("KRB5CCNAME") + val user = Option(sparkConf.getenv("KRB5PRINCIPAL")).getOrElse( + UserGroupInformation.getCurrentUser().getUserName()) + UserGroupInformation.getUGIFromTicketCache(ccache, user) + } } private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = { diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index 00200f807d224..03a2bc0cc4ab2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -100,7 +100,7 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[Fil creds: Credentials): Credentials = { filesystems.foreach { fs => - logInfo("getting token for: " + fs) + logInfo(s"getting token for: $fs with renewer $renewer") fs.addDelegationTokens(renewer, creds) } @@ -114,7 +114,13 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[Fil // We cannot use the tokens generated with renewer yarn. Trying to renew // those will fail with an access control issue. So create new tokens with the logged in // user as renewer. - sparkConf.get(PRINCIPAL).flatMap { renewer => + val user = sparkConf.get(KERBEROS_RENEWAL_CREDENTIALS) match { + case "keytab" => sparkConf.get(PRINCIPAL) + case "ccache" => Some(UserGroupInformation.getCurrentUser().getUserName()) + case _ => None + } + + user.flatMap { renewer => val creds = new Credentials() fetchDelegationTokens(renewer, filesystems, creds) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index c942c2769a42d..f6ce8b2047ddb 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -228,6 +228,16 @@ package object config { .timeConf(TimeUnit.SECONDS) .createWithDefaultString("1m") + private[spark] val KERBEROS_RENEWAL_CREDENTIALS = + ConfigBuilder("spark.kerberos.renewal.credentials") + .doc( + "Which credentials to use when renewing delegation tokens for executors. Can be either " + + "'keytab', the default, which requires a keytab to be provided, or 'ccache', which uses " + + "the local credentials cache.") + .stringConf + .checkValues(Set("keytab", "ccache")) + .createWithDefault("keytab") + private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances") .intConf .createOptional diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 98ed2fffc0ac5..b199901ac9c3a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -108,14 +108,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private val reviveThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread") - class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) - extends ThreadSafeRpcEndpoint with Logging { + class DriverEndpoint extends ThreadSafeRpcEndpoint with Logging { + + override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv // Executors that have been lost, but for which we don't yet know the real exit reason. protected val executorsPendingLossReason = new HashSet[String] protected val addressToExecutorId = new HashMap[RpcAddress, String] + // Spark configuration sent to executors. This is a lazy val so that subclasses of the + // scheduler can modify the SparkConf object before this view is created. + private lazy val sparkProperties = scheduler.sc.conf.getAll + .filter { case (k, _) => k.startsWith("spark.") } + .toSeq + override def onStart() { // Periodically revive offers to allow delay scheduling to work val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s") @@ -384,21 +391,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } - var driverEndpoint: RpcEndpointRef = null + protected val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint()) protected def minRegisteredRatio: Double = _minRegisteredRatio override def start() { - val properties = new ArrayBuffer[(String, String)] - for ((key, value) <- scheduler.sc.conf.getAll) { - if (key.startsWith("spark.")) { - properties += ((key, value)) - } - } - - // TODO (prashant) send conf instead of properties - driverEndpoint = createDriverEndpointRef(properties) - if (UserGroupInformation.isSecurityEnabled()) { delegationTokenManager = createTokenManager(driverEndpoint) delegationTokenManager.foreach { dtm => @@ -410,19 +407,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp SparkHadoopUtil.get.serialize(creds) } if (tokens != null) { - delegationTokens.set(tokens) + updateDelegationTokens(tokens) } } } } - protected def createDriverEndpointRef( - properties: ArrayBuffer[(String, String)]): RpcEndpointRef = { - rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties)) - } - - protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { - new DriverEndpoint(rpcEnv, properties) + protected def createDriverEndpoint(): DriverEndpoint = { + new DriverEndpoint() } def stopExecutors() { diff --git a/docs/security.md b/docs/security.md index 8416ed91356aa..a1dc584705a4b 100644 --- a/docs/security.md +++ b/docs/security.md @@ -776,16 +776,32 @@ The following options provides finer-grained control for this feature: Long-running applications may run into issues if their run time exceeds the maximum delegation token lifetime configured in services it needs to access. -Spark supports automatically creating new tokens for these applications when running in YARN mode. -Kerberos credentials need to be provided to the Spark application via the `spark-submit` command, -using the `--principal` and `--keytab` parameters. +This feature is not available everywhere. In particular, it's only implemented +on YARN and Kubernetes (both client and cluster modes), and on Mesos when using client mode. -The provided keytab will be copied over to the machine running the Application Master via the Hadoop -Distributed Cache. For this reason, it's strongly recommended that both YARN and HDFS be secured -with encryption, at least. +Spark supports automatically creating new tokens for these applications. There are two ways to +enable this functionality. -The Kerberos login will be periodically renewed using the provided credentials, and new delegation -tokens for supported will be created. +### Using a Keytab + +By providing Spark with a principal and keytab (e.g. using `spark-submit` with `--principal` +and `--keytab` parameters), the application will maintain a valid Kerberos login that can be +used to retrieve delegation tokens indefinitely. + +Note that when using a keytab in cluster mode, it will be copied over to the machine running the +Spark driver. In the case of YARN, this means using HDFS as a staging area for the keytab, so it's +strongly recommended that both YARN and HDFS be secured with encryption, at least. + +### Using a ticket cache + +By setting `spark.kerberos.renewal.credentials` to `ccache` in Spark's configuration, the local +Kerberos ticket cache will be used for authentication. Spark will keep the ticket renewed during its +renewable life, but after it expires a new ticket needs to be acquired (e.g. by running `kinit`). + +It's up to the user to maintain an updated ticket cache that Spark can use. + +The location of the ticket cache can be customized by setting the `KRB5CCNAME` environment +variable. ## Secure Interaction with Kubernetes diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index e285e202a1488..bae29db0f712f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -143,8 +143,8 @@ private[spark] class KubernetesClusterSchedulerBackend( // Don't do anything else - let event handling from the Kubernetes API do the Spark changes } - override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { - new KubernetesDriverEndpoint(sc.env.rpcEnv, properties) + override def createDriverEndpoint(): DriverEndpoint = { + new KubernetesDriverEndpoint() } override protected def createTokenManager( @@ -152,8 +152,7 @@ private[spark] class KubernetesClusterSchedulerBackend( Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, schedulerRef)) } - private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) - extends DriverEndpoint(rpcEnv, sparkProperties) { + private class KubernetesDriverEndpoint extends DriverEndpoint { override def onDisconnected(rpcAddress: RpcAddress): Unit = { // Don't do anything besides disabling the executor - allow the Kubernetes API events to diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 84921800a4719..2da42f76ab058 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -296,26 +296,21 @@ private[spark] class Client( /** * Set up security tokens for launching our ApplicationMaster container. * - * This method will obtain delegation tokens from all the registered providers, and set them in - * the AM's launch context. + * In client mode, a set of credentials has been obtained by the scheduler, so they are copied + * and sent to the AM. In cluster mode, new credentials are obtained and then sent to the AM, + * along with whatever credentials the current user already has. */ private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = { - val credentials = UserGroupInformation.getCurrentUser().getCredentials() - val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null) - credentialManager.obtainDelegationTokens(credentials) - - // When using a proxy user, copy the delegation tokens to the user's credentials. Avoid - // that for regular users, since in those case the user already has access to the TGT, - // and adding delegation tokens could lead to expired or cancelled tokens being used - // later, as reported in SPARK-15754. val currentUser = UserGroupInformation.getCurrentUser() - if (SparkHadoopUtil.get.isProxyUser(currentUser)) { - currentUser.addCredentials(credentials) + val credentials = currentUser.getCredentials() + + if (isClusterMode) { + val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null) + credentialManager.obtainDelegationTokens(credentials) } - val dob = new DataOutputBuffer - credentials.writeTokenStorageToStream(dob) - amContainer.setTokens(ByteBuffer.wrap(dob.getData)) + val serializedCreds = SparkHadoopUtil.get.serialize(credentials) + amContainer.setTokens(ByteBuffer.wrap(serializedCreds)) } /** Get the application report from the ResourceManager for an application we have submitted. */ diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 934fba3e6ff35..befddbd319c8b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -43,6 +43,8 @@ private[spark] class YarnClientSchedulerBackend( * This waits until the application is running. */ override def start() { + super.start() + val driverHost = conf.get(config.DRIVER_HOST_ADDRESS) val driverPort = conf.get(config.DRIVER_PORT) val hostport = driverHost + ":" + driverPort @@ -57,14 +59,12 @@ private[spark] class YarnClientSchedulerBackend( client = new Client(args, conf) bindToYarn(client.submitApplication(), None) - // SPARK-8687: Ensure all necessary properties have already been set before - // we initialize our driver scheduler backend, which serves these properties - // to the executors - super.start() waitForApplication() monitorThread = asyncMonitorApplication() monitorThread.start() + + startBindings() } /** diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index 62bf9818ee248..e6680e1829588 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.SparkContext +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.Utils @@ -35,6 +36,7 @@ private[spark] class YarnClusterSchedulerBackend( bindToYarn(attemptId.getApplicationId(), Some(attemptId)) super.start() totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(sc.conf) + startBindings() } override def getDriverLogUrls: Option[Map[String, String]] = { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index a7bed75a02ad5..94d944dd69d57 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -89,11 +89,10 @@ private[spark] abstract class YarnSchedulerBackend( this.attemptId = attemptId } - override def start() { + protected def startBindings(): Unit = { require(appId.isDefined, "application ID unset") val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId) services.start(binding) - super.start() } override def stop(): Unit = { @@ -208,8 +207,8 @@ private[spark] abstract class YarnSchedulerBackend( } } - override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { - new YarnDriverEndpoint(rpcEnv, properties) + override def createDriverEndpoint(): DriverEndpoint = { + new YarnDriverEndpoint() } /** @@ -232,8 +231,7 @@ private[spark] abstract class YarnSchedulerBackend( * This endpoint communicates with the executors and queries the AM for an executor's exit * status when the executor is disconnected. */ - private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) - extends DriverEndpoint(rpcEnv, sparkProperties) { + private class YarnDriverEndpoint extends DriverEndpoint { /** * When onDisconnected is received at the driver endpoint, the superclass DriverEndpoint From f7e6734cb0a06df4e3f3262ea3b434ffc6c4bcc0 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 11 Jan 2019 14:33:39 -0800 Subject: [PATCH 2/5] Small cleanup. --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 8 ++------ .../cluster/k8s/KubernetesClusterSchedulerBackend.scala | 5 ++--- .../mesos/MesosCoarseGrainedSchedulerBackend.scala | 5 ++--- .../spark/scheduler/cluster/YarnSchedulerBackend.scala | 5 ++--- 4 files changed, 8 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index b199901ac9c3a..688493594f41c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -397,7 +397,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override def start() { if (UserGroupInformation.isSecurityEnabled()) { - delegationTokenManager = createTokenManager(driverEndpoint) + delegationTokenManager = createTokenManager() delegationTokenManager.foreach { dtm => val tokens = if (dtm.renewalEnabled) { dtm.start() @@ -705,12 +705,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Create the delegation token manager to be used for the application. This method is called * once during the start of the scheduler backend (so after the object has already been * fully constructed), only if security is enabled in the Hadoop configuration. - * - * @param schedulerRef RPC endpoint for the scheduler, where updated delegation tokens should be - * sent. */ - protected def createTokenManager( - schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = None + protected def createTokenManager(): Option[HadoopDelegationTokenManager] = None /** * Called when a new set of delegation tokens is sent to the driver. Child classes can override diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index bae29db0f712f..6d63ae97f5784 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -147,9 +147,8 @@ private[spark] class KubernetesClusterSchedulerBackend( new KubernetesDriverEndpoint() } - override protected def createTokenManager( - schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = { - Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, schedulerRef)) + override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = { + Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)) } private class KubernetesDriverEndpoint extends DriverEndpoint { diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index fb235350700f9..79027c348a4d1 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -773,9 +773,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } } - override protected def createTokenManager( - schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = { - Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, schedulerRef)) + override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = { + Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)) } private def numExecutors(): Int = { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 94d944dd69d57..374fcf2c1ede8 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -221,9 +221,8 @@ private[spark] abstract class YarnSchedulerBackend( sc.executorAllocationManager.foreach(_.reset()) } - override protected def createTokenManager( - schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = { - Some(new YARNHadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, schedulerRef)) + override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = { + Some(new YARNHadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, driverEndpoint)) } /** From df85d6863b202071a6ea5664b7a9fd6b476408a2 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 11 Jan 2019 14:41:53 -0800 Subject: [PATCH 3/5] Test compilation fix. --- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 688493594f41c..c1aa0a099117b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -391,7 +391,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } - protected val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint()) + val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint()) protected def minRegisteredRatio: Double = _minRegisteredRatio From 331bb6b071e157b661cd5ca56904c950575bbc51 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 22 Jan 2019 11:29:14 -0800 Subject: [PATCH 4/5] Remove tgt renewal task (let Hadoop libs take care of it). --- .../HadoopDelegationTokenManager.scala | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index ab33c3791779a..7d7243c8b29b1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -106,31 +106,22 @@ private[spark] class HadoopDelegationTokenManager( ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") val ugi = UserGroupInformation.getCurrentUser() - val tgtRenewalTask = if (ugi.isFromKeytab()) { + if (ugi.isFromKeytab()) { // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop 3.x, // it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added in // HADOOP-9567). This task will make sure that the user stays logged in regardless of that // configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if the TGT does // not need to be renewed yet. - new Runnable() { + val tgtRenewalTask = new Runnable() { override def run(): Unit = { ugi.checkTGTAndReloginFromKeytab() } } - } else { - // This seems to be automatically handled by the Hadoop library code, but is here as a - // "just in case" check. As with the above, it's a no-op if the TGT is still valid. - new Runnable() { - override def run(): Unit = { - ugi.reloginFromTicketCache() - } - } + val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD) + renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod, + TimeUnit.SECONDS) } - val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD) - renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod, - TimeUnit.SECONDS) - updateTokensTask() } From 57aed4706768f87fb6bbcf1823d35e66f517d485 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 23 Jan 2019 11:22:39 -0800 Subject: [PATCH 5/5] Feedback. --- .../HadoopFSDelegationTokenProvider.scala | 38 ++++++++----------- .../CoarseGrainedSchedulerBackend.scala | 4 +- 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index 03a2bc0cc4ab2..f303adf950407 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -114,28 +114,22 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[Fil // We cannot use the tokens generated with renewer yarn. Trying to renew // those will fail with an access control issue. So create new tokens with the logged in // user as renewer. - val user = sparkConf.get(KERBEROS_RENEWAL_CREDENTIALS) match { - case "keytab" => sparkConf.get(PRINCIPAL) - case "ccache" => Some(UserGroupInformation.getCurrentUser().getUserName()) - case _ => None - } - - user.flatMap { renewer => - val creds = new Credentials() - fetchDelegationTokens(renewer, filesystems, creds) - - val renewIntervals = creds.getAllTokens.asScala.filter { - _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier] - }.flatMap { token => - Try { - val newExpiration = token.renew(hadoopConf) - val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier] - val interval = newExpiration - identifier.getIssueDate - logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}") - interval - }.toOption - } - if (renewIntervals.isEmpty) None else Some(renewIntervals.min) + val renewer = UserGroupInformation.getCurrentUser().getUserName() + + val creds = new Credentials() + fetchDelegationTokens(renewer, filesystems, creds) + + val renewIntervals = creds.getAllTokens.asScala.filter { + _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier] + }.flatMap { token => + Try { + val newExpiration = token.renew(hadoopConf) + val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier] + val interval = newExpiration - identifier.getIssueDate + logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}") + interval + }.toOption } + if (renewIntervals.isEmpty) None else Some(renewIntervals.min) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f944eaa6abf9d..6bc0bdd97f7d9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -415,9 +415,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } - protected def createDriverEndpoint(): DriverEndpoint = { - new DriverEndpoint() - } + protected def createDriverEndpoint(): DriverEndpoint = new DriverEndpoint() def stopExecutors() { try {