From f7c8eb5bca073d521d0f306e740cbc872cbc657e Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Mon, 28 Mar 2016 21:57:20 +0200 Subject: [PATCH 1/5] Add DDL commands to SparkSqlParser. --- .../spark/sql/execution/SparkSqlParser.scala | 619 +++++++++++++++++- .../execution/command/DDLCommandSuite.scala | 5 +- 2 files changed, 618 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index c098fa99c2220..73d836a9037ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -20,7 +20,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.parser.ng.{AbstractSqlParser, AstBuilder} +import org.apache.spark.sql.catalyst.parser.ng.{AbstractSqlParser, AstBuilder, ParseException} import org.apache.spark.sql.catalyst.parser.ng.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} import org.apache.spark.sql.execution.command.{DescribeCommand => _, _} @@ -200,8 +200,8 @@ class SparkSqlAstBuilder extends AstBuilder { } /** - * Convert a table property list into a key-value map. - */ + * Convert a table property list into a key-value map. + */ override def visitTablePropertyList( ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) { ctx.tableProperty.asScala.map { property => @@ -216,4 +216,617 @@ class SparkSqlAstBuilder extends AstBuilder { key -> value }.toMap } + + /** + * Create a [[CreateDatabase]] command. + * + * For example: + * {{{ + * CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment] + * [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)] + * }}} + */ + override def visitCreateDatabase(ctx: CreateDatabaseContext): LogicalPlan = withOrigin(ctx) { + CreateDatabase( + ctx.identifier.getText, + ctx.EXISTS != null, + Option(ctx.locationSpec).map(visitLocationSpec), + Option(ctx.comment).map(string), + Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty))( + command(ctx)) + } + + /** + * Create an [[AlterDatabaseProperties]] command. + * + * For example: + * {{{ + * ALTER (DATABASE|SCHEMA) database SET DBPROPERTIES (property_name=property_value, ...); + * }}} + */ + override def visitSetDatabaseProperties( + ctx: SetDatabasePropertiesContext): LogicalPlan = withOrigin(ctx) { + AlterDatabaseProperties( + ctx.identifier.getText, + visitTablePropertyList(ctx.tablePropertyList))( + command(ctx)) + } + + /** + * Create a [[DropDatabase]] command. + * + * For example: + * {{{ + * DROP (DATABASE|SCHEMA) [IF EXISTS] database [RESTRICT|CASCADE]; + * }}} + */ + override def visitDropDatabase(ctx: DropDatabaseContext): LogicalPlan = withOrigin(ctx) { + DropDatabase(ctx.identifier.getText, ctx.EXISTS != null, ctx.CASCADE == null)(command(ctx)) + } + + /** + * Create a [[DescribeDatabase]] command. + * + * For example: + * {{{ + * DESCRIBE DATABASE [EXTENDED] database; + * }}} + */ + override def visitDescribeDatabase(ctx: DescribeDatabaseContext): LogicalPlan = withOrigin(ctx) { + DescribeDatabase(ctx.identifier.getText, ctx.EXTENDED != null)(command(ctx)) + } + + /** + * Create a [[CreateFunction]] command. 
+ * + * For example: + * {{{ + * CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name + * [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']]; + * }}} + */ + override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) { + val resources = ctx.resource.asScala.map { resource => + val resourceType = resource.identifier.getText.toLowerCase + resourceType match { + case "jar" | "file" | "archive" => + resourceType -> string(resource.STRING) + case other => + throw new ParseException(s"Resource Type '$resourceType' is not supported.", ctx) + } + } + + // Extract database, name & alias. + val (database, function) = visitFunctionName(ctx.qualifiedName) + CreateFunction( + database, + function, + string(ctx.className), // TODO this is not an alias. + resources, + ctx.TEMPORARY != null)( + command(ctx)) + } + + /** + * Create a [[DropFunction]] command. + * + * For example: + * {{{ + * DROP [TEMPORARY] FUNCTION [IF EXISTS] function; + * }}} + */ + override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) { + val (database, function) = visitFunctionName(ctx.qualifiedName) + DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)(command(ctx)) + } + + /** + * Create a function database (optional) and name pair. + */ + private def visitFunctionName(ctx: QualifiedNameContext): (Option[String], String) = { + ctx.identifier().asScala.map(_.getText) match { + case Seq(db, fn) => (Option(db), fn) + case Seq(fn) => (None, fn) + case other => throw new ParseException(s"Unsupported function name '${ctx.getText}'", ctx) + } + } + + /** + * Create a [[AlterTableRename]] command. + * + * For example: + * {{{ + * ALTER TABLE table1 RENAME TO table2; + * }}} + */ + override def visitRenameTable(ctx: RenameTableContext): LogicalPlan = withOrigin(ctx) { + AlterTableRename( + visitTableIdentifier(ctx.from), + visitTableIdentifier(ctx.to))( + command(ctx)) + } + + /** + * Create an [[AlterTableSetProperties]] command. + * + * For example: + * {{{ + * ALTER TABLE table SET TBLPROPERTIES ('comment' = new_comment); + * }}} + */ + override def visitSetTableProperties( + ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) { + AlterTableSetProperties( + visitTableIdentifier(ctx.tableIdentifier), + visitTablePropertyList(ctx.tablePropertyList))( + command(ctx)) + } + + /** + * Create an [[AlterTableUnsetProperties]] command. + * + * For example: + * {{{ + * ALTER TABLE table UNSET TBLPROPERTIES IF EXISTS ('comment', 'key'); + * }}} + */ + override def visitUnsetTableProperties( + ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) { + AlterTableUnsetProperties( + visitTableIdentifier(ctx.tableIdentifier), + visitTablePropertyList(ctx.tablePropertyList), + ctx.EXISTS != null)( + command(ctx)) + } + + /** + * Create an [[AlterTableSerDeProperties]] command. + * + * For example: + * {{{ + * ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props]; + * ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties; + * }}} + */ + override def visitSetTableSerDe(ctx: SetTableSerDeContext): LogicalPlan = withOrigin(ctx) { + AlterTableSerDeProperties( + visitTableIdentifier(ctx.tableIdentifier), + Option(ctx.STRING).map(string), + Option(ctx.tablePropertyList).map(visitTablePropertyList), + // TODO a partition spec is allowed to have optional values. This is currently violated. 
+ Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))( + command(ctx)) + } + + /** + * Create an [[AlterTableStorageProperties]] command. + * + * For example: + * {{{ + * ALTER TABLE table CLUSTERED BY (col, ...) [SORTED BY (col, ...)] INTO n BUCKETS; + * }}} + */ + override def visitBucketTable(ctx: BucketTableContext): LogicalPlan = withOrigin(ctx) { + AlterTableStorageProperties( + visitTableIdentifier(ctx.tableIdentifier), + visitBucketSpec(ctx.bucketSpec))( + command(ctx)) + } + + /** + * Create an [[AlterTableNotClustered]] command. + * + * For example: + * {{{ + * ALTER TABLE table NOT CLUSTERED; + * }}} + */ + override def visitUnclusterTable(ctx: UnclusterTableContext): LogicalPlan = withOrigin(ctx) { + AlterTableNotClustered(visitTableIdentifier(ctx.tableIdentifier))(command(ctx)) + } + + /** + * Create an [[AlterTableNotSorted]] command. + * + * For example: + * {{{ + * ALTER TABLE table NOT SORTED; + * }}} + */ + override def visitUnsortTable(ctx: UnsortTableContext): LogicalPlan = withOrigin(ctx) { + AlterTableNotSorted(visitTableIdentifier(ctx.tableIdentifier))(command(ctx)) + } + + /** + * Create an [[AlterTableSkewed]] command. + * + * For example: + * {{{ + * ALTER TABLE table SKEWED BY (col1, col2) + * ON ((col1_value, col2_value) [, (col1_value, col2_value), ...]) + * [STORED AS DIRECTORIES]; + * }}} + */ + override def visitSkewTable(ctx: SkewTableContext): LogicalPlan = withOrigin(ctx) { + val table = visitTableIdentifier(ctx.tableIdentifier) + val (cols, values, storedAsDirs) = visitSkewSpec(ctx.skewSpec) + AlterTableSkewed(table, cols, values, storedAsDirs)(command(ctx)) + } + + /** + * Create an [[AlterTableNotSorted]] command. + * + * For example: + * {{{ + * ALTER TABLE table NOT SKEWED; + * }}} + */ + override def visitUnskewTable(ctx: UnskewTableContext): LogicalPlan = withOrigin(ctx) { + AlterTableNotSkewed(visitTableIdentifier(ctx.tableIdentifier))(command(ctx)) + } + + /** + * Create an [[AlterTableNotStoredAsDirs]] command. + * + * For example: + * {{{ + * ALTER TABLE table NOT STORED AS DIRECTORIES + * }}} + */ + override def visitUnstoreTable(ctx: UnstoreTableContext): LogicalPlan = withOrigin(ctx) { + AlterTableNotStoredAsDirs(visitTableIdentifier(ctx.tableIdentifier))(command(ctx)) + } + + /** + * Create an [[AlterTableNotStoredAsDirs]] command. + * + * For example: + * {{{ + * ALTER TABLE table SET SKEWED LOCATION (col1="loc1" [, (col2, col3)="loc2", ...] ); + * }}} + */ + override def visitSetTableSkewLocations( + ctx: SetTableSkewLocationsContext): LogicalPlan = withOrigin(ctx) { + val skewedMap = ctx.skewedLocationList.skewedLocation.asScala.flatMap { + slCtx => + val location = string(slCtx.STRING) + if (slCtx.constant != null) { + Seq(visitStringConstant(slCtx.constant) -> location) + } else { + // TODO this is similar to what was in the original implementation. However this does not + // make to much sense to me since we should be storing a tuple of values (not column + // names) for which we want a dedicated storage location. + visitConstantList(slCtx.constantList).map(_ -> location) + } + }.toMap + + AlterTableSkewedLocation( + visitTableIdentifier(ctx.tableIdentifier), + skewedMap)( + command(ctx)) + } + + /** + * Create an [[AlterTableAddPartition]] command. + * + * For example: + * {{{ + * ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1'] + * }}} + */ + override def visitAddTablePartition( + ctx: AddTablePartitionContext): LogicalPlan = withOrigin(ctx) { + // Create partition spec to location mapping. 
+ val specsAndLocs = ctx.partitionSpecLocation.asScala.map { + splCtx => + val spec = visitNonOptionalPartitionSpec(splCtx.partitionSpec) + val location = Option(splCtx.locationSpec).map(visitLocationSpec) + spec -> location + } + AlterTableAddPartition( + visitTableIdentifier(ctx.tableIdentifier), + specsAndLocs, + ctx.EXISTS != null)( + command(ctx)) + } + + /** + * Create an [[AlterTableExchangePartition]] command. + * + * For example: + * {{{ + * ALTER TABLE table1 EXCHANGE PARTITION spec WITH TABLE table2; + * }}} + */ + override def visitExchangeTablePartition( + ctx: ExchangeTablePartitionContext): LogicalPlan = withOrigin(ctx) { + AlterTableExchangePartition( + visitTableIdentifier(ctx.from), + visitTableIdentifier(ctx.to), + visitNonOptionalPartitionSpec(ctx.partitionSpec))( + command(ctx)) + } + + /** + * Create an [[AlterTableRenamePartition]] command + * + * For example: + * {{{ + * ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2; + * }}} + */ + override def visitRenameTablePartition( + ctx: RenameTablePartitionContext): LogicalPlan = withOrigin(ctx) { + AlterTableRenamePartition( + visitTableIdentifier(ctx.tableIdentifier), + visitNonOptionalPartitionSpec(ctx.from), + visitNonOptionalPartitionSpec(ctx.to))( + command(ctx)) + } + + /** + * Create an [[AlterTableDropPartition]] command + * + * For example: + * {{{ + * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * }}} + */ + override def visitDropTablePartitions( + ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) { + AlterTableDropPartition( + visitTableIdentifier(ctx.tableIdentifier), + ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec), + ctx.EXISTS != null, + ctx.PURGE != null)( + command(ctx)) + } + + /** + * Create an [[AlterTableArchivePartition]] command + * + * For example: + * {{{ + * ALTER TABLE table ARCHIVE PARTITION spec; + * }}} + */ + override def visitArchiveTablePartition( + ctx: ArchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) { + AlterTableArchivePartition( + visitTableIdentifier(ctx.tableIdentifier), + visitNonOptionalPartitionSpec(ctx.partitionSpec))( + command(ctx)) + } + + /** + * Create an [[AlterTableUnarchivePartition]] command + * + * For example: + * {{{ + * ALTER TABLE table UNARCHIVE PARTITION spec; + * }}} + */ + override def visitUnarchiveTablePartition( + ctx: UnarchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) { + AlterTableUnarchivePartition( + visitTableIdentifier(ctx.tableIdentifier), + visitNonOptionalPartitionSpec(ctx.partitionSpec))( + command(ctx)) + } + + /** + * Create an [[AlterTableSetFileFormat]] command + * + * For example: + * {{{ + * ALTER TABLE table [PARTITION spec] SET FILEFORMAT file_format; + * }}} + */ + override def visitSetTableFileFormat( + ctx: SetTableFileFormatContext): LogicalPlan = withOrigin(ctx) { + // AlterTableSetFileFormat currently takes both a GenericFileFormat and a + // TableFileFormatContext. This is a bit weird because it should only take one. It also should + // use a CatalogFileFormat instead of either a String or a Sequence of Strings. We will address + // this in a follow-up PR. 
+ val (fileFormat, genericFormat) = ctx.fileFormat match { + case s: GenericFileFormatContext => + (Seq.empty[String], Option(s.identifier.getText)) + case s: TableFileFormatContext => + val elements = Seq(s.inFmt, s.outFmt, s.serdeCls) ++ + Option(s.inDriver).toSeq ++ + Option(s.outDriver).toSeq + (elements.map(string), None) + } + AlterTableSetFileFormat( + visitTableIdentifier(ctx.tableIdentifier), + Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec), + fileFormat, + genericFormat)( + command(ctx)) + } + + /** + * Create an [[AlterTableSetLocation]] command + * + * For example: + * {{{ + * ALTER TABLE table [PARTITION spec] SET LOCATION "loc"; + * }}} + */ + override def visitSetTableLocation(ctx: SetTableLocationContext): LogicalPlan = withOrigin(ctx) { + AlterTableSetLocation( + visitTableIdentifier(ctx.tableIdentifier), + Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec), + visitLocationSpec(ctx.locationSpec))( + command(ctx)) + } + + /** + * Create an [[AlterTableTouch]] command + * + * For example: + * {{{ + * ALTER TABLE table TOUCH [PARTITION spec]; + * }}} + */ + override def visitTouchTable(ctx: TouchTableContext): LogicalPlan = withOrigin(ctx) { + AlterTableTouch( + visitTableIdentifier(ctx.tableIdentifier), + Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))( + command(ctx)) + } + + /** + * Create an [[AlterTableCompact]] command + * + * For example: + * {{{ + * ALTER TABLE table [PARTITION spec] COMPACT 'compaction_type'; + * }}} + */ + override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) { + AlterTableCompact( + visitTableIdentifier(ctx.tableIdentifier), + Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec), + string(ctx.STRING))( + command(ctx)) + } + + /** + * Create an [[AlterTableMerge]] command + * + * For example: + * {{{ + * ALTER TABLE table [PARTITION spec] CONCATENATE; + * }}} + */ + override def visitConcatenateTable(ctx: ConcatenateTableContext): LogicalPlan = withOrigin(ctx) { + AlterTableMerge( + visitTableIdentifier(ctx.tableIdentifier), + Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))( + command(ctx)) + } + + /** + * Create an [[AlterTableChangeCol]] command + * + * For example: + * {{{ + * ALTER TABLE tableIdentifier [PARTITION spec] + * CHANGE [COLUMN] col_old_name col_new_name column_type [COMMENT col_comment] + * [FIRST|AFTER column_name] [CASCADE|RESTRICT]; + * }}} + */ + override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = withOrigin(ctx) { + val col = visitColType(ctx.colType()) + val comment = if (col.metadata.contains("comment")) { + Option(col.metadata.getString("comment")) + } else { + None + } + + AlterTableChangeCol( + visitTableIdentifier(ctx.tableIdentifier), + Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec), + ctx.oldName.getText, + // We could also pass in a struct field - seems easier. + col.name, + col.dataType, + comment, + Option(ctx.after).map(_.getText), + // Note that Restrict and Cascade are mutually exclusive. + ctx.RESTRICT != null, + ctx.CASCADE != null)( + command(ctx)) + } + + /** + * Create an [[AlterTableAddCol]] command + * + * For example: + * {{{ + * ALTER TABLE tableIdentifier [PARTITION spec] + * ADD COLUMNS (name type [COMMENT comment], ...) 
[CASCADE|RESTRICT] + * }}} + */ + override def visitAddColumns(ctx: AddColumnsContext): LogicalPlan = withOrigin(ctx) { + AlterTableAddCol( + visitTableIdentifier(ctx.tableIdentifier), + Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec), + createStructType(ctx.colTypeList), + // Note that Restrict and Cascade are mutually exclusive. + ctx.RESTRICT != null, + ctx.CASCADE != null)( + command(ctx)) + } + + /** + * Create an [[AlterTableReplaceCol]] command + * + * For example: + * {{{ + * ALTER TABLE tableIdentifier [PARTITION spec] + * REPLACE COLUMNS (name type [COMMENT comment], ...) [CASCADE|RESTRICT] + * }}} + */ + override def visitReplaceColumns(ctx: ReplaceColumnsContext): LogicalPlan = withOrigin(ctx) { + AlterTableReplaceCol( + visitTableIdentifier(ctx.tableIdentifier), + Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec), + createStructType(ctx.colTypeList), + // Note that Restrict and Cascade are mutually exclusive. + ctx.RESTRICT != null, + ctx.CASCADE != null)( + command(ctx)) + } + + /** + * Create location string. + */ + override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) { + string(ctx.STRING) + } + + /** + * Create a [[BucketSpec]]. + */ + override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) { + BucketSpec( + ctx.INTEGER_VALUE.getText.toInt, + visitIdentifierList(ctx.identifierList), + Option(ctx.orderedIdentifierList).toSeq + .flatMap(_.orderedIdentifier.asScala) + .map(_.identifier.getText)) + } + + /** + * Create a skew specification. This contains three components: + * - The Skewed Columns + * - Values for which are skewed. The size of each entry must match the number of skewed columns. + * - A store in directory flag. + */ + override def visitSkewSpec( + ctx: SkewSpecContext): (Seq[String], Seq[Seq[String]], Boolean) = withOrigin(ctx) { + val skewedValues = if (ctx.constantList != null) { + Seq(visitConstantList(ctx.constantList)) + } else { + visitNestedConstantList(ctx.nestedConstantList) + } + (visitIdentifierList(ctx.identifierList), skewedValues, ctx.DIRECTORIES != null) + } + + /** + * Convert a nested constants list into a sequence of string sequences. + */ + override def visitNestedConstantList( + ctx: NestedConstantListContext): Seq[Seq[String]] = withOrigin(ctx) { + ctx.constantList.asScala.map(visitConstantList) + } + + /** + * Convert a constants list into a String sequence. 
+ */ + override def visitConstantList(ctx: ConstantListContext): Seq[String] = withOrigin(ctx) { + ctx.constant.asScala.map(visitStringConstant) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 7a6343748ba9e..03079c6890a84 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -18,14 +18,13 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending} import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.execution.SparkQl +import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.execution.datasources.BucketSpec import org.apache.spark.sql.types._ class DDLCommandSuite extends PlanTest { - private val parser = new SparkQl + private val parser = SparkSqlParser test("create database") { val sql = From 575acdb251328b28f05b2748d73a2a46b930f967 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Mon, 28 Mar 2016 22:14:04 +0200 Subject: [PATCH 2/5] Fix doc. --- .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 73d836a9037ab..ef2fa09ea069a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -478,7 +478,7 @@ class SparkSqlAstBuilder extends AstBuilder { } /** - * Create an [[AlterTableNotStoredAsDirs]] command. + * Create an [[AlterTableSkewedLocation]] command. * * For example: * {{{ From b836eae02e10f9a0600c9c2a7e51ffa7365cb848 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Mon, 28 Mar 2016 22:58:38 +0200 Subject: [PATCH 3/5] SerdeClass is option in tableRowFormat. --- .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index ef2fa09ea069a..a8313deeefc2f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -632,7 +632,8 @@ class SparkSqlAstBuilder extends AstBuilder { case s: GenericFileFormatContext => (Seq.empty[String], Option(s.identifier.getText)) case s: TableFileFormatContext => - val elements = Seq(s.inFmt, s.outFmt, s.serdeCls) ++ + val elements = Seq(s.inFmt, s.outFmt) ++ + Option(s.serdeCls).toSeq ++ Option(s.inDriver).toSeq ++ Option(s.outDriver).toSeq (elements.map(string), None) From 1fc35921782f29b318a866be04df68f0e2e9cb81 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Mon, 28 Mar 2016 23:45:29 +0200 Subject: [PATCH 4/5] Migrate HiveQl parsing to ANTLR4 parser. 
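
The old HiveQl parser is replaced by HiveSqlParser, an AbstractSqlParser whose
HiveSqlAstBuilder extends SparkSqlAstBuilder; statements the builders do not
recognise are passed on to Hive as a HiveNativeCommand. A rough usage sketch,
for illustration only (e.g. from the Scala REPL; it relies on parsePlan
inherited from AbstractSqlParser, and the table/partition names are made up):

    import org.apache.spark.sql.hive.execution.HiveSqlParser

    // DDL covered by the new AST builders becomes a command plan ...
    val alter = HiveSqlParser.parsePlan(
      "ALTER TABLE sales ADD IF NOT EXISTS PARTITION (dt='2016-03-28')")

    // ... while statements outside their scope are wrapped for Hive.
    val native = HiveSqlParser.parsePlan("SHOW PARTITIONS sales")
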
--- .../spark/sql/hive/HiveMetastoreCatalog.scala | 13 +- .../spark/sql/hive/HiveSessionState.scala | 3 +- .../sql/hive/execution/HiveSqlParser.scala | 428 ++++++++++++++++++ 3 files changed, 442 insertions(+), 2 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c7066d73631af..0cc645edd8f21 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -85,7 +85,18 @@ private[hive] object HiveSerDe { HiveSerDe( inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), - serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))) + serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")), + + "textfile" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")), + + "avro" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"), + serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))) val key = source.toLowerCase match { case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet" diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index caa7f296ed16a..f24a0dd57071c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.execution.{python, SparkPlanner} import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.hive.execution.HiveSqlParser import org.apache.spark.sql.internal.{SessionState, SQLConf} @@ -70,7 +71,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) /** * Parser for HiveQl query texts. */ - override lazy val sqlParser: ParserInterface = new HiveQl(conf) + override lazy val sqlParser: ParserInterface = HiveSqlParser /** * Planner that takes into account Hive-specific strategies. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala new file mode 100644 index 0000000000000..7ca546a9055e3 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution + +import scala.collection.JavaConverters._ + +import org.antlr.v4.runtime.{ParserRuleContext, Token} +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hadoop.hive.ql.exec.FunctionRegistry +import org.apache.hadoop.hive.ql.parse.EximUtil +import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe + +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.parser.ng._ +import org.apache.spark.sql.catalyst.parser.ng.SqlBaseParser._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkSqlAstBuilder +import org.apache.spark.sql.hive.{CreateTableAsSelect => CTAS, CreateViewAsSelect => CreateView} +import org.apache.spark.sql.hive.{HiveGenericUDTF, HiveSerDe} +import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper + +/** + * Concrete parser for HiveQl statements. + */ +object HiveSqlParser extends AbstractSqlParser { + val astBuilder = new HiveSqlAstBuilder + + override protected def nativeCommand(sqlText: String): LogicalPlan = { + HiveNativeCommand(sqlText) + } +} + +/** + * Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier. + */ +class HiveSqlAstBuilder extends SparkSqlAstBuilder { + import ParserUtils._ + + /** + * Get the current Hive Configuration. + */ + private[this] def hiveConf: HiveConf = { + var ss = SessionState.get() + // SessionState is lazy initialization, it can be null here + if (ss == null) { + val original = Thread.currentThread().getContextClassLoader + val conf = new HiveConf(classOf[SessionState]) + conf.setClassLoader(original) + ss = new SessionState(conf) + SessionState.start(ss) + } + ss.getConf + } + + /** + * Pass a command to Hive using a [[HiveNativeCommand]]. + */ + override def visitExecuteNativeCommand( + ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) { + HiveNativeCommand(command(ctx)) + } + + /** + * Create an [[AddJar]] or [[AddFile]] command depending on the requested resource. + */ + override def visitAddResource(ctx: AddResourceContext): LogicalPlan = withOrigin(ctx) { + ctx.identifier.getText.toLowerCase match { + case "file" => AddFile(remainder(ctx.identifier).trim) + case "jar" => AddJar(remainder(ctx.identifier).trim) + case other => throw new ParseException(s"Unsupported resource type '$other'.", ctx) + } + } + + /** + * Create a [[DropTable]] command. + */ + override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) { + if (ctx.PURGE != null) { + logWarning("PURGE option is ignored.") + } + if (ctx.REPLICATION != null) { + logWarning("REPLICATION clause is ignored.") + } + DropTable(visitTableIdentifier(ctx.tableIdentifier).toString, ctx.EXISTS != null) + } + + /** + * Create an [[AnalyzeTable]] command. 
This currently only implements the NOSCAN option (other + * options are passed on to Hive) e.g.: + * {{{ + * ANALYZE TABLE table COMPUTE STATISTICS NOSCAN; + * }}} + */ + override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) { + if (ctx.partitionSpec == null && + ctx.identifier != null && + ctx.identifier.getText.toLowerCase == "noscan") { + AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString) + } else { + HiveNativeCommand(command(ctx)) + } + } + + /** + * Create a [[CreateTableAsSelect]] command. + */ + override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = { + if (ctx.query == null) { + HiveNativeCommand(command(ctx)) + } else { + // Get the table header. + val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) + val tableType = if (external) { + CatalogTableType.EXTERNAL_TABLE + } else { + CatalogTableType.MANAGED_TABLE + } + + // Unsupported clauses. + if (temp) { + logWarning("TEMPORARY clause is ignored.") + } + if (ctx.bucketSpec != null) { + // TODO add this - we need cluster columns in the CatalogTable for this to work. + logWarning("CLUSTERED BY ... [ORDERED BY ...] INTO ... BUCKETS clause is ignored.") + } + if (ctx.skewSpec != null) { + logWarning("SKEWED BY ... ON ... [STORED AS DIRECTORIES] clause is ignored.") + } + + // Create the schema. + val schema = Option(ctx.colTypeList).toSeq.flatMap(_.colType.asScala).map { col => + CatalogColumn( + col.identifier.getText, + col.dataType.getText.toLowerCase, // TODO validate this? + nullable = true, + Option(col.STRING).map(string)) + } + + // Get the column by which the table is partitioned. + val partitionCols = Option(ctx.identifierList).toSeq.flatMap(visitIdentifierList).map { + CatalogColumn(_, null, nullable = true, None) + } + + // Create the storage. + def format(fmt: ParserRuleContext): CatalogStorageFormat = { + Option(fmt).map(typedVisit[CatalogStorageFormat]).getOrElse(EmptyStorageFormat) + } + // Default storage. + val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT) + val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf).getOrElse { + HiveSerDe( + inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) + } + // Defined storage. + val fileStorage = format(ctx.createFileFormat) + val rowStorage = format(ctx.rowFormat) + val storage = CatalogStorageFormat( + Option(ctx.locationSpec).map(visitLocationSpec), + fileStorage.inputFormat.orElse(hiveSerDe.inputFormat), + fileStorage.outputFormat.orElse(hiveSerDe.outputFormat), + rowStorage.serde.orElse(hiveSerDe.serde).orElse(fileStorage.serde), + rowStorage.serdeProperties ++ fileStorage.serdeProperties + ) + + val tableDesc = CatalogTable( + name = table, + tableType = tableType, + schema = schema, + partitionColumns = partitionCols, + storage = storage, + properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty), + // TODO support the sql text - have a proper location for this! + viewText = Option(ctx.STRING).map(string)) + CTAS(tableDesc, plan(ctx.query), ifNotExists) + } + } + + /** + * Create or replace a view. This creates a [[CreateViewAsSelect]] command. + */ + override def visitCreateView(ctx: CreateViewContext): LogicalPlan = withOrigin(ctx) { + // Pass a partitioned view on to hive. 
+ if (ctx.identifierList != null) { + HiveNativeCommand(command(ctx)) + } else { + if (ctx.STRING != null) { + logWarning("COMMENT clause is ignored.") + } + val identifiers = Option(ctx.identifierCommentList).toSeq.flatMap(_.identifierComment.asScala) + val schema = identifiers.map { ic => + CatalogColumn(ic.identifier.getText, null, nullable = true, Option(ic.STRING).map(string)) + } + createView( + ctx, + ctx.tableIdentifier, + schema, + ctx.query, + Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty), + ctx.EXISTS != null, + ctx.REPLACE != null + ) + } + } + + /** + * Alter the query of a view. This creates a [[CreateViewAsSelect]] command. + */ + override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) { + createView( + ctx, + ctx.tableIdentifier, + Seq.empty, + ctx.query, + Map.empty, + allowExist = false, + replace = true) + } + + /** + * Create a [[CreateViewAsSelect]] command. + */ + private def createView( + ctx: ParserRuleContext, + name: TableIdentifierContext, + schema: Seq[CatalogColumn], + query: QueryContext, + properties: Map[String, String], + allowExist: Boolean, + replace: Boolean): LogicalPlan = { + val sql = Option(source(query)) + val tableDesc = CatalogTable( + name = visitTableIdentifier(name), + tableType = CatalogTableType.VIRTUAL_VIEW, + schema = schema, + storage = EmptyStorageFormat, + properties = properties, + viewOriginalText = sql, + viewText = sql) + CreateView(tableDesc, plan(query), allowExist, replace, command(ctx)) + } + + /** + * Create a [[Generator]]. Override this method in order to support custom Generators. + */ + override protected def withGenerator( + name: String, + expressions: Seq[Expression], + ctx: LateralViewContext): Generator = { + val info = Option(FunctionRegistry.getFunctionInfo(name.toLowerCase)).getOrElse { + throw new ParseException(s"Couldn't find Generator function '$name'", ctx) + } + HiveGenericUDTF(name, new HiveFunctionWrapper(info.getFunctionClass.getName), expressions) + } + + /** + * Create a [[HiveScriptIOSchema]]. + */ + override protected def withScriptIOSchema( + inRowFormat: RowFormatContext, + recordWriter: Token, + outRowFormat: RowFormatContext, + recordReader: Token, + schemaLess: Boolean): HiveScriptIOSchema = { + if (recordWriter != null || recordReader != null) { + logWarning("Used defined record reader/writer classes are currently ignored.") + } + + // Decode and input/output format. + type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) + def format(fmt: RowFormatContext, confVar: ConfVars): Format = fmt match { + case c: RowFormatDelimitedContext => + // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema + // expects a seq of pairs in which the old parsers' token names are used as keys. + // Transforming the result of visitRowFormatDelimited would be quite a bit messier than + // retrieving the key value pairs ourselves. + def entry(key: String, value: Token): Seq[(String, String)] = { + Option(value).map(t => key -> t.getText).toSeq + } + val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ + entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ + entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ + entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ + entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) + + (entries, None, Seq.empty, None) + + case c: RowFormatSerdeContext => + // Use a serde format. 
+ val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c) + + // SPARK-10310: Special cases LazySimpleSerDe + val recordHandler = if (name == classOf[LazySimpleSerDe].getCanonicalName) { + Option(hiveConf.getVar(confVar)) + } else { + None + } + (Seq.empty, Option(name), props.toSeq, recordHandler) + + case null => + // Use default (serde) format. + val name = hiveConf.getVar(ConfVars.HIVESCRIPTSERDE) + val props = Seq(serdeConstants.FIELD_DELIM -> "\t") + val recordHandler = Option(hiveConf.getVar(confVar)) + (Nil, Option(name), props, recordHandler) + } + + val (inFormat, inSerdeClass, inSerdeProps, reader) = + format(inRowFormat, ConfVars.HIVESCRIPTRECORDREADER) + + val (outFormat, outSerdeClass, outSerdeProps, writer) = + format(inRowFormat, ConfVars.HIVESCRIPTRECORDWRITER) + + HiveScriptIOSchema( + inFormat, outFormat, + inSerdeClass, outSerdeClass, + inSerdeProps, outSerdeProps, + reader, writer, + schemaLess) + } + + /** + * Create location string. + */ + override def visitLocationSpec(ctx: LocationSpecContext): String = { + EximUtil.relativeToAbsolutePath(hiveConf, super.visitLocationSpec(ctx)) + } + + /** Empty storage format for default values and copies. */ + private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty) + + /** + * Create a [[CatalogStorageFormat]]. The INPUTDRIVER and OUTPUTDRIVER clauses are currently + * ignored. + */ + override def visitTableFileFormat( + ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { + import ctx._ + if (inDriver != null || outDriver != null) { + logWarning("INPUTDRIVER ... OUTPUTDRIVER ... clauses are ignored.") + } + EmptyStorageFormat.copy( + inputFormat = Option(string(inFmt)), + outputFormat = Option(string(outFmt)), + serde = Option(serdeCls).map(string) + ) + } + + /** + * Resolve a [[HiveSerDe]] based on the format name given. + */ + override def visitGenericFileFormat( + ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { + val source = ctx.identifier.getText + HiveSerDe.sourceToSerDe(source, hiveConf) match { + case Some(s) => + EmptyStorageFormat.copy( + inputFormat = s.inputFormat, + outputFormat = s.outputFormat, + serde = s.serde) + case None => + throw new ParseException(s"Unrecognized file format in STORED AS clause: $source", ctx) + } + } + + /** + * Storage Handlers are currently not supported in the statements we support (CTAS). + */ + override def visitStorageHandler(ctx: StorageHandlerContext): AnyRef = withOrigin(ctx) { + throw new ParseException("Storage Handlers are currently unsupported.", ctx) + } + + /** + * Create SERDE row format name and properties pair. + */ + override def visitRowFormatSerde( + ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) { + import ctx._ + EmptyStorageFormat.copy( + serde = Option(string(name)), + serdeProperties = Option(tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)) + } + + /** + * Create a delimited row format properties object. + */ + override def visitRowFormatDelimited( + ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) { + // Collect the entries if any. + def entry(key: String, value: Token): Seq[(String, String)] = { + Option(value).toSeq.map(x => key -> string(x)) + } + // TODO we need proper support for the NULL format. 
+ val entries = entry(serdeConstants.FIELD_DELIM, ctx.fieldsTerminatedBy) ++ + entry(serdeConstants.SERIALIZATION_FORMAT, ctx.fieldsTerminatedBy) ++ + entry(serdeConstants.ESCAPE_CHAR, ctx.escapedBy) ++ + entry(serdeConstants.COLLECTION_DELIM, ctx.collectionItemsTerminatedBy) ++ + entry(serdeConstants.MAPKEY_DELIM, ctx.keysTerminatedBy) ++ + Option(ctx.linesSeparatedBy).toSeq.map { token => + val value = string(token) + assert( + value == "\n", + s"LINES TERMINATED BY only supports newline '\\n' right now: $value", + ctx) + serdeConstants.LINE_DELIM -> value + } + EmptyStorageFormat.copy(serdeProperties = entries.toMap) + } +} From 1232590ac492fa50e3dcbd5b5a6dbaad8b980c87 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 29 Mar 2016 02:46:22 +0200 Subject: [PATCH 5/5] Handle unsupported Hive commands. --- .../spark/sql/catalyst/parser/ng/SqlBase.g4 | 34 +++++++++++++++++-- .../sql/hive/execution/HiveSqlParser.scala | 18 ++++++++-- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/ng/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/ng/SqlBase.g4 index e46fd9bed5d03..4e77b6db25438 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/ng/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/ng/SqlBase.g4 @@ -118,7 +118,9 @@ statement | UNCACHE TABLE identifier #uncacheTable | CLEAR CACHE #clearCache | ADD identifier .*? #addResource + | SET ROLE .*? #failNativeCommand | SET .*? #setConfiguration + | kws=unsupportedHiveNativeCommands .*? #failNativeCommand | hiveNativeCommands #executeNativeCommand ; @@ -145,7 +147,26 @@ hiveNativeCommands | ROLLBACK WORK? | SHOW PARTITIONS tableIdentifier partitionSpec? | DFS .*? - | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | REVOKE | GRANT | LOCK | UNLOCK | MSCK | EXPORT | IMPORT | LOAD) .*? + | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | LOCK | UNLOCK | MSCK | LOAD) .*? + ; + +unsupportedHiveNativeCommands + : kw1=CREATE kw2=ROLE + | kw1=DROP kw2=ROLE + | kw1=GRANT kw2=ROLE? + | kw1=REVOKE kw2=ROLE? + | kw1=SHOW kw2=GRANT + | kw1=SHOW kw2=ROLE kw3=GRANT? + | kw1=SHOW kw2=PRINCIPALS + | kw1=SHOW kw2=ROLES + | kw1=SHOW kw2=CURRENT kw3=ROLES + | kw1=EXPORT kw2=TABLE + | kw1=IMPORT kw2=TABLE + | kw1=SHOW kw2=COMPACTIONS + | kw1=SHOW kw2=CREATE kw3=TABLE + | kw1=SHOW kw2=TRANSACTIONS + | kw1=SHOW kw2=INDEXES + | kw1=SHOW kw2=LOCKS ; createTableHeader @@ -619,7 +640,8 @@ nonReserved | AFTER | CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT | INPUTDRIVER | OUTPUTDRIVER | DBPROPERTIES | DFS | TRUNCATE | METADATA | REPLICATION | COMPUTE | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER - | REVOKE | GRANT | LOCK | UNLOCK | MSCK | EXPORT | IMPORT | LOAD | VALUES | COMMENT + | REVOKE | GRANT | LOCK | UNLOCK | MSCK | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE + | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEXES | LOCKS | OPTION ; SELECT: 'SELECT'; @@ -834,6 +856,14 @@ MSCK: 'MSCK'; EXPORT: 'EXPORT'; IMPORT: 'IMPORT'; LOAD: 'LOAD'; +ROLE: 'ROLE'; +ROLES: 'ROLES'; +COMPACTIONS: 'COMPACTIONS'; +PRINCIPALS: 'PRINCIPALS'; +TRANSACTIONS: 'TRANSACTIONS'; +INDEXES: 'INDEXES'; +LOCKS: 'LOCKS'; +OPTION: 'OPTION'; STRING : '\'' ( ~('\''|'\\') | ('\\' .) 
)* '\'' diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala index 7ca546a9055e3..d6a08fcc57252 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala @@ -78,6 +78,20 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder { HiveNativeCommand(command(ctx)) } + /** + * Fail an unsupported Hive native command. + */ + override def visitFailNativeCommand( + ctx: FailNativeCommandContext): LogicalPlan = withOrigin(ctx) { + val keywords = if (ctx.kws != null) { + Seq(ctx.kws.kw1, ctx.kws.kw2, ctx.kws.kw3).filter(_ != null).map(_.getText).mkString(" ") + } else { + // SET ROLE is the exception to the rule, because we handle this before other SET commands. + "SET ROLE" + } + throw new ParseException(s"Unsupported operation: $keywords", ctx) + } + /** * Create an [[AddJar]] or [[AddFile]] command depending on the requested resource. */ @@ -183,7 +197,7 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder { ) val tableDesc = CatalogTable( - name = table, + identifier = table, tableType = tableType, schema = schema, partitionColumns = partitionCols, @@ -249,7 +263,7 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder { replace: Boolean): LogicalPlan = { val sql = Option(source(query)) val tableDesc = CatalogTable( - name = visitTableIdentifier(name), + identifier = visitTableIdentifier(name), tableType = CatalogTableType.VIRTUAL_VIEW, schema = schema, storage = EmptyStorageFormat,