From 52f8bcd6a600d4be5820222734e2a18fddd5e0b9 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Fri, 24 Feb 2017 17:45:18 +0800
Subject: [PATCH 1/5] [SPARK-19724][SQL] creating a managed hive table with an existing default location should throw an exception

---
 .../sql/execution/command/DDLSuite.scala           |  50 +++++++++
 .../CreateHiveTableAsSelectCommand.scala           |  21 +++-
 .../sql/hive/execution/HiveDDLSuite.scala          | 103 ++++++++++++++++++
 3 files changed, 171 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b44f20e367f0a..3acfedff8a50c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1952,4 +1952,54 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
   }
+
+  test("CTAS for data source table with a created default location throws an exception") {
+    withTable("t", "t1", "t2") {
+      val warehousePath = spark.sharedState.warehousePath.stripPrefix("file:")
+      val tFile = new File(warehousePath, "t")
+      tFile.mkdirs()
+      assert(tFile.exists)
+
+      val e = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t
+             |USING parquet
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e.contains(s"path file:${tFile.getAbsolutePath} already exists."))
+
+      // partitioned table (table path exists)
+      val tFile1 = new File(warehousePath, "t1")
+      tFile1.mkdirs()
+      assert(tFile1.exists)
+      val e1 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t1
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e1.contains(s"path file:${tFile1.getAbsolutePath} already exists."))
+
+      // partitioned table (partition path exists)
+      val tFile2 = new File(warehousePath, "t2")
+      val tPartFile = new File(tFile2, "a=3/b=4")
+      tPartFile.mkdirs()
+      assert(tPartFile.exists)
+      val e2 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t2
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e2.contains(s"path file:${tFile2.getAbsolutePath} already exists."))
+    }
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 41c6b18e9d794..0234f9977a508 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -19,9 +19,11 @@ package org.apache.spark.sql.hive.execution
 
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
 
@@ -68,9 +70,22 @@ case class CreateHiveTableAsSelectCommand(
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
     assert(tableDesc.schema.isEmpty)
-    sparkSession.sessionState.catalog.createTable(
-      tableDesc.copy(schema = query.schema), ignoreIfExists = false)
 
+    // As discussed in SPARK-19583, in CTAS the default location of a managed
+    // table should not exist
+    if (mode == SaveMode.ErrorIfExists && tableDesc.tableType == CatalogTableType.MANAGED) {
+      val hadoopConf = sparkSession.sessionState.newHadoopConf()
+      val tblLocationPath =
+        new Path(sparkSession.sessionState.catalog.defaultTablePath(tableIdentifier))
+      val fs = tblLocationPath.getFileSystem(hadoopConf)
+      if (fs.exists(tblLocationPath)) {
+        throw new AnalysisException(s"the location('$tblLocationPath') of table" +
+          s"('$tableIdentifier') already exists.")
+      }
+    }
+
+    sparkSession.sessionState.catalog.createTable(
+      tableDesc.copy(schema = query.schema), ignoreIfExists = false)
     try {
       sparkSession.sessionState.executePlan(
         InsertIntoTable(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 792ac1e259494..569b4683d3d49 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1587,4 +1587,107 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("CTAS for data source table with a created default location throws an exception") {
+    withTable("t", "t1", "t2") {
+      val warehousePath = spark.sharedState.warehousePath
+      val tFile = new File(warehousePath, "t")
+      tFile.mkdirs()
+      assert(tFile.exists)
+
+      val e = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t
+             |USING parquet
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e.contains(s"path file:${tFile.getAbsolutePath} already exists."))
+
+      // partitioned table (table path exists)
+      val tFile1 = new File(warehousePath, "t1")
+      tFile1.mkdirs()
+      assert(tFile1.exists)
+      val e1 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t1
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e1.contains(s"path file:${tFile1.getAbsolutePath} already exists."))
+
+      // partitioned table (partition path exists)
+      val tFile2 = new File(warehousePath, "t2")
+      val tPartFile = new File(tFile2, "a=3/b=4")
+      tPartFile.mkdirs()
+      assert(tPartFile.exists)
+      val e2 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t2
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e2.contains(s"path file:${tFile2.getAbsolutePath} already exists."))
+    }
+  }
+
+  test("CTAS for hive table with a created default location throws an exception") {
+    withTable("t", "t1", "t2") {
+      val warehousePath = spark.sharedState.warehousePath
+      val tFile = new File(warehousePath, "t")
+      tFile.mkdirs()
+      assert(tFile.exists)
+
+      val e = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t
+             |USING hive
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e.contains(s"the location('file:${tFile.getAbsolutePath}') of table" +
+        s"('`default`.`t`') already exists."))
+
+      // partitioned table (table path exists)
+      val tFile1 = new File(warehousePath, "t1")
+      tFile1.mkdirs()
+      assert(tFile1.exists)
+      val e1 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t1
+             |USING hive
+             |PARTITIONED BY(a, b)
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e1.contains(s"the location('file:${tFile1.getAbsolutePath}') of table" +
+        s"('`default`.`t1`') already exists."))
+
+      // partitioned table (partition path exists)
+      val tFile2 = new File(warehousePath, "t2")
+      val tPartFile = new File(tFile2, "a=3/b=4")
+      tPartFile.mkdirs()
+      assert(tPartFile.exists)
+      val e2 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t2
+             |USING hive
+             |PARTITIONED BY(a, b)
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e2.contains(s"the location('file:${tFile2.getAbsolutePath}') of table" +
+        s"('`default`.`t2`') already exists."))
+    }
+  }
 }

From 81cf177dba3695ef574e44acd782ab26e15d82d5 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Fri, 24 Feb 2017 17:48:39 +0800
Subject: [PATCH 2/5] modify the test case names

---
 .../org/apache/spark/sql/execution/command/DDLSuite.scala     | 2 +-
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala    | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 3acfedff8a50c..30a096006b7e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1953,7 +1953,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  test("CTAS for data source table with a created default location throws an exception") {
+  test("CTAS for managed data source table with a created default location throws an exception") {
     withTable("t", "t1", "t2") {
       val warehousePath = spark.sharedState.warehousePath.stripPrefix("file:")
       val tFile = new File(warehousePath, "t")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 569b4683d3d49..46bc718ca1f0c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1588,7 +1588,7 @@ class HiveDDLSuite
     }
   }
 
-  test("CTAS for data source table with a created default location throws an exception") {
+  test("CTAS for managed data source table with a created default location throws an exception") {
     withTable("t", "t1", "t2") {
       val warehousePath = spark.sharedState.warehousePath
       val tFile = new File(warehousePath, "t")
@@ -1638,7 +1638,7 @@ class HiveDDLSuite
     }
   }
 
-  test("CTAS for hive table with a created default location throws an exception") {
+  test("CTAS for managed hive table with a created default location throws an exception") {
     withTable("t", "t1", "t2") {
       val warehousePath = spark.sharedState.warehousePath
       val tFile = new File(warehousePath, "t")

From 7e9d34193d459d3d85b2196d24171c20cd57c13e Mon Sep 17 00:00:00 2001
From: windpiger
Date: Fri, 24 Feb 2017 20:21:48 +0800
Subject: [PATCH 3/5] modify some logic

---
 .../catalyst/catalog/InMemoryCatalog.scala         |  6 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      | 22 +++++-
 .../command/createDataSourceTables.scala           |  3 +-
 .../spark/sql/hive/HiveExternalCatalog.scala       |  4 --
 .../CreateHiveTableAsSelectCommand.scala           | 15 +---
 .../sql/hive/execution/HiveDDLSuite.scala          | 72 ++++++++++++++++++-
 6 files changed, 94 insertions(+), 28 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 15aed5f9b1bdf..bc0d7f434be48 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -186,11 +186,7 @@ class InMemoryCatalog(
     val db = tableDefinition.identifier.database.get
     requireDbExists(db)
     val table = tableDefinition.identifier.table
-    if (tableExists(db, table)) {
-      if (!ignoreIfExists) {
-        throw new TableAlreadyExistsException(db = db, table = table)
-      }
-    } else {
+    if (!tableExists(db, table)) {
       // Set the default table location if this is a managed table and its location is not
       // specified.
       // Ideally we should not create a managed table with location, but Hive serde table can
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 73ef0e6a1869e..21d721f6c04a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -250,12 +250,32 @@ class SessionCatalog(
    * Create a metastore table in the database specified in `tableDefinition`.
    * If no such database is specified, create it in the current database.
    */
-  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
+  def createTable(
+      tableDefinition: CatalogTable,
+      ignoreIfExists: Boolean,
+      suggestIgnoreIfPathExists: Boolean = false): Unit = {
     val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
     validateName(table)
     val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
     requireDbExists(db)
+
+    if (!ignoreIfExists) {
+      if (tableExists(newTableDefinition.identifier)) {
+        throw new TableAlreadyExistsException(db = db, table = table)
+      }
+      // As discussed in SPARK-19583, the default location of a managed table should not exist
+      if (!suggestIgnoreIfPathExists && tableDefinition.tableType == CatalogTableType.MANAGED) {
+        val tblLocationPath =
+          new Path(defaultTablePath(tableDefinition.identifier))
+        val fs = tblLocationPath.getFileSystem(hadoopConf)
+        if (fs.exists(tblLocationPath)) {
+          throw new AnalysisException(s"the location('$tblLocationPath') of table" +
+            s"('${newTableDefinition.identifier}') already exists.")
+        }
+      }
+    }
+
     externalCatalog.createTable(newTableDefinition, ignoreIfExists)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 5abd579476504..01b956973fed2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -158,7 +158,8 @@ case class CreateDataSourceTableAsSelectCommand(
       // the schema of df). It is important since the nullability may be changed by the relation
       // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
       schema = result.schema)
-    sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+    sessionState.catalog.createTable(newTable, ignoreIfExists = false,
+      suggestIgnoreIfPathExists = true)
 
     result match {
       case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ea48256147857..3cb9080bd9cec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -195,10 +195,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     requireDbExists(db)
     verifyTableProperties(tableDefinition)
 
-    if (tableExists(db, table) && !ignoreIfExists) {
-      throw new TableAlreadyExistsException(db = db, table = table)
-    }
-
     if (tableDefinition.tableType == VIEW) {
       client.createTable(tableDefinition, ignoreIfExists)
     } else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 0234f9977a508..9311fee327869 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -71,21 +71,8 @@ case class CreateHiveTableAsSelectCommand(
     // processing.
     assert(tableDesc.schema.isEmpty)
 
-    // As discussed in SPARK-19583, in CTAS the default location of a managed
-    // table should not exist
-    if (mode == SaveMode.ErrorIfExists && tableDesc.tableType == CatalogTableType.MANAGED) {
-      val hadoopConf = sparkSession.sessionState.newHadoopConf()
-      val tblLocationPath =
-        new Path(sparkSession.sessionState.catalog.defaultTablePath(tableIdentifier))
-      val fs = tblLocationPath.getFileSystem(hadoopConf)
-      if (fs.exists(tblLocationPath)) {
-        throw new AnalysisException(s"the location('$tblLocationPath') of table" +
-          s"('$tableIdentifier') already exists.")
-      }
-    }
-
     sparkSession.sessionState.catalog.createTable(
-      tableDesc.copy(schema = query.schema), ignoreIfExists = false)
+      tableDesc.copy(schema = query.schema), ignoreIfExists = false)
     try {
       sparkSession.sessionState.executePlan(
         InsertIntoTable(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 46bc718ca1f0c..0ade2cec33893 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1289,7 +1289,7 @@ class HiveDDLSuite
     import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_PREFIX
     import org.apache.spark.sql.hive.HiveExternalCatalog.STATISTICS_PREFIX
 
-    withTable("tbl") {
+    withTable("tbl", "tbl1") {
       sql("CREATE TABLE tbl(a INT) STORED AS parquet")
 
       Seq(DATASOURCE_PREFIX, STATISTICS_PREFIX).foreach { forbiddenPrefix =>
@@ -1304,7 +1304,7 @@ class HiveDDLSuite
         assert(e2.getMessage.contains(forbiddenPrefix + "foo"))
 
         val e3 = intercept[AnalysisException] {
-          sql(s"CREATE TABLE tbl (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
+          sql(s"CREATE TABLE tbl1 (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
         }
         assert(e3.getMessage.contains(forbiddenPrefix + "foo"))
       }
@@ -1588,7 +1588,7 @@ class HiveDDLSuite
     }
   }
 
-  test("CTAS for managed data source table with a created default location throws an exception") {
+  test("CTAS for managed datasource table with a created default location throws an exception") {
     withTable("t", "t1", "t2") {
       val warehousePath = spark.sharedState.warehousePath
       val tFile = new File(warehousePath, "t")
@@ -1690,4 +1690,70 @@ class HiveDDLSuite
         s"('`default`.`t2`') already exists."))
     }
   }
+
+  test("create table for managed datasource table with a created location throws an exception") {
+    withTable("t", "t1") {
+      val warehousePath = spark.sharedState.warehousePath
+      val tFile = new File(warehousePath, "t")
+      tFile.mkdirs()
+      assert(tFile.exists)
+
+      val e = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a string, b string)
+             |USING parquet
+           """.stripMargin)
+      }.getMessage
+      assert(e.contains(s"the location('file:${tFile.getAbsolutePath}') of table" +
+        s"('`default`.`t`') already exists."))
+      // partitioned table (table path exists)
+      val tFile1 = new File(warehousePath, "t1")
+      tFile1.mkdirs()
+      assert(tFile1.exists)
+      val e1 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t1(a string, b string)
+             |USING parquet
+             |PARTITIONED BY(a)
+           """.stripMargin)
+      }.getMessage
+      assert(e1.contains(s"the location('file:${tFile1.getAbsolutePath}') of table" +
+        s"('`default`.`t1`') already exists."))
+    }
+  }
+
+  test("create table for managed hive table with a created location throws an exception") {
+    withTable("t", "t1") {
+      val warehousePath = spark.sharedState.warehousePath
+      val tFile = new File(warehousePath, "t")
+      tFile.mkdirs()
+      assert(tFile.exists)
+
+      val e = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a string, b string)
+             |USING hive
+           """.stripMargin)
+      }.getMessage
+      assert(e.contains(s"the location('file:${tFile.getAbsolutePath}') of table" +
+        s"('`default`.`t`') already exists."))
+      // partitioned table (table path exists)
+      val tFile1 = new File(warehousePath, "t1")
+      tFile1.mkdirs()
+      assert(tFile1.exists)
+      val e1 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t1(a string, b string)
+             |USING hive
+             |PARTITIONED BY(a)
+           """.stripMargin)
+      }.getMessage
+      assert(e1.contains(s"the location('file:${tFile1.getAbsolutePath}') of table" +
+        s"('`default`.`t1`') already exists."))
+    }
+  }
 }

From b4c55d6f39e88b6a8959813eede7e1711d53a2f7 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Fri, 24 Feb 2017 20:49:58 +0800
Subject: [PATCH 4/5] modify some comments

---
 .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala    | 4 ++++
 .../sql/execution/command/createDataSourceTables.scala        | 2 ++
 .../sql/hive/execution/CreateHiveTableAsSelectCommand.scala   | 6 ++----
 3 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 21d721f6c04a4..317448002822f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -249,6 +249,10 @@ class SessionCatalog(
   /**
    * Create a metastore table in the database specified in `tableDefinition`.
    * If no such database is specified, create it in the current database.
+   * `suggestIgnoreIfPathExists` controls whether the default table path is checked
+   * for existence when `ignoreIfExists` is false: if it is false, the path of a
+   * managed table is checked and creation fails when the path already exists; if
+   * it is true, the check is skipped for all table types.
    */
   def createTable(
       tableDefinition: CatalogTable,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 01b956973fed2..36ff5e388824e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -158,6 +158,8 @@ case class CreateDataSourceTableAsSelectCommand(
       // the schema of df). It is important since the nullability may be changed by the relation
       // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
       schema = result.schema)
+    // The table path has already been created when the query result was written out
+    // above, so we should not check whether it exists; suggestIgnoreIfPathExists is true.
     sessionState.catalog.createTable(newTable, ignoreIfExists = false,
       suggestIgnoreIfPathExists = true)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 9311fee327869..41c6b18e9d794 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -19,11 +19,9 @@ package org.apache.spark.sql.hive.execution
 
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.fs.Path
-
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
 
@@ -70,9 +68,9 @@ case class CreateHiveTableAsSelectCommand(
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
     assert(tableDesc.schema.isEmpty)
-
     sparkSession.sessionState.catalog.createTable(
       tableDesc.copy(schema = query.schema), ignoreIfExists = false)
+
     try {
       sparkSession.sessionState.executePlan(
         InsertIntoTable(

From a764624afacc44a07ee12a75a95323bb7a882816 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Fri, 24 Feb 2017 22:26:56 +0800
Subject: [PATCH 5/5] fix failed test

---
 .../sql/catalyst/catalog/ExternalCatalogSuite.scala | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index a5d399a065589..484bcfe492f23 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -162,15 +162,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(actual.tableType === CatalogTableType.EXTERNAL)
   }
 
-  test("create table when the table already exists") {
-    val catalog = newBasicCatalog()
-    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    val table = newTable("tbl1", "db2")
-    intercept[TableAlreadyExistsException] {
-      catalog.createTable(table, ignoreIfExists = false)
-    }
-  }
-
   test("drop table") {
     val catalog = newBasicCatalog()
     assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
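
Taken together, the series moves the "default location already exists" check out of
CreateHiveTableAsSelectCommand and into SessionCatalog.createTable, so plain CREATE TABLE,
hive CTAS, and data source CTAS all go through one code path (with data source CTAS opting
out via suggestIgnoreIfPathExists, since it writes the data before registering the table).
The snippet below is a minimal, self-contained sketch of how the enforced behavior can be
exercised against a build with these patches applied, mirroring the new "create table for
managed datasource table" test; the object name, application name, and master URL are
illustrative assumptions, not part of the patches.

import java.io.File

import org.apache.spark.sql.{AnalysisException, SparkSession}

// Sketch: creating a managed table whose default location already exists
// should now fail in SessionCatalog.createTable, before any metadata is
// committed to the external catalog.
object Spark19724Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-19724 demo") // illustrative
      .master("local[2]")          // illustrative
      .getOrCreate()

    // Pre-create the default location of managed table `t` under the
    // warehouse directory, exactly as the new tests do.
    val warehousePath = spark.sharedState.warehousePath.stripPrefix("file:")
    new File(warehousePath, "t").mkdirs()

    try {
      spark.sql("CREATE TABLE t(a INT) USING parquet")
    } catch {
      case e: AnalysisException =>
        // Expected message shape (from the check added in PATCH 3):
        // the location('file:/...warehouse/t') of table('`default`.`t`') already exists.
        println(e.getMessage)
    } finally {
      spark.stop()
    }
  }
}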