From 20ebf8a9f6393f4605ab48b02fab3218776b9eef Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 6 May 2016 11:40:51 -0700
Subject: [PATCH 1/4] ban dropping default db

---
 .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 3 +++
 .../org/apache/spark/sql/execution/command/DDLSuite.scala  | 7 +++++++
 2 files changed, 10 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 712770784bf9e..d697b63eced5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -118,6 +118,9 @@ class SessionCatalog(
   }
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
+    if (db == "default") {
+      throw new AnalysisException(s"Can not drop default database")
+    }
     externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 6085098a709e4..e5962e227346d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -939,4 +939,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       Row("Usage: a ^ b - Bitwise exclusive OR.") :: Nil
     )
   }
+
+  test("drop default database") {
+    val message = intercept[AnalysisException] {
+      sql("DROP DATABASE default")
+    }.getMessage
+    assert(message.contains("Can not drop default database"))
+  }
 }
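Patch 1 compares the raw `db` string against `"default"`, so when the analyzer is case insensitive a spelling like `DeFault` still slips past the guard; patch 2 below closes exactly that hole. A minimal standalone sketch of the gap (the `AnalysisException` class and the `dropDatabase` signature here are simplified stand-ins, not Spark's real ones):

```scala
import scala.util.Try

// Simplified stand-in for Spark's AnalysisException.
class AnalysisException(message: String) extends Exception(message)

def dropDatabase(db: String): Unit = {
  // Guard as written in PATCH 1/4: a raw, case-sensitive string comparison.
  if (db == "default") {
    throw new AnalysisException("Can not drop default database")
  }
  // externalCatalog.dropDatabase(db, ...) would run here.
}

println(Try(dropDatabase("default")))  // Failure(... Can not drop default database)
println(Try(dropDatabase("DeFault")))  // Success(()) -- the guard is bypassed
```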
From 67c2ebc2db2191943586344657072e859846f21d Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 7 May 2016 08:03:13 -0700
Subject: [PATCH 2/4] address comments.

---
 .../sql/catalyst/catalog/SessionCatalog.scala | 12 +++++++++--
 .../sql/execution/command/DDLSuite.scala      | 21 +++++++++++++++----
 .../sql/hive/execution/HiveDDLSuite.scala     | 19 +++++++++++++----
 3 files changed, 42 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index d697b63eced5a..5377059ee8833 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -92,6 +92,13 @@ class SessionCatalog(
     if (conf.caseSensitiveAnalysis) name else name.toLowerCase
   }
 
+  /**
+   * Format database name, taking into account case sensitivity.
+   */
+  protected[this] def formatDatabaseName(name: String): String = {
+    if (conf.caseSensitiveAnalysis) name else name.toLowerCase
+  }
+
   /**
    * This method is used to make the given path qualified before we
    * store this path in the underlying external catalog. So, when a path
@@ -118,10 +125,11 @@ class SessionCatalog(
   }
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
-    if (db == "default") {
+    val dbName = formatDatabaseName(db)
+    if (dbName == "default") {
       throw new AnalysisException(s"Can not drop default database")
     }
-    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
+    externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
   }
 
   def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e5962e227346d..56244f26fbd0a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -941,9 +941,22 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("drop default database") {
-    val message = intercept[AnalysisException] {
-      sql("DROP DATABASE default")
-    }.getMessage
-    assert(message.contains("Can not drop default database"))
+    Seq("true", "false").foreach { caseSensitive =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
+        var message = intercept[AnalysisException] {
+          sql("DROP DATABASE default")
+        }.getMessage
+        assert(message.contains("Can not drop default database"))
+
+        message = intercept[AnalysisException] {
+          sql("DROP DATABASE DeFault")
+        }.getMessage
+        if (caseSensitive == "true") {
+          assert(message.contains("Database 'DeFault' does not exist"))
+        } else {
+          assert(message.contains("Can not drop default database"))
+        }
+      }
+    }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d55ddb251d00d..f7165790050dd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -498,9 +498,20 @@ class HiveDDLSuite
   }
 
   test("drop default database") {
-    val message = intercept[AnalysisException] {
-      sql("DROP DATABASE default")
-    }.getMessage
-    assert(message.contains("Can not drop default database"))
+    Seq("true", "false").foreach { caseSensitive =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
+        var message = intercept[AnalysisException] {
+          sql("DROP DATABASE default")
+        }.getMessage
+        assert(message.contains("Can not drop default database"))
+
+        // SQLConf.CASE_SENSITIVE does not affect the result
+        // because the Hive metastore is not case sensitive.
+        message = intercept[AnalysisException] {
+          sql("DROP DATABASE DeFault")
+        }.getMessage
+        assert(message.contains("Can not drop default database"))
+      }
+    }
   }
 }
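Patch 2 routes the name through the new `formatDatabaseName`, which lower-cases it unless `spark.sql.caseSensitive` is enabled, and the updated tests pin down both settings. The normalization itself is tiny; a sketch with the case-sensitivity flag passed explicitly instead of read from Spark's `conf` (the parameter name is illustrative):

```scala
def formatDatabaseName(name: String, caseSensitiveAnalysis: Boolean): String =
  if (caseSensitiveAnalysis) name else name.toLowerCase

// Case-insensitive analysis: "DeFault" normalizes to "default", so the guard fires.
assert(formatDatabaseName("DeFault", caseSensitiveAnalysis = false) == "default")
// Case-sensitive analysis: the name is kept as-is, so resolution fails with
// "Database 'DeFault' does not exist" before the guard is ever reached.
assert(formatDatabaseName("DeFault", caseSensitiveAnalysis = true) == "DeFault")
```

The Hive suite expects the "Can not drop" message in both modes because, as the in-test comment notes, the Hive metastore itself is not case sensitive.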
From 223ed36b1efc234b024e617c533ba3bdd625b2ea Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 9 May 2016 13:10:55 -0700
Subject: [PATCH 3/4] try 1

---
 .../sql/catalyst/catalog/SessionCatalog.scala | 53 ++++++++++---------
 1 file changed, 29 insertions(+), 24 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 5377059ee8833..590fbf2fa69ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -119,8 +119,9 @@ class SessionCatalog(
 
   def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
     val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
+    val dbName = formatDatabaseName(dbDefinition.name)
     externalCatalog.createDatabase(
-      dbDefinition.copy(locationUri = qualifiedPath),
+      dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
       ignoreIfExists)
   }
 
@@ -137,11 +138,13 @@ class SessionCatalog(
   }
 
   def getDatabaseMetadata(db: String): CatalogDatabase = {
-    externalCatalog.getDatabase(db)
+    val dbName = formatDatabaseName(db)
+    externalCatalog.getDatabase(dbName)
   }
 
   def databaseExists(db: String): Boolean = {
-    externalCatalog.databaseExists(db)
+    val dbName = formatDatabaseName(db)
+    externalCatalog.databaseExists(dbName)
   }
 
   def listDatabases(): Seq[String] = {
@@ -155,10 +158,11 @@ class SessionCatalog(
   def getCurrentDatabase: String = synchronized { currentDb }
 
   def setCurrentDatabase(db: String): Unit = {
-    if (!databaseExists(db)) {
-      throw new AnalysisException(s"Database '$db' does not exist.")
+    val dbName = formatDatabaseName(db)
+    if (!databaseExists(dbName)) {
+      throw new AnalysisException(s"Database '$dbName' does not exist.")
     }
-    synchronized { currentDb = db }
+    synchronized { currentDb = dbName }
   }
 
   /**
@@ -166,7 +170,7 @@ class SessionCatalog(
    * by users.
    */
   def getDefaultDBPath(db: String): String = {
-    val database = if (conf.caseSensitiveAnalysis) db else db.toLowerCase
+    val database = formatDatabaseName(db)
     new Path(new Path(conf.warehousePath), database + ".db").toString
   }
 
@@ -188,7 +192,7 @@ class SessionCatalog(
    * If no such database is specified, create it in the current database.
    */
   def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
-    val db = tableDefinition.identifier.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
     val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
     externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
@@ -204,7 +208,7 @@ class SessionCatalog(
    * this becomes a no-op.
    */
   def alterTable(tableDefinition: CatalogTable): Unit = {
-    val db = tableDefinition.identifier.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
     val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
     externalCatalog.alterTable(db, newTableDefinition)
@@ -216,7 +220,7 @@ class SessionCatalog(
    * If the specified table is not found in the database then an [[AnalysisException]] is thrown.
    */
   def getTableMetadata(name: TableIdentifier): CatalogTable = {
-    val db = name.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
     externalCatalog.getTable(db, table)
   }
@@ -227,7 +231,7 @@ class SessionCatalog(
    * If the specified table is not found in the database then return None if it doesn't exist.
    */
   def getTableMetadataOption(name: TableIdentifier): Option[CatalogTable] = {
-    val db = name.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
     externalCatalog.getTableOption(db, table)
   }
@@ -242,7 +246,7 @@ class SessionCatalog(
       loadPath: String,
       isOverwrite: Boolean,
       holdDDLTime: Boolean): Unit = {
-    val db = name.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
     externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime)
   }
@@ -260,14 +264,14 @@ class SessionCatalog(
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
       isSkewedStoreAsSubdir: Boolean): Unit = {
-    val db = name.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
     externalCatalog.loadPartition(db, table, loadPath, partition, isOverwrite, holdDDLTime,
       inheritTableSpecs, isSkewedStoreAsSubdir)
   }
 
   def defaultTablePath(tableIdent: TableIdentifier): String = {
-    val dbName = tableIdent.database.getOrElse(getCurrentDatabase)
+    val dbName = formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
     val dbLocation = getDatabaseMetadata(dbName).locationUri
 
     new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString
@@ -301,8 +305,8 @@ class SessionCatalog(
    * This assumes the database specified in `oldName` matches the one specified in `newName`.
    */
   def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = synchronized {
-    val db = oldName.database.getOrElse(currentDb)
-    val newDb = newName.database.getOrElse(currentDb)
+    val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
+    val newDb = formatDatabaseName(newName.database.getOrElse(currentDb))
     if (db != newDb) {
       throw new AnalysisException(
         s"RENAME TABLE source and destination databases do not match: '$db' != '$newDb'")
@@ -326,7 +330,7 @@ class SessionCatalog(
    * the same name, then, if that does not exist, drop the table from the current database.
    */
   def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = synchronized {
-    val db = name.database.getOrElse(currentDb)
+    val db = formatDatabaseName(name.database.getOrElse(currentDb))
     val table = formatTableName(name.table)
     if (name.database.isDefined || !tempTables.contains(table)) {
       // When ignoreIfNotExists is false, no exception is issued when the table does not exist.
@@ -350,7 +354,7 @@ class SessionCatalog(
    */
   def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
     synchronized {
-      val db = name.database.getOrElse(currentDb)
+      val db = formatDatabaseName(name.database.getOrElse(currentDb))
       val table = formatTableName(name.table)
       val relation =
         if (name.database.isDefined || !tempTables.contains(table)) {
@@ -375,7 +379,7 @@ class SessionCatalog(
    * contain the table.
    */
   def tableExists(name: TableIdentifier): Boolean = synchronized {
-    val db = name.database.getOrElse(currentDb)
+    val db = formatDatabaseName(name.database.getOrElse(currentDb))
    val table = formatTableName(name.table)
     if (name.database.isDefined || !tempTables.contains(table)) {
       externalCatalog.tableExists(db, table)
@@ -397,14 +401,15 @@ class SessionCatalog(
   /**
    * List all tables in the specified database, including temporary tables.
    */
-  def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
+  def listTables(db: String): Seq[TableIdentifier] = listTables(formatDatabaseName(db), "*")
 
   /**
    * List all matching tables in the specified database, including temporary tables.
    */
   def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
+    val dbName = formatDatabaseName(db)
     val dbTables =
-      externalCatalog.listTables(db, pattern).map { t => TableIdentifier(t, Some(db)) }
+      externalCatalog.listTables(dbName, pattern).map { t => TableIdentifier(t, Some(dbName)) }
     synchronized {
       val _tempTables = StringUtils.filterPattern(tempTables.keys.toSeq, pattern)
         .map { t => TableIdentifier(t) }
@@ -460,7 +465,7 @@ class SessionCatalog(
       tableName: TableIdentifier,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
-    val db = tableName.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
   }
@@ -473,7 +478,7 @@ class SessionCatalog(
       tableName: TableIdentifier,
       parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = {
-    val db = tableName.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
   }
@@ -488,7 +493,7 @@ class SessionCatalog(
       tableName: TableIdentifier,
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = {
-    val db = tableName.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     externalCatalog.renamePartitions(db, table, specs, newSpecs)
   }
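Patch 3 widens the change: every `SessionCatalog` entry point that resolves a database, including the fallback to the current database, now normalizes the name first, so the external catalog only ever sees canonical names. The recurring pattern is `formatDatabaseName(name.database.getOrElse(currentDb))`; a compact illustration with a simplified `TableIdentifier` stand-in (not Spark's actual class):

```scala
// Simplified stand-in for org.apache.spark.sql.catalyst.TableIdentifier.
case class TableIdentifier(table: String, database: Option[String] = None)

// Resolve the database exactly as the patched methods do: fall back to the
// session's current database, then apply the formatDatabaseName rule.
def resolveDb(name: TableIdentifier, currentDb: String, caseSensitive: Boolean): String = {
  val raw = name.database.getOrElse(currentDb)
  if (caseSensitive) raw else raw.toLowerCase
}

assert(resolveDb(TableIdentifier("t", Some("SALES")), "default", caseSensitive = false) == "sales")
assert(resolveDb(TableIdentifier("t"), "DEFAULT", caseSensitive = false) == "default")
```

Normalizing once at the `SessionCatalog` boundary keeps the `ExternalCatalog` implementations free of any case-handling rules of their own.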
From 87099f1ea7c9aa2327ca37fd8672ab156233f393 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 9 May 2016 15:59:45 -0700
Subject: [PATCH 4/4] address comments

---
 .../sql/catalyst/catalog/SessionCatalog.scala | 30 +++++++++++--------
 .../sql/execution/command/DDLSuite.scala      |  8 ++---
 .../spark/sql/hive/HiveSessionCatalog.scala   | 15 ++++++----
 3 files changed, 30 insertions(+), 23 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 590fbf2fa69ca..35924ee8e6c19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -82,7 +82,7 @@ class SessionCatalog(
       CatalogDatabase(defaultName, "default database", conf.warehousePath, Map())
     // Initialize default database if it doesn't already exist
     createDatabase(defaultDbDefinition, ignoreIfExists = true)
-    defaultName
+    formatDatabaseName(defaultName)
   }
 
   /**
@@ -134,7 +134,8 @@ class SessionCatalog(
   }
 
   def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
-    externalCatalog.alterDatabase(dbDefinition)
+    val dbName = formatDatabaseName(dbDefinition.name)
+    externalCatalog.alterDatabase(dbDefinition.copy(name = dbName))
   }
 
   def getDatabaseMetadata(db: String): CatalogDatabase = {
@@ -508,7 +509,7 @@ class SessionCatalog(
    * this becomes a no-op.
    */
   def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
-    val db = tableName.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     externalCatalog.alterPartitions(db, table, parts)
   }
@@ -518,7 +519,7 @@ class SessionCatalog(
    * If no database is specified, assume the table is in the current database.
    */
   def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
-    val db = tableName.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     externalCatalog.getPartition(db, table, spec)
   }
@@ -533,7 +534,7 @@ class SessionCatalog(
   def listPartitions(
       tableName: TableIdentifier,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = {
-    val db = tableName.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     externalCatalog.listPartitions(db, table, partialSpec)
   }
@@ -556,7 +557,7 @@ class SessionCatalog(
    * If no such database is specified, create it in the current database.
    */
   def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
-    val db = funcDefinition.identifier.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))
     val newFuncDefinition = funcDefinition.copy(identifier = identifier)
     if (!functionExists(identifier)) {
@@ -571,7 +572,7 @@ class SessionCatalog(
    * If no database is specified, assume the function is in the current database.
    */
   def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
-    val db = name.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val identifier = name.copy(database = Some(db))
     if (functionExists(identifier)) {
       // TODO: registry should just take in FunctionIdentifier for type safety
@@ -595,7 +596,7 @@ class SessionCatalog(
    * If no database is specified, this will return the function in the current database.
    */
   def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = {
-    val db = name.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     externalCatalog.getFunction(db, name.funcName)
   }
 
@@ -603,7 +604,7 @@ class SessionCatalog(
    * Check if the specified function exists.
    */
   def functionExists(name: FunctionIdentifier): Boolean = {
-    val db = name.database.getOrElse(getCurrentDatabase)
+    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     functionRegistry.functionExists(name.unquotedString) ||
       externalCatalog.functionExists(db, name.funcName)
   }
@@ -670,7 +671,8 @@ class SessionCatalog(
    */
   private[spark] def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = synchronized {
     // TODO: just make function registry take in FunctionIdentifier instead of duplicating this
-    val qualifiedName = name.copy(database = name.database.orElse(Some(currentDb)))
+    val database = name.database.orElse(Some(currentDb)).map(formatDatabaseName)
+    val qualifiedName = name.copy(database = database)
     functionRegistry.lookupFunction(name.funcName)
       .orElse(functionRegistry.lookupFunction(qualifiedName.unquotedString))
       .getOrElse {
@@ -709,7 +711,8 @@ class SessionCatalog(
     }
 
     // If the name itself is not qualified, add the current database to it.
-    val qualifiedName = if (name.database.isEmpty) name.copy(database = Some(currentDb)) else name
+    val database = name.database.orElse(Some(currentDb)).map(formatDatabaseName)
+    val qualifiedName = name.copy(database = database)
 
     if (functionRegistry.functionExists(qualifiedName.unquotedString)) {
       // This function has been already loaded into the function registry.
@@ -749,8 +752,9 @@ class SessionCatalog(
    * List all matching functions in the specified database, including temporary functions.
    */
   def listFunctions(db: String, pattern: String): Seq[FunctionIdentifier] = {
-    val dbFunctions =
-      externalCatalog.listFunctions(db, pattern).map { f => FunctionIdentifier(f, Some(db)) }
+    val dbName = formatDatabaseName(db)
+    val dbFunctions = externalCatalog.listFunctions(dbName, pattern)
+      .map { f => FunctionIdentifier(f, Some(dbName)) }
     val loadedFunctions = StringUtils.filterPattern(functionRegistry.listFunction(), pattern)
       .map { f => FunctionIdentifier(f) }
     dbFunctions ++ loadedFunctions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 56244f26fbd0a..0590bb9a4d9df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -605,16 +605,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
     checkAnswer(
       sql("SHOW DATABASES LIKE '*db1A'"),
-      Row("showdb1A") :: Nil)
+      Row("showdb1a") :: Nil)
 
     checkAnswer(
       sql("SHOW DATABASES LIKE 'showdb1A'"),
-      Row("showdb1A") :: Nil)
+      Row("showdb1a") :: Nil)
 
     checkAnswer(
       sql("SHOW DATABASES LIKE '*db1A|*db2B'"),
-      Row("showdb1A") ::
-        Row("showdb2B") :: Nil)
+      Row("showdb1a") ::
+        Row("showdb2b") :: Nil)
 
     checkAnswer(
       sql("SHOW DATABASES LIKE 'non-existentdb'"),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 3220f143aa23f..75a252ccba569 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -62,7 +62,8 @@ private[sql] class HiveSessionCatalog(
   override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = {
     val table = formatTableName(name.table)
     if (name.database.isDefined || !tempTables.contains(table)) {
-      val newName = name.copy(table = table)
+      val database = name.database.map(formatDatabaseName)
+      val newName = name.copy(database = database, table = table)
       metastoreCatalog.lookupRelation(newName, alias)
     } else {
       val relation = tempTables(table)
@@ -181,10 +182,12 @@ private[sql] class HiveSessionCatalog(
     //   // This function is a Hive builtin function.
     //   ...
     // }
-    Try(super.lookupFunction(name, children)) match {
+    val database = name.database.map(formatDatabaseName)
+    val funcName = name.copy(database = database)
+    Try(super.lookupFunction(funcName, children)) match {
       case Success(expr) => expr
       case Failure(error) =>
-        if (functionRegistry.functionExists(name.unquotedString)) {
+        if (functionRegistry.functionExists(funcName.unquotedString)) {
           // If the function actually exists in functionRegistry, it means that there is an
           // error when we create the Expression using the given children.
           // We need to throw the original exception.
@@ -193,7 +196,7 @@ private[sql] class HiveSessionCatalog(
           // This function is not in functionRegistry, let's try to load it as a Hive's
           // built-in function.
           // Hive is case insensitive.
-          val functionName = name.unquotedString.toLowerCase
+          val functionName = funcName.unquotedString.toLowerCase
           // TODO: This may not really work for current_user because current_user is not evaluated
           // with session info.
           // We do not need to use executionHive at here because we only load
@@ -201,12 +204,12 @@ private[sql] class HiveSessionCatalog(
         val functionInfo = {
           try {
             Option(HiveFunctionRegistry.getFunctionInfo(functionName)).getOrElse(
-              failFunctionLookup(name.unquotedString))
+              failFunctionLookup(funcName.unquotedString))
           } catch {
             // If HiveFunctionRegistry.getFunctionInfo throws an exception,
             // we are failing to load a Hive builtin function, which means that
             // the given function is not a Hive builtin function.
-            case NonFatal(e) => failFunctionLookup(name.unquotedString)
+            case NonFatal(e) => failFunctionLookup(funcName.unquotedString)
           }
         }
         val className = functionInfo.getFunctionClass.getName
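Patch 4 finishes the sweep (the stored default database name, `alterDatabase`, partition and function operations, and the Hive catalog's relation and function lookups) and adjusts the SHOW DATABASES expectations to the now-lowercased names. An end-to-end sketch of the resulting `dropDatabase` behavior, mirroring the DDLSuite assertions above (simplified flow, not the actual `SessionCatalog` code):

```scala
import scala.util.Try

// Simplified stand-in for Spark's AnalysisException.
class AnalysisException(message: String) extends Exception(message)

def dropDatabase(db: String, caseSensitive: Boolean, existing: Set[String]): Unit = {
  val dbName = if (caseSensitive) db else db.toLowerCase  // the formatDatabaseName rule
  if (dbName == "default") {
    throw new AnalysisException("Can not drop default database")
  }
  if (!existing.contains(dbName)) {
    throw new AnalysisException(s"Database '$db' does not exist")
  }
  // externalCatalog.dropDatabase(dbName, ...) would run here.
}

val dbs = Set("default", "sales")
println(Try(dropDatabase("DeFault", caseSensitive = false, dbs))) // Can not drop default database
println(Try(dropDatabase("DeFault", caseSensitive = true, dbs)))  // Database 'DeFault' does not exist
println(Try(dropDatabase("sales", caseSensitive = false, dbs)))   // Success(())
```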