From 654b765dd5895c88825617b74dcbc064f524804a Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Wed, 7 Aug 2024 19:54:51 -0700 Subject: [PATCH 01/13] [SPARK-49152][SQL] V2SessionCatalog should use V2Command when possible. --- .../sql/connector/catalog/CatalogV2Util.scala | 6 ++++++ .../analysis/ResolveSessionCatalog.scala | 21 ++++++++++++------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 283c550c4556f..d3ea2ff1db618 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.expressions.{ClusterByTransform, LiteralValue, Transform} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, MetadataBuilder, StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.ArrayImplicits._ @@ -444,6 +445,11 @@ private[sql] object CatalogV2Util { catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) } + def isV2SessionCatalog(catalog: CatalogPlugin): Boolean = { + catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) && + SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined + } + def convertTableProperties(t: TableSpec): Map[String, String] = { val props = convertTableProperties( t.properties, t.options, t.serde, t.location, t.comment, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index a460634ad8a80..bd318535fe16f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -68,7 +68,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS") case a @ AlterColumn(ResolvedTable(catalog, ident, table: V1Table, _), _, _, _, _, _, _) - if isSessionCatalog(catalog) => + if isSessionCatalog(catalog) && !isV2SessionCatalog(catalog) => if (a.column.name.length > 1) { throw QueryCompilationErrors.unsupportedTableOperationError( catalog, ident, "ALTER COLUMN with qualified column") @@ -102,7 +102,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn) case AlterTableClusterBy(ResolvedTable(catalog, _, table: V1Table, _), clusterBySpecOpt) - if isSessionCatalog(catalog) => + if isSessionCatalog(catalog) && !isV2SessionCatalog(catalog) => val prop = Map(ClusterBySpec.toProperty(table.schema, clusterBySpecOpt.getOrElse(ClusterBySpec(Nil)), conf.resolver)) AlterTableSetPropertiesCommand(table.catalogTable.identifier, prop, isView = false) @@ -257,7 +257,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case ShowTablePartition( ResolvedTable(catalog, _, table: V1Table, _), partitionSpec, - output) if isSessionCatalog(catalog) => + output) if isSessionCatalog(catalog) && !isV2SessionCatalog(catalog) => val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) { output.head.withName("database") +: output.tail } else { @@ -305,7 +305,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) if conf.useV1Command => ShowCreateTableCommand(ident, output) case ShowCreateTable(ResolvedTable(catalog, _, table: V1Table, _), _, output) - if isSessionCatalog(catalog) && DDLUtils.isHiveTable(table.catalogTable) => + if isSessionCatalog(catalog) && !isV2SessionCatalog(catalog) + && DDLUtils.isHiveTable(table.catalogTable) => ShowCreateTableCommand(table.catalogTable.identifier, output) case TruncateTable(ResolvedV1TableIdentifier(ident)) => @@ -583,7 +584,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) object ResolvedV1TableIdentifier { def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match { - case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) => + case ResolvedTable(catalog, _, t: V1Table, _) + if isSessionCatalog(catalog) && !isV2SessionCatalog(catalog) => Some(t.catalogTable.identifier) case _ => None } @@ -599,7 +601,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) object ResolvedV1Identifier { def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match { - case ResolvedIdentifier(catalog, ident) if isSessionCatalog(catalog) => + case ResolvedIdentifier(catalog, ident) + if isSessionCatalog(catalog) && !isV2SessionCatalog(catalog) => if (ident.namespace().length != 1) { throw QueryCompilationErrors .requiresSinglePartNamespaceError(ident.namespace().toImmutableArraySeq) @@ -624,7 +627,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) private object DatabaseInSessionCatalog { def unapply(resolved: ResolvedNamespace): Option[String] = resolved match { - case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None + case ResolvedNamespace(catalog, _, _) + if !isSessionCatalog(catalog) || isV2SessionCatalog(catalog) => None case ResolvedNamespace(_, Seq(), _) => throw QueryCompilationErrors.databaseFromV1SessionCatalogNotSpecifiedError() case ResolvedNamespace(_, Seq(dbName), _) => Some(dbName) @@ -637,7 +641,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) private object DatabaseNameInSessionCatalog { def unapply(resolved: ResolvedNamespace): Option[String] = resolved match { - case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None + case ResolvedNamespace(catalog, _, _) + if !isSessionCatalog(catalog) || isV2SessionCatalog(catalog) => None case ResolvedNamespace(_, Seq(dbName), _) => Some(dbName) case _ => assert(resolved.namespace.length > 1) From 97ea8233d484dc91c1d64d5c6531855f6b8ada54 Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Thu, 8 Aug 2024 21:49:06 -0700 Subject: [PATCH 02/13] update --- .../sql/connector/catalog/CatalogV2Util.scala | 10 +-- .../analysis/ResolveSessionCatalog.scala | 80 ++++++++++--------- .../datasources/v2/DataSourceV2Strategy.scala | 22 ++++- .../org/apache/spark/sql/CollationSuite.scala | 7 +- .../sql/connector/DataSourceV2SQLSuite.scala | 12 +-- .../connector/TestV2SessionCatalogBase.scala | 20 +++-- 6 files changed, 90 insertions(+), 61 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index d3ea2ff1db618..e72f3f0086946 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -441,13 +441,13 @@ private[sql] object CatalogV2Util { loadTable(catalog, ident).map(DataSourceV2Relation.create(_, Some(catalog), Some(ident))) } - def isSessionCatalog(catalog: CatalogPlugin): Boolean = { - catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) + def supportsV1Command(catalog: CatalogPlugin): Boolean = { + catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) && + !SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined } - def isV2SessionCatalog(catalog: CatalogPlugin): Boolean = { - catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) && - SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined + def isSessionCatalog(catalog: CatalogPlugin): Boolean = { + catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) } def convertTableProperties(t: TableSpec): Map[String, String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index bd318535fe16f..2f06c56456a76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -68,7 +68,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS") case a @ AlterColumn(ResolvedTable(catalog, ident, table: V1Table, _), _, _, _, _, _, _) - if isSessionCatalog(catalog) && !isV2SessionCatalog(catalog) => + if supportsV1Command(catalog) => if (a.column.name.length > 1) { throw QueryCompilationErrors.unsupportedTableOperationError( catalog, ident, "ALTER COLUMN with qualified column") @@ -102,7 +102,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn) case AlterTableClusterBy(ResolvedTable(catalog, _, table: V1Table, _), clusterBySpecOpt) - if isSessionCatalog(catalog) && !isV2SessionCatalog(catalog) => + if supportsV1Command(catalog) => val prop = Map(ClusterBySpec.toProperty(table.schema, clusterBySpecOpt.getOrElse(ClusterBySpec(Nil)), conf.resolver)) AlterTableSetPropertiesCommand(table.catalogTable.identifier, prop, isView = false) @@ -194,26 +194,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case RefreshTable(ResolvedV1TableOrViewIdentifier(ident)) => RefreshTableCommand(ident) - // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the - // session catalog and the table provider is not v2. - case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) if c.resolved => - val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) - if (!isV2Provider(provider)) { - throw QueryCompilationErrors.unsupportedTableOperationError( - ident, "REPLACE TABLE") - } else { - c - } - - case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) => - val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) - if (!isV2Provider(provider)) { - throw QueryCompilationErrors.unsupportedTableOperationError( - ident, "REPLACE TABLE AS SELECT") - } else { - c - } - case DropTable(ResolvedV1Identifier(ident), ifExists, purge) if conf.useV1Command => DropTableCommand(ident, ifExists, isView = false, purge = purge) @@ -221,7 +201,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) => DropTempViewCommand(ident) - case DropView(ResolvedV1Identifier(ident), ifExists) => + case DropView(ResolvedV1IdentifierForNonV2Commands(ident), ifExists) => DropTableCommand(ident, ifExists, isView = true, purge = false) case DropView(r @ ResolvedIdentifier(catalog, ident), _) => @@ -257,7 +237,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case ShowTablePartition( ResolvedTable(catalog, _, table: V1Table, _), partitionSpec, - output) if isSessionCatalog(catalog) && !isV2SessionCatalog(catalog) => + output) if supportsV1Command(catalog) => val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) { output.head.withName("database") +: output.tail } else { @@ -305,8 +285,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) if conf.useV1Command => ShowCreateTableCommand(ident, output) case ShowCreateTable(ResolvedTable(catalog, _, table: V1Table, _), _, output) - if isSessionCatalog(catalog) && !isV2SessionCatalog(catalog) - && DDLUtils.isHiveTable(table.catalogTable) => + if supportsV1Command(catalog) && DDLUtils.isHiveTable(table.catalogTable) => ShowCreateTableCommand(table.catalogTable.identifier, output) case TruncateTable(ResolvedV1TableIdentifier(ident)) => @@ -383,7 +362,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case AlterViewSchemaBinding(ResolvedViewIdentifier(ident), viewSchemaMode) => AlterViewSchemaBindingCommand(ident, viewSchemaMode) - case CreateView(ResolvedV1Identifier(ident), userSpecifiedColumns, comment, + case CreateView(ResolvedV1IdentifierForNonV2Commands(ident), userSpecifiedColumns, comment, properties, originalText, child, allowExisting, replace, viewSchemaMode) => CreateViewCommand( name = ident, @@ -402,7 +381,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case ShowViews(ns: ResolvedNamespace, pattern, output) => ns match { - case DatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output) + case DatabaseInSessionCatalogForNonV2Commands(db) => ShowViewsCommand(db, pattern, output) case _ => throw QueryCompilationErrors.missingCatalogAbilityError(ns.catalog, "views") } @@ -425,7 +404,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "functions") } - case ShowFunctions(DatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) => + case ShowFunctions( + DatabaseInSessionCatalogForNonV2Commands(db), userScope, systemScope, pattern, output) => ShowFunctionsCommand(db, pattern, userScope, systemScope, output) case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) => @@ -446,7 +426,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION") } - case CreateFunction(ResolvedV1Identifier(ident), className, resources, ifExists, replace) => + case CreateFunction + (ResolvedV1IdentifierForNonV2Commands(ident), className, resources, ifExists, replace) => CreateFunctionCommand( FunctionIdentifier(ident.table, ident.database, ident.catalog), className, @@ -584,8 +565,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) object ResolvedV1TableIdentifier { def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match { - case ResolvedTable(catalog, _, t: V1Table, _) - if isSessionCatalog(catalog) && !isV2SessionCatalog(catalog) => + case ResolvedTable(catalog, _, t: V1Table, _) if supportsV1Command(catalog) => Some(t.catalogTable.identifier) case _ => None } @@ -601,8 +581,20 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) object ResolvedV1Identifier { def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match { - case ResolvedIdentifier(catalog, ident) - if isSessionCatalog(catalog) && !isV2SessionCatalog(catalog) => + case ResolvedIdentifier(catalog, ident) if supportsV1Command(catalog) => + if (ident.namespace().length != 1) { + throw QueryCompilationErrors + .requiresSinglePartNamespaceError(ident.namespace().toImmutableArraySeq) + } + Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name))) + case _ => None + } + } + + // Use this object to help match commands that do not have a v2 implementation. + object ResolvedV1IdentifierForNonV2Commands { + def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match { + case ResolvedIdentifier(catalog, ident) if isSessionCatalog(catalog) => if (ident.namespace().length != 1) { throw QueryCompilationErrors .requiresSinglePartNamespaceError(ident.namespace().toImmutableArraySeq) @@ -627,8 +619,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) private object DatabaseInSessionCatalog { def unapply(resolved: ResolvedNamespace): Option[String] = resolved match { - case ResolvedNamespace(catalog, _, _) - if !isSessionCatalog(catalog) || isV2SessionCatalog(catalog) => None + case ResolvedNamespace(catalog, _, _) if !supportsV1Command(catalog) => None + case ResolvedNamespace(_, Seq(), _) => + throw QueryCompilationErrors.databaseFromV1SessionCatalogNotSpecifiedError() + case ResolvedNamespace(_, Seq(dbName), _) => Some(dbName) + case _ => + assert(resolved.namespace.length > 1) + throw QueryCompilationErrors.nestedDatabaseUnsupportedByV1SessionCatalogError( + resolved.namespace.map(quoteIfNeeded).mkString(".")) + } + } + + // Use this object to help match commands that do not have a v2 implementation. + private object DatabaseInSessionCatalogForNonV2Commands { + def unapply(resolved: ResolvedNamespace): Option[String] = resolved match { + case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None case ResolvedNamespace(_, Seq(), _) => throw QueryCompilationErrors.databaseFromV1SessionCatalogNotSpecifiedError() case ResolvedNamespace(_, Seq(dbName), _) => Some(dbName) @@ -641,8 +646,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) private object DatabaseNameInSessionCatalog { def unapply(resolved: ResolvedNamespace): Option[String] = resolved match { - case ResolvedNamespace(catalog, _, _) - if !isSessionCatalog(catalog) || isV2SessionCatalog(catalog) => None + case ResolvedNamespace(catalog, _, _) if !supportsV1Command(catalog) => None case ResolvedNamespace(_, Seq(dbName), _) => Some(dbName) case _ => assert(resolved.namespace.length > 1) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index b0a89173060a5..a1d5b49e1516b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.mutable +import org.apache.hadoop.conf.Configuration + import org.apache.spark.SparkException import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys.EXPR @@ -53,6 +55,23 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat import DataSourceV2Implicits._ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + lazy private val hadoopConf = new Configuration() + + private def withProjectAndFilter( + project: Seq[NamedExpression], + filters: Seq[Expression], + scan: LeafExecNode, + needsUnsafeConversion: Boolean): SparkPlan = { + val filterCondition = filters.reduceLeftOption(And) + val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + + if (withFilter.output != project || needsUnsafeConversion) { + ProjectExec(project, withFilter) + } else { + withFilter + } + } + private def refreshCache(r: DataSourceV2Relation)(): Unit = { session.sharedState.cacheManager.recacheByPlan(session, r) } @@ -87,7 +106,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = { - tableSpec.withNewLocation(tableSpec.location.map(makeQualifiedDBObjectPath(_))) + tableSpec.withNewLocation(tableSpec.location.map(loc => CatalogUtils.makeQualifiedPath( + CatalogUtils.stringToURI(loc), hadoopConf).toString)) } override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala index dd678ac48c687..0bfdca10b16d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.internal.{SqlApiConf, SQLConf} import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructField, StructType} +import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { protected val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName @@ -157,6 +158,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { } test("disable bucketing on collated string column") { + spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) def createTable(bucketColumns: String*): Unit = { val tableName = "test_partition_tbl" withTable(tableName) { @@ -180,8 +182,8 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { exception = intercept[AnalysisException] { createTable(bucketColumns: _*) }, - errorClass = "INVALID_BUCKET_COLUMN_DATA_TYPE", - parameters = Map("type" -> "\"STRING COLLATE UNICODE\"") + errorClass = "INVALID_BUCKET_COLUMN_DATA_TYPE", + parameters = Map("type" -> "\"STRING COLLATE UNICODE\"") ); } } @@ -758,6 +760,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { } test("disable partition on collated string column") { + spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) def createTable(partitionColumns: String*): Unit = { val tableName = "test_partition_tbl" withTable(tableName) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index ec4b827c659f8..329d8cecbe69b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -441,7 +441,7 @@ class DataSourceV2SQLSuiteV1Filter val location = spark.sql(s"DESCRIBE EXTENDED $identifier") .filter("col_name = 'Location'") .select("data_type").head().getString(0) - assert(location === "file:/tmp/foo") + assert(location === "file:///tmp/foo") } } } @@ -2104,15 +2104,7 @@ class DataSourceV2SQLSuiteV1Filter } test("REPLACE TABLE: v1 table") { - val e = intercept[AnalysisException] { - sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}") - } - checkError( - exception = e, - errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION", - sqlState = "0A000", - parameters = Map("tableName" -> "`spark_catalog`.`default`.`tbl`", - "operation" -> "REPLACE TABLE")) + sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}") } test("DeleteFrom: - delete with invalid predicate") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala index 3b2fc0379340b..023ec83fc4b08 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala @@ -71,18 +71,28 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating properties: java.util.Map[String, String]): Table = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY - val propsWithLocation = if (properties.containsKey(key)) { - // Always set a location so that CREATE EXTERNAL TABLE won't fail with LOCATION not specified. - if (!properties.containsKey(TableCatalog.PROP_LOCATION)) { + val updatedProps = + if (properties.containsKey(TableCatalog.PROP_LOCATION)) { val newProps = new java.util.HashMap[String, String]() newProps.putAll(properties) - newProps.put(TableCatalog.PROP_LOCATION, "file:/abc") + newProps.put(TableCatalog.PROP_EXTERNAL, "true") newProps } else { properties } + + val propsWithLocation = if (updatedProps.containsKey(key)) { + // Always set a location so that CREATE EXTERNAL TABLE won't fail with LOCATION not specified. + if (!updatedProps.containsKey(TableCatalog.PROP_LOCATION)) { + val newProps = new java.util.HashMap[String, String]() + newProps.putAll(updatedProps) + newProps.put(TableCatalog.PROP_LOCATION, "file:/abc") + newProps + } else { + updatedProps + } } else { - properties + updatedProps } super.createTable(ident, columns, partitions, propsWithLocation) val schema = CatalogV2Util.v2ColumnsToStructType(columns) From 8da62d57787850476cd2e479255cde02f6881abc Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Thu, 8 Aug 2024 21:48:28 -0700 Subject: [PATCH 03/13] update --- .../org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 329d8cecbe69b..b79cafc6d31d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -458,7 +458,7 @@ class DataSourceV2SQLSuiteV1Filter val location = spark.sql(s"DESCRIBE EXTENDED $identifier") .filter("col_name = 'Location'") .select("data_type").head().getString(0) - assert(location === "file:/tmp/foo") + assert(location === "file:///tmp/foo") } } } From 890909bfa9470cee49e1a8f85d3e6b612eec220a Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Thu, 8 Aug 2024 21:52:59 -0700 Subject: [PATCH 04/13] update --- .../src/test/scala/org/apache/spark/sql/CollationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala index 0bfdca10b16d9..61f92b87abf50 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala @@ -34,8 +34,8 @@ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAg import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.internal.{SqlApiConf, SQLConf} -import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructField, StructType} import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION +import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructField, StructType} class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { protected val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName From 1eba81234e985d09868c7d33f3aa7ab3ab23b993 Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Thu, 8 Aug 2024 22:05:27 -0700 Subject: [PATCH 05/13] u[date --- .../src/test/scala/org/apache/spark/sql/CollationSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala index 61f92b87abf50..3757284d7d3e3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala @@ -182,8 +182,8 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { exception = intercept[AnalysisException] { createTable(bucketColumns: _*) }, - errorClass = "INVALID_BUCKET_COLUMN_DATA_TYPE", - parameters = Map("type" -> "\"STRING COLLATE UNICODE\"") + errorClass = "INVALID_BUCKET_COLUMN_DATA_TYPE", + parameters = Map("type" -> "\"STRING COLLATE UNICODE\"") ); } } From 250a4ae39d9a21313987740594c5fd6236275ae5 Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Fri, 9 Aug 2024 12:10:52 -0700 Subject: [PATCH 06/13] update --- .../sql/connector/catalog/CatalogV2Util.scala | 6 --- .../analysis/ResolveSessionCatalog.scala | 53 ++++++++++++++----- .../datasources/v2/DataSourceV2Strategy.scala | 4 +- 3 files changed, 40 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index e72f3f0086946..283c550c4556f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -35,7 +35,6 @@ import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.expressions.{ClusterByTransform, LiteralValue, Transform} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, MetadataBuilder, StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.ArrayImplicits._ @@ -441,11 +440,6 @@ private[sql] object CatalogV2Util { loadTable(catalog, ident).map(DataSourceV2Relation.create(_, Some(catalog), Some(ident))) } - def supportsV1Command(catalog: CatalogPlugin): Boolean = { - catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) && - !SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined - } - def isSessionCatalog(catalog: CatalogPlugin): Boolean = { catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 2f06c56456a76..41dcba9905871 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command._ @@ -102,7 +102,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn) case AlterTableClusterBy(ResolvedTable(catalog, _, table: V1Table, _), clusterBySpecOpt) - if supportsV1Command(catalog) => + if supportsV1Command(catalog) => val prop = Map(ClusterBySpec.toProperty(table.schema, clusterBySpecOpt.getOrElse(ClusterBySpec(Nil)), conf.resolver)) AlterTableSetPropertiesCommand(table.catalogTable.identifier, prop, isView = false) @@ -125,13 +125,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case UnsetViewProperties(ResolvedViewIdentifier(ident), keys, ifExists) => AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = true) - case DescribeNamespace(DatabaseInSessionCatalog(db), extended, output) if conf.useV1Command => + case DescribeNamespace(ResolvedV1Database(db), extended, output) if conf.useV1Command => DescribeDatabaseCommand(db, extended, output) - case SetNamespaceProperties(DatabaseInSessionCatalog(db), properties) if conf.useV1Command => + case SetNamespaceProperties(ResolvedV1Database(db), properties) if conf.useV1Command => AlterDatabasePropertiesCommand(db, properties) - case SetNamespaceLocation(DatabaseInSessionCatalog(db), location) if conf.useV1Command => + case SetNamespaceLocation(ResolvedV1Database(db), location) if conf.useV1Command => AlterDatabaseSetLocationCommand(db, location) case RenameTable(ResolvedV1TableOrViewIdentifier(oldIdent), newName, isView) => @@ -194,6 +194,26 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case RefreshTable(ResolvedV1TableOrViewIdentifier(ident)) => RefreshTableCommand(ident) + // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the + // session catalog and the table provider is not v2. + case c@ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) if c.resolved => + val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) + if (!isV2Provider(provider)) { + throw QueryCompilationErrors.unsupportedTableOperationError( + ident, "REPLACE TABLE") + } else { + c + } + + case c@ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) => + val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) + if (!isV2Provider(provider)) { + throw QueryCompilationErrors.unsupportedTableOperationError( + ident, "REPLACE TABLE AS SELECT") + } else { + c + } + case DropTable(ResolvedV1Identifier(ident), ifExists, purge) if conf.useV1Command => DropTableCommand(ident, ifExists, isView = false, purge = purge) @@ -201,7 +221,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) => DropTempViewCommand(ident) - case DropView(ResolvedV1IdentifierForNonV2Commands(ident), ifExists) => + case DropView(ResolvedIdentifierInSessionCatalog(ident), ifExists) => DropTableCommand(ident, ifExists, isView = true, purge = false) case DropView(r @ ResolvedIdentifier(catalog, ident), _) => @@ -217,14 +237,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES CreateDatabaseCommand(name, c.ifNotExists, location, comment, newProperties) - case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) if conf.useV1Command => + case d @ DropNamespace(ResolvedV1Database(db), _, _) if conf.useV1Command => DropDatabaseCommand(db, d.ifExists, d.cascade) - case ShowTables(DatabaseInSessionCatalog(db), pattern, output) if conf.useV1Command => + case ShowTables(ResolvedV1Database(db), pattern, output) if conf.useV1Command => ShowTablesCommand(Some(db), pattern, output) case ShowTablesExtended( - DatabaseInSessionCatalog(db), + ResolvedV1Database(db), pattern, output) => val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) { @@ -257,7 +277,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) AnalyzePartitionCommand(ident, partitionSpec, noScan) } - case AnalyzeTables(DatabaseInSessionCatalog(db), noScan) => + case AnalyzeTables(ResolvedV1Database(db), noScan) => AnalyzeTablesCommand(Some(db), noScan) case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) => @@ -362,7 +382,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case AlterViewSchemaBinding(ResolvedViewIdentifier(ident), viewSchemaMode) => AlterViewSchemaBindingCommand(ident, viewSchemaMode) - case CreateView(ResolvedV1IdentifierForNonV2Commands(ident), userSpecifiedColumns, comment, + case CreateView(ResolvedIdentifierInSessionCatalog(ident), userSpecifiedColumns, comment, properties, originalText, child, allowExisting, replace, viewSchemaMode) => CreateViewCommand( name = ident, @@ -427,7 +447,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } case CreateFunction - (ResolvedV1IdentifierForNonV2Commands(ident), className, resources, ifExists, replace) => + (ResolvedIdentifierInSessionCatalog(ident), className, resources, ifExists, replace) => CreateFunctionCommand( FunctionIdentifier(ident.table, ident.database, ident.catalog), className, @@ -592,7 +612,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } // Use this object to help match commands that do not have a v2 implementation. - object ResolvedV1IdentifierForNonV2Commands { + object ResolvedIdentifierInSessionCatalog{ def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match { case ResolvedIdentifier(catalog, ident) if isSessionCatalog(catalog) => if (ident.namespace().length != 1) { @@ -617,7 +637,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) DataSourceV2Utils.getTableProvider(provider, conf).isDefined } - private object DatabaseInSessionCatalog { + private object ResolvedV1Database { def unapply(resolved: ResolvedNamespace): Option[String] = resolved match { case ResolvedNamespace(catalog, _, _) if !supportsV1Command(catalog) => None case ResolvedNamespace(_, Seq(), _) => @@ -653,4 +673,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) throw QueryCompilationErrors.requiresSinglePartNamespaceError(resolved.namespace) } } + + private def supportsV1Command(catalog: CatalogPlugin): Boolean = { + catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) && + !SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index a1d5b49e1516b..30cb477b1a87f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.mutable -import org.apache.hadoop.conf.Configuration - import org.apache.spark.SparkException import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys.EXPR @@ -55,7 +53,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat import DataSourceV2Implicits._ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - lazy private val hadoopConf = new Configuration() + lazy private val hadoopConf = session.sparkContext.hadoopConfiguration private def withProjectAndFilter( project: Seq[NamedExpression], From e3173626e4988f95fba78885b25eef5d94a0a10d Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Fri, 9 Aug 2024 12:16:55 -0700 Subject: [PATCH 07/13] update --- .../spark/sql/catalyst/analysis/ResolveSessionCatalog.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 41dcba9905871..f820a57461137 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -401,7 +401,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case ShowViews(ns: ResolvedNamespace, pattern, output) => ns match { - case DatabaseInSessionCatalogForNonV2Commands(db) => ShowViewsCommand(db, pattern, output) + case ResolvedDatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output) case _ => throw QueryCompilationErrors.missingCatalogAbilityError(ns.catalog, "views") } @@ -425,7 +425,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } case ShowFunctions( - DatabaseInSessionCatalogForNonV2Commands(db), userScope, systemScope, pattern, output) => + ResolvedDatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) => ShowFunctionsCommand(db, pattern, userScope, systemScope, output) case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) => @@ -651,7 +651,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } // Use this object to help match commands that do not have a v2 implementation. - private object DatabaseInSessionCatalogForNonV2Commands { + private object ResolvedDatabaseInSessionCatalog { def unapply(resolved: ResolvedNamespace): Option[String] = resolved match { case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None case ResolvedNamespace(_, Seq(), _) => From 3dd1cfa7d2eb99bfcb123436d3214a48fe6fe3a8 Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Fri, 9 Aug 2024 12:29:59 -0700 Subject: [PATCH 08/13] update --- .../sql/connector/DataSourceV2SQLSuite.scala | 3 +++ .../connector/TestV2SessionCatalogBase.scala | 24 +++++++------------ 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index b79cafc6d31d4..a94066ca10342 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2105,6 +2105,9 @@ class DataSourceV2SQLSuiteV1Filter test("REPLACE TABLE: v1 table") { sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}") + val descInfo = sql(s"DESCRIBE TABLE EXTENDED tbl").collectAsList() + // This is the provider field that we check the table is a type of `SimpleScanSource`. + assert(descInfo.get(9).getString(1) == classOf[SimpleScanSource].getName) } test("DeleteFrom: - delete with invalid predicate") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala index 023ec83fc4b08..ff944dbb805cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala @@ -71,28 +71,22 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating properties: java.util.Map[String, String]): Table = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY - val updatedProps = - if (properties.containsKey(TableCatalog.PROP_LOCATION)) { - val newProps = new java.util.HashMap[String, String]() - newProps.putAll(properties) - newProps.put(TableCatalog.PROP_EXTERNAL, "true") - newProps - } else { - properties - } + val newProps = new java.util.HashMap[String, String]() + newProps.putAll(properties) + if (properties.containsKey(TableCatalog.PROP_LOCATION)) { + newProps.put(TableCatalog.PROP_EXTERNAL, "true") + } - val propsWithLocation = if (updatedProps.containsKey(key)) { + val propsWithLocation = if (newProps.containsKey(key)) { // Always set a location so that CREATE EXTERNAL TABLE won't fail with LOCATION not specified. - if (!updatedProps.containsKey(TableCatalog.PROP_LOCATION)) { - val newProps = new java.util.HashMap[String, String]() - newProps.putAll(updatedProps) + if (!newProps.containsKey(TableCatalog.PROP_LOCATION)) { newProps.put(TableCatalog.PROP_LOCATION, "file:/abc") newProps } else { - updatedProps + newProps } } else { - updatedProps + newProps } super.createTable(ident, columns, partitions, propsWithLocation) val schema = CatalogV2Util.v2ColumnsToStructType(columns) From 73fc19534fd7d09d5f109d60c351ae351cc506b8 Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Fri, 9 Aug 2024 18:31:21 -0700 Subject: [PATCH 09/13] update --- .../catalyst/analysis/ResolveSessionCatalog.scala | 8 ++++---- .../datasources/v2/DataSourceV2Strategy.scala | 15 --------------- ...DataSourceV2DataFrameSessionCatalogSuite.scala | 2 +- .../command/v2/ShowCreateTableSuite.scala | 2 +- .../apache/spark/sql/internal/CatalogSuite.scala | 2 +- 5 files changed, 7 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index f820a57461137..92255775cfa5a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -196,7 +196,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the // session catalog and the table provider is not v2. - case c@ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) if c.resolved => + case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) if c.resolved => val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) if (!isV2Provider(provider)) { throw QueryCompilationErrors.unsupportedTableOperationError( @@ -205,7 +205,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) c } - case c@ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) => + case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) => val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) if (!isV2Provider(provider)) { throw QueryCompilationErrors.unsupportedTableOperationError( @@ -446,8 +446,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION") } - case CreateFunction - (ResolvedIdentifierInSessionCatalog(ident), className, resources, ifExists, replace) => + case CreateFunction( + ResolvedIdentifierInSessionCatalog(ident), className, resources, ifExists, replace) => CreateFunctionCommand( FunctionIdentifier(ident.table, ident.database, ident.catalog), className, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 30cb477b1a87f..2797b48001e13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -55,21 +55,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat lazy private val hadoopConf = session.sparkContext.hadoopConfiguration - private def withProjectAndFilter( - project: Seq[NamedExpression], - filters: Seq[Expression], - scan: LeafExecNode, - needsUnsafeConversion: Boolean): SparkPlan = { - val filterCondition = filters.reduceLeftOption(And) - val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) - - if (withFilter.output != project || needsUnsafeConversion) { - ProjectExec(project, withFilter) - } else { - withFilter - } - } - private def refreshCache(r: DataSourceV2Relation)(): Unit = { session.sharedState.cacheManager.recacheByPlan(session, r) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala index 7bbb6485c273f..ef22fb71bb405 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala @@ -84,7 +84,7 @@ class DataSourceV2DataFrameSessionCatalogSuite spark.range(20).write.format(v2Format).option("path", "/abc").saveAsTable(t1) val cat = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog] val tableInfo = cat.loadTable(Identifier.of(Array("default"), t1)) - assert(tableInfo.properties().get("location") === "file:/abc") + assert(tableInfo.properties().get("location") === "file:///abc") assert(tableInfo.properties().get("provider") === v2Format) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala index f72127cbd1de2..5b48faf5c8869 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala @@ -106,7 +106,7 @@ class ShowCreateTableSuite extends command.ShowCreateTableSuiteBase with Command "'via' = '2')", "PARTITIONED BY (a)", "COMMENT 'This is a comment'", - "LOCATION 'file:/tmp'", + "LOCATION 'file:///tmp'", "TBLPROPERTIES (", "'password' = '*********(redacted)',", "'prop1' = '1',", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 7c929b5da872a..88b8506138315 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -824,7 +824,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf assert(table.properties().get("comment").equals(description)) assert(table.properties().get("path").equals(dir.getAbsolutePath)) assert(table.properties().get("external").equals("true")) - assert(table.properties().get("location").equals("file:" + dir.getAbsolutePath)) + assert(table.properties().get("location").equals("file://" + dir.getAbsolutePath)) } } From 1c0640c20e83acace3ceb4c76f08e7aab3dc934a Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Sun, 11 Aug 2024 18:50:48 -0700 Subject: [PATCH 10/13] update --- .../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 2797b48001e13..6423d216a19a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -53,7 +53,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat import DataSourceV2Implicits._ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - lazy private val hadoopConf = session.sparkContext.hadoopConfiguration + private val hadoopConf = session.sessionState.newHadoopConf() private def refreshCache(r: DataSourceV2Relation)(): Unit = { session.sharedState.cacheManager.recacheByPlan(session, r) From 13f34fd92f159888b8452f63c8bdea271d8f7d34 Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Sun, 11 Aug 2024 18:54:35 -0700 Subject: [PATCH 11/13] update --- .../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 6423d216a19a7..693bd0cd06ee9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -53,7 +53,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat import DataSourceV2Implicits._ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - private val hadoopConf = session.sessionState.newHadoopConf() + private def hadoopConf = session.sessionState.newHadoopConf() private def refreshCache(r: DataSourceV2Relation)(): Unit = { session.sharedState.cacheManager.recacheByPlan(session, r) From 5dd4da0842765e8eb671f6890d02f822bc06b3cc Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Sun, 11 Aug 2024 19:22:47 -0700 Subject: [PATCH 12/13] update --- .../apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index a94066ca10342..2672d0e768928 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2105,9 +2105,9 @@ class DataSourceV2SQLSuiteV1Filter test("REPLACE TABLE: v1 table") { sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}") - val descInfo = sql(s"DESCRIBE TABLE EXTENDED tbl").collectAsList() - // This is the provider field that we check the table is a type of `SimpleScanSource`. - assert(descInfo.get(9).getString(1) == classOf[SimpleScanSource].getName) + val v2Catalog = catalog("spark_catalog").asTableCatalog + val table = v2Catalog.loadTable(Identifier.of(Array("default"), "tbl")) + assert(table.properties().get(TableCatalog.PROP_PROVIDER) == classOf[SimpleScanSource].getName) } test("DeleteFrom: - delete with invalid predicate") { From 9c6426615f1e5365c7007c23013c77e156ef7bd2 Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Sun, 11 Aug 2024 19:23:12 -0700 Subject: [PATCH 13/13] Update sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala Co-authored-by: Wenchen Fan --- .../spark/sql/catalyst/analysis/ResolveSessionCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 92255775cfa5a..910b9178e4151 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -425,7 +425,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } case ShowFunctions( - ResolvedDatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) => + ResolvedDatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) => ShowFunctionsCommand(db, pattern, userScope, systemScope, output) case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) =>