From 42e231fda1697eed28d103a2e3114c99e7b71b04 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 31 Jul 2019 16:51:23 -0700 Subject: [PATCH 01/14] save --- .../apache/spark/sql/catalog/v2/Catalogs.java | 7 +- .../apache/spark/sql/DataFrameWriter.scala | 84 +++++++++++++++++-- .../v2/DataSourceV2DataFrameSuite.scala | 64 +++++++++++++- 3 files changed, 147 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java index 7511d947615a6..ff3973d20ca74 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java @@ -26,6 +26,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.Map; +import java.util.NoSuchElementException; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -50,8 +51,10 @@ private Catalogs() { */ public static CatalogPlugin load(String name, SQLConf conf) throws CatalogNotFoundException, SparkException { - String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null); - if (pluginClassName == null) { + String pluginClassName; + try { + pluginClassName = conf.getConfString("spark.sql.catalog." + name); + } catch (NoSuchElementException e) { throw new CatalogNotFoundException(String.format( "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name)); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 549c54f45d1e3..20eebd0da6fe6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -22,12 +22,13 @@ import java.util.{Locale, Properties, UUID} import scala.collection.JavaConverters._ import org.apache.spark.annotation.Stable -import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier} +import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog} +import org.apache.spark.sql.catalog.v2.expressions._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation} +import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} @@ -37,7 +38,7 @@ import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.TableCapability._ -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{IntegerType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -485,7 +486,80 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 1.4.0 */ def saveAsTable(tableName: String): Unit = { - 
saveAsTable(df.sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)) + import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier} + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + + val session = df.sparkSession + val useV1Sources = + session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",") + val cls = DataSource.lookupDataSource(source, session.sessionState.conf) + val shouldUseV1Source = cls.newInstance() match { + case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true + case _ => useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) + } + + val canUseV2 = !shouldUseV1Source && classOf[TableProvider].isAssignableFrom(cls) + val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog + + session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { + case CatalogObjectIdentifier(Some(catalog), ident) => + saveAsTable(catalog.asTableCatalog, ident, modeForDSV2) + + case CatalogObjectIdentifier(None, ident) if canUseV2 && sessionCatalogOpt.isDefined => + // We pass in the modeForDSV1, as using the V2 session catalog should maintain compatibility + // for now. + saveAsTable(sessionCatalogOpt.get.asTableCatalog, ident, modeForDSV1) + + case AsTableIdentifier(tableIdentifier) => + saveAsTable(tableIdentifier) + } + } + + + private def saveAsTable(catalog: TableCatalog, ident: Identifier, mode: SaveMode): Unit = { + val partitioning = partitioningColumns.map { colNames => + colNames.map(name => IdentityTransform(FieldReference(name))) + }.getOrElse(Seq.empty[Transform]) + val bucketing = bucketColumnNames.map { cols => + Seq(BucketTransform(LiteralValue(numBuckets.get, IntegerType), cols.map(FieldReference(_)))) + }.getOrElse(Seq.empty[Transform]) + val partitionTransforms = partitioning ++ bucketing + + val tableOpt = try Option(catalog.loadTable(ident)) catch { + case _: NoSuchTableException => None + } + + val command = (mode, tableOpt) match { + case (SaveMode.Append, Some(table)) => + AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan) + + case (SaveMode.Overwrite, _) => + ReplaceTableAsSelect( + catalog, + ident, + partitionTransforms, + df.queryExecution.analyzed, + Map.empty, // properties can't be specified through this API + extraOptions.toMap, + orCreate = true) // Create the table if it doesn't exist + + case (other, _) => + // We have a potential race condition here in AppendMode, if the table suddenly gets + // created between our existence check and physical execution, but this can't be helped + // in any case. 
+ CreateTableAsSelect( + catalog, + ident, + partitionTransforms, + df.queryExecution.analyzed, + Map.empty, + extraOptions.toMap, + ignoreIfExists = other == SaveMode.Ignore) + } + + runCommand(df.sparkSession, "saveAsTable") { + command + } } private def saveAsTable(tableIdent: TableIdentifier): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala index 86735c627cc56..966ec227a4a55 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala @@ -17,15 +17,23 @@ package org.apache.spark.sql.sources.v2 +import java.util.concurrent.CountDownLatch + import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.datasources.v2.parquet.{ParquetDataSourceV2, ParquetTable} import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode} import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.util.QueryExecutionListener class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { import testImplicits._ + private val v2Format = classOf[TestV2ParquetWrapper].getName + before { spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName) @@ -104,4 +112,58 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } } + + test("saveAsTable: with defined catalog and table doesn't exist") { + val t1 = "testcat.ns1.ns2.tbl" + withTable(t1) { + spark.table("source").select("id", "data").write.saveAsTable(t1) + checkAnswer(spark.table(t1), spark.table("source")) + } + } + + test("saveAsTable: with defined catalog and table exists") { + val t1 = "testcat.ns1.ns2.tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo") + spark.table("source").select("id", "data").write.saveAsTable(t1) + checkAnswer(spark.table(t1), spark.table("source")) + } + } + + test("saveAsTable: with session catalog and v2 table - table doesn't exist") { + val t1 = "tbl" + withTable(t1) { + spark.table("source").select("id", "data").write.format(v2Format).saveAsTable(t1) + checkAnswer(spark.table(t1), spark.table("source")) + } + } + + test("saveAsTable: with session catalog and v2 table - table exists") { + val t1 = "tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") + intercept[TableAlreadyExistsException] { + spark.table("source").select("id", "data").write.format(v2Format).saveAsTable(t1) + } + spark.table("source").select("id", "data") + .write.format(v2Format).mode("append").saveAsTable(t1) + checkAnswer(spark.table(t1), spark.table("source")) + } + } +} + +class SingleSQLExecutionListener extends QueryExecutionListener { + + private var qe: QueryExecution = _ + // A latch that callers can wait on for an async message to be processed. 
+ val latch = new CountDownLatch(1) + + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {} + + override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { + this.qe = qe + latch.countDown() + } + + def getExecutedQuery: QueryExecution = qe } From cd8d4a6f04a06f297a16bfff5beb6a45d6173fe2 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 1 Aug 2019 00:12:30 -0700 Subject: [PATCH 02/14] working tests --- .../sql/catalyst/analysis/Analyzer.scala | 8 +- .../apache/spark/sql/DataFrameWriter.scala | 3 + .../v2/WriteToDataSourceV2Exec.scala | 5 +- .../v2/DataSourceV2DataFrameSuite.scala | 98 +++++++++++++------ 4 files changed, 80 insertions(+), 34 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 5bf4dc1f045a4..09b6fb491ce3c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -648,8 +648,12 @@ class Analyzer( if catalog.isTemporaryTable(ident) => u // temporary views take precedence over catalog table names - case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) => - loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u) + case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) => + maybeCatalog.orElse(sessionCatalog) match { + case Some(catalogPlugin) => + loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u) + case None => u + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 20eebd0da6fe6..4a2de8efc721e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -531,9 +531,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val command = (mode, tableOpt) match { case (SaveMode.Append, Some(table)) => + println("Append code path") AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan) case (SaveMode.Overwrite, _) => + println("Overwrite code path") ReplaceTableAsSelect( catalog, ident, @@ -544,6 +546,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { orCreate = true) // Create the table if it doesn't exist case (other, _) => + println(s"Others: ignoreIfExists = ${other == SaveMode.Ignore}") // We have a potential race condition here in AppendMode, if the table suddenly gets // created between our existence check and physical execution, but this can't be helped // in any case. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 86b64cb8835ad..f61ed042c6808 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -80,8 +80,9 @@ case class CreateTableAsSelectExec( } Utils.tryWithSafeFinallyAndFailureCallbacks({ - catalog.createTable( - ident, query.schema, partitioning.toArray, properties.asJava) match { + val t = catalog.createTable( + ident, query.schema, partitioning.toArray, properties.asJava) + t match { case table: SupportsWrite => val batchWrite = table.newWriteBuilder(writeOptions) .withInputDataSchema(query.schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala index 966ec227a4a55..cc50a9701cc04 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala @@ -17,29 +17,38 @@ package org.apache.spark.sql.sources.v2 -import java.util.concurrent.CountDownLatch +import java.util +import java.util.concurrent.ConcurrentHashMap -import org.scalatest.BeforeAndAfter +import scala.collection.JavaConverters._ +import scala.collection.mutable -import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.scalatest.{BeforeAndAfter, PrivateMethodTester} + +import org.apache.spark.sql.{AnalysisException, QueryTest, SparkSession} +import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier} +import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.execution.datasources.v2.parquet.{ParquetDataSourceV2, ParquetTable} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.util.QueryExecutionListener +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.{CaseInsensitiveStringMap, QueryExecutionListener} class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { import testImplicits._ - private val v2Format = classOf[TestV2ParquetWrapper].getName + private val v2Format = classOf[InMemoryTableProvider].getName + private val dfData = Seq((1L, "a"), (2L, "b"), (3L, "c")) before { spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName) - val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") - df.createOrReplaceTempView("source") + spark.createDataFrame(dfData).toDF("id", "data").createOrReplaceTempView("source") val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data") df2.createOrReplaceTempView("source2") } @@ -50,6 +59,19 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be spark.sql("DROP VIEW 
source2") } + private def sessionCatalogTest(testName: String)(f: SparkSession => Unit): Unit = { + test("using session catalog: " + testName) { + val catalogConf = SQLConf.V2_SESSION_CATALOG + val newSession = spark.newSession() + newSession.createDataFrame(dfData).toDF("id", "data").createOrReplaceTempView("source") + newSession.sessionState.conf.setConf(catalogConf, classOf[TestV2SessionCatalog].getName) + try f(newSession) finally { + newSession.catalog("session").asInstanceOf[TestV2SessionCatalog].clearTables() + newSession.sql("DROP VIEW source") + } + } + } + test("insertInto: append") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { @@ -130,40 +152,56 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } - test("saveAsTable: with session catalog and v2 table - table doesn't exist") { + sessionCatalogTest("saveAsTable and v2 table - table doesn't exist") { session => val t1 = "tbl" - withTable(t1) { - spark.table("source").select("id", "data").write.format(v2Format).saveAsTable(t1) - checkAnswer(spark.table(t1), spark.table("source")) - } + session.table("source").select("id", "data").write.format(v2Format).saveAsTable(t1) + checkAnswer(session.table(t1), session.table("source")) } - test("saveAsTable: with session catalog and v2 table - table exists") { + sessionCatalogTest("saveAsTable: with session catalog and v2 table - table exists") { session => val t1 = "tbl" - withTable(t1) { - sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") - intercept[TableAlreadyExistsException] { - spark.table("source").select("id", "data").write.format(v2Format).saveAsTable(t1) - } - spark.table("source").select("id", "data") - .write.format(v2Format).mode("append").saveAsTable(t1) - checkAnswer(spark.table(t1), spark.table("source")) + session.sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") + intercept[TableAlreadyExistsException] { + session.table("source").select("id", "data").write.format(v2Format).saveAsTable(t1) } + session.table("source").select("id", "data") + .write.format(v2Format).mode("append").saveAsTable(t1) + checkAnswer(session.table(t1), session.table("source")) + } +} + +class InMemoryTableProvider extends TableProvider { + override def getTable(options: CaseInsensitiveStringMap): Table = { + throw new UnsupportedOperationException("D'oh!") } } -class SingleSQLExecutionListener extends QueryExecutionListener { +/** A SessionCatalog that always loads an in memory Table, so we can test write code paths. */ +class TestV2SessionCatalog extends V2SessionCatalog { - private var qe: QueryExecution = _ - // A latch that callers can wait on for an async message to be processed. 
- val latch = new CountDownLatch(1) + protected val tables: util.Map[Identifier, InMemoryTable] = + new ConcurrentHashMap[Identifier, InMemoryTable]() - override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {} + override def loadTable(ident: Identifier): Table = { + if (tables.containsKey(ident)) { + tables.get(ident) + } else { + super.loadTable(ident) + } + } - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { - this.qe = qe - latch.countDown() + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + val t = new InMemoryTable(ident.name(), schema, partitions, properties) + tables.put(ident, t) + t } - def getExecutedQuery: QueryExecution = qe + def clearTables(): Unit = { + tables.keySet().asScala.foreach(super.dropTable) + tables.clear() + } } From 4797234da548c9d95b0b49ce9eee190711f08fb0 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 1 Aug 2019 12:20:59 -0700 Subject: [PATCH 03/14] added a bunch of tests --- .../sql/catalyst/analysis/Analyzer.scala | 11 +- .../apache/spark/sql/DataFrameWriter.scala | 3 - .../v2/DataSourceV2DataFrameSuite.scala | 115 ++++++++++++++++-- 3 files changed, 112 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 09b6fb491ce3c..dcda75ae82f44 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -649,11 +649,12 @@ class Analyzer( u // temporary views take precedence over catalog table names case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) => - maybeCatalog.orElse(sessionCatalog) match { - case Some(catalogPlugin) => - loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u) - case None => u - } + // First try loading the table with a loadable catalog, then fallback to the session + // catalog if that exists + maybeCatalog.flatMap(loadTable(_, ident)) + .orElse(sessionCatalog.flatMap(loadTable(_, ident))) + .map(DataSourceV2Relation.create) + .getOrElse(u) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 4a2de8efc721e..20eebd0da6fe6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -531,11 +531,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val command = (mode, tableOpt) match { case (SaveMode.Append, Some(table)) => - println("Append code path") AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan) case (SaveMode.Overwrite, _) => - println("Overwrite code path") ReplaceTableAsSelect( catalog, ident, @@ -546,7 +544,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { orCreate = true) // Create the table if it doesn't exist case (other, _) => - println(s"Others: ignoreIfExists = ${other == SaveMode.Ignore}") // We have a potential race condition here in AppendMode, if the table suddenly gets // created between our existence check and physical execution, but this can't be helped // in any case. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala index cc50a9701cc04..1d81ee6b582c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala @@ -25,10 +25,12 @@ import scala.collection.mutable import org.scalatest.{BeforeAndAfter, PrivateMethodTester} -import org.apache.spark.sql.{AnalysisException, QueryTest, SparkSession} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SparkSession} import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier} import org.apache.spark.sql.catalog.v2.expressions.Transform +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.execution.datasources.v2.parquet.{ParquetDataSourceV2, ParquetTable} @@ -43,9 +45,10 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be private val v2Format = classOf[InMemoryTableProvider].getName private val dfData = Seq((1L, "a"), (2L, "b"), (3L, "c")) + private val catalogName = "testcat" before { - spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) + spark.conf.set(s"spark.sql.catalog.$catalogName", classOf[TestInMemoryTableCatalog].getName) spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName) spark.createDataFrame(dfData).toDF("id", "data").createOrReplaceTempView("source") @@ -138,7 +141,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be test("saveAsTable: with defined catalog and table doesn't exist") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { - spark.table("source").select("id", "data").write.saveAsTable(t1) + spark.table("source").write.saveAsTable(t1) checkAnswer(spark.table(t1), spark.table("source")) } } @@ -147,27 +150,116 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo") - spark.table("source").select("id", "data").write.saveAsTable(t1) + // Default saveMode is append, therefore this doesn't throw a table already exists eception + spark.table("source").write.saveAsTable(t1) checkAnswer(spark.table(t1), spark.table("source")) } } + test("saveAsTable: with defined catalog + table overwrite and table doesn't exist") { + val t1 = "testcat.ns1.ns2.tbl" + withTable(t1) { + spark.table("source").write.mode("overwrite").saveAsTable(t1) + checkAnswer(spark.table(t1), spark.table("source")) + } + } + + test("saveAsTable: with defined catalog + table overwrite and table exists") { + val t1 = "testcat.ns1.ns2.tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 USING foo AS SELECT 'c', 'd'") + spark.table("source").write.mode("overwrite").saveAsTable(t1) + checkAnswer(spark.table(t1), spark.table("source")) + } + } + + test("saveAsTable: with defined catalog + ignore mode and table doesn't exist") { + val t1 = "testcat.ns1.ns2.tbl" + withTable(t1) { + spark.table("source").write.mode("ignore").saveAsTable(t1) + 
checkAnswer(spark.table(t1), spark.table("source")) + } + } + + test("saveAsTable: with defined catalog + ignore mode and table exists") { + val t1 = "testcat.ns1.ns2.tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 USING foo AS SELECT 'c', 'd'") + spark.table("source").write.mode("ignore").saveAsTable(t1) + checkAnswer(spark.table(t1), Seq(Row("c", "d"))) + } + } + sessionCatalogTest("saveAsTable and v2 table - table doesn't exist") { session => val t1 = "tbl" - session.table("source").select("id", "data").write.format(v2Format).saveAsTable(t1) + session.table("source").write.format(v2Format).saveAsTable(t1) checkAnswer(session.table(t1), session.table("source")) } - sessionCatalogTest("saveAsTable: with session catalog and v2 table - table exists") { session => + sessionCatalogTest("saveAsTable: v2 table - table exists") { session => val t1 = "tbl" session.sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") intercept[TableAlreadyExistsException] { session.table("source").select("id", "data").write.format(v2Format).saveAsTable(t1) } - session.table("source").select("id", "data") - .write.format(v2Format).mode("append").saveAsTable(t1) + session.table("source").write.format(v2Format).mode("append").saveAsTable(t1) + checkAnswer(session.table(t1), session.table("source")) + } + + sessionCatalogTest("saveAsTable: v2 table - table overwrite and table doesn't exist") { session => + val t1 = "tbl" + session.table("source").write.format(v2Format).mode("overwrite").saveAsTable(t1) + checkAnswer(session.table(t1), session.table("source")) + } + + sessionCatalogTest("saveAsTable: v2 table - table overwrite and table exists") { session => + val t1 = "tbl" + session.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") + session.table("source").write.format(v2Format).mode("overwrite").saveAsTable(t1) + checkAnswer(session.table(t1), session.table("source")) + } + + sessionCatalogTest("saveAsTable: v2 table - ignore mode and table doesn't exist") { session => + val t1 = "tbl" + session.table("source").write.format(v2Format).mode("ignore").saveAsTable(t1) checkAnswer(session.table(t1), session.table("source")) } + + sessionCatalogTest("saveAsTable: v2 table - ignore mode and table exists") { session => + val t1 = "tbl" + session.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") + session.table("source").write.format(v2Format).mode("ignore").saveAsTable(t1) + checkAnswer(session.table(t1), Seq(Row("c", "d"))) + } + + sessionCatalogTest("saveAsTable: old table defined in a database colliding " + + "with a catalog name") { session => + // Make sure the database name conflicts with a catalog name + val dbPath = session.sessionState.catalog.getDefaultDBPath(catalogName) + session.sessionState.catalog.createDatabase( + CatalogDatabase(catalogName, "", dbPath, Map.empty), ignoreIfExists = false) + val t1 = "tbl" + withTable(t1) { + // Create the table in the built in catalog, in the given database + session.sessionState.catalog.createTable( + CatalogTable( + identifier = TableIdentifier(t1, Some(catalogName)), + tableType = CatalogTableType.MANAGED, + provider = Some(v2Format), + storage = CatalogStorageFormat(None, None, None, None, false, Map.empty), + schema = session.table("source").schema + ), + ignoreIfExists = false + ) + val tableName = s"$catalogName.$t1" + checkAnswer(session.table(tableName), Nil) + intercept[TableAlreadyExistsException] { + session.table("source").write.format(v2Format).saveAsTable(tableName) + } + 
session.table("source").write.format(v2Format).mode("append").saveAsTable(tableName) + checkAnswer(session.table(tableName), session.table("source")) + } + } } class InMemoryTableProvider extends TableProvider { @@ -186,7 +278,11 @@ class TestV2SessionCatalog extends V2SessionCatalog { if (tables.containsKey(ident)) { tables.get(ident) } else { - super.loadTable(ident) + // Table was created through the built-in catalog + val t = super.loadTable(ident) + val table = new InMemoryTable(t.name(), t.schema(), t.partitioning(), t.properties()) + tables.put(ident, table) + table } } @@ -201,6 +297,7 @@ class TestV2SessionCatalog extends V2SessionCatalog { } def clearTables(): Unit = { + assert(!tables.isEmpty, "Tables were empty, maybe didn't use the session catalog code path?") tables.keySet().asScala.foreach(super.dropTable) tables.clear() } From f9c2f952f613982ba44ee7ed2e086f279f00ccfa Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 1 Aug 2019 12:21:58 -0700 Subject: [PATCH 04/14] added comment --- .../apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala index 1d81ee6b582c7..3b139c10cb812 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala @@ -254,6 +254,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be val tableName = s"$catalogName.$t1" checkAnswer(session.table(tableName), Nil) intercept[TableAlreadyExistsException] { + // Make sure default save mode is same as before session.table("source").write.format(v2Format).saveAsTable(tableName) } session.table("source").write.format(v2Format).mode("append").saveAsTable(tableName) From 6f3c106571a301408e3f45a0f132ac907d3776ad Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 1 Aug 2019 12:26:37 -0700 Subject: [PATCH 05/14] clean up --- .../java/org/apache/spark/sql/catalog/v2/Catalogs.java | 7 ++----- .../execution/datasources/v2/WriteToDataSourceV2Exec.scala | 5 ++--- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java index ff3973d20ca74..7511d947615a6 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java @@ -26,7 +26,6 @@ import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.Map; -import java.util.NoSuchElementException; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -51,10 +50,8 @@ private Catalogs() { */ public static CatalogPlugin load(String name, SQLConf conf) throws CatalogNotFoundException, SparkException { - String pluginClassName; - try { - pluginClassName = conf.getConfString("spark.sql.catalog." + name); - } catch (NoSuchElementException e) { + String pluginClassName = conf.getConfString("spark.sql.catalog." 
+ name, null); + if (pluginClassName == null) { throw new CatalogNotFoundException(String.format( "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name)); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index f61ed042c6808..86b64cb8835ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -80,9 +80,8 @@ case class CreateTableAsSelectExec( } Utils.tryWithSafeFinallyAndFailureCallbacks({ - val t = catalog.createTable( - ident, query.schema, partitioning.toArray, properties.asJava) - t match { + catalog.createTable( + ident, query.schema, partitioning.toArray, properties.asJava) match { case table: SupportsWrite => val batchWrite = table.newWriteBuilder(writeOptions) .withInputDataSchema(query.schema) From dbb7e1f2ed83c1b06d87a3e0378162e5fc80b82b Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 1 Aug 2019 12:27:22 -0700 Subject: [PATCH 06/14] added testQuietly --- .../sql/sources/v2/DataSourceV2DataFrameSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala index 3b139c10cb812..118ef22ed1722 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala @@ -138,7 +138,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } - test("saveAsTable: with defined catalog and table doesn't exist") { + testQuietly("saveAsTable: with defined catalog and table doesn't exist") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { spark.table("source").write.saveAsTable(t1) @@ -146,7 +146,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } - test("saveAsTable: with defined catalog and table exists") { + testQuietly("saveAsTable: with defined catalog and table exists") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo") @@ -156,7 +156,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } - test("saveAsTable: with defined catalog + table overwrite and table doesn't exist") { + testQuietly("saveAsTable: with defined catalog + table overwrite and table doesn't exist") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { spark.table("source").write.mode("overwrite").saveAsTable(t1) @@ -164,7 +164,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } - test("saveAsTable: with defined catalog + table overwrite and table exists") { + testQuietly("saveAsTable: with defined catalog + table overwrite and table exists") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { sql(s"CREATE TABLE $t1 USING foo AS SELECT 'c', 'd'") @@ -173,7 +173,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } - test("saveAsTable: with defined catalog + ignore mode and table doesn't exist") { + testQuietly("saveAsTable: with defined catalog + ignore mode and table doesn't exist") { val t1 = 
"testcat.ns1.ns2.tbl" withTable(t1) { spark.table("source").write.mode("ignore").saveAsTable(t1) @@ -181,7 +181,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } - test("saveAsTable: with defined catalog + ignore mode and table exists") { + testQuietly("saveAsTable: with defined catalog + ignore mode and table exists") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { sql(s"CREATE TABLE $t1 USING foo AS SELECT 'c', 'd'") From 953da5109c713b95da358849fb4fe44e9bb0a332 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 5 Aug 2019 14:30:17 -0700 Subject: [PATCH 07/14] address comments --- .../sql/catalyst/analysis/Analyzer.scala | 6 +- ...aSourceDFWriterV2SessionCatalogSuite.scala | 142 +++++++++++++++ .../v2/DataSourceV2DataFrameSuite.scala | 162 ++---------------- 3 files changed, 154 insertions(+), 156 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceDFWriterV2SessionCatalogSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index dcda75ae82f44..7e3f9b39aef31 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -649,10 +649,8 @@ class Analyzer( u // temporary views take precedence over catalog table names case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) => - // First try loading the table with a loadable catalog, then fallback to the session - // catalog if that exists - maybeCatalog.flatMap(loadTable(_, ident)) - .orElse(sessionCatalog.flatMap(loadTable(_, ident))) + maybeCatalog.orElse(sessionCatalog) + .flatMap(loadTable(_, ident)) .map(DataSourceV2Relation.create) .getOrElse(u) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceDFWriterV2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceDFWriterV2SessionCatalogSuite.scala new file mode 100644 index 0000000000000..e46b38c361afe --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceDFWriterV2SessionCatalogSuite.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources.v2 + +import java.util +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalog.v2.Identifier +import org.apache.spark.sql.catalog.v2.expressions.Transform +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class DataSourceDFWriterV2SessionCatalogSuite + extends QueryTest + with SharedSQLContext + with BeforeAndAfter { + import testImplicits._ + + private val v2Format = classOf[InMemoryTableProvider].getName + private val dfData = Seq((1L, "a"), (2L, "b"), (3L, "c")) + + before { + spark.conf.set(SQLConf.V2_SESSION_CATALOG.key, classOf[TestV2SessionCatalog].getName) + spark.createDataFrame(dfData).toDF("id", "data").createOrReplaceTempView("source") + } + + override def afterEach(): Unit = { + super.afterEach() + spark.catalog("session").asInstanceOf[TestV2SessionCatalog].clearTables() + } + + test("saveAsTable and v2 table - table doesn't exist") { + val t1 = "tbl" + spark.table("source").write.format(v2Format).saveAsTable(t1) + checkAnswer(spark.table(t1), spark.table("source")) + } + + test("saveAsTable: v2 table - table exists") { + val t1 = "tbl" + spark.sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") + intercept[TableAlreadyExistsException] { + spark.table("source").select("id", "data").write.format(v2Format).saveAsTable(t1) + } + spark.table("source").write.format(v2Format).mode("append").saveAsTable(t1) + checkAnswer(spark.table(t1), spark.table("source")) + + // Check that appends are by name + spark.table("source").select('data, 'id).write.format(v2Format).mode("append").saveAsTable(t1) + checkAnswer(spark.table(t1), spark.table("source").union(spark.table("source"))) + } + + test("saveAsTable: v2 table - table overwrite and table doesn't exist") { + val t1 = "tbl" + spark.table("source").write.format(v2Format).mode("overwrite").saveAsTable(t1) + checkAnswer(spark.table(t1), spark.table("source")) + } + + test("saveAsTable: v2 table - table overwrite and table exists") { + val t1 = "tbl" + spark.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") + spark.table("source").write.format(v2Format).mode("overwrite").saveAsTable(t1) + checkAnswer(spark.table(t1), spark.table("source")) + } + + test("saveAsTable: v2 table - ignore mode and table doesn't exist") { + val t1 = "tbl" + spark.table("source").write.format(v2Format).mode("ignore").saveAsTable(t1) + checkAnswer(spark.table(t1), spark.table("source")) + } + + test("saveAsTable: v2 table - ignore mode and table exists") { + val t1 = "tbl" + spark.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") + spark.table("source").write.format(v2Format).mode("ignore").saveAsTable(t1) + checkAnswer(spark.table(t1), Seq(Row("c", "d"))) + } +} + +class InMemoryTableProvider extends TableProvider { + override def getTable(options: CaseInsensitiveStringMap): Table = { + throw new UnsupportedOperationException("D'oh!") + } +} + +/** A SessionCatalog that always loads an in memory Table, so we can test write code paths. 
*/ +class TestV2SessionCatalog extends V2SessionCatalog { + + protected val tables: util.Map[Identifier, InMemoryTable] = + new ConcurrentHashMap[Identifier, InMemoryTable]() + + override def loadTable(ident: Identifier): Table = { + if (tables.containsKey(ident)) { + tables.get(ident) + } else { + // Table was created through the built-in catalog + val t = super.loadTable(ident) + val table = new InMemoryTable(t.name(), t.schema(), t.partitioning(), t.properties()) + tables.put(ident, table) + table + } + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + val t = new InMemoryTable(ident.name(), schema, partitions, properties) + tables.put(ident, t) + t + } + + def clearTables(): Unit = { + assert(!tables.isEmpty, "Tables were empty, maybe didn't use the session catalog code path?") + tables.keySet().asScala.foreach(super.dropTable) + tables.clear() + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala index 118ef22ed1722..43b3da83954f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala @@ -17,41 +17,21 @@ package org.apache.spark.sql.sources.v2 -import java.util -import java.util.concurrent.ConcurrentHashMap +import org.scalatest.BeforeAndAfter -import scala.collection.JavaConverters._ -import scala.collection.mutable - -import org.scalatest.{BeforeAndAfter, PrivateMethodTester} - -import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SparkSession} -import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier} -import org.apache.spark.sql.catalog.v2.expressions.Transform -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException -import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType} -import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog -import org.apache.spark.sql.execution.datasources.v2.parquet.{ParquetDataSourceV2, ParquetTable} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.{CaseInsensitiveStringMap, QueryExecutionListener} class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { import testImplicits._ - private val v2Format = classOf[InMemoryTableProvider].getName - private val dfData = Seq((1L, "a"), (2L, "b"), (3L, "c")) - private val catalogName = "testcat" - before { - spark.conf.set(s"spark.sql.catalog.$catalogName", classOf[TestInMemoryTableCatalog].getName) + spark.conf.set(s"spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName) - spark.createDataFrame(dfData).toDF("id", "data").createOrReplaceTempView("source") + val df1 = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") + df1.createOrReplaceTempView("source") val df2 = 
spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data") df2.createOrReplaceTempView("source2") } @@ -62,19 +42,6 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be spark.sql("DROP VIEW source2") } - private def sessionCatalogTest(testName: String)(f: SparkSession => Unit): Unit = { - test("using session catalog: " + testName) { - val catalogConf = SQLConf.V2_SESSION_CATALOG - val newSession = spark.newSession() - newSession.createDataFrame(dfData).toDF("id", "data").createOrReplaceTempView("source") - newSession.sessionState.conf.setConf(catalogConf, classOf[TestV2SessionCatalog].getName) - try f(newSession) finally { - newSession.catalog("session").asInstanceOf[TestV2SessionCatalog].clearTables() - newSession.sql("DROP VIEW source") - } - } - } - test("insertInto: append") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { @@ -150,9 +117,13 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo") - // Default saveMode is append, therefore this doesn't throw a table already exists eception + // Default saveMode is append, therefore this doesn't throw a table already exists exception spark.table("source").write.saveAsTable(t1) checkAnswer(spark.table(t1), spark.table("source")) + + // also appends are by name not by position + spark.table("source").select('data, 'id).write.saveAsTable(t1) + checkAnswer(spark.table(t1), spark.table("source").union(spark.table("source"))) } } @@ -189,117 +160,4 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be checkAnswer(spark.table(t1), Seq(Row("c", "d"))) } } - - sessionCatalogTest("saveAsTable and v2 table - table doesn't exist") { session => - val t1 = "tbl" - session.table("source").write.format(v2Format).saveAsTable(t1) - checkAnswer(session.table(t1), session.table("source")) - } - - sessionCatalogTest("saveAsTable: v2 table - table exists") { session => - val t1 = "tbl" - session.sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") - intercept[TableAlreadyExistsException] { - session.table("source").select("id", "data").write.format(v2Format).saveAsTable(t1) - } - session.table("source").write.format(v2Format).mode("append").saveAsTable(t1) - checkAnswer(session.table(t1), session.table("source")) - } - - sessionCatalogTest("saveAsTable: v2 table - table overwrite and table doesn't exist") { session => - val t1 = "tbl" - session.table("source").write.format(v2Format).mode("overwrite").saveAsTable(t1) - checkAnswer(session.table(t1), session.table("source")) - } - - sessionCatalogTest("saveAsTable: v2 table - table overwrite and table exists") { session => - val t1 = "tbl" - session.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") - session.table("source").write.format(v2Format).mode("overwrite").saveAsTable(t1) - checkAnswer(session.table(t1), session.table("source")) - } - - sessionCatalogTest("saveAsTable: v2 table - ignore mode and table doesn't exist") { session => - val t1 = "tbl" - session.table("source").write.format(v2Format).mode("ignore").saveAsTable(t1) - checkAnswer(session.table(t1), session.table("source")) - } - - sessionCatalogTest("saveAsTable: v2 table - ignore mode and table exists") { session => - val t1 = "tbl" - session.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") - session.table("source").write.format(v2Format).mode("ignore").saveAsTable(t1) - 
checkAnswer(session.table(t1), Seq(Row("c", "d"))) - } - - sessionCatalogTest("saveAsTable: old table defined in a database colliding " + - "with a catalog name") { session => - // Make sure the database name conflicts with a catalog name - val dbPath = session.sessionState.catalog.getDefaultDBPath(catalogName) - session.sessionState.catalog.createDatabase( - CatalogDatabase(catalogName, "", dbPath, Map.empty), ignoreIfExists = false) - val t1 = "tbl" - withTable(t1) { - // Create the table in the built in catalog, in the given database - session.sessionState.catalog.createTable( - CatalogTable( - identifier = TableIdentifier(t1, Some(catalogName)), - tableType = CatalogTableType.MANAGED, - provider = Some(v2Format), - storage = CatalogStorageFormat(None, None, None, None, false, Map.empty), - schema = session.table("source").schema - ), - ignoreIfExists = false - ) - val tableName = s"$catalogName.$t1" - checkAnswer(session.table(tableName), Nil) - intercept[TableAlreadyExistsException] { - // Make sure default save mode is same as before - session.table("source").write.format(v2Format).saveAsTable(tableName) - } - session.table("source").write.format(v2Format).mode("append").saveAsTable(tableName) - checkAnswer(session.table(tableName), session.table("source")) - } - } -} - -class InMemoryTableProvider extends TableProvider { - override def getTable(options: CaseInsensitiveStringMap): Table = { - throw new UnsupportedOperationException("D'oh!") - } -} - -/** A SessionCatalog that always loads an in memory Table, so we can test write code paths. */ -class TestV2SessionCatalog extends V2SessionCatalog { - - protected val tables: util.Map[Identifier, InMemoryTable] = - new ConcurrentHashMap[Identifier, InMemoryTable]() - - override def loadTable(ident: Identifier): Table = { - if (tables.containsKey(ident)) { - tables.get(ident) - } else { - // Table was created through the built-in catalog - val t = super.loadTable(ident) - val table = new InMemoryTable(t.name(), t.schema(), t.partitioning(), t.properties()) - tables.put(ident, table) - table - } - } - - override def createTable( - ident: Identifier, - schema: StructType, - partitions: Array[Transform], - properties: util.Map[String, String]): Table = { - val t = new InMemoryTable(ident.name(), schema, partitions, properties) - tables.put(ident, t) - t - } - - def clearTables(): Unit = { - assert(!tables.isEmpty, "Tables were empty, maybe didn't use the session catalog code path?") - tables.keySet().asScala.foreach(super.dropTable) - tables.clear() - } } From 554507a6da2fed65bf5166dc40171124d502126e Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 5 Aug 2019 14:36:28 -0700 Subject: [PATCH 08/14] adjust new suite as well --- ...aSourceDFWriterV2SessionCatalogSuite.scala | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceDFWriterV2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceDFWriterV2SessionCatalogSuite.scala index e46b38c361afe..9a80294c27f1f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceDFWriterV2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceDFWriterV2SessionCatalogSuite.scala @@ -41,11 +41,9 @@ class DataSourceDFWriterV2SessionCatalogSuite import testImplicits._ private val v2Format = classOf[InMemoryTableProvider].getName - private val dfData = Seq((1L, "a"), (2L, "b"), (3L, "c")) before { 
spark.conf.set(SQLConf.V2_SESSION_CATALOG.key, classOf[TestV2SessionCatalog].getName) - spark.createDataFrame(dfData).toDF("id", "data").createOrReplaceTempView("source") } override def afterEach(): Unit = { @@ -55,47 +53,53 @@ class DataSourceDFWriterV2SessionCatalogSuite test("saveAsTable and v2 table - table doesn't exist") { val t1 = "tbl" - spark.table("source").write.format(v2Format).saveAsTable(t1) - checkAnswer(spark.table(t1), spark.table("source")) + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + df.write.format(v2Format).saveAsTable(t1) + checkAnswer(spark.table(t1), df) } test("saveAsTable: v2 table - table exists") { val t1 = "tbl" + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") spark.sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") intercept[TableAlreadyExistsException] { - spark.table("source").select("id", "data").write.format(v2Format).saveAsTable(t1) + df.select("id", "data").write.format(v2Format).saveAsTable(t1) } - spark.table("source").write.format(v2Format).mode("append").saveAsTable(t1) - checkAnswer(spark.table(t1), spark.table("source")) + df.write.format(v2Format).mode("append").saveAsTable(t1) + checkAnswer(spark.table(t1), df) // Check that appends are by name - spark.table("source").select('data, 'id).write.format(v2Format).mode("append").saveAsTable(t1) - checkAnswer(spark.table(t1), spark.table("source").union(spark.table("source"))) + df.select('data, 'id).write.format(v2Format).mode("append").saveAsTable(t1) + checkAnswer(spark.table(t1), df.union(df)) } test("saveAsTable: v2 table - table overwrite and table doesn't exist") { val t1 = "tbl" - spark.table("source").write.format(v2Format).mode("overwrite").saveAsTable(t1) - checkAnswer(spark.table(t1), spark.table("source")) + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + df.write.format(v2Format).mode("overwrite").saveAsTable(t1) + checkAnswer(spark.table(t1), df) } test("saveAsTable: v2 table - table overwrite and table exists") { val t1 = "tbl" + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") spark.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") - spark.table("source").write.format(v2Format).mode("overwrite").saveAsTable(t1) - checkAnswer(spark.table(t1), spark.table("source")) + df.write.format(v2Format).mode("overwrite").saveAsTable(t1) + checkAnswer(spark.table(t1), df) } test("saveAsTable: v2 table - ignore mode and table doesn't exist") { val t1 = "tbl" - spark.table("source").write.format(v2Format).mode("ignore").saveAsTable(t1) - checkAnswer(spark.table(t1), spark.table("source")) + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + df.write.format(v2Format).mode("ignore").saveAsTable(t1) + checkAnswer(spark.table(t1), df) } test("saveAsTable: v2 table - ignore mode and table exists") { val t1 = "tbl" + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") spark.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") - spark.table("source").write.format(v2Format).mode("ignore").saveAsTable(t1) + df.write.format(v2Format).mode("ignore").saveAsTable(t1) checkAnswer(spark.table(t1), Seq(Row("c", "d"))) } } From b584359eb67fe469cfa65dbe79870cb8e8f5e538 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 7 Aug 2019 13:58:29 -0700 Subject: [PATCH 09/14] remove session catalog changes --- .../sql/catalyst/analysis/Analyzer.scala | 7 +- .../apache/spark/sql/DataFrameWriter.scala | 15 -- ...aSourceDFWriterV2SessionCatalogSuite.scala | 146 ------------------ 
.../v2/DataSourceV2DataFrameSuite.scala | 14 +- 4 files changed, 9 insertions(+), 173 deletions(-) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceDFWriterV2SessionCatalogSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7e3f9b39aef31..5bf4dc1f045a4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -648,11 +648,8 @@ class Analyzer( if catalog.isTemporaryTable(ident) => u // temporary views take precedence over catalog table names - case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) => - maybeCatalog.orElse(sessionCatalog) - .flatMap(loadTable(_, ident)) - .map(DataSourceV2Relation.create) - .getOrElse(u) + case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) => + loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 1ed44d574cec5..709f408fd2824 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -490,26 +490,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ val session = df.sparkSession - val useV1Sources = - session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",") - val cls = DataSource.lookupDataSource(source, session.sessionState.conf) - val shouldUseV1Source = cls.newInstance() match { - case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true - case _ => useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) - } - - val canUseV2 = !shouldUseV1Source && classOf[TableProvider].isAssignableFrom(cls) - val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { case CatalogObjectIdentifier(Some(catalog), ident) => saveAsTable(catalog.asTableCatalog, ident, modeForDSV2) - case CatalogObjectIdentifier(None, ident) if canUseV2 && sessionCatalogOpt.isDefined => - // We pass in the modeForDSV1, as using the V2 session catalog should maintain compatibility - // for now. - saveAsTable(sessionCatalogOpt.get.asTableCatalog, ident, modeForDSV1) - case AsTableIdentifier(tableIdentifier) => saveAsTable(tableIdentifier) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceDFWriterV2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceDFWriterV2SessionCatalogSuite.scala deleted file mode 100644 index 9a80294c27f1f..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceDFWriterV2SessionCatalogSuite.scala +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2 - -import java.util -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.JavaConverters._ - -import org.scalatest.BeforeAndAfter - -import org.apache.spark.sql.{QueryTest, Row} -import org.apache.spark.sql.catalog.v2.Identifier -import org.apache.spark.sql.catalog.v2.expressions.Transform -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException -import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap - -class DataSourceDFWriterV2SessionCatalogSuite - extends QueryTest - with SharedSQLContext - with BeforeAndAfter { - import testImplicits._ - - private val v2Format = classOf[InMemoryTableProvider].getName - - before { - spark.conf.set(SQLConf.V2_SESSION_CATALOG.key, classOf[TestV2SessionCatalog].getName) - } - - override def afterEach(): Unit = { - super.afterEach() - spark.catalog("session").asInstanceOf[TestV2SessionCatalog].clearTables() - } - - test("saveAsTable and v2 table - table doesn't exist") { - val t1 = "tbl" - val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") - df.write.format(v2Format).saveAsTable(t1) - checkAnswer(spark.table(t1), df) - } - - test("saveAsTable: v2 table - table exists") { - val t1 = "tbl" - val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") - spark.sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") - intercept[TableAlreadyExistsException] { - df.select("id", "data").write.format(v2Format).saveAsTable(t1) - } - df.write.format(v2Format).mode("append").saveAsTable(t1) - checkAnswer(spark.table(t1), df) - - // Check that appends are by name - df.select('data, 'id).write.format(v2Format).mode("append").saveAsTable(t1) - checkAnswer(spark.table(t1), df.union(df)) - } - - test("saveAsTable: v2 table - table overwrite and table doesn't exist") { - val t1 = "tbl" - val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") - df.write.format(v2Format).mode("overwrite").saveAsTable(t1) - checkAnswer(spark.table(t1), df) - } - - test("saveAsTable: v2 table - table overwrite and table exists") { - val t1 = "tbl" - val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") - spark.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") - df.write.format(v2Format).mode("overwrite").saveAsTable(t1) - checkAnswer(spark.table(t1), df) - } - - test("saveAsTable: v2 table - ignore mode and table doesn't exist") { - val t1 = "tbl" - val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") - df.write.format(v2Format).mode("ignore").saveAsTable(t1) - checkAnswer(spark.table(t1), df) - } - - test("saveAsTable: v2 table - ignore mode and table exists") { - val t1 = "tbl" - val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") - spark.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") - df.write.format(v2Format).mode("ignore").saveAsTable(t1) - checkAnswer(spark.table(t1), Seq(Row("c", 
"d"))) - } -} - -class InMemoryTableProvider extends TableProvider { - override def getTable(options: CaseInsensitiveStringMap): Table = { - throw new UnsupportedOperationException("D'oh!") - } -} - -/** A SessionCatalog that always loads an in memory Table, so we can test write code paths. */ -class TestV2SessionCatalog extends V2SessionCatalog { - - protected val tables: util.Map[Identifier, InMemoryTable] = - new ConcurrentHashMap[Identifier, InMemoryTable]() - - override def loadTable(ident: Identifier): Table = { - if (tables.containsKey(ident)) { - tables.get(ident) - } else { - // Table was created through the built-in catalog - val t = super.loadTable(ident) - val table = new InMemoryTable(t.name(), t.schema(), t.partitioning(), t.properties()) - tables.put(ident, table) - table - } - } - - override def createTable( - ident: Identifier, - schema: StructType, - partitions: Array[Transform], - properties: util.Map[String, String]): Table = { - val t = new InMemoryTable(ident.name(), schema, partitions, properties) - tables.put(ident, t) - t - } - - def clearTables(): Unit = { - assert(!tables.isEmpty, "Tables were empty, maybe didn't use the session catalog code path?") - tables.keySet().asScala.foreach(super.dropTable) - tables.clear() - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala index 06bad96eedee3..8909c41ddaa8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala @@ -27,7 +27,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be import testImplicits._ before { - spark.conf.set(s"spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) + spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName) } @@ -142,7 +142,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } - testQuietly("saveAsTable: with defined catalog and table doesn't exist") { + testQuietly("saveAsTable: table doesn't exist => create table") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") @@ -151,7 +151,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } - testQuietly("saveAsTable: with defined catalog and table exists") { + testQuietly("saveAsTable: table exists => append by name") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo") @@ -166,7 +166,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } - testQuietly("saveAsTable: with defined catalog + table overwrite and table doesn't exist") { + testQuietly("saveAsTable: table overwrite and table doesn't exist => create table") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") @@ -175,7 +175,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } - testQuietly("saveAsTable: with defined catalog + table overwrite and table exists") { + testQuietly("saveAsTable: table overwrite and table exists => replace table") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { sql(s"CREATE TABLE $t1 USING 
foo AS SELECT 'c', 'd'") @@ -185,7 +185,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } - testQuietly("saveAsTable: with defined catalog + ignore mode and table doesn't exist") { + testQuietly("saveAsTable: ignore mode and table doesn't exist => create table") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") @@ -194,7 +194,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be } } - testQuietly("saveAsTable: with defined catalog + ignore mode and table exists") { + testQuietly("saveAsTable: ignore mode and table exists => do nothing") { val t1 = "testcat.ns1.ns2.tbl" withTable(t1) { val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") From cc4d349459f78f61aed717ae05f736aa0f65949f Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 7 Aug 2019 14:00:59 -0700 Subject: [PATCH 10/14] save --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 709f408fd2824..ba2affe1032df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -28,13 +28,13 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister} +import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.TableCapability._ From 57f469dac86964d7084bd6d8f66b997f61316bb4 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 7 Aug 2019 14:02:10 -0700 Subject: [PATCH 11/14] weird --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index ba2affe1032df..8369dfc8ba8ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -34,8 +34,7 @@ import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode -import org.apache.spark.sql.sources.DataSourceRegister -import 
org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister} import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.TableCapability._ import org.apache.spark.sql.types.{IntegerType, StructType} From d0ab258cf942085b2ae03b3e356c8b2aabcc57c8 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 8 Aug 2019 10:34:36 -0700 Subject: [PATCH 12/14] add case matching handling --- .../scala/org/apache/spark/sql/DataFrameWriter.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 8369dfc8ba8ee..44b95248fe466 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -360,6 +360,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { */ def insertInto(tableName: String): Unit = { import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier} + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ assertNotBucketed("insertInto") @@ -376,6 +377,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { insertInto(catalog, ident) case AsTableIdentifier(tableIdentifier) => insertInto(tableIdentifier) + case other => + // TODO(SPARK-28667): This should go through V2SessionCatalog + throw new UnsupportedOperationException( + s"Couldn't find a catalog to handle the identifier ${other.quoted}.") } } @@ -488,6 +493,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier} import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ val session = df.sparkSession session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { @@ -496,6 +502,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { case AsTableIdentifier(tableIdentifier) => saveAsTable(tableIdentifier) + + case other => + // TODO(SPARK-28666): This should go through V2SessionCatalog + throw new UnsupportedOperationException( + s"Couldn't find a catalog to handle the identifier ${other.quoted}.") } } From a67df297c29a91ef0f4051f7dfce231e07637147 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 8 Aug 2019 18:04:47 -0700 Subject: [PATCH 13/14] Update DataFrameWriter.scala --- .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 44b95248fe466..ad29942b66728 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -375,11 +375,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { df.sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { case CatalogObjectIdentifier(Some(catalog), ident) => insertInto(catalog, ident) + // TODO(SPARK-28667): Support the V2SessionCatalog case AsTableIdentifier(tableIdentifier) => insertInto(tableIdentifier) case other => - // TODO(SPARK-28667): This should go through V2SessionCatalog - throw new UnsupportedOperationException( + throw new AnalysisError( s"Couldn't find a catalog to handle the identifier 
${other.quoted}.") } } @@ -499,13 +499,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { case CatalogObjectIdentifier(Some(catalog), ident) => saveAsTable(catalog.asTableCatalog, ident, modeForDSV2) + // TODO(SPARK-28666): This should go through V2SessionCatalog case AsTableIdentifier(tableIdentifier) => saveAsTable(tableIdentifier) case other => - // TODO(SPARK-28666): This should go through V2SessionCatalog - throw new UnsupportedOperationException( + throw new AnalysisError( s"Couldn't find a catalog to handle the identifier ${other.quoted}.") } } From 50f1eef93f84b7a51c0968420cac17a87fb22352 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 8 Aug 2019 18:11:51 -0700 Subject: [PATCH 14/14] Update DataFrameWriter.scala --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index ad29942b66728..af7ddd756ae89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -379,7 +379,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { case AsTableIdentifier(tableIdentifier) => insertInto(tableIdentifier) case other => - throw new AnalysisError( + throw new AnalysisException( s"Couldn't find a catalog to handle the identifier ${other.quoted}.") } } @@ -505,7 +505,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { saveAsTable(tableIdentifier) case other => - throw new AnalysisError( + throw new AnalysisException( s"Couldn't find a catalog to handle the identifier ${other.quoted}.") } }