diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index d97857a39fc21..7d7243c8b29b1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -38,7 +38,7 @@ import org.apache.spark.util.ThreadUtils
 /**
  * Manager for delegation tokens in a Spark application.
  *
- * When configured with a principal and a keytab, this manager will make sure long-running apps can
+ * When delegation token renewal is enabled, this manager will make sure long-running apps can
  * run without interruption while accessing secured services. It periodically logs in to the KDC
  * with user-provided credentials, and contacts all the configured secure services to obtain
  * delegation tokens to be distributed to the rest of the application.
@@ -47,6 +47,11 @@ import org.apache.spark.util.ThreadUtils
  * elapsed. The new tokens are sent to the Spark driver endpoint. The driver is tasked with
  * distributing the tokens to other processes that might need them.
  *
+ * Renewal can be enabled in two different ways: by providing a principal and keytab to Spark, or by
+ * enabling renewal based on the local credential cache. The latter has the drawback that Spark
+ * can't create new TGTs by itself, so the user has to manually update the Kerberos ticket cache
+ * externally.
+ *
 * This class can also be used just to create delegation tokens, by calling the
 * `obtainDelegationTokens` method. This option does not require calling the `start` method nor
 * providing a driver reference, but leaves it up to the caller to distribute the tokens that were
@@ -78,7 +83,11 @@ private[spark] class HadoopDelegationTokenManager(
   private var renewalExecutor: ScheduledExecutorService = _

   /** @return Whether delegation token renewal is enabled. */
-  def renewalEnabled: Boolean = principal != null
+  def renewalEnabled: Boolean = sparkConf.get(KERBEROS_RENEWAL_CREDENTIALS) match {
+    case "keytab" => principal != null
+    case "ccache" => UserGroupInformation.getCurrentUser().hasKerberosCredentials()
+    case _ => false
+  }

   /**
    * Start the token renewer. Requires a principal and keytab. Upon start, the renewer will
@@ -118,7 +127,7 @@ private[spark] class HadoopDelegationTokenManager(

   def stop(): Unit = {
     if (renewalExecutor != null) {
-      renewalExecutor.shutdown()
+      renewalExecutor.shutdownNow()
     }
   }

@@ -179,7 +188,7 @@ private[spark] class HadoopDelegationTokenManager(

   private def scheduleRenewal(delay: Long): Unit = {
     val _delay = math.max(0, delay)
-    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
+    logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.")

     val renewalTask = new Runnable() {
       override def run(): Unit = {
@@ -203,6 +212,9 @@ private[spark] class HadoopDelegationTokenManager(
       schedulerRef.send(UpdateDelegationTokens(tokens))
       tokens
     } catch {
+      case _: InterruptedException =>
+        // Ignore, may happen if shutting down.
+        null
       case e: Exception =>
         val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
         logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
@@ -236,11 +248,19 @@ private[spark] class HadoopDelegationTokenManager(
   }

   private def doLogin(): UserGroupInformation = {
-    logInfo(s"Attempting to login to KDC using principal: $principal")
-    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
-    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
-    logInfo("Successfully logged into KDC.")
-    ugi
+    if (principal != null) {
+      logInfo(s"Attempting to login to KDC using principal: $principal")
+      require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
+      val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+      logInfo("Successfully logged into KDC.")
+      ugi
+    } else {
+      logInfo(s"Attempting to load user's ticket cache.")
+      val ccache = sparkConf.getenv("KRB5CCNAME")
+      val user = Option(sparkConf.getenv("KRB5PRINCIPAL")).getOrElse(
+        UserGroupInformation.getCurrentUser().getUserName())
+      UserGroupInformation.getUGIFromTicketCache(ccache, user)
+    }
   }

   private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 00200f807d224..f303adf950407 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -100,7 +100,7 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[Fil
       creds: Credentials): Credentials = {

     filesystems.foreach { fs =>
-      logInfo("getting token for: " + fs)
+      logInfo(s"getting token for: $fs with renewer $renewer")
       fs.addDelegationTokens(renewer, creds)
     }

@@ -114,22 +114,22 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[Fil
       // We cannot use the tokens generated with renewer yarn. Trying to renew
       // those will fail with an access control issue. So create new tokens with the logged in
       // user as renewer.
-      sparkConf.get(PRINCIPAL).flatMap { renewer =>
-        val creds = new Credentials()
-        fetchDelegationTokens(renewer, filesystems, creds)
-
-        val renewIntervals = creds.getAllTokens.asScala.filter {
-          _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
-        }.flatMap { token =>
-          Try {
-            val newExpiration = token.renew(hadoopConf)
-            val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
-            val interval = newExpiration - identifier.getIssueDate
-            logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
-            interval
-          }.toOption
-        }
-        if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
+      val renewer = UserGroupInformation.getCurrentUser().getUserName()
+
+      val creds = new Credentials()
+      fetchDelegationTokens(renewer, filesystems, creds)
+
+      val renewIntervals = creds.getAllTokens.asScala.filter {
+        _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
+      }.flatMap { token =>
+        Try {
+          val newExpiration = token.renew(hadoopConf)
+          val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+          val interval = newExpiration - identifier.getIssueDate
+          logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
+          interval
+        }.toOption
       }
+      if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index b5912847968f6..0cb83ad76a3a5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -333,6 +333,16 @@ package object config {
     .timeConf(TimeUnit.SECONDS)
     .createWithDefaultString("1m")

+  private[spark] val KERBEROS_RENEWAL_CREDENTIALS =
+    ConfigBuilder("spark.kerberos.renewal.credentials")
+      .doc(
+        "Which credentials to use when renewing delegation tokens for executors. Can be either " +
+        "'keytab', the default, which requires a keytab to be provided, or 'ccache', which uses " +
+        "the local credentials cache.")
+      .stringConf
+      .checkValues(Set("keytab", "ccache"))
+      .createWithDefault("keytab")
+
   private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
     .intConf
     .createOptional
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 3b058754e7f54..6bc0bdd97f7d9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -110,14 +110,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   private val reviveThread =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

-  class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
-    extends ThreadSafeRpcEndpoint with Logging {
+  class DriverEndpoint extends ThreadSafeRpcEndpoint with Logging {
+
+    override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv

     // Executors that have been lost, but for which we don't yet know the real exit reason.
     protected val executorsPendingLossReason = new HashSet[String]

     protected val addressToExecutorId = new HashMap[RpcAddress, String]

+    // Spark configuration sent to executors. This is a lazy val so that subclasses of the
+    // scheduler can modify the SparkConf object before this view is created.
+    private lazy val sparkProperties = scheduler.sc.conf.getAll
+      .filter { case (k, _) => k.startsWith("spark.") }
+      .toSeq
+
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
       val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)
@@ -386,23 +393,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
   }

-  var driverEndpoint: RpcEndpointRef = null
+  val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint())

   protected def minRegisteredRatio: Double = _minRegisteredRatio

   override def start() {
-    val properties = new ArrayBuffer[(String, String)]
-    for ((key, value) <- scheduler.sc.conf.getAll) {
-      if (key.startsWith("spark.")) {
-        properties += ((key, value))
-      }
-    }
-
-    // TODO (prashant) send conf instead of properties
-    driverEndpoint = createDriverEndpointRef(properties)
-
     if (UserGroupInformation.isSecurityEnabled()) {
-      delegationTokenManager = createTokenManager(driverEndpoint)
+      delegationTokenManager = createTokenManager()
       delegationTokenManager.foreach { dtm =>
         val tokens = if (dtm.renewalEnabled) {
           dtm.start()
@@ -412,20 +409,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           SparkHadoopUtil.get.serialize(creds)
         }
         if (tokens != null) {
-          delegationTokens.set(tokens)
+          updateDelegationTokens(tokens)
         }
       }
     }
   }

-  protected def createDriverEndpointRef(
-      properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
-    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
-  }
-
-  protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
-    new DriverEndpoint(rpcEnv, properties)
-  }
+  protected def createDriverEndpoint(): DriverEndpoint = new DriverEndpoint()

   def stopExecutors() {
     try {
@@ -715,12 +705,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * Create the delegation token manager to be used for the application. This method is called
    * once during the start of the scheduler backend (so after the object has already been
    * fully constructed), only if security is enabled in the Hadoop configuration.
-   *
-   * @param schedulerRef RPC endpoint for the scheduler, where updated delegation tokens should be
-   *                     sent.
    */
-  protected def createTokenManager(
-      schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = None
+  protected def createTokenManager(): Option[HadoopDelegationTokenManager] = None

   /**
    * Called when a new set of delegation tokens is sent to the driver. Child classes can override
diff --git a/docs/security.md b/docs/security.md
index 8416ed91356aa..a1dc584705a4b 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -776,16 +776,32 @@ The following options provides finer-grained control for this feature:

 Long-running applications may run into issues if their run time exceeds the maximum delegation
 token lifetime configured in services it needs to access.

-Spark supports automatically creating new tokens for these applications when running in YARN mode.
-Kerberos credentials need to be provided to the Spark application via the `spark-submit` command,
-using the `--principal` and `--keytab` parameters.
+This feature is not available everywhere. In particular, it's only implemented
+on YARN and Kubernetes (both client and cluster modes), and on Mesos when using client mode.

-The provided keytab will be copied over to the machine running the Application Master via the Hadoop
-Distributed Cache. For this reason, it's strongly recommended that both YARN and HDFS be secured
-with encryption, at least.
+Spark supports automatically creating new tokens for these applications. There are two ways to
+enable this functionality.

-The Kerberos login will be periodically renewed using the provided credentials, and new delegation
-tokens for supported will be created.
+### Using a Keytab
+
+By providing Spark with a principal and keytab (e.g. using `spark-submit` with `--principal`
+and `--keytab` parameters), the application will maintain a valid Kerberos login that can be
+used to retrieve delegation tokens indefinitely.
+
+Note that when using a keytab in cluster mode, it will be copied over to the machine running the
+Spark driver. In the case of YARN, this means using HDFS as a staging area for the keytab, so it's
+strongly recommended that both YARN and HDFS be secured with encryption, at least.
+
+### Using a ticket cache
+
+By setting `spark.kerberos.renewal.credentials` to `ccache` in Spark's configuration, the local
+Kerberos ticket cache will be used for authentication. Spark will keep the ticket renewed during its
+renewable life, but after it expires a new ticket needs to be acquired (e.g. by running `kinit`).
+
+It's up to the user to maintain an updated ticket cache that Spark can use.
+
+The location of the ticket cache can be customized by setting the `KRB5CCNAME` environment
+variable.

 ## Secure Interaction with Kubernetes
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 0d2737efde573..4a91a2f5f10b3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -144,17 +144,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
     // Don't do anything else - let event handling from the Kubernetes API do the Spark changes
   }

-  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
-    new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
+  override def createDriverEndpoint(): DriverEndpoint = {
+    new KubernetesDriverEndpoint()
   }

-  override protected def createTokenManager(
-      schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
-    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, schedulerRef))
+  override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
+    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint))
   }

-  private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
-    extends DriverEndpoint(rpcEnv, sparkProperties) {
+  private class KubernetesDriverEndpoint extends DriverEndpoint {

     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
       // Don't do anything besides disabling the executor - allow the Kubernetes API events to
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index be2854fc16cf2..7e2a8ba4b07d5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -772,9 +772,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     }
   }

-  override protected def createTokenManager(
-      schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
-    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, schedulerRef))
+  override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
+    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint))
   }

   private def numExecutors(): Int = {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 79922920267e8..e72bcf5502982 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -296,26 +296,21 @@ private[spark] class Client(

   /**
    * Set up security tokens for launching our ApplicationMaster container.
    *
-   * This method will obtain delegation tokens from all the registered providers, and set them in
-   * the AM's launch context.
+   * In client mode, a set of credentials has been obtained by the scheduler, so they are copied
+   * and sent to the AM. In cluster mode, new credentials are obtained and then sent to the AM,
+   * along with whatever credentials the current user already has.
    */
   private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
-    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-    val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null)
-    credentialManager.obtainDelegationTokens(credentials)
-
-    // When using a proxy user, copy the delegation tokens to the user's credentials. Avoid
-    // that for regular users, since in those case the user already has access to the TGT,
-    // and adding delegation tokens could lead to expired or cancelled tokens being used
-    // later, as reported in SPARK-15754.
     val currentUser = UserGroupInformation.getCurrentUser()
-    if (SparkHadoopUtil.get.isProxyUser(currentUser)) {
-      currentUser.addCredentials(credentials)
+    val credentials = currentUser.getCredentials()
+
+    if (isClusterMode) {
+      val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null)
+      credentialManager.obtainDelegationTokens(credentials)
     }
-    val dob = new DataOutputBuffer
-    credentials.writeTokenStorageToStream(dob)
-    amContainer.setTokens(ByteBuffer.wrap(dob.getData))
+    val serializedCreds = SparkHadoopUtil.get.serialize(credentials)
+    amContainer.setTokens(ByteBuffer.wrap(serializedCreds))
   }

   /** Get the application report from the ResourceManager for an application we have submitted. */
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 934fba3e6ff35..befddbd319c8b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -43,6 +43,8 @@ private[spark] class YarnClientSchedulerBackend(
    * This waits until the application is running.
    */
   override def start() {
+    super.start()
+
     val driverHost = conf.get(config.DRIVER_HOST_ADDRESS)
     val driverPort = conf.get(config.DRIVER_PORT)
     val hostport = driverHost + ":" + driverPort
@@ -57,14 +59,12 @@ private[spark] class YarnClientSchedulerBackend(

     client = new Client(args, conf)
     bindToYarn(client.submitApplication(), None)

-    // SPARK-8687: Ensure all necessary properties have already been set before
-    // we initialize our driver scheduler backend, which serves these properties
-    // to the executors
-    super.start()
     waitForApplication()
     monitorThread = asyncMonitorApplication()
     monitorThread.start()
+
+    startBindings()
   }

   /**
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 62bf9818ee248..e6680e1829588 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.conf.YarnConfiguration

 import org.apache.spark.SparkContext
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
@@ -35,6 +36,7 @@ private[spark] class YarnClusterSchedulerBackend(
     bindToYarn(attemptId.getApplicationId(), Some(attemptId))
     super.start()
     totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(sc.conf)
+    startBindings()
   }

   override def getDriverLogUrls: Option[Map[String, String]] = {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 3dae11e3b4243..821fbcd956d55 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -90,11 +90,10 @@ private[spark] abstract class YarnSchedulerBackend(
     this.attemptId = attemptId
   }

-  override def start() {
+  protected def startBindings(): Unit = {
     require(appId.isDefined, "application ID unset")
     val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId)
     services.start(binding)
-    super.start()
   }

   override def stop(): Unit = {
@@ -209,8 +208,8 @@ private[spark] abstract class YarnSchedulerBackend(
     }
   }

-  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
-    new YarnDriverEndpoint(rpcEnv, properties)
+  override def createDriverEndpoint(): DriverEndpoint = {
+    new YarnDriverEndpoint()
   }

   /**
@@ -223,9 +222,8 @@ private[spark] abstract class YarnSchedulerBackend(
     sc.executorAllocationManager.foreach(_.reset())
   }

-  override protected def createTokenManager(
-      schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
-    Some(new YARNHadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, schedulerRef))
+  override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
+    Some(new YARNHadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, driverEndpoint))
   }

   /**
@@ -233,8 +231,7 @@ private[spark] abstract class YarnSchedulerBackend(
    * This endpoint communicates with the executors and queries the AM for an executor's exit
    * status when the executor is disconnected.
    */
-  private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
-    extends DriverEndpoint(rpcEnv, sparkProperties) {
+  private class YarnDriverEndpoint extends DriverEndpoint {

     /**
      * When onDisconnected is received at the driver endpoint, the superclass DriverEndpoint
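
A minimal usage sketch, not part of the patch, showing how an application could opt into the
ccache-based renewal path added above. The config key and its accepted values ("keytab", "ccache")
come from the KERBEROS_RENEWAL_CREDENTIALS entry in this change; the SparkConf/SparkContext calls
are ordinary Spark APIs, and the app name is only illustrative.

    import org.apache.spark.{SparkConf, SparkContext}

    // Assumes a valid Kerberos ticket cache already exists (e.g. obtained with `kinit`);
    // per the docs added in this patch, Spark renews that ticket but cannot create new TGTs.
    // KRB5CCNAME may point at a non-default cache location, as read by doLogin() above.
    val conf = new SparkConf()
      .setAppName("ccache-renewal-example")
      .set("spark.kerberos.renewal.credentials", "ccache")  // default is "keytab"
    val sc = new SparkContext(conf)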