From 365299739c92132cd34110a00e860e218e7ca9c6 Mon Sep 17 00:00:00 2001
From: Alex Liu
Date: Wed, 7 Jan 2015 21:09:17 -0800
Subject: [PATCH 1/4] [SPARK-4943][SQL] Allow table name having dot to support
 db/catalog ...

---
 .../apache/spark/sql/catalyst/SqlParser.scala |  19 +++-
 .../sql/catalyst/analysis/Analyzer.scala      |   8 +-
 .../spark/sql/catalyst/analysis/Catalog.scala | 103 ++++++++----------
 .../sql/catalyst/analysis/unresolved.scala    |   3 +-
 .../spark/sql/catalyst/dsl/package.scala      |   2 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala |  20 ++--
 .../analysis/DecimalPrecisionSuite.scala      |   2 +-
 .../org/apache/spark/sql/SQLContext.scala     |   6 +-
 .../org/apache/spark/sql/SchemaRDDLike.scala  |   4 +-
 .../org/apache/spark/sql/JoinSuite.scala      |   4 +-
 .../apache/spark/sql/hive/HiveContext.scala   |   2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  43 ++++++--
 .../org/apache/spark/sql/hive/HiveQl.scala    |  25 +++--
 .../org/apache/spark/sql/hive/TestHive.scala  |   2 +-
 .../hive/execution/CreateTableAsSelect.scala  |   4 +-
 .../spark/sql/hive/execution/commands.scala   |   2 +-
 .../spark/sql/hive/StatisticsSuite.scala      |   4 +-
 17 files changed, 137 insertions(+), 116 deletions(-)
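
Note: the grammar change in SqlParser.scala below accepts up to four
dot-separated name parts (reserved catalog levels, database, table) and stores
them in reverse order, table name first, so every catalog can read
tableIdent.head as the table name. A minimal standalone sketch of that
ordering convention, using plain Scala parser combinators (the TableNameParser
object and its ident regex are illustrative only, not part of this patch):

    import scala.util.parsing.combinator.RegexParsers

    object TableNameParser extends RegexParsers {
      private val ident: Parser[String] = "[a-zA-Z_][a-zA-Z0-9_]*".r

      // "catalog.db.table" parses to List("table", "db", "catalog"):
      // the parts are reversed so the table name sits at the head.
      val tableIdentifier: Parser[List[String]] =
        rep1sep(ident, ".") ^^ (_.reverse)
    }

    // TableNameParser.parseAll(TableNameParser.tableIdentifier, "mydb.people")
    //   => parsed: List(people, mydb)

Patch 4 of this series later collapses the four hand-written alternatives into
a single rep1sep rule and flips the parts back to natural order.
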
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index f79d4ff444dc0..1283442ebf700 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -178,10 +178,23 @@ class SqlParser extends AbstractSparkSQLParser {
     joinedRelation | relationFactor
 
   protected lazy val relationFactor: Parser[LogicalPlan] =
-    ( ident ~ (opt(AS) ~> opt(ident)) ^^ {
-        case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
+    (
+      ident ~ ("." ~> ident) ~ ("." ~> ident) ~ ("." ~> ident) ~ (opt(AS) ~> opt(ident)) ^^ {
+        case reserveName1 ~ reserveName2 ~ dbName ~ tableName ~ alias =>
+          UnresolvedRelation(IndexedSeq(tableName, dbName, reserveName2, reserveName1), alias)
       }
-    | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
+    | ident ~ ("." ~> ident) ~ ("." ~> ident) ~ (opt(AS) ~> opt(ident)) ^^ {
+        case reserveName1 ~ dbName ~ tableName ~ alias =>
+          UnresolvedRelation(IndexedSeq(tableName, dbName, reserveName1), alias)
+      }
+    | ident ~ ("." ~> ident) ~ (opt(AS) ~> opt(ident)) ^^ {
+        case dbName ~ tableName ~ alias =>
+          UnresolvedRelation(IndexedSeq(tableName, dbName), alias)
+      }
+    | ident ~ (opt(AS) ~> opt(ident)) ^^ {
+        case tableName ~ alias => UnresolvedRelation(IndexedSeq(tableName), alias)
+      }
+    | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
     )
 
   protected lazy val joinedRelation: Parser[LogicalPlan] =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 72680f37a0b4d..c009cc1e1e85c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -228,11 +228,11 @@ class Analyzer(catalog: Catalog,
    */
   object ResolveRelations extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-      case i @ InsertIntoTable(UnresolvedRelation(databaseName, name, alias), _, _, _) =>
+      case i @ InsertIntoTable(UnresolvedRelation(tableIdentifier, alias), _, _, _) =>
         i.copy(
-          table = EliminateAnalysisOperators(catalog.lookupRelation(databaseName, name, alias)))
-      case UnresolvedRelation(databaseName, name, alias) =>
-        catalog.lookupRelation(databaseName, name, alias)
+          table = EliminateAnalysisOperators(catalog.lookupRelation(tableIdentifier, alias)))
+      case UnresolvedRelation(tableIdentifier, alias) =>
+        catalog.lookupRelation(tableIdentifier, alias)
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 0415d74bd8141..9228e66b0e57e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -28,77 +28,63 @@ trait Catalog {
 
   def caseSensitive: Boolean
 
-  def tableExists(db: Option[String], tableName: String): Boolean
+  def tableExists(tableIdentifier: Seq[String]): Boolean
 
   def lookupRelation(
-    databaseName: Option[String],
-    tableName: String,
-    alias: Option[String] = None): LogicalPlan
+    tableIdentifier: Seq[String],
+    alias: Option[String] = None): LogicalPlan
 
-  def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
+  def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
 
-  def unregisterTable(databaseName: Option[String], tableName: String): Unit
+  def unregisterTable(tableIdentifier: Seq[String]): Unit
 
   def unregisterAllTables(): Unit
 
-  protected def processDatabaseAndTableName(
-      databaseName: Option[String],
-      tableName: String): (Option[String], String) = {
+  protected def processTableIdentifier(tableIdentifier: Seq[String]):
+    Seq[String] = {
     if (!caseSensitive) {
-      (databaseName.map(_.toLowerCase), tableName.toLowerCase)
+      tableIdentifier.map(_.toLowerCase)
     } else {
-      (databaseName, tableName)
+      tableIdentifier
     }
   }
-
-  protected def processDatabaseAndTableName(
-      databaseName: String,
-      tableName: String): (String, String) = {
-    if (!caseSensitive) {
-      (databaseName.toLowerCase, tableName.toLowerCase)
-    } else {
-      (databaseName, tableName)
-    }
-  }
 }
 
 class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
   val tables = new mutable.HashMap[String, LogicalPlan]()
 
   override def registerTable(
-      databaseName: Option[String],
-      tableName: String,
+      tableIdentifier: Seq[String],
       plan: LogicalPlan): Unit = {
-    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
-    tables += ((tblName, plan))
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    tables += ((tableIdent.mkString("."), plan))
   }
 
-  override def unregisterTable(
-      databaseName: Option[String],
-      tableName: String) = {
-    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
-    tables -= tblName
+  override def unregisterTable(tableIdentifier: Seq[String]) = {
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    tables -= tableIdent.mkString(".")
   }
 
   override def unregisterAllTables() = {
     tables.clear()
   }
 
-  override def tableExists(db: Option[String], tableName: String): Boolean = {
-    val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
-    tables.get(tblName) match {
+  override def tableExists(tableIdentifier: Seq[String]): Boolean = {
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    tables.get(tableIdent.mkString(".")) match {
       case Some(_) => true
       case None => false
     }
   }
 
   override def lookupRelation(
-      databaseName: Option[String],
-      tableName: String,
+      tableIdentifier: Seq[String],
       alias: Option[String] = None): LogicalPlan = {
-    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
-    val table = tables.getOrElse(tblName, sys.error(s"Table Not Found: $tableName"))
-    val tableWithQualifiers = Subquery(tblName, table)
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    val tableFullName = tableIdent.mkString(".")
+    val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
+    val tableWithQualifiers = Subquery(tableIdent.head, table)
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
     // properly qualified with this alias.
@@ -115,43 +101,41 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
 trait OverrideCatalog extends Catalog {
 
   // TODO: This doesn't work when the database changes...
-  val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
+  val overrides = new mutable.HashMap[String, LogicalPlan]()
 
-  abstract override def tableExists(db: Option[String], tableName: String): Boolean = {
-    val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
-    overrides.get((dbName, tblName)) match {
+  abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = {
+    val tableIdent = processTableIdentifier(tableIdentifier).mkString(".")
+    overrides.get(tableIdent) match {
       case Some(_) => true
-      case None => super.tableExists(db, tableName)
+      case None => super.tableExists(tableIdentifier)
     }
   }
 
   abstract override def lookupRelation(
-    databaseName: Option[String],
-    tableName: String,
+    tableIdentifier: Seq[String],
     alias: Option[String] = None): LogicalPlan = {
-    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
-    val overriddenTable = overrides.get((dbName, tblName))
-    val tableWithQualifers = overriddenTable.map(r => Subquery(tblName, r))
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    val overriddenTable = overrides.get(tableIdent.mkString("."))
+    val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.head, r))
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
     // properly qualified with this alias.
     val withAlias = tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r))
 
-    withAlias.getOrElse(super.lookupRelation(dbName, tblName, alias))
+    withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias))
   }
 
   override def registerTable(
-      databaseName: Option[String],
-      tableName: String,
+      tableIdentifier: Seq[String],
       plan: LogicalPlan): Unit = {
-    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
-    overrides.put((dbName, tblName), plan)
+    val tableIdent = processTableIdentifier(tableIdentifier).mkString(".")
+    overrides.put(tableIdent, plan)
   }
 
-  override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
-    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
-    overrides.remove((dbName, tblName))
+  override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+    val tableIdent = processTableIdentifier(tableIdentifier).mkString(".")
+    overrides.remove(tableIdent)
   }
 
   override def unregisterAllTables(): Unit = {
@@ -167,22 +151,21 @@ object EmptyCatalog extends Catalog {
 
   val caseSensitive: Boolean = true
 
-  def tableExists(db: Option[String], tableName: String): Boolean = {
+  def tableExists(tableIdentifier: Seq[String]): Boolean = {
     throw new UnsupportedOperationException
   }
 
   def lookupRelation(
-    databaseName: Option[String],
-    tableName: String,
+    tableIdentifier: Seq[String],
     alias: Option[String] = None) = {
     throw new UnsupportedOperationException
   }
 
-  def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
+  def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
     throw new UnsupportedOperationException
   }
 
-  def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
+  def unregisterTable(tableIdentifier: Seq[String]): Unit = {
     throw new UnsupportedOperationException
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 77d84e1687e1b..71a738a0b2ca0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -34,8 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
  * Holds the name of a relation that has yet to be looked up in a [[Catalog]].
  */
 case class UnresolvedRelation(
-    databaseName: Option[String],
-    tableName: String,
+    tableIdentifier: Seq[String],
     alias: Option[String] = None) extends LeafNode {
   override def output = Nil
   override lazy val resolved = false
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 9608e15c0f302..67110837e4d5c 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -290,7 +290,7 @@ package object dsl {
 
     def insertInto(tableName: String, overwrite: Boolean = false) =
       InsertIntoTable(
-        analysis.UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)
+        analysis.UnresolvedRelation(IndexedSeq(tableName)), Map.empty, logicalPlan, overwrite)
 
     def analyze = analysis.SimpleAnalyzer(logicalPlan)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 82f2101d8ce17..19955cca2999d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -44,8 +44,8 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
     AttributeReference("e", ShortType)())
 
   before {
-    caseSensitiveCatalog.registerTable(None, "TaBlE", testRelation)
-    caseInsensitiveCatalog.registerTable(None, "TaBlE", testRelation)
+    caseSensitiveCatalog.registerTable(IndexedSeq("TaBlE"), testRelation)
+    caseInsensitiveCatalog.registerTable(IndexedSeq("TaBlE"), testRelation)
   }
 
   test("union project *") {
@@ -64,45 +64,45 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
     assert(
       caseSensitiveAnalyze(
         Project(Seq(UnresolvedAttribute("TbL.a")),
-          UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+          UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL")))) ===
         Project(testRelation.output, testRelation))
 
     val e = intercept[TreeNodeException[_]] {
       caseSensitiveAnalyze(
         Project(Seq(UnresolvedAttribute("tBl.a")),
-          UnresolvedRelation(None, "TaBlE", Some("TbL"))))
+          UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL"))))
     }
     assert(e.getMessage().toLowerCase.contains("unresolved"))
 
     assert(
       caseInsensitiveAnalyze(
         Project(Seq(UnresolvedAttribute("TbL.a")),
-          UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+          UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL")))) ===
         Project(testRelation.output, testRelation))
 
     assert(
       caseInsensitiveAnalyze(
         Project(Seq(UnresolvedAttribute("tBl.a")),
-          UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+          UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL")))) ===
         Project(testRelation.output, testRelation))
   }
 
   test("resolve relations") {
     val e = intercept[RuntimeException] {
-      caseSensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None))
+      caseSensitiveAnalyze(UnresolvedRelation(IndexedSeq("tAbLe"), None))
     }
     assert(e.getMessage == "Table Not Found: tAbLe")
 
     assert(
-      caseSensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
+      caseSensitiveAnalyze(UnresolvedRelation(IndexedSeq("TaBlE"), None)) ===
       testRelation)
 
     assert(
-      caseInsensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None)) ===
+      caseInsensitiveAnalyze(UnresolvedRelation(IndexedSeq("tAbLe"), None)) ===
      testRelation)
 
     assert(
-      caseInsensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
+      caseInsensitiveAnalyze(UnresolvedRelation(IndexedSeq("TaBlE"), None)) ===
      testRelation)
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index 3677a6e72e23a..2e7b1d420f1bd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -41,7 +41,7 @@ class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
   val f: Expression = UnresolvedAttribute("f")
 
   before {
-    catalog.registerTable(None, "table", relation)
+    catalog.registerTable(IndexedSeq("table"), relation)
   }
 
   private def checkType(expression: Expression, expectedType: DataType): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6a1a4d995bf61..4209a4f220b6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -276,7 +276,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
-    catalog.registerTable(None, tableName, rdd.queryExecution.logical)
+    catalog.registerTable(IndexedSeq(tableName), rdd.queryExecution.logical)
   }
 
   /**
@@ -289,7 +289,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   def dropTempTable(tableName: String): Unit = {
     tryUncacheQuery(table(tableName))
-    catalog.unregisterTable(None, tableName)
+    catalog.unregisterTable(IndexedSeq(tableName))
   }
 
   /**
@@ -308,7 +308,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /** Returns the specified table as a SchemaRDD */
   def table(tableName: String): SchemaRDD =
-    new SchemaRDD(this, catalog.lookupRelation(None, tableName))
+    new SchemaRDD(this, catalog.lookupRelation(IndexedSeq(tableName)))
 
   /**
    * :: DeveloperApi ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index fd5f4abcbcd65..b6d1b4e2cb02c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -97,8 +97,8 @@ private[sql] trait SchemaRDDLike {
    */
   @Experimental
   def insertInto(tableName: String, overwrite: Boolean): Unit =
-    sqlContext.executePlan(
-      InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)).toRdd
+    sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(IndexedSeq(tableName)),
+      Map.empty, logicalPlan, overwrite)).toRdd
 
   /**
    * :: Experimental ::
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 1a4232dab86e7..77f89a3753a1d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -302,8 +302,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
     upperCaseData.where('N <= 4).registerTempTable("left")
     upperCaseData.where('N >= 3).registerTempTable("right")
 
-    val left = UnresolvedRelation(None, "left", None)
-    val right = UnresolvedRelation(None, "right", None)
+    val left = UnresolvedRelation(IndexedSeq("left"), None)
+    val right = UnresolvedRelation(IndexedSeq("right"), None)
 
     checkAnswer(
       left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 982e0593fcfd1..6ef7bd5810d36 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -124,7 +124,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * in the Hive metastore.
    */
   def analyze(tableName: String) {
-    val relation = EliminateAnalysisOperators(catalog.lookupRelation(None, tableName))
+    val relation = EliminateAnalysisOperators(catalog.lookupRelation(IndexedSeq(tableName)))
 
     relation match {
       case relation: MetastoreRelation =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b31a3ec25096b..98ed020473077 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -57,18 +57,19 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
   val caseSensitive: Boolean = false
 
-  def tableExists(db: Option[String], tableName: String): Boolean = {
-    val (databaseName, tblName) = processDatabaseAndTableName(
-      db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
-    client.getTable(databaseName, tblName, false) != null
+  def tableExists(tableIdentifier: Seq[String]): Boolean = {
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    val (databaseName, tblName) =
+      (tableIdent.lift(1).getOrElse(hive.sessionState.getCurrentDatabase), tableIdent.head)
+    client.getTable(databaseName, tblName) != null
   }
 
   def lookupRelation(
-      db: Option[String],
-      tableName: String,
+      tableIdentifier: Seq[String],
       alias: Option[String]): LogicalPlan = synchronized {
+    val tableIdent = processTableIdentifier(tableIdentifier)
     val (databaseName, tblName) =
-      processDatabaseAndTableName(db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
+      (tableIdent.lift(1).getOrElse(hive.sessionState.getCurrentDatabase), tableIdent.head)
     val table = client.getTable(databaseName, tblName)
     if (table.isView) {
       // if the unresolved relation is from hive view
@@ -251,6 +252,26 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     }
   }
 
+  protected def processDatabaseAndTableName(
+      databaseName: Option[String],
+      tableName: String): (Option[String], String) = {
+    if (!caseSensitive) {
+      (databaseName.map(_.toLowerCase), tableName.toLowerCase)
+    } else {
+      (databaseName, tableName)
+    }
+  }
+
+  protected def processDatabaseAndTableName(
+      databaseName: String,
+      tableName: String): (String, String) = {
+    if (!caseSensitive) {
+      (databaseName.toLowerCase, tableName.toLowerCase)
+    } else {
+      (databaseName, tableName)
+    }
+  }
+
   /**
    * Creates any tables required for query execution.
    * For example, because of a CREATE TABLE X AS statement.
@@ -270,7 +291,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
       // Get the CreateTableDesc from Hive SemanticAnalyzer
-      val desc: Option[CreateTableDesc] = if (tableExists(Some(databaseName), tblName)) {
+      val desc: Option[CreateTableDesc] = if (tableExists(IndexedSeq(tblName, databaseName))) {
         None
       } else {
         val sa = new SemanticAnalyzer(hive.hiveconf) {
@@ -352,15 +373,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
    * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
    * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
    */
-  override def registerTable(
-      databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = ???
+  override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = ???
 
   /**
    * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
    * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
    */
-  override def unregisterTable(
-      databaseName: Option[String], tableName: String): Unit = ???
+  override def unregisterTable(tableIdentifier: Seq[String]): Unit = ???
 
   override def unregisterAllTables() = {}
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 8a9613cf96e54..2cca0c1669a06 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -386,6 +386,13 @@ private[hive] object HiveQl {
     (db, tableName)
   }
 
+  protected def extractTableIdent(tableNameParts: Node): IndexedSeq[String] = {
+    tableNameParts.getChildren.map { case Token(part, Nil) => cleanIdentifier(part) } match {
+      case Seq(tableOnly) => IndexedSeq(tableOnly)
+      case Seq(databaseName, table) => IndexedSeq(table, databaseName)
+    }
+  }
+
   /**
    * SELECT MAX(value) FROM src GROUP BY k1, k2, k3 GROUPING SETS((k1, k2), (k2))
    * is equivalent to
@@ -475,16 +482,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             case Token(".", dbName :: tableName :: Nil) =>
               // It is describing a table with the format like "describe db.table".
               // TODO: Actually, a user may mean tableName.columnName. Need to resolve this issue.
-              val (db, tableName) = extractDbNameTableName(nameParts.head)
+              val tableIdent = extractTableIdent(nameParts.head)
               DescribeCommand(
-                UnresolvedRelation(db, tableName, None), extended.isDefined)
+                UnresolvedRelation(tableIdent, None), extended.isDefined)
             case Token(".", dbName :: tableName :: colName :: Nil) =>
               // It is describing a column with the format like "describe db.table column".
               NativePlaceholder
             case tableName =>
               // It is describing a table with the format like "describe table".
               DescribeCommand(
-                UnresolvedRelation(None, tableName.getText, None),
+                UnresolvedRelation(IndexedSeq(tableName.getText), None),
                 extended.isDefined)
           }
         }
@@ -757,13 +764,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           nonAliasClauses)
       }
 
-      val (db, tableName) =
+      val tableIdent =
         tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
-          case Seq(tableOnly) => (None, tableOnly)
-          case Seq(databaseName, table) => (Some(databaseName), table)
+          case Seq(tableOnly) => IndexedSeq(tableOnly)
+          case Seq(databaseName, table) => IndexedSeq(table, databaseName)
       }
       val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) }
-      val relation = UnresolvedRelation(db, tableName, alias)
+      val relation = UnresolvedRelation(tableIdent, alias)
 
       // Apply sampling if requested.
       (bucketSampleClause orElse splitSampleClause).map {
@@ -882,7 +889,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
     val Some(tableNameParts) :: partitionClause :: Nil =
       getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)
 
-    val (db, tableName) = extractDbNameTableName(tableNameParts)
+    val tableIdent = extractTableIdent(tableNameParts)
 
     val partitionKeys = partitionClause.map(_.getChildren.map {
       // Parse partitions. We also make keys case insensitive.
@@ -892,7 +899,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         cleanIdentifier(key.toLowerCase) -> None
     }.toMap).getOrElse(Map.empty)
 
-    InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite)
+    InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite)
 
   case a: ASTNode =>
     throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index b2149bd95a336..42f71b4f5375b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -167,7 +167,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     // Make sure any test tables referenced are loaded.
     val referencedTables = describedTables ++
-      logical.collect { case UnresolvedRelation(databaseName, name, _) => name }
+      logical.collect { case UnresolvedRelation(IndexedSeq(name, databaseName), _) => name }
     val referencedTestTables = referencedTables.filter(testTables.contains)
     logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
     referencedTestTables.foreach(loadTestTable)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index fe21454e7fb38..b15c00390aec8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -53,14 +53,14 @@ case class CreateTableAsSelect(
       hiveContext.catalog.createTable(database, tableName, query.output, allowExisting, desc)
 
       // Get the Metastore Relation
-      hiveContext.catalog.lookupRelation(Some(database), tableName, None) match {
+      hiveContext.catalog.lookupRelation(IndexedSeq(tableName, database), None) match {
         case r: MetastoreRelation => r
       }
     }
     // TODO ideally, we should get the output data ready first and then
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
-    if (hiveContext.catalog.tableExists(Some(database), tableName)) {
+    if (hiveContext.catalog.tableExists(IndexedSeq(tableName, database))) {
       if (allowExisting) {
         // table already exists, will do nothing, to keep consistent with Hive
       } else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 6fc4153f6a5df..799f80877b614 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -53,7 +53,7 @@ case class DropTable(
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
     val ifExistsClause = if (ifExists) "IF EXISTS " else ""
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
-    hiveContext.catalog.unregisterTable(None, tableName)
+    hiveContext.catalog.unregisterTable(IndexedSeq(tableName))
     Seq.empty[Row]
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 4b6a9308b9811..d322d4f13318c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -72,7 +72,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
 
   test("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
-      catalog.lookupRelation(None, tableName).statistics.sizeInBytes
+      catalog.lookupRelation(IndexedSeq(tableName)).statistics.sizeInBytes
 
     // Non-partitioned table
     sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -123,7 +123,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
     intercept[NotImplementedError] {
       analyze("tempTable")
     }
-    catalog.unregisterTable(None, "tempTable")
+    catalog.unregisterTable(IndexedSeq("tempTable"))
   }
 
   test("estimates the size of a test MetastoreRelation") {

From 6ae77cef28c2b2fa4b1914c1902e8e5a9157d8cb Mon Sep 17 00:00:00 2001
From: Alex Liu
Date: Thu, 8 Jan 2015 18:06:14 -0800
Subject: [PATCH 2/4] [SPARK-4943][SQL] fix TestHive matching error

---
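
Note: the pattern patch 1 left in TestHive only matched identifiers with
exactly two parts, so the single-part names produced for unqualified table
references fell through and the referenced test tables were never loaded. The
fix below switches to a vararg pattern. A standalone illustration (the sample
identifiers are made up):

    val idents = Seq(IndexedSeq("t1"), IndexedSeq("t2", "db"))

    // A two-element extractor only matches sequences of exactly length two,
    // so the single-part identifier is silently skipped by collect:
    idents.collect { case IndexedSeq(name, databaseName) => name }  // List(t2)

    // The vararg pattern matches any arity and still binds the head:
    idents.collect { case IndexedSeq(name, _*) => name }            // List(t1, t2)
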
 .../src/main/scala/org/apache/spark/sql/hive/TestHive.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 42f71b4f5375b..1c43a5e3637df 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -167,7 +167,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     // Make sure any test tables referenced are loaded.
     val referencedTables = describedTables ++
-      logical.collect { case UnresolvedRelation(IndexedSeq(name, databaseName), _) => name }
+      logical.collect { case UnresolvedRelation(IndexedSeq(name, _ *), _) => name }
     val referencedTestTables = referencedTables.filter(testTables.contains)
     logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
     referencedTestTables.foreach(loadTestTable)

From 29e5e55d39af6abe49eea8ef97b0d394caa8f5ab Mon Sep 17 00:00:00 2001
From: Alex Liu
Date: Thu, 8 Jan 2015 19:44:27 -0800
Subject: [PATCH 3/4] [SPARK-4943][SQL] fix failed Hive CTAS tests

---
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
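
Note: the two-argument overload of the Hive client's getTable throws
InvalidTableException for a missing table instead of returning null (the
three-argument call used before patch 1 passed throwException = false), so
tableExists has to treat the exception as "table absent". A sketch of the
guard this patch adopts, assuming `client` is an
org.apache.hadoop.hive.ql.metadata.Hive handle:

    import org.apache.hadoop.hive.ql.metadata.{Hive, InvalidTableException}

    def tableExists(client: Hive, databaseName: String, tblName: String): Boolean =
      try {
        // Throws InvalidTableException rather than returning null when
        // the table does not exist.
        client.getTable(databaseName, tblName) != null
      } catch {
        case _: InvalidTableException => false
      }
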
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 98ed020473077..e1f3951edeadd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.TableType
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
@@ -61,7 +62,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     val tableIdent = processTableIdentifier(tableIdentifier)
     val (databaseName, tblName) =
       (tableIdent.lift(1).getOrElse(hive.sessionState.getCurrentDatabase), tableIdent.head)
-    client.getTable(databaseName, tblName) != null
+    try {
+      client.getTable(databaseName, tblName) != null
+    } catch {
+      case ie: InvalidTableException => false
+    }
   }
 
   def lookupRelation(

From 343ae27959bcccd20b7360c9a050eb297a181e14 Mon Sep 17 00:00:00 2001
From: Alex Liu
Date: Fri, 9 Jan 2015 19:02:06 -0800
Subject: [PATCH 4/4] [SPARK-4943][SQL] refactoring according to review

---
 .../apache/spark/sql/catalyst/SqlParser.scala |  17 +------
 .../spark/sql/catalyst/analysis/Catalog.scala |  47 ++++++++++++-------
 .../spark/sql/catalyst/dsl/package.scala      |   2 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala |  20 ++++----
 .../analysis/DecimalPrecisionSuite.scala      |   2 +-
 .../org/apache/spark/sql/SQLContext.scala     |   6 +--
 .../org/apache/spark/sql/SchemaRDDLike.scala  |   2 +-
 .../org/apache/spark/sql/JoinSuite.scala      |   4 +-
 .../apache/spark/sql/hive/HiveContext.scala   |   2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  12 +++--
 .../org/apache/spark/sql/hive/HiveQl.scala    |  16 ++++---
 .../org/apache/spark/sql/hive/TestHive.scala  |   2 +-
 .../hive/execution/CreateTableAsSelect.scala  |   4 +-
 .../spark/sql/hive/execution/commands.scala   |   2 +-
 .../spark/sql/hive/StatisticsSuite.scala      |   4 +-
 15 files changed, 73 insertions(+), 69 deletions(-)
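
Note: this revision keeps the parsed name parts in natural order (database
before table) via a single rep1sep rule, and centralizes the keying logic in
two Catalog helpers: getDbTableName joins at most the last two parts into a
"db.table" key, and getDBTable splits them into (Option[database], table).
The helpers below are copied from the diff that follows; the sample calls in
the comments are illustrative:

    protected def getDbTableName(tableIdent: Seq[String]): String = {
      val size = tableIdent.size
      if (size <= 2) {
        tableIdent.mkString(".")
      } else {
        tableIdent.slice(size - 2, size).mkString(".")
      }
    }

    protected def getDBTable(tableIdent: Seq[String]) : (Option[String], String) = {
      (tableIdent.lift(tableIdent.size - 2), tableIdent.last)
    }

    // getDbTableName(Seq("table"))                  => "table"
    // getDbTableName(Seq("db", "table"))            => "db.table"
    // getDbTableName(Seq("catalog", "db", "table")) => "db.table"
    // getDBTable(Seq("table"))                      => (None, "table")
    // getDBTable(Seq("db", "table"))                => (Some("db"), "table")
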
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 1283442ebf700..fc7b8745590d1 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -178,22 +178,9 @@ class SqlParser extends AbstractSparkSQLParser {
     joinedRelation | relationFactor
 
   protected lazy val relationFactor: Parser[LogicalPlan] =
-    (
-      ident ~ ("." ~> ident) ~ ("." ~> ident) ~ ("." ~> ident) ~ (opt(AS) ~> opt(ident)) ^^ {
-        case reserveName1 ~ reserveName2 ~ dbName ~ tableName ~ alias =>
-          UnresolvedRelation(IndexedSeq(tableName, dbName, reserveName2, reserveName1), alias)
+    ( rep1sep(ident, ".") ~ (opt(AS) ~> opt(ident)) ^^ {
+        case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
       }
-    | ident ~ ("." ~> ident) ~ ("." ~> ident) ~ (opt(AS) ~> opt(ident)) ^^ {
-        case reserveName1 ~ dbName ~ tableName ~ alias =>
-          UnresolvedRelation(IndexedSeq(tableName, dbName, reserveName1), alias)
-      }
-    | ident ~ ("." ~> ident) ~ (opt(AS) ~> opt(ident)) ^^ {
-        case dbName ~ tableName ~ alias =>
-          UnresolvedRelation(IndexedSeq(tableName, dbName), alias)
-      }
-    | ident ~ (opt(AS) ~> opt(ident)) ^^ {
-        case tableName ~ alias => UnresolvedRelation(IndexedSeq(tableName), alias)
-      }
     | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
     )
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 9228e66b0e57e..df8d03b86c533 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -31,8 +31,8 @@ trait Catalog {
   def tableExists(tableIdentifier: Seq[String]): Boolean
 
   def lookupRelation(
-    tableIdentifier: Seq[String],
-    alias: Option[String] = None): LogicalPlan
+      tableIdentifier: Seq[String],
+      alias: Option[String] = None): LogicalPlan
 
   def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
 
@@ -40,8 +40,7 @@ trait Catalog {
 
   def unregisterAllTables(): Unit
 
-  protected def processTableIdentifier(tableIdentifier: Seq[String]):
-    Seq[String] = {
+  protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
     if (!caseSensitive) {
       tableIdentifier.map(_.toLowerCase)
     } else {
@@ -49,6 +48,18 @@ trait Catalog {
     }
   }
 
+  protected def getDbTableName(tableIdent: Seq[String]): String = {
+    val size = tableIdent.size
+    if (size <= 2) {
+      tableIdent.mkString(".")
+    } else {
+      tableIdent.slice(size - 2, size).mkString(".")
+    }
+  }
+
+  protected def getDBTable(tableIdent: Seq[String]) : (Option[String], String) = {
+    (tableIdent.lift(tableIdent.size - 2), tableIdent.last)
+  }
 }
 
 class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
@@ -58,12 +69,12 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
       tableIdentifier: Seq[String],
       plan: LogicalPlan): Unit = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables += ((tableIdent.mkString("."), plan))
+    tables += ((getDbTableName(tableIdent), plan))
   }
 
   override def unregisterTable(tableIdentifier: Seq[String]) = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables -= tableIdent.mkString(".")
+    tables -= getDbTableName(tableIdent)
   }
 
   override def unregisterAllTables() = {
@@ -72,7 +83,7 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
 
   override def tableExists(tableIdentifier: Seq[String]): Boolean = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables.get(tableIdent.mkString(".")) match {
+    tables.get(getDbTableName(tableIdent)) match {
       case Some(_) => true
       case None => false
     }
@@ -82,9 +93,9 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
       tableIdentifier: Seq[String],
       alias: Option[String] = None): LogicalPlan = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    val tableFullName = tableIdent.mkString(".")
+    val tableFullName = getDbTableName(tableIdent)
     val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
-    val tableWithQualifiers = Subquery(tableIdent.head, table)
+    val tableWithQualifiers = Subquery(tableIdent.last, table)
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
     // properly qualified with this alias.
@@ -101,11 +112,11 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
 trait OverrideCatalog extends Catalog {
 
   // TODO: This doesn't work when the database changes...
-  val overrides = new mutable.HashMap[String, LogicalPlan]()
+  val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
 
   abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = {
-    val tableIdent = processTableIdentifier(tableIdentifier).mkString(".")
-    overrides.get(tableIdent) match {
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    overrides.get(getDBTable(tableIdent)) match {
       case Some(_) => true
       case None => super.tableExists(tableIdentifier)
     }
@@ -115,8 +126,8 @@ trait OverrideCatalog extends Catalog {
       tableIdentifier: Seq[String],
       alias: Option[String] = None): LogicalPlan = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    val overriddenTable = overrides.get(tableIdent.mkString("."))
-    val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.head, r))
+    val overriddenTable = overrides.get(getDBTable(tableIdent))
+    val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.last, r))
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
     // properly qualified with this alias.
@@ -129,13 +140,13 @@ trait OverrideCatalog extends Catalog {
   override def registerTable(
       tableIdentifier: Seq[String],
       plan: LogicalPlan): Unit = {
-    val tableIdent = processTableIdentifier(tableIdentifier).mkString(".")
-    overrides.put(tableIdent, plan)
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    overrides.put(getDBTable(tableIdent), plan)
   }
 
   override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
-    val tableIdent = processTableIdentifier(tableIdentifier).mkString(".")
-    overrides.remove(tableIdent)
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    overrides.remove(getDBTable(tableIdent))
   }
 
   override def unregisterAllTables(): Unit = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 67110837e4d5c..b2262e5e6efb6 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -290,7 +290,7 @@ package object dsl {
 
     def insertInto(tableName: String, overwrite: Boolean = false) =
       InsertIntoTable(
-        analysis.UnresolvedRelation(IndexedSeq(tableName)), Map.empty, logicalPlan, overwrite)
+        analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite)
 
     def analyze = analysis.SimpleAnalyzer(logicalPlan)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 19955cca2999d..f430057ef7191 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -44,8 +44,8 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
     AttributeReference("e", ShortType)())
 
   before {
-    caseSensitiveCatalog.registerTable(IndexedSeq("TaBlE"), testRelation)
-    caseInsensitiveCatalog.registerTable(IndexedSeq("TaBlE"), testRelation)
+    caseSensitiveCatalog.registerTable(Seq("TaBlE"), testRelation)
+    caseInsensitiveCatalog.registerTable(Seq("TaBlE"), testRelation)
   }
 
   test("union project *") {
@@ -64,45 +64,45 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
     assert(
       caseSensitiveAnalyze(
         Project(Seq(UnresolvedAttribute("TbL.a")),
-          UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL")))) ===
+          UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
         Project(testRelation.output, testRelation))
 
     val e = intercept[TreeNodeException[_]] {
       caseSensitiveAnalyze(
         Project(Seq(UnresolvedAttribute("tBl.a")),
-          UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL"))))
+          UnresolvedRelation(Seq("TaBlE"), Some("TbL"))))
     }
     assert(e.getMessage().toLowerCase.contains("unresolved"))
 
     assert(
       caseInsensitiveAnalyze(
         Project(Seq(UnresolvedAttribute("TbL.a")),
-          UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL")))) ===
+          UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
         Project(testRelation.output, testRelation))
 
     assert(
       caseInsensitiveAnalyze(
         Project(Seq(UnresolvedAttribute("tBl.a")),
-          UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL")))) ===
+          UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
         Project(testRelation.output, testRelation))
   }
 
   test("resolve relations") {
     val e = intercept[RuntimeException] {
-      caseSensitiveAnalyze(UnresolvedRelation(IndexedSeq("tAbLe"), None))
+      caseSensitiveAnalyze(UnresolvedRelation(Seq("tAbLe"), None))
     }
tAbLe") assert( - caseSensitiveAnalyze(UnresolvedRelation(IndexedSeq("TaBlE"), None)) === + caseSensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) === testRelation) assert( - caseInsensitiveAnalyze(UnresolvedRelation(IndexedSeq("tAbLe"), None)) === + caseInsensitiveAnalyze(UnresolvedRelation(Seq("tAbLe"), None)) === testRelation) assert( - caseInsensitiveAnalyze(UnresolvedRelation(IndexedSeq("TaBlE"), None)) === + caseInsensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) === testRelation) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index 2e7b1d420f1bd..bbbeb4f2e4fe3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -41,7 +41,7 @@ class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter { val f: Expression = UnresolvedAttribute("f") before { - catalog.registerTable(IndexedSeq("table"), relation) + catalog.registerTable(Seq("table"), relation) } private def checkType(expression: Expression, expectedType: DataType): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 4209a4f220b6b..9962937277dad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -276,7 +276,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group userf */ def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = { - catalog.registerTable(IndexedSeq(tableName), rdd.queryExecution.logical) + catalog.registerTable(Seq(tableName), rdd.queryExecution.logical) } /** @@ -289,7 +289,7 @@ class SQLContext(@transient val sparkContext: SparkContext) */ def dropTempTable(tableName: String): Unit = { tryUncacheQuery(table(tableName)) - catalog.unregisterTable(IndexedSeq(tableName)) + catalog.unregisterTable(Seq(tableName)) } /** @@ -308,7 +308,7 @@ class SQLContext(@transient val sparkContext: SparkContext) /** Returns the specified table as a SchemaRDD */ def table(tableName: String): SchemaRDD = - new SchemaRDD(this, catalog.lookupRelation(IndexedSeq(tableName))) + new SchemaRDD(this, catalog.lookupRelation(Seq(tableName))) /** * :: DeveloperApi :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala index b6d1b4e2cb02c..3cf9209465b76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala @@ -97,7 +97,7 @@ private[sql] trait SchemaRDDLike { */ @Experimental def insertInto(tableName: String, overwrite: Boolean): Unit = - sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(IndexedSeq(tableName)), + sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite)).toRdd /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 77f89a3753a1d..c7e136388fce8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -302,8 +302,8 @@ class JoinSuite extends QueryTest with 
@@ -302,8 +302,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
     upperCaseData.where('N <= 4).registerTempTable("left")
     upperCaseData.where('N >= 3).registerTempTable("right")
 
-    val left = UnresolvedRelation(IndexedSeq("left"), None)
-    val right = UnresolvedRelation(IndexedSeq("right"), None)
+    val left = UnresolvedRelation(Seq("left"), None)
+    val right = UnresolvedRelation(Seq("right"), None)
 
     checkAnswer(
       left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 6ef7bd5810d36..1648fa826b900 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -124,7 +124,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * in the Hive metastore.
    */
   def analyze(tableName: String) {
-    val relation = EliminateAnalysisOperators(catalog.lookupRelation(IndexedSeq(tableName)))
+    val relation = EliminateAnalysisOperators(catalog.lookupRelation(Seq(tableName)))
 
     relation match {
       case relation: MetastoreRelation =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index e1f3951edeadd..2c859894cf8d3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -60,8 +60,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
   def tableExists(tableIdentifier: Seq[String]): Boolean = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    val (databaseName, tblName) =
-      (tableIdent.lift(1).getOrElse(hive.sessionState.getCurrentDatabase), tableIdent.head)
+    val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
+      hive.sessionState.getCurrentDatabase)
+    val tblName = tableIdent.last
     try {
       client.getTable(databaseName, tblName) != null
     } catch {
@@ -73,8 +74,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       tableIdentifier: Seq[String],
       alias: Option[String]): LogicalPlan = synchronized {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    val (databaseName, tblName) =
-      (tableIdent.lift(1).getOrElse(hive.sessionState.getCurrentDatabase), tableIdent.head)
+    val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
+      hive.sessionState.getCurrentDatabase)
+    val tblName = tableIdent.last
     val table = client.getTable(databaseName, tblName)
     if (table.isView) {
       // if the unresolved relation is from hive view
@@ -296,7 +298,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
       // Get the CreateTableDesc from Hive SemanticAnalyzer
-      val desc: Option[CreateTableDesc] = if (tableExists(IndexedSeq(tblName, databaseName))) {
+      val desc: Option[CreateTableDesc] = if (tableExists(Seq(databaseName, tblName))) {
         None
       } else {
         val sa = new SemanticAnalyzer(hive.hiveconf) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 2cca0c1669a06..c2ab3579c1f95 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -386,10 +386,12 @@ private[hive] object HiveQl {
     (db, tableName)
   }
 
-  protected def extractTableIdent(tableNameParts: Node): IndexedSeq[String] = {
+  protected def extractTableIdent(tableNameParts: Node): Seq[String] = {
     tableNameParts.getChildren.map { case Token(part, Nil) => cleanIdentifier(part) } match {
-      case Seq(tableOnly) => IndexedSeq(tableOnly)
-      case Seq(databaseName, table) => IndexedSeq(table, databaseName)
+      case Seq(tableOnly) => Seq(tableOnly)
+      case Seq(databaseName, table) => Seq(databaseName, table)
+      case other => sys.error("Hive only supports tables names like 'tableName' " +
+        s"or 'databaseName.tableName', found '$other'")
     }
   }
 
@@ -491,7 +493,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             case tableName =>
               // It is describing a table with the format like "describe table".
               DescribeCommand(
-                UnresolvedRelation(IndexedSeq(tableName.getText), None),
+                UnresolvedRelation(Seq(tableName.getText), None),
                 extended.isDefined)
           }
         }
@@ -766,8 +768,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       val tableIdent =
         tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
-          case Seq(tableOnly) => IndexedSeq(tableOnly)
-          case Seq(databaseName, table) => IndexedSeq(table, databaseName)
+          case Seq(tableOnly) => Seq(tableOnly)
+          case Seq(databaseName, table) => Seq(databaseName, table)
+          case other => sys.error("Hive only supports tables names like 'tableName' " +
+            s"or 'databaseName.tableName', found '$other'")
       }
       val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) }
       val relation = UnresolvedRelation(tableIdent, alias)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 1c43a5e3637df..8f2311cf83eb8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -167,7 +167,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     // Make sure any test tables referenced are loaded.
     val referencedTables = describedTables ++
-      logical.collect { case UnresolvedRelation(IndexedSeq(name, _ *), _) => name }
+      logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.last }
     val referencedTestTables = referencedTables.filter(testTables.contains)
     logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
     referencedTestTables.foreach(loadTestTable)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index b15c00390aec8..a547babcebfff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -53,14 +53,14 @@ case class CreateTableAsSelect(
       hiveContext.catalog.createTable(database, tableName, query.output, allowExisting, desc)
 
       // Get the Metastore Relation
-      hiveContext.catalog.lookupRelation(IndexedSeq(tableName, database), None) match {
+      hiveContext.catalog.lookupRelation(Seq(database, tableName), None) match {
        case r: MetastoreRelation => r
      }
    }
    // TODO ideally, we should get the output data ready first and then
    // add the relation into catalog, just in case of failure occurs while data
    // processing.
-    if (hiveContext.catalog.tableExists(IndexedSeq(tableName, database))) {
+    if (hiveContext.catalog.tableExists(Seq(database, tableName))) {
       if (allowExisting) {
         // table already exists, will do nothing, to keep consistent with Hive
       } else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 799f80877b614..6b733a280e6d5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -53,7 +53,7 @@ case class DropTable(
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
     val ifExistsClause = if (ifExists) "IF EXISTS " else ""
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
-    hiveContext.catalog.unregisterTable(IndexedSeq(tableName))
+    hiveContext.catalog.unregisterTable(Seq(tableName))
     Seq.empty[Row]
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index d322d4f13318c..a758f921e0417 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -72,7 +72,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
 
   test("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
-      catalog.lookupRelation(IndexedSeq(tableName)).statistics.sizeInBytes
+      catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes
 
     // Non-partitioned table
     sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -123,7 +123,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
     intercept[NotImplementedError] {
       analyze("tempTable")
     }
-    catalog.unregisterTable(IndexedSeq("tempTable"))
+    catalog.unregisterTable(Seq("tempTable"))
   }
 
   test("estimates the size of a test MetastoreRelation") {
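
Note: with all four patches applied, a database-qualified reference should
flow end to end: the parser produces UnresolvedRelation(Seq("mydb", "people"),
None), SimpleCatalog keys the relation as "mydb.people", and the Hive catalog
resolves the last part as the table and the part before it as the database. A
hypothetical session (the table and database names are illustrative):

    sqlContext.sql("SELECT name FROM mydb.people").collect()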