From c5f58864b796c579eaed2ab735b1babd5479f9aa Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Wed, 16 May 2018 15:21:14 +0800 Subject: [PATCH 1/4] [SPARK-24292][SQL] Proxy user cannot connect to HiveMetastore in local mode --- .../apache/spark/deploy/SparkHadoopUtil.scala | 20 ++++++++++++++++ .../org/apache/spark/deploy/SparkSubmit.scala | 1 + .../HiveDelegationTokenProvider.scala | 24 +------------------ .../sql/hive/client/HiveClientImpl.scala | 5 +++- 4 files changed, 26 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 8353e64a619cf..2cd5444b151a0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException} +import java.lang.reflect.UndeclaredThrowableException import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} @@ -67,6 +68,25 @@ class SparkHadoopUtil extends Logging { }) } + /** + * Run some code as the real logged in user (which may differ from the current user, for + * example, when using proxying). + */ + def runAsRealUser[T](fn: => T): T = { + val currentUser = UserGroupInformation.getCurrentUser() + val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser) + + // For some reason the Scala-generated anonymous class ends up causing an + // UndeclaredThrowableException, even if you annotate the method with @throws. 
+ try { + realUser.doAs(new PrivilegedExceptionAction[T]() { + override def run(): T = fn + }) + } catch { + case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e) + } + } + def createSparkUser(): UserGroupInformation = { val user = Utils.getCurrentUserName() logDebug("creating UGI for user: " + user) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 087e9c31a9c9a..ef0406172a5ac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -146,6 +146,7 @@ private[spark] class SparkSubmit extends Logging { if (args.proxyUser != null) { val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser, UserGroupInformation.getCurrentUser()) + proxyUser.addCredentials(UserGroupInformation.getCurrentUser.getCredentials) try { proxyUser.doAs(new PrivilegedExceptionAction[Unit]() { override def run(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala index 7249eb85ac7c7..b46b13ce4d477 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala @@ -17,9 +17,6 @@ package org.apache.spark.deploy.security -import java.lang.reflect.UndeclaredThrowableException -import java.security.PrivilegedExceptionAction - import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration @@ -91,7 +88,7 @@ private[spark] class HiveDelegationTokenProvider logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " + s"$principal at $metastoreUri") - doAsRealUser { + SparkHadoopUtil.get.runAsRealUser { val hive = Hive.get(conf, classOf[HiveConf]) val tokenStr = 
hive.getDelegationToken(currentUser.getUserName(), principal) @@ -115,23 +112,4 @@ private[spark] class HiveDelegationTokenProvider } } } - - /** - * Run some code as the real logged in user (which may differ from the current user, for - * example, when using proxying). - */ - private def doAsRealUser[T](fn: => T): T = { - val currentUser = UserGroupInformation.getCurrentUser() - val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser) - - // For some reason the Scala-generated anonymous class ends up causing an - // UndeclaredThrowableException, even if you annotate the method with @throws. - try { - realUser.doAs(new PrivilegedExceptionAction[T]() { - override def run(): T = fn - }) - } catch { - case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e) - } - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index da9fe2d3088b4..de46723360972 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_AS import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.HiveCatalogMetrics @@ -180,7 +181,9 @@ private[hive] class HiveClientImpl( if (clientLoader.cachedHive != null) { Hive.set(clientLoader.cachedHive.asInstanceOf[Hive]) } - SessionState.start(state) + SparkHadoopUtil.get.runAsRealUser { + SessionState.start(state) + } state.out = new PrintStream(outputBuffer, true, "UTF-8") state.err = new PrintStream(outputBuffer, true, "UTF-8") state From 
a56ab47aa2ae4e9192871af9cb58f3cea31039af Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Thu, 17 May 2018 16:04:22 +0800 Subject: [PATCH 2/4] revert SparkSubmit.scala --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index ef0406172a5ac..087e9c31a9c9a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -146,7 +146,6 @@ private[spark] class SparkSubmit extends Logging { if (args.proxyUser != null) { val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser, UserGroupInformation.getCurrentUser()) - proxyUser.addCredentials(UserGroupInformation.getCurrentUser.getCredentials) try { proxyUser.doAs(new PrivilegedExceptionAction[Unit]() { override def run(): Unit = { From 00439053e3cf66486988f49392cd0e6208424ad9 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Thu, 17 May 2018 17:27:00 +0800 Subject: [PATCH 3/4] Revert previous commit and fix via obtaining Delegation Token to credentials --- .../apache/spark/deploy/SparkHadoopUtil.scala | 20 ---------------- .../HiveDelegationTokenProvider.scala | 24 ++++++++++++++++++- .../sql/hive/thriftserver/SparkSQLEnv.scala | 12 ++++++++++ .../sql/hive/client/HiveClientImpl.scala | 5 +--- 4 files changed, 36 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 2cd5444b151a0..8353e64a619cf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException} -import 
java.lang.reflect.UndeclaredThrowableException import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} @@ -68,25 +67,6 @@ class SparkHadoopUtil extends Logging { }) } - /** - * Run some code as the real logged in user (which may differ from the current user, for - * example, when using proxying). - */ - def runAsRealUser[T](fn: => T): T = { - val currentUser = UserGroupInformation.getCurrentUser() - val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser) - - // For some reason the Scala-generated anonymous class ends up causing an - // UndeclaredThrowableException, even if you annotate the method with @throws. - try { - realUser.doAs(new PrivilegedExceptionAction[T]() { - override def run(): T = fn - }) - } catch { - case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e) - } - } - def createSparkUser(): UserGroupInformation = { val user = Utils.getCurrentUserName() logDebug("creating UGI for user: " + user) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala index b46b13ce4d477..7249eb85ac7c7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala @@ -17,6 +17,9 @@ package org.apache.spark.deploy.security +import java.lang.reflect.UndeclaredThrowableException +import java.security.PrivilegedExceptionAction + import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration @@ -88,7 +91,7 @@ private[spark] class HiveDelegationTokenProvider logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " + s"$principal at $metastoreUri") - SparkHadoopUtil.get.runAsRealUser { + doAsRealUser { val hive = Hive.get(conf, classOf[HiveConf]) val tokenStr = 
hive.getDelegationToken(currentUser.getUserName(), principal) @@ -112,4 +115,23 @@ private[spark] class HiveDelegationTokenProvider } } } + + /** + * Run some code as the real logged in user (which may differ from the current user, for + * example, when using proxying). + */ + private def doAsRealUser[T](fn: => T): T = { + val currentUser = UserGroupInformation.getCurrentUser() + val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser) + + // For some reason the Scala-generated anonymous class ends up causing an + // UndeclaredThrowableException, even if you annotate the method with @throws. + try { + realUser.doAs(new PrivilegedExceptionAction[T]() { + override def run(): T = fn + }) + } catch { + case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e) + } + } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 8980bcf885589..e6253ab409686 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.hive.thriftserver import java.io.PrintStream import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} @@ -49,6 +51,16 @@ private[hive] object SparkSQLEnv extends Logging { sparkContext = sparkSession.sparkContext sqlContext = sparkSession.sqlContext + val currentUser = UserGroupInformation.getCurrentUser + if (SparkHadoopUtil.get.isProxyUser(currentUser)) { + logInfo("Add credentials from token for proxy user") + val 
hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf) + val credentials = currentUser.getCredentials + val tokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf) + tokenManager.obtainDelegationTokens(hadoopConf, credentials) + currentUser.addCredentials(credentials) + } + val metadataHive = sparkSession .sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index de46723360972..da9fe2d3088b4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_AS import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.HiveCatalogMetrics @@ -181,9 +180,7 @@ private[hive] class HiveClientImpl( if (clientLoader.cachedHive != null) { Hive.set(clientLoader.cachedHive.asInstanceOf[Hive]) } - SparkHadoopUtil.get.runAsRealUser { - SessionState.start(state) - } + SessionState.start(state) state.out = new PrintStream(outputBuffer, true, "UTF-8") state.err = new PrintStream(outputBuffer, true, "UTF-8") state From 0621a3aa2b094eb06495fdce4c7d2b7f60d5a422 Mon Sep 17 00:00:00 2001 From: Alan Jin Date: Thu, 17 May 2018 20:21:58 +0800 Subject: [PATCH 4/4] add missing import --- .../org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index e6253ab409686..4ec1de10ae9e8 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.thriftserver import java.io.PrintStream +import org.apache.hadoop.security.UserGroupInformation + import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.security.HadoopDelegationTokenManager @@ -57,7 +59,7 @@ private[hive] object SparkSQLEnv extends Logging { val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf) val credentials = currentUser.getCredentials val tokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf) - tokenManager.obtainDelegationTokens(hadoopConf, credentials) + tokenManager.obtainDelegationTokens(hadoopConf, credentials) currentUser.addCredentials(credentials) }