From 623e136549b5402e9183e0ec479f11419fe64465 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 8 Dec 2020 22:40:30 +0300
Subject: [PATCH 01/18] Add AlterTableAddPartitionSuiteBase with a first test

---
 .../AlterTableAddPartitionSuiteBase.scala     | 58 +++++++++++++++++++
 .../v1/AlterTableAddPartitionSuite.scala      | 30 ++++++++++
 .../v2/AlterTableAddPartitionSuite.scala      | 35 +++++++++++
 3 files changed, 123 insertions(+)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
new file mode 100644
index 0000000000000..818221d423781
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.test.SQLTestUtils
+
+trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
+  protected def version: String
+  protected def catalog: String
+  protected def defaultUsing: String
+
+  override def test(testName: String, testTags: Tag*)(testFun: => Any)
+    (implicit pos: Position): Unit = {
+    super.test(s"ALTER TABLE .. 
ADD PARTITION $version: " + testName, testTags: _*)(testFun) + } + + protected def checkPartitions(t: String, expected: Map[String, String]*): Unit = { + val partitions = sql(s"SHOW PARTITIONS $t") + .collect() + .toSet + .map((row: Row) => row.getString(0)) + .map(PartitioningUtils.parsePathFragment) + assert(partitions === expected.toSet) + } + + test("one partition") { + withNamespace(s"$catalog.ns") { + sql(s"CREATE NAMESPACE $catalog.ns") + val t = s"$catalog.ns.tbl" + withTable(t) { + spark.sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") + spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'") + + checkPartitions(t, Map("id" -> "1")) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala new file mode 100644 index 0000000000000..48edd5233c90c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.v1 + +import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.execution.command +import org.apache.spark.sql.test.SharedSparkSession + +trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuiteBase { + override def version: String = "V1" + override def catalog: String = CatalogManager.SESSION_CATALOG_NAME + override def defaultUsing: String = "USING parquet" +} + +class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with SharedSparkSession diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala new file mode 100644 index 0000000000000..e1ad35fd0a30a --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.v2 + +import org.apache.spark.SparkConf +import org.apache.spark.sql.connector.InMemoryPartitionTableCatalog +import org.apache.spark.sql.execution.command +import org.apache.spark.sql.test.SharedSparkSession + +class AlterTableAddPartitionSuite + extends command.AlterTableAddPartitionSuiteBase + with SharedSparkSession { + + override def version: String = "V2" + override def catalog: String = "test_catalog" + override def defaultUsing: String = "USING _" + + override def sparkConf: SparkConf = super.sparkConf + .set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName) +} From 97c6fa1e0230d24efdf24d64295b4ca2c9d85757 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Dec 2020 09:48:36 +0300 Subject: [PATCH 02/18] Add checkLocation() --- .../command/AlterTableAddPartitionSuiteBase.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala index 818221d423781..e2d3a3b22f339 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala @@ -43,6 +43,18 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { assert(partitions === expected.toSet) } + protected def checkLocation( + ns: String, + tableName: String, + spec: String, + expected: String): Unit = { + val information = sql(s"SHOW TABLE EXTENDED IN $ns LIKE '$tableName' PARTITION($spec)") + .select("information") + .first().getString(0) + val location = information.split("\\r?\\n").filter(_.startsWith("Location:")).head + assert(location.endsWith(expected)) + } + test("one partition") { withNamespace(s"$catalog.ns") { sql(s"CREATE NAMESPACE $catalog.ns") @@ -52,6 +64,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'") checkPartitions(t, Map("id" -> "1")) + checkLocation(s"$catalog.ns", "tbl", "id = 1", "loc") } } } From f7c06fe5c137ee12e6c9f4207b0b3613977b5cf2 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Dec 2020 10:05:36 +0300 Subject: [PATCH 03/18] Improve checkLocation() --- .../command/AlterTableAddPartitionSuiteBase.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala index e2d3a3b22f339..de5622bd79768 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala @@ -43,12 +43,12 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { 
assert(partitions === expected.toSet) } - protected def checkLocation( - ns: String, - tableName: String, - spec: String, - expected: String): Unit = { - val information = sql(s"SHOW TABLE EXTENDED IN $ns LIKE '$tableName' PARTITION($spec)") + protected def checkLocation(t: String, spec: Map[String, String], expected: String): Unit = { + val tablePath = t.split('.') + val tableName = tablePath.last + val ns = tablePath.init.mkString(".") + val partSpec = spec.map { case (key, value) => s"$key = $value"}.mkString(", ") + val information = sql(s"SHOW TABLE EXTENDED IN $ns LIKE '$tableName' PARTITION($partSpec)") .select("information") .first().getString(0) val location = information.split("\\r?\\n").filter(_.startsWith("Location:")).head @@ -64,7 +64,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'") checkPartitions(t, Map("id" -> "1")) - checkLocation(s"$catalog.ns", "tbl", "id = 1", "loc") + checkLocation(t, Map("id" -> "1"), "loc") } } } From ee65251940345840672afdd0d6e614aecb9fca7a Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Dec 2020 10:23:33 +0300 Subject: [PATCH 04/18] Move checkLocation() --- .../analysis/ResolvePartitionSpec.scala | 2 +- .../AlterTableAddPartitionSuiteBase.scala | 18 +++---------- .../v1/AlterTableAddPartitionSuite.scala | 16 +++++++++++ .../v2/AlterTableAddPartitionSuite.scala | 27 ++++++++++++++++++- 4 files changed, 47 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala index feb05d3b6926b..099ac6172c9e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala @@ -81,7 +81,7 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] { resolvedPartitionSpec } - private def convertToPartIdent( + private[sql] def convertToPartIdent( partitionSpec: TablePartitionSpec, schema: Seq[StructField]): InternalRow = { val partValues = schema.map { part => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala index de5622bd79768..8957c6eb3090c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala @@ -21,6 +21,7 @@ import org.scalactic.source.Position import org.scalatest.Tag import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.test.SQLTestUtils @@ -42,26 +43,15 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { .map(PartitioningUtils.parsePathFragment) assert(partitions === expected.toSet) } - - protected def checkLocation(t: String, spec: Map[String, String], expected: String): Unit = { - val tablePath = t.split('.') - val tableName = tablePath.last - val ns = tablePath.init.mkString(".") - val partSpec = spec.map { case (key, value) => s"$key = $value"}.mkString(", ") - val information = sql(s"SHOW TABLE EXTENDED IN $ns LIKE 
'$tableName' PARTITION($partSpec)") - .select("information") - .first().getString(0) - val location = information.split("\\r?\\n").filter(_.startsWith("Location:")).head - assert(location.endsWith(expected)) - } + protected def checkLocation(t: String, spec: TablePartitionSpec, expected: String): Unit test("one partition") { withNamespace(s"$catalog.ns") { sql(s"CREATE NAMESPACE $catalog.ns") val t = s"$catalog.ns.tbl" withTable(t) { - spark.sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") - spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'") + sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") + sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'") checkPartitions(t, Map("id" -> "1")) checkLocation(t, Map("id" -> "1"), "loc") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala index 48edd5233c90c..b29564e1d81b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.command.v1 +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.execution.command import org.apache.spark.sql.test.SharedSparkSession @@ -25,6 +26,21 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit override def version: String = "V1" override def catalog: String = CatalogManager.SESSION_CATALOG_NAME override def defaultUsing: String = "USING parquet" + + override protected def checkLocation( + t: String, + spec: TablePartitionSpec, + expected: String): Unit = { + val tablePath = t.split('.') + val tableName = tablePath.last + val ns = tablePath.init.mkString(".") + val partSpec = spec.map { case (key, value) => s"$key = $value"}.mkString(", ") + val information = sql(s"SHOW TABLE EXTENDED IN $ns LIKE '$tableName' PARTITION($partSpec)") + .select("information") + .first().getString(0) + val location = information.split("\\r?\\n").filter(_.startsWith("Location:")).head + assert(location.endsWith(expected)) + } } class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with SharedSparkSession diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala index e1ad35fd0a30a..7f13259eb3c6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala @@ -18,7 +18,10 @@ package org.apache.spark.sql.execution.command.v2 import org.apache.spark.SparkConf -import org.apache.spark.sql.connector.InMemoryPartitionTableCatalog +import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier} import org.apache.spark.sql.execution.command import 
org.apache.spark.sql.test.SharedSparkSession @@ -26,10 +29,32 @@ class AlterTableAddPartitionSuite extends command.AlterTableAddPartitionSuiteBase with SharedSparkSession { + import CatalogV2Implicits._ + override def version: String = "V2" override def catalog: String = "test_catalog" override def defaultUsing: String = "USING _" override def sparkConf: SparkConf = super.sparkConf .set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName) + + override protected def checkLocation( + t: String, + spec: TablePartitionSpec, + expected: String): Unit = { + val tablePath = t.split('.') + val catalogName = tablePath.head + val namespaceWithTable = tablePath.tail + val namespaces = namespaceWithTable.init + val tableName = namespaceWithTable.last + val catalogPlugin = spark.sessionState.catalogManager.catalog(catalogName) + val partTable = catalogPlugin.asTableCatalog + .loadTable(Identifier.of(namespaces, tableName)) + .asInstanceOf[InMemoryPartitionTable] + val ident = ResolvePartitionSpec.convertToPartIdent(spec, partTable.partitionSchema.fields) + val partMetadata = partTable.loadPartitionMetadata(ident) + + assert(partMetadata.containsKey("location")) + assert(partMetadata.get("location") === expected) + } } From 1c3542a38b881bcbe1e2e6601522cdafd383800d Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Dec 2020 10:43:24 +0300 Subject: [PATCH 05/18] Add the test "multiple partitions" --- .../AlterTablePartitionV2SQLSuite.scala | 38 ------------------- .../AlterTableAddPartitionSuiteBase.scala | 15 ++++++++ 2 files changed, 15 insertions(+), 38 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala index 45d47c6d8681c..154f351713618 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala @@ -45,44 +45,6 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase { } } - test("ALTER TABLE ADD PARTITION") { - val t = "testpart.ns1.ns2.tbl" - withTable(t) { - spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)") - spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'") - - val partTable = catalog("testpart").asTableCatalog - .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable] - assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1)))) - - val partMetadata = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(1))) - assert(partMetadata.containsKey("location")) - assert(partMetadata.get("location") == "loc") - } - } - - test("ALTER TABLE ADD PARTITIONS") { - val t = "testpart.ns1.ns2.tbl" - withTable(t) { - spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)") - spark.sql( - s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc' PARTITION (id=2) LOCATION 'loc1'") - - val partTable = catalog("testpart").asTableCatalog - .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable] - assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1)))) - assert(partTable.partitionExists(InternalRow.fromSeq(Seq(2)))) - - val partMetadata = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(1))) - assert(partMetadata.containsKey("location")) - assert(partMetadata.get("location") == "loc") - - val partMetadata1 = 
partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(2))) - assert(partMetadata1.containsKey("location")) - assert(partMetadata1.get("location") == "loc1") - } - } - test("ALTER TABLE ADD PARTITIONS: partition already exists") { val t = "testpart.ns1.ns2.tbl" withTable(t) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala index 8957c6eb3090c..1854593174375 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala @@ -58,4 +58,19 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { } } } + + test("multiple partitions") { + withNamespace(s"$catalog.ns") { + sql(s"CREATE NAMESPACE $catalog.ns") + val t = s"$catalog.ns.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") + sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc' PARTITION (id=2) LOCATION 'loc1'") + + checkPartitions(t, Map("id" -> "1"), Map("id" -> "2")) + checkLocation(t, Map("id" -> "1"), "loc") + checkLocation(t, Map("id" -> "2"), "loc1") + } + } + } } From 4501f63460024bb14b634a5d18caa1a36b204951 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Dec 2020 10:55:08 +0300 Subject: [PATCH 06/18] Add the test "partition already exists" --- .../AlterTablePartitionV2SQLSuite.scala | 22 ------------------- .../AlterTableAddPartitionSuiteBase.scala | 22 +++++++++++++++++++ 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala index 154f351713618..6aa4b7c0b52af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala @@ -45,28 +45,6 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase { } } - test("ALTER TABLE ADD PARTITIONS: partition already exists") { - val t = "testpart.ns1.ns2.tbl" - withTable(t) { - spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)") - spark.sql( - s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'") - - assertThrows[PartitionsAlreadyExistException]( - spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" + - " PARTITION (id=2) LOCATION 'loc1'")) - - val partTable = catalog("testpart").asTableCatalog - .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable] - assert(!partTable.partitionExists(InternalRow.fromSeq(Seq(1)))) - - spark.sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" + - " PARTITION (id=2) LOCATION 'loc1'") - assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1)))) - assert(partTable.partitionExists(InternalRow.fromSeq(Seq(2)))) - } - } - test("ALTER TABLE RENAME PARTITION") { val t = "testcat.ns1.ns2.tbl" withTable(t) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala index 1854593174375..6e4c2902d6d18 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala @@ -21,6 +21,7 @@ import org.scalactic.source.Position import org.scalatest.Tag import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.test.SQLTestUtils @@ -73,4 +74,25 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { } } } + + test("partition already exists") { + withNamespace(s"$catalog.ns") { + sql(s"CREATE NAMESPACE $catalog.ns") + val t = s"$catalog.ns.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") + sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'") + + val errMsg = intercept[PartitionsAlreadyExistException] { + sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" + + " PARTITION (id=2) LOCATION 'loc1'") + }.getMessage + assert(errMsg.contains("The following partitions already exists")) + + sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" + + " PARTITION (id=2) LOCATION 'loc1'") + checkPartitions(t, Map("id" -> "1"), Map("id" -> "2")) + } + } + } } From 89268f7de4d4904624268b23cebe096df872260f Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Dec 2020 11:08:10 +0300 Subject: [PATCH 07/18] Add withNsTable() --- .../AlterTableAddPartitionSuiteBase.scala | 66 +++++++++---------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala index 6e4c2902d6d18..6ec07641d0af8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala @@ -46,53 +46,51 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { } protected def checkLocation(t: String, spec: TablePartitionSpec, expected: String): Unit - test("one partition") { - withNamespace(s"$catalog.ns") { - sql(s"CREATE NAMESPACE $catalog.ns") - val t = s"$catalog.ns.tbl" + protected def withNsTable(ns: String, tableName: String)(f: String => Unit): Unit = { + withNamespace(ns) { + sql(s"CREATE NAMESPACE $ns") + val t = s"$ns.$tableName" withTable(t) { - sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") - sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'") - - checkPartitions(t, Map("id" -> "1")) - checkLocation(t, Map("id" -> "1"), "loc") + f(t) } } } + test("one partition") { + withNsTable(s"$catalog.ns", "tbl") { t => + sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") + sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'") + + checkPartitions(t, Map("id" -> "1")) + checkLocation(t, Map("id" -> "1"), "loc") + } + } + test("multiple partitions") { - withNamespace(s"$catalog.ns") { - sql(s"CREATE NAMESPACE $catalog.ns") - val t = s"$catalog.ns.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") - sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc' 
PARTITION (id=2) LOCATION 'loc1'") + withNsTable(s"$catalog.ns", "tbl") { t => + sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") + sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc' PARTITION (id=2) LOCATION 'loc1'") - checkPartitions(t, Map("id" -> "1"), Map("id" -> "2")) - checkLocation(t, Map("id" -> "1"), "loc") - checkLocation(t, Map("id" -> "2"), "loc1") - } + checkPartitions(t, Map("id" -> "1"), Map("id" -> "2")) + checkLocation(t, Map("id" -> "1"), "loc") + checkLocation(t, Map("id" -> "2"), "loc1") } } test("partition already exists") { - withNamespace(s"$catalog.ns") { - sql(s"CREATE NAMESPACE $catalog.ns") - val t = s"$catalog.ns.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") - sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'") + withNsTable(s"$catalog.ns", "tbl") { t => + sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") + sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'") - val errMsg = intercept[PartitionsAlreadyExistException] { - sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" + - " PARTITION (id=2) LOCATION 'loc1'") - }.getMessage - assert(errMsg.contains("The following partitions already exists")) - - sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" + + val errMsg = intercept[PartitionsAlreadyExistException] { + sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" + " PARTITION (id=2) LOCATION 'loc1'") - checkPartitions(t, Map("id" -> "1"), Map("id" -> "2")) - } + }.getMessage + assert(errMsg.contains("The following partitions already exists")) + + sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" + + " PARTITION (id=2) LOCATION 'loc1'") + checkPartitions(t, Map("id" -> "1"), Map("id" -> "2")) } } } From e5011cd43622089cdc60b66e73e4a2b6bf6cedab Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Dec 2020 11:17:23 +0300 Subject: [PATCH 08/18] Add case sensitivity test --- .../AlterTablePartitionV2SQLSuite.scala | 2 +- .../AlterTableAddPartitionSuiteBase.scala | 20 ++++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala index 6aa4b7c0b52af..935485aff2bfb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala @@ -113,7 +113,7 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase { spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)") withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { val errMsg = intercept[AnalysisException] { - spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'") + spark.sql(s"ALTER TABLE $t DROP PARTITION (ID=1)") }.getMessage assert(errMsg.contains(s"ID is not a valid partition column in table $t")) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala index 6ec07641d0af8..5ad311d654e5f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala @@ -20,10 +20,11 @@ package org.apache.spark.sql.execution.command import org.scalactic.source.Position import org.scalatest.Tag -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.execution.datasources.PartitioningUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { @@ -93,4 +94,21 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { checkPartitions(t, Map("id" -> "1"), Map("id" -> "2")) } } + + test("case sensitivity in resolving partition specs") { + withNsTable(s"$catalog.ns", "tbl") { t => + spark.sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val errMsg = intercept[AnalysisException] { + spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'") + }.getMessage + assert(errMsg.contains("ID is not a valid partition column")) + } + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'") + checkPartitions(t, Map("id" -> "1")) + checkLocation(t, Map("id" -> "1"), "loc1") + } + } + } } From dbb370ba09a3b2e31dafa352831913983945f89e Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Dec 2020 12:49:06 +0300 Subject: [PATCH 09/18] Move "universal type conversions of partition values" --- .../AlterTablePartitionV2SQLSuite.scala | 54 ------------------- .../AlterTableAddPartitionSuiteBase.scala | 48 +++++++++++++++++ 2 files changed, 48 insertions(+), 54 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala index 935485aff2bfb..ecf6938174503 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala @@ -132,60 +132,6 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase { } } - test("SPARK-33521: universal type conversions of partition values") { - val t = "testpart.ns1.ns2.tbl" - withTable(t) { - sql(s""" - |CREATE TABLE $t ( - | part0 tinyint, - | part1 smallint, - | part2 int, - | part3 bigint, - | part4 float, - | part5 double, - | part6 string, - | part7 boolean, - | part8 date, - | part9 timestamp - |) USING foo - |PARTITIONED BY (part0, part1, part2, part3, part4, part5, part6, part7, part8, part9) - |""".stripMargin) - val partTable = catalog("testpart").asTableCatalog - .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")) - .asPartitionable - val expectedPartition = InternalRow.fromSeq(Seq[Any]( - -1, // tinyint - 0, // smallint - 1, // int - 2, // bigint - 3.14F, // float - 3.14D, // double - UTF8String.fromString("abc"), // string - true, // boolean - LocalDate.parse("2020-11-23").toEpochDay, - DateTimeUtils.instantToMicros( - LocalDateTime.parse("2020-11-23T22:13:10.123456").atZone(DateTimeTestUtils.LA).toInstant) - )) - assert(!partTable.partitionExists(expectedPartition)) - val partSpec = """ - | part0 = -1, - | part1 = 0, - | 
part2 = 1, - | part3 = 2, - | part4 = 3.14, - | part5 = 3.14, - | part6 = 'abc', - | part7 = true, - | part8 = '2020-11-23', - | part9 = '2020-11-23T22:13:10.123456' - |""".stripMargin - sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'") - assert(partTable.partitionExists(expectedPartition)) - sql(s" ALTER TABLE $t DROP PARTITION ($partSpec)") - assert(!partTable.partitionExists(expectedPartition)) - } - } - test("SPARK-33650: add/drop partition into a table which doesn't support partition management") { val t = "testcat.ns1.ns2.tbl" withTable(t) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala index 5ad311d654e5f..75017b9ce5ca2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala @@ -111,4 +111,52 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { } } } + + test("SPARK-33521: universal type conversions of partition values") { + withNsTable(s"$catalog.ns", "tbl") { t => + sql(s""" + |CREATE TABLE $t ( + | id int, + | part0 tinyint, + | part1 smallint, + | part2 int, + | part3 bigint, + | part4 float, + | part5 double, + | part6 string, + | part7 boolean, + | part8 date, + | part9 timestamp + |) $defaultUsing + |PARTITIONED BY (part0, part1, part2, part3, part4, part5, part6, part7, part8, part9) + |""".stripMargin) + val partSpec = """ + | part0 = -1, + | part1 = 0, + | part2 = 1, + | part3 = 2, + | part4 = 3.14, + | part5 = 3.14, + | part6 = 'abc', + | part7 = true, + | part8 = '2020-11-23', + | part9 = '2020-11-23T22:13:10.123456' + |""".stripMargin + sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'") + val expected = Map( + "part0" -> "-1", + "part1" -> "0", + "part2" -> "1", + "part3" -> "2", + "part4" -> "3.14", + "part5" -> "3.14", + "part6" -> "abc", + "part7" -> "true", + "part8" -> "2020-11-23", + "part9" -> s"2020-11-23${if (version == "V2") " " else "T"}22:13:10.123456") + checkPartitions(t, expected) + sql(s"ALTER TABLE $t DROP PARTITION ($partSpec)") + checkPartitions(t) // no partitions + } + } } From 7ec87fe5169853b54533d6a61f558fed1a4ba655 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Dec 2020 12:55:25 +0300 Subject: [PATCH 10/18] Move the SPARK-33650 test --- .../connector/AlterTablePartitionV2SQLSuite.scala | 15 +++++---------- .../command/v2/AlterTableAddPartitionSuite.scala | 14 +++++++++++++- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala index ecf6938174503..010d6f03ce46d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala @@ -132,19 +132,14 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase { } } - test("SPARK-33650: add/drop partition into a table which doesn't support partition management") { + test("SPARK-33650: drop partition into a table which doesn't support partition management") { val t = "testcat.ns1.ns2.tbl" withTable(t) { spark.sql(s"CREATE TABLE $t (id bigint, data string) USING _") - Seq( - s"ALTER 
TABLE $t ADD PARTITION (id=1)", - s"ALTER TABLE $t DROP PARTITION (id=1)" - ).foreach { alterTable => - val errMsg = intercept[AnalysisException] { - spark.sql(alterTable) - }.getMessage - assert(errMsg.contains(s"Table $t can not alter partitions")) - } + val errMsg = intercept[AnalysisException] { + spark.sql(s"ALTER TABLE $t DROP PARTITION (id=1)") + }.getMessage + assert(errMsg.contains(s"Table $t can not alter partitions")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala index 7f13259eb3c6d..461c222bc9547 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala @@ -18,9 +18,10 @@ package org.apache.spark.sql.execution.command.v2 import org.apache.spark.SparkConf +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog} +import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog} import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier} import org.apache.spark.sql.execution.command import org.apache.spark.sql.test.SharedSparkSession @@ -37,6 +38,7 @@ class AlterTableAddPartitionSuite override def sparkConf: SparkConf = super.sparkConf .set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName) + .set(s"spark.sql.catalog.non_part_$catalog", classOf[InMemoryTableCatalog].getName) override protected def checkLocation( t: String, @@ -57,4 +59,14 @@ class AlterTableAddPartitionSuite assert(partMetadata.containsKey("location")) assert(partMetadata.get("location") === expected) } + + test("SPARK-33650: add partition into a table which doesn't support partition management") { + withNsTable(s"non_part_$catalog.ns", "tbl") { t => + sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing") + val errMsg = intercept[AnalysisException] { + sql(s"ALTER TABLE $t ADD PARTITION (id=1)") + }.getMessage + assert(errMsg.contains(s"Table $t can not alter partitions")) + } + } } From f763def4c14bd4878eb748323e1cd4d2a397ac31 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Dec 2020 13:00:13 +0300 Subject: [PATCH 11/18] Move the SPARK-33676 test --- .../AlterTablePartitionV2SQLSuite.scala | 21 ++++++------------- .../AlterTableAddPartitionSuiteBase.scala | 14 +++++++++++++ 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala index 010d6f03ce46d..570976965ec7c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala @@ -17,16 +17,12 @@ package org.apache.spark.sql.connector -import java.time.{LocalDate, LocalDateTime} - import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException} 
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils} +import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.unsafe.types.UTF8String class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase { @@ -150,16 +146,11 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase { |CREATE TABLE $t (id bigint, part0 int, part1 string) |USING foo |PARTITIONED BY (part0, part1)""".stripMargin) - Seq( - s"ALTER TABLE $t ADD PARTITION (part0 = 1)", - s"ALTER TABLE $t DROP PARTITION (part0 = 1)" - ).foreach { alterTable => - val errMsg = intercept[AnalysisException] { - sql(alterTable) - }.getMessage - assert(errMsg.contains("Partition spec is invalid. " + - "The spec (part0) must match the partition spec (part0, part1)")) - } + val errMsg = intercept[AnalysisException] { + sql(s"ALTER TABLE $t DROP PARTITION (part0 = 1)") + }.getMessage + assert(errMsg.contains("Partition spec is invalid. " + + "The spec (part0) must match the partition spec (part0, part1)")) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala index 75017b9ce5ca2..beb7b294bc3bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala @@ -159,4 +159,18 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { checkPartitions(t) // no partitions } } + + test("SPARK-33676: not fully specified partition spec") { + withNsTable(s"$catalog.ns", "tbl") { t => + sql(s""" + |CREATE TABLE $t (id bigint, part0 int, part1 string) + |$defaultUsing + |PARTITIONED BY (part0, part1)""".stripMargin) + val errMsg = intercept[AnalysisException] { + sql(s"ALTER TABLE $t ADD PARTITION (part0 = 1)") + }.getMessage + assert(errMsg.contains("Partition spec is invalid. 
" + + "The spec (part0) must match the partition spec (part0, part1)")) + } + } } From e7af1bc0d5a543449bdae7ce5458412c8e1fdfef Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Dec 2020 13:18:24 +0300 Subject: [PATCH 12/18] Add "multi-part partition" test --- .../AlterTableAddPartitionSuiteBase.scala | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala index beb7b294bc3bd..67f1e768f265d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala @@ -60,21 +60,39 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { test("one partition") { withNsTable(s"$catalog.ns", "tbl") { t => sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") - sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'") + Seq("", "IF NOT EXISTS").foreach { exists => + sql(s"ALTER TABLE $t ADD $exists PARTITION (id=1) LOCATION 'loc'") - checkPartitions(t, Map("id" -> "1")) - checkLocation(t, Map("id" -> "1"), "loc") + checkPartitions(t, Map("id" -> "1")) + checkLocation(t, Map("id" -> "1"), "loc") + } } } test("multiple partitions") { withNsTable(s"$catalog.ns", "tbl") { t => sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") - sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc' PARTITION (id=2) LOCATION 'loc1'") + Seq("", "IF NOT EXISTS").foreach { exists => + sql(s""" + |ALTER TABLE $t ADD $exists + |PARTITION (id=1) LOCATION 'loc' + |PARTITION (id=2) LOCATION 'loc1'""".stripMargin) + + checkPartitions(t, Map("id" -> "1"), Map("id" -> "2")) + checkLocation(t, Map("id" -> "1"), "loc") + checkLocation(t, Map("id" -> "2"), "loc1") + } + } + } - checkPartitions(t, Map("id" -> "1"), Map("id" -> "2")) - checkLocation(t, Map("id" -> "1"), "loc") - checkLocation(t, Map("id" -> "2"), "loc1") + test("multi-part partition") { + withNsTable(s"$catalog.ns", "tbl") { t => + sql(s"CREATE TABLE $t (id bigint, a int, b string) $defaultUsing PARTITIONED BY (a, b)") + Seq("", "IF NOT EXISTS").foreach { exists => + sql(s"ALTER TABLE $t ADD $exists PARTITION (a=2, b='abc')") + + checkPartitions(t, Map("a" -> "2", "b" -> "abc")) + } } } From c4d5028ed36d849b5284dc0277e9d4893580829b Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Dec 2020 13:28:55 +0300 Subject: [PATCH 13/18] Unify V1 tests --- .../AlterTableAddPartitionSuiteBase.scala | 9 +++ .../sql/execution/command/DDLSuite.scala | 57 ------------------- 2 files changed, 9 insertions(+), 57 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala index 67f1e768f265d..a70854cdf1871 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala @@ -96,6 +96,15 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils { } } + test("table to alter does not exist") { + withNsTable(s"$catalog.ns", "does_not_exist") { t => + val errMsg = 
intercept[AnalysisException] { + sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (a='4', b='9')") + }.getMessage + assert(errMsg.contains("Table not found")) + } + } + test("partition already exists") { withNsTable(s"$catalog.ns", "tbl") { t => sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index b3cd9f1057a70..bf19c77a8859a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1621,63 +1621,6 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } - protected def testAddPartitions(isDatasourceTable: Boolean): Unit = { - if (!isUsingHiveMetastore) { - assert(isDatasourceTable, "InMemoryCatalog only supports data source tables") - } - val catalog = spark.sessionState.catalog - val tableIdent = TableIdentifier("tab1", Some("dbx")) - val part1 = Map("a" -> "1", "b" -> "5") - val part2 = Map("a" -> "2", "b" -> "6") - val part3 = Map("a" -> "3", "b" -> "7") - val part4 = Map("a" -> "4", "b" -> "8") - val part5 = Map("a" -> "9", "b" -> "9") - createDatabase(catalog, "dbx") - createTable(catalog, tableIdent, isDatasourceTable) - createTablePartition(catalog, part1, tableIdent) - assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1)) - - // basic add partition - sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " + - "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')") - assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3)) - assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined) - - val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri - assert(tableLocation.isDefined) - val partitionLocation = makeQualifiedPath( - new Path(tableLocation.get.toString, "paris").toString) - - assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(partitionLocation)) - assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined) - - // add partitions without explicitly specifying database - catalog.setCurrentDatabase("dbx") - sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')") - assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == - Set(part1, part2, part3, part4)) - - // table to alter does not exist - intercept[AnalysisException] { - sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (a='4', b='9')") - } - - // partition to add already exists - intercept[AnalysisException] { - sql("ALTER TABLE tab1 ADD PARTITION (a='4', b='8')") - } - - // partition to add already exists when using IF NOT EXISTS - sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')") - assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == - Set(part1, part2, part3, part4)) - - // partition spec in ADD PARTITION should be case insensitive by default - sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')") - assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == - Set(part1, part2, part3, part4, part5)) - } - protected def testDropPartitions(isDatasourceTable: Boolean): Unit = { if (!isUsingHiveMetastore) { assert(isDatasourceTable, "InMemoryCatalog only supports data source tables") From 329fcc96388bc462e990d9716efa9891ccadc319 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Dec 2020 14:35:32 
+0300 Subject: [PATCH 14/18] Add AlterTableAddPartitionParserSuite --- .../sql/catalyst/parser/DDLParserSuite.scala | 27 ---------- .../AlterTableAddPartitionParserSuite.scala | 51 +++++++++++++++++++ .../sql/execution/command/DDLSuite.scala | 4 -- .../sql/hive/execution/HiveDDLSuite.scala | 4 -- 4 files changed, 51 insertions(+), 35 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index d5b27d9ad25cf..9008312cd1248 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2042,33 +2042,6 @@ class DDLParserSuite extends AnalysisTest { AlterTableRecoverPartitionsStatement(Seq("a", "b", "c"))) } - test("alter table: add partition") { - val sql1 = - """ - |ALTER TABLE a.b.c ADD IF NOT EXISTS PARTITION - |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION - |(dt='2009-09-09', country='uk') - """.stripMargin - val sql2 = "ALTER TABLE a.b.c ADD PARTITION (dt='2008-08-08') LOCATION 'loc'" - - val parsed1 = parsePlan(sql1) - val parsed2 = parsePlan(sql2) - - val expected1 = AlterTableAddPartition( - UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."), - Seq( - UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")), - UnresolvedPartitionSpec(Map("dt" -> "2009-09-09", "country" -> "uk"), None)), - ifNotExists = true) - val expected2 = AlterTableAddPartition( - UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."), - Seq(UnresolvedPartitionSpec(Map("dt" -> "2008-08-08"), Some("loc"))), - ifNotExists = false) - - comparePlans(parsed1, expected1) - comparePlans(parsed2, expected2) - } - test("alter view: add partition (not supported)") { assertUnsupported( """ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala new file mode 100644 index 0000000000000..7f31e7b27bf83 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedPartitionSpec, UnresolvedTable}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
+import org.apache.spark.sql.catalyst.plans.logical.AlterTableAddPartition
+import org.apache.spark.sql.test.SharedSparkSession
+
+class AlterTableAddPartitionParserSuite extends AnalysisTest with SharedSparkSession {
+  test("add partition if not exists") {
+    val sql = """
+      |ALTER TABLE a.b.c ADD IF NOT EXISTS PARTITION
+      |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
+      |(dt='2009-09-09', country='uk')""".stripMargin
+    val parsed = parsePlan(sql)
+    val expected = AlterTableAddPartition(
+      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
+      Seq(
+        UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
+        UnresolvedPartitionSpec(Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
+      ifNotExists = true)
+    comparePlans(parsed, expected)
+  }
+
+  test("add partition") {
+    val sql = "ALTER TABLE a.b.c ADD PARTITION (dt='2008-08-08') LOCATION 'loc'"
+    val parsed = parsePlan(sql)
+    val expected = AlterTableAddPartition(
+      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
+      Seq(UnresolvedPartitionSpec(Map("dt" -> "2008-08-08"), Some("loc"))),
+      ifNotExists = false)
+
+    comparePlans(parsed, expected)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index bf19c77a8859a..927cd8c387cc0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -334,10 +334,6 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     testChangeColumn(isDatasourceTable = true)
   }
 
-  test("alter table: add partition (datasource table)") {
-    testAddPartitions(isDatasourceTable = true)
-  }
-
   test("alter table: drop partition (datasource table)") {
     testDropPartitions(isDatasourceTable = true)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d6a4d76386889..070fdf55deb38 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -166,10 +166,6 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
     testDropPartitions(isDatasourceTable = false)
   }
 
-  test("alter table: add partition") {
-    testAddPartitions(isDatasourceTable = false)
-  }
-
   test("drop table") {
     testDropTable(isDatasourceTable = false)
   }

From 2ce716b0d0e629b0b28f339dc0967b048240e0eb Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Wed, 9 Dec 2020 15:12:49 +0300
Subject: [PATCH 15/18] Move specific tests

---
 .../AlterTableAddPartitionSuiteBase.scala     | 18 --------
 .../v1/AlterTableAddPartitionSuite.scala      | 20 +++++++-
 .../v2/AlterTableAddPartitionSuite.scala      | 19 +++++++-
 .../command/AlterTableAddPartitionSuite.scala | 46 +++++++++++++++++++
 4 files changed, 83 insertions(+), 20 deletions(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala

diff --git 
 .../AlterTableAddPartitionSuiteBase.scala     | 18 --------
 .../v1/AlterTableAddPartitionSuite.scala      | 20 +++++++-
 .../v2/AlterTableAddPartitionSuite.scala      | 19 +++++++-
 .../command/AlterTableAddPartitionSuite.scala | 46 +++++++++++++++++++
 4 files changed, 83 insertions(+), 20 deletions(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
index a70854cdf1871..643b3fbee313c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
@@ -21,7 +21,6 @@ import org.scalactic.source.Position
 import org.scalatest.Tag
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.internal.SQLConf
@@ -105,23 +104,6 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("partition already exists") {
-    withNsTable(s"$catalog.ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
-      val errMsg = intercept[PartitionsAlreadyExistException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
-          " PARTITION (id=2) LOCATION 'loc1'")
-      }.getMessage
-      assert(errMsg.contains("The following partitions already exists"))
-
-      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
-        " PARTITION (id=2) LOCATION 'loc1'")
-      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
-    }
-  }
-
   test("case sensitivity in resolving partition specs") {
     withNsTable(s"$catalog.ns", "tbl") { t =>
       spark.sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
index b29564e1d81b6..4c9ed1a66404e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.command.v1
 
+import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.execution.command
@@ -43,4 +44,21 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit
   }
 }
 
-class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with SharedSparkSession
+class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with SharedSparkSession {
+  test("partition already exists") {
+    withNsTable(s"$catalog.ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
+      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      val errMsg = intercept[PartitionsAlreadyExistException] {
+        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'")
+      }.getMessage
+      assert(errMsg.contains("The following partitions already exists"))
+
+      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
index 461c222bc9547..bb5c51079dc12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.v2
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec
+import org.apache.spark.sql.catalyst.analysis.{PartitionsAlreadyExistException, ResolvePartitionSpec}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
@@ -60,6 +60,23 @@ class AlterTableAddPartitionSuite
     assert(partMetadata.get("location") === expected)
   }
 
+  test("partition already exists") {
+    withNsTable(s"$catalog.ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
+      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      val errMsg = intercept[PartitionsAlreadyExistException] {
+        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'")
+      }.getMessage
+      assert(errMsg.contains("The following partitions already exists"))
+
+      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
+    }
+  }
+
   test("SPARK-33650: add partition into a table which doesn't support partition management") {
     withNsTable(s"non_part_$catalog.ns", "tbl") { t =>
       sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
new file mode 100644
index 0000000000000..38d8aa08b570c
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.command
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.command.v1
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class AlterTableAddPartitionSuite
+  extends v1.AlterTableAddPartitionSuiteBase
+  with TestHiveSingleton {
+  override def version: String = "Hive V1"
+  override def defaultUsing: String = "USING HIVE"
+
+  test("partition already exists") {
+    withNsTable(s"$catalog.ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
+      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      val errMsg = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'")
+      }.getMessage
+      assert(errMsg.contains("already exists"))
+
+      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
+    }
+  }
+}

From 29a560fad2fcd8c902b32b1447b8227d7a0d900f Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Wed, 9 Dec 2020 21:44:49 +0300
Subject: [PATCH 16/18] Fix test names in AlterTableAddPartitionParserSuite

---
 .../execution/command/AlterTableAddPartitionParserSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala
index 7f31e7b27bf83..5ebca8f651604 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.AlterTableAddPartition
 import org.apache.spark.sql.test.SharedSparkSession
 
 class AlterTableAddPartitionParserSuite extends AnalysisTest with SharedSparkSession {
-  test("add partition") {
+  test("add partition if not exists") {
     val sql = """
       |ALTER TABLE a.b.c ADD IF NOT EXISTS PARTITION
       |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
@@ -38,7 +38,7 @@ class AlterTableAddPartitionParserSuite extends AnalysisTest with SharedSparkSes
     comparePlans(parsed, expected)
   }
 
-  test("add partition if not exists") {
+  test("add partition") {
     val sql = "ALTER TABLE a.b.c ADD PARTITION (dt='2008-08-08') LOCATION 'loc'"
     val parsed = parsePlan(sql)
     val expected = AlterTableAddPartition(

From f2c4ecc30029749408fb39efadf9687e296e60eb Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Wed, 9 Dec 2020 22:00:33 +0300
Subject: [PATCH 17/18] Address Wenchen's review comment

---
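Note: withNsTable() now prepends the catalog itself, so call sites pass only
the namespace, and a test that targets another catalog passes it explicitly
through the new 'cat' parameter. The resulting call sites look roughly like:

  // default catalog (most tests): t == s"$catalog.ns.tbl"
  withNsTable("ns", "tbl") { t => sql(s"DESCRIBE TABLE $t") }
  // explicit catalog, as in the SPARK-33650 test:
  withNsTable("ns", "tbl", s"non_part_$catalog") { t => sql(s"DESCRIBE TABLE $t") }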
 .../AlterTableAddPartitionSuiteBase.scala     | 24 ++++++++++---------
 .../v1/AlterTableAddPartitionSuite.scala      |  2 +-
 .../v2/AlterTableAddPartitionSuite.scala      |  4 ++--
 .../command/AlterTableAddPartitionSuite.scala |  2 +-
 4 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
index 643b3fbee313c..bdb8ecbfb3f6f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
@@ -46,10 +46,12 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
   }
   protected def checkLocation(t: String, spec: TablePartitionSpec, expected: String): Unit
 
-  protected def withNsTable(ns: String, tableName: String)(f: String => Unit): Unit = {
-    withNamespace(ns) {
-      sql(s"CREATE NAMESPACE $ns")
-      val t = s"$ns.$tableName"
+  protected def withNsTable(ns: String, tableName: String, cat: String = catalog)
+      (f: String => Unit): Unit = {
+    val nsCat = s"$cat.$ns"
+    withNamespace(nsCat) {
+      sql(s"CREATE NAMESPACE $nsCat")
+      val t = s"$nsCat.$tableName"
       withTable(t) {
         f(t)
       }
@@ -57,7 +59,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
   }
 
   test("one partition") {
-    withNsTable(s"$catalog.ns", "tbl") { t =>
+    withNsTable("ns", "tbl") { t =>
       sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
       Seq("", "IF NOT EXISTS").foreach { exists =>
         sql(s"ALTER TABLE $t ADD $exists PARTITION (id=1) LOCATION 'loc'")
@@ -69,7 +71,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
   }
 
   test("multiple partitions") {
-    withNsTable(s"$catalog.ns", "tbl") { t =>
+    withNsTable("ns", "tbl") { t =>
       sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
       Seq("", "IF NOT EXISTS").foreach { exists =>
         sql(s"""
@@ -85,7 +87,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
   }
 
   test("multi-part partition") {
-    withNsTable(s"$catalog.ns", "tbl") { t =>
+    withNsTable("ns", "tbl") { t =>
      sql(s"CREATE TABLE $t (id bigint, a int, b string) $defaultUsing PARTITIONED BY (a, b)")
       Seq("", "IF NOT EXISTS").foreach { exists =>
         sql(s"ALTER TABLE $t ADD $exists PARTITION (a=2, b='abc')")
@@ -96,7 +98,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
   }
 
   test("table to alter does not exist") {
-    withNsTable(s"$catalog.ns", "does_not_exist") { t =>
+    withNsTable("ns", "does_not_exist") { t =>
       val errMsg = intercept[AnalysisException] {
         sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (a='4', b='9')")
       }.getMessage
@@ -107,7 +109,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
   }
 
   test("case sensitivity in resolving partition specs") {
-    withNsTable(s"$catalog.ns", "tbl") { t =>
+    withNsTable("ns", "tbl") { t =>
       spark.sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
         val errMsg = intercept[AnalysisException] {
@@ -122,7 +124,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
   }
 
   test("SPARK-33521: universal type conversions of partition values") {
-    withNsTable(s"$catalog.ns", "tbl") { t =>
+    withNsTable("ns", "tbl") { t =>
       sql(s"""
         |CREATE TABLE $t (
         |  id int,
@@ -170,7 +172,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
   }
 
   test("SPARK-33676: not fully specified partition spec") {
-    withNsTable(s"$catalog.ns", "tbl") { t =>
+    withNsTable("ns", "tbl") { t =>
       sql(s"""
         |CREATE TABLE $t (id bigint, part0 int, part1 string)
         |$defaultUsing
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
index 4c9ed1a66404e..295ce1d3da13f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
@@ -46,7 +46,7 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit
 
 class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with SharedSparkSession {
   test("partition already exists") {
-    withNsTable(s"$catalog.ns", "tbl") { t =>
+    withNsTable("ns", "tbl") { t =>
       sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
       sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
index bb5c51079dc12..b15235d17671a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
@@ -61,7 +61,7 @@ class AlterTableAddPartitionSuite
   }
 
   test("partition already exists") {
-    withNsTable(s"$catalog.ns", "tbl") { t =>
+    withNsTable("ns", "tbl") { t =>
       sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
       sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
 
@@ -78,7 +78,7 @@ class AlterTableAddPartitionSuite
   }
 
   test("SPARK-33650: add partition into a table which doesn't support partition management") {
-    withNsTable(s"non_part_$catalog.ns", "tbl") { t =>
+    withNsTable("ns", "tbl", s"non_part_$catalog") { t =>
       sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
       val errMsg = intercept[AnalysisException] {
         sql(s"ALTER TABLE $t ADD PARTITION (id=1)")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
index 38d8aa08b570c..ef0ec8d9bd69f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
@@ -28,7 +28,7 @@ class AlterTableAddPartitionSuite
   override def defaultUsing: String = "USING HIVE"
 
   test("partition already exists") {
-    withNsTable(s"$catalog.ns", "tbl") { t =>
+    withNsTable("ns", "tbl") { t =>
       sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
       sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
 

From 50c59aa821bddc447695d87e2485db3fdd2d67c0 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Thu, 10 Dec 2020 00:29:36 +0300
Subject: [PATCH 18/18] Timestamp with gap

---
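Note: with the 'T' separator the stored partition value diverged between
catalogs (the V2 implementation normalized it to a space while V1 kept the
literal), hence the version branch removed below. With a space in the input
spec the value should round-trip unchanged for both; illustrative check only:

  // the value written in the spec comes back as-is from SHOW PARTITIONS
  sql(s"ALTER TABLE $t ADD PARTITION (part9 = '2020-11-23 22:13:10.123456')")
  checkPartitions(t, Map("part9" -> "2020-11-23 22:13:10.123456"))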
 .../execution/command/AlterTableAddPartitionSuiteBase.scala  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
index bdb8ecbfb3f6f..0cf0b395f139b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
@@ -151,7 +151,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
         |  part6 = 'abc',
         |  part7 = true,
         |  part8 = '2020-11-23',
-        |  part9 = '2020-11-23T22:13:10.123456'
+        |  part9 = '2020-11-23 22:13:10.123456'
         |""".stripMargin
       sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'")
       val expected = Map(
@@ -164,7 +164,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
         "part6" -> "abc",
         "part7" -> "true",
         "part8" -> "2020-11-23",
-        "part9" -> s"2020-11-23${if (version == "V2") " " else "T"}22:13:10.123456")
+        "part9" -> "2020-11-23 22:13:10.123456")
       checkPartitions(t, expected)
       sql(s"ALTER TABLE $t DROP PARTITION ($partSpec)")
       checkPartitions(t) // no partitions