diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala index c6f52d676422c..b917a3d40dbe5 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala @@ -16,8 +16,11 @@ */ package org.apache.spark.sql.v2.avro +import java.util + import org.apache.spark.sql.avro.AvroFileFormat -import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.connector.catalog.{Identifier, Table} +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import org.apache.spark.sql.types.StructType @@ -32,12 +35,24 @@ class AvroDataSourceV2 extends FileDataSourceV2 { override def getTable(options: CaseInsensitiveStringMap): Table = { val paths = getPaths(options) val tableName = getTableName(paths) - AvroTable(tableName, sparkSession, options, paths, None, fallbackFileFormat) + AvroTable(tableName, sparkSession, options, paths, None, None, fallbackFileFormat) } override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { val paths = getPaths(options) val tableName = getTableName(paths) - AvroTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat) + AvroTable(tableName, sparkSession, options, paths, Some(schema), None, fallbackFileFormat) + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + val options = toOptions(ident) + val paths = getPaths(options) + val tableName = getTableName(paths) + AvroTable( + tableName, sparkSession, options, paths, Some(schema), Some(partitions), fallbackFileFormat) } } diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala index 765e5727d944a..7dc5f374e5b1d 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.avro.AvroUtils +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.write.WriteBuilder import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -34,8 +35,9 @@ case class AvroTable( options: CaseInsensitiveStringMap, paths: Seq[String], userSpecifiedSchema: Option[StructType], + userSpecifiedPartitioning: Option[Array[Transform]], fallbackFileFormat: Class[_ <: FileFormat]) - extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + extends FileTable(sparkSession, options, paths, userSpecifiedSchema, userSpecifiedPartitioning) { override def newScanBuilder(options: CaseInsensitiveStringMap): AvroScanBuilder = new AvroScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportCreateTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportCreateTable.java new file mode 100644 index 0000000000000..94ae6c456f69f --- /dev/null +++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportCreateTable.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.StructType; + +import java.util.Map; + +public interface SupportCreateTable { + + /** + * Load table metadata by {@link Identifier identifier} from the catalog. + *
<p>
+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must throw {@link NoSuchTableException}. + * + * @param ident a table identifier + * @return the table's metadata + * @throws NoSuchTableException If the table doesn't exist or is a view + */ + Table loadTable(Identifier ident) throws NoSuchTableException; + + /** + * Test whether a table exists using an {@link Identifier identifier} from the catalog. + *
<p>
+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must return false. + * + * @param ident a table identifier + * @return true if the table exists, false otherwise + */ + default boolean tableExists(Identifier ident) { + try { + return loadTable(ident) != null; + } catch (NoSuchTableException e) { + return false; + } + } + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param partitions transforms to use for partitioning data in the table + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table or view already exists for the identifier + * @throws UnsupportedOperationException If a requested partition transform is not supported + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + */ + Table createTable( + Identifier ident, + StructType schema, + Transform[] partitions, + Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException; + + /** + * Drop a table in the catalog. + *
<p>
+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must not drop the view and must return false. + * + * @param ident a table identifier + * @return true if a table was deleted, false if no table exists for the identifier + */ + boolean dropTable(Identifier ident); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportIdentifierTranslation.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportIdentifierTranslation.java new file mode 100644 index 0000000000000..585c96fed90d5 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportIdentifierTranslation.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +public interface SupportIdentifierTranslation extends SupportCreateTable, TableProvider { + + Identifier fromOptions(CaseInsensitiveStringMap options); + + CaseInsensitiveStringMap toOptions(Identifier identifier); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index b72f7845a6ea5..79d52d5bea41c 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -36,7 +36,7 @@ * insensitive. */ @Experimental -public interface TableCatalog extends CatalogPlugin { +public interface TableCatalog extends CatalogPlugin, SupportCreateTable { /** * List the tables in a namespace from the catalog. *
<p>
@@ -71,41 +71,6 @@ public interface TableCatalog extends CatalogPlugin { default void invalidateTable(Identifier ident) { } - /** - * Test whether a table exists using an {@link Identifier identifier} from the catalog. - *
<p>
- * If the catalog supports views and contains a view for the identifier and not a table, this - * must return false. - * - * @param ident a table identifier - * @return true if the table exists, false otherwise - */ - default boolean tableExists(Identifier ident) { - try { - return loadTable(ident) != null; - } catch (NoSuchTableException e) { - return false; - } - } - - /** - * Create a table in the catalog. - * - * @param ident a table identifier - * @param schema the schema of the new table, as a struct type - * @param partitions transforms to use for partitioning data in the table - * @param properties a string map of table properties - * @return metadata for the new table - * @throws TableAlreadyExistsException If a table or view already exists for the identifier - * @throws UnsupportedOperationException If a requested partition transform is not supported - * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) - */ - Table createTable( - Identifier ident, - StructType schema, - Transform[] partitions, - Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException; - /** * Apply a set of {@link TableChange changes} to a table in the catalog. *
<p>
@@ -125,17 +90,6 @@ Table alterTable( Identifier ident, TableChange... changes) throws NoSuchTableException; - /** - * Drop a table in the catalog. - *
<p>
- * If the catalog supports views and contains a view for the identifier and not a table, this - * must not drop the view and must return false. - * - * @param ident a table identifier - * @return true if a table was deleted, false if no table exists for the identifier - */ - boolean dropTable(Identifier ident); - /** * Renames a table in the catalog. *
<p>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 2b6378e7c7268..381d04ef6da1b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.connector.catalog.{Identifier, SupportsNamespaces, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types._ @@ -427,7 +427,7 @@ case class CreateV2Table( * Create a new table from a select query with a v2 catalog. */ case class CreateTableAsSelect( - catalog: TableCatalog, + catalog: SupportCreateTable, tableName: Identifier, partitioning: Seq[Transform], query: LogicalPlan, @@ -477,7 +477,7 @@ case class ReplaceTable( * If the table does not exist, and orCreate is false, then an exception will be thrown. */ case class ReplaceTableAsSelect( - catalog: TableCatalog, + catalog: SupportCreateTable, tableName: Identifier, partitioning: Seq[Transform], query: LogicalPlan, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 58acfb836b305..fb5e9bc38a8d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -26,12 +26,12 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.sql.InsertIntoStatement import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, SupportsWrite, TableCatalog, TableProvider, V1Table} +import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.TableCapability._ -import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LiteralValue, Transform} +import org.apache.spark.sql.connector.expressions._ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} @@ -253,11 +253,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val maybeV2Provider = lookupV2Provider() if (maybeV2Provider.isDefined) { - if (partitioningColumns.nonEmpty) { - throw new 
AnalysisException( - "Cannot write data to TableProvider implementation if partition columns are specified.") - } - val provider = maybeV2Provider.get val sessionOptions = DataSourceV2Utils.extractSessionConfigs( provider, df.sparkSession.sessionState.conf) @@ -265,31 +260,73 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val dsOptions = new CaseInsensitiveStringMap(options.asJava) import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ - provider.getTable(dsOptions) match { - case table: SupportsWrite if table.supports(BATCH_WRITE) => - lazy val relation = DataSourceV2Relation.create(table, dsOptions) - modeForDSV2 match { - case SaveMode.Append => - runCommand(df.sparkSession, "save") { - AppendData.byName(relation, df.logicalPlan) - } + provider match { + case canCreateTable: SupportIdentifierTranslation => + val ident = canCreateTable.fromOptions(dsOptions) + val tableOpt = try Option(canCreateTable.loadTable(ident)) catch { + case _: NoSuchTableException => None + } + val command = (modeForDSV1, tableOpt) match { + case (SaveMode.Append, Some(table)) => + AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan) + + case (SaveMode.Overwrite, _) => + ReplaceTableAsSelect( + canCreateTable, + ident, + getV2Transforms(), + df.queryExecution.analyzed, + Map.empty, + extraOptions.toMap, + orCreate = true) // Create the table if it doesn't exist + + case (other, _) => + CreateTableAsSelect( + canCreateTable, + ident, + getV2Transforms(), + df.queryExecution.analyzed, + Map.empty, + extraOptions.toMap, + ignoreIfExists = other == SaveMode.Ignore) + } - case SaveMode.Overwrite if table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER) => - // truncate the table - runCommand(df.sparkSession, "save") { - OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true)) - } + runCommand(df.sparkSession, "saveAsTable") { + command + } - case other => - throw new AnalysisException(s"TableProvider implementation $source cannot be " + - s"written with $other mode, please use Append or Overwrite " + - "modes instead.") + case justProvider: TableProvider => + if (partitioningColumns.nonEmpty) { + throw new AnalysisException("Cannot write data to TableProvider implementation " + + "if partition columns are specified.") } - // Streaming also uses the data source V2 API. So it may be that the data source implements - // v2, but has no v2 implementation for batch writes. In that case, we fall back to saving - // as though it's a V1 source. - case _ => saveToV1Source() + justProvider.getTable(dsOptions) match { + case table: SupportsWrite if table.supports(BATCH_WRITE) => + lazy val relation = DataSourceV2Relation.create(table, dsOptions) + modeForDSV2 match { + case SaveMode.Append => + runCommand(df.sparkSession, "save") { + AppendData.byName(relation, df.logicalPlan) + } + + case SaveMode.Overwrite if table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER) => + // truncate the table + runCommand(df.sparkSession, "save") { + OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true)) + } + + case other => + throw new AnalysisException(s"TableProvider implementation $source cannot be " + + s"written with $other mode, please use Append or Overwrite " + + "modes instead.") + } + + // Streaming also uses the data source V2 API. So it may be that the data source + // implements v2, but has no v2 implementation for batch writes. In that case, we fall + // back to saving as though it's a V1 source. 
+ case _ => saveToV1Source() + } } } else { saveToV1Source() @@ -442,6 +479,20 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } } + private def getV2Transforms(): Seq[Transform] = { + val partitioning = partitioningColumns.map { colNames => + colNames.map(name => IdentityTransform(FieldReference(name))) + }.getOrElse(Seq.empty[Transform]) + val bucketing = getBucketSpec.map { spec => + if (spec.sortColumnNames.nonEmpty) { + throw new IllegalArgumentException("Sorting in bucketing is not supported for V2 tables") + } + val cols = spec.bucketColumnNames + Seq(BucketTransform(LiteralValue(numBuckets.get, IntegerType), cols.map(FieldReference(_)))) + }.getOrElse(Seq.empty[Transform]) + partitioning ++ bucketing + } + /** * Saves the content of the `DataFrame` as the specified table. * @@ -508,14 +559,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { private def saveAsTable(catalog: TableCatalog, ident: Identifier, mode: SaveMode): Unit = { - val partitioning = partitioningColumns.map { colNames => - colNames.map(name => IdentityTransform(FieldReference(name))) - }.getOrElse(Seq.empty[Transform]) - val bucketing = bucketColumnNames.map { cols => - Seq(BucketTransform(LiteralValue(numBuckets.get, IntegerType), cols.map(FieldReference(_)))) - }.getOrElse(Seq.empty[Transform]) - val partitionTransforms = partitioning ++ bucketing - val tableOpt = try Option(catalog.loadTable(ident)) catch { case _: NoSuchTableException => None } @@ -536,7 +579,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ReplaceTableAsSelect( catalog, ident, - partitionTransforms, + getV2Transforms(), df.queryExecution.analyzed, Map("provider" -> source) ++ getLocationIfExists, extraOptions.toMap, @@ -549,7 +592,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { CreateTableAsSelect( catalog, ident, - partitionTransforms, + getV2Transforms(), df.queryExecution.analyzed, Map("provider" -> source) ++ getLocationIfExists, extraOptions.toMap, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala index e0091293d1669..b9ec3cfd89be2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala @@ -14,13 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.spark.sql.execution.datasources.v2 +import scala.collection.JavaConverters._ + import com.fasterxml.jackson.databind.ObjectMapper import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.catalog.TableProvider +import org.apache.spark.sql.connector.catalog.{Identifier, SupportIdentifierTranslation, Table, TableProvider} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -29,7 +32,7 @@ import org.apache.spark.util.Utils /** * A base interface for data source v2 implementations of the built-in file-based data sources. */ -trait FileDataSourceV2 extends TableProvider with DataSourceRegister { +trait FileDataSourceV2 extends SupportIdentifierTranslation with DataSourceRegister { /** * Returns a V1 [[FileFormat]] class of the same file data source. 
* This is a solution for the following cases: @@ -59,4 +62,27 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister { val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString } + + override def fromOptions(options: CaseInsensitiveStringMap): Identifier = { + val paths = getPaths(options) + if (paths.length > 1) { + throw new IllegalArgumentException("Cannot create a table in multiple locations") + } else if (paths.isEmpty) { + throw new IllegalArgumentException("Didn't specify the 'path' for file based table") + } + Identifier.of(Array.empty, paths.head) + } + + override def toOptions(identifier: Identifier): CaseInsensitiveStringMap = { + new CaseInsensitiveStringMap(Map("path" -> identifier.name()).asJava) + } + + override def dropTable(ident: Identifier): Boolean = { + val path = ident.name() + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) + fs.delete(hdfsPath, true /* recursive */) + } + + override def loadTable(ident: Identifier): Table = getTable(toOptions(ident)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index 5329e09916bd6..a077bbf291e00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -36,7 +36,8 @@ abstract class FileTable( sparkSession: SparkSession, options: CaseInsensitiveStringMap, paths: Seq[String], - userSpecifiedSchema: Option[StructType]) + userSpecifiedSchema: Option[StructType], + userSpecifiedPartitioning: Option[Array[Transform]]) extends Table with SupportsRead with SupportsWrite { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ @@ -102,7 +103,8 @@ abstract class FileTable( StructType(fields) } - override def partitioning: Array[Transform] = fileIndex.partitionSchema.asTransforms + override def partitioning: Array[Transform] = userSpecifiedPartitioning.getOrElse( + fileIndex.partitionSchema.asTransforms) override def properties: util.Map[String, String] = options.asCaseSensitiveMap diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 9f4392da6ab4d..c61b6b635cb51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -30,9 +30,9 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, SupportsWrite, TableCatalog} +import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder, WriterCommitMessage} +import 
org.apache.spark.sql.connector.write._ import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.sources.{AlwaysTrue, Filter} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -59,7 +59,7 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan) * CreateTableAsSelectStagingExec. */ case class CreateTableAsSelectExec( - catalog: TableCatalog, + catalog: SupportCreateTable, ident: Identifier, partitioning: Seq[Transform], plan: LogicalPlan, @@ -148,7 +148,7 @@ case class AtomicCreateTableAsSelectExec( * ReplaceTableAsSelectStagingExec. */ case class ReplaceTableAsSelectExec( - catalog: TableCatalog, + catalog: SupportCreateTable, ident: Identifier, partitioning: Seq[Transform], plan: LogicalPlan, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala index 1f99d4282f6da..cf448f2702d9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala @@ -16,7 +16,10 @@ */ package org.apache.spark.sql.execution.datasources.v2.csv -import org.apache.spark.sql.connector.catalog.Table +import java.util + +import org.apache.spark.sql.connector.catalog.{Identifier, Table} +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 @@ -32,12 +35,24 @@ class CSVDataSourceV2 extends FileDataSourceV2 { override def getTable(options: CaseInsensitiveStringMap): Table = { val paths = getPaths(options) val tableName = getTableName(paths) - CSVTable(tableName, sparkSession, options, paths, None, fallbackFileFormat) + CSVTable(tableName, sparkSession, options, paths, None, None, fallbackFileFormat) } override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { val paths = getPaths(options) val tableName = getTableName(paths) - CSVTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat) + CSVTable(tableName, sparkSession, options, paths, Some(schema), None, fallbackFileFormat) + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + val options = toOptions(ident) + val paths = getPaths(options) + val tableName = getTableName(paths) + CSVTable(tableName, sparkSession, options, paths + , Some(schema), Some(partitions), fallbackFileFormat) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala index 04beee0e3b0f2..5f58e608757a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.csv.CSVOptions +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.write.WriteBuilder import org.apache.spark.sql.execution.datasources.FileFormat import 
org.apache.spark.sql.execution.datasources.csv.CSVDataSource @@ -35,8 +36,9 @@ case class CSVTable( options: CaseInsensitiveStringMap, paths: Seq[String], userSpecifiedSchema: Option[StructType], + userSpecifiedPartitioning: Option[Array[Transform]], fallbackFileFormat: Class[_ <: FileFormat]) - extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + extends FileTable(sparkSession, options, paths, userSpecifiedSchema, userSpecifiedPartitioning) { override def newScanBuilder(options: CaseInsensitiveStringMap): CSVScanBuilder = CSVScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala index 7a0949e586cd8..4d7bdb40d96c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala @@ -16,7 +16,10 @@ */ package org.apache.spark.sql.execution.datasources.v2.json -import org.apache.spark.sql.connector.catalog.Table +import java.util + +import org.apache.spark.sql.connector.catalog.{Identifier, Table} +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.v2._ @@ -32,13 +35,25 @@ class JsonDataSourceV2 extends FileDataSourceV2 { override def getTable(options: CaseInsensitiveStringMap): Table = { val paths = getPaths(options) val tableName = getTableName(paths) - JsonTable(tableName, sparkSession, options, paths, None, fallbackFileFormat) + JsonTable(tableName, sparkSession, options, paths, None, None, fallbackFileFormat) } override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { val paths = getPaths(options) val tableName = getTableName(paths) - JsonTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat) + JsonTable(tableName, sparkSession, options, paths, Some(schema), None, fallbackFileFormat) + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + val options = toOptions(ident) + val paths = getPaths(options) + val tableName = getTableName(paths) + JsonTable( + tableName, sparkSession, options, paths, Some(schema), Some(partitions), fallbackFileFormat) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala index 9bb615528fc5d..9e3ba5756995e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.json.JSONOptionsInRead +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.write.WriteBuilder import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.json.JsonDataSource @@ -35,8 +36,9 @@ case class JsonTable( options: CaseInsensitiveStringMap, paths: Seq[String], 
userSpecifiedSchema: Option[StructType], + userSpecifiedPartitioning: Option[Array[Transform]], fallbackFileFormat: Class[_ <: FileFormat]) - extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + extends FileTable(sparkSession, options, paths, userSpecifiedSchema, userSpecifiedPartitioning) { override def newScanBuilder(options: CaseInsensitiveStringMap): JsonScanBuilder = new JsonScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala index 8665af33b976a..9c1298f9f2490 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala @@ -16,7 +16,10 @@ */ package org.apache.spark.sql.execution.datasources.v2.orc -import org.apache.spark.sql.connector.catalog.Table +import java.util + +import org.apache.spark.sql.connector.catalog.{Identifier, Table} +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.v2._ @@ -32,13 +35,25 @@ class OrcDataSourceV2 extends FileDataSourceV2 { override def getTable(options: CaseInsensitiveStringMap): Table = { val paths = getPaths(options) val tableName = getTableName(paths) - OrcTable(tableName, sparkSession, options, paths, None, fallbackFileFormat) + OrcTable(tableName, sparkSession, options, paths, None, None, fallbackFileFormat) } override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { val paths = getPaths(options) val tableName = getTableName(paths) - OrcTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat) + OrcTable(tableName, sparkSession, options, paths, Some(schema), None, fallbackFileFormat) + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + val options = toOptions(ident) + val paths = getPaths(options) + val tableName = getTableName(paths) + OrcTable( + tableName, sparkSession, options, paths, Some(schema), Some(partitions), fallbackFileFormat) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala index f2e4b88e9f1ae..c047df352358e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.write.WriteBuilder import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.orc.OrcUtils @@ -34,8 +35,9 @@ case class OrcTable( options: CaseInsensitiveStringMap, paths: Seq[String], userSpecifiedSchema: Option[StructType], + userSpecifiedPartitioning: Option[Array[Transform]], fallbackFileFormat: Class[_ <: FileFormat]) - extends FileTable(sparkSession, options, paths, 
userSpecifiedSchema) { + extends FileTable(sparkSession, options, paths, userSpecifiedSchema, userSpecifiedPartitioning) { override def newScanBuilder(options: CaseInsensitiveStringMap): OrcScanBuilder = new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala index 8cb6186c12ff3..6a3e50788a80c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala @@ -16,7 +16,10 @@ */ package org.apache.spark.sql.execution.datasources.v2.parquet -import org.apache.spark.sql.connector.catalog.Table +import java.util + +import org.apache.spark.sql.connector.catalog.{Identifier, Table} +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2._ @@ -32,13 +35,25 @@ class ParquetDataSourceV2 extends FileDataSourceV2 { override def getTable(options: CaseInsensitiveStringMap): Table = { val paths = getPaths(options) val tableName = getTableName(paths) - ParquetTable(tableName, sparkSession, options, paths, None, fallbackFileFormat) + ParquetTable(tableName, sparkSession, options, paths, None, None, fallbackFileFormat) } override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { val paths = getPaths(options) val tableName = getTableName(paths) - ParquetTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat) + ParquetTable(tableName, sparkSession, options, paths, Some(schema), None, fallbackFileFormat) + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + val options = toOptions(ident) + val paths = getPaths(options) + val tableName = getTableName(paths) + ParquetTable( + tableName, sparkSession, options, paths, Some(schema), Some(partitions), fallbackFileFormat) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala index 2ad64b1aa5244..3a65dbf0d06cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.write.WriteBuilder import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils @@ -34,8 +35,9 @@ case class ParquetTable( options: CaseInsensitiveStringMap, paths: Seq[String], userSpecifiedSchema: Option[StructType], + userSpecifiedPartitioning: Option[Array[Transform]], fallbackFileFormat: Class[_ <: FileFormat]) - extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + extends FileTable(sparkSession, options, paths, userSpecifiedSchema, 
userSpecifiedPartitioning) { override def newScanBuilder(options: CaseInsensitiveStringMap): ParquetScanBuilder = new ParquetScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala index 049c717effa26..949b4cec46269 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala @@ -16,11 +16,15 @@ */ package org.apache.spark.sql.execution.datasources.v2.text -import org.apache.spark.sql.connector.catalog.Table +import java.util + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.connector.catalog.{Identifier, Table} +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.text.TextFileFormat import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap class TextDataSourceV2 extends FileDataSourceV2 { @@ -32,13 +36,29 @@ class TextDataSourceV2 extends FileDataSourceV2 { override def getTable(options: CaseInsensitiveStringMap): Table = { val paths = getPaths(options) val tableName = getTableName(paths) - TextTable(tableName, sparkSession, options, paths, None, fallbackFileFormat) + TextTable(tableName, sparkSession, options, paths, None, None, fallbackFileFormat) } override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { val paths = getPaths(options) val tableName = getTableName(paths) - TextTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat) + TextTable(tableName, sparkSession, options, paths, Some(schema), None, fallbackFileFormat) + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + if (schema != StructType(Seq(StructField("value", StringType)))) { + throw new AnalysisException( + "Text table format requires a single string column with the name value.") + } + val options = toOptions(ident) + val paths = getPaths(options) + val tableName = getTableName(paths) + TextTable( + tableName, sparkSession, options, paths, Some(schema), Some(partitions), fallbackFileFormat) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala index 87bfa84985e5a..eea39cf0744ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2.text import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.write.WriteBuilder import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -31,8 +32,9 @@ case class TextTable( options: 
CaseInsensitiveStringMap, paths: Seq[String], userSpecifiedSchema: Option[StructType], + userSpecifiedPartitioning: Option[Array[Transform]], fallbackFileFormat: Class[_ <: FileFormat]) - extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + extends FileTable(sparkSession, options, paths, userSpecifiedSchema, userSpecifiedPartitioning) { override def newScanBuilder(options: CaseInsensitiveStringMap): TextScanBuilder = TextScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala index cd804adfa2133..afe7c3d30b59f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala @@ -16,12 +16,15 @@ */ package org.apache.spark.sql.connector +import java.util + import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.{AnalysisException, QueryTest} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} +import org.apache.spark.sql.connector.catalog._ +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.WriteBuilder import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution} @@ -43,6 +46,14 @@ class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 { override def getTable(options: CaseInsensitiveStringMap): Table = { new DummyReadOnlyFileTable } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + throw new UnsupportedOperationException("Not supported") + } } class DummyReadOnlyFileTable extends Table with SupportsRead { @@ -67,6 +78,14 @@ class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 { override def getTable(options: CaseInsensitiveStringMap): Table = { new DummyWriteOnlyFileTable } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + throw new UnsupportedOperationException("Not supported") + } } class DummyWriteOnlyFileTable extends Table with SupportsWrite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala index 7f4bbcf97b534..a82905ed473e7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala @@ -35,7 +35,7 @@ class DummyFileTable( paths: Seq[String], expectedDataSchema: StructType, userSpecifiedSchema: Option[StructType]) - extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + extends FileTable(sparkSession, options, paths, userSpecifiedSchema, None) { override def inferSchema(files: Seq[FileStatus]): Option[StructType] = Some(expectedDataSchema) override def name(): String = "Dummy"
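
For context, a usage sketch that is not part of the patch: with FileDataSourceV2 now extending SupportIdentifierTranslation, a partitioned save() through a built-in file source is planned as CreateTableAsSelect/ReplaceTableAsSelect instead of being rejected with "Cannot write data to TableProvider implementation if partition columns are specified." The application name, output path, and column names below are illustrative placeholders only, and the sketch assumes a Spark build that contains this change.

// Illustrative sketch only; exercises the new DataFrameWriter V2 code path added above.
import org.apache.spark.sql.SparkSession

object PartitionedV2WriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("partitioned-v2-write-example")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "2019-10-01"), (2, "2019-10-02")).toDF("id", "date")

    // Before this change the V2 path threw an AnalysisException when partition
    // columns were specified; with this change the write is planned as
    // ReplaceTableAsSelect (Overwrite) or CreateTableAsSelect (other modes).
    df.write
      .format("parquet")
      .partitionBy("date")
      .mode("overwrite")
      .save("/tmp/partitioned_v2_write_example")

    spark.stop()
  }
}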