@@ -16,8 +16,11 @@
  */
 package org.apache.spark.sql.v2.avro
 
+import java.util
+
 import org.apache.spark.sql.avro.AvroFileFormat
-import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.catalog.{Identifier, Table}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.types.StructType
@@ -32,12 +35,24 @@ class AvroDataSourceV2 extends FileDataSourceV2 {
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(paths)
-    AvroTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+    AvroTable(tableName, sparkSession, options, paths, None, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(paths)
-    AvroTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+    AvroTable(tableName, sparkSession, options, paths, Some(schema), None, fallbackFileFormat)
   }
+
+  override def createTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    val options = toOptions(ident)
+    val paths = getPaths(options)
+    val tableName = getTableName(paths)
+    AvroTable(
+      tableName, sparkSession, options, paths, Some(schema), Some(partitions), fallbackFileFormat)
+  }
 }
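
A rough usage sketch of the new path added above: a caller holding a catalog Identifier can now create an Avro table without a full TableCatalog. The path-as-name identifier convention and the availability of toOptions on FileDataSourceV2 (supplied by SupportIdentifierTranslation, introduced later in this patch but not shown in this hunk) are assumptions of the sketch:

import java.util.Collections

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
import org.apache.spark.sql.types.StructType

// An active session is required; FileDataSourceV2 picks it up internally.
val spark = SparkSession.builder().master("local").getOrCreate()

val source = new AvroDataSourceV2
// Assumed convention: the identifier name carries the table path.
val ident = Identifier.of(Array.empty[String], "/tmp/events_avro")
val schema = new StructType().add("id", "long").add("region", "string")
val partitions: Array[Transform] = Array(Expressions.identity("region"))

// Resolves the identifier to path options internally via toOptions(ident).
val table = source.createTable(ident, schema, partitions, Collections.emptyMap[String, String]())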
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.avro.AvroUtils
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.write.WriteBuilder
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileTable
@@ -34,8 +35,9 @@ case class AvroTable(
     options: CaseInsensitiveStringMap,
     paths: Seq[String],
     userSpecifiedSchema: Option[StructType],
+    userSpecifiedPartitioning: Option[Array[Transform]],
     fallbackFileFormat: Class[_ <: FileFormat])
-  extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+  extends FileTable(sparkSession, options, paths, userSpecifiedSchema, userSpecifiedPartitioning) {
   override def newScanBuilder(options: CaseInsensitiveStringMap): AvroScanBuilder =
     new AvroScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
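
For reference, direct construction of the extended case class; the path, the empty option map, and the days-based partitioning below are illustrative values, not anything this hunk prescribes:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroFileFormat
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val spark = SparkSession.builder().master("local").getOrCreate()
val options = new CaseInsensitiveStringMap(java.util.Collections.emptyMap[String, String]())
val schema = new StructType().add("id", "long").add("ts", "timestamp")
// None preserves the old inferred behavior; Some(...) carries caller-specified partitioning.
val partitioning: Option[Array[Transform]] = Some(Array(Expressions.days("ts")))

val table = AvroTable(
  "events", spark, options, Seq("/tmp/events_avro"),
  Some(schema), partitioning, classOf[AvroFileFormat])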
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;

import java.util.Map;

public interface SupportCreateTable {

  /**
   * Load table metadata by {@link Identifier identifier} from the catalog.
   * <p>
   * If the catalog supports views and contains a view for the identifier and not a table, this
   * must throw {@link NoSuchTableException}.
   *
   * @param ident a table identifier
   * @return the table's metadata
   * @throws NoSuchTableException If the table doesn't exist or is a view
   */
  Table loadTable(Identifier ident) throws NoSuchTableException;

  /**
   * Test whether a table exists using an {@link Identifier identifier} from the catalog.
   * <p>
   * If the catalog supports views and contains a view for the identifier and not a table, this
   * must return false.
   *
   * @param ident a table identifier
   * @return true if the table exists, false otherwise
   */
  default boolean tableExists(Identifier ident) {
    try {
      return loadTable(ident) != null;
    } catch (NoSuchTableException e) {
      return false;
    }
  }

  /**
   * Create a table in the catalog.
   *
   * @param ident a table identifier
   * @param schema the schema of the new table, as a struct type
   * @param partitions transforms to use for partitioning data in the table
   * @param properties a string map of table properties
   * @return metadata for the new table
   * @throws TableAlreadyExistsException If a table or view already exists for the identifier
   * @throws UnsupportedOperationException If a requested partition transform is not supported
   * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
   */
  Table createTable(
      Identifier ident,
      StructType schema,
      Transform[] partitions,
      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;

  /**
   * Drop a table in the catalog.
   * <p>
   * If the catalog supports views and contains a view for the identifier and not a table, this
   * must not drop the view and must return false.
   *
   * @param ident a table identifier
   * @return true if a table was deleted, false if no table exists for the identifier
   */
  boolean dropTable(Identifier ident);
}
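
A minimal in-memory sketch of an implementor, to show the extracted contract in action. The constructor-injected table factory, the TrieMap store, and the Identifier-based exception constructors are assumptions of this example rather than requirements of the interface:

import java.util

import scala.collection.concurrent.TrieMap

import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.catalog.{Identifier, SupportCreateTable, Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

class InMemoryCreateTableSupport(newTable: (Identifier, StructType) => Table)
  extends SupportCreateTable {

  private val tables = new TrieMap[Identifier, Table]

  override def loadTable(ident: Identifier): Table =
    tables.getOrElse(ident, throw new NoSuchTableException(ident))

  // tableExists keeps its default implementation from the interface.

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = {
    if (tables.contains(ident)) {
      throw new TableAlreadyExistsException(ident)
    }
    val table = newTable(ident, schema)
    tables.put(ident, table)
    table
  }

  override def dropTable(ident: Identifier): Boolean = tables.remove(ident).isDefined
}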
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public interface SupportIdentifierTranslation extends SupportCreateTable, TableProvider {

  /** Derive a table identifier from data source options (e.g. a path). */
  Identifier fromOptions(CaseInsensitiveStringMap options);

  /** Translate an identifier back into equivalent data source options. */
  CaseInsensitiveStringMap toOptions(Identifier identifier);
}
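
One plausible translation for path-based sources treats the identifier name as the path. The "path" option key mirrors the convention file sources already use, but the whole scheme is an assumption of this sketch, not something the interface mandates:

import org.apache.spark.sql.connector.catalog.{Identifier, SupportIdentifierTranslation}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

trait PathIdentifierTranslation extends SupportIdentifierTranslation {

  // Assumed convention: no namespace, identifier name == path.
  override def fromOptions(options: CaseInsensitiveStringMap): Identifier =
    Identifier.of(Array.empty[String], options.get("path"))

  override def toOptions(identifier: Identifier): CaseInsensitiveStringMap = {
    val map = new java.util.HashMap[String, String]()
    map.put("path", identifier.name)
    new CaseInsensitiveStringMap(map)
  }
}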
@@ -36,7 +36,7 @@
  * insensitive.
  */
 @Experimental
-public interface TableCatalog extends CatalogPlugin {
+public interface TableCatalog extends CatalogPlugin, SupportCreateTable {

Review comment (Contributor): Why do tables managed by a TableProvider not require invalidateTable?

   /**
    * List the tables in a namespace from the catalog.
    * <p>
@@ -71,41 +71,6 @@ public interface TableCatalog extends CatalogPlugin {
   default void invalidateTable(Identifier ident) {
   }
 
-  /**
-   * Test whether a table exists using an {@link Identifier identifier} from the catalog.
-   * <p>
-   * If the catalog supports views and contains a view for the identifier and not a table, this
-   * must return false.
-   *
-   * @param ident a table identifier
-   * @return true if the table exists, false otherwise
-   */
-  default boolean tableExists(Identifier ident) {
-    try {
-      return loadTable(ident) != null;
-    } catch (NoSuchTableException e) {
-      return false;
-    }
-  }
-
-  /**
-   * Create a table in the catalog.
-   *
-   * @param ident a table identifier
-   * @param schema the schema of the new table, as a struct type
-   * @param partitions transforms to use for partitioning data in the table
-   * @param properties a string map of table properties
-   * @return metadata for the new table
-   * @throws TableAlreadyExistsException If a table or view already exists for the identifier
-   * @throws UnsupportedOperationException If a requested partition transform is not supported
-   * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
-   */
-  Table createTable(
-      Identifier ident,
-      StructType schema,
-      Transform[] partitions,
-      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
-
   /**
    * Apply a set of {@link TableChange changes} to a table in the catalog.
    * <p>
@@ -125,17 +90,6 @@ Table alterTable(
       Identifier ident,
       TableChange... changes) throws NoSuchTableException;
 
-  /**
-   * Drop a table in the catalog.
-   * <p>
-   * If the catalog supports views and contains a view for the identifier and not a table, this
-   * must not drop the view and must return false.
-   *
-   * @param ident a table identifier
-   * @return true if a table was deleted, false if no table exists for the identifier
-   */
-  boolean dropTable(Identifier ident);
-
   /**
    * Renames a table in the catalog.
    * <p>
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.connector.catalog.{Identifier, SupportsNamespaces, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types._
@@ -427,7 +427,7 @@ case class CreateV2Table(
  * Create a new table from a select query with a v2 catalog.
  */
 case class CreateTableAsSelect(
-    catalog: TableCatalog,
+    catalog: SupportCreateTable,
     tableName: Identifier,
     partitioning: Seq[Transform],
     query: LogicalPlan,
@@ -477,7 +477,7 @@ case class ReplaceTable(
  * If the table does not exist, and orCreate is false, then an exception will be thrown.
  */
 case class ReplaceTableAsSelect(
-    catalog: TableCatalog,
+    catalog: SupportCreateTable,
     tableName: Identifier,
     partitioning: Seq[Transform],
     query: LogicalPlan,
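
With the narrower parameter type, anything implementing SupportCreateTable, including a bare TableProvider with identifier translation, can be the CTAS/RTAS target. A construction sketch; the trailing parameters (properties, writeOptions, ignoreIfExists) follow the surrounding class definition rather than the lines shown in this hunk, and the partitioning value is illustrative:

import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan}
import org.apache.spark.sql.connector.catalog.{Identifier, SupportCreateTable}
import org.apache.spark.sql.connector.expressions.Expressions

def ctasFor(target: SupportCreateTable, ident: Identifier, query: LogicalPlan): CreateTableAsSelect = {
  // target may be a TableCatalog or, e.g., an AvroDataSourceV2 via identifier translation.
  CreateTableAsSelect(
    target,
    ident,
    partitioning = Seq(Expressions.identity("region")),
    query = query,
    properties = Map.empty,
    writeOptions = Map.empty,
    ignoreIfExists = false)
}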