From ddfa0e8f7cae0109e5e97bbb6e33d8ac82a3bb59 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 1 Dec 2020 01:06:04 +0800
Subject: [PATCH 1/4] Use default datasource as provider for CREATE TABLE command

---
 docs/sql-migration-guide.md                    |  2 ++
 .../apache/spark/sql/internal/SQLConf.scala    |  9 +++++++
 .../analysis/ResolveSessionCatalog.scala       | 20 ++++++--------
 .../sql/connector/DataSourceV2SQLSuite.scala   |  3 +--
 .../command/PlanResolutionSuite.scala          | 14 +++++-----
 .../execution/HiveCompatibilitySuite.scala     |  4 +++
 .../sql/hive/HiveShowCreateTableSuite.scala    | 18 ++++++++++++-
 .../apache/spark/sql/hive/InsertSuite.scala    |  3 ++-
 .../spark/sql/hive/QueryPartitionSuite.scala   |  5 ++--
 .../spark/sql/hive/StatisticsSuite.scala       | 27 +++++++++++++------
 .../spark/sql/hive/client/VersionsSuite.scala  |  1 +
 .../sql/hive/execution/HiveDDLSuite.scala      |  2 +-
 .../sql/hive/execution/HiveSerDeSuite.scala    |  5 ++--
 .../hive/execution/HiveTableScanSuite.scala    |  5 +++-
 .../sql/hive/execution/SQLQuerySuite.scala     |  1 +
 .../apache/spark/sql/hive/test/TestHive.scala  | 13 ++++-----
 16 files changed, 88 insertions(+), 44 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 2c86e7a932637..0122978ca547e 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -54,6 +54,8 @@ license: |
 
 - In Spark 3.1, creating or altering a view will capture runtime SQL configs and store them as view properties. These configs will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.useCurrentConfigsForView` to `true`.
 
+- In Spark 3.1, `CREATE TABLE` without a specific table provider uses the value of `spark.sql.sources.default` as its table provider. In Spark version 3.0 and below, it was Hive. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.createHiveTableByDefault` to `true`.
+
 ## Upgrading from Spark SQL 3.0 to 3.0.1
 
 - In Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Since version 3.0.1, the timestamp type inference is disabled by default. Set the JSON option `inferTimestamp` to `true` to enable such type inference.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b2c28ffa984a9..0547e9a1497a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2922,6 +2922,15 @@ object SQLConf {
     .stringConf
     .createWithDefault("")
 
+  val LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT =
+    buildConf("spark.sql.legacy.createHiveTableByDefault")
+      .internal()
+      .doc("When set to true, CREATE TABLE syntax without a table provider will use hive " +
+        s"instead of the value of ${DEFAULT_DATA_SOURCE_NAME.key} as the table provider.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
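The migration-guide entry and the new `SQLConf` flag above are easiest to see in action from a session. The following is a minimal sketch, assuming a build with this first patch applied (flag default `false`) and the stock `spark.sql.sources.default` of `parquet`; the table names are illustrative only.

```scala
// With the legacy flag off (the default in this patch), a bare CREATE TABLE
// picks up spark.sql.sources.default as its provider.
spark.sql("CREATE TABLE t1 (id INT)")
spark.sql("DESCRIBE TABLE EXTENDED t1").filter("col_name = 'Provider'").show()
// expected: parquet

// Flipping the legacy flag restores the pre-3.1 behavior.
spark.conf.set("spark.sql.legacy.createHiveTableByDefault", "true")
spark.sql("CREATE TABLE t2 (id INT)")
spark.sql("DESCRIBE TABLE EXTENDED t2").filter("col_name = 'Provider'").show()
// expected: hive
```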
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index f6005f4b413a2..cce280b4241e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
-import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 
 /**
@@ -636,9 +636,11 @@ class ResolveSessionCatalog(
       (storageFormat, DDLUtils.HIVE_PROVIDER)
     } else {
       // If neither USING nor STORED AS/ROW FORMAT is specified, we create native data source
-      // tables if it's a CTAS and `conf.convertCTAS` is true.
-      // TODO: create native data source table by default for non-CTAS.
-      if (ctas && conf.convertCTAS) {
+      // tables if:
+      //   1. `LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT` is false, or
+      //   2. it's a CTAS and `conf.convertCTAS` is true.
+      val createHiveTableByDefault = conf.getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT)
+      if (!createHiveTableByDefault || (ctas && conf.convertCTAS)) {
         (nonHiveStorageFormat, conf.defaultDataSourceName)
       } else {
         (defaultHiveStorage, DDLUtils.HIVE_PROVIDER)
@@ -657,14 +659,8 @@ class ResolveSessionCatalog(
       comment: Option[String],
       storageFormat: CatalogStorageFormat,
       external: Boolean): CatalogTable = {
-    if (external) {
-      if (DDLUtils.isHiveTable(Some(provider))) {
-        if (location.isEmpty) {
-          throw new AnalysisException(s"CREATE EXTERNAL TABLE must be accompanied by LOCATION")
-        }
-      } else {
-        throw new AnalysisException(s"Operation not allowed: CREATE EXTERNAL TABLE ... USING")
-      }
+    if (external && location.isEmpty) {
+      throw new AnalysisException(s"CREATE EXTERNAL TABLE must be accompanied by LOCATION")
     }
 
     val tableType = if (location.isDefined) {
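Distilled from the hunk above, the provider-selection rule for a `CREATE TABLE` with no `USING` and no `STORED AS`/`ROW FORMAT` clause can be summarized as follows. This is a simplified sketch, not the real `ResolveSessionCatalog` code: the `ctas`, `convertCTAS`, `createHiveTableByDefault`, and `defaultDataSourceName` parameters stand in for the values the rule reads from the parsed plan and `SQLConf`.

```scala
// Simplified sketch of the provider decision in patch 1 (default flag = false):
// fall back to Hive only when the legacy flag is on and CTAS conversion
// does not apply.
def chooseProvider(
    ctas: Boolean,                     // is this CREATE TABLE ... AS SELECT?
    convertCTAS: Boolean,              // spark.sql.hive.convertCTAS
    createHiveTableByDefault: Boolean, // spark.sql.legacy.createHiveTableByDefault
    defaultDataSourceName: String      // spark.sql.sources.default
): String = {
  if (!createHiveTableByDefault || (ctas && convertCTAS)) defaultDataSourceName
  else "hive"
}
```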
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 583bc694dc3be..4a7dd7e5ba896 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -266,8 +266,7 @@ class DataSourceV2SQLSuite
     checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
   }
 
-  // TODO: ignored by SPARK-31707, restore the test after create table syntax unification
-  ignore("CreateTable: without USING clause") {
+  test("CreateTable: without USING clause") {
     // unset this config to use the default v2 session catalog.
     spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
     val testCatalog = catalog("testcat").asTableCatalog
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 20cad721d3d0e..92aa14bb3b440 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1588,7 +1588,7 @@ class PlanResolutionSuite extends AnalysisTest {
         .add("b", StringType)
       )
     )
-    compare("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) " +
+    compare("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) STORED AS textfile " +
       "PARTITIONED BY (c INT, d STRING COMMENT 'test2')",
       createTable(
         table = "my_tab",
@@ -1616,7 +1616,7 @@ class PlanResolutionSuite extends AnalysisTest {
     )
     // Partitioned by a StructType should be accepted by `SparkSqlParser` but will fail an analyze
     // rule in `AnalyzeCreateTable`.
-    compare("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) " +
+    compare("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) STORED AS textfile " +
       "PARTITIONED BY (nested STRUCT)",
       createTable(
         table = "my_tab",
@@ -1890,7 +1890,7 @@ class PlanResolutionSuite extends AnalysisTest {
   }
 
   test("Test CTAS #3") {
-    val s3 = """CREATE TABLE page_view AS SELECT * FROM src"""
+    val s3 = """CREATE TABLE page_view STORED AS textfile AS SELECT * FROM src"""
     val (desc, exists) = extractTableDesc(s3)
     assert(exists == false)
     assert(desc.identifier.database == Some("default"))
@@ -1967,11 +1967,9 @@ class PlanResolutionSuite extends AnalysisTest {
     assert(desc.viewText.isEmpty)
     assert(desc.viewQueryColumnNames.isEmpty)
     assert(desc.storage.locationUri.isEmpty)
-    assert(desc.storage.inputFormat ==
-      Some("org.apache.hadoop.mapred.TextInputFormat"))
-    assert(desc.storage.outputFormat ==
-      Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
-    assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+    assert(desc.storage.inputFormat.isEmpty)
+    assert(desc.storage.outputFormat.isEmpty)
+    assert(desc.storage.serde.isEmpty)
     assert(desc.storage.properties.isEmpty)
     assert(desc.properties.isEmpty)
     assert(desc.comment.isEmpty)
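The `PlanResolutionSuite` updates above all follow one pattern: tests that previously relied on a bare `CREATE TABLE` parsing into a Hive text-format table now either spell out `STORED AS textfile` or assert an empty storage descriptor. A hedged pair of statements showing the distinction under this patch's default (legacy flag off); the table names are illustrative:

```scala
// Bare CTAS: resolved with the native default provider, so the catalog entry
// carries no Hive input/output format or serde.
spark.sql("CREATE TABLE page_view AS SELECT * FROM src")

// Explicit Hive syntax: still resolved to TextInputFormat/LazySimpleSerDe.
spark.sql("CREATE TABLE page_view_hive STORED AS textfile AS SELECT * FROM src")
```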
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 462206d8c546f..4ce1964a19bd9 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -40,6 +40,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
   private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone
+  private val originalCreateHiveTable =
+    TestHive.conf.getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT)
 
   def testCases: Seq[(String, File)] = {
     hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
@@ -59,6 +61,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests
     // (timestamp_*)
     TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles")
+    TestHive.setConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT, true)
     RuleExecutor.resetMetrics()
   }
 
@@ -69,6 +72,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
     TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled)
     TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, originalSessionLocalTimeZone)
+    TestHive.setConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT, originalCreateHiveTable)
 
     // For debugging dump some statistics about how much time was spent in various optimizer rules
     logWarning(RuleExecutor.dumpTimeSpent())
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveShowCreateTableSuite.scala
index 3e7c3e6799724..2fb67c793dc6a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveShowCreateTableSuite.scala
@@ -21,10 +21,26 @@ import org.apache.spark.sql.{AnalysisException, ShowCreateTableSuite}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 
 class HiveShowCreateTableSuite extends ShowCreateTableSuite with TestHiveSingleton {
 
+  private var origCreateHiveTableConfig = false
+
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    origCreateHiveTableConfig =
+      spark.conf.get(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT)
+    spark.conf.set(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key, true)
+  }
+
+  protected override def afterAll(): Unit = {
+    spark.conf.set(
+      SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key,
+      origCreateHiveTableConfig)
+    super.afterAll()
+  }
+
   test("view") {
     Seq(true, false).foreach { serde =>
       withView("v1") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index ebc6cfb77d355..71750e6b3a516 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -277,7 +277,8 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
   test("Test partition mode = strict") {
     withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) {
       withTable("partitioned") {
-        sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
+        sql("CREATE TABLE partitioned (id bigint, data string) USING hive " +
+          "PARTITIONED BY (part string)")
         val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd"))
           .toDF("id", "data", "part")
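The Hive-side test changes here and below are mechanical: suites that exercise Hive-specific machinery (Hive partitioned tables, `LOAD DATA`, Hive statistics) can no longer count on a bare `CREATE TABLE` producing a Hive table, so they name the provider explicitly. Schematically (table name illustrative):

```scala
// Before this patch, in a Hive-enabled test session this was implicitly a Hive table:
//   CREATE TABLE t (key STRING, value STRING) PARTITIONED BY (ds STRING)
// After it, the same intent must be spelled out:
spark.sql("CREATE TABLE t (key STRING, value STRING) USING hive PARTITIONED BY (ds STRING)")
```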
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index 483622b16762a..cec6ec1ee1275 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -38,7 +38,7 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
     testData.createOrReplaceTempView("testData")
 
     // create the table for test
-    sql(s"CREATE TABLE table_with_partition(key int,value string) " +
+    sql(s"CREATE TABLE table_with_partition(key int,value string) USING hive " +
       s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
     sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
       "SELECT key,value FROM testData")
@@ -81,7 +81,8 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
 
   test("SPARK-21739: Cast expression should initialize timezoneId") {
     withTable("table_with_timestamp_partition") {
-      sql("CREATE TABLE table_with_timestamp_partition(value int) PARTITIONED BY (ts TIMESTAMP)")
+      sql("CREATE TABLE table_with_timestamp_partition(value int) USING hive " +
+        "PARTITIONED BY (ts TIMESTAMP)")
 
       sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
         "PARTITION (ts = '2010-01-01 00:00:00.000') VALUES (1)")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 43d1ba04c561d..2ea98943011f4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -165,7 +165,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     // Partitioned table
     val partTable = "part_table"
     withTable(partTable) {
-      sql(s"CREATE TABLE $partTable (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+      sql(s"CREATE TABLE $partTable (key STRING, value STRING) USING hive " +
+        "PARTITIONED BY (ds STRING)")
       sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-01') SELECT * FROM src")
       sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-02') SELECT * FROM src")
       sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-03') SELECT * FROM src")
@@ -191,7 +192,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION.key -> "True") {
       val checkSizeTable = "checkSizeTable"
       withTable(checkSizeTable) {
-        sql(s"CREATE TABLE $checkSizeTable (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+        sql(s"CREATE TABLE $checkSizeTable (key STRING, value STRING) USING hive " +
+          "PARTITIONED BY (ds STRING)")
         sql(s"INSERT INTO TABLE $checkSizeTable PARTITION (ds='2010-01-01') SELECT * FROM src")
         sql(s"INSERT INTO TABLE $checkSizeTable PARTITION (ds='2010-01-02') SELECT * FROM src")
         sql(s"INSERT INTO TABLE $checkSizeTable PARTITION (ds='2010-01-03') SELECT * FROM src")
@@ -274,7 +276,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   test("SPARK-22745 - read Hive's statistics for partition") {
     val tableName = "hive_stats_part_table"
     withTable(tableName) {
-      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) USING hive " +
+        "PARTITIONED BY (ds STRING)")
       sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2017-01-01') SELECT * FROM src")
       var partition = spark.sessionState.catalog
         .getPartition(TableIdentifier(tableName), Map("ds" -> "2017-01-01"))
@@ -296,7 +299,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     val tableName = "analyzeTable_part"
     withTable(tableName) {
       withTempPath { path =>
-        sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+        sql(s"CREATE TABLE $tableName (key STRING, value STRING) USING hive " +
+          "PARTITIONED BY (ds STRING)")
         val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
         partitionDates.foreach { ds =>
@@ -321,6 +325,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       sql(
         s"""
            |CREATE TABLE $sourceTableName (key STRING, value STRING)
+           |USING hive
            |PARTITIONED BY (ds STRING)
            |LOCATION '${path.toURI}'
          """.stripMargin)
@@ -338,6 +343,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       sql(
         s"""
            |CREATE TABLE $tableName (key STRING, value STRING)
+           |USING hive
            |PARTITIONED BY (ds STRING)
            |LOCATION '${path.toURI}'
          """.stripMargin)
@@ -371,7 +377,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
 
     withTable(tableName) {
-      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) USING hive " +
+        "PARTITIONED BY (ds STRING)")
       createPartition("2010-01-01", "SELECT '1', 'A' from src")
       createPartition("2010-01-02", "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
@@ -424,7 +431,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     withTable(tableName) {
-      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)")
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) USING hive " +
+        "PARTITIONED BY (ds STRING, hr INT)")
       createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
       createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
@@ -472,7 +480,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     withTable(tableName) {
-      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)")
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) USING hive " +
+        "PARTITIONED BY (ds STRING, hr INT)")
       createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
       createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
@@ -961,7 +970,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     Seq(false, true).foreach { autoUpdate =>
       withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) {
         withTable(table) {
-          sql(s"CREATE TABLE $table (i INT, j STRING) PARTITIONED BY (ds STRING, hr STRING)")
+          sql(s"CREATE TABLE $table (i INT, j STRING) USING hive " +
+            "PARTITIONED BY (ds STRING, hr STRING)")
           // table has two partitions initially
           for (ds <- Seq("2008-04-08"); hr <- Seq("11", "12")) {
             sql(s"INSERT OVERWRITE TABLE $table PARTITION (ds='$ds',hr='$hr') SELECT 1, 'a'")
@@ -1034,6 +1044,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       sql(
         s"""
            |CREATE TABLE $managedTable (key INT, value STRING)
+           |USING hive
            |PARTITIONED BY (ds STRING, hr STRING)
          """.stripMargin)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index d9ba6dd80e4ef..684529aa330a7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -798,6 +798,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       versionSpark.sql(
         """
           |CREATE TABLE tbl(c1 string)
+          |USING hive
          |PARTITIONED BY (ds STRING)
        """.stripMargin)
       versionSpark.sql("INSERT OVERWRITE TABLE tbl partition (ds='2') SELECT '1'")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 2dfb8bb552594..ce31e39985971 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -983,7 +983,7 @@ class HiveDDLSuite
   }
 
   test("alter table partition - storage information") {
-    sql("CREATE TABLE boxes (height INT, length INT) PARTITIONED BY (width INT)")
+    sql("CREATE TABLE boxes (height INT, length INT) STORED AS textfile PARTITIONED BY (width INT)")
     sql("INSERT OVERWRITE TABLE boxes PARTITION (width=4) SELECT 4, 4")
     val catalog = spark.sessionState.catalog
     val expectedSerde = "com.sparkbricks.serde.ColumnarSerDe"
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
index f723c9f80c2ab..d7129bcb37e69 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
@@ -88,7 +88,7 @@ class HiveSerDeSuite extends HiveComparisonTest with PlanTest with BeforeAndAfte
   test("Test the default fileformat for Hive-serde tables") {
     withSQLConf("hive.default.fileformat" -> "orc") {
       val (desc, exists) = extractTableDesc(
-        "CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
+        "CREATE TABLE IF NOT EXISTS fileformat_test (id int) USING hive")
       assert(exists)
       assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
       assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
@@ -96,7 +96,8 @@ class HiveSerDeSuite extends HiveComparisonTest with PlanTest with BeforeAndAfte
     }
 
     withSQLConf("hive.default.fileformat" -> "parquet") {
-      val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
+      val (desc, exists) = extractTableDesc(
+        "CREATE TABLE IF NOT EXISTS fileformat_test (id int) USING hive")
       assert(exists)
       val input = desc.storage.inputFormat
       val output = desc.storage.outputFormat
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 4a50621d89d4e..5b43f82f253ea 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -113,6 +113,7 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
     sql(
       s"""
          |CREATE TABLE $table(id string)
+         |USING hive
         |PARTITIONED BY (p1 string,p2 string,p3 string,p4 string,p5 string)
       """.stripMargin)
     sql(
@@ -157,6 +158,7 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
     sql(
       s"""
          |CREATE TABLE $table(id string)
+         |USING hive
         |PARTITIONED BY (p1 string,p2 string,p3 string,p4 string,p5 string)
       """.stripMargin)
     sql(
@@ -182,6 +184,7 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
     sql(
       s"""
         |CREATE TABLE $table (id int)
+        |USING hive
        |PARTITIONED BY (a int, b int)
      """.stripMargin)
     val scan1 = getHiveTableScanExec(s"SELECT * FROM $table WHERE a = 1 AND b = 2")
@@ -252,7 +255,7 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
   test("SPARK-32069: Improve error message on reading unexpected directory") {
     withTable("t") {
       withTempDir { f =>
-        sql(s"CREATE TABLE t(i LONG) LOCATION '${f.getAbsolutePath}'")
+        sql(s"CREATE TABLE t(i LONG) USING hive LOCATION '${f.getAbsolutePath}'")
         sql("INSERT INTO t VALUES(1)")
         val dir = new File(f.getCanonicalPath + "/data")
         dir.mkdir()
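A subtlety the `HiveSerDeSuite` change above makes explicit: once a table is declared `USING hive`, Hive's `hive.default.fileformat` setting still selects the underlying storage format, exactly as those assertions expect. A sketch, with the conf values taken from the tests and an illustrative table name:

```scala
// With an explicit Hive provider, hive.default.fileformat drives the storage:
spark.conf.set("hive.default.fileformat", "orc")
spark.sql("CREATE TABLE fileformat_demo (id INT) USING hive")
// -> OrcInputFormat / OrcOutputFormat, per the suite's assertions.
// Without USING hive (and with the legacy flag off), the same statement would
// instead create a table with the spark.sql.sources.default provider.
```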
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 79b3c3efe531c..6b82b1267bc66 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2026,6 +2026,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
       sql(
         """
           |CREATE TABLE part_table (c STRING)
+          |STORED AS textfile
          |PARTITIONED BY (d STRING)
        """.stripMargin)
       sql(s"LOAD DATA LOCAL INPATH '$path/part-r-000011' " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index a25c61c96f3d8..e996f2c6ec78f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -327,20 +327,22 @@ private[hive] class TestHiveSparkSession(
     }
 
     if (loadTestTables) {
+      def createTableSQL(tblName: String): String = {
+        s"CREATE TABLE $tblName (key INT, value STRING) STORED AS textfile"
+      }
       // The test tables that are defined in the Hive QTestUtil.
       // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
       // https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
       @transient
       val hiveQTestUtilTables: Seq[TestTable] = Seq(
         TestTable("src",
-          "CREATE TABLE src (key INT, value STRING) STORED AS TEXTFILE".cmd,
+          createTableSQL("src").cmd,
           s"LOAD DATA LOCAL INPATH '${quoteHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
         TestTable("src1",
-          "CREATE TABLE src1 (key INT, value STRING) STORED AS TEXTFILE".cmd,
+          createTableSQL("src1").cmd,
           s"LOAD DATA LOCAL INPATH '${quoteHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
         TestTable("srcpart", () => {
-          "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)"
-            .cmd.apply()
+          s"${createTableSQL("srcpart")} PARTITIONED BY (ds STRING, hr STRING)".cmd.apply()
           for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
             s"""
                |LOAD DATA LOCAL INPATH '${quoteHiveFile("data/files/kv1.txt")}'
@@ -349,8 +351,7 @@ private[hive] class TestHiveSparkSession(
           }
         }),
         TestTable("srcpart1", () => {
-          "CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds STRING, hr INT)"
-            .cmd.apply()
+          s"${createTableSQL("srcpart1")} PARTITIONED BY (ds STRING, hr INT)".cmd.apply()
           for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- 11 to 12) {
             s"""
                |LOAD DATA LOCAL INPATH '${quoteHiveFile("data/files/kv1.txt")}'

From a508985ad1759ed06ce53b136ffe91184dc8b105 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 2 Dec 2020 00:59:08 +0800
Subject: [PATCH 2/4] remove invalid test

---
 .../sql/sources/CreateTableAsSelectSuite.scala | 17 -----------------
 1 file changed, 17 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 00c599065ce31..b7c1bf3df4088 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -170,23 +170,6 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
-  test("disallows CREATE EXTERNAL TABLE ... USING ... AS query") {
-    withTable("t") {
-      val error = intercept[AnalysisException] {
-        sql(
-          s"""
-             |CREATE EXTERNAL TABLE t USING PARQUET
-             |OPTIONS (PATH '${path.toURI}')
-             |AS SELECT 1 AS a, 2 AS b
-           """.stripMargin
-        )
-      }.getMessage
-
-      assert(error.contains("Operation not allowed") &&
-        error.contains("CREATE EXTERNAL TABLE ..."))
-    }
-  }
-
   test("create table using as select - with partitioned by") {
     val catalog = spark.sessionState.catalog
     withTable("t") {
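Patch 2 drops this test because patch 1 relaxed the `CREATE EXTERNAL TABLE` check: any provider is now accepted so long as a `LOCATION` is supplied. At this point in the series (and until patch 4 restores the stricter check), a statement along these lines would presumably be accepted; the path and table name are illustrative only.

```scala
// Accepted between patch 1 and patch 3; rejected again once patch 4 lands:
spark.sql(
  """CREATE EXTERNAL TABLE ext_t (id INT)
    |USING parquet
    |LOCATION '/tmp/ext_t'""".stripMargin)
```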
From ada2bd898c4ef0c264b30851a633e0979e0974af Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 2 Dec 2020 03:02:48 +0800
Subject: [PATCH 3/4] do not enable by default

---
 docs/sql-migration-guide.md                    |  2 --
 .../apache/spark/sql/internal/SQLConf.scala    |  2 +-
 .../sql/connector/DataSourceV2SQLSuite.scala   | 30 ++++++++++---------
 .../command/PlanResolutionSuite.scala          |  8 +++--
 4 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 0122978ca547e..2c86e7a932637 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -54,8 +54,6 @@ license: |
 
 - In Spark 3.1, creating or altering a view will capture runtime SQL configs and store them as view properties. These configs will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.useCurrentConfigsForView` to `true`.
 
-- In Spark 3.1, `CREATE TABLE` without a specific table provider uses the value of `spark.sql.sources.default` as its table provider. In Spark version 3.0 and below, it was Hive. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.createHiveTableByDefault` to `true`.
-
 ## Upgrading from Spark SQL 3.0 to 3.0.1
 
 - In Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Since version 3.0.1, the timestamp type inference is disabled by default. Set the JSON option `inferTimestamp` to `true` to enable such type inference.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0547e9a1497a5..3f04b91ae8426 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2929,7 +2929,7 @@ object SQLConf {
         s"instead of the value of ${DEFAULT_DATA_SOURCE_NAME.key} as the table provider.")
       .version("3.1.0")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   /**
    * Holds information about keys that have been deprecated.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 4a7dd7e5ba896..7635590ab462e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -267,20 +267,22 @@ class DataSourceV2SQLSuite
   }
 
   test("CreateTable: without USING clause") {
-    // unset this config to use the default v2 session catalog.
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
-    val testCatalog = catalog("testcat").asTableCatalog
-
-    sql("CREATE TABLE testcat.t1 (id int)")
-    val t1 = testCatalog.loadTable(Identifier.of(Array(), "t1"))
-    // Spark shouldn't set the default provider for catalog plugins.
-    assert(!t1.properties.containsKey(TableCatalog.PROP_PROVIDER))
-
-    sql("CREATE TABLE t2 (id int)")
-    val t2 = spark.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
-      .loadTable(Identifier.of(Array("default"), "t2")).asInstanceOf[V1Table]
-    // Spark should set the default provider as DEFAULT_DATA_SOURCE_NAME for the session catalog.
-    assert(t2.v1Table.provider == Some(conf.defaultDataSourceName))
+    withSQLConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key -> "false") {
+      // unset this config to use the default v2 session catalog.
+      spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+      val testCatalog = catalog("testcat").asTableCatalog
+
+      sql("CREATE TABLE testcat.t1 (id int)")
+      val t1 = testCatalog.loadTable(Identifier.of(Array(), "t1"))
+      // Spark shouldn't set the default provider for catalog plugins.
+      assert(!t1.properties.containsKey(TableCatalog.PROP_PROVIDER))
+
+      sql("CREATE TABLE t2 (id int)")
+      val t2 = spark.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
+        .loadTable(Identifier.of(Array("default"), "t2")).asInstanceOf[V1Table]
+      // Spark should set the default provider as DEFAULT_DATA_SOURCE_NAME for the session catalog.
+      assert(t2.v1Table.provider == Some(conf.defaultDataSourceName))
+    }
   }
 
   test("CreateTable/RepalceTable: invalid schema if has interval type") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 92aa14bb3b440..33515ad41e918 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1967,9 +1967,11 @@ class PlanResolutionSuite extends AnalysisTest {
     assert(desc.viewText.isEmpty)
     assert(desc.viewQueryColumnNames.isEmpty)
     assert(desc.storage.locationUri.isEmpty)
-    assert(desc.storage.inputFormat.isEmpty)
-    assert(desc.storage.outputFormat.isEmpty)
-    assert(desc.storage.serde.isEmpty)
+    assert(desc.storage.inputFormat ==
+      Some("org.apache.hadoop.mapred.TextInputFormat"))
+    assert(desc.storage.outputFormat ==
+      Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
+    assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
     assert(desc.storage.properties.isEmpty)
     assert(desc.properties.isEmpty)
     assert(desc.comment.isEmpty)
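With patch 3 the flag defaults to `true`, so out of the box `CREATE TABLE` without a provider still creates a Hive table, and the migration-guide entry is withdrawn. The new behavior becomes an explicit opt-in; a minimal sketch (table name illustrative):

```scala
// After patch 3, opting into native-by-default tables is explicit:
spark.conf.set("spark.sql.legacy.createHiveTableByDefault", "false")
spark.sql("CREATE TABLE opt_in_demo (id INT)")
// -> provider taken from spark.sql.sources.default
```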
From bff924de21b8af2da17f31591768ce02c784bd43 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 2 Dec 2020 16:50:40 +0800
Subject: [PATCH 4/4] address comments

---
 .../org/apache/spark/sql/internal/SQLConf.scala |  2 +-
 .../analysis/ResolveSessionCatalog.scala        | 13 +++++++++++--
 .../sql/sources/CreateTableAsSelectSuite.scala  | 17 +++++++++++++++++
 3 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3f04b91ae8426..66272a63068d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2925,7 +2925,7 @@ object SQLConf {
   val LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT =
     buildConf("spark.sql.legacy.createHiveTableByDefault")
       .internal()
-      .doc("When set to true, CREATE TABLE syntax without a table provider will use hive " +
+      .doc("When set to true, CREATE TABLE syntax without USING or STORED AS will use Hive " +
         s"instead of the value of ${DEFAULT_DATA_SOURCE_NAME.key} as the table provider.")
       .version("3.1.0")
       .booleanConf
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index cce280b4241e3..f35fcdc07c372 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -643,6 +643,9 @@ class ResolveSessionCatalog(
       if (!createHiveTableByDefault || (ctas && conf.convertCTAS)) {
         (nonHiveStorageFormat, conf.defaultDataSourceName)
       } else {
+        logWarning("A Hive serde table will be created as there is no table provider " +
+          s"specified. You can set ${SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key} to false " +
+          "so that a native data source table will be created instead.")
         (defaultHiveStorage, DDLUtils.HIVE_PROVIDER)
       }
     }
@@ -659,8 +662,14 @@ class ResolveSessionCatalog(
       comment: Option[String],
       storageFormat: CatalogStorageFormat,
       external: Boolean): CatalogTable = {
-    if (external && location.isEmpty) {
-      throw new AnalysisException(s"CREATE EXTERNAL TABLE must be accompanied by LOCATION")
+    if (external) {
+      if (DDLUtils.isHiveTable(Some(provider))) {
+        if (location.isEmpty) {
+          throw new AnalysisException(s"CREATE EXTERNAL TABLE must be accompanied by LOCATION")
+        }
+      } else {
+        throw new AnalysisException(s"Operation not allowed: CREATE EXTERNAL TABLE ... USING")
+      }
     }
 
     val tableType = if (location.isDefined) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index b7c1bf3df4088..00c599065ce31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -170,6 +170,23 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("disallows CREATE EXTERNAL TABLE ... USING ... AS query") {
+    withTable("t") {
+      val error = intercept[AnalysisException] {
+        sql(
+          s"""
+             |CREATE EXTERNAL TABLE t USING PARQUET
+             |OPTIONS (PATH '${path.toURI}')
+             |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+      }.getMessage
+
+      assert(error.contains("Operation not allowed") &&
+        error.contains("CREATE EXTERNAL TABLE ..."))
+    }
+  }
+
   test("create table using as select - with partitioned by") {
     val catalog = spark.sessionState.catalog
     withTable("t") {
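Taken together, the series lands conservatively: a bare `CREATE TABLE` still produces a Hive table by default but now logs a warning pointing at the legacy flag, and `CREATE EXTERNAL TABLE ... USING` is once again rejected. A hedged end-to-end sketch of the final behavior (table names illustrative):

```scala
// Final state of the series (legacy flag defaults to true):
spark.sql("CREATE TABLE warn_demo (id INT)")
// -> Hive serde table, plus a warning suggesting
//    spark.sql.legacy.createHiveTableByDefault=false

spark.conf.set("spark.sql.legacy.createHiveTableByDefault", "false")
spark.sql("CREATE TABLE native_demo (id INT)")
// -> table provider from spark.sql.sources.default (parquet by default)
```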