From 4ab777ecf8e0ae51835dc7e83cf299d245f151ea Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 15 Sep 2016 00:50:22 +0800 Subject: [PATCH 1/4] fix some DDL bugs about table management when same-name temp view exists --- .../sql/catalyst/catalog/SessionCatalog.scala | 23 ++++-- .../catalog/SessionCatalogSuite.scala | 24 +++--- .../apache/spark/sql/DataFrameWriter.scala | 10 ++- .../command/createDataSourceTables.scala | 18 +++-- .../spark/sql/internal/CatalogImpl.scala | 10 ++- .../org/apache/spark/sql/SQLQuerySuite.scala | 11 +++ .../spark/sql/internal/CatalogSuite.scala | 12 +++ .../sql/test/DataFrameReaderWriterSuite.scala | 76 +++++++++++++++++++ .../sql/sources/HadoopFsRelationTest.scala | 10 +-- 9 files changed, 157 insertions(+), 37 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 9fb5db573b70f..13107f3341372 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -325,6 +325,21 @@ class SessionCatalog( new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString } + // ---------------------------------------------- + // | Methods that interact with temp views only | + // ---------------------------------------------- + + /** + * Return a temporary view exactly as it was stored. + */ + def getTempView(name: String): Option[LogicalPlan] = synchronized { + tempTables.get(formatTableName(name)) + } + + def dropTempView(name: String): Unit = synchronized { + tempTables.remove(formatTableName(name)) + } + // ------------------------------------------------------------- // | Methods that interact with temporary and metastore tables | // ------------------------------------------------------------- @@ -492,14 +507,6 @@ class SessionCatalog( tempTables.clear() } - /** - * Return a temporary table exactly as it was stored. - * For testing only. - */ - private[catalog] def getTempTable(name: String): Option[LogicalPlan] = synchronized { - tempTables.get(formatTableName(name)) - } - // ---------------------------------------------------------------------------- // Partitions // ---------------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 012df629bbdef..84b77ad250b5c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -201,16 +201,16 @@ class SessionCatalogSuite extends SparkFunSuite { val tempTable2 = Range(1, 20, 2, 10) catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) catalog.createTempView("tbl2", tempTable2, overrideIfExists = false) - assert(catalog.getTempTable("tbl1") == Option(tempTable1)) - assert(catalog.getTempTable("tbl2") == Option(tempTable2)) - assert(catalog.getTempTable("tbl3").isEmpty) + assert(catalog.getTempView("tbl1") == Option(tempTable1)) + assert(catalog.getTempView("tbl2") == Option(tempTable2)) + assert(catalog.getTempView("tbl3").isEmpty) // Temporary table already exists intercept[TempTableAlreadyExistsException] { catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) } // Temporary table already exists but we override it catalog.createTempView("tbl1", tempTable2, overrideIfExists = true) - assert(catalog.getTempTable("tbl1") == Option(tempTable2)) + assert(catalog.getTempView("tbl1") == Option(tempTable2)) } test("drop table") { @@ -251,11 +251,11 @@ class SessionCatalogSuite extends SparkFunSuite { val tempTable = Range(1, 10, 2, 10) sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") - assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) + assert(sessionCatalog.getTempView("tbl1") == Some(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If database is not specified, temp table should be dropped first sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) - assert(sessionCatalog.getTempTable("tbl1") == None) + assert(sessionCatalog.getTempView("tbl1") == None) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If temp table does not exist, the table in the current database should be dropped sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) @@ -265,7 +265,7 @@ class SessionCatalogSuite extends SparkFunSuite { sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false) sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false, purge = false) - assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) + assert(sessionCatalog.getTempView("tbl1") == Some(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl2")) } @@ -303,17 +303,17 @@ class SessionCatalogSuite extends SparkFunSuite { val tempTable = Range(1, 10, 2, 10) sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") - assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable)) + assert(sessionCatalog.getTempView("tbl1") == Option(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If database is not specified, temp table should be renamed first sessionCatalog.renameTable(TableIdentifier("tbl1"), "tbl3") - assert(sessionCatalog.getTempTable("tbl1").isEmpty) - assert(sessionCatalog.getTempTable("tbl3") == Option(tempTable)) + assert(sessionCatalog.getTempView("tbl1").isEmpty) + assert(sessionCatalog.getTempView("tbl3") == Option(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If database is specified, temp tables are never renamed sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), "tbl4") - assert(sessionCatalog.getTempTable("tbl3") == Option(tempTable)) - assert(sessionCatalog.getTempTable("tbl4").isEmpty) + assert(sessionCatalog.getTempView("tbl3") == Option(tempTable)) + assert(sessionCatalog.getTempView("tbl4").isEmpty) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4")) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index c05c7a6551600..94517d2314289 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -357,8 +357,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } private def saveAsTable(tableIdent: TableIdentifier): Unit = { - - val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent) + val sessionState = df.sparkSession.sessionState + val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = tableIdent.copy(database = Some(db)) + // Pass a table identifier with database part, so that `tableExists` won't check temp views + // unexpectedly. + val tableExists = sessionState.catalog.tableExists(tableIdentWithDB) (tableExists, mode) match { case (true, SaveMode.Ignore) => @@ -384,7 +388,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { bucketSpec = getBucketSpec ) val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan)) - df.sparkSession.sessionState.executePlan(cmd).toRdd + sessionState.executePlan(cmd).toRdd } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index b1830e6cf3ea8..200b0f8c225d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -47,7 +47,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo assert(table.provider.isDefined) val sessionState = sparkSession.sessionState - if (sessionState.catalog.tableExists(table.identifier)) { + val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = table.identifier.copy(database = Some(db)) + // Pass a table identifier with database part, so that `tableExists` won't check temp views + // unexpectedly. + if (sessionState.catalog.tableExists(tableIdentWithDB)) { if (ignoreIfExists) { return Seq.empty[Row] } else { @@ -131,6 +135,8 @@ case class CreateDataSourceTableAsSelectCommand( val tableName = table.identifier.unquotedString val provider = table.provider.get val sessionState = sparkSession.sessionState + val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = table.identifier.copy(database = Some(db)) val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) { table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier)) @@ -140,7 +146,9 @@ case class CreateDataSourceTableAsSelectCommand( var createMetastoreTable = false var existingSchema = Option.empty[StructType] - if (sparkSession.sessionState.catalog.tableExists(table.identifier)) { + // Pass a table identifier with database part, so that `tableExists` won't check temp views + // unexpectedly. + if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) { // Check if we need to throw an exception or just return. mode match { case SaveMode.ErrorIfExists => @@ -165,7 +173,7 @@ case class CreateDataSourceTableAsSelectCommand( // inserting into (i.e. using the same compression). EliminateSubqueryAliases( - sessionState.catalog.lookupRelation(table.identifier)) match { + sessionState.catalog.lookupRelation(tableIdentWithDB)) match { case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) => // check if the file formats match l.relation match { @@ -188,7 +196,7 @@ case class CreateDataSourceTableAsSelectCommand( throw new AnalysisException(s"Saving data in ${o.toString} is not supported.") } case SaveMode.Overwrite => - sparkSession.sql(s"DROP TABLE IF EXISTS $tableName") + sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = false, purge = false) // Need to create the table again. createMetastoreTable = true } @@ -230,7 +238,7 @@ case class CreateDataSourceTableAsSelectCommand( } // Refresh the cache of the table in the catalog. - sessionState.catalog.refreshTable(table.identifier) + sessionState.catalog.refreshTable(tableIdentWithDB) Seq.empty[Row] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 1f87f0e73a3ba..0a6f8dcd4ac98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table} import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, SubqueryAlias} import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.types.StructType @@ -284,8 +284,12 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * @since 2.0.0 */ override def dropTempView(viewName: String): Unit = { - sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName)) - sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true, purge = false) + val maybeTempView = sparkSession.sessionState.catalog.getTempView(viewName) + if (maybeTempView.isDefined) { + val view = SubqueryAlias(viewName, maybeTempView.get, Some(TableIdentifier(viewName))) + sparkSession.sharedState.cacheManager.uncacheQuery(Dataset.ofRows(sparkSession, view)) + sessionCatalog.dropTempView(viewName) + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 3cc3b319f5a57..0ee8c959eeb4d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2667,4 +2667,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { }.limit(1).queryExecution.toRdd.count() assert(numRecordsRead.value === 10) } + + test("CREATE TABLE USING should not fail if a same-name temp view exists") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + sql("CREATE TABLE same_name(i int) USING json") + checkAnswer(spark.table("same_name"), spark.range(10).toDF()) + assert(spark.table("default.same_name").collect().isEmpty) + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index b221eed7b2426..badacff87d6e5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -322,6 +322,18 @@ class CatalogSuite assert(e2.message == "Cannot create a file-based external data source table without path") } + test("dropTempView should not un-cache and drop metastore table if a same-name table exists") { + withTable("same_name") { + spark.range(10).write.saveAsTable("same_name") + sql("CACHE TABLE same_name") + assert(spark.catalog.isCached("default.same_name")) + spark.catalog.dropTempView("same_name") + assert(spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + assert(spark.catalog.isCached("default.same_name")) + } + } + + // TODO: add tests for the rest of them } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 63b0e4588e4a6..7368dad62859b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -22,6 +22,7 @@ import java.io.File import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.util.Utils @@ -464,4 +465,79 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be checkAnswer(df, spark.createDataset(expectedResult).toDF()) assert(df.schema === expectedSchema) } + + test("saveAsTable with mode Append should not fail if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Append).saveAsTable("same_name") + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } + + test("saveAsTable with mode Append should not fail if the table already exists " + + "and a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + sql("CREATE TABLE same_name(id LONG) USING parquet") + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Append).saveAsTable("same_name") + checkAnswer(spark.table("same_name"), spark.range(10).toDF()) + checkAnswer(spark.table("default.same_name"), spark.range(20).toDF()) + } + } + } + + test("saveAsTable with mode ErrorIfExists should not fail if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.ErrorIfExists).saveAsTable("same_name") + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } + + test("saveAsTable with mode Overwrite should not drop the temp view if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Overwrite).saveAsTable("same_name") + assert(spark.sessionState.catalog.getTempView("same_name").isDefined) + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } + + test("saveAsTable with mode Overwrite should not fail if the table already exists " + + "and a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + sql("CREATE TABLE same_name(id LONG) USING parquet") + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Overwrite).saveAsTable("same_name") + checkAnswer(spark.table("same_name"), spark.range(10).toDF()) + checkAnswer(spark.table("default.same_name"), spark.range(20).toDF()) + } + } + } + + test("saveAsTable with mode Ignore should create the table if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Ignore).saveAsTable("same_name") + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 27bb9676e9abf..22f13a494cd4c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -337,9 +337,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") { - Seq.empty[(Int, String)].toDF().createOrReplaceTempView("t") - - withTempView("t") { + withTable("t") { + sql("CREATE TABLE t(i INT) USING parquet") intercept[AnalysisException] { testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).saveAsTable("t") } @@ -347,9 +346,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } test("saveAsTable()/load() - non-partitioned table - Ignore") { - Seq.empty[(Int, String)].toDF().createOrReplaceTempView("t") - - withTempView("t") { + withTable("t") { + sql("CREATE TABLE t(i INT) USING parquet") testDF.write.format(dataSourceName).mode(SaveMode.Ignore).saveAsTable("t") assert(spark.table("t").collect().isEmpty) } From 1c57262830cae20f25d6f51e3b8e97b9166e183b Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 16 Sep 2016 19:17:31 +0800 Subject: [PATCH 2/4] address comments --- .../sql/catalyst/catalog/SessionCatalog.scala | 31 ++++++++++--------- .../command/createDataSourceTables.scala | 6 ++-- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 13107f3341372..574c3d7eeeec9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -329,6 +329,20 @@ class SessionCatalog( // | Methods that interact with temp views only | // ---------------------------------------------- + /** + * Create a temporary table. + */ + def createTempView( + name: String, + tableDefinition: LogicalPlan, + overrideIfExists: Boolean): Unit = synchronized { + val table = formatTableName(name) + if (tempTables.contains(table) && !overrideIfExists) { + throw new TempTableAlreadyExistsException(name) + } + tempTables.put(table, tableDefinition) + } + /** * Return a temporary view exactly as it was stored. */ @@ -336,6 +350,9 @@ class SessionCatalog( tempTables.get(formatTableName(name)) } + /** + * Drop a temporary view. + */ def dropTempView(name: String): Unit = synchronized { tempTables.remove(formatTableName(name)) } @@ -344,20 +361,6 @@ class SessionCatalog( // | Methods that interact with temporary and metastore tables | // ------------------------------------------------------------- - /** - * Create a temporary table. - */ - def createTempView( - name: String, - tableDefinition: LogicalPlan, - overrideIfExists: Boolean): Unit = synchronized { - val table = formatTableName(name) - if (tempTables.contains(table) && !overrideIfExists) { - throw new TempTableAlreadyExistsException(name) - } - tempTables.put(table, tableDefinition) - } - /** * Rename a table. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 200b0f8c225d0..d8e20b09c1add 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -55,7 +55,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo if (ignoreIfExists) { return Seq.empty[Row] } else { - throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.") + throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.") } } @@ -132,11 +132,11 @@ case class CreateDataSourceTableAsSelectCommand( assert(table.provider.isDefined) assert(table.schema.isEmpty) - val tableName = table.identifier.unquotedString val provider = table.provider.get val sessionState = sparkSession.sessionState val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase) val tableIdentWithDB = table.identifier.copy(database = Some(db)) + val tableName = tableIdentWithDB.unquotedString val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) { table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier)) @@ -196,7 +196,7 @@ case class CreateDataSourceTableAsSelectCommand( throw new AnalysisException(s"Saving data in ${o.toString} is not supported.") } case SaveMode.Overwrite => - sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = false, purge = false) + sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false) // Need to create the table again. createMetastoreTable = true } From c5513d591d5e4802a6b52ddd8034106a1f1cc739 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 16 Sep 2016 23:26:38 +0800 Subject: [PATCH 3/4] fix tests --- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 3466733d7fdcd..05283cc0ba9bb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -338,7 +338,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv }.getMessage assert( - message.contains("Table ctasJsonTable already exists."), + message.contains("Table default.ctasJsonTable already exists."), "We should complain that ctasJsonTable already exists") // The following statement should be fine if it has IF NOT EXISTS. @@ -514,7 +514,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv assert( intercept[AnalysisException] { sparkSession.catalog.createExternalTable("createdJsonTable", jsonFilePath.toString) - }.getMessage.contains("Table createdJsonTable already exists."), + }.getMessage.contains("Table default.createdJsonTable already exists."), "We should complain that createdJsonTable already exists") } @@ -906,7 +906,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val e = intercept[AnalysisException] { createDF(10, 19).write.mode(SaveMode.Append).format("orc").saveAsTable("appendOrcToParquet") } - assert(e.getMessage.contains("The file format of the existing table appendOrcToParquet " + + assert(e.getMessage.contains( + "The file format of the existing table default.appendOrcToParquet " + "is `org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat`. " + "It doesn't match the specified format `orc`")) } @@ -917,7 +918,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv createDF(10, 19).write.mode(SaveMode.Append).format("parquet") .saveAsTable("appendParquetToJson") } - assert(e.getMessage.contains("The file format of the existing table appendParquetToJson " + + assert(e.getMessage.contains( + "The file format of the existing table default.appendParquetToJson " + "is `org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " + "It doesn't match the specified format `parquet`")) } @@ -928,7 +930,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv createDF(10, 19).write.mode(SaveMode.Append).format("text") .saveAsTable("appendTextToJson") } - assert(e.getMessage.contains("The file format of the existing table appendTextToJson is " + + assert(e.getMessage.contains( + "The file format of the existing table default.appendTextToJson is " + "`org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " + "It doesn't match the specified format `text`")) } From b48e4ae67526cebe160a832f6666b25e686fdb90 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sun, 18 Sep 2016 15:30:19 +0800 Subject: [PATCH 4/4] address comments --- .../scala/org/apache/spark/sql/internal/CatalogImpl.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 0a6f8dcd4ac98..ebad5954e3c5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -284,10 +284,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * @since 2.0.0 */ override def dropTempView(viewName: String): Unit = { - val maybeTempView = sparkSession.sessionState.catalog.getTempView(viewName) - if (maybeTempView.isDefined) { - val view = SubqueryAlias(viewName, maybeTempView.get, Some(TableIdentifier(viewName))) - sparkSession.sharedState.cacheManager.uncacheQuery(Dataset.ofRows(sparkSession, view)) + sparkSession.sessionState.catalog.getTempView(viewName).foreach { tempView => + sparkSession.sharedState.cacheManager.uncacheQuery(Dataset.ofRows(sparkSession, tempView)) sessionCatalog.dropTempView(viewName) } }