From b57a5d1797dbe206aeb0a4d2a24ccd0c73845dc8 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Thu, 5 Apr 2018 17:35:19 +0200
Subject: [PATCH 01/14] [WIP][SPARK-23866][SQL] Support partition filters in ALTER TABLE DROP PARTITION

---
 .../spark/sql/catalyst/parser/SqlBase.g4 | 16 ++-
 .../sql/catalyst/catalog/interface.scala | 5 +
 .../sql/catalyst/parser/AstBuilder.scala | 16 +++
 .../spark/sql/execution/SparkSqlParser.scala | 2 +-
 .../spark/sql/execution/command/ddl.scala | 101 ++++++++++++++++--
 .../InsertIntoHadoopFsRelationCommand.scala | 2 +-
 .../datasources/PartitioningUtils.scala | 14 ++-
 .../execution/command/DDLParserSuite.scala | 3 +-
 8 files changed, 138 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 5fa75fe348e68..0c46aca4640b1 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -114,10 +114,10 @@ statement
 partitionSpec+ #addTablePartition
 | ALTER TABLE tableIdentifier
 from=partitionSpec RENAME TO to=partitionSpec #renameTablePartition
- | ALTER TABLE tableIdentifier
- DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE? #dropTablePartitions
- | ALTER VIEW tableIdentifier
- DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions
+ | ALTER TABLE tableIdentifier DROP (IF EXISTS)?
+ dropPartitionSpec (',' dropPartitionSpec)* PURGE? #dropTablePartitions
+ | ALTER VIEW tableIdentifier DROP (IF EXISTS)?
+ dropPartitionSpec (',' dropPartitionSpec)* #dropTablePartitions
 | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation
 | ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions
 | DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable
@@ -261,6 +261,14 @@ partitionVal
 : identifier (EQ constant)?
 ;

+dropPartitionSpec
+ : PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')'
+ ;
+
+dropPartitionVal
+ : identifier (comparisonOperator constant)?
+ ;
+
 describeFuncName
 : qualifiedName
 | STRING
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index f3e67dc4e975c..0061bd2cd3980 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -552,6 +552,11 @@ object CatalogTypes {
 */
 type TablePartitionSpec = Map[String, String]

+ /**
+ * Specification of table partition filters: a Seq of (column name, comparison operator, value).
+ */
+ type PartitionFiltersSpec = Seq[(String, String, String)]
+
 /**
 * Initialize an empty spec.
 */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index bdc357d54a878..dc516cfa65f6e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -293,6 +293,22 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 }
 }

+ /**
+ * Create a sequence of partition filters from a DROP PARTITION specification.
+ */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[(String, String, String)] = { + withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => + val partition = pFilter.identifier().getText + val value = visitStringConstant(pFilter.constant()) + val operator = pFilter.comparisonOperator().getChild(0).asInstanceOf[TerminalNode] + val stringOperator = SqlBaseParser.VOCABULARY.getSymbolicName(operator.getSymbol.getType) + (partition, stringOperator, value) + } + } + } + /** * Convert a constant of any type into a string. This is typically used in DDL commands, and its * main purpose is to prevent slight differences due to back to back conversions i.e.: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 4828fa60a7b58..1148fbd15c7f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -927,7 +927,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { } AlterTableDropPartitionCommand( visitTableIdentifier(ctx.tableIdentifier), - ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec), + ctx.dropPartitionSpec().asScala.map(visitDropPartitionSpec), ifExists = ctx.EXISTS != null, purge = ctx.PURGE != null, retainData = false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index bf4d96fa18d0d..56066eb876e2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver} +import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.{PartitionFiltersSpec, TablePartitionSpec} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, EqualNullSafe, EqualTo, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Not} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils} import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat @@ -521,7 +521,7 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, - specs: Seq[TablePartitionSpec], + partitionsFilters: Seq[PartitionFiltersSpec], ifExists: Boolean, purge: Boolean, retainData: Boolean) @@ -529,20 +529,36 @@ case class AlterTableDropPartitionCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog + val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone) val table = catalog.getTableMetadata(tableName) + val partitionColumns = table.partitionColumnNames + val partitionAttributes = 
table.partitionSchema.toAttributes.map(a => a.name -> a).toMap DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") - val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( - spec, - table.partitionColumnNames, - table.identifier.quotedString, - sparkSession.sessionState.conf.resolver) + val resolvedSpecs = partitionsFilters.flatMap { filtersSpec => + if (hasComplexFilters(filtersSpec)) { + generatePartitionSpec(filtersSpec, + partitionColumns, + partitionAttributes, + table.identifier, + catalog, + sparkSession.sessionState.conf.resolver, + timeZone) + } else { + val partitionSpec = filtersSpec.map { + case (key, _, value) => key -> value + }.toMap + PartitioningUtils.normalizePartitionSpec( + partitionSpec, + partitionColumns, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) :: Nil + } } catalog.dropPartitions( - table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, + table.identifier, resolvedSpecs, ignoreIfNotExists = ifExists, purge = purge, retainData = retainData) CommandUtils.updateTableStats(sparkSession, table) @@ -550,6 +566,69 @@ case class AlterTableDropPartitionCommand( Seq.empty[Row] } + def hasComplexFilters(partitionFilterSpec: PartitionFiltersSpec): Boolean = { + !partitionFilterSpec.forall(_._2 == "EQ") + } + + def generatePartitionSpec( + partitionFilterSpec: PartitionFiltersSpec, + partitionColumns: Seq[String], + partitionAttributes: Map[String, Attribute], + tableIdentifier: TableIdentifier, + catalog: SessionCatalog, + resolver: Resolver, + timeZone: Option[String]): Seq[TablePartitionSpec] = { + val filters = partitionFilterSpec.map { case (partitionColumn, operator, value) => + val normalizedPartition = PartitioningUtils.normalizePartitionColumn( + partitionColumn, + partitionColumns, + tableIdentifier.quotedString, + resolver) + val partitionAttr = partitionAttributes(normalizedPartition) + val castedLiteralValue = Cast(Literal(value), partitionAttr.dataType, timeZone) + operator match { + case "EQ" => + EqualTo(partitionAttr, castedLiteralValue) + case "NSEQ" => + EqualNullSafe(partitionAttr, castedLiteralValue) + case "NEQ" | "NEQJ" => + Not(EqualTo(partitionAttr, castedLiteralValue)) + case "LT" => + LessThan(partitionAttr, castedLiteralValue) + case "LTE" => + LessThanOrEqual(partitionAttr, castedLiteralValue) + case "GT" => + GreaterThan(partitionAttr, castedLiteralValue) + case "GTE" => + GreaterThanOrEqual(partitionAttr, castedLiteralValue) + } + } + val partitions = catalog.listPartitionsByFilter(tableIdentifier, filters) + partitions.map(_.spec) + } +} + + +object AlterTableDropPartitionCommand { + + def fromSpecs( + tableName: TableIdentifier, + specs: Seq[TablePartitionSpec], + ifExists: Boolean, + purge: Boolean, + retainData: Boolean): AlterTableDropPartitionCommand = { + AlterTableDropPartitionCommand(tableName, + specs.map(tablePartitionToPartitionFiltersSpec), + ifExists, + purge, + retainData) + } + + def tablePartitionToPartitionFiltersSpec(spec: TablePartitionSpec): PartitionFiltersSpec = { + spec.map { + case (key, value) => (key, "EQ", value) + }.toSeq + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index dd7ef0d15c140..9b5746a9dec20 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -141,7 +141,7 @@ case class InsertIntoHadoopFsRelationCommand( if (mode == SaveMode.Overwrite && !dynamicPartitionOverwrite) { val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions if (deletedPartitions.nonEmpty) { - AlterTableDropPartitionCommand( + AlterTableDropPartitionCommand.fromSpecs( catalogTable.get.identifier, deletedPartitions.toSeq, ifExists = true, purge = false, retainData = true /* already deleted */).run(sparkSession) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index f9a24806953e6..b355934c3544c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -296,9 +296,7 @@ object PartitioningUtils { tblName: String, resolver: Resolver): Map[String, T] = { val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) => - val normalizedKey = partColNames.find(resolver(_, key)).getOrElse { - throw new AnalysisException(s"$key is not a valid partition column in table $tblName.") - } + val normalizedKey = normalizePartitionColumn(key, partColNames, tblName, resolver) normalizedKey -> value } @@ -308,6 +306,16 @@ object PartitioningUtils { normalizedPartSpec.toMap } + def normalizePartitionColumn( + partition: String, + partColNames: Seq[String], + tblName: String, + resolver: Resolver): String = { + partColNames.find(resolver(_, partition)).getOrElse { + throw new AnalysisException(s"$partition is not a valid partition column in table $tblName.") + } + } + /** * Resolves possible type conflicts between partitions by up-casting "lower" types using * [[findWiderTypeForPartitionColumn]]. 
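
For reference, a minimal sketch (illustration only, not part of the patch) of the syntax the new dropPartitionSpec rule accepts. It assumes a Hive-enabled SparkSession named `spark`; the table and partition columns are borrowed from the test suite added later in this series.

// Illustration only: exercises the dropPartitionSpec grammar added above.
// Assumes `spark` is a SparkSession built with .enableHiveSupport().
spark.sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter STRING)")
spark.sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter = '1')")
spark.sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter = '2')")
// Before this patch only '=' was accepted in a DROP PARTITION spec; comparison
// operators such as <, <=, >, >= and != now select all matching partitions:
spark.sql("ALTER TABLE sales DROP PARTITION (country <= 'US', quarter < '2')")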
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index e0ccae15f1d05..87129d374a9b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -861,7 +861,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { assertUnsupported(sql2_view) val tableIdent = TableIdentifier("table_name", None) - val expected1_table = AlterTableDropPartitionCommand( + + val expected1_table = AlterTableDropPartitionCommand.fromSpecs( tableIdent, Seq( Map("dt" -> "2008-08-08", "country" -> "us"), From 148f47742cae892260c46f9ffa97bb2d0422701d Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 5 Sep 2018 18:11:38 +0200 Subject: [PATCH 02/14] adding UT from SPARK-14922 by DazhuangSu --- .../sql/catalyst/parser/AstBuilder.scala | 4 + .../spark/sql/execution/command/ddl.scala | 11 +- .../sql/hive/execution/HiveDDLSuite.scala | 138 ++++++++++++++++++ 3 files changed, 150 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index dc516cfa65f6e..254e6f02eea82 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -300,6 +300,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging ctx: DropPartitionSpecContext): Seq[(String, String, String)] = { withOrigin(ctx) { ctx.dropPartitionVal().asScala.map { pFilter => + if (pFilter.identifier() == null || pFilter.constant() == null || + pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) + } val partition = pFilter.identifier().getText val value = visitStringConstant(pFilter.constant()) val operator = pFilter.comparisonOperator().getChild(0).asInstanceOf[TerminalNode] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 56066eb876e2c..ca47d5cc55b81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.{PartitionFiltersSpec, TablePartitionSpec} -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, EqualNullSafe, EqualTo, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Not} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Cast, EqualNullSafe, EqualTo, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Not} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils} import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat @@ -544,7 +544,8 @@ case class AlterTableDropPartitionCommand( table.identifier, catalog, sparkSession.sessionState.conf.resolver, - 
timeZone) + timeZone, + ifExists) } else { val partitionSpec = filtersSpec.map { case (key, _, value) => key -> value @@ -577,7 +578,8 @@ case class AlterTableDropPartitionCommand( tableIdentifier: TableIdentifier, catalog: SessionCatalog, resolver: Resolver, - timeZone: Option[String]): Seq[TablePartitionSpec] = { + timeZone: Option[String], + ifExists: Boolean): Seq[TablePartitionSpec] = { val filters = partitionFilterSpec.map { case (partitionColumn, operator, value) => val normalizedPartition = PartitioningUtils.normalizePartitionColumn( partitionColumn, @@ -604,6 +606,9 @@ case class AlterTableDropPartitionCommand( } } val partitions = catalog.listPartitionsByFilter(tableIdentifier, filters) + if (partitions.isEmpty && !ifExists) { + throw new AnalysisException(s"There is no partition for ${filters.reduceLeft(And).sql}") + } partitions.map(_.spec) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index db76ec9d084cb..d820892621201 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET} @@ -577,6 +578,143 @@ class HiveDDLSuite } } + def testDropPartition(dataType: DataType, value1: Any, value2: Any): Unit = { + withTable("tbl_x") { + sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p ${dataType.sql})") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value1)") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value2)") + sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= $value2)") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), + Row(s"p=$value1") :: Nil) + sql(s"ALTER TABLE tbl_x DROP PARTITION (p = $value1)") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil) + } + } + + test("SPARK-14922: Drop partitions by filter") { + withTable("sales") { + sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter STRING)") + for (country <- Seq("AU", "US", "CA", "KR")) { + for (quarter <- 1 to 5) { + sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')") + } + } + sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=AU/quarter=1") :: + Row("country=AU/quarter=2") :: + Row("country=CA/quarter=1") :: + Row("country=CA/quarter=2") :: + Row("country=KR/quarter=1") :: + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=KR/quarter=5") :: + Row("country=US/quarter=1") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: + Row("country=US/quarter=5") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (country < 'CA'), PARTITION (quarter = '5')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=CA/quarter=1") :: + Row("country=CA/quarter=2") :: + Row("country=KR/quarter=1") :: + 
Row("country=KR/quarter=2") ::
+ Row("country=KR/quarter=3") ::
+ Row("country=KR/quarter=4") ::
+ Row("country=US/quarter=1") ::
+ Row("country=US/quarter=2") ::
+ Row("country=US/quarter=3") ::
+ Row("country=US/quarter=4") :: Nil)
+ sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')")
+ checkAnswer(sql("SHOW PARTITIONS sales"),
+ Row("country=KR/quarter=2") ::
+ Row("country=KR/quarter=3") ::
+ Row("country=KR/quarter=4") ::
+ Row("country=US/quarter=2") ::
+ Row("country=US/quarter=3") ::
+ Row("country=US/quarter=4") :: Nil)
+ sql("ALTER TABLE sales DROP PARTITION (country = 'KR', quarter = '4')")
+ sql("ALTER TABLE sales DROP PARTITION (country = 'US', quarter = '3')")
+ checkAnswer(sql("SHOW PARTITIONS sales"),
+ Row("country=KR/quarter=2") ::
+ Row("country=KR/quarter=3") ::
+ Row("country=US/quarter=2") ::
+ Row("country=US/quarter=4") :: Nil)
+ sql("ALTER TABLE sales DROP PARTITION (quarter <= '2'), PARTITION (quarter >= '4')")
+ checkAnswer(sql("SHOW PARTITIONS sales"),
+ Row("country=KR/quarter=3") :: Nil)
+ // According to the declarative partition spec definitions, this drops the union of the target
+ // partitions without raising an exception. Hive raises exceptions as it handles them sequentially.
+ sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '3')")
+ checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
+ }
+ withTable("tbl_x") {
+ sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p STRING)")
+ sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'false')")
+ sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'true')")
+ sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= 'true')")
+ checkAnswer(sql("SHOW PARTITIONS tbl_x"),
+ Row(s"p=false") :: Nil)
+ sql(s"ALTER TABLE tbl_x DROP PARTITION (p = 'false')")
+ checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil)
+ }
+ testDropPartition(IntegerType, 1, 2)
+ testDropPartition(BooleanType, false, true)
+ testDropPartition(LongType, 1L, 2L)
+ testDropPartition(ShortType, 1.toShort, 2.toShort)
+ testDropPartition(ByteType, 1.toByte, 2.toByte)
+ testDropPartition(FloatType, 1.0F, 2.0F)
+ testDropPartition(DoubleType, 1.0, 2.0)
+ testDropPartition(DecimalType(2, 1), Decimal(1.5), Decimal(2.5))
+ }
+
+ test("SPARK-14922: Error handling for drop partitions by filter") {
+ withTable("sales") {
+ sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
+ val m = intercept[AnalysisException] {
+ sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')")
+ }.getMessage
+ assert(m.contains("unknown is not a valid partition column in table"))
+ val m2 = intercept[AnalysisException] {
+ sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')")
+ }.getMessage
+ assert(m2.contains("unknown is not a valid partition column in table"))
+ intercept[AnalysisException] {
+ sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')")
+ }
+ intercept[ParseException] {
+ sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))")
+ }
+ intercept[ParseException] {
+ sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)")
+ }
+ sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')")
+ intercept[AnalysisException] {
+ // Fails without dropping anything: `PARTITION (quarter <= '2')` matches no partition.
+ sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')")
+ }
+
+ checkAnswer(sql("SHOW PARTITIONS sales"),
+ Row("country=KR/quarter=3") :: Nil)
+ intercept[ParseException] {
+ sql("ALTER TABLE sales DROP PARTITION ( '4' > quarter)")
+ }
+ checkAnswer(sql("SHOW PARTITIONS sales"),
+ Row("country=KR/quarter=3") :: Nil)
+ }
+ }
+
+ test("SPARK-14922: Partition filter is not allowed in ADD PARTITION") {
+ withTable("sales") {
+ sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
+ intercept[AnalysisException] {
+ sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')")
+ }
+ }
+ }
+
 test("drop views") {
 withTable("tab1") {
 val tabName = "tab1"

From 7d3cf0c4cb2bfe58320c6d1859993ea213f9c6e1 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Thu, 6 Sep 2018 14:03:38 +0200
Subject: [PATCH 03/14] move expression build logic to AstBuilder

---
 .../sql/catalyst/catalog/interface.scala | 5 --
 .../sql/catalyst/parser/AstBuilder.scala | 29 +++++++--
 .../spark/sql/execution/command/ddl.scala | 61 ++++++++----------
 3 files changed, 51 insertions(+), 44 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 0061bd2cd3980..f3e67dc4e975c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -552,11 +552,6 @@ object CatalogTypes {
 */
 type TablePartitionSpec = Map[String, String]

- /**
- * Specification of table partition filters: a Seq of (column name, comparison operator, value).
- */
- type PartitionFiltersSpec = Seq[(String, String, String)]
-
 /**
 * Initialize an empty spec.
 */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 254e6f02eea82..5e8f555cfb788 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -297,18 +297,20 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 * Create a sequence of partition filters from a DROP PARTITION specification.
 */
 override def visitDropPartitionSpec(
- ctx: DropPartitionSpecContext): Seq[(String, String, String)] = {
+ ctx: DropPartitionSpecContext): Seq[Expression] = {
 withOrigin(ctx) {
 ctx.dropPartitionVal().asScala.map { pFilter =>
 if (pFilter.identifier() == null || pFilter.constant() == null ||
 pFilter.comparisonOperator() == null) {
 throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
 }
- val partition = pFilter.identifier().getText
- val value = visitStringConstant(pFilter.constant())
+ // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+ // running the command.
The type is not relevant; it is replaced during the real resolution.
+ val partition =
+ AttributeReference(pFilter.identifier().getText, StringType)()
+ val value = Literal(visitStringConstant(pFilter.constant()))
 val operator = pFilter.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
- val stringOperator = SqlBaseParser.VOCABULARY.getSymbolicName(operator.getSymbol.getType)
- (partition, stringOperator, value)
+ buildComparison(partition, value, operator)
 }
 }
 }
@@ -1035,6 +1037,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 val left = expression(ctx.left)
 val right = expression(ctx.right)
 val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
+ buildComparison(left, right, operator)
+ }
+
+ /**
+ * Creates a comparison expression. The following comparison operators are supported:
+ * - Equal: '=' or '=='
+ * - Null-safe Equal: '<=>'
+ * - Not Equal: '<>' or '!='
+ * - Less than: '<'
+ * - Less than or Equal: '<='
+ * - Greater than: '>'
+ * - Greater than or Equal: '>='
+ */
+ private def buildComparison(
+ left: Expression,
+ right: Expression,
+ operator: TerminalNode): Expression = {
 operator.getSymbol.getType match {
 case SqlBaseParser.EQ =>
 EqualTo(left, right)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index ca47d5cc55b81..33c94c3aacd21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.{PartitionFiltersSpec, TablePartitionSpec}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Cast, EqualNullSafe, EqualTo, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Not}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
@@ -521,7 +521,7 @@ case class AlterTableRenamePartitionCommand(
 */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
- partitionsFilters: Seq[PartitionFiltersSpec],
+ partitionsFilters: Seq[Seq[Expression]],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
@@ -548,7 +548,8 @@ case class AlterTableDropPartitionCommand(
 ifExists)
 } else {
 val partitionSpec = filtersSpec.map {
- case (key, _, value) => key -> value
+ case EqualTo(key: Attribute, Literal(value, StringType)) =>
+ key.name -> value.toString
 }.toMap
 PartitioningUtils.normalizePartitionSpec(
 partitionSpec,
@@ -567,12 +568,12 @@ case class AlterTableDropPartitionCommand(
 Seq.empty[Row]
 }

- def hasComplexFilters(partitionFilterSpec: PartitionFiltersSpec): Boolean = {
- !partitionFilterSpec.forall(_._2 == "EQ")
+ def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = {
+
partitionFilterSpec.exists(!_.isInstanceOf[EqualTo]) } def generatePartitionSpec( - partitionFilterSpec: PartitionFiltersSpec, + partitionFilterSpec: Seq[Expression], partitionColumns: Seq[String], partitionAttributes: Map[String, Attribute], tableIdentifier: TableIdentifier, @@ -580,29 +581,21 @@ case class AlterTableDropPartitionCommand( resolver: Resolver, timeZone: Option[String], ifExists: Boolean): Seq[TablePartitionSpec] = { - val filters = partitionFilterSpec.map { case (partitionColumn, operator, value) => - val normalizedPartition = PartitioningUtils.normalizePartitionColumn( - partitionColumn, - partitionColumns, - tableIdentifier.quotedString, - resolver) - val partitionAttr = partitionAttributes(normalizedPartition) - val castedLiteralValue = Cast(Literal(value), partitionAttr.dataType, timeZone) - operator match { - case "EQ" => - EqualTo(partitionAttr, castedLiteralValue) - case "NSEQ" => - EqualNullSafe(partitionAttr, castedLiteralValue) - case "NEQ" | "NEQJ" => - Not(EqualTo(partitionAttr, castedLiteralValue)) - case "LT" => - LessThan(partitionAttr, castedLiteralValue) - case "LTE" => - LessThanOrEqual(partitionAttr, castedLiteralValue) - case "GT" => - GreaterThan(partitionAttr, castedLiteralValue) - case "GTE" => - GreaterThanOrEqual(partitionAttr, castedLiteralValue) + val filters = partitionFilterSpec.map { pFilter => + pFilter.transform { + // Resolve the partition attributes + case partitionCol: Attribute => + val normalizedPartition = PartitioningUtils.normalizePartitionColumn( + partitionCol.name, + partitionColumns, + tableIdentifier.quotedString, + resolver) + partitionAttributes(normalizedPartition) + }.transform { + // Cast the partition value to the data type of the corresponding partition attribute + case cmp @ BinaryComparison(partitionAttr, value) + if !partitionAttr.dataType.sameType(value.dataType) => + cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone))) } } val partitions = catalog.listPartitionsByFilter(tableIdentifier, filters) @@ -623,15 +616,15 @@ object AlterTableDropPartitionCommand { purge: Boolean, retainData: Boolean): AlterTableDropPartitionCommand = { AlterTableDropPartitionCommand(tableName, - specs.map(tablePartitionToPartitionFiltersSpec), + specs.map(tablePartitionToPartitionFilters), ifExists, purge, retainData) } - def tablePartitionToPartitionFiltersSpec(spec: TablePartitionSpec): PartitionFiltersSpec = { + def tablePartitionToPartitionFilters(spec: TablePartitionSpec): Seq[Expression] = { spec.map { - case (key, value) => (key, "EQ", value) + case (key, value) => EqualTo(AttributeReference(key, StringType)(), Literal(value)) }.toSeq } } From a964d2a7def5aed04bd362b3000b36583c0ba272 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sun, 9 Sep 2018 20:10:30 +0200 Subject: [PATCH 04/14] address comments --- .../sql/catalyst/parser/AstBuilder.scala | 3 +- .../execution/command/DDLParserSuite.scala | 25 +++++++++++++++- .../sql/hive/execution/HiveDDLSuite.scala | 29 ++++++++++++------- 3 files changed, 43 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 5e8f555cfb788..eaa7e6337ee19 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -300,8 +300,7 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging ctx: DropPartitionSpecContext): Seq[Expression] = { withOrigin(ctx) { ctx.dropPartitionVal().asScala.map { pFilter => - if (pFilter.identifier() == null || pFilter.constant() == null || - pFilter.comparisonOperator() == null) { + if (pFilter.constant() == null || pFilter.comparisonOperator() == null) { throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) } // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 87129d374a9b0..88609a78290ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan -import org.apache.spark.sql.catalyst.expressions.JsonTuple +import org.apache.spark.sql.catalyst.expressions.{Expression, JsonTuple} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan} @@ -876,6 +876,29 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { comparePlans(parsed1_table, expected1_table) comparePlans(parsed2_table, expected2_table) comparePlans(parsed1_purge, expected1_purge) + + // SPARK-23866: Support any comparison operator in ALTER TABLE ... DROP PARTITION + Seq((">", (a: Expression, b: Expression) => a > b), + (">=", (a: Expression, b: Expression) => a >= b), + ("<", (a: Expression, b: Expression) => a < b), + ("<=", (a: Expression, b: Expression) => a <= b), + ("<>", (a: Expression, b: Expression) => a =!= b), + ("!=", (a: Expression, b: Expression) => a =!= b), + ("<=>", (a: Expression, b: Expression) => a <=> b)).foreach { case (op, predicateGen) => + val genPlan = parser.parsePlan(sql1_table.replace("=", op)) + val expectedPlan = AlterTableDropPartitionCommand( + tableIdent, + Seq( + Seq(predicateGen('dt.string, "2008-08-08"), predicateGen('country.string, "us")), + Seq(predicateGen('dt.string, "2009-09-09"), predicateGen('country.string, "uk"))), + ifExists = true, + purge = false, + retainData = false) + comparePlans(genPlan, expectedPlan) + } + + // SPARK-23866: Invalid partition specification + intercept("ALTER TABLE table_name DROP PARTITION (dt)", "Invalid partition spec:") } test("alter table: archive partition (not supported)") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index d820892621201..253ddeb1a0596 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -681,26 +681,32 @@ class HiveDDLSuite sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')") }.getMessage assert(m2.contains("unknown is not a valid partition column in table")) - intercept[AnalysisException] { + val m3 = intercept[AnalysisException] { sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')") - } - intercept[ParseException] { + }.getMessage + 
assert(m3.contains("unknown is not a valid partition column in table"))
- intercept[ParseException] {
+ val m4 = intercept[ParseException] {
 sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))")
- }
+ }.getMessage
+ assert(m4.contains("extraneous input"))
- intercept[ParseException] {
+ val m5 = intercept[ParseException] {
 sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)")
- }
+ }.getMessage
+ assert(m5.contains("Invalid partition spec: quarter"))
+
 sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')")
- intercept[AnalysisException] {
+ val m6 = intercept[AnalysisException] {
 // Fails without dropping anything: `PARTITION (quarter <= '2')` matches no partition.
 sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')")
- }
+ }.getMessage
+ assert(m6.contains("There is no partition for"))

 checkAnswer(sql("SHOW PARTITIONS sales"),
 Row("country=KR/quarter=3") :: Nil)
- intercept[ParseException] {
+ val m7 = intercept[ParseException] {
 sql("ALTER TABLE sales DROP PARTITION ( '4' > quarter)")
- }
+ }.getMessage
+ assert(m7.contains("mismatched input ''4''"))
 checkAnswer(sql("SHOW PARTITIONS sales"),
 Row("country=KR/quarter=3") :: Nil)
 }
 }
@@ -709,9 +715,10 @@ class HiveDDLSuite
 test("SPARK-14922: Partition filter is not allowed in ADD PARTITION") {
 withTable("sales") {
 sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
- intercept[AnalysisException] {
+ val e = intercept[AnalysisException] {
 sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')")
 }
+ assert(e.getMessage.contains("extraneous input '<'"))
 }
 }

From 94d186263fce0382d0b4be563d9726a97528c61e Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Tue, 9 Oct 2018 11:37:59 +0200
Subject: [PATCH 05/14] handle nullsafe equal

---
 .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 7 ++++++-
 .../spark/sql/execution/command/DDLParserSuite.scala | 6 ++++--
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index eaa7e6337ee19..d8731f7046a03 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -309,7 +309,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 AttributeReference(pFilter.identifier().getText, StringType)()
 val value = Literal(visitStringConstant(pFilter.constant()))
 val operator = pFilter.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
- buildComparison(partition, value, operator)
+ val comparison = buildComparison(partition, value, operator)
+ if (comparison.isInstanceOf[EqualNullSafe]) {
+ throw new ParseException(
+ "'<=>' operator is not supported in ALTER TABLE ...
DROP PARTITION.", ctx) + } + comparison } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 88609a78290ee..b67d6a1fc3abe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -883,8 +883,7 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { ("<", (a: Expression, b: Expression) => a < b), ("<=", (a: Expression, b: Expression) => a <= b), ("<>", (a: Expression, b: Expression) => a =!= b), - ("!=", (a: Expression, b: Expression) => a =!= b), - ("<=>", (a: Expression, b: Expression) => a <=> b)).foreach { case (op, predicateGen) => + ("!=", (a: Expression, b: Expression) => a =!= b)).foreach { case (op, predicateGen) => val genPlan = parser.parsePlan(sql1_table.replace("=", op)) val expectedPlan = AlterTableDropPartitionCommand( tableIdent, @@ -897,6 +896,9 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { comparePlans(genPlan, expectedPlan) } + // SPARK-23866: <=> is not supported + intercept("ALTER TABLE table_name DROP PARTITION (dt <=> 'a')", "operator is not supported in") + // SPARK-23866: Invalid partition specification intercept("ALTER TABLE table_name DROP PARTITION (dt)", "Invalid partition spec:") } From 67c22147a83232739388b423d5100ca2527b72e5 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 9 Oct 2018 14:15:53 +0200 Subject: [PATCH 06/14] fix ut --- .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 253ddeb1a0596..998f5d1ed2800 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -684,7 +684,7 @@ class HiveDDLSuite val m3 = intercept[AnalysisException] { sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')") }.getMessage - assert(m3.contains("unknown is not a valid partition column in table")) + assert(m3.contains("'<=>' operator is not supported")) val m4 = intercept[ParseException] { sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))") }.getMessage From 6397f980c948e977e931614d2b885693428a9648 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 9 Oct 2018 16:01:59 +0200 Subject: [PATCH 07/14] address comments --- .../scala/org/apache/spark/sql/execution/command/ddl.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 33c94c3aacd21..f406ad63186db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -595,7 +595,9 @@ case class AlterTableDropPartitionCommand( // Cast the partition value to the data type of the corresponding partition attribute case cmp @ BinaryComparison(partitionAttr, value) if !partitionAttr.dataType.sameType(value.dataType) => - cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone))) + val dt = partitionAttr.dataType + val lit = Literal(Cast(value, dt, 
timeZone).eval(), dt)
+ cmp.withNewChildren(Seq(partitionAttr, lit))
 }
 }
 val partitions = catalog.listPartitionsByFilter(tableIdentifier, filters)

From 77a945ea14e2a0a1eb97d7b0bce9c223b2a6c0ea Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Fri, 12 Oct 2018 09:32:32 +0200
Subject: [PATCH 08/14] introduce PartitioningAttribute

---
 .../expressions/namedExpressions.scala | 21 +++++++++++++++++++
 .../sql/catalyst/parser/AstBuilder.scala | 5 ++---
 .../spark/sql/execution/command/ddl.scala | 4 ++--
 3 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 8df870468c2ad..81eea1db58887 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -368,6 +368,27 @@ case class OuterReference(e: NamedExpression)
 override def newInstance(): NamedExpression = OuterReference(e.newInstance())
 }

+/**
+ * A placeholder for the name of a partition attribute specified when running commands
+ * involving partitions, e.g. ALTER TABLE ... DROP PARTITION.
+ */
+case class PartitioningAttribute(name: String)
+ extends Attribute with Unevaluable {
+ override def dataType: DataType = throw new UnsupportedOperationException
+ override def nullable: Boolean = false
+
+ override def qualifier: Option[String] = throw new UnsupportedOperationException
+ override def exprId: ExprId = throw new UnsupportedOperationException
+ override def withNullability(newNullability: Boolean): Attribute =
+ throw new UnsupportedOperationException
+ override def newInstance(): Attribute = throw new UnsupportedOperationException
+ override def withQualifier(newQualifier: Option[String]): Attribute =
+ throw new UnsupportedOperationException
+ override def withName(newName: String): Attribute = throw new UnsupportedOperationException
+ override def withMetadata(newMetadata: Metadata): Attribute =
+ throw new UnsupportedOperationException
+}
+
 object VirtualColumn {
 // The attribute name used by Hive, which has different result than Spark, deprecated.
 val hiveGroupingIdName: String = "grouping__id"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d8731f7046a03..1e1b67bc46d81 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -304,9 +304,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
 }
 // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
- // running the command. The type is not relevant; it is replaced during the real resolution.
- val partition =
- AttributeReference(pFilter.identifier().getText, StringType)()
+ // running the command.
+ val partition = PartitioningAttribute(pFilter.identifier().getText) val value = Literal(visitStringConstant(pFilter.constant())) val operator = pFilter.comparisonOperator().getChild(0).asInstanceOf[TerminalNode] val comparison = buildComparison(partition, value, operator) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index f406ad63186db..89a327745e2be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -584,7 +584,7 @@ case class AlterTableDropPartitionCommand( val filters = partitionFilterSpec.map { pFilter => pFilter.transform { // Resolve the partition attributes - case partitionCol: Attribute => + case partitionCol: PartitioningAttribute => val normalizedPartition = PartitioningUtils.normalizePartitionColumn( partitionCol.name, partitionColumns, @@ -626,7 +626,7 @@ object AlterTableDropPartitionCommand { def tablePartitionToPartitionFilters(spec: TablePartitionSpec): Seq[Expression] = { spec.map { - case (key, value) => EqualTo(AttributeReference(key, StringType)(), Literal(value)) + case (key, value) => EqualTo(PartitioningAttribute(key), Literal(value)) }.toSeq } } From 5e9e28cbd22c6f9a46ba5db59afd11cf3482ed0e Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 12 Oct 2018 10:01:11 +0200 Subject: [PATCH 09/14] fix build after upstream changes --- .../spark/sql/catalyst/expressions/namedExpressions.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 3269693f8dbd3..e3d28791891b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -391,12 +391,12 @@ case class PartitioningAttribute(name: String) override def dataType: DataType = throw new UnsupportedOperationException override def nullable: Boolean = false - override def qualifier: Option[String] = throw new UnsupportedOperationException + override def qualifier: Seq[String] = throw new UnsupportedOperationException override def exprId: ExprId = throw new UnsupportedOperationException override def withNullability(newNullability: Boolean): Attribute = throw new UnsupportedOperationException override def newInstance(): Attribute = throw new UnsupportedOperationException - override def withQualifier(newQualifier: Option[String]): Attribute = + override def withQualifier(newQualifier: Seq[String]): Attribute = throw new UnsupportedOperationException override def withName(newName: String): Attribute = throw new UnsupportedOperationException override def withMetadata(newMetadata: Metadata): Attribute = From 441acf342a5fb11dd351f66a92a73e6dcfcfde76 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 12 Oct 2018 13:18:03 +0200 Subject: [PATCH 10/14] fix failures --- .../spark/sql/catalyst/expressions/namedExpressions.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index e3d28791891b7..5a34883aaeb72 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -388,11 +388,14 @@ case class OuterReference(e: NamedExpression)
 */
 case class PartitioningAttribute(name: String)
 extends Attribute with Unevaluable {
- override def dataType: DataType = throw new UnsupportedOperationException
+ override val exprId: ExprId = NamedExpression.newExprId
+ // Not really needed or used. We just need a dataType to be used during analysis for resolving
+ // the expressions. The String type is used because all the literals in PARTITION operations are
+ // parsed as strings and eventually cast later.
+ override def dataType: DataType = StringType
 override def nullable: Boolean = false

 override def qualifier: Seq[String] = throw new UnsupportedOperationException
- override def exprId: ExprId = throw new UnsupportedOperationException
 override def withNullability(newNullability: Boolean): Attribute =
 throw new UnsupportedOperationException
 override def newInstance(): Attribute = throw new UnsupportedOperationException

From 9b8405748b3756024e346a2f00d4561f7617b16e Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Fri, 12 Oct 2018 17:12:23 +0200
Subject: [PATCH 11/14] fix ut

---
 .../spark/sql/execution/command/DDLParserSuite.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index b67d6a1fc3abe..ad1e3648eb4a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans
 import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
-import org.apache.spark.sql.catalyst.expressions.{Expression, JsonTuple}
+import org.apache.spark.sql.catalyst.expressions.{Expression, JsonTuple, PartitioningAttribute}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan}
@@ -885,11 +885,13 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
 ("<>", (a: Expression, b: Expression) => a =!= b),
 ("!=", (a: Expression, b: Expression) => a =!= b)).foreach { case (op, predicateGen) =>
 val genPlan = parser.parsePlan(sql1_table.replace("=", op))
+ val dtAttr = PartitioningAttribute("dt")
+ val countryAttr = PartitioningAttribute("country")
 val expectedPlan = AlterTableDropPartitionCommand(
 tableIdent,
 Seq(
- Seq(predicateGen('dt.string, "2008-08-08"), predicateGen('country.string, "us")),
- Seq(predicateGen('dt.string, "2009-09-09"), predicateGen('country.string, "uk"))),
+ Seq(predicateGen(dtAttr, "2008-08-08"), predicateGen(countryAttr, "us")),
+ Seq(predicateGen(dtAttr, "2009-09-09"), predicateGen(countryAttr, "uk"))),
 ifExists = true,
 purge = false,
 retainData = false)

From 2085088a566e079c77d7be4716c48c3d15e283ba Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Mon, 15 Oct 2018 11:52:32 +0200
Subject: [PATCH 12/14] address comments

---
 .../expressions/namedExpressions.scala | 11 ++++---
 .../sql/catalyst/parser/AstBuilder.scala | 32 +++++++++----------
 2 files
changed, 21 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 5a34883aaeb72..78dc75a3652f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -386,12 +386,13 @@ case class OuterReference(e: NamedExpression)
 * A placeholder for the name of a partition attribute specified when running commands
 * involving partitions, e.g. ALTER TABLE ... DROP PARTITION.
 */
-case class PartitioningAttribute(name: String)
+case class PartitioningAttribute(
+ name: String,
+ override val exprId: ExprId = NamedExpression.newExprId)
 extends Attribute with Unevaluable {
- override val exprId: ExprId = NamedExpression.newExprId
- // Not really needed or used. We just need a dataType to be used during analysis for resolving
- // the expressions. The String type is used because all the literals in PARTITION operations are
- // parsed as strings and eventually cast later.
+ // We need a dataType to be used during analysis for resolving the expressions (see
+ // checkInputDataTypes). The String type is used because all the literals in PARTITION operations
+ // are parsed as strings and eventually cast later.
 override def dataType: DataType = StringType
 override def nullable: Boolean = false

 override def qualifier: Seq[String] = throw new UnsupportedOperationException
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 150690841f8f8..cb128b85c41c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -297,24 +297,22 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 * Create a sequence of partition filters from a DROP PARTITION specification.
 */
 override def visitDropPartitionSpec(
- ctx: DropPartitionSpecContext): Seq[Expression] = {
- withOrigin(ctx) {
- ctx.dropPartitionVal().asScala.map { pFilter =>
- if (pFilter.constant() == null || pFilter.comparisonOperator() == null) {
- throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
- }
- // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
- // running the command.
- val partition = PartitioningAttribute(pFilter.identifier().getText)
- val value = Literal(visitStringConstant(pFilter.constant()))
- val operator = pFilter.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
- val comparison = buildComparison(partition, value, operator)
- if (comparison.isInstanceOf[EqualNullSafe]) {
- throw new ParseException(
- "'<=>' operator is not supported in ALTER TABLE ... DROP PARTITION.", ctx)
- }
- comparison
+ ctx: DropPartitionSpecContext): Seq[Expression] = withOrigin(ctx) {
+ ctx.dropPartitionVal().asScala.map { pFilter =>
+ if (pFilter.constant() == null || pFilter.comparisonOperator() == null) {
+ throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+ }
+ // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+ // running the command.
+ val partition = PartitioningAttribute(pFilter.identifier().getText) + val value = Literal(visitStringConstant(pFilter.constant())) + val operator = pFilter.comparisonOperator().getChild(0).asInstanceOf[TerminalNode] + val comparison = buildComparison(partition, value, operator) + if (comparison.isInstanceOf[EqualNullSafe]) { + throw new ParseException( + "'<=>' operator is not supported in ALTER TABLE ... DROP PARTITION.", ctx) } + comparison } } From 146aa32d4df291de39c51b989d192937f42c1109 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 15 Oct 2018 16:58:28 +0200 Subject: [PATCH 13/14] fix ut failure --- .../spark/sql/catalyst/expressions/namedExpressions.scala | 2 ++ .../spark/sql/execution/command/DDLParserSuite.scala | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 78dc75a3652f1..a80764c5a8d19 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -405,6 +405,8 @@ case class PartitioningAttribute( override def withName(newName: String): Attribute = throw new UnsupportedOperationException override def withMetadata(newMetadata: Metadata): Attribute = throw new UnsupportedOperationException + + override lazy val canonicalized: Expression = this.copy(exprId = ExprId(0)) } object VirtualColumn { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index ad1e3648eb4a9..d5cbc84d83130 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -873,9 +873,9 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { val expected2_table = expected1_table.copy(ifExists = false) val expected1_purge = expected1_table.copy(purge = true) - comparePlans(parsed1_table, expected1_table) - comparePlans(parsed2_table, expected2_table) - comparePlans(parsed1_purge, expected1_purge) + comparePlans(parsed1_table.canonicalized, expected1_table.canonicalized) + comparePlans(parsed2_table.canonicalized, expected2_table.canonicalized) + comparePlans(parsed1_purge.canonicalized, expected1_purge.canonicalized) // SPARK-23866: Support any comparison operator in ALTER TABLE ... 
DROP PARTITION Seq((">", (a: Expression, b: Expression) => a > b), @@ -895,7 +895,7 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { ifExists = true, purge = false, retainData = false) - comparePlans(genPlan, expectedPlan) + comparePlans(genPlan.canonicalized, expectedPlan.canonicalized) } // SPARK-23866: <=> is not supported From 96760614d5d2cd31a75982d28948a28376f67964 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sat, 20 Jul 2019 12:43:02 +0200 Subject: [PATCH 14/14] fix --- .../spark/sql/catalyst/expressions/namedExpressions.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 8456f4daa2c49..42fc71e0e6b68 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -416,6 +416,8 @@ case class PartitioningAttribute( throw new UnsupportedOperationException override lazy val canonicalized: Expression = this.copy(exprId = ExprId(0)) + + override def withExprId(newExprId: ExprId): Attribute = throw new UnsupportedOperationException } object VirtualColumn {
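
As a closing reference, a minimal sketch (illustration only, not part of the series) of the representation the final patches converge on. The names below come from the code above; the filter mirrors what visitDropPartitionSpec would produce for ALTER TABLE sales DROP PARTITION (quarter > '2').

// Illustrative sketch: a drop spec such as PARTITION (quarter > '2') is parsed
// into a comparison whose left side is the PartitioningAttribute placeholder
// introduced in patch 08 (with its default exprId from patch 12):
import org.apache.spark.sql.catalyst.expressions.{GreaterThan, Literal, PartitioningAttribute}

val filter = GreaterThan(PartitioningAttribute("quarter"), Literal("2"))
// AlterTableDropPartitionCommand then swaps the placeholder for the table's real
// partition attribute, casts the string literal to that attribute's type, and
// resolves the matching partitions via SessionCatalog.listPartitionsByFilter.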