Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,21 @@ private[spark] class HadoopDelegationTokenManager(
* @param creds Credentials object where to store the delegation tokens.
*/
// NOTE(review): this span is a pasted unified diff with the +/- markers stripped.
// It contains BOTH the pre-change body (the unguarded doLogin/doAs immediately
// below) and the post-change body (the hasKerberosCreds-guarded version). The two
// cannot coexist in one method; only one side should remain in the real file.
def obtainDelegationTokens(creds: Credentials): Unit = {
// Removed side of the diff: unconditionally re-login and fetch tokens into `creds`.
val freshUGI = doLogin()
freshUGI.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
val (newTokens, _) = obtainDelegationTokens()
creds.addAll(newTokens)
}
})
// Added side of the diff (SPARK-29082): only attempt token creation when Kerberos
// credentials are available — either a configured principal, or the current user's
// real user (falling back to the current user itself) holding Kerberos credentials.
val currentUser = UserGroupInformation.getCurrentUser()
val hasKerberosCreds = principal != null ||
Option(currentUser.getRealUser()).getOrElse(currentUser).hasKerberosCredentials()

// Delegation tokens can only be obtained if the real user has Kerberos credentials, so
// skip creation when those are not available.
if (hasKerberosCreds) {
val freshUGI = doLogin()
freshUGI.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
val (newTokens, _) = obtainDelegationTokens()
creds.addAll(newTokens)
}
})
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val ugi = UserGroupInformation.getCurrentUser()
val tokens = if (dtm.renewalEnabled) {
dtm.start()
} else if (ugi.hasKerberosCredentials() || SparkHadoopUtil.get.isProxyUser(ugi)) {
} else {
val creds = ugi.getCredentials()
dtm.obtainDelegationTokens(creds)
SparkHadoopUtil.get.serialize(creds)
} else {
null
if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) {
SparkHadoopUtil.get.serialize(creds)
} else {
null
}
}
if (tokens != null) {
updateDelegationTokens(tokens)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.spark.deploy.security

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.security.HadoopDelegationTokenProvider

private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationTokenProvider {
Expand Down Expand Up @@ -69,4 +73,37 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
assert(!manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
}

test("SPARK-29082: do not fail if current user does not have credentials") {
  // Force SparkHadoopUtil initialization up front: it rewrites the UGI
  // configuration, and letting that happen lazily mid-test could clobber the
  // Kerberos settings applied below.
  SparkHadoopUtil.get

  val kerberosConf = new Configuration()
  kerberosConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos")
  UserGroupInformation.setConfiguration(kerberosConf)

  try {
    val manager = new HadoopDelegationTokenManager(new SparkConf(false), kerberosConf, null)

    // Running as a user without Kerberos credentials must not throw, and must
    // leave the Credentials object empty (no tokens, no secret keys).
    val expectEmptyCreds = new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        assert(UserGroupInformation.isSecurityEnabled())
        val creds = new Credentials()
        manager.obtainDelegationTokens(creds)
        assert(creds.numberOfTokens() === 0)
        assert(creds.numberOfSecretKeys() === 0)
      }
    }

    // Exercise both a plain test user and a proxy user wrapping it.
    val realUser = UserGroupInformation.createUserForTesting("realUser", Array.empty)
    realUser.doAs(expectEmptyCreds)

    val proxyUser =
      UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array.empty)
    proxyUser.doAs(expectEmptyCreds)
  } finally {
    // Restore global UGI state so later tests are unaffected.
    UserGroupInformation.reset()
  }
}
}