From d847e673a6fdf819009d8bc1259ce5a1fe1bf16e Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 28 Aug 2017 17:25:17 +0800
Subject: [PATCH 1/6] SPARK-21428-FOLLOWUP: CliSessionState should respect warehouse dir determined in SharedState

---
 .../org/apache/spark/sql/hive/HiveUtils.scala      |  2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala     | 14 ++++++++++++--
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 561c127a40bb6..b01cbcae78ab6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -395,7 +395,7 @@ private[spark] object HiveUtils extends Logging {
         propMap.put(confvar.varname, confvar.getDefaultExpr())
       }
     }
-    propMap.put(WAREHOUSE_PATH.key, localMetastore.toURI.toString)
+    if (useInMemoryDerby) propMap.put(WAREHOUSE_PATH.key, localMetastore.toURI.toString)
     propMap.put(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
       s"jdbc:derby:${withInMemoryMode};databaseName=${localMetastore.getAbsolutePath};create=true")
     propMap.put("datanucleus.rdbms.datastoreAdapterClassName",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 426db6a4e1c12..c39122f7e7ca4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
 import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
@@ -52,6 +53,7 @@ import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.client.HiveClientImpl._
+import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
 
@@ -132,14 +134,22 @@ private[hive] class HiveClientImpl(
       // in hive jars, which will turn off isolation, if SessionSate.detachSession is
       // called to remove the current state after that, hive client created later will initialize
       // its own state by newState()
-      Option(SessionState.get).getOrElse(newState())
+      val ret = SessionState.get
+      if (ret != null) {
+        if (sparkConf.contains(WAREHOUSE_PATH.key)) {
+          ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, sparkConf.get(WAREHOUSE_PATH.key))
+        }
+        ret
+      } else {
+        newState()
+      }
     }
   }
 
   // Log the default warehouse location.
   logInfo(
     s"Warehouse location for Hive client " +
-      s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
+      s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}")
 
   private def newState(): SessionState = {
     val hiveConf = new HiveConf(classOf[SessionState])

From dc996122513321123591c2929fa27977f46104e6 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 28 Aug 2017 22:15:41 +0800
Subject: [PATCH 2/6] HiveClient doesn't respect hive-site.xml in spark-sql

---
 .../hive/thriftserver/SparkSQLCLIDriver.scala |  6 +--
 .../org/apache/spark/sql/hive/HiveUtils.scala | 37 ++++++++++++++++++-
 .../sql/hive/client/HiveClientImpl.scala      | 31 ++--------------
 3 files changed, 40 insertions(+), 34 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 761e832ed14b8..135aef356cac5 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -81,11 +81,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       System.exit(1)
     }
 
-    val cliConf = new HiveConf(classOf[SessionState])
-    // Override the location of the metastore since this is only used for local execution.
-    HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
-      case (key, value) => cliConf.set(key, value)
-    }
+    val cliConf = HiveUtils.newHiveConfigurations()()
 
     val sessionState = new CliSessionState(cliConf)
     sessionState.in = System.in
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index b01cbcae78ab6..a57a9753c78ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -231,6 +231,41 @@ private[spark] object HiveUtils extends Logging {
     }.toMap
   }
 
+  private[hive] def newHiveConfigurations(
+      sparkConf: SparkConf = new SparkConf(loadDefaults = true),
+      extraConfig: Map[String, String] = Map.empty,
+      classLoader: ClassLoader = null)(
+      hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(sparkConf)): HiveConf = {
+    val hiveConf = new HiveConf(classOf[SessionState])
+    // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+    // the initial value will be the current thread's context class loader
+    // (i.e. initClassLoader at here).
+    // We call initialConf.setClassLoader(initClassLoader) at here to make
+    // this action explicit.
+    if (classLoader != null) {
+      hiveConf.setClassLoader(classLoader)
+    }
+    // 1: Take all from the hadoopConf to this hiveConf.
+    // This hadoopConf contains user settings in Hadoop's core-site.xml file
+    // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
+    // SharedState and put settings in this hadoopConf instead of relying on HiveConf
+    // to load user settings. Otherwise, HiveConf's initialize method will override
+    // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
+    // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
+    // has hive-site.xml. So, HiveConf will use that to override its default values.
+    // 2: we set all spark confs to this hiveConf.
+    // 3: we set all entries in config to this hiveConf.
+    (hiveClientConfigurations(hadoopConf) ++ sparkConf.getAll.toMap ++ extraConfig).foreach {
+      case (k, v) =>
+        logDebug(
+          s"""
+             |Applying Hadoop/Hive/Spark and extra properties to Hive Conf:
+             |$k=${if (k.toLowerCase(Locale.ROOT).contains("password")) "xxx" else v}
+           """.stripMargin)
+        hiveConf.set(k, v)
+    }
+    hiveConf
+  }
   /**
    * Check current Thread's SessionState type
    * @return true when SessionState.get returns an instance of CliSessionState,
@@ -395,7 +430,7 @@ private[spark] object HiveUtils extends Logging {
         propMap.put(confvar.varname, confvar.getDefaultExpr())
       }
     }
-    if (useInMemoryDerby) propMap.put(WAREHOUSE_PATH.key, localMetastore.toURI.toString)
+    propMap.put(WAREHOUSE_PATH.key, localMetastore.toURI.toString)
     propMap.put(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
       s"jdbc:derby:${withInMemoryMode};databaseName=${localMetastore.getAbsolutePath};create=true")
     propMap.put("datanucleus.rdbms.datastoreAdapterClassName",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c39122f7e7ca4..efcc3ae262a04 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -51,7 +51,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.client.HiveClientImpl._
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types._
@@ -152,33 +152,8 @@ private[hive] class HiveClientImpl(
     s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}")
 
   private def newState(): SessionState = {
-    val hiveConf = new HiveConf(classOf[SessionState])
-    // HiveConf is a Hadoop Configuration, which has a field of classLoader and
-    // the initial value will be the current thread's context class loader
-    // (i.e. initClassLoader at here).
-    // We call initialConf.setClassLoader(initClassLoader) at here to make
-    // this action explicit.
-    hiveConf.setClassLoader(initClassLoader)
-
-    // 1: Take all from the hadoopConf to this hiveConf.
-    // This hadoopConf contains user settings in Hadoop's core-site.xml file
-    // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
-    // SharedState and put settings in this hadoopConf instead of relying on HiveConf
-    // to load user settings. Otherwise, HiveConf's initialize method will override
-    // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
-    // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
-    // has hive-site.xml. So, HiveConf will use that to override its default values.
-    // 2: we set all spark confs to this hiveConf.
-    // 3: we set all entries in config to this hiveConf.
-    (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
-      ++ sparkConf.getAll.toMap ++ extraConfig).foreach { case (k, v) =>
-      logDebug(
-        s"""
-           |Applying Hadoop/Hive/Spark and extra properties to Hive Conf:
-           |$k=${if (k.toLowerCase(Locale.ROOT).contains("password")) "xxx" else v}
-         """.stripMargin)
-      hiveConf.set(k, v)
-    }
+    val hiveConf =
+      HiveUtils.newHiveConfigurations(sparkConf, extraConfig, initClassLoader)(hadoopConf)
     val state = new SessionState(hiveConf)
     if (clientLoader.cachedHive != null) {
       Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])

From eef8f5819ba21b5cc37b8ed84191faae34b363c5 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 28 Aug 2017 23:31:41 +0800
Subject: [PATCH 3/6] extract dir from hadoop conf

---
 .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala   | 2 +-
 .../main/scala/org/apache/spark/sql/hive/HiveUtils.scala  | 7 ++++---
 .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 6 +++---
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 135aef356cac5..a13d33ed4d037 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -81,7 +81,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       System.exit(1)
     }
 
-    val cliConf = HiveUtils.newHiveConfigurations()()
+    val cliConf = HiveUtils.newHiveConfigurations()()()
 
     val sessionState = new CliSessionState(cliConf)
     sessionState.in = System.in
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index a57a9753c78ea..19e1aebc602a1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -233,9 +233,9 @@ private[spark] object HiveUtils extends Logging {
 
   private[hive] def newHiveConfigurations(
       sparkConf: SparkConf = new SparkConf(loadDefaults = true),
-      extraConfig: Map[String, String] = Map.empty,
       classLoader: ClassLoader = null)(
-      hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(sparkConf)): HiveConf = {
+      hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(sparkConf))(
+      extraConfig: Map[String, String] = hiveClientConfigurations(hadoopConf)): HiveConf = {
     val hiveConf = new HiveConf(classOf[SessionState])
     // HiveConf is a Hadoop Configuration, which has a field of classLoader and
     // the initial value will be the current thread's context class loader
@@ -255,7 +255,8 @@ private[spark] object HiveUtils extends Logging {
     // has hive-site.xml. So, HiveConf will use that to override its default values.
     // 2: we set all spark confs to this hiveConf.
     // 3: we set all entries in config to this hiveConf.
-    (hiveClientConfigurations(hadoopConf) ++ sparkConf.getAll.toMap ++ extraConfig).foreach {
+    (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
+      ++ sparkConf.getAll.toMap ++ extraConfig).foreach {
       case (k, v) =>
         logDebug(
           s"""
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index efcc3ae262a04..85dd260eddd52 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -136,8 +136,8 @@ private[hive] class HiveClientImpl(
       // its own state by newState()
       val ret = SessionState.get
       if (ret != null) {
-        if (sparkConf.contains(WAREHOUSE_PATH.key)) {
-          ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, sparkConf.get(WAREHOUSE_PATH.key))
+        Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir =>
+          ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
         }
         ret
       } else {
@@ -153,7 +153,7 @@ private[hive] class HiveClientImpl(
 
   private def newState(): SessionState = {
     val hiveConf =
-      HiveUtils.newHiveConfigurations(sparkConf, extraConfig, initClassLoader)(hadoopConf)
+      HiveUtils.newHiveConfigurations(sparkConf, initClassLoader)(hadoopConf)(extraConfig)
     val state = new SessionState(hiveConf)
     if (clientLoader.cachedHive != null) {
       Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])

From 779b0f8093281ac216a319dbb762defd8c27166b Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Wed, 13 Sep 2017 17:37:07 +0800
Subject: [PATCH 4/6] comments added

---
 .../org/apache/spark/sql/hive/HiveUtils.scala      | 22 ++++++++++++-----
 .../sql/hive/client/HiveClientImpl.scala           |  2 ++
 .../sql/hive/client/HiveVersionSuite.scala         |  2 +-
 .../spark/sql/hive/client/VersionsSuite.scala      |  2 +-
 4 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 19e1aebc602a1..ae2c78c255c19 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -176,9 +176,9 @@ private[spark] object HiveUtils extends Logging {
   }
 
   /**
-   * Configurations needed to create a [[HiveClient]].
+   * Change time configurations needed to create a [[HiveClient]] into unified [[Long]] format.
    */
-  private[hive] def hiveClientConfigurations(hadoopConf: Configuration): Map[String, String] = {
+  private[hive] def formatTimeVarsForHiveClient(hadoopConf: Configuration): Map[String, String] = {
     // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
     // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
     // compatibility when users are trying to connecting to a Hive metastore of lower version,
@@ -231,11 +231,23 @@ private[spark] object HiveUtils extends Logging {
     }.toMap
   }
 
+  /**
+   * Generate an instance of [[HiveConf]] from [[SparkConf]]& hadoop [[Configuration]] &
+   * formatted extra time configurations with an isolated classloader needed if isolationOn
+   * for [[HiveClient]] construction
+   * @param sparkConf a [[SparkConf]] object specifying Spark parameters
+   * @param classLoader an isolated classloader needed if isolationOn for [[HiveClient]]
+   *                    construction
+   * @param hadoopConf a hadoop [[Configuration]] object, Optional if we want generated it from
+   *                   the sparkConf
+   * @param extraTimeConfs time configurations in the form of long values from the given hadoopConf
+   */
+
   private[hive] def newHiveConfigurations(
       sparkConf: SparkConf = new SparkConf(loadDefaults = true),
       classLoader: ClassLoader = null)(
       hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(sparkConf))(
-      extraConfig: Map[String, String] = hiveClientConfigurations(hadoopConf)): HiveConf = {
+      extraTimeConfs: Map[String, String] = formatTimeVarsForHiveClient(hadoopConf)): HiveConf = {
     val hiveConf = new HiveConf(classOf[SessionState])
     // HiveConf is a Hadoop Configuration, which has a field of classLoader and
     // the initial value will be the current thread's context class loader
@@ -256,7 +268,7 @@ private[spark] object HiveUtils extends Logging {
     // 2: we set all spark confs to this hiveConf.
     // 3: we set all entries in config to this hiveConf.
     (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
-      ++ sparkConf.getAll.toMap ++ extraConfig).foreach {
+      ++ sparkConf.getAll.toMap ++ extraTimeConfs).foreach {
       case (k, v) =>
         logDebug(
           s"""
@@ -316,7 +328,7 @@ private[spark] object HiveUtils extends Logging {
   protected[hive] def newClientForMetadata(
       conf: SparkConf,
       hadoopConf: Configuration): HiveClient = {
-    val configurations = hiveClientConfigurations(hadoopConf)
+    val configurations = formatTimeVarsForHiveClient(hadoopConf)
     newClientForMetadata(conf, hadoopConf, configurations)
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 85dd260eddd52..56d03b06d2be0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -136,6 +136,8 @@ private[hive] class HiveClientImpl(
       // its own state by newState()
       val ret = SessionState.get
       if (ret != null) {
+        // hive.metastore.warehouse.dir is determined in SharedState after the CliSessionState
+        // instance is constructed, so we need to follow that change here.
         Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir =>
           ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
         }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
index ed475a0261b0b..951ebfad4590e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -36,7 +36,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }
     HiveClientBuilder
-      .buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
+      .buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
   }
 
   override def suiteName: String = s"${super.suiteName}($version)"
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 1d9c8da996fea..edb9a9ffbaaf6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -127,7 +127,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }
-    client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
+    client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
     if (versionSpark != null) versionSpark.reset()
     versionSpark = TestHiveVersion(client)
     assert(versionSpark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

From f2618b9bd8a8c5c6dcd50257f089e2412b30b252 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Tue, 19 Sep 2017 12:53:30 +0800
Subject: [PATCH 5/6] fix ut

---
 .../hive/thriftserver/SparkSQLCLIDriver.scala | 14 +++++-
 .../org/apache/spark/sql/hive/HiveUtils.scala | 48 -------------------
 .../sql/hive/client/HiveClientImpl.scala      | 34 +++++++++++--
 3 files changed, 43 insertions(+), 53 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index a13d33ed4d037..832a15d09599f 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -37,6 +37,8 @@ import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.log4j.{Level, Logger}
 import org.apache.thrift.transport.TSocket
 
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveUtils
@@ -81,7 +83,17 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       System.exit(1)
     }
 
-    val cliConf = HiveUtils.newHiveConfigurations()()()
+    val sparkConf = new SparkConf(loadDefaults = true)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
+
+    val cliConf = new HiveConf(classOf[SessionState])
+    (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
+      ++ sparkConf.getAll.toMap ++ extraConfigs).foreach {
+      case (k, v) =>
+        cliConf.set(k, v)
+    }
+
 
     val sessionState = new CliSessionState(cliConf)
     sessionState.in = System.in
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index ae2c78c255c19..80b9a3dc9605d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -231,54 +231,6 @@ private[spark] object HiveUtils extends Logging {
     }.toMap
   }
 
-  /**
-   * Generate an instance of [[HiveConf]] from [[SparkConf]]& hadoop [[Configuration]] &
-   * formatted extra time configurations with an isolated classloader needed if isolationOn
-   * for [[HiveClient]] construction
-   * @param sparkConf a [[SparkConf]] object specifying Spark parameters
-   * @param classLoader an isolated classloader needed if isolationOn for [[HiveClient]]
-   *                    construction
-   * @param hadoopConf a hadoop [[Configuration]] object, Optional if we want generated it from
-   *                   the sparkConf
-   * @param extraTimeConfs time configurations in the form of long values from the given hadoopConf
-   */
-
-  private[hive] def newHiveConfigurations(
-      sparkConf: SparkConf = new SparkConf(loadDefaults = true),
-      classLoader: ClassLoader = null)(
-      hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(sparkConf))(
-      extraTimeConfs: Map[String, String] = formatTimeVarsForHiveClient(hadoopConf)): HiveConf = {
-    val hiveConf = new HiveConf(classOf[SessionState])
-    // HiveConf is a Hadoop Configuration, which has a field of classLoader and
-    // the initial value will be the current thread's context class loader
-    // (i.e. initClassLoader at here).
-    // We call initialConf.setClassLoader(initClassLoader) at here to make
-    // this action explicit.
-    if (classLoader != null) {
-      hiveConf.setClassLoader(classLoader)
-    }
-    // 1: Take all from the hadoopConf to this hiveConf.
-    // This hadoopConf contains user settings in Hadoop's core-site.xml file
-    // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
-    // SharedState and put settings in this hadoopConf instead of relying on HiveConf
-    // to load user settings. Otherwise, HiveConf's initialize method will override
-    // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
-    // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
-    // has hive-site.xml. So, HiveConf will use that to override its default values.
-    // 2: we set all spark confs to this hiveConf.
-    // 3: we set all entries in config to this hiveConf.
-    (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
-      ++ sparkConf.getAll.toMap ++ extraTimeConfs).foreach {
-      case (k, v) =>
-        logDebug(
-          s"""
-             |Applying Hadoop/Hive/Spark and extra properties to Hive Conf:
-             |$k=${if (k.toLowerCase(Locale.ROOT).contains("password")) "xxx" else v}
-           """.stripMargin)
-        hiveConf.set(k, v)
-    }
-    hiveConf
-  }
   /**
    * Check current Thread's SessionState type
    * @return true when SessionState.get returns an instance of CliSessionState,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 56d03b06d2be0..8170e4f35b72c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -30,15 +30,15 @@ import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, Order, Database => HiveDatabase}
 import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-
 import org.apache.spark.{SparkConf, SparkException}
+
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
@@ -53,6 +53,7 @@ import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.client.HiveClientImpl._
+import org.apache.spark.sql.hive.HiveUtils.logDebug
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -154,8 +155,33 @@ private[hive] class HiveClientImpl(
     s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}")
 
   private def newState(): SessionState = {
-    val hiveConf =
-      HiveUtils.newHiveConfigurations(sparkConf, initClassLoader)(hadoopConf)(extraConfig)
+    val hiveConf = new HiveConf(classOf[SessionState])
+    // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+    // the initial value will be the current thread's context class loader
+    // (i.e. initClassLoader at here).
+    // We call initialConf.setClassLoader(initClassLoader) at here to make
+    // this action explicit.
+    hiveConf.setClassLoader(initClassLoader)
+
+    // 1: Take all from the hadoopConf to this hiveConf.
+    // This hadoopConf contains user settings in Hadoop's core-site.xml file
+    // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
+    // SharedState and put settings in this hadoopConf instead of relying on HiveConf
+    // to load user settings. Otherwise, HiveConf's initialize method will override
+    // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
+    // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
+    // has hive-site.xml. So, HiveConf will use that to override its default values.
+    // 2: we set all spark confs to this hiveConf.
+    // 3: we set all entries in config to this hiveConf.
+    (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
+      ++ sparkConf.getAll.toMap ++ extraConfig).foreach { case (k, v) =>
+      logDebug(
+        s"""
+           |Applying Hadoop/Hive/Spark and extra properties to Hive Conf:
+           |$k=${if (k.toLowerCase(Locale.ROOT).contains("password")) "xxx" else v}
+         """.stripMargin)
+      hiveConf.set(k, v)
+    }
     val state = new SessionState(hiveConf)
     if (clientLoader.cachedHive != null) {
       Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])

From c5c1c2625d33dd08cbdde2e25041359bbbf50339 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Tue, 19 Sep 2017 13:04:55 +0800
Subject: [PATCH 6/6] code style

---
 .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 8170e4f35b72c..c4e48c9360db7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -30,15 +30,15 @@ import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.{FieldSchema, Order, Database => HiveDatabase}
+import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
 import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.spark.{SparkConf, SparkException}
 
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
@@ -51,10 +51,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.client.HiveClientImpl._
-import org.apache.spark.sql.hive.HiveUtils.logDebug
-import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}