diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index d991e7cf7e898..f70c44ca15ba1 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -149,10 +149,10 @@ statement
        partitionSpec+                                                 #addTablePartition
    | ALTER TABLE tableIdentifier
        from=partitionSpec RENAME TO to=partitionSpec                  #renameTablePartition
-    | ALTER TABLE tableIdentifier
-        DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE?    #dropTablePartitions
-    | ALTER VIEW tableIdentifier
-        DROP (IF EXISTS)? partitionSpec (',' partitionSpec)*           #dropTablePartitions
+    | ALTER TABLE tableIdentifier DROP (IF EXISTS)?
+        dropPartitionSpec (',' dropPartitionSpec)* PURGE?              #dropTablePartitions
+    | ALTER VIEW tableIdentifier DROP (IF EXISTS)?
+        dropPartitionSpec (',' dropPartitionSpec)*                     #dropTablePartitions
    | ALTER TABLE multipartIdentifier SET locationSpec                 #setTableLocation
    | ALTER TABLE tableIdentifier partitionSpec SET locationSpec       #setPartitionLocation
    | ALTER TABLE tableIdentifier RECOVER PARTITIONS                   #recoverPartitions
@@ -300,6 +300,14 @@ partitionVal
    : identifier (EQ constant)?
    ;
 
+dropPartitionSpec
+    : PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')'
+    ;
+
+dropPartitionVal
+    : identifier (comparisonOperator constant)?
+    ;
+
 database
    : DATABASE | SCHEMA
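The grammar change above replaces `partitionSpec` with the new `dropPartitionSpec` rule in both `#dropTablePartitions` alternatives, so every partition value in a DROP PARTITION clause may now be constrained with any `comparisonOperator` instead of only `EQ`. A minimal sketch of what the updated rule accepts (table and column names are hypothetical, and `spark` is assumed to be an existing SparkSession):

    // Equality-only specs still parse as before.
    spark.sql("ALTER TABLE sales DROP PARTITION (country = 'US', quarter = '1')")
    // Range filters are newly accepted thanks to dropPartitionVal's comparisonOperator.
    spark.sql("ALTER TABLE sales DROP IF EXISTS PARTITION (quarter >= '3'), PARTITION (country < 'CA')")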
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 3362353e2662a..42fc71e0e6b68 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -391,6 +391,35 @@ case class OuterReference(e: NamedExpression)
   override def newInstance(): NamedExpression = OuterReference(e.newInstance())
 }
 
+/**
+ * A placeholder used to hold the names of the partition attributes specified when running
+ * commands involving partitions, e.g. ALTER TABLE ... DROP PARTITION.
+ */
+case class PartitioningAttribute(
+    name: String,
+    override val exprId: ExprId = NamedExpression.newExprId)
+  extends Attribute with Unevaluable {
+  // We need a dataType to be used during analysis for resolving the expressions (see
+  // checkInputDataTypes). StringType is used because all the literals in PARTITION operations
+  // are parsed as strings and eventually cast later.
+  override def dataType: DataType = StringType
+  override def nullable: Boolean = false
+
+  override def qualifier: Seq[String] = throw new UnsupportedOperationException
+  override def withNullability(newNullability: Boolean): Attribute =
+    throw new UnsupportedOperationException
+  override def newInstance(): Attribute = throw new UnsupportedOperationException
+  override def withQualifier(newQualifier: Seq[String]): Attribute =
+    throw new UnsupportedOperationException
+  override def withName(newName: String): Attribute = throw new UnsupportedOperationException
+  override def withMetadata(newMetadata: Metadata): Attribute =
+    throw new UnsupportedOperationException
+
+  override lazy val canonicalized: Expression = this.copy(exprId = ExprId(0))
+
+  override def withExprId(newExprId: ExprId): Attribute = throw new UnsupportedOperationException
+}
+
 object VirtualColumn {
   // The attribute name used by Hive, which has different result than Spark, deprecated.
   val hiveGroupingIdName: String = "grouping__id"
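For illustration, this is roughly the expression tree a DROP PARTITION filter becomes once its column name is wrapped in a `PartitioningAttribute` (a sketch, not code from this patch; see the parser change below):

    import org.apache.spark.sql.catalyst.expressions.{EqualTo, GreaterThan, Literal, PartitioningAttribute}

    // `PARTITION (dt = '2008-08-08')` becomes an EqualTo over a PartitioningAttribute;
    // the literal stays a string until the command casts it to the column's real type.
    val eq = EqualTo(PartitioningAttribute("dt"), Literal("2008-08-08"))
    // `PARTITION (quarter > '2')` becomes a GreaterThan built the same way.
    val gt = GreaterThan(PartitioningAttribute("quarter"), Literal("2"))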
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d9f8b9a7203ff..5df70a7ba200d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -352,6 +352,29 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a sequence of partition filter expressions from a DROP PARTITION specification.
+   */
+  override def visitDropPartitionSpec(
+      ctx: DropPartitionSpecContext): Seq[Expression] = withOrigin(ctx) {
+    ctx.dropPartitionVal().asScala.map { pFilter =>
+      if (pFilter.constant() == null || pFilter.comparisonOperator() == null) {
+        throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+      }
+      // We cannot use UnresolvedAttribute because resolution is performed after analysis, when
+      // the command runs.
+      val partition = PartitioningAttribute(pFilter.identifier().getText)
+      val value = Literal(visitStringConstant(pFilter.constant()))
+      val operator = pFilter.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
+      val comparison = buildComparison(partition, value, operator)
+      if (comparison.isInstanceOf[EqualNullSafe]) {
+        throw new ParseException(
+          "'<=>' operator is not supported in ALTER TABLE ... DROP PARTITION.", ctx)
+      }
+      comparison
+    }
+  }
+
   /**
    * Convert a constant of any type into a string. This is typically used in DDL commands, and its
    * main purpose is to prevent slight differences due to back to back conversions i.e.:
@@ -1174,6 +1197,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     val left = expression(ctx.left)
     val right = expression(ctx.right)
     val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
+    buildComparison(left, right, operator)
+  }
+
+  /**
+   * Creates a comparison expression. The following comparison operators are supported:
+   * - Equal: '=' or '=='
+   * - Null-safe Equal: '<=>'
+   * - Not Equal: '<>' or '!='
+   * - Less than: '<'
+   * - Less than or Equal: '<='
+   * - Greater than: '>'
+   * - Greater than or Equal: '>='
+   */
+  private def buildComparison(
+      left: Expression,
+      right: Expression,
+      operator: TerminalNode): Expression = {
     operator.getSymbol.getType match {
       case SqlBaseParser.EQ =>
         EqualTo(left, right)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 12cd8abcad890..7f69c7fd8a94f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -749,7 +749,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
+      ctx.dropPartitionSpec().asScala.map(visitDropPartitionSpec),
       ifExists = ctx.EXISTS != null,
       purge = ctx.PURGE != null,
       retainData = false)
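With these two changes, each PARTITION clause becomes one Seq[Expression] via visitDropPartitionSpec, and SparkSqlAstBuilder hands the command a Seq[Seq[Expression]]: the outer sequence has one element per comma-separated PARTITION clause, the inner one comparison per column filter. As a hypothetical sketch, `ALTER TABLE sales DROP IF EXISTS PARTITION (dt = '2008-08-08', country < 'KR')` should parse to roughly:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.expressions.{EqualTo, LessThan, Literal, PartitioningAttribute}
    import org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand

    AlterTableDropPartitionCommand(
      TableIdentifier("sales"),
      Seq(Seq(
        EqualTo(PartitioningAttribute("dt"), Literal("2008-08-08")),
        LessThan(PartitioningAttribute("country"), Literal("KR")))),
      ifExists = true,
      purge = false,
      retainData = false)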
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index ee5d37cebf2f3..4827a10375f2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -31,10 +31,10 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
@@ -524,7 +524,7 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitionsFilters: Seq[Seq[Expression]],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
@@ -532,20 +532,38 @@ case class AlterTableDropPartitionCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
     val table = catalog.getTableMetadata(tableName)
+    val partitionColumns = table.partitionColumnNames
+    val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+      if (hasComplexFilters(filtersSpec)) {
+        generatePartitionSpec(filtersSpec,
+          partitionColumns,
+          partitionAttributes,
+          table.identifier,
+          catalog,
+          sparkSession.sessionState.conf.resolver,
+          timeZone,
+          ifExists)
+      } else {
+        val partitionSpec = filtersSpec.map {
+          case EqualTo(key: Attribute, Literal(value, StringType)) =>
+            key.name -> value.toString
+        }.toMap
+        PartitioningUtils.normalizePartitionSpec(
+          partitionSpec,
+          partitionColumns,
+          table.identifier.quotedString,
+          sparkSession.sessionState.conf.resolver) :: Nil
+      }
     }
 
     catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      table.identifier, resolvedSpecs, ignoreIfNotExists = ifExists, purge = purge,
       retainData = retainData)
 
     CommandUtils.updateTableStats(sparkSession, table)
@@ -553,6 +571,67 @@ case class AlterTableDropPartitionCommand(
     Seq.empty[Row]
   }
 
+  def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = {
+    partitionFilterSpec.exists(!_.isInstanceOf[EqualTo])
+  }
+
+  def generatePartitionSpec(
+      partitionFilterSpec: Seq[Expression],
+      partitionColumns: Seq[String],
+      partitionAttributes: Map[String, Attribute],
+      tableIdentifier: TableIdentifier,
+      catalog: SessionCatalog,
+      resolver: Resolver,
+      timeZone: Option[String],
+      ifExists: Boolean): Seq[TablePartitionSpec] = {
+    val filters = partitionFilterSpec.map { pFilter =>
+      pFilter.transform {
+        // Resolve the partition attributes
+        case partitionCol: PartitioningAttribute =>
+          val normalizedPartition = PartitioningUtils.normalizePartitionColumn(
+            partitionCol.name,
+            partitionColumns,
+            tableIdentifier.quotedString,
+            resolver)
+          partitionAttributes(normalizedPartition)
+      }.transform {
+        // Cast the partition value to the data type of the corresponding partition attribute
+        case cmp @ BinaryComparison(partitionAttr, value)
+            if !partitionAttr.dataType.sameType(value.dataType) =>
+          val dt = partitionAttr.dataType
+          val lit = Literal(Cast(value, dt, timeZone).eval(), dt)
+          cmp.withNewChildren(Seq(partitionAttr, lit))
+      }
+    }
+    val partitions = catalog.listPartitionsByFilter(tableIdentifier, filters)
+    if (partitions.isEmpty && !ifExists) {
+      throw new AnalysisException(s"There is no partition for ${filters.reduceLeft(And).sql}")
+    }
+    partitions.map(_.spec)
+  }
+}
+
+
+object AlterTableDropPartitionCommand {
+
+  def fromSpecs(
+      tableName: TableIdentifier,
+      specs: Seq[TablePartitionSpec],
+      ifExists: Boolean,
+      purge: Boolean,
+      retainData: Boolean): AlterTableDropPartitionCommand = {
+    AlterTableDropPartitionCommand(tableName,
+      specs.map(tablePartitionToPartitionFilters),
+      ifExists,
+      purge,
+      retainData)
+  }
+
+  def tablePartitionToPartitionFilters(spec: TablePartitionSpec): Seq[Expression] = {
+    spec.map {
+      case (key, value) => EqualTo(PartitioningAttribute(key), Literal(value))
+    }.toSeq
+  }
+}
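Callers that still hold a map-based TablePartitionSpec (such as InsertIntoHadoopFsRelationCommand in the next hunk) go through the new fromSpecs factory, which rewrites each key/value pair into an EqualTo filter, so the equality-only fast path in run() still applies. A small usage sketch with hypothetical identifiers:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand

    // Equivalent to: ALTER TABLE sales DROP IF EXISTS PARTITION (dt = '2008-08-08', country = 'us')
    val cmd = AlterTableDropPartitionCommand.fromSpecs(
      TableIdentifier("sales"),
      Seq(Map("dt" -> "2008-08-08", "country" -> "us")),
      ifExists = true,
      purge = false,
      retainData = false)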
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index d43fa3893df1d..241c377b4f79e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -148,7 +148,7 @@ case class InsertIntoHadoopFsRelationCommand(
         if (mode == SaveMode.Overwrite && !dynamicPartitionOverwrite) {
           val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
           if (deletedPartitions.nonEmpty) {
-            AlterTableDropPartitionCommand(
+            AlterTableDropPartitionCommand.fromSpecs(
               catalogTable.get.identifier, deletedPartitions.toSeq,
               ifExists = true, purge = false,
               retainData = true /* already deleted */).run(sparkSession)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 6f42423353092..b00a5fa50c45e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -366,9 +366,7 @@ object PartitioningUtils {
       tblName: String,
       resolver: Resolver): Map[String, T] = {
     val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) =>
-      val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
-        throw new AnalysisException(s"$key is not a valid partition column in table $tblName.")
-      }
+      val normalizedKey = normalizePartitionColumn(key, partColNames, tblName, resolver)
       normalizedKey -> value
     }
 
@@ -378,6 +376,16 @@ object PartitioningUtils {
     normalizedPartSpec.toMap
   }
 
+  def normalizePartitionColumn(
+      partition: String,
+      partColNames: Seq[String],
+      tblName: String,
+      resolver: Resolver): String = {
+    partColNames.find(resolver(_, partition)).getOrElse {
+      throw new AnalysisException(s"$partition is not a valid partition column in table $tblName.")
+    }
+  }
+
   /**
    * Resolves possible type conflicts between partitions by up-casting "lower" types using
    * [[findWiderTypeForPartitionColumn]].
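The PartitioningUtils change only extracts the existing column lookup into normalizePartitionColumn so AlterTableDropPartitionCommand can reuse it; behavior is unchanged. A sketch of how it resolves names under the default case-insensitive resolver (example values hypothetical):

    import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
    import org.apache.spark.sql.execution.datasources.PartitioningUtils

    // "DT" resolves to the declared column name "dt"; an unknown name throws
    // AnalysisException("... is not a valid partition column in table ...").
    val normalized = PartitioningUtils.normalizePartitionColumn(
      "DT", Seq("dt", "country"), "`default`.`sales`", caseInsensitiveResolution)
    assert(normalized == "dt")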
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 83452cdd8927b..5e71c9cc455df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans
 import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
-import org.apache.spark.sql.catalyst.expressions.JsonTuple
+import org.apache.spark.sql.catalyst.expressions.{Expression, JsonTuple, PartitioningAttribute}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan, Project, ScriptTransformation}
 import org.apache.spark.sql.execution.SparkSqlParser
@@ -687,7 +687,8 @@ class DDLParserSuite extends AnalysisTest with SharedSQLContext {
     assertUnsupported(sql2_view)
 
     val tableIdent = TableIdentifier("table_name", None)
-    val expected1_table = AlterTableDropPartitionCommand(
+
+    val expected1_table = AlterTableDropPartitionCommand.fromSpecs(
       tableIdent,
       Seq(
         Map("dt" -> "2008-08-08", "country" -> "us"),
@@ -698,9 +699,36 @@ class DDLParserSuite extends AnalysisTest with SharedSQLContext {
     val expected2_table = expected1_table.copy(ifExists = false)
     val expected1_purge = expected1_table.copy(purge = true)
 
-    comparePlans(parsed1_table, expected1_table)
-    comparePlans(parsed2_table, expected2_table)
-    comparePlans(parsed1_purge, expected1_purge)
+    comparePlans(parsed1_table.canonicalized, expected1_table.canonicalized)
+    comparePlans(parsed2_table.canonicalized, expected2_table.canonicalized)
+    comparePlans(parsed1_purge.canonicalized, expected1_purge.canonicalized)
+
+    // SPARK-23866: Support any comparison operator in ALTER TABLE ... DROP PARTITION
+    Seq((">", (a: Expression, b: Expression) => a > b),
+      (">=", (a: Expression, b: Expression) => a >= b),
+      ("<", (a: Expression, b: Expression) => a < b),
+      ("<=", (a: Expression, b: Expression) => a <= b),
+      ("<>", (a: Expression, b: Expression) => a =!= b),
+      ("!=", (a: Expression, b: Expression) => a =!= b)).foreach { case (op, predicateGen) =>
+      val genPlan = parser.parsePlan(sql1_table.replace("=", op))
+      val dtAttr = PartitioningAttribute("dt")
+      val countryAttr = PartitioningAttribute("country")
+      val expectedPlan = AlterTableDropPartitionCommand(
+        tableIdent,
+        Seq(
+          Seq(predicateGen(dtAttr, "2008-08-08"), predicateGen(countryAttr, "us")),
+          Seq(predicateGen(dtAttr, "2009-09-09"), predicateGen(countryAttr, "uk"))),
+        ifExists = true,
+        purge = false,
+        retainData = false)
+      comparePlans(genPlan.canonicalized, expectedPlan.canonicalized)
+    }
+
+    // SPARK-23866: <=> is not supported
+    intercept("ALTER TABLE table_name DROP PARTITION (dt <=> 'a')", "operator is not supported in")
+
+    // SPARK-23866: Invalid partition specification
+    intercept("ALTER TABLE table_name DROP PARTITION (dt)", "Invalid partition spec:")
   }
 
   test("alter table: archive partition (not supported)") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index a907fcae526c0..90a491d238a8f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -681,6 +681,150 @@ class HiveDDLSuite
     }
   }
 
+  def testDropPartition(dataType: DataType, value1: Any, value2: Any): Unit = {
+    withTable("tbl_x") {
+      sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p ${dataType.sql})")
+      sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value1)")
+      sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value2)")
+      sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= $value2)")
+      checkAnswer(sql("SHOW PARTITIONS tbl_x"),
+        Row(s"p=$value1") :: Nil)
+      sql(s"ALTER TABLE tbl_x DROP PARTITION (p = $value1)")
+      checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil)
+    }
+  }
+
+  test("SPARK-14922: Drop partitions by filter") {
+    withTable("sales") {
+      sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter STRING)")
+      for (country <- Seq("AU", "US", "CA", "KR")) {
+        for (quarter <- 1 to 5) {
+          sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')")
+        }
+      }
+      sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=AU/quarter=1") ::
+        Row("country=AU/quarter=2") ::
+        Row("country=CA/quarter=1") ::
+        Row("country=CA/quarter=2") ::
+        Row("country=KR/quarter=1") ::
+        Row("country=KR/quarter=2") ::
+        Row("country=KR/quarter=3") ::
+        Row("country=KR/quarter=4") ::
+        Row("country=KR/quarter=5") ::
+        Row("country=US/quarter=1") ::
+        Row("country=US/quarter=2") ::
+        Row("country=US/quarter=3") ::
+        Row("country=US/quarter=4") ::
+        Row("country=US/quarter=5") :: Nil)
+      sql("ALTER TABLE sales DROP PARTITION (country < 'CA'), PARTITION (quarter = '5')")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=CA/quarter=1") ::
+        Row("country=CA/quarter=2") ::
+        Row("country=KR/quarter=1") ::
+        Row("country=KR/quarter=2") ::
Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=US/quarter=1") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (country = 'KR', quarter = '4')") + sql("ALTER TABLE sales DROP PARTITION (country = 'US', quarter = '3')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=4") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (quarter <= '2'), PARTITION (quarter >= '4')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=3") :: Nil) + // According to the declarative partition spec definitions, this drops the union of target + // partitions without exceptions. Hive raises exceptions because it handles them sequentially. + sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '3')") + checkAnswer(sql("SHOW PARTITIONS sales"), Nil) + } + withTable("tbl_x") { + sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p STRING)") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'false')") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'true')") + sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= 'true')") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), + Row(s"p=false") :: Nil) + sql(s"ALTER TABLE tbl_x DROP PARTITION (p = 'false')") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil) + } + testDropPartition(IntegerType, 1, 2) + testDropPartition(BooleanType, false, true) + testDropPartition(LongType, 1L, 2L) + testDropPartition(ShortType, 1.toShort, 2.toShort) + testDropPartition(ByteType, 1.toByte, 2.toByte) + testDropPartition(FloatType, 1.0F, 2.0F) + testDropPartition(DoubleType, 1.0, 2.0) + testDropPartition(DecimalType(2, 1), Decimal(1.5), Decimal(2.5)) + } + + test("SPARK-14922: Error handling for drop partitions by filter") { + withTable("sales") { + sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") + val m = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')") + }.getMessage + assert(m.contains("unknown is not a valid partition column in table")) + val m2 = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')") + }.getMessage + assert(m2.contains("unknown is not a valid partition column in table")) + val m3 = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')") + }.getMessage + assert(m3.contains("'<=>' operator is not supported")) + val m4 = intercept[ParseException] { + sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))") + }.getMessage + assert(m4.contains("extraneous input")) + val m5 = intercept[ParseException] { + sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)") + }.getMessage + assert(m5.contains("Invalid partition spec: quarter")) + + sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')") + val m6 = intercept[AnalysisException] { + // The query is not executed because `PARTITION (quarter <= '2')`. 
+ sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')") + }.getMessage + assert(m6.contains("There is no partition for")) + + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=3") :: Nil) + val m7 = intercept[ParseException] { + sql("ALTER TABLE sales DROP PARTITION ( '4' > quarter)") + }.getMessage + assert(m7.contains("mismatched input ''4''")) + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=3") :: Nil) + } + } + + test("SPARK-14922: Partition filter is not allowed in ADD PARTITION") { + withTable("sales") { + sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") + val e = intercept[AnalysisException] { + sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')") + } + assert(e.getMessage.contains("extraneous input '<'")) + } + } + test("drop views") { withTable("tab1") { val tabName = "tab1"