From 4904d79e2579739e65f8324a539c34b448cb8ff5 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 9 Aug 2019 13:34:42 -0700 Subject: [PATCH 01/15] save all --- .../sql/catalyst/analysis/Analyzer.scala | 30 +--- .../apache/spark/sql/DataFrameWriter.scala | 25 ++- ...SourceV2DataFrameSessionCatalogSuite.scala | 146 ++++++++++++++++++ 3 files changed, 174 insertions(+), 27 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f8eef0cf32361..7e3f9b39aef31 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.objects._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, DescribeTableStatement, InsertIntoStatement} +import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, InsertIntoStatement} import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.catalyst.util.toPrettySQL @@ -171,7 +171,6 @@ class Analyzer( Batch("Resolution", fixedPoint, ResolveTableValuedFunctions :: ResolveAlterTable :: - ResolveDescribeTable :: ResolveInsertInto :: ResolveTables :: ResolveRelations :: @@ -324,8 +323,7 @@ class Analyzer( gid: Expression): Expression = { expr transform { case e: GroupingID => - if (e.groupByExprs.isEmpty || - e.groupByExprs.map(_.canonicalized) == groupByExprs.map(_.canonicalized)) { + if (e.groupByExprs.isEmpty || e.groupByExprs == groupByExprs) { Alias(gid, toPrettySQL(e))() } else { throw new AnalysisException( @@ -650,8 +648,11 @@ class Analyzer( if catalog.isTemporaryTable(ident) => u // temporary views take precedence over catalog table names - case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) => - loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u) + case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) => + maybeCatalog.orElse(sessionCatalog) + .flatMap(loadTable(_, ident)) + .map(DataSourceV2Relation.create) + .getOrElse(u) } } @@ -973,21 +974,6 @@ class Analyzer( Seq(TableChange.setProperty("location", newLoc))) } } - /** - * Resolve DESCRIBE TABLE statements that use a DSv2 catalog. - * - * This rule converts unresolved DESCRIBE TABLE statements to v2 when a v2 catalog is responsible - * for the table identifier. A v2 catalog is responsible for an identifier when the identifier - * has a catalog specified, like prod_catalog.db.table, or when a default v2 catalog is set and - * the table identifier does not include a catalog. 
- */ - object ResolveDescribeTable extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case describe @ DescribeTableStatement( - CatalogObjectIdentifier(Some(v2Catalog), ident), _, isExtended) => - DescribeTable(UnresolvedRelation(describe.tableName), isExtended) - } - } /** * Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from @@ -1181,8 +1167,6 @@ class Analyzer( // To resolve duplicate expression IDs for Join and Intersect case j @ Join(left, right, _, _, _) if !j.duplicateResolved => j.copy(right = dedupRight(left, right)) - // intersect/except will be rewritten to join at the begininng of optimizer. Here we need to - // deduplicate the right side plan, so that we won't produce an invalid self-join later. case i @ Intersect(left, right, _) if !i.duplicateResolved => i.copy(right = dedupRight(left, right)) case e @ Except(left, right, _) if !e.duplicateResolved => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index af7ddd756ae89..a6c6c43354232 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -28,13 +28,14 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister} +import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.TableCapability._ import org.apache.spark.sql.types.{IntegerType, StructType} @@ -360,7 +361,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { */ def insertInto(tableName: String): Unit = { import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier} - import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ assertNotBucketed("insertInto") @@ -493,13 +493,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier} import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ - import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ val session = df.sparkSession + val useV1Sources = + session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",") + val cls = DataSource.lookupDataSource(source, session.sessionState.conf) + val shouldUseV1Source = cls.newInstance() match { + case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true + case _ => useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) + } + + val canUseV2 = !shouldUseV1Source && 
classOf[TableProvider].isAssignableFrom(cls) + val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { case CatalogObjectIdentifier(Some(catalog), ident) => saveAsTable(catalog.asTableCatalog, ident, modeForDSV2) - // TODO(SPARK-28666): This should go through V2SessionCatalog + + case CatalogObjectIdentifier(None, ident) + if canUseV2 && sessionCatalogOpt.isDefined && ident.namespace().length <= 1 => + // We pass in the modeForDSV1, as using the V2 session catalog should maintain compatibility + // for now. + saveAsTable(sessionCatalogOpt.get.asTableCatalog, ident, modeForDSV1) case AsTableIdentifier(tableIdentifier) => saveAsTable(tableIdentifier) @@ -523,6 +537,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val tableOpt = try Option(catalog.loadTable(ident)) catch { case _: NoSuchTableException => None } + if (tableOpt.exists(_.isInstanceOf[CatalogTableAsV2])) { + return saveAsTable(TableIdentifier(ident.name(), ident.namespace().headOption)) + } val command = (mode, tableOpt) match { case (SaveMode.Append, Some(table)) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala new file mode 100644 index 0000000000000..ed3b2beb6881f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources.v2 + +import java.util +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalog.v2.Identifier +import org.apache.spark.sql.catalog.v2.expressions.Transform +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class DataSourceV2DataFrameSessionCatalogSuite + extends QueryTest + with SharedSQLContext + with BeforeAndAfter { + import testImplicits._ + + private val v2Format = classOf[InMemoryTableProvider].getName + + before { + spark.conf.set(SQLConf.V2_SESSION_CATALOG.key, classOf[TestV2SessionCatalog].getName) + } + + override def afterEach(): Unit = { + super.afterEach() + spark.catalog("session").asInstanceOf[TestV2SessionCatalog].clearTables() + } + + test("saveAsTable and v2 table - table doesn't exist") { + val t1 = "tbl" + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + df.write.format(v2Format).saveAsTable(t1) + checkAnswer(spark.table(t1), df) + } + + test("saveAsTable: v2 table - table exists") { + val t1 = "tbl" + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + spark.sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") + intercept[TableAlreadyExistsException] { + df.select("id", "data").write.format(v2Format).saveAsTable(t1) + } + df.write.format(v2Format).mode("append").saveAsTable(t1) + checkAnswer(spark.table(t1), df) + + // Check that appends are by name + df.select('data, 'id).write.format(v2Format).mode("append").saveAsTable(t1) + checkAnswer(spark.table(t1), df.union(df)) + } + + test("saveAsTable: v2 table - table overwrite and table doesn't exist") { + val t1 = "tbl" + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + df.write.format(v2Format).mode("overwrite").saveAsTable(t1) + checkAnswer(spark.table(t1), df) + } + + test("saveAsTable: v2 table - table overwrite and table exists") { + val t1 = "tbl" + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + spark.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") + df.write.format(v2Format).mode("overwrite").saveAsTable(t1) + checkAnswer(spark.table(t1), df) + } + + test("saveAsTable: v2 table - ignore mode and table doesn't exist") { + val t1 = "tbl" + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + df.write.format(v2Format).mode("ignore").saveAsTable(t1) + checkAnswer(spark.table(t1), df) + } + + test("saveAsTable: v2 table - ignore mode and table exists") { + val t1 = "tbl" + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + spark.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") + df.write.format(v2Format).mode("ignore").saveAsTable(t1) + checkAnswer(spark.table(t1), Seq(Row("c", "d"))) + } +} + +class InMemoryTableProvider extends TableProvider { + override def getTable(options: CaseInsensitiveStringMap): Table = { + throw new UnsupportedOperationException("D'oh!") + } +} + +/** A SessionCatalog that always loads an in memory Table, so we can test write code paths. 
*/ +class TestV2SessionCatalog extends V2SessionCatalog { + + protected val tables: util.Map[Identifier, InMemoryTable] = + new ConcurrentHashMap[Identifier, InMemoryTable]() + + override def loadTable(ident: Identifier): Table = { + if (tables.containsKey(ident)) { + tables.get(ident) + } else { + // Table was created through the built-in catalog + val t = super.loadTable(ident) + val table = new InMemoryTable(t.name(), t.schema(), t.partitioning(), t.properties()) + tables.put(ident, table) + table + } + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + val t = new InMemoryTable(ident.name(), schema, partitions, properties) + tables.put(ident, t) + t + } + + def clearTables(): Unit = { + assert(!tables.isEmpty, "Tables were empty, maybe didn't use the session catalog code path?") + tables.keySet().asScala.foreach(super.dropTable) + tables.clear() + } +} From 9781ae85e6516b98e0c15a0f6231596f869ffd0c Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 9 Aug 2019 13:36:21 -0700 Subject: [PATCH 02/15] savE --- .../sql/catalyst/analysis/Analyzer.scala | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7e3f9b39aef31..09201697953f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.objects._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, InsertIntoStatement} +import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, DescribeTableStatement, InsertIntoStatement} import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.catalyst.util.toPrettySQL @@ -171,6 +171,7 @@ class Analyzer( Batch("Resolution", fixedPoint, ResolveTableValuedFunctions :: ResolveAlterTable :: + ResolveDescribeTable :: ResolveInsertInto :: ResolveTables :: ResolveRelations :: @@ -323,7 +324,8 @@ class Analyzer( gid: Expression): Expression = { expr transform { case e: GroupingID => - if (e.groupByExprs.isEmpty || e.groupByExprs == groupByExprs) { + if (e.groupByExprs.isEmpty || + e.groupByExprs.map(_.canonicalized) == groupByExprs.map(_.canonicalized)) { Alias(gid, toPrettySQL(e))() } else { throw new AnalysisException( @@ -974,6 +976,21 @@ class Analyzer( Seq(TableChange.setProperty("location", newLoc))) } } + /** + * Resolve DESCRIBE TABLE statements that use a DSv2 catalog. + * + * This rule converts unresolved DESCRIBE TABLE statements to v2 when a v2 catalog is responsible + * for the table identifier. 
A v2 catalog is responsible for an identifier when the identifier + * has a catalog specified, like prod_catalog.db.table, or when a default v2 catalog is set and + * the table identifier does not include a catalog. + */ + object ResolveDescribeTable extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case describe @ DescribeTableStatement( + CatalogObjectIdentifier(Some(v2Catalog), ident), _, isExtended) => + DescribeTable(UnresolvedRelation(describe.tableName), isExtended) + } + } /** * Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from @@ -1167,6 +1184,8 @@ class Analyzer( // To resolve duplicate expression IDs for Join and Intersect case j @ Join(left, right, _, _, _) if !j.duplicateResolved => j.copy(right = dedupRight(left, right)) + // intersect/except will be rewritten to join at the begininng of optimizer. Here we need to + // deduplicate the right side plan, so that we won't produce an invalid self-join later. case i @ Intersect(left, right, _) if !i.duplicateResolved => i.copy(right = dedupRight(left, right)) case e @ Except(left, right, _) if !e.duplicateResolved => From 99ba64d4282655ac20a6b743d9fdfeaed012d283 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 9 Aug 2019 14:39:45 -0700 Subject: [PATCH 03/15] try --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index a6c6c43354232..7152204230584 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -361,6 +361,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { */ def insertInto(tableName: String): Unit = { import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier} + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ assertNotBucketed("insertInto") From f73feb8f5effd26b2c97e521eb9bf39b7a0754e9 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 12 Aug 2019 09:45:32 -0700 Subject: [PATCH 04/15] try this --- .../spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- .../datasources/v2/DataSourceV2Relation.scala | 12 ++++++++++++ .../scala/org/apache/spark/sql/DataFrameWriter.scala | 3 +-- .../execution/datasources/DataSourceResolution.scala | 8 +++++++- 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 09201697953f8..595af9669b763 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStat import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.catalyst.util.toPrettySQL -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, UnresolvedDataSourceV2Relation} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.sources.v2.Table @@ -653,7 +653,7 
@@ class Analyzer( case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) => maybeCatalog.orElse(sessionCatalog) .flatMap(loadTable(_, ident)) - .map(DataSourceV2Relation.create) + .map(DataSourceV2Relation.unresolved) .getOrElse(u) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 9ae3dbbc45502..1c253c8261f8c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -27,6 +27,14 @@ import org.apache.spark.sql.sources.v2.reader.streaming.{Offset, SparkDataStream import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.util.CaseInsensitiveStringMap +case class UnresolvedDataSourceV2Relation(table: Table) extends LeafNode with NamedRelation { + override lazy val resolved: Boolean = false + + override def name: String = table.name() + + override def output: Seq[Attribute] = Seq.empty +} + /** * A logical plan representing a data source v2 table. * @@ -106,6 +114,10 @@ object DataSourceV2Relation { def create(table: Table): DataSourceV2Relation = create(table, CaseInsensitiveStringMap.empty) + def unresolved(table: Table): UnresolvedDataSourceV2Relation = { + UnresolvedDataSourceV2Relation(table) + } + /** * This is used to transform data source v2 statistics to logical.Statistics. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 7152204230584..8be5ae6c4a5b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -28,14 +28,13 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister} -import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.TableCapability._ import org.apache.spark.sql.types.{IntegerType, StructType} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index a150a049f33e1..422b63007b321 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -31,7 +31,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand} -import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation} +import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation, UnresolvedDataSourceV2Relation} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.TableProvider import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType} @@ -173,6 +173,12 @@ case class DataSourceResolution( // only top-level adds are supported using AlterTableAddColumnsCommand AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField)) + case UnresolvedDataSourceV2Relation(CatalogTableAsV2(catalogTable)) => + UnresolvedCatalogRelation(catalogTable) + + case UnresolvedDataSourceV2Relation(v2Table) => + DataSourceV2Relation.create(v2Table) + case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) => UnresolvedCatalogRelation(catalogTable) From aac950320b9e0b9afc3921bc5935eefa44b37f27 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 12 Aug 2019 12:32:57 -0700 Subject: [PATCH 05/15] Alternative path --- .../sources/v2/internal/UnresolvedTable.java | 22 +++++++++++++++++++ .../sql/catalyst/analysis/Analyzer.scala | 4 ++-- .../datasources/v2/DataSourceV2Relation.scala | 20 ++++++----------- .../datasources/DataSourceResolution.scala | 8 +------ .../datasources/v2/V2SessionCatalog.scala | 3 ++- 5 files changed, 34 insertions(+), 23 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.java new file mode 100644 index 0000000000000..122a18fbc18b0 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.internal; + +import org.apache.spark.sql.sources.v2.Table; + +public interface UnresolvedTable extends Table {} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 595af9669b763..09201697953f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStat import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.catalyst.util.toPrettySQL -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, UnresolvedDataSourceV2Relation} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.sources.v2.Table @@ -653,7 +653,7 @@ class Analyzer( case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) => maybeCatalog.orElse(sessionCatalog) .flatMap(loadTable(_, ident)) - .map(DataSourceV2Relation.unresolved) + .map(DataSourceV2Relation.create) .getOrElse(u) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 1c253c8261f8c..fc3556f786cfd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -22,19 +22,12 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.internal.UnresolvedTable import org.apache.spark.sql.sources.v2.reader.{Statistics => V2Statistics, _} import org.apache.spark.sql.sources.v2.reader.streaming.{Offset, SparkDataStream} import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.util.CaseInsensitiveStringMap -case class UnresolvedDataSourceV2Relation(table: Table) extends LeafNode with NamedRelation { - override lazy val resolved: Boolean = false - - override def name: String = table.name() - - override def output: Seq[Attribute] = Seq.empty -} - /** * A logical plan representing a data source v2 table. 
* @@ -50,6 +43,8 @@ case class DataSourceV2Relation( import DataSourceV2Implicits._ + override lazy val resolved: Boolean = !table.isInstanceOf[UnresolvedTable] + override def name: String = table.name() override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA) @@ -108,16 +103,15 @@ case class StreamingDataSourceV2Relation( object DataSourceV2Relation { def create(table: Table, options: CaseInsensitiveStringMap): DataSourceV2Relation = { - val output = table.schema().toAttributes + val output = table match { + case _: UnresolvedTable => Nil + case _ => table.schema().toAttributes + } DataSourceV2Relation(table, output, options) } def create(table: Table): DataSourceV2Relation = create(table, CaseInsensitiveStringMap.empty) - def unresolved(table: Table): UnresolvedDataSourceV2Relation = { - UnresolvedDataSourceV2Relation(table) - } - /** * This is used to transform data source v2 statistics to logical.Statistics. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 422b63007b321..a150a049f33e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand} -import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation, UnresolvedDataSourceV2Relation} +import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.TableProvider import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType} @@ -173,12 +173,6 @@ case class DataSourceResolution( // only top-level adds are supported using AlterTableAddColumnsCommand AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField)) - case UnresolvedDataSourceV2Relation(CatalogTableAsV2(catalogTable)) => - UnresolvedCatalogRelation(catalogTable) - - case UnresolvedDataSourceV2Relation(v2Table) => - DataSourceV2Relation.create(v2Table) - case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) => UnresolvedCatalogRelation(catalogTable) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index a3b8f28fc5c39..3f12e5f633611 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.internal.SessionState import org.apache.spark.sql.sources.v2.{Table, TableCapability} +import org.apache.spark.sql.sources.v2.internal.UnresolvedTable import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -172,7 +173,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { /** * An implementation of catalog v2 [[Table]] to expose v1 table metadata. */ -case class CatalogTableAsV2(v1Table: CatalogTable) extends Table { +case class CatalogTableAsV2(v1Table: CatalogTable) extends UnresolvedTable { implicit class IdentifierHelper(identifier: TableIdentifier) { def quoted: String = { identifier.database match { From 428e82a6318fa34f0b3202f8e3499ed2b4fb5928 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 12 Aug 2019 13:07:09 -0700 Subject: [PATCH 06/15] Try this way --- .../sql/catalog/v2/utils/CatalogV2Util.scala | 3 ++- .../org/apache/spark/sql/DataFrameWriter.scala | 11 ++++++----- .../datasources/DataSourceResolution.scala | 18 ++++++++++-------- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala index cd9bcc0f44f74..bbbcb20ca0b12 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, NamespaceChange, TableChange} import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType} -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} import org.apache.spark.sql.sources.v2.Table import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} @@ -219,5 +219,6 @@ object CatalogV2Util { Option(catalog.asTableCatalog.loadTable(ident)) } catch { case _: NoSuchTableException => None + case _: NoSuchDatabaseException => None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 8be5ae6c4a5b7..e6effa8b0b32f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -25,10 +25,10 @@ import org.apache.spark.annotation.Stable import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog} import org.apache.spark.sql.catalog.v2.expressions._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation} +import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import 
org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} @@ -37,6 +37,7 @@ import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister} import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.TableCapability._ +import org.apache.spark.sql.sources.v2.internal.UnresolvedTable import org.apache.spark.sql.types.{IntegerType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -537,11 +538,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val tableOpt = try Option(catalog.loadTable(ident)) catch { case _: NoSuchTableException => None } - if (tableOpt.exists(_.isInstanceOf[CatalogTableAsV2])) { - return saveAsTable(TableIdentifier(ident.name(), ident.namespace().headOption)) - } val command = (mode, tableOpt) match { + case (_, Some(table: UnresolvedTable)) => + return saveAsTable(TableIdentifier(ident.name(), ident.namespace().headOption)) + case (SaveMode.Append, Some(table)) => AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index a150a049f33e1..3c1c8cddc114b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -24,17 +24,17 @@ import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog} import org.apache.spark.sql.catalog.v2.expressions.Transform -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect} -import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement} +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.sql._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, 
DescribeColumnCommand, DescribeTableCommand, DropTableCommand} +import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.TableProvider -import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils case class DataSourceResolution( @@ -173,8 +173,10 @@ case class DataSourceResolution( // only top-level adds are supported using AlterTableAddColumnsCommand AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField)) - case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) => - UnresolvedCatalogRelation(catalogTable) + case DataSourceV2Relation(CatalogTableAsV2(ct), _, _) => + SubqueryAlias( + AliasIdentifier(ct.identifier.table, ct.identifier.database), + UnresolvedCatalogRelation(ct)) } From d9f478d45b6c684a285e33c54540f14b0f8a74da Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 12 Aug 2019 13:28:36 -0700 Subject: [PATCH 07/15] add docs --- .../sources/v2/internal/UnresolvedTable.java | 6 ++++ .../apache/spark/sql/DataFrameWriter.scala | 35 +++++++++---------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.java index 122a18fbc18b0..1162d6be87775 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.java @@ -19,4 +19,10 @@ import org.apache.spark.sql.sources.v2.Table; +/** + * Internal interface used for table definitions, which we do not have complete information to + * resolve yet. This is primarily used by the `CatalogTableAsV2` wrapper returned by the + * V2SessionCatalog. When a `CatalogTableAsV2` is returned by the V2SessionCatalog, we defer + * planning to V1 data source code paths. 
+ */ public interface UnresolvedTable extends Table {} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index e6effa8b0b32f..0b49cf24e6c7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -25,10 +25,10 @@ import org.apache.spark.annotation.Stable import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog} import org.apache.spark.sql.catalog.v2.expressions._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedRelation} +import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} @@ -252,19 +252,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertNotBucketed("save") val session = df.sparkSession - val useV1Sources = - session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",") val cls = DataSource.lookupDataSource(source, session.sessionState.conf) - val shouldUseV1Source = cls.newInstance() match { - case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true - case _ => useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) - } + val canUseV2 = canUseV2Source(session, cls) && partitioningColumns.isEmpty // In Data Source V2 project, partitioning is still under development. // Here we fallback to V1 if partitioning columns are specified. // TODO(SPARK-26778): use V2 implementations when partitioning feature is supported. 
- if (!shouldUseV1Source && classOf[TableProvider].isAssignableFrom(cls) && - partitioningColumns.isEmpty) { + if (canUseV2) { val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] val sessionOptions = DataSourceV2Utils.extractSessionConfigs( provider, session.sessionState.conf) @@ -495,15 +489,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ val session = df.sparkSession - val useV1Sources = - session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",") - val cls = DataSource.lookupDataSource(source, session.sessionState.conf) - val shouldUseV1Source = cls.newInstance() match { - case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true - case _ => useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) - } - - val canUseV2 = !shouldUseV1Source && classOf[TableProvider].isAssignableFrom(cls) + val provider = DataSource.lookupDataSource(source, session.sessionState.conf) + val canUseV2 = canUseV2Source(session, provider) val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { @@ -848,6 +835,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { private def modeForDSV2 = mode.getOrElse(SaveMode.Append) + private def canUseV2Source(session: SparkSession, providerClass: Class[_]): Boolean = { + val useV1Sources = + session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",") + val shouldUseV1Source = providerClass.newInstance() match { + case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true + case _ => useV1Sources.contains(providerClass.getCanonicalName.toLowerCase(Locale.ROOT)) + } + !shouldUseV1Source && classOf[TableProvider].isAssignableFrom(providerClass) + } + /////////////////////////////////////////////////////////////////////////////////////// // Builder pattern config options /////////////////////////////////////////////////////////////////////////////////////// From 0608d99021164636ba92699f7cb825c1e165ffac Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 12 Aug 2019 13:34:04 -0700 Subject: [PATCH 08/15] Button up --- .../sql/catalog/v2/utils/CatalogV2Util.scala | 3 ++- ...SourceV2DataFrameSessionCatalogSuite.scala | 22 ++++++++++++------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala index bbbcb20ca0b12..d5079202c8fee 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, NamespaceChange, TableChange} import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType} -import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} +import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException} import org.apache.spark.sql.sources.v2.Table import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} @@ -220,5 +220,6 @@ object CatalogV2Util { } catch 
{ case _: NoSuchTableException => None case _: NoSuchDatabaseException => None + case _: NoSuchNamespaceException => None } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala index ed3b2beb6881f..e656ca5294e32 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{DataFrame, QueryTest} import org.apache.spark.sql.catalog.v2.Identifier import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException @@ -51,11 +51,17 @@ class DataSourceV2DataFrameSessionCatalogSuite spark.catalog("session").asInstanceOf[TestV2SessionCatalog].clearTables() } + private def verifyTable(tableName: String, expected: DataFrame): Unit = { + checkAnswer(spark.table(tableName), expected) + checkAnswer(sql(s"SELECT * FROM $tableName"), expected) + checkAnswer(sql(s"TABLE $tableName"), expected) + } + test("saveAsTable and v2 table - table doesn't exist") { val t1 = "tbl" val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") df.write.format(v2Format).saveAsTable(t1) - checkAnswer(spark.table(t1), df) + verifyTable(t1, df) } test("saveAsTable: v2 table - table exists") { @@ -66,18 +72,18 @@ class DataSourceV2DataFrameSessionCatalogSuite df.select("id", "data").write.format(v2Format).saveAsTable(t1) } df.write.format(v2Format).mode("append").saveAsTable(t1) - checkAnswer(spark.table(t1), df) + verifyTable(t1, df) // Check that appends are by name df.select('data, 'id).write.format(v2Format).mode("append").saveAsTable(t1) - checkAnswer(spark.table(t1), df.union(df)) + verifyTable(t1, df.union(df)) } test("saveAsTable: v2 table - table overwrite and table doesn't exist") { val t1 = "tbl" val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") df.write.format(v2Format).mode("overwrite").saveAsTable(t1) - checkAnswer(spark.table(t1), df) + verifyTable(t1, df) } test("saveAsTable: v2 table - table overwrite and table exists") { @@ -85,14 +91,14 @@ class DataSourceV2DataFrameSessionCatalogSuite val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") spark.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") df.write.format(v2Format).mode("overwrite").saveAsTable(t1) - checkAnswer(spark.table(t1), df) + verifyTable(t1, df) } test("saveAsTable: v2 table - ignore mode and table doesn't exist") { val t1 = "tbl" val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") df.write.format(v2Format).mode("ignore").saveAsTable(t1) - checkAnswer(spark.table(t1), df) + verifyTable(t1, df) } test("saveAsTable: v2 table - ignore mode and table exists") { @@ -100,7 +106,7 @@ class DataSourceV2DataFrameSessionCatalogSuite val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") spark.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'") df.write.format(v2Format).mode("ignore").saveAsTable(t1) - checkAnswer(spark.table(t1), Seq(Row("c", "d"))) + verifyTable(t1, Seq(("c", "d")).toDF("id", "data")) } } From e489a166f11c7e804a74a3c75380b5f10e30a1cb Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 12 Aug 2019 
13:39:42 -0700 Subject: [PATCH 09/15] save --- .../execution/datasources/DataSourceResolution.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 3c1c8cddc114b..c43ed9804a110 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -26,16 +26,15 @@ import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.sql._ +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand} import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.TableProvider -import org.apache.spark.sql.types._ -import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType} case class DataSourceResolution( conf: SQLConf, From 96606730cc3f474c0ff066abf4cb588da6356286 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 13 Aug 2019 11:52:57 -0700 Subject: [PATCH 10/15] just let unresolved tables be --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++++- .../sql/execution/datasources/DataSourceResolution.scala | 8 +------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 09201697953f8..78382fc3069b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.sources.v2.Table +import org.apache.spark.sql.sources.v2.internal.UnresolvedTable import org.apache.spark.sql.types._ /** @@ -653,7 +654,10 @@ class Analyzer( case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) => maybeCatalog.orElse(sessionCatalog) .flatMap(loadTable(_, ident)) - .map(DataSourceV2Relation.create) + .map { + case unresolved: UnresolvedTable => u + case resolved => DataSourceV2Relation.create(resolved) + } .getOrElse(u) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index c43ed9804a110..c01fb338fad9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand} -import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation} +import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, CatalogTableDefinitionAsV2, CatalogViewAsV2, DataSourceV2Relation} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.TableProvider import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType} @@ -171,12 +171,6 @@ case class DataSourceResolution( if newColumns.forall(_.name.size == 1) => // only top-level adds are supported using AlterTableAddColumnsCommand AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField)) - - case DataSourceV2Relation(CatalogTableAsV2(ct), _, _) => - SubqueryAlias( - AliasIdentifier(ct.identifier.table, ct.identifier.database), - UnresolvedCatalogRelation(ct)) - } object V1WriteProvider { From 762f8739828f2ad0c5cadcf790666d2ab9e07fa3 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 13 Aug 2019 11:58:22 -0700 Subject: [PATCH 11/15] remove changes --- .../execution/datasources/v2/DataSourceV2Relation.scala | 8 +------- .../sql/execution/datasources/DataSourceResolution.scala | 9 ++++----- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index fc3556f786cfd..9ae3dbbc45502 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.sources.v2._ -import org.apache.spark.sql.sources.v2.internal.UnresolvedTable import org.apache.spark.sql.sources.v2.reader.{Statistics => V2Statistics, _} import org.apache.spark.sql.sources.v2.reader.streaming.{Offset, SparkDataStream} import org.apache.spark.sql.sources.v2.writer._ @@ -43,8 +42,6 @@ case class DataSourceV2Relation( import DataSourceV2Implicits._ - override lazy val resolved: Boolean = !table.isInstanceOf[UnresolvedTable] - override def name: String = table.name() override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA) @@ -103,10 +100,7 @@ case class StreamingDataSourceV2Relation( object DataSourceV2Relation { def create(table: Table, options: CaseInsensitiveStringMap): DataSourceV2Relation = { - val output = table match { - case _: UnresolvedTable => Nil - case _ => table.schema().toAttributes - } + val output = table.schema().toAttributes DataSourceV2Relation(table, output, options) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index c01fb338fad9a..49cf688742591 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -24,14 +24,13 @@ import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog} import org.apache.spark.sql.catalog.v2.expressions.Transform -import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect} import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, 
DescribeColumnCommand, DescribeTableCommand, DropTableCommand} -import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, CatalogTableDefinitionAsV2, CatalogViewAsV2, DataSourceV2Relation} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.TableProvider import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType} From 06cf1f92f31a61bbe9e043fcd43946bbfb60dbff Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 13 Aug 2019 14:43:48 -0700 Subject: [PATCH 12/15] passes new tests as well as old ones --- ...SourceV2DataFrameSessionCatalogSuite.scala | 91 +++++++++++++++++-- 1 file changed, 83 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala index e656ca5294e32..af67c671406b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala @@ -24,9 +24,10 @@ import scala.collection.JavaConverters._ import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode} import org.apache.spark.sql.catalog.v2.Identifier import org.apache.spark.sql.catalog.v2.expressions.Transform +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.internal.SQLConf @@ -54,16 +55,50 @@ class DataSourceV2DataFrameSessionCatalogSuite private def verifyTable(tableName: String, expected: DataFrame): Unit = { checkAnswer(spark.table(tableName), expected) checkAnswer(sql(s"SELECT * FROM $tableName"), expected) + checkAnswer(sql(s"SELECT * FROM default.$tableName"), expected) checkAnswer(sql(s"TABLE $tableName"), expected) } - test("saveAsTable and v2 table - table doesn't exist") { + test("saveAsTable: v2 table - table doesn't exist and default mode (ErrorIfExists)") { val t1 = "tbl" val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") df.write.format(v2Format).saveAsTable(t1) verifyTable(t1, df) } + test("saveAsTable: v2 table - table doesn't exist and append mode") { + val t1 = "tbl" + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + df.write.format(v2Format).mode("append").saveAsTable(t1) + verifyTable(t1, df) + } + + test("saveAsTable: Append mode should not fail if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.format(v2Format).mode(SaveMode.Append).saveAsTable("same_name") + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } + + test("saveAsTable: Append mode should not fail if the table already exists " + + "and a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + val format = spark.sessionState.conf.defaultDataSourceName + sql(s"CREATE TABLE same_name(id LONG) USING $format") + spark.range(10).createTempView("same_name") + spark.range(20).write.format(v2Format).mode(SaveMode.Append).saveAsTable("same_name") + checkAnswer(spark.table("same_name"), 
spark.range(10).toDF()) + checkAnswer(spark.table("default.same_name"), spark.range(20).toDF()) + } + } + } + test("saveAsTable: v2 table - table exists") { val t1 = "tbl" val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") @@ -94,6 +129,32 @@ class DataSourceV2DataFrameSessionCatalogSuite verifyTable(t1, df) } + test("saveAsTable: Overwrite mode should not drop the temp view if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.format(v2Format).mode(SaveMode.Overwrite).saveAsTable("same_name") + assert(spark.sessionState.catalog.getTempView("same_name").isDefined) + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } + + test("saveAsTable with mode Overwrite should not fail if the table already exists " + + "and a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + sql(s"CREATE TABLE same_name(id LONG) USING $v2Format") + spark.range(10).createTempView("same_name") + spark.range(20).write.format(v2Format).mode(SaveMode.Overwrite).saveAsTable("same_name") + checkAnswer(spark.table("same_name"), spark.range(10).toDF()) + checkAnswer(spark.table("default.same_name"), spark.range(20).toDF()) + } + } + } + test("saveAsTable: v2 table - ignore mode and table doesn't exist") { val t1 = "tbl" val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") @@ -116,20 +177,32 @@ class InMemoryTableProvider extends TableProvider { } } +/** A second fake format to test behavior with format changes. */ +class InMemoryTableProvider2 extends InMemoryTableProvider + /** A SessionCatalog that always loads an in memory Table, so we can test write code paths. 
*/ class TestV2SessionCatalog extends V2SessionCatalog { protected val tables: util.Map[Identifier, InMemoryTable] = new ConcurrentHashMap[Identifier, InMemoryTable]() + private def fullIdentifier(ident: Identifier): Identifier = { + if (ident.namespace().isEmpty) { + Identifier.of(Array("default"), ident.name()) + } else { + ident + } + } + override def loadTable(ident: Identifier): Table = { - if (tables.containsKey(ident)) { - tables.get(ident) + val fullIdent = fullIdentifier(ident) + if (tables.containsKey(fullIdent)) { + tables.get(fullIdent) } else { // Table was created through the built-in catalog - val t = super.loadTable(ident) + val t = super.loadTable(fullIdent) val table = new InMemoryTable(t.name(), t.schema(), t.partitioning(), t.properties()) - tables.put(ident, table) + tables.put(fullIdent, table) table } } @@ -139,8 +212,10 @@ class TestV2SessionCatalog extends V2SessionCatalog { schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = { - val t = new InMemoryTable(ident.name(), schema, partitions, properties) - tables.put(ident, t) + val created = super.createTable(ident, schema, partitions, properties) + val t = new InMemoryTable(created.name(), schema, partitions, properties) + val fullIdent = fullIdentifier(ident) + tables.put(fullIdent, t) t } From 0bd93ae7ccc67068c65e6ba83451fcfc0c3e589e Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 13 Aug 2019 21:32:28 -0700 Subject: [PATCH 13/15] fix test --- .../v2/DataSourceV2DataFrameSessionCatalogSuite.scala | 1 + .../spark/sql/sources/v2/DataSourceV2SQLSuite.scala | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala index af67c671406b2..bd895bbd4d9ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala @@ -50,6 +50,7 @@ class DataSourceV2DataFrameSessionCatalogSuite override def afterEach(): Unit = { super.afterEach() spark.catalog("session").asInstanceOf[TestV2SessionCatalog].clearTables() + spark.conf.set(SQLConf.V2_SESSION_CATALOG.key, classOf[V2SessionCatalog].getName) } private def verifyTable(tableName: String, expected: DataFrame): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 9ae51d577b562..0223d3ee98a83 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG} +import org.apache.spark.sql.sources.v2.internal.UnresolvedTable import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{ArrayType, DoubleType, IntegerType, LongType, MapType, Metadata, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ 
-493,8 +494,12 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn sparkSession.sql(s"CREATE TABLE table_name USING parquet AS SELECT id, data FROM source") - // use the catalog name to force loading with the v2 catalog - checkAnswer(sparkSession.sql(s"TABLE session.table_name"), sparkSession.table("source")) + checkAnswer(sparkSession.sql(s"TABLE default.table_name"), sparkSession.table("source")) + // The fact that the following line doesn't throw an exception means, the session catalog + // can load the table. + val t = sparkSession.catalog("session").asTableCatalog + .loadTable(Identifier.of(Array.empty, "table_name")) + assert(t.isInstanceOf[UnresolvedTable], "V1 table wasn't returned as an unresolved table") } test("DropTable: basic") { From 99ae1567483d6f77987c136d073e84d49d6407b0 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 13 Aug 2019 21:47:26 -0700 Subject: [PATCH 14/15] rename CatalogTableAsV2 to UnresolvedTable --- .../sources/v2/internal/UnresolvedTable.java | 28 ------ .../sources/v2/internal/UnresolvedTable.scala | 89 +++++++++++++++++++ .../datasources/v2/V2SessionCatalog.scala | 64 +------------ ...SourceV2DataFrameSessionCatalogSuite.scala | 3 - 4 files changed, 91 insertions(+), 93 deletions(-) delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.scala diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.java deleted file mode 100644 index 1162d6be87775..0000000000000 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.internal; - -import org.apache.spark.sql.sources.v2.Table; - -/** - * Internal interface used for table definitions, which we do not have complete information to - * resolve yet. This is primarily used by the `CatalogTableAsV2` wrapper returned by the - * V2SessionCatalog. When a `CatalogTableAsV2` is returned by the V2SessionCatalog, we defer - * planning to V1 data source code paths. 
- */ -public interface UnresolvedTable extends Table {} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.scala new file mode 100644 index 0000000000000..f188d307ce910 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.internal + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.catalog.v2.expressions.{LogicalExpressions, Transform} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.sources.v2.{Table, TableCapability} +import org.apache.spark.sql.types.StructType + +/** + * An implementation of catalog v2 [[Table]] to expose v1 table metadata. 
+ */ +case class UnresolvedTable(v1Table: CatalogTable) extends Table { + implicit class IdentifierHelper(identifier: TableIdentifier) { + def quoted: String = { + identifier.database match { + case Some(db) => + Seq(db, identifier.table).map(quote).mkString(".") + case _ => + quote(identifier.table) + + } + } + + private def quote(part: String): String = { + if (part.contains(".") || part.contains("`")) { + s"`${part.replace("`", "``")}`" + } else { + part + } + } + } + + def catalogTable: CatalogTable = v1Table + + lazy val options: Map[String, String] = { + v1Table.storage.locationUri match { + case Some(uri) => + v1Table.storage.properties + ("path" -> uri.toString) + case _ => + v1Table.storage.properties + } + } + + override lazy val properties: util.Map[String, String] = v1Table.properties.asJava + + override lazy val schema: StructType = v1Table.schema + + override lazy val partitioning: Array[Transform] = { + val partitions = new mutable.ArrayBuffer[Transform]() + + v1Table.partitionColumnNames.foreach { col => + partitions += LogicalExpressions.identity(col) + } + + v1Table.bucketSpec.foreach { spec => + partitions += LogicalExpressions.bucket(spec.numBuckets, spec.bucketColumnNames: _*) + } + + partitions.toArray + } + + override def name: String = v1Table.identifier.quoted + + override def capabilities: util.Set[TableCapability] = new util.HashSet[TableCapability]() + + override def toString: String = s"UnresolvedTable($name)" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 3f12e5f633611..ea6a701a9a036 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchT import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog} import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.internal.SessionState -import org.apache.spark.sql.sources.v2.{Table, TableCapability} +import org.apache.spark.sql.sources.v2.Table import org.apache.spark.sql.sources.v2.internal.UnresolvedTable import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -71,7 +71,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { throw new NoSuchTableException(ident) } - CatalogTableAsV2(catalogTable) + UnresolvedTable(catalogTable) } override def invalidateTable(ident: Identifier): Unit = { @@ -170,66 +170,6 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { override def toString: String = s"V2SessionCatalog($name)" } -/** - * An implementation of catalog v2 [[Table]] to expose v1 table metadata. 
- */ -case class CatalogTableAsV2(v1Table: CatalogTable) extends UnresolvedTable { - implicit class IdentifierHelper(identifier: TableIdentifier) { - def quoted: String = { - identifier.database match { - case Some(db) => - Seq(db, identifier.table).map(quote).mkString(".") - case _ => - quote(identifier.table) - - } - } - - private def quote(part: String): String = { - if (part.contains(".") || part.contains("`")) { - s"`${part.replace("`", "``")}`" - } else { - part - } - } - } - - def catalogTable: CatalogTable = v1Table - - lazy val options: Map[String, String] = { - v1Table.storage.locationUri match { - case Some(uri) => - v1Table.storage.properties + ("path" -> uri.toString) - case _ => - v1Table.storage.properties - } - } - - override lazy val properties: util.Map[String, String] = v1Table.properties.asJava - - override lazy val schema: StructType = v1Table.schema - - override lazy val partitioning: Array[Transform] = { - val partitions = new mutable.ArrayBuffer[Transform]() - - v1Table.partitionColumnNames.foreach { col => - partitions += LogicalExpressions.identity(col) - } - - v1Table.bucketSpec.foreach { spec => - partitions += LogicalExpressions.bucket(spec.numBuckets, spec.bucketColumnNames: _*) - } - - partitions.toArray - } - - override def name: String = v1Table.identifier.quoted - - override def capabilities: util.Set[TableCapability] = new util.HashSet[TableCapability]() - - override def toString: String = s"CatalogTableAsV2($name)" -} - private[sql] object V2SessionCatalog { /** * Convert v2 Transforms to v1 partition columns and an optional bucket spec. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala index bd895bbd4d9ed..2ef2df3345e8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala @@ -178,9 +178,6 @@ class InMemoryTableProvider extends TableProvider { } } -/** A second fake format to test behavior with format changes. */ -class InMemoryTableProvider2 extends InMemoryTableProvider - /** A SessionCatalog that always loads an in memory Table, so we can test write code paths. */ class TestV2SessionCatalog extends V2SessionCatalog { From 673d95a58fb1b80618c9d626acc8d1a64dd61d51 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 13 Aug 2019 22:41:45 -0700 Subject: [PATCH 15/15] Update UnresolvedTable.scala --- .../apache/spark/sql/sources/v2/internal/UnresolvedTable.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.scala index f188d307ce910..8813d0ab840d0 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.scala +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/internal/UnresolvedTable.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.sources.v2.{Table, TableCapability} import org.apache.spark.sql.types.StructType /** - * An implementation of catalog v2 [[Table]] to expose v1 table metadata. + * An implementation of catalog v2 `Table` to expose v1 table metadata. 
  */
 case class UnresolvedTable(v1Table: CatalogTable) extends Table {
   implicit class IdentifierHelper(identifier: TableIdentifier) {