diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 759d857d56e0e..f769ce468e49c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -140,13 +140,21 @@ private[spark] class HadoopDelegationTokenManager( * @param creds Credentials object where to store the delegation tokens. */ def obtainDelegationTokens(creds: Credentials): Unit = { - val freshUGI = doLogin() - freshUGI.doAs(new PrivilegedExceptionAction[Unit]() { - override def run(): Unit = { - val (newTokens, _) = obtainDelegationTokens() - creds.addAll(newTokens) - } - }) + val currentUser = UserGroupInformation.getCurrentUser() + val hasKerberosCreds = principal != null || + Option(currentUser.getRealUser()).getOrElse(currentUser).hasKerberosCredentials() + + // Delegation tokens can only be obtained if the real user has Kerberos credentials, so + // skip creation when those are not available. + if (hasKerberosCreds) { + val freshUGI = doLogin() + freshUGI.doAs(new PrivilegedExceptionAction[Unit]() { + override def run(): Unit = { + val (newTokens, _) = obtainDelegationTokens() + creds.addAll(newTokens) + } + }) + } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index d81070c362ba6..c9b408bed1163 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -427,12 +427,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val ugi = UserGroupInformation.getCurrentUser() val tokens = if (dtm.renewalEnabled) { dtm.start() - } else if (ugi.hasKerberosCredentials() || SparkHadoopUtil.get.isProxyUser(ugi)) { + } else { val creds = ugi.getCredentials() dtm.obtainDelegationTokens(creds) - SparkHadoopUtil.get.serialize(creds) - } else { - null + if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) { + SparkHadoopUtil.get.serialize(creds) + } else { + null + } } if (tokens != null) { updateDelegationTokens(tokens) diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 70174f7ff939a..bf53386da6304 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -17,10 +17,14 @@ package org.apache.spark.deploy.security +import java.security.PrivilegedExceptionAction + import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.security.Credentials +import org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.security.HadoopDelegationTokenProvider private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationTokenProvider { @@ -69,4 +73,37 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite { assert(!manager.isProviderLoaded("hadoopfs")) assert(manager.isProviderLoaded("hbase")) } + + test("SPARK-29082: do not fail if current user does not have credentials") { + // SparkHadoopUtil overrides the UGI configuration during initialization. That normally + // happens early in the Spark application, but here it may affect the test depending on + // how it's run, so force its initialization. + SparkHadoopUtil.get + + val krbConf = new Configuration() + krbConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos") + + UserGroupInformation.setConfiguration(krbConf) + try { + val manager = new HadoopDelegationTokenManager(new SparkConf(false), krbConf, null) + val testImpl = new PrivilegedExceptionAction[Unit] { + override def run(): Unit = { + assert(UserGroupInformation.isSecurityEnabled()) + val creds = new Credentials() + manager.obtainDelegationTokens(creds) + assert(creds.numberOfTokens() === 0) + assert(creds.numberOfSecretKeys() === 0) + } + } + + val realUser = UserGroupInformation.createUserForTesting("realUser", Array.empty) + realUser.doAs(testImpl) + + val proxyUser = UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, + Array.empty) + proxyUser.doAs(testImpl) + } finally { + UserGroupInformation.reset() + } + } }