@@ -38,7 +38,7 @@ import org.apache.spark.util.ThreadUtils
/**
* Manager for delegation tokens in a Spark application.
*
* When configured with a principal and a keytab, this manager will make sure long-running apps can
* When delegation token renewal is enabled, this manager will make sure long-running apps can
* run without interruption while accessing secured services. It periodically logs in to the KDC
* with user-provided credentials, and contacts all the configured secure services to obtain
* delegation tokens to be distributed to the rest of the application.
@@ -47,6 +47,11 @@ import org.apache.spark.util.ThreadUtils
* elapsed. The new tokens are sent to the Spark driver endpoint. The driver is tasked with
* distributing the tokens to other processes that might need them.
*
* Renewal can be enabled in two different ways: by providing a principal and keytab to Spark, or by
* enabling renewal based on the local credential cache. The latter has the drawback that Spark
* can't create new TGTs by itself, so the user has to manually update the Kerberos ticket cache
* externally.
*
* This class can also be used just to create delegation tokens, by calling the
* `obtainDelegationTokens` method. This option does not require calling the `start` method nor
* providing a driver reference, but leaves it up to the caller to distribute the tokens that were
@@ -78,7 +83,11 @@ private[spark] class HadoopDelegationTokenManager(
private var renewalExecutor: ScheduledExecutorService = _

/** @return Whether delegation token renewal is enabled. */
def renewalEnabled: Boolean = principal != null
def renewalEnabled: Boolean = sparkConf.get(KERBEROS_RENEWAL_CREDENTIALS) match {
case "keytab" => principal != null
case "ccache" => UserGroupInformation.getCurrentUser().hasKerberosCredentials()
case _ => false
}

/**
* Start the token renewer. Requires a principal and keytab. Upon start, the renewer will
@@ -118,7 +127,7 @@ private[spark] class HadoopDelegationTokenManager(

def stop(): Unit = {
if (renewalExecutor != null) {
renewalExecutor.shutdown()
renewalExecutor.shutdownNow()
> Contributor: Why is force needed?
> Author: shutdown() doesn't stop the executor; any already-scheduled tasks will still be run. So the executor might stay up for about a day if there's a renewal task scheduled.
> Contributor: OK, thanks for the info.

}
}
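
For reference, a minimal standalone sketch (not part of the patch) of the behavior the author describes above: by default a scheduled executor's `shutdown()` still lets already-queued delayed tasks run at their due time, while `shutdownNow()` cancels them immediately.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object ShutdownSketch {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()

    // Stand-in for the token renewal task, due roughly a day from now.
    executor.schedule(new Runnable {
      override def run(): Unit = println("renewing tokens")
    }, 1, TimeUnit.DAYS)

    // executor.shutdown() would return immediately but keep the pool alive
    // until the delayed task fires (about a day later).
    val pending = executor.shutdownNow() // cancels the pending task right away
    println(s"cancelled ${pending.size()} pending task(s)")
  }
}
```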

@@ -179,7 +188,7 @@ private[spark] class HadoopDelegationTokenManager(

private def scheduleRenewal(delay: Long): Unit = {
val _delay = math.max(0, delay)
logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.")

val renewalTask = new Runnable() {
override def run(): Unit = {
@@ -203,6 +212,9 @@
schedulerRef.send(UpdateDelegationTokens(tokens))
tokens
} catch {
case _: InterruptedException =>
// Ignore, may happen if shutting down.
null
case e: Exception =>
val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
@@ -236,11 +248,19 @@
}

private def doLogin(): UserGroupInformation = {
logInfo(s"Attempting to login to KDC using principal: $principal")
require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
logInfo("Successfully logged into KDC.")
ugi
if (principal != null) {
logInfo(s"Attempting to login to KDC using principal: $principal")
require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
logInfo("Successfully logged into KDC.")
ugi
} else {
logInfo(s"Attempting to load user's ticket cache.")
val ccache = sparkConf.getenv("KRB5CCNAME")
> Contributor: I was wondering if adding an additional optional configuration parameter with the path of the KRB5CC file could also be useful? Possibly more useful when using this in cluster mode?
> Author: I'm not sure how you'd use this in cluster mode; but the biggest issue is that the Hadoop libraries only use the env variable (which is also recognized by all kerberos tools). So we can't really add a Spark-specific option.

val user = Option(sparkConf.getenv("KRB5PRINCIPAL")).getOrElse(
> Contributor (LucaCanali, Jan 15, 2019): Would it make sense to also check/use the value of spark.yarn.principal (or an ad-hoc config parameter if "reusing" this one is not OK) if provided by the user?
> Author: spark.kerberos.principal is tightly coupled to keytabs. But the problem here is twofold: first, you're already logged in, so you can't define the principal at this point. Second, that env variable is the one used by Hadoop libraries.
UserGroupInformation.getCurrentUser().getUserName())
UserGroupInformation.getUGIFromTicketCache(ccache, user)
}
}

private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
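
For context, a self-contained sketch (not from the patch) of the ticket-cache login path used by the new `else` branch in `doLogin()`; it assumes the same `KRB5CCNAME`/`KRB5PRINCIPAL` environment variables discussed in the review comments above.

```scala
import org.apache.hadoop.security.UserGroupInformation

object TicketCacheLoginSketch {
  def main(args: Array[String]): Unit = {
    // May be null, in which case the Kerberos libraries fall back to the
    // default credential cache for the current user.
    val ccache = System.getenv("KRB5CCNAME")
    val user = Option(System.getenv("KRB5PRINCIPAL"))
      .getOrElse(UserGroupInformation.getCurrentUser().getUserName())
    val ugi = UserGroupInformation.getUGIFromTicketCache(ccache, user)
    println(s"Loaded credentials for ${ugi.getUserName()}, " +
      s"hasKerberosCredentials=${ugi.hasKerberosCredentials()}")
  }
}
```
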
@@ -100,7 +100,7 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[Fil
creds: Credentials): Credentials = {

filesystems.foreach { fs =>
logInfo("getting token for: " + fs)
logInfo(s"getting token for: $fs with renewer $renewer")
fs.addDelegationTokens(renewer, creds)
}

@@ -114,22 +114,22 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[Fil
// We cannot use the tokens generated with renewer yarn. Trying to renew
// those will fail with an access control issue. So create new tokens with the logged in
// user as renewer.
sparkConf.get(PRINCIPAL).flatMap { renewer =>
val creds = new Credentials()
fetchDelegationTokens(renewer, filesystems, creds)

val renewIntervals = creds.getAllTokens.asScala.filter {
_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
}.flatMap { token =>
Try {
val newExpiration = token.renew(hadoopConf)
val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
val interval = newExpiration - identifier.getIssueDate
logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
interval
}.toOption
}
if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
val renewer = UserGroupInformation.getCurrentUser().getUserName()

val creds = new Credentials()
fetchDelegationTokens(renewer, filesystems, creds)

val renewIntervals = creds.getAllTokens.asScala.filter {
_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
}.flatMap { token =>
Try {
val newExpiration = token.renew(hadoopConf)
val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
val interval = newExpiration - identifier.getIssueDate
logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
interval
}.toOption
}
if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
}
}
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -333,6 +333,16 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("1m")

private[spark] val KERBEROS_RENEWAL_CREDENTIALS =
ConfigBuilder("spark.kerberos.renewal.credentials")
> Contributor: Wondering why an additional config is needed and not just falling back to the cache if a keytab is not provided and the cluster is secure.
> Author (vanzin, Jan 22, 2019): That would be a change in behavior; e.g. in YARN that would increase the number of delegation tokens an app creates when it's run without a keytab. Second, if your TGT is not renewable (kinit -l foo, instead of kinit -l foo -r bar), you'll get noisy errors in the output. Also, a config opens the path for other ways of providing tokens (e.g. renewing based on a delegation token cache, which some k8s guys were interested in).
> Contributor: Now I see, it's fine. A little bit off topic here, but I've considered applying this to the Kafka area: if no global JVM security config is provided, then use this config or something similar.

.doc(
"Which credentials to use when renewing delegation tokens for executors. Can be either " +
"'keytab', the default, which requires a keytab to be provided, or 'ccache', which uses " +
"the local credentials cache.")
.stringConf
.checkValues(Set("keytab", "ccache"))
.createWithDefault("keytab")

private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
.intConf
.createOptional
@@ -110,14 +110,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
class DriverEndpoint extends ThreadSafeRpcEndpoint with Logging {

override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv

// Executors that have been lost, but for which we don't yet know the real exit reason.
protected val executorsPendingLossReason = new HashSet[String]

protected val addressToExecutorId = new HashMap[RpcAddress, String]

// Spark configuration sent to executors. This is a lazy val so that subclasses of the
// scheduler can modify the SparkConf object before this view is created.
private lazy val sparkProperties = scheduler.sc.conf.getAll
.filter { case (k, _) => k.startsWith("spark.") }
.toSeq

override def onStart() {
// Periodically revive offers to allow delay scheduling to work
val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)
@@ -386,23 +393,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
}

var driverEndpoint: RpcEndpointRef = null
val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint())

protected def minRegisteredRatio: Double = _minRegisteredRatio

override def start() {
val properties = new ArrayBuffer[(String, String)]
for ((key, value) <- scheduler.sc.conf.getAll) {
if (key.startsWith("spark.")) {
properties += ((key, value))
}
}

// TODO (prashant) send conf instead of properties
driverEndpoint = createDriverEndpointRef(properties)

if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenManager = createTokenManager(driverEndpoint)
delegationTokenManager = createTokenManager()
delegationTokenManager.foreach { dtm =>
val tokens = if (dtm.renewalEnabled) {
dtm.start()
@@ -412,20 +409,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
SparkHadoopUtil.get.serialize(creds)
}
if (tokens != null) {
delegationTokens.set(tokens)
updateDelegationTokens(tokens)
}
}
}
}

protected def createDriverEndpointRef(
properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new DriverEndpoint(rpcEnv, properties)
}
protected def createDriverEndpoint(): DriverEndpoint = new DriverEndpoint()

def stopExecutors() {
try {
@@ -715,12 +705,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* Create the delegation token manager to be used for the application. This method is called
* once during the start of the scheduler backend (so after the object has already been
* fully constructed), only if security is enabled in the Hadoop configuration.
*
* @param schedulerRef RPC endpoint for the scheduler, where updated delegation tokens should be
* sent.
*/
protected def createTokenManager(
schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = None
protected def createTokenManager(): Option[HadoopDelegationTokenManager] = None

/**
* Called when a new set of delegation tokens is sent to the driver. Child classes can override
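
To summarize the refactored extension points above, a hedged sketch of how a cluster-manager backend might now plug in; the `MyClusterSchedulerBackend` name is hypothetical, and the pattern simply mirrors the Kubernetes and Mesos changes shown below.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

private[spark] class MyClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    rpcEnv: RpcEnv)
  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

  // No Seq[(String, String)] parameter anymore: the endpoint builds its view of
  // the Spark properties lazily from the scheduler's SparkConf.
  override def createDriverEndpoint(): DriverEndpoint = new DriverEndpoint()

  // No schedulerRef parameter anymore: the backend's own driverEndpoint is
  // handed to the token manager, which sends renewed tokens to the driver.
  override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
    Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, driverEndpoint))
  }
}
```
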
32 changes: 24 additions & 8 deletions docs/security.md
@@ -776,16 +776,32 @@ The following options provides finer-grained control for this feature:
Long-running applications may run into issues if their run time exceeds the maximum delegation
token lifetime configured in services it needs to access.

Spark supports automatically creating new tokens for these applications when running in YARN mode.
Kerberos credentials need to be provided to the Spark application via the `spark-submit` command,
using the `--principal` and `--keytab` parameters.
This feature is not available everywhere. In particular, it's only implemented
on YARN and Kubernetes (both client and cluster modes), and on Mesos when using client mode.

The provided keytab will be copied over to the machine running the Application Master via the Hadoop
Distributed Cache. For this reason, it's strongly recommended that both YARN and HDFS be secured
with encryption, at least.
Spark supports automatically creating new tokens for these applications. There are two ways to
enable this functionality.

The Kerberos login will be periodically renewed using the provided credentials, and new delegation
tokens for supported will be created.
### Using a Keytab

By providing Spark with a principal and keytab (e.g. using `spark-submit` with `--principal`
and `--keytab` parameters), the application will maintain a valid Kerberos login that can be
used to retrieve delegation tokens indefinitely.

Note that when using a keytab in cluster mode, it will be copied over to the machine running the
Spark driver. In the case of YARN, this means using HDFS as a staging area for the keytab, so it's
strongly recommended that both YARN and HDFS be secured with encryption, at least.
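
For illustration (not part of the patch), the same credentials can be supplied programmatically in spark-shell/REPL style; this sketch assumes the `--principal`/`--keytab` flags map to the `spark.kerberos.principal` and `spark.kerberos.keytab` properties, and the principal and keytab path shown are hypothetical.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kerberos.principal", "alice@EXAMPLE.COM")  // hypothetical principal
  .set("spark.kerberos.keytab", "/path/to/alice.keytab") // hypothetical keytab path
```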

### Using a ticket cache
> Contributor: Very nice improvement in this PR. I guess it is worth documenting it also on docs/running-on-yarn.md.
> Author: This document is linked from the YARN doc. No need to duplicate documentation.

By setting `spark.kerberos.renewal.credentials` to `ccache` in Spark's configuration, the local
Kerberos ticket cache will be used for authentication. Spark will keep the ticket renewed during its
renewable life, but after it expires a new ticket needs to be acquired (e.g. by running `kinit`).

It's up to the user to maintain an updated ticket cache that Spark can use.

The location of the ticket cache can be customized by setting the `KRB5CCNAME` environment
variable.
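
Again for illustration only, a sketch of opting in to ticket-cache renewal from code (spark-shell/REPL style); obtaining and refreshing the ticket itself still happens outside Spark, e.g. with `kinit`.

```scala
import org.apache.spark.SparkConf

// Use the local Kerberos ticket cache instead of a keytab for renewal.
val conf = new SparkConf()
  .set("spark.kerberos.renewal.credentials", "ccache")

// A non-default cache location is taken from the KRB5CCNAME environment
// variable (e.g. KRB5CCNAME=/tmp/krb5cc_custom), not from a Spark property.
```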

## Secure Interaction with Kubernetes

@@ -144,17 +144,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
// Don't do anything else - let event handling from the Kubernetes API do the Spark changes
}

override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
override def createDriverEndpoint(): DriverEndpoint = {
> Contributor: Nit: Curly braces can be dropped.
> Author: Doesn't fit in the same line, so braces remain.
new KubernetesDriverEndpoint()
}

override protected def createTokenManager(
schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, schedulerRef))
override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint))
}

private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends DriverEndpoint(rpcEnv, sparkProperties) {
private class KubernetesDriverEndpoint extends DriverEndpoint {

override def onDisconnected(rpcAddress: RpcAddress): Unit = {
// Don't do anything besides disabling the executor - allow the Kubernetes API events to
@@ -772,9 +772,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
}

override protected def createTokenManager(
schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, schedulerRef))
override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint))
}

private def numExecutors(): Int = {
@@ -296,26 +296,21 @@ private[spark] class Client(
/**
* Set up security tokens for launching our ApplicationMaster container.
*
* This method will obtain delegation tokens from all the registered providers, and set them in
* the AM's launch context.
* In client mode, a set of credentials has been obtained by the scheduler, so they are copied
* and sent to the AM. In cluster mode, new credentials are obtained and then sent to the AM,
* along with whatever credentials the current user already has.
*/
private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null)
credentialManager.obtainDelegationTokens(credentials)

// When using a proxy user, copy the delegation tokens to the user's credentials. Avoid
// that for regular users, since in those case the user already has access to the TGT,
// and adding delegation tokens could lead to expired or cancelled tokens being used
// later, as reported in SPARK-15754.
val currentUser = UserGroupInformation.getCurrentUser()
if (SparkHadoopUtil.get.isProxyUser(currentUser)) {
currentUser.addCredentials(credentials)
val credentials = currentUser.getCredentials()

if (isClusterMode) {
val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null)
credentialManager.obtainDelegationTokens(credentials)
}

val dob = new DataOutputBuffer
credentials.writeTokenStorageToStream(dob)
amContainer.setTokens(ByteBuffer.wrap(dob.getData))
val serializedCreds = SparkHadoopUtil.get.serialize(credentials)
amContainer.setTokens(ByteBuffer.wrap(serializedCreds))
}

/** Get the application report from the ResourceManager for an application we have submitted. */
@@ -43,6 +43,8 @@ private[spark] class YarnClientSchedulerBackend(
* This waits until the application is running.
*/
override def start() {
super.start()

val driverHost = conf.get(config.DRIVER_HOST_ADDRESS)
val driverPort = conf.get(config.DRIVER_PORT)
val hostport = driverHost + ":" + driverPort
@@ -57,14 +59,12 @@
client = new Client(args, conf)
bindToYarn(client.submitApplication(), None)

// SPARK-8687: Ensure all necessary properties have already been set before
// we initialize our driver scheduler backend, which serves these properties
// to the executors
super.start()
waitForApplication()

monitorThread = asyncMonitorApplication()
monitorThread.start()

startBindings()
}

/**
@@ -21,6 +21,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.SparkContext
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
@@ -35,6 +36,7 @@ private[spark] class YarnClusterSchedulerBackend(
bindToYarn(attemptId.getApplicationId(), Some(attemptId))
super.start()
totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(sc.conf)
startBindings()
}

override def getDriverLogUrls: Option[Map[String, String]] = {