From 2c5149e3bb6ae3f7c5801564936b77adc535d14b Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Wed, 4 Sep 2024 20:02:37 -0700 Subject: [PATCH 1/4] [SPARK-49152][SQL][FOLLOWUP] fix issues --- .../analysis/ResolveSessionCatalog.scala | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index d569f1ed484cc..bd5af59afd6fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, DelegatingCatalogExtension, LookupCatalog, SupportsNamespaces, V1Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command._ @@ -284,7 +284,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) => AnalyzeColumnCommand(ident, columnNames, allColumns) - case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) => + case RepairTable( + ResolvedTableIdentifierInSessionCatalog(ident), addPartitions, dropPartitions) => RepairTableCommand(ident, addPartitions, dropPartitions) case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) => @@ -600,6 +601,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } } + object ResolvedTableIdentifierInSessionCatalog { + def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match { + case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) => + Some(t.catalogTable.identifier) + case _ => None + } + } + object ResolvedV1TableOrViewIdentifier { def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match { case ResolvedV1TableIdentifier(ident) => Some(ident) @@ -684,7 +693,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } private def supportsV1Command(catalog: CatalogPlugin): Boolean = { - isSessionCatalog(catalog) && - SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty + (isSessionCatalog(catalog) && + SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty) || + catalog.isInstanceOf[DelegatingCatalogExtension] } } From 4227762bbee76891f9044fcca096aa326a146ac8 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 5 Sep 2024 14:38:46 +0800 Subject: [PATCH 2/4] fix --- .../analysis/ResolveSessionCatalog.scala | 31 ++++++++++++------- .../DataSourceV2SQLSessionCatalogSuite.scala | 8 +++++ 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index bd5af59afd6fc..f1b8c6456d81c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -285,10 +285,17 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) AnalyzeColumnCommand(ident, columnNames, allColumns) case RepairTable( - ResolvedTableIdentifierInSessionCatalog(ident), addPartitions, dropPartitions) => + ResolvedV1TableIdentifierInSessionCatalog(ident), + addPartitions, + dropPartitions) => RepairTableCommand(ident, addPartitions, dropPartitions) - case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) => + case LoadData( + ResolvedV1TableIdentifierInSessionCatalog(ident), + path, + isLocal, + isOverwrite, + partition) => LoadDataCommand( ident, path, @@ -337,7 +344,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } ShowColumnsCommand(db, v1TableName, output) - case RecoverPartitions(ResolvedV1TableIdentifier(ident)) => + case RecoverPartitions(ResolvedV1TableIdentifierInSessionCatalog(ident)) => RepairTableCommand( ident, enableAddPartitions = true, @@ -366,7 +373,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) retainData = false) case SetTableSerDeProperties( - ResolvedV1TableIdentifier(ident), + ResolvedV1TableIdentifierInSessionCatalog(ident), serdeClassName, serdeProperties, partitionSpec) => @@ -376,15 +383,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) serdeProperties, partitionSpec) - case SetTableLocation(ResolvedV1TableIdentifier(ident), None, location) => + case SetTableLocation(ResolvedV1TableIdentifierInSessionCatalog(ident), None, location) => AlterTableSetLocationCommand(ident, None, location) // V2 catalog doesn't support setting partition location yet, we must use v1 command here. case SetTableLocation( - ResolvedTable(catalog, _, t: V1Table, _), + ResolvedV1TableIdentifierInSessionCatalog(ident), Some(partitionSpec), - location) if isSessionCatalog(catalog) => - AlterTableSetLocationCommand(t.v1Table.identifier, Some(partitionSpec), location) + location) => + AlterTableSetLocationCommand(ident, Some(partitionSpec), location) case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) => AlterViewAsCommand(ident, originalText, query) @@ -601,7 +608,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } } - object ResolvedTableIdentifierInSessionCatalog { + object ResolvedV1TableIdentifierInSessionCatalog { def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match { case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) => Some(t.catalogTable.identifier) @@ -693,8 +700,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } private def supportsV1Command(catalog: CatalogPlugin): Boolean = { - (isSessionCatalog(catalog) && - SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty) || - catalog.isInstanceOf[DelegatingCatalogExtension] + isSessionCatalog(catalog) && ( + SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty || + catalog.isInstanceOf[DelegatingCatalogExtension]) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala index 95624f3f61c5c..7463eb34d17ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala @@ -71,4 +71,12 @@ class DataSourceV2SQLSessionCatalogSuite sql(s"CREATE EXTERNAL TABLE t (i INT) USING $v2Format TBLPROPERTIES($prop)") } } + + test("SPARK-49152: partition columns should be put at the end") { + withTable("t") { + sql("CREATE TABLE t (c1 INT, c2 INT) USING json PARTITIONED BY (c1)") + // partition columns should be put at the end. + assert(getTableMetadata("default.t").columns().map(_.name()) === Seq("c2", "c1")) + } + } } From f3828fe2e383ba2de38418561c4d02d65bd98caa Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 5 Sep 2024 14:45:37 +0800 Subject: [PATCH 3/4] Apply suggestions from code review --- .../spark/sql/catalyst/analysis/ResolveSessionCatalog.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index f1b8c6456d81c..02ad2e79a5645 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -284,12 +284,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) => AnalyzeColumnCommand(ident, columnNames, allColumns) + // V2 catalog doesn't support REPAIR TABLE yet, we must use v1 command here. case RepairTable( ResolvedV1TableIdentifierInSessionCatalog(ident), addPartitions, dropPartitions) => RepairTableCommand(ident, addPartitions, dropPartitions) + // V2 catalog doesn't support LOAD DATA yet, we must use v1 command here. case LoadData( ResolvedV1TableIdentifierInSessionCatalog(ident), path, @@ -344,6 +346,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } ShowColumnsCommand(db, v1TableName, output) + // V2 catalog doesn't support RECOVER PARTITIONS yet, we must use v1 command here. case RecoverPartitions(ResolvedV1TableIdentifierInSessionCatalog(ident)) => RepairTableCommand( ident, @@ -372,6 +375,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) purge, retainData = false) + // V2 catalog doesn't support setting serde properties yet, we must use v1 command here. case SetTableSerDeProperties( ResolvedV1TableIdentifierInSessionCatalog(ident), serdeClassName, @@ -383,7 +387,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) serdeProperties, partitionSpec) - case SetTableLocation(ResolvedV1TableIdentifierInSessionCatalog(ident), None, location) => + case SetTableLocation(ResolvedV1TableIdentifier(ident), None, location) => AlterTableSetLocationCommand(ident, None, location) // V2 catalog doesn't support setting partition location yet, we must use v1 command here. From f08a763367f2a5d8105b2af3b7af4977d9808d4f Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 5 Sep 2024 17:55:11 +0800 Subject: [PATCH 4/4] fix --- .../sql/connector/DataSourceV2SQLSuite.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 1d37c6aa4eb7f..922bf01b541a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2125,10 +2125,18 @@ class DataSourceV2SQLSuiteV1Filter } test("REPLACE TABLE: v1 table") { - sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}") - val v2Catalog = catalog("spark_catalog").asTableCatalog - val table = v2Catalog.loadTable(Identifier.of(Array("default"), "tbl")) - assert(table.properties().get(TableCatalog.PROP_PROVIDER) == classOf[SimpleScanSource].getName) + val e = intercept[AnalysisException] { + sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}") + } + checkError( + exception = e, + errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION", + sqlState = "0A000", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`tbl`", + "operation" -> "REPLACE TABLE" + ) + ) } test("DeleteFrom: - delete with invalid predicate") {