From 2011ef219934e6b2e09da3935ca4a4658698cac8 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Fri, 9 Mar 2018 18:25:00 +0800
Subject: [PATCH 1/6] Obtain token before initializing the metastore client

---
 .../apache/spark/deploy/SparkHadoopUtil.scala | 20 +++++++++++
 .../HiveDelegationTokenProvider.scala         | 24 +------------
 .../hive/thriftserver/SparkSQLCLIDriver.scala | 34 ++++++++++++++++++-
 3 files changed, 54 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e14f9845e6db6..744fdf150b705 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -18,6 +18,7 @@ package org.apache.spark.deploy

 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
+import java.lang.reflect.UndeclaredThrowableException
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
@@ -417,6 +418,25 @@ class SparkHadoopUtil extends Logging {
     ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
   }

+  /**
+   * Run some code as the real logged in user (which may differ from the current user, for
+   * example, when using proxying).
+   */
+  private[spark] def doAsRealUser[T](fn: => T): T = {
+    val currentUser = UserGroupInformation.getCurrentUser
+    val realUser = Option(currentUser.getRealUser).getOrElse(currentUser)
+
+    // For some reason the Scala-generated anonymous class ends up causing an
+    // UndeclaredThrowableException, even if you annotate the method with @throws.
+    try {
+      realUser.doAs(new PrivilegedExceptionAction[T]() {
+        override def run(): T = fn
+      })
+    } catch {
+      case e: UndeclaredThrowableException => throw Option(e.getCause).getOrElse(e)
+    }
+  }
+
 }

 object SparkHadoopUtil {

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
index ece5ce79c650d..d660ece7de72f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
@@ -17,9 +17,6 @@

 package org.apache.spark.deploy.security

-import java.lang.reflect.UndeclaredThrowableException
-import java.security.PrivilegedExceptionAction
-
 import scala.util.control.NonFatal

 import org.apache.hadoop.conf.Configuration
@@ -91,7 +88,7 @@ private[security] class HiveDelegationTokenProvider
     logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
       s"$principal at $metastoreUri")

-    doAsRealUser {
+    SparkHadoopUtil.get.doAsRealUser {
       val hive = Hive.get(conf, classOf[HiveConf])
       val tokenStr = hive.getDelegationToken(currentUser.getUserName(), principal)
@@ -115,23 +112,4 @@ private[security] class HiveDelegationTokenProvider
       }
     }
   }
-
-  /**
-   * Run some code as the real logged in user (which may differ from the current user, for
-   * example, when using proxying).
-   */
-  private def doAsRealUser[T](fn: => T): T = {
-    val currentUser = UserGroupInformation.getCurrentUser()
-    val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
-
-    // For some reason the Scala-generated anonymous class ends up causing an
-    // UndeclaredThrowableException, even if you annotate the method with @throws.
-    try {
-      realUser.doAs(new PrivilegedExceptionAction[T]() {
-        override def run(): T = fn
-      })
-    } catch {
-      case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
-    }
-  }
 }

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 832a15d09599f..8cc68fd6fd0a1 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -21,19 +21,26 @@ import java.io._
 import java.util.{ArrayList => JArrayList, Locale}

 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal

 import jline.console.ConsoleReader
 import jline.console.history.FileHistory
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor}
 import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.Token
 import org.apache.log4j.{Level, Logger}
 import org.apache.thrift.transport.TSocket
@@ -42,7 +49,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.util.ShutdownHookManager
+import org.apache.spark.util.{ShutdownHookManager, Utils}

 /**
  * This code doesn't support remote connections in Hive 1.2+, as the underlying CliDriver
@@ -77,6 +84,12 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     })
   }

+  private def isSecuredAndProxy(hiveConf: HiveConf): Boolean = {
+    UserGroupInformation.isSecurityEnabled &&
+      hiveConf.getTrimmed(ConfVars.METASTOREURIS.varname, "").nonEmpty &&
+      SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser)
+  }
+
   def main(args: Array[String]) {
     val oproc = new OptionsProcessor()
     if (!oproc.process_stage1(args)) {
@@ -121,6 +134,25 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       }
     }

+    if (isSecuredAndProxy(conf)) {
+      val currentUser = UserGroupInformation.getCurrentUser
+      try {
+        SparkHadoopUtil.get.doAsRealUser {
+          val tokenStr = Hive.get(conf, classOf[HiveConf])
+            .getDelegationToken(currentUser.getShortUserName, currentUser.getRealUser.getUserName)
+          val token = new Token[DelegationTokenIdentifier]()
+          token.decodeFromUrlString(tokenStr)
+          currentUser.addToken(new Text("hive.metastore.delegation.token"), token)
+        }
+      } catch {
+        case NonFatal(_) =>
+        case e: NoClassDefFoundError =>
+          logWarning(e.getMessage)
+      } finally {
+        Utils.tryLogNonFatalError(Hive.closeCurrent())
+      }
+    }
+
     SessionState.start(sessionState)

     // Clean up after we exit
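
The doAsRealUser helper added above is the heart of this first patch: when the
CLI runs as a proxy user, the metastore delegation token has to be fetched as
the real (Kerberos-authenticated) user behind the proxy. A minimal standalone
sketch of the same pattern, outside Spark (only UserGroupInformation from
hadoop-common is assumed):

    import java.lang.reflect.UndeclaredThrowableException
    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation

    object DoAsRealUserSketch {
      // Run `fn` as the real logged-in user. For a proxy UGI, getRealUser
      // returns the underlying authenticated user; for a plain UGI it is
      // null, so we fall back to the current user itself.
      def doAsRealUser[T](fn: => T): T = {
        val current = UserGroupInformation.getCurrentUser
        val real = Option(current.getRealUser).getOrElse(current)
        try {
          real.doAs(new PrivilegedExceptionAction[T] {
            override def run(): T = fn
          })
        } catch {
          // doAs wraps exceptions thrown from the Scala closure in an
          // UndeclaredThrowableException; rethrow the original cause.
          case e: UndeclaredThrowableException =>
            throw Option(e.getCause).getOrElse(e)
        }
      }
    }
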
From 40638552d0e252b62ac0c6d97376f0d2e66730e6 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Tue, 27 Mar 2018 10:06:16 +0800
Subject: [PATCH 2/6] Call the Hive delegation token provider

---
 .../apache/spark/deploy/SparkHadoopUtil.scala | 21 ---------------
 .../HiveDelegationTokenProvider.scala         | 26 +++++++++++++++++--
 .../hive/thriftserver/SparkSQLCLIDriver.scala | 22 ++++------------
 3 files changed, 29 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 744fdf150b705..39b9b19a2d3df 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -18,7 +18,6 @@ package org.apache.spark.deploy

 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
-import java.lang.reflect.UndeclaredThrowableException
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
@@ -417,26 +416,6 @@ class SparkHadoopUtil extends Logging {
   def isProxyUser(ugi: UserGroupInformation): Boolean = {
     ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
   }
-
-  /**
-   * Run some code as the real logged in user (which may differ from the current user, for
-   * example, when using proxying).
-   */
-  private[spark] def doAsRealUser[T](fn: => T): T = {
-    val currentUser = UserGroupInformation.getCurrentUser
-    val realUser = Option(currentUser.getRealUser).getOrElse(currentUser)
-
-    // For some reason the Scala-generated anonymous class ends up causing an
-    // UndeclaredThrowableException, even if you annotate the method with @throws.
-    try {
-      realUser.doAs(new PrivilegedExceptionAction[T]() {
-        override def run(): T = fn
-      })
-    } catch {
-      case e: UndeclaredThrowableException => throw Option(e.getCause).getOrElse(e)
-    }
-  }
-
 }

 object SparkHadoopUtil {

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
index d660ece7de72f..7249eb85ac7c7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
@@ -17,6 +17,9 @@

 package org.apache.spark.deploy.security

+import java.lang.reflect.UndeclaredThrowableException
+import java.security.PrivilegedExceptionAction
+
 import scala.util.control.NonFatal

 import org.apache.hadoop.conf.Configuration
@@ -33,7 +36,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.KEYTAB
 import org.apache.spark.util.Utils

-private[security] class HiveDelegationTokenProvider
+private[spark] class HiveDelegationTokenProvider
     extends HadoopDelegationTokenProvider with Logging {

   override def serviceName: String = "hive"
@@ -88,7 +91,7 @@ private[security] class HiveDelegationTokenProvider
     logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
       s"$principal at $metastoreUri")

-    SparkHadoopUtil.get.doAsRealUser {
+    doAsRealUser {
       val hive = Hive.get(conf, classOf[HiveConf])
       val tokenStr = hive.getDelegationToken(currentUser.getUserName(), principal)
@@ -112,4 +115,23 @@ private[security] class HiveDelegationTokenProvider
       }
     }
   }
+
+  /**
+   * Run some code as the real logged in user (which may differ from the current user, for
+   * example, when using proxying).
+   */
+  private def doAsRealUser[T](fn: => T): T = {
+    val currentUser = UserGroupInformation.getCurrentUser()
+    val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
+
+    // For some reason the Scala-generated anonymous class ends up causing an
+    // UndeclaredThrowableException, even if you annotate the method with @throws.
+    try {
+      realUser.doAs(new PrivilegedExceptionAction[T]() {
+        override def run(): T = fn
+      })
+    } catch {
+      case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
+    }
+  }
 }

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 8cc68fd6fd0a1..e41d3dd24b615 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive.thriftserver

 import java.io._
-import java.util.{ArrayList => JArrayList, Locale}
+import java.util.{Locale, ArrayList => JArrayList}

 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
@@ -43,9 +43,10 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.security.token.Token
 import org.apache.log4j.{Level, Logger}
 import org.apache.thrift.transport.TSocket
-
 import org.apache.spark.SparkConf
+
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HiveDelegationTokenProvider
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveUtils
@@ -136,21 +137,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {

     if (isSecuredAndProxy(conf)) {
       val currentUser = UserGroupInformation.getCurrentUser
-      try {
-        SparkHadoopUtil.get.doAsRealUser {
-          val tokenStr = Hive.get(conf, classOf[HiveConf])
-            .getDelegationToken(currentUser.getShortUserName, currentUser.getRealUser.getUserName)
-          val token = new Token[DelegationTokenIdentifier]()
-          token.decodeFromUrlString(tokenStr)
-          currentUser.addToken(new Text("hive.metastore.delegation.token"), token)
-        }
-      } catch {
-        case NonFatal(_) =>
-        case e: NoClassDefFoundError =>
-          logWarning(e.getMessage)
-      } finally {
-        Utils.tryLogNonFatalError(Hive.closeCurrent())
-      }
+      val tokenProvider = new HiveDelegationTokenProvider()
+      tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, currentUser.getCredentials)
     }

     SessionState.start(sessionState)
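
With this patch the CLI stops hand-rolling the token fetch and reuses
HiveDelegationTokenProvider, which already knows how to fetch a token from a
kerberized metastore as the real user. In outline, the call site reduces to
the following sketch (sparkConf, hadoopConf and the surrounding isSecuredAndProxy
guard are assumed to exist in main(), as in the diff above):

    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.spark.deploy.security.HiveDelegationTokenProvider

    val tokenProvider = new HiveDelegationTokenProvider()
    // Obtains a metastore delegation token and writes it directly into the
    // current user's live credential set.
    tokenProvider.obtainDelegationTokens(
      hadoopConf, sparkConf, UserGroupInformation.getCurrentUser.getCredentials)

Note that writing into getCurrentUser.getCredentials mutates the UGI in place;
patch 6 later switches to a fresh Credentials object that is merged back
explicitly.
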
From ba6b0860d808f276537da9ee4e8e0800a8f42bbd Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Tue, 27 Mar 2018 10:08:59 +0800
Subject: [PATCH 3/6] Fix imports

---
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index e41d3dd24b615..63f5f02954dc7 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -18,39 +18,34 @@ package org.apache.spark.sql.hive.thriftserver

 import java.io._
-import java.util.{Locale, ArrayList => JArrayList}
+import java.util.{ArrayList => JArrayList, Locale}

 import scala.collection.JavaConverters._
-import scala.util.control.NonFatal

 import jline.console.ConsoleReader
 import jline.console.history.FileHistory
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor}
 import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.io.Text
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.token.Token
 import org.apache.log4j.{Level, Logger}
 import org.apache.thrift.transport.TSocket
-import org.apache.spark.SparkConf

+import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.security.HiveDelegationTokenProvider
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.util.ShutdownHookManager

 /**
  * This code doesn't support remote connections in Hive 1.2+, as the underlying CliDriver

From 5c4335b28fc94a406ca52f60b549e3931668de08 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Tue, 27 Mar 2018 10:14:08 +0800
Subject: [PATCH 4/6] Restore unrelated blank line

---
 .../src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 39b9b19a2d3df..e14f9845e6db6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -416,6 +416,7 @@ class SparkHadoopUtil extends Logging {
   def isProxyUser(ugi: UserGroupInformation): Boolean = {
     ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
   }
+
 }

 object SparkHadoopUtil {

From 1e38840c6611373430c8c6bfc50ef2f09beeaf2d Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Tue, 27 Mar 2018 11:27:49 +0800
Subject: [PATCH 5/6] Check via the Hive delegation token provider itself

---
 .../hive/thriftserver/SparkSQLCLIDriver.scala | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 63f5f02954dc7..9fa497157bc3e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor}
 import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.processors._
@@ -80,12 +79,6 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     })
   }

-  private def isSecuredAndProxy(hiveConf: HiveConf): Boolean = {
-    UserGroupInformation.isSecurityEnabled &&
-      hiveConf.getTrimmed(ConfVars.METASTOREURIS.varname, "").nonEmpty &&
-      SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser)
-  }
-
   def main(args: Array[String]) {
     val oproc = new OptionsProcessor()
     if (!oproc.process_stage1(args)) {
@@ -130,11 +123,10 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       }
     }

-    if (isSecuredAndProxy(conf)) {
-      val currentUser = UserGroupInformation.getCurrentUser
-      val tokenProvider = new HiveDelegationTokenProvider()
-      tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, currentUser.getCredentials)
-    }
+    Option(new HiveDelegationTokenProvider)
+      .filter(_.delegationTokensRequired(sparkConf, hadoopConf))
+      .foreach(_.obtainDelegationTokens(
+        hadoopConf, sparkConf, UserGroupInformation.getCurrentUser.getCredentials))

     SessionState.start(sessionState)
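
Since `new` can never return null, Option(new HiveDelegationTokenProvider) is
always a Some, so the chain above is really just a guarded side effect; the
next patch spells that out as a plain if block. As a minimal illustration of
the equivalence (provider and creds are placeholder names, not from the diff):

    // Always Some(provider), so filter/foreach acts as an if-guard:
    Option(provider)
      .filter(_.delegationTokensRequired(sparkConf, hadoopConf))
      .foreach(_.obtainDelegationTokens(hadoopConf, sparkConf, creds))

    // Equivalent, and more direct:
    if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
      provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
    }
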
From cd8056c3ad40afc08ac251a7ce502626fb9dd3c4 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Wed, 28 Mar 2018 09:54:33 +0800
Subject: [PATCH 6/6] Replace Option chain with if-else

---
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 9fa497157bc3e..084f8200102ba 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.log4j.{Level, Logger}
 import org.apache.thrift.transport.TSocket
@@ -123,10 +123,12 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       }
     }

-    Option(new HiveDelegationTokenProvider)
-      .filter(_.delegationTokensRequired(sparkConf, hadoopConf))
-      .foreach(_.obtainDelegationTokens(
-        hadoopConf, sparkConf, UserGroupInformation.getCurrentUser.getCredentials))
+    val tokenProvider = new HiveDelegationTokenProvider()
+    if (tokenProvider.delegationTokensRequired(sparkConf, hadoopConf)) {
+      val credentials = new Credentials()
+      tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)
+      UserGroupInformation.getCurrentUser.addCredentials(credentials)
+    }

     SessionState.start(sessionState)
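
After the full series, the CLI's startup flow is: ask the provider whether
delegation tokens are needed at all (replacing the CLI-local isSecuredAndProxy
check), collect the tokens into a fresh Credentials object, and merge them
into the current UGI before SessionState.start can trigger any metastore
access. A condensed sketch of that final flow, with sparkConf, hadoopConf and
sessionState assumed in scope as in the diffs above:

    import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    import org.apache.spark.deploy.security.HiveDelegationTokenProvider

    val tokenProvider = new HiveDelegationTokenProvider()
    if (tokenProvider.delegationTokensRequired(sparkConf, hadoopConf)) {
      // Collect into a fresh Credentials rather than mutating the UGI's
      // credential set in place, then add the result back explicitly.
      val credentials = new Credentials()
      tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)
      UserGroupInformation.getCurrentUser.addCredentials(credentials)
    }
    // Only now is it safe to start the Hive session for a proxy user.
    SessionState.start(sessionState)
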