From 20f658ad8e14a94dd23bff6a8d795124d1db24e9 Mon Sep 17 00:00:00 2001
From: Dylan Su
Date: Wed, 8 Nov 2017 11:44:28 +0800
Subject: [PATCH 01/14] [SPARK-14922][SPARK-17732][SQL] ALTER TABLE DROP PARTITION should support comparators
---
 .../spark/sql/catalyst/parser/SqlBase.g4 | 2 +-
 .../sql/catalyst/parser/AstBuilder.scala | 21 ++++
 .../spark/sql/execution/SparkSqlParser.scala | 3 +-
 .../spark/sql/execution/command/ddl.scala | 36 +++++-
 .../execution/command/DDLParserSuite.scala | 2 +-
 .../sql/hive/execution/HiveDDLSuite.scala | 103 ++++++++++++++++++
 6 files changed, 161 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 6fe995f650d55..53d92989a7784 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -250,7 +250,7 @@ partitionSpecLocation
     ;

 partitionSpec
-    : PARTITION '(' partitionVal (',' partitionVal)* ')'
+    : PARTITION '(' (partitionVal | expression) (',' (partitionVal | expression))* ')'
     ;

 partitionVal
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 7651d11ee65a8..f79633e0140ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     parts.toMap
   }

+  /**
+   * Create a partition fileter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) {
+    val parts = ctx.expression.asScala.map { pVal =>
+      expression(pVal) match {
+        case EqualNullSafe(_, _) =>
+          throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx)
+        case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+          cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant))
+        case _ =>
+          throw new ParseException("Invalid partition filter specification", ctx)
+      }
+    }
+    if(parts.isEmpty) {
+      null
+    } else {
+      parts.reduceLeft(And)
+    }
+  }
+
   /**
    * Create a partition specification map without optional values.
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 29b584b55972c..a58b92b2b4de0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -916,7 +916,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { } AlterTableDropPartitionCommand( visitTableIdentifier(ctx.tableIdentifier), - ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec), + ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec).filter(_ != null), + ctx.partitionSpec.asScala.map(visitPartitionFilterSpec).filter(_ != null), ifExists = ctx.EXISTS != null, purge = ctx.PURGE != null, retainData = false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 568567aa8ea88..08060c6f0aeaa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PredicateHelper} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils} import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat @@ -516,27 +516,57 @@ case class AlterTableRenamePartitionCommand( case class AlterTableDropPartitionCommand( tableName: TableIdentifier, specs: Seq[TablePartitionSpec], + exprs: Seq[Expression] = Seq.empty[Expression], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) + val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") + exprs.foreach { expr => + expr.references.foreach { attr => + if (!table.partitionColumnNames.exists(resolver(_, attr.name))) { + throw new AnalysisException(s"${attr.name} is not a valid partition column " + s"in table ${table.identifier.quotedString}.") + } + } + } + + val partitionSet = exprs.flatMap { expr => + val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(expr)).map(_.spec) + if (partitions.isEmpty && !ifExists) { + throw new AnalysisException(s"There is no partition for ${expr.sql}") + } + partitions + }.distinct + val normalizedSpecs = specs.map { spec => PartitioningUtils.normalizePartitionSpec( spec, table.partitionColumnNames, table.identifier.quotedString, sparkSession.sessionState.conf.resolver) + }.filter(_.size != 0) + + val toDrop = { + if (normalizedSpecs.isEmpty && partitionSet.isEmpty) { + Seq.empty[TablePartitionSpec] + } else if (normalizedSpecs.isEmpty && 
!partitionSet.isEmpty) { + partitionSet + } else if (!normalizedSpecs.isEmpty && partitionSet.isEmpty) { + normalizedSpecs + } else { + partitionSet.intersect(normalizedSpecs) + } } catalog.dropPartitions( - table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, + table.identifier, toDrop, ignoreIfNotExists = ifExists, purge = purge, retainData = retainData) CommandUtils.updateTableStats(sparkSession, table) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index eb7c33590b602..41af956efaaf7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan -import org.apache.spark.sql.catalyst.expressions.JsonTuple +import org.apache.spark.sql.catalyst.expressions.{JsonTuple, Literal} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index d3465a641a1a4..fad85d957a79e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -30,6 +30,7 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} @@ -495,6 +496,108 @@ class HiveDDLSuite } } + test("SPARK-17732; Drop partitions by filter") { + withTable("sales") { + sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter STRING)") + + for (country <- Seq("US", "CA", "KR")) { + for (quarter <- 1 to 4) { + sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')") + } + } + + sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=CA/quarter=1") :: + Row("country=CA/quarter=2") :: + Row("country=KR/quarter=1") :: + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=US/quarter=1") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: Nil) + + sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: Nil) + + sql("ALTER TABLE sales DROP PARTITION (country = 'KR', quarter = '4')") + sql("ALTER TABLE sales 
DROP PARTITION (country = 'US', quarter = '3')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=4") :: Nil) + + sql("ALTER TABLE sales DROP PARTITION (quarter <= 2), PARTITION (quarter >= '4')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=3") :: Nil) + + // According to the declarative partition spec definitions, this drops the union of target + // partitions without exceptions. Hive raises exceptions because it handles them sequentially. + sql("ALTER TABLE sales DROP PARTITION (quarter <= 4), PARTITION (quarter <= '3')") + checkAnswer(sql("SHOW PARTITIONS sales"), Nil) + } + } + + test("SPARK-14922, SPARK-17732: Error handling for drop partitions by filter") { + withTable("sales") { + sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") + + val m = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')") + }.getMessage + assert(m.contains("unknown is not a valid partition column in table")) + + val m2 = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')") + }.getMessage + assert(m2.contains("unknown is not a valid partition column in table")) + + val m3 = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')") + }.getMessage + assert(m3.contains("'<=>' operator is not allowed in partition specification")) + + val m4 = intercept[ParseException] { + sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))") + }.getMessage + assert(m4.contains("'<=>' operator is not allowed in partition specification")) + + val m5 = intercept[ParseException] { + sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)") + }.getMessage + assert(m5.contains("Found an empty partition key")) + + sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')") + val m6 = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')") + }.getMessage + // The query is not executed because `PARTITION (quarter <= '2')` is invalid. 
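+      // Even country=KR/quarter=3, which the first clause does match, must remain untouched.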
+ checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=3") :: Nil) + assert(m6.contains("There is no partition for (`quarter` <= '2')")) + } + } + + test("SPARK-17732: Partition filter is not allowed in ADD PARTITION") { + withTable("sales") { + sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") + + val m = intercept[AnalysisException] { + sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')") + }.getMessage + assert(m.contains("Partition spec is invalid")) + } + } + test("drop views") { withTable("tab1") { val tabName = "tab1" From 85fdb4663865a5998788860c4ec152bcaee3a894 Mon Sep 17 00:00:00 2001 From: Dylan Su Date: Tue, 14 Nov 2017 14:48:51 +0800 Subject: [PATCH 02/14] bug fix --- .../sql/catalyst/parser/AstBuilder.scala | 7 +++ .../spark/sql/execution/SparkSqlParser.scala | 3 +- .../spark/sql/execution/command/ddl.scala | 52 +++++++++---------- .../InsertIntoHadoopFsRelationCommand.scala | 4 +- .../execution/command/DDLParserSuite.scala | 10 ++-- .../sql/hive/execution/HiveDDLSuite.scala | 25 +++++++-- 6 files changed, 61 insertions(+), 40 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f79633e0140ab..4b1cf7b9a1050 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -314,6 +314,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * + */ + protected def visitPartition(ctx: PartitionSpecContext): (Map[String, String], Expression) = { + (visitNonOptionalPartitionSpec(ctx), visitPartitionFilterSpec(ctx)) + } + /** * Convert a constant of any type into a string. 
This is typically used in DDL commands, and its * main purpose is to prevent slight differences due to back to back conversions i.e.: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index a58b92b2b4de0..910a1a76af80e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -916,8 +916,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { } AlterTableDropPartitionCommand( visitTableIdentifier(ctx.tableIdentifier), - ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec).filter(_ != null), - ctx.partitionSpec.asScala.map(visitPartitionFilterSpec).filter(_ != null), + ctx.partitionSpec.asScala.map(visitPartition), ifExists = ctx.EXISTS != null, purge = ctx.PURGE != null, retainData = false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 08060c6f0aeaa..f1f9bcfbc0496 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -515,8 +515,7 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, - specs: Seq[TablePartitionSpec], - exprs: Seq[Expression] = Seq.empty[Expression], + partitions: Seq[(TablePartitionSpec, Expression)], ifExists: Boolean, purge: Boolean, retainData: Boolean) @@ -529,39 +528,38 @@ case class AlterTableDropPartitionCommand( DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") - exprs.foreach { expr => - expr.references.foreach { attr => - if (!table.partitionColumnNames.exists(resolver(_, attr.name))) { - throw new AnalysisException(s"${attr.name} is not a valid partition column " + s"in table ${table.identifier.quotedString}.") + val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( + partition._1, + table.partitionColumnNames, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) + + val partitionSet = { + if (partition._2 != null) { + partition._2.references.foreach { attr => + if (!table.partitionColumnNames.exists(resolver(_, attr.name))) { + throw new AnalysisException(s"${attr.name} is not a valid partition column " + s"in table ${table.identifier.quotedString}.") + } + } + val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(partition._2)).map(_.spec) + if (partitions.isEmpty && !ifExists) { + throw new AnalysisException(s"There is no partition for ${partition._2.sql}") + } + partitions + } else { + Seq.empty[TablePartitionSpec] } - } - } - - val partitionSet = exprs.flatMap { expr => - val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(expr)).map(_.spec) - if (partitions.isEmpty && !ifExists) { - throw new AnalysisException(s"There is no partition for ${expr.sql}") - } - partitions - }.distinct - - val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( - spec, - table.partitionColumnNames, - table.identifier.quotedString, - sparkSession.sessionState.conf.resolver) - }.filter(_.size != 0) + }.distinct - val toDrop = { if (normalizedSpecs.isEmpty && partitionSet.isEmpty) { 
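        // Neither a key=value spec nor a matching filter was supplied for this clause.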
Seq.empty[TablePartitionSpec] } else if (normalizedSpecs.isEmpty && !partitionSet.isEmpty) { partitionSet } else if (!normalizedSpecs.isEmpty && partitionSet.isEmpty) { - normalizedSpecs + Seq(normalizedSpecs) } else { - partitionSet.intersect(normalizedSpecs) + partitionSet.intersect(normalizedSpecs.toSeq) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 675bee85bf61e..ac0fef5f60e74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -25,7 +25,7 @@ import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.util.SchemaUtils @@ -128,7 +128,7 @@ case class InsertIntoHadoopFsRelationCommand( val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions if (deletedPartitions.nonEmpty) { AlterTableDropPartitionCommand( - catalogTable.get.identifier, deletedPartitions.toSeq, + catalogTable.get.identifier, deletedPartitions.map(x => (x, null)).toSeq, ifExists = true, purge = false, retainData = true /* already deleted */).run(sparkSession) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 41af956efaaf7..801ca65bb9f6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -29,11 +29,10 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan -import org.apache.spark.sql.catalyst.expressions.{JsonTuple, Literal} +import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan} -import org.apache.spark.sql.catalyst.plans.logical.{Project, ScriptTransformation} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -41,6 +40,7 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} + class DDLParserSuite extends PlanTest with SharedSQLContext { private lazy val parser = new SparkSqlParser(new SQLConf) @@ -826,8 +826,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { val expected1_table = AlterTableDropPartitionCommand( tableIdent, Seq( - Map("dt" -> "2008-08-08", "country" 
-> "us"), - Map("dt" -> "2009-09-09", "country" -> "uk")), + (Map("dt" -> "2008-08-08", "country" -> "us"), null), + (Map("dt" -> "2009-09-09", "country" -> "uk"), null)), ifExists = true, purge = false, retainData = false) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index fad85d957a79e..2ecfd168ed3d3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -500,13 +500,30 @@ class HiveDDLSuite withTable("sales") { sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter STRING)") - for (country <- Seq("US", "CA", "KR")) { - for (quarter <- 1 to 4) { + for (country <- Seq("AU", "US", "CA", "KR")) { + for (quarter <- 1 to 5) { sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')") } } sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=AU/quarter=1") :: + Row("country=AU/quarter=2") :: + Row("country=CA/quarter=1") :: + Row("country=CA/quarter=2") :: + Row("country=KR/quarter=1") :: + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=KR/quarter=5") :: + Row("country=US/quarter=1") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: + Row("country=US/quarter=5") :: Nil) + + sql("ALTER TABLE sales DROP PARTITION (country < 'CA'), PARTITION (quarter = '5')") checkAnswer(sql("SHOW PARTITIONS sales"), Row("country=CA/quarter=1") :: Row("country=CA/quarter=2") :: @@ -536,13 +553,13 @@ class HiveDDLSuite Row("country=US/quarter=2") :: Row("country=US/quarter=4") :: Nil) - sql("ALTER TABLE sales DROP PARTITION (quarter <= 2), PARTITION (quarter >= '4')") + sql("ALTER TABLE sales DROP PARTITION (quarter <= '2'), PARTITION (quarter >= '4')") checkAnswer(sql("SHOW PARTITIONS sales"), Row("country=KR/quarter=3") :: Nil) // According to the declarative partition spec definitions, this drops the union of target // partitions without exceptions. Hive raises exceptions because it handles them sequentially. 
- sql("ALTER TABLE sales DROP PARTITION (quarter <= 4), PARTITION (quarter <= '3')") + sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '3')") checkAnswer(sql("SHOW PARTITIONS sales"), Nil) } } From f18caeb64253f98c7413a9a8d4b2d31a529e4cf0 Mon Sep 17 00:00:00 2001 From: Dylan Su Date: Tue, 14 Nov 2017 15:09:07 +0800 Subject: [PATCH 03/14] Scala Style fix --- .../scala/org/apache/spark/sql/execution/command/ddl.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index f1f9bcfbc0496..27167cce78afa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -539,10 +539,12 @@ case class AlterTableDropPartitionCommand( if (partition._2 != null) { partition._2.references.foreach { attr => if (!table.partitionColumnNames.exists(resolver(_, attr.name))) { - throw new AnalysisException(s"${attr.name} is not a valid partition column " + s"in table ${table.identifier.quotedString}.") + throw new AnalysisException(s"${attr.name} is not a valid partition column " + + s"in table ${table.identifier.quotedString}.") } } - val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(partition._2)).map(_.spec) + val partitions = catalog.listPartitionsByFilter( + table.identifier, Seq(partition._2)).map(_.spec) if (partitions.isEmpty && !ifExists) { throw new AnalysisException(s"There is no partition for ${partition._2.sql}") } From f79c6f445741dd64714cbd177145eecce09715bc Mon Sep 17 00:00:00 2001 From: Dylan Su Date: Tue, 14 Nov 2017 15:32:54 +0800 Subject: [PATCH 04/14] Scala Style fix --- .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 2ecfd168ed3d3..fed9b81bdd59c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -30,9 +30,9 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET} From 8728d3be8002464d51a77eae0d0052f2856747b5 Mon Sep 17 00:00:00 2001 From: Dylan Su Date: Tue, 14 Nov 2017 16:47:15 +0800 Subject: [PATCH 05/14] some changes --- .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 4 ++-- .../apache/spark/sql/execution/command/DDLParserSuite.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala 
index 4b1cf7b9a1050..fda3529feb459 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -283,7 +283,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Create a partition fileter specification. + * Create a partition filter specification. */ def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { val parts = ctx.expression.asScala.map { pVal => @@ -315,7 +315,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * + * Create a partition specification map without optional values and a partition filter specification. */ protected def visitPartition(ctx: PartitionSpecContext): (Map[String, String], Expression) = { (visitNonOptionalPartitionSpec(ctx), visitPartitionFilterSpec(ctx)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 801ca65bb9f6d..3dfe17983c360 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -32,7 +32,8 @@ import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{Project, ScriptTransformation} import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -40,7 +41,6 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} - class DDLParserSuite extends PlanTest with SharedSQLContext { private lazy val parser = new SparkSqlParser(new SQLConf) From 9832ec55191deb995fe975d01d7899cb049207e5 Mon Sep 17 00:00:00 2001 From: Dylan Su Date: Tue, 14 Nov 2017 16:58:00 +0800 Subject: [PATCH 06/14] Scala Style fix --- .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index fda3529feb459..a97288014df66 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -315,7 +315,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Create a partition specification map without optional values and a partition filter specification. + * Create a partition specification map without optional values + * and a partition filter specification. 
*/ protected def visitPartition(ctx: PartitionSpecContext): (Map[String, String], Expression) = { (visitNonOptionalPartitionSpec(ctx), visitPartitionFilterSpec(ctx)) From dd5d4825d247d9949eb2bf10c05ee7661c389709 Mon Sep 17 00:00:00 2001 From: Dazhuang Su Date: Mon, 21 May 2018 20:27:00 +0800 Subject: [PATCH 07/14] update --- .../sql/catalyst/parser/AstBuilder.scala | 11 +++--- .../spark/sql/execution/command/ddl.scala | 34 ++++++++++++------- .../execution/command/DDLParserSuite.scala | 4 +-- .../sql/hive/execution/HiveDDLSuite.scala | 22 ++++++++++-- 4 files changed, 49 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index a97288014df66..ae750c1e774c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -285,21 +285,21 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging /** * Create a partition filter specification. */ - def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Seq[Expression] = withOrigin(ctx) { val parts = ctx.expression.asScala.map { pVal => expression(pVal) match { case EqualNullSafe(_, _) => throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => - cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant)) + cmp case _ => throw new ParseException("Invalid partition filter specification", ctx) } } if(parts.isEmpty) { - null + Seq.empty[Expression] } else { - parts.reduceLeft(And) + parts } } @@ -318,7 +318,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * Create a partition specification map without optional values * and a partition filter specification. 
*/ - protected def visitPartition(ctx: PartitionSpecContext): (Map[String, String], Expression) = { + protected def visitPartition( + ctx: PartitionSpecContext): (Map[String, String], Seq[Expression]) = { (visitNonOptionalPartitionSpec(ctx), visitPartitionFilterSpec(ctx)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 27167cce78afa..06393d92384d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver} +import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PredicateHelper} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BinaryComparison, Cast, Expression, Literal, PredicateHelper} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils} import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat @@ -515,7 +515,7 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, - partitions: Seq[(TablePartitionSpec, Expression)], + partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) @@ -536,19 +536,27 @@ case class AlterTableDropPartitionCommand( sparkSession.sessionState.conf.resolver) val partitionSet = { - if (partition._2 != null) { - partition._2.references.foreach { attr => - if (!table.partitionColumnNames.exists(resolver(_, attr.name))) { - throw new AnalysisException(s"${attr.name} is not a valid partition column " + + if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => + val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + (name, constant.value) + } + if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + s"in table ${table.identifier.quotedString}.") } + val dataType = table.partitionSchema.apply(attrName).dataType + expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) + }.reduce(And) + + val partitions = catalog.listPartitionsByFilter( + table.identifier, Seq(parts)).map(_.spec) + if (partitions.isEmpty && !ifExists) { + throw new AnalysisException(s"There is no partition for ${parts.sql}") } - val partitions = catalog.listPartitionsByFilter( - table.identifier, Seq(partition._2)).map(_.spec) - if (partitions.isEmpty && !ifExists) { - throw new AnalysisException(s"There is no partition for ${partition._2.sql}") - } - partitions + partitions } else { Seq.empty[TablePartitionSpec] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 3dfe17983c360..cb27f3378682d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -826,8 +826,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { val expected1_table = AlterTableDropPartitionCommand( tableIdent, Seq( - (Map("dt" -> "2008-08-08", "country" -> "us"), null), - (Map("dt" -> "2009-09-09", "country" -> "uk"), null)), + (Map("dt" -> "2008-08-08", "country" -> "us"), List()), + (Map("dt" -> "2009-09-09", "country" -> "uk"), List())), ifExists = true, purge = false, retainData = false) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index fed9b81bdd59c..2e3c46616785a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -496,7 +496,16 @@ class HiveDDLSuite } } - test("SPARK-17732; Drop partitions by filter") { + def testDropPartition(dataType: DataType, value: Any): Unit = { + withTable("tbl_x") { + sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p ${dataType.sql})") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value)") + sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= $value)") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil) + } + } + + test("SPARK-17732: Drop partitions by filter") { withTable("sales") { sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter STRING)") @@ -562,6 +571,15 @@ class HiveDDLSuite sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '3')") checkAnswer(sql("SHOW PARTITIONS sales"), Nil) } + testDropPartition(IntegerType, 1) + testDropPartition(BooleanType, true) + testDropPartition(StringType, "'true'") + testDropPartition(LongType, 1L) + testDropPartition(ShortType, 1.toShort) + testDropPartition(ByteType, 1.toByte) + testDropPartition(FloatType, 1.0F) + testDropPartition(DoubleType, 1.0) + testDropPartition(DecimalType(2, 1), Decimal(1.5)) } test("SPARK-14922, SPARK-17732: Error handling for drop partitions by filter") { @@ -600,7 +618,7 @@ class HiveDDLSuite // The query is not executed because `PARTITION (quarter <= '2')` is invalid. 
checkAnswer(sql("SHOW PARTITIONS sales"), Row("country=KR/quarter=3") :: Nil) - assert(m6.contains("There is no partition for (`quarter` <= '2')")) + assert(m6.contains("There is no partition for (`quarter` <= CAST('2' AS STRING))")) } } From b4a637cd3eedc0e2d8ce2495f24bab6721ea5193 Mon Sep 17 00:00:00 2001 From: Dazhuang Su Date: Tue, 22 May 2018 02:29:34 +0800 Subject: [PATCH 08/14] update --- .../datasources/InsertIntoHadoopFsRelationCommand.scala | 2 +- .../apache/spark/sql/execution/command/DDLParserSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index ac0fef5f60e74..f010d0a9eda93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -128,7 +128,7 @@ case class InsertIntoHadoopFsRelationCommand( val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions if (deletedPartitions.nonEmpty) { AlterTableDropPartitionCommand( - catalogTable.get.identifier, deletedPartitions.map(x => (x, null)).toSeq, + catalogTable.get.identifier, deletedPartitions.map(x => (x, Seq())).toSeq, ifExists = true, purge = false, retainData = true /* already deleted */).run(sparkSession) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index cb27f3378682d..9667ebcadb495 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -826,8 +826,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { val expected1_table = AlterTableDropPartitionCommand( tableIdent, Seq( - (Map("dt" -> "2008-08-08", "country" -> "us"), List()), - (Map("dt" -> "2009-09-09", "country" -> "uk"), List())), + (Map("dt" -> "2008-08-08", "country" -> "us"), Seq()), + (Map("dt" -> "2009-09-09", "country" -> "uk"), Seq())), ifExists = true, purge = false, retainData = false) From 4b5c74ebc080e70d5183b62f69397cabd43d06df Mon Sep 17 00:00:00 2001 From: Dazhuang Su Date: Tue, 22 May 2018 16:06:51 +0800 Subject: [PATCH 09/14] update --- .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 3 +++ .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index ae750c1e774c7..73435a78f537e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -292,6 +292,9 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => cmp + case bc @ BinaryComparison(constant: Literal, _) => + throw new ParseException("Literal " + constant + + " is supported only on the rigth-side.", ctx) case _ => throw new 
ParseException("Invalid partition filter specification", ctx) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 2e3c46616785a..ea75060593d76 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -619,6 +619,13 @@ class HiveDDLSuite checkAnswer(sql("SHOW PARTITIONS sales"), Row("country=KR/quarter=3") :: Nil) assert(m6.contains("There is no partition for (`quarter` <= CAST('2' AS STRING))")) + + val m7 = intercept[ParseException] { + sql("ALTER TABLE sales DROP PARTITION ( '4' > quarter)") + }.getMessage + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=3") :: Nil) + assert(m7.contains("Literal 4 is supported only on the rigth-side")) } } From e92185c021435b4b041f7a4b88ea9875f4976433 Mon Sep 17 00:00:00 2001 From: Dazhuang Su Date: Fri, 25 May 2018 16:25:51 +0800 Subject: [PATCH 10/14] add some tests --- .../sql/catalyst/parser/AstBuilder.scala | 6 +-- .../InsertIntoHadoopFsRelationCommand.scala | 2 +- .../execution/command/DDLParserSuite.scala | 4 +- .../sql/hive/execution/HiveDDLSuite.scala | 38 +++++++++++++------ 4 files changed, 30 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 73435a78f537e..5256a86475cd3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -299,11 +299,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging throw new ParseException("Invalid partition filter specification", ctx) } } - if(parts.isEmpty) { - Seq.empty[Expression] - } else { - parts - } + parts } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index f010d0a9eda93..39519276b8d05 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -128,7 +128,7 @@ case class InsertIntoHadoopFsRelationCommand( val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions if (deletedPartitions.nonEmpty) { AlterTableDropPartitionCommand( - catalogTable.get.identifier, deletedPartitions.map(x => (x, Seq())).toSeq, + catalogTable.get.identifier, deletedPartitions.map(x => (x, Seq.empty)).toSeq, ifExists = true, purge = false, retainData = true /* already deleted */).run(sparkSession) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 9667ebcadb495..3b21cb1199db1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -826,8 +826,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { val expected1_table = AlterTableDropPartitionCommand( tableIdent, Seq( - (Map("dt" -> 
"2008-08-08", "country" -> "us"), Seq()), - (Map("dt" -> "2009-09-09", "country" -> "uk"), Seq())), + (Map("dt" -> "2008-08-08", "country" -> "us"), Seq.empty), + (Map("dt" -> "2009-09-09", "country" -> "uk"), Seq.empty)), ifExists = true, purge = false, retainData = false) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index ea75060593d76..b3da4625f6c10 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -496,11 +496,15 @@ class HiveDDLSuite } } - def testDropPartition(dataType: DataType, value: Any): Unit = { + def testDropPartition(dataType: DataType, value1: Any, value2: Any): Unit = { withTable("tbl_x") { sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p ${dataType.sql})") - sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value)") - sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= $value)") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value1)") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value2)") + sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= $value2)") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), + Row(s"p=$value1") :: Nil) + sql(s"ALTER TABLE tbl_x DROP PARTITION (p = $value1)") checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil) } } @@ -571,15 +575,25 @@ class HiveDDLSuite sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '3')") checkAnswer(sql("SHOW PARTITIONS sales"), Nil) } - testDropPartition(IntegerType, 1) - testDropPartition(BooleanType, true) - testDropPartition(StringType, "'true'") - testDropPartition(LongType, 1L) - testDropPartition(ShortType, 1.toShort) - testDropPartition(ByteType, 1.toByte) - testDropPartition(FloatType, 1.0F) - testDropPartition(DoubleType, 1.0) - testDropPartition(DecimalType(2, 1), Decimal(1.5)) + + withTable("tbl_x") { + sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p STRING)") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'false')") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'true')") + sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= 'true')") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), + Row(s"p=false") :: Nil) + sql(s"ALTER TABLE tbl_x DROP PARTITION (p = 'false')") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil) + } + testDropPartition(IntegerType, 1, 2) + testDropPartition(BooleanType, false, true) + testDropPartition(LongType, 1L, 2L) + testDropPartition(ShortType, 1.toShort, 2.toShort) + testDropPartition(ByteType, 1.toByte, 2.toByte) + testDropPartition(FloatType, 1.0F, 2.0F) + testDropPartition(DoubleType, 1.0, 2.0) + testDropPartition(DecimalType(2, 1), Decimal(1.5), Decimal(2.5)) } test("SPARK-14922, SPARK-17732: Error handling for drop partitions by filter") { From 182449b76e6798cba5e2ee0d22d3eb8f984f8979 Mon Sep 17 00:00:00 2001 From: Dazhuang Su Date: Thu, 31 May 2018 00:47:15 +0800 Subject: [PATCH 11/14] address comments --- .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 5256a86475cd3..0772dcf471b47 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -293,10 +293,10 @@ class 
AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
         case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
           cmp
         case bc @ BinaryComparison(constant: Literal, _) =>
-          throw new ParseException("Literal " + constant
-            + " is supported only on the right-side.", ctx)
+          throw new ParseException(s"Literal $constant is supported only on the right-side.", ctx)
         case _ =>
-          throw new ParseException("Invalid partition filter specification", ctx)
+          throw new ParseException(
+            s"Invalid partition filter specification (${pVal.getText}).", ctx)
       }
     }
     parts

From d725fc96804d5f4418b0916d3e6e9d912576bad8 Mon Sep 17 00:00:00 2001
From: Dazhuang Su
Date: Fri, 1 Jun 2018 00:20:03 +0800
Subject: [PATCH 12/14] update
---
 .../org/apache/spark/sql/catalyst/expressions/literals.scala | 1 +
 .../main/scala/org/apache/spark/sql/execution/command/ddl.scala | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index eaeaf08c37b4e..d3323e1e58512 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -57,6 +57,7 @@ object Literal {
     case b: Byte => Literal(b, ByteType)
     case s: Short => Literal(s, ShortType)
     case s: String => Literal(UTF8String.fromString(s), StringType)
+    case s: UTF8String => Literal(s, StringType)
     case b: Boolean => Literal(b, BooleanType)
     case d: BigDecimal => Literal(Decimal(d), DecimalType(Math.max(d.precision, d.scale), d.scale))
     case d: JavaBigDecimal =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 06393d92384d9..e5f6f99c09080 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -548,7 +548,7 @@ case class AlterTableDropPartitionCommand(
             }
             val dataType = table.partitionSchema.apply(attrName).dataType
             expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
-              Cast(Literal(value.toString), dataType)))
+              Cast(Literal(value), dataType)))
           }.reduce(And)
           val partitions = catalog.listPartitionsByFilter(

From defc9f11831c053727970e8d9c1f784ec5223644 Mon Sep 17 00:00:00 2001
From: Dazhuang Su
Date: Fri, 1 Jun 2018 22:27:50 +0800
Subject: [PATCH 13/14] update
---
 .../apache/spark/sql/catalyst/expressions/literals.scala | 1 -
 .../scala/org/apache/spark/sql/execution/command/ddl.scala | 6 +++---
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index d3323e1e58512..eaeaf08c37b4e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -57,7 +57,6 @@ object Literal {
     case b: Byte => Literal(b, ByteType)
     case s: Short => Literal(s, ShortType)
     case s: String => Literal(UTF8String.fromString(s), StringType)
-    case s: UTF8String => Literal(s, StringType)
     case b: Boolean => Literal(b, BooleanType)
     case d: BigDecimal => Literal(Decimal(d), DecimalType(Math.max(d.precision, d.scale), d.scale))
     case d:
JavaBigDecimal =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index e5f6f99c09080..cbe6ec9b90caa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -538,9 +538,9 @@ case class AlterTableDropPartitionCommand(
       val partitionSet = {
         if (partition._2.nonEmpty) {
           val parts = partition._2.map { expr =>
-            val (attrName, value) = expr match {
+            val (attrName, constant) = expr match {
               case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
-                (name, constant.value)
+                (name, constant)
             }
             if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
               throw new AnalysisException(s"${attrName} is not a valid partition column " +
@@ -548,7 +548,7 @@ case class AlterTableDropPartitionCommand(
             }
             val dataType = table.partitionSchema.apply(attrName).dataType
             expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
-              Cast(Literal(value), dataType)))
+              Cast(constant, dataType)))
           }.reduce(And)
           val partitions = catalog.listPartitionsByFilter(

From 6b189398c138e2fb17085ece1bd36d30cbc0aa44 Mon Sep 17 00:00:00 2001
From: Dazhuang Su
Date: Tue, 5 Jun 2018 13:18:05 +0800
Subject: [PATCH 14/14] address comments
---
 .../spark/sql/execution/command/ddl.scala | 93 ++++++++++---------
 1 file changed, 51 insertions(+), 42 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index cbe6ec9b90caa..0d1fa4e97f624 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -500,7 +500,8 @@ case class AlterTableRenamePartitionCommand(
 }

 /**
- * Drop Partition in ALTER TABLE: to drop a particular partition for a table.
+ * Drop Partition in ALTER TABLE: to drop a particular partition
+ * or a set of partitions matching the given expressions for a table.
  *
  * This removes the data and metadata for this partition.
  * The data is actually moved to the .Trash/Current directory if Trash is configured,
  *
  * The syntax of this command is:
  * {{{
  *   ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1)
  *     [, PARTITION (spec2, expr2), ...]
[PURGE]; * }}} */ case class AlterTableDropPartitionCommand( @@ -529,47 +531,18 @@ case class AlterTableDropPartitionCommand( DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") val toDrop = partitions.flatMap { partition => - val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( - partition._1, - table.partitionColumnNames, - table.identifier.quotedString, - sparkSession.sessionState.conf.resolver) - - val partitionSet = { - if (partition._2.nonEmpty) { - val parts = partition._2.map { expr => - val (attrName, constant) = expr match { - case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => - (name, constant) - } - if (!table.partitionColumnNames.exists(resolver(_, attrName))) { - throw new AnalysisException(s"${attrName} is not a valid partition column " + - s"in table ${table.identifier.quotedString}.") - } - val dataType = table.partitionSchema.apply(attrName).dataType - expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), - Cast(constant, dataType))) - }.reduce(And) - - val partitions = catalog.listPartitionsByFilter( - table.identifier, Seq(parts)).map(_.spec) - if (partitions.isEmpty && !ifExists) { - throw new AnalysisException(s"There is no partition for ${parts.sql}") - } - partitions - } else { - Seq.empty[TablePartitionSpec] - } - }.distinct - - if (normalizedSpecs.isEmpty && partitionSet.isEmpty) { - Seq.empty[TablePartitionSpec] - } else if (normalizedSpecs.isEmpty && !partitionSet.isEmpty) { - partitionSet - } else if (!normalizedSpecs.isEmpty && partitionSet.isEmpty) { - Seq(normalizedSpecs) + if (partition._1.isEmpty && !partition._2.isEmpty) { + // There are only expressions in this drop condition. + extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { + // There are only partitionSpecs in this drop condition. + extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { + // This drop condition has both partitionSpecs and expressions. 
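+          // Keep only the partitions matched by both forms, i.e. their set intersection.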
+ extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( + extractFromPartitionSpec(partition._1, table, resolver)) } else { - partitionSet.intersect(normalizedSpecs.toSeq) + Seq.empty[TablePartitionSpec] } } @@ -582,6 +555,42 @@ case class AlterTableDropPartitionCommand( Seq.empty[Row] } + private def extractFromPartitionSpec( + specs: TablePartitionSpec, + table: CatalogTable, + resolver: Resolver): Seq[Map[String, String]] = { + Seq(PartitioningUtils.normalizePartitionSpec( + specs, + table.partitionColumnNames, + table.identifier.quotedString, + resolver)) + } + + private def extractFromPartitionFilter( + filters: Seq[Expression], + catalog: SessionCatalog, + table: CatalogTable, + resolver: Resolver): Seq[TablePartitionSpec] = { + val expressions = filters.map { expr => + val (attrName, constant) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + (name, constant) + } + if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + + s"in table ${table.identifier.quotedString}.") + } + val dataType = table.partitionSchema.apply(attrName).dataType + expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(constant, dataType))) + }.reduce(And) + val parts = catalog.listPartitionsByFilter( + table.identifier, Seq(expressions)).map(_.spec) + if (parts.isEmpty && !ifExists) { + throw new AnalysisException(s"There is no partition for ${expressions.sql}") + } + parts + } }
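
For reference, a minimal sketch of how the comparator support added by this series is
exercised end to end, mirroring the HiveDDLSuite tests above. It assumes a Hive-enabled
SparkSession named `spark`; the table name and partition values are illustrative only:

    // Create a table partitioned by (country, quarter) and register a few partitions.
    spark.sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter STRING)")
    for (country <- Seq("US", "CA", "KR"); quarter <- 1 to 4) {
      spark.sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')")
    }

    // Comparators may now appear in the partition spec. Each PARTITION clause is resolved
    // to its matching set of partitions, and the union of all clauses is dropped in one call.
    spark.sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
    spark.sql("ALTER TABLE sales DROP PARTITION (country <= 'CA'), PARTITION (quarter >= '4')")
    spark.sql("SHOW PARTITIONS sales").show(truncate = false)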