From e6efb0601ec123c9fbf58812dcf2007f85149329 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 13 Sep 2019 17:01:13 -0700 Subject: [PATCH 1/4] [SPARK-29082][core] Skip delegation token generation if no credentials are available. This situation can happen when an external system (e.g. Oozie) generates delegation tokens for a Spark application. The Spark driver will then run against secured services, have proper credentials (the tokens), but no kerberos credentials. So trying to do things that require a kerberos credential fails. Instead, if no kerberos credentials are detected, just skip the whole delegation token code. Tested with an application that simulates Oozie; fails before the fix, passes with the fix. Also with other DT-related tests to make sure other functionality keeps working. --- .../HadoopDelegationTokenManager.scala | 21 ++++++++++++------- .../CoarseGrainedSchedulerBackend.scala | 10 +++++---- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 759d857d56e0e..862e3e7a6c593 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -140,13 +140,20 @@ private[spark] class HadoopDelegationTokenManager( * @param creds Credentials object where to store the delegation tokens. 
*/ def obtainDelegationTokens(creds: Credentials): Unit = { - val freshUGI = doLogin() - freshUGI.doAs(new PrivilegedExceptionAction[Unit]() { - override def run(): Unit = { - val (newTokens, _) = obtainDelegationTokens() - creds.addAll(newTokens) - } - }) + val currentUser = UserGroupInformation.getCurrentUser() + val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser) + + // Delegation tokens can only be obtained if the real user has Kerberos credentials, so + // skip creation when it does not. + if (realUser.hasKerberosCredentials()) { + val freshUGI = doLogin() + freshUGI.doAs(new PrivilegedExceptionAction[Unit]() { + override def run(): Unit = { + val (newTokens, _) = obtainDelegationTokens() + creds.addAll(newTokens) + } + }) + } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index d81070c362ba6..c9b408bed1163 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -427,12 +427,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val ugi = UserGroupInformation.getCurrentUser() val tokens = if (dtm.renewalEnabled) { dtm.start() - } else if (ugi.hasKerberosCredentials() || SparkHadoopUtil.get.isProxyUser(ugi)) { + } else { val creds = ugi.getCredentials() dtm.obtainDelegationTokens(creds) - SparkHadoopUtil.get.serialize(creds) - } else { - null + if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) { + SparkHadoopUtil.get.serialize(creds) + } else { + null + } } if (tokens != null) { updateDelegationTokens(tokens) From 29b069dd0b506492ab3d84a402a95849bf5fba6a Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 16 Sep 2019 15:51:32 -0700 Subject: [PATCH 2/4] Fix Kafka unit test. 
Different scenario because the test doesn't go through spark-submit, which logs in the user with the keytab. Just need to tweak the check so that a keytab is considered as "having kerberos creds". --- .../deploy/security/HadoopDelegationTokenManager.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 862e3e7a6c593..f769ce468e49c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -141,11 +141,12 @@ private[spark] class HadoopDelegationTokenManager( */ def obtainDelegationTokens(creds: Credentials): Unit = { val currentUser = UserGroupInformation.getCurrentUser() - val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser) + val hasKerberosCreds = principal != null + + Option(currentUser.getRealUser()).getOrElse(currentUser).hasKerberosCredentials() // Delegation tokens can only be obtained if the real user has Kerberos credentials, so - // skip creation when it does not. - if (realUser.hasKerberosCredentials()) { + // skip creation when those are not available. + if (hasKerberosCreds) { val freshUGI = doLogin() freshUGI.doAs(new PrivilegedExceptionAction[Unit]() { override def run(): Unit = { From c46d76d45c94a9c09fa3069c9c59b4338e64b402 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 17 Sep 2019 10:24:47 -0700 Subject: [PATCH 3/4] Add a unit test. 
--- .../HadoopDelegationTokenManagerSuite.scala | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 70174f7ff939a..7125c9f0621f7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -17,10 +17,14 @@ package org.apache.spark.deploy.security +import java.security.PrivilegedExceptionAction + import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.security.Credentials +import org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.security.HadoopDelegationTokenProvider private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationTokenProvider { @@ -69,4 +73,37 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite { assert(!manager.isProviderLoaded("hadoopfs")) assert(manager.isProviderLoaded("hbase")) } + + test("do not fail if current user does not have credentials") { + // SparkHadoopUtil overrides the UGI configuration during initialization. That normally + // happens early in the Spark application, but here it may affect the test depending on + // how it's run, so force its initialization. 
+ SparkHadoopUtil.get + + val krbConf = new Configuration() + krbConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos") + + UserGroupInformation.setConfiguration(krbConf) + try { + val manager = new HadoopDelegationTokenManager(new SparkConf(false), krbConf, null) + val testImpl = new PrivilegedExceptionAction[Unit] { + override def run(): Unit = { + assert(UserGroupInformation.isSecurityEnabled()) + val creds = new Credentials() + manager.obtainDelegationTokens(creds) + assert(creds.numberOfTokens() === 0) + assert(creds.numberOfSecretKeys() === 0) + } + } + + val realUser = UserGroupInformation.createUserForTesting("realUser", Array.empty) + realUser.doAs(testImpl) + + val proxyUser = UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, + Array.empty) + proxyUser.doAs(testImpl) + } finally { + UserGroupInformation.reset() + } + } } From c15023855673120bd073c2c1b0167727e34c6f53 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 17 Sep 2019 11:26:16 -0700 Subject: [PATCH 4/4] Add bug ref. --- .../deploy/security/HadoopDelegationTokenManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 7125c9f0621f7..bf53386da6304 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -74,7 +74,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite { assert(manager.isProviderLoaded("hbase")) } - test("do not fail if current user does not have credentials") { + test("SPARK-29082: do not fail if current user does not have credentials") { // SparkHadoopUtil overrides the UGI configuration during initialization. 
That normally // happens early in the Spark application, but here it may affect the test depending on // how it's run, so force its initialization.