From f7bb7007cc76647c378c0041f537806d07061a42 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 13 Apr 2016 22:33:08 -0700 Subject: [PATCH 1/7] [SPARK-14127] Describe table --- .../sql/catalyst/catalog/SessionCatalog.scala | 19 ++++- .../spark/sql/execution/SparkSqlParser.scala | 49 +++++++---- .../spark/sql/execution/command/tables.scala | 63 +++++++------- .../spark/sql/execution/datasources/ddl.scala | 1 - .../execution/command/DDLCommandSuite.scala | 24 ++++++ .../spark/sql/hive/HiveSessionCatalog.scala | 23 ++++- .../sql/hive/client/HiveClientImpl.scala | 84 +++++++++++++++++++ .../sql/hive/execution/HiveCommandSuite.scala | 20 +++++ .../sql/hive/execution/HiveQuerySuite.scala | 29 +++++++ 9 files changed, 263 insertions(+), 49 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index b06f24bc4866a..7887c977da6c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException, SimpleFunctionRegistry} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder -import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.util.StringUtils @@ -320,6 +320,23 @@ class SessionCatalog( alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) } + /** + * Describes a table by returning various metadata pertaining to table/partitions/columns. + */ + def describeTable( + table: TableIdentifier, + partSpec: Option[TablePartitionSpec], + colPath: Option[String], + isExtended: Boolean, + output: Seq[Attribute]): Seq[(String, String, String)] = { + val relation = lookupRelation(table) + relation.schema.fields.map { field => + val cmtKey = "comment" + val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else "" + (field.name, field.dataType.simpleString, comment) + } + } + /** * Return whether a table with the specified name exists. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 8128a6efe3ccb..35b8149f1727b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema} import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution} @@ -218,32 +217,31 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { // Create the explain comment. 
val statement = plan(ctx.statement) - if (isExplainableStatement(statement)) { - ExplainCommand(statement, extended = options.exists(_.EXTENDED != null), - codegen = options.exists(_.CODEGEN != null)) - } else { - ExplainCommand(OneRowRelation) - } - } - - /** - * Determine if a plan should be explained at all. - */ - protected def isExplainableStatement(plan: LogicalPlan): Boolean = plan match { - case _: DescribeTableCommand => false - case _ => true + ExplainCommand(statement, extended = options.exists(_.EXTENDED != null), + codegen = options.exists(_.CODEGEN != null)) } /** - * Create a [[DescribeTableCommand]] logical plan. + * A command for users to describe a table in the given database. If a databaseName is not given, + * the current database will be used. + * The syntax of using this command in SQL is: + * {{{ + * DESCRIBE [EXTENDED|FORMATTED] [db_name.]table_name [column_name] [PARTITION partition_spec] + * }}} */ override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) { // FORMATTED and columns are not supported. Return null and let the parser decide what to do // with this (create an exception or pass it on to a different system). - if (ctx.describeColName != null || ctx.FORMATTED != null || ctx.partitionSpec != null) { + if (ctx.FORMATTED != null) { null } else { - DescribeTableCommand(visitTableIdentifier(ctx.tableIdentifier), ctx.EXTENDED != null) + val partitionKeys = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec) + val columnPath = Option(ctx.describeColName).map(visitDescribeColName) + DescribeCommand( + visitTableIdentifier(ctx.tableIdentifier), + partitionKeys, + columnPath, + ctx.EXTENDED != null) } } @@ -349,6 +347,21 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } } + /** + * A column path can be specified as an parameter to describe command. It is a dot separated + * elements where the last element can be a String. + * TODO - check with Herman + */ + override def visitDescribeColName(ctx: DescribeColNameContext): String = { + var result = ctx.identifier.asScala.map ( _.getText).mkString(".") + val elementStr = ctx.STRING().asScala.map(c => string(c)).mkString(".") + if (elementStr.nonEmpty) { + result ++ "." + elementStr + } else { + result + } + } + /** * Create a [[CreateDatabase]] command. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 6078918316d9e..feb0183622165 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.command import java.io.File import java.net.URI +import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec + import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.{AnalysisException, Row, SparkSession} @@ -268,15 +270,28 @@ case class LoadData( } /** - * Command that looks like + * A command for users to describe a table in the given database. If a databaseName is not given, + * the current database will be used. + * The syntax of using this command in SQL is: * {{{ - * DESCRIBE (EXTENDED) table_name; + * DESCRIBE [EXTENDED|FORMATTED] [db_name.]table_name [column_name] [PARTITION partition_spec] * }}} + * @param table table to be described. + * @param partSpec spec If specified, the specified partition is described. 
It is effective only + * when the table is a Hive table + * @param colPath If specified, only the specified column is described. It is effective only + * when the table is a Hive table + * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false. It is effective only + * when the table is a Hive table */ -case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean) +case class DescribeTableCommand( + table: TableIdentifier, + partSpec: Option[TablePartitionSpec], + colPath: Option[String], + isExtended: Boolean) extends RunnableCommand { - override val output: Seq[Attribute] = Seq( + override val output: Seq[Attribute] = Seq( // Column names are based on Hive. AttributeReference("col_name", StringType, nullable = false, new MetadataBuilder().putString("comment", "name of the column").build())(), @@ -287,31 +302,23 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean) ) override def run(sparkSession: SparkSession): Seq[Row] = { - val result = new ArrayBuffer[Row] - sparkSession.sessionState.catalog.lookupRelation(table) match { - case catalogRelation: CatalogRelation => - catalogRelation.catalogTable.schema.foreach { column => - result += Row(column.name, column.dataType, column.comment.orNull) - } - - if (catalogRelation.catalogTable.partitionColumns.nonEmpty) { - result += Row("# Partition Information", "", "") - result += Row(s"# ${output(0).name}", output(1).name, output(2).name) - - catalogRelation.catalogTable.partitionColumns.foreach { col => - result += Row(col.name, col.dataType, col.comment.orNull) - } - } - - case relation => - relation.schema.fields.foreach { field => - val comment = - if (field.metadata.contains("comment")) field.metadata.getString("comment") else "" - result += Row(field.name, field.dataType.simpleString, comment) - } + val catalog = sparkSession.sessionState.catalog + // Check to make sure supplied partition are valid partition columns. . + if (partSpec.isDefined && !catalog.isTemporaryTable(table)) { + val tab = catalog.getTableMetadata(table) + val badColumns = partSpec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains) + if (badColumns.nonEmpty) { + throw new AnalysisException( + s"Non-partitioned column(s) [${badColumns.mkString(", ")}] are " + + s"specified for DESCRIBE command") + } } - - result + val results = + sparkSession.sessionState.catalog.describeTable(table, partSpec, colPath, isExtended, output) + val rows = results.map { case (name, dataType, comment) => + Row(name, dataType, comment) + } + rows } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 7d0a3d9756e90..9b1b53a7f6fdf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.types._ - /** * Used to represent the operation of create table using a data source. 
* diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index be0f4d78a5237..b39c4a8e9efff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -757,4 +757,28 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed2, expected2) comparePlans(parsed3, expected3) } + + test("describe table") { + val parsed1 = parser.parsePlan("DESCRIBE tab1") + val expected1 = DescribeCommand(TableIdentifier("tab1", None), None, None, false) + val parsed2 = parser.parsePlan("DESCRIBE db1.tab1") + val expected2 = DescribeCommand(TableIdentifier("tab1", Some("db1")), None, None, false) + val parsed3 = parser.parsePlan("DESCRIBE tab1 col1") + val expected3 = DescribeCommand(TableIdentifier("tab1", None), None, Some("col1"), false) + val parsed4 = parser.parsePlan("DESCRIBE tab1 PARTITION (c1 = 'val1')") + val expected4 = DescribeCommand(TableIdentifier("tab1", None), + Some(Map("c1" -> "val1")), None, false) + val parsed5 = parser.parsePlan("DESCRIBE EXTENDED tab1 PARTITION (c1 = 'val1')") + val expected5 = DescribeCommand(TableIdentifier("tab1", None), + Some(Map("c1" -> "val1")), None, true) + val parsed6 = parser.parsePlan("DESCRIBE EXTENDED tab1 tab1.col1.field1.'$elem$'") + val expected6 = DescribeCommand(TableIdentifier("tab1", None), + None, Some("tab1.col1.field1.$elem$"), true) + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + comparePlans(parsed4, expected4) + comparePlans(parsed5, expected5) + comparePlans(parsed6, expected6) + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 456587e0e0816..4102db200f3af 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog} -import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} +import org.apache.spark.sql.catalyst.catalog.ExternalCatalog._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper @@ -66,6 +67,26 @@ private[sql] class HiveSessionCatalog( } } + /** + * Describes a table by returning various metadata pertaining to table/partitions/columns. 
+ */ + override def describeTable( + table: TableIdentifier, + partSpec: Option[TablePartitionSpec], + colPath: Option[String], + isExtended: Boolean, + output: Seq[Attribute]): Seq[(String, String, String)] = { + val relation = lookupRelation(table) + relation match { + case r: MetastoreRelation => + val db = table.database.getOrElse(currentDb) + val tableName = formatTableName(table.table) + client.describeTable(db, tableName, partSpec, colPath, isExtended, output) + case o: LogicalPlan => + super.describeTable(table, partSpec, colPath, isExtended, output) + } + } + // ---------------------------------------------------------------- // | Methods and fields for interacting with HiveMetastoreCatalog | // ---------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 78ba2bfda6128..3b6e365305d5f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -478,6 +478,90 @@ private[hive] class HiveClientImpl( client.getTablesByPattern(dbName, pattern).asScala } + /** + * Describes a Hive table. + * The syntax of using this command in SQL is: + * {{{ + * DESCRIBE [EXTENDED|FORMATTED] [db_name.]table_name [column_name] [PARTITION partition_spec] + * }}} + * @param table The table to be described. + * @param partSpec spec If specified, the specified partition is described. + * @param colPath If specified, only the specified column is described. + * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false. + */ + override def describeTable( + db: String, + table: String, + partSpec: Option[TablePartitionSpec], + colPath: Option[String], + isExtended: Boolean, + output: Seq[Attribute]): Seq[(String, String, String)] = withHiveState { + + // Get partition columns or table columns based on the supplied partition spec. + def getCols(tab: HiveTable, part: Option[HivePartition]): Seq[FieldSchema] = { + if (!partSpec.isEmpty && tab.getTableType() != HiveTableType.VIRTUAL_VIEW) { + part.get.getCols.asScala + } else { + tab.getCols.asScala + } + } + // Formats the column metadata as per output schema. + def formatColumns(cols: Seq[FieldSchema]): Seq[(String, String, String)] = { + cols.map(field => (field.getName, field.getType, field.getComment)) + } + + var results: Seq[(String, String, String)] = Nil + // Get Table + val tab = + Option(client.getTable(db, table, false)). + getOrElse(throw new NoSuchTableException(db, table)) + + // Get Partition info + var part = partSpec.map(p => Option(client.getPartition(tab, p.asJava, false))).getOrElse(None) + if (partSpec.nonEmpty && part.isEmpty) { + throw new AnalysisException( + s"partition to describe '${partSpec.get}' does not exist" + + s" in table '$table' database '$db'") + } + + // Get columns if colPath is specified. 
+ val cols = colPath.map { p => + val qualifiedColPath = if (!p.startsWith(table)) s"${table}.${p}" else p + try { + Hive.getFieldsFromDeserializer(qualifiedColPath, tab.getDeserializer(true)).asScala + } catch { + case e: Exception => throw new AnalysisException(e.getMessage) + } + }.getOrElse(getCols(tab, part)) + + + if (colPath.isEmpty) { + // describe all the columns in the table first + results ++= formatColumns(getCols(tab, part) ++ tab.getPartCols().asScala) + + // describe partition columns + val partitionColumns = tab.getPartCols.asScala + if (partitionColumns.nonEmpty) { + results ++= + Seq(("# Partition Information", "", "")) ++ + Seq((s"# ${output(0).name}", output(1).name, output(2).name)) ++ + formatColumns(partitionColumns) + } + + // describe additional table/parition details + if (isExtended) { + if (partSpec.isEmpty) { + results ++= Seq(("Detailed Table Information", tab.getTTable.toString, "")) + } else { + results ++= Seq(("Detailed Partition Information", part.get.getTPartition.toString, "")) + } + } + } else { + results ++= formatColumns(cols) + } + results + } + /** * Runs the specified SQL query using Hive. */ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index 8b3f2d1a0cd07..aa03ae6ea1b96 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -152,6 +152,26 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } } + test("describe table - negative tests") { + val message1 = intercept[AnalysisException] { + sql("DESCRIBE badtable") + }.getMessage + assert(message1.contains("Table or View badtable not found")) + val message2 = intercept[AnalysisException] { + sql("DESCRIBE parquet_tab2 PARTITION (day=31)") + }.getMessage + assert(message2.contains("Non-partitioned column(s) [day] are specified")) + val message3 = intercept[AnalysisException] { + sql("DESCRIBE parquet_tab4 PARTITION (Year=2000, month='10')") + }.getMessage + assert(message3.contains("partition to describe 'Map(year -> 2000, month -> 10)'" + + " does not exist in table")) + val message4 = intercept[AnalysisException] { + sql("DESCRIBE parquet_tab2 invalidCol") + }.getMessage + assert(message4.contains("Error in getting fields from serde.Invalid Field invalidCol")) + } + test("LOAD DATA") { withTable("non_part_table", "part_table") { sql( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 3bf0e84267419..43f1b03e768b9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -819,6 +819,35 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { .collect() } + // Describe a column is a native command + assertResult(Array(Row("value", "string", "from deserializer"))) { + sql("DESCRIBE test_describe_commands1 value") + .select('col_name, 'data_type, 'comment) + .collect() + } + + // Describe a column is a native command + assertResult(Array(Row("value", "string", "from deserializer"))) { + sql("DESCRIBE default.test_describe_commands1 value") + .select('col_name, 'data_type, 'comment) + .collect() + } + + // Describe a partition is a native 
command + assertResult( + Array( + Row("key", "int"), + Row("value", "string"), + Row("dt", "string"), + Row("# Partition Information", ""), + Row("# col_name", "data_type"), + Row("dt", "string")) + ) { + sql("DESCRIBE test_describe_commands1 PARTITION (dt='2008-06-08')") + .select('col_name, 'data_type) + .collect() + } + // Describe a registered temporary table. val testData = TestHive.sparkContext.parallelize( From 4bfa25b7ab1da351e1806f75da02f6a015ca2856 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 19 Apr 2016 23:55:11 -0700 Subject: [PATCH 2/7] complex type --- .../spark/sql/catalyst/parser/SqlBase.g4 | 9 +++++++- .../spark/sql/execution/SparkSqlParser.scala | 14 +++++------- .../sql/hive/execution/HiveCommandSuite.scala | 22 +++++++++++++++++++ 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 4d5d125ecdd7e..5afb2acd3ba6e 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -231,7 +231,7 @@ describeFuncName ; describeColName - : identifier ('.' (identifier | STRING))* + : identifier ('.' colpathIdentifier)* ; ctes @@ -454,6 +454,10 @@ tableIdentifier : (db=identifier '.')? table=identifier ; +colpathIdentifier + : identifier | ELEM_TYPE | KEY_TYPE | VALUE_TYPE + ; + namedExpression : expression (AS? (identifier | identifierList))? ; @@ -902,6 +906,9 @@ OPTION: 'OPTION'; ANTI: 'ANTI'; LOCAL: 'LOCAL'; INPATH: 'INPATH'; +KEY_TYPE: '$KEY$'; +VALUE_TYPE: '$VALUE$'; +ELEM_TYPE: '$ELEM$'; STRING : '\'' ( ~('\''|'\\') | ('\\' .) )* '\'' diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 35b8149f1727b..e4e360564327a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -349,17 +349,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { /** * A column path can be specified as an parameter to describe command. It is a dot separated - * elements where the last element can be a String. - * TODO - check with Herman + * list of identifiers with three special kinds of identifiers namely '$elem$', '$key$' and + * '$value$' which are used to represent array element, map key and values respectively. */ override def visitDescribeColName(ctx: DescribeColNameContext): String = { - var result = ctx.identifier.asScala.map ( _.getText).mkString(".") - val elementStr = ctx.STRING().asScala.map(c => string(c)).mkString(".") - if (elementStr.nonEmpty) { - result ++ "." + elementStr - } else { - result + var result = ctx.identifier.getText + if (ctx.colpathIdentifier != null) { + result = result ++ "." 
++ ctx.colpathIdentifier.asScala.map { _.getText}.mkString(".") } + result } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index aa03ae6ea1b96..f73b6c99c37fa 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -61,6 +61,7 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto |PARTITION(year = 2016, month = 4, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3 """.stripMargin) sql("CREATE VIEW parquet_view1 as select * from parquet_tab4") + sql("create table tab_complex (col1 map , col2 struct )") } override protected def afterAll(): Unit = { @@ -71,6 +72,7 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto sql("DROP VIEW IF EXISTS parquet_view1") sql("DROP TABLE IF EXISTS parquet_tab4") sql("DROP TABLE IF EXISTS parquet_tab5") + sql("DROP TABLE IF EXISTS tab_complex") } finally { super.afterAll() } @@ -172,6 +174,26 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto assert(message4.contains("Error in getting fields from serde.Invalid Field invalidCol")) } + test("describe column - nested") { + checkAnswer( + sql("describe tab_complex col1.$key$"), + Row("$key$", "int", "from deserializer") :: Nil) + checkAnswer( + sql("describe tab_complex col1.$value$"), + Row("$value$", "string", "from deserializer") :: Nil) + checkAnswer( + sql("describe tab_complex col1"), + Row("col1", "map", "from deserializer") :: Nil) + checkAnswer( + sql("describe tab_complex col2.f1"), + Row("f1", "int", "from deserializer") :: Nil) + checkAnswer( + sql("describe tab_complex col2"), + Row("f1", "int", "from deserializer") :: + Row("f2", "string", "from deserializer") :: Nil + ) + } + test("LOAD DATA") { withTable("non_part_table", "part_table") { sql( From e9ca6a53ec31a44c219cba4e871d18ef48457d4b Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 20 Apr 2016 00:25:13 -0700 Subject: [PATCH 3/7] one more test --- .../spark/sql/hive/execution/HiveCommandSuite.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index f73b6c99c37fa..1147c09d4dc00 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -61,7 +61,11 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto |PARTITION(year = 2016, month = 4, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3 """.stripMargin) sql("CREATE VIEW parquet_view1 as select * from parquet_tab4") - sql("create table tab_complex (col1 map , col2 struct )") + sql( + """ + |create table tab_complex (col1 map , + |col2 struct , col3 map>) + """.stripMargin) } override protected def afterAll(): Unit = { @@ -190,8 +194,10 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto checkAnswer( sql("describe tab_complex col2"), Row("f1", "int", "from deserializer") :: - Row("f2", "string", "from deserializer") :: Nil - ) + Row("f2", "string", "from deserializer") :: Nil) + checkAnswer( + sql("describe tab_complex col3.$value$.f3"), 
+ Row("f3", "int", "from deserializer") :: Nil) } test("LOAD DATA") { From 8386f293336cf0b9fd1458de7d432d17c577cb8b Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 20 Apr 2016 08:48:12 -0700 Subject: [PATCH 4/7] fix --- .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 2 +- .../apache/spark/sql/execution/command/DDLCommandSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index e4e360564327a..f61596f0b87f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -354,7 +354,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { */ override def visitDescribeColName(ctx: DescribeColNameContext): String = { var result = ctx.identifier.getText - if (ctx.colpathIdentifier != null) { + if (!ctx.colpathIdentifier.isEmpty) { result = result ++ "." ++ ctx.colpathIdentifier.asScala.map { _.getText}.mkString(".") } result diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index b39c4a8e9efff..b9c8f82318ee2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -771,7 +771,7 @@ class DDLCommandSuite extends PlanTest { val parsed5 = parser.parsePlan("DESCRIBE EXTENDED tab1 PARTITION (c1 = 'val1')") val expected5 = DescribeCommand(TableIdentifier("tab1", None), Some(Map("c1" -> "val1")), None, true) - val parsed6 = parser.parsePlan("DESCRIBE EXTENDED tab1 tab1.col1.field1.'$elem$'") + val parsed6 = parser.parsePlan("DESCRIBE EXTENDED tab1 tab1.col1.field1.$elem$") val expected6 = DescribeCommand(TableIdentifier("tab1", None), None, Some("tab1.col1.field1.$elem$"), true) comparePlans(parsed1, expected1) From be6c9ba2d518e25881d73396d637bbc4a4997ce2 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 20 Apr 2016 11:52:16 -0700 Subject: [PATCH 5/7] style --- .../scala/org/apache/spark/sql/execution/command/tables.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index feb0183622165..687160a2cf8c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -313,6 +313,7 @@ case class DescribeTableCommand( s"specified for DESCRIBE command") } } + val results = sparkSession.sessionState.catalog.describeTable(table, partSpec, colPath, isExtended, output) val rows = results.map { case (name, dataType, comment) => From 30efbce9c750c7b6bef250c70a42f5aee6ea7faa Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 20 Apr 2016 23:57:45 -0700 Subject: [PATCH 6/7] fix after rebase --- .../apache/spark/sql/execution/SparkSqlParser.scala | 3 ++- .../apache/spark/sql/execution/command/tables.scala | 8 +++----- .../apache/spark/sql/execution/datasources/ddl.scala | 1 + .../sql/execution/command/DDLCommandSuite.scala | 12 ++++++------ 4 files changed, 12 insertions(+), 12 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index f61596f0b87f1..04e6a0f7fc8d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema} import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution} @@ -237,7 +238,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } else { val partitionKeys = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec) val columnPath = Option(ctx.describeColName).map(visitDescribeColName) - DescribeCommand( + DescribeTableCommand( visitTableIdentifier(ctx.tableIdentifier), partitionKeys, columnPath, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 687160a2cf8c0..cebf52e666155 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -20,12 +20,10 @@ package org.apache.spark.sql.execution.command import java.io.File import java.net.URI -import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec - -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog} +import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} @@ -313,7 +311,7 @@ case class DescribeTableCommand( s"specified for DESCRIBE command") } } - + val results = sparkSession.sessionState.catalog.describeTable(table, partSpec, colPath, isExtended, output) val rows = results.map { case (name, dataType, comment) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 9b1b53a7f6fdf..7d0a3d9756e90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.types._ + /** * Used to represent the operation of create table using a data source. 
* diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index b9c8f82318ee2..71352070be0c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -760,19 +760,19 @@ class DDLCommandSuite extends PlanTest { test("describe table") { val parsed1 = parser.parsePlan("DESCRIBE tab1") - val expected1 = DescribeCommand(TableIdentifier("tab1", None), None, None, false) + val expected1 = DescribeTableCommand(TableIdentifier("tab1", None), None, None, false) val parsed2 = parser.parsePlan("DESCRIBE db1.tab1") - val expected2 = DescribeCommand(TableIdentifier("tab1", Some("db1")), None, None, false) + val expected2 = DescribeTableCommand(TableIdentifier("tab1", Some("db1")), None, None, false) val parsed3 = parser.parsePlan("DESCRIBE tab1 col1") - val expected3 = DescribeCommand(TableIdentifier("tab1", None), None, Some("col1"), false) + val expected3 = DescribeTableCommand(TableIdentifier("tab1", None), None, Some("col1"), false) val parsed4 = parser.parsePlan("DESCRIBE tab1 PARTITION (c1 = 'val1')") - val expected4 = DescribeCommand(TableIdentifier("tab1", None), + val expected4 = DescribeTableCommand(TableIdentifier("tab1", None), Some(Map("c1" -> "val1")), None, false) val parsed5 = parser.parsePlan("DESCRIBE EXTENDED tab1 PARTITION (c1 = 'val1')") - val expected5 = DescribeCommand(TableIdentifier("tab1", None), + val expected5 = DescribeTableCommand(TableIdentifier("tab1", None), Some(Map("c1" -> "val1")), None, true) val parsed6 = parser.parsePlan("DESCRIBE EXTENDED tab1 tab1.col1.field1.$elem$") - val expected6 = DescribeCommand(TableIdentifier("tab1", None), + val expected6 = DescribeTableCommand(TableIdentifier("tab1", None), None, Some("tab1.col1.field1.$elem$"), true) comparePlans(parsed1, expected1) comparePlans(parsed2, expected2) From 319d45ba2a46e317b6576160284d80df9f73c023 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Fri, 29 Apr 2016 18:24:34 -0700 Subject: [PATCH 7/7] DescribeTable based on CatalogTable --- .../sql/catalyst/catalog/SessionCatalog.scala | 17 -- .../sql/catalyst/catalog/interface.scala | 8 + .../spark/sql/execution/command/tables.scala | 178 +++++++++++++++--- .../spark/sql/hive/HiveSessionCatalog.scala | 23 +-- .../sql/hive/client/HiveClientImpl.scala | 84 --------- .../sql/hive/execution/HiveCommandSuite.scala | 21 +-- 6 files changed, 176 insertions(+), 155 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 7887c977da6c7..acc9bbf9146fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -320,23 +320,6 @@ class SessionCatalog( alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) } - /** - * Describes a table by returning various metadata pertaining to table/partitions/columns. 
- */ - def describeTable( - table: TableIdentifier, - partSpec: Option[TablePartitionSpec], - colPath: Option[String], - isExtended: Boolean, - output: Seq[Attribute]): Seq[(String, String, String)] = { - val relation = lookupRelation(table) - relation.schema.fields.map { field => - val cmtKey = "comment" - val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else "" - (field.name, field.dataType.simpleString, comment) - } - } - /** * Return whether a table with the specified name exists. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 5efaf8f2010f2..eb2d6d9a42975 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -110,6 +110,14 @@ case class CatalogTable( def partitionColumns: Seq[CatalogColumn] = schema.filter { c => partitionColumnNames.contains(c.name) } + /** Columns this table is bucketed by. */ + def bucketColumns: Seq[CatalogColumn] = + schema.filter { c => bucketColumnNames.contains(c.name) } + + /** Columns this table is sorted by. */ + def sortColumns: Seq[CatalogColumn] = + schema.filter { c => sortColumnNames.contains(c.name) } + /** Return the database this table was specified to belong to, assuming it exists. */ def database: String = identifier.database.getOrElse { throw new AnalysisException(s"table $identifier did not specify database") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index cebf52e666155..d876021e40731 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -20,15 +20,17 @@ package org.apache.spark.sql.execution.command import java.io.File import java.net.URI +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog} -import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode} -import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType} +import org.apache.spark.sql.execution.datasources.PartitioningUtils +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils case class CreateTableAsSelectLogicalPlan( @@ -274,6 +276,7 @@ case class LoadData( * {{{ * DESCRIBE [EXTENDED|FORMATTED] [db_name.]table_name [column_name] [PARTITION partition_spec] * }}} + * Note : FORMATTED option is not supported. * @param table table to be described. * @param partSpec spec If specified, the specified partition is described. 
It is effective only * when the table is a Hive table @@ -289,7 +292,7 @@ case class DescribeTableCommand( isExtended: Boolean) extends RunnableCommand { - override val output: Seq[Attribute] = Seq( + override val output: Seq[Attribute] = Seq( // Column names are based on Hive. AttributeReference("col_name", StringType, nullable = false, new MetadataBuilder().putString("comment", "name of the column").build())(), @@ -299,28 +302,161 @@ case class DescribeTableCommand( new MetadataBuilder().putString("comment", "comment of the column").build())() ) - override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - // Check to make sure supplied partition are valid partition columns. . - if (partSpec.isDefined && !catalog.isTemporaryTable(table)) { - val tab = catalog.getTableMetadata(table) - val badColumns = partSpec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains) - if (badColumns.nonEmpty) { - throw new AnalysisException( - s"Non-partitioned column(s) [${badColumns.mkString(", ")}] are " + - s"specified for DESCRIBE command") + private def formatColumns(cols: Seq[CatalogColumn]): String = { + cols.map { col => + s""" + |${col.getClass.getSimpleName} + |(name:${col.name} + |type:${col.dataType} + |comment:${col.comment.orNull} + """.stripMargin + }.mkString(",") + } + + private def formatProperties(props: Map[String, String]): String = { + props.map { + case (k, v) => s"$k=$v" + }.mkString("{", ", ", "}") + } + + private def getPartValues(part: CatalogTablePartition, cols: Seq[String]): String = { + cols.map { name => + PartitioningUtils.escapePathName(part.spec(name)) + }.mkString(", ") + } + + private def descColPath(table: CatalogTable, colPath: String): Array[Row] = { + val names = colPath.split("\\."); + val lastName = names(names.length - 1) + val fields = table.schema.map {c => + StructField(c.name, CatalystSqlParser.parseDataType(c.dataType), c.nullable) + } + var dataType: DataType = StructType(fields) + for (i <- 0 to names.length -1) { + dataType match { + case s: StructType => + try { + dataType = s.apply(names(i)).dataType + } catch { + case e: Exception => + throw new AnalysisException(s"Column name/path: ${colPath} does not exist.") + } + case m: MapType if names(i) == "$key$" => dataType = m.keyType + case m: MapType if names(i) == "$value$" => dataType = m.valueType + case a: ArrayType if names(i) == "$value$" => dataType = a.elementType + case _ => throw new AnalysisException("Column name/path: ${colPath} does not exist") } } - val results = - sparkSession.sessionState.catalog.describeTable(table, partSpec, colPath, isExtended, output) - val rows = results.map { case (name, dataType, comment) => - Row(name, dataType, comment) + val result: Seq[Row] = dataType match { + case s: StructType => + s.map { f => + Row(f.name, f.dataType.simpleString, "from deserializer")} + case d: DataType => Seq(Row(lastName, dataType.simpleString, "from deserializer")) } - rows + result.toArray + } + + private def descStorageFormat( + table: CatalogTable, + storage: CatalogStorageFormat): String = { + // TODO - check with Lian - from StorageDesc - compress, skewedInfo, storedAsSubDirectories + // are not availble. So these are dropped from the output. 
+ val storageLocationStr = + s""" + |${storage.getClass.getSimpleName}(location:${storage.locationUri.orNull}, + | inputFormat:${storage.inputFormat.orNull}, + | outputFormat:${storage.outputFormat.orNull}, + | numBuckets:${table.numBuckets}, + | serializationLib=${storage.serde.orNull}, + | parameters=${formatProperties(storage.serdeProperties)}, + | bucketCols:[${formatColumns(table.bucketColumns)}], + | sortCols=[${formatColumns(table.sortColumns)}]) + """.stripMargin.replaceAll("\n", "").trim + storageLocationStr + } + + private def descPartExtended(table: CatalogTable, part: CatalogTablePartition): String = { + val result = StringBuilder.newBuilder + val clsName = part.getClass.getSimpleName + result ++= s"${clsName}(values:[${getPartValues(part, table.partitionColumnNames)}], " + result ++= s"dbName:${table.database}, " + // TODO - check with Lian - no owner info available. + result ++= s"createTime:${table.createTime}, " + result ++= s"lastAccessTime:${table.lastAccessTime}, " + // TODO - check with Lian - no retention info available. + + result ++= s"sd:${descStorageFormat(table, part.storage)}, " + // TODO Check with Lian - Hive prints partition keys here. Since we output paritioning keys and + // schema already at the start i don't output it here again. + result ++= s"parameters:${formatProperties(table.properties)}, " + result ++= s"viewOriginalText:${table.viewOriginalText.orNull}, " + result ++= s"viewExpandedText:${table.viewText.orNull}, " + result ++= s"tableType:${table.tableType})" + result.toString } -} + private def descTableExtended(table: CatalogTable): String = { + val result = StringBuilder.newBuilder + result ++= s"${table.getClass.getSimpleName}(tableName:${table.identifier.table}, " + result ++= s"dbName:${table.database}, " + // TODO - check with Lian - no owner info available. + result ++= s"createTime:${table.createTime}, " + result ++= s"lastAccessTime:${table.lastAccessTime}, " + // TODO - check with Lian - no retention info available. + + result ++= s"sd:${descStorageFormat(table, table.storage)}, " + // TODO Check with Lian - Hive prints partition keys here. Since we output paritioning keys + // and schema already i don't output it here again. 
+ result ++= s"parameters:${formatProperties(table.properties)}, " + result ++= s"viewOriginalText:${table.viewOriginalText.orNull}, " + result ++= s"viewExpandedText:${table.viewText.orNull}, " + result ++= s"tableType:${table.tableType})" + result.toString + } + + override def run(sparkSession: SparkSession): Seq[Row] = { + val result = new ArrayBuffer[Row] + val catalog = sparkSession.sessionState.catalog + catalog.lookupRelation(table) match { + case catalogRelation: CatalogRelation => + val tab = catalogRelation.catalogTable + val part = partSpec.map(p => Option(catalog.getPartition(table, p))).getOrElse(None) + if (colPath.nonEmpty) { + result ++= descColPath(tab, colPath.get) + } else { + catalogRelation.catalogTable.schema.foreach { column => + result += Row(column.name, column.dataType, column.comment.orNull) + } + if (tab.partitionColumns.nonEmpty) { + result += Row("# Partition Information", "", "") + result += Row(s"# ${output(0).name}", output(1).name, output(2).name) + + tab.partitionColumns.foreach { col => + result += Row(col.name, col.dataType, col.comment.orNull) + } + } + if (isExtended) { + if (partSpec.isEmpty) { + result += Row("Detailed Table Information", descTableExtended(tab), "") + } else { + result += + Row("Detailed Partition Information", descPartExtended(tab, part.get), "") + } + } + } + + case relation => + relation.schema.fields.foreach { field => + val comment = + if (field.metadata.contains("comment")) field.metadata.getString("comment") else "" + result += Row(field.name, field.dataType.simpleString, comment) + } + } + + result + } +} /** * A command for users to get tables in the given database. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 4102db200f3af..456587e0e0816 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -29,8 +29,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog} -import org.apache.spark.sql.catalyst.catalog.ExternalCatalog._ -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo} +import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper @@ -67,26 +66,6 @@ private[sql] class HiveSessionCatalog( } } - /** - * Describes a table by returning various metadata pertaining to table/partitions/columns. 
- */ - override def describeTable( - table: TableIdentifier, - partSpec: Option[TablePartitionSpec], - colPath: Option[String], - isExtended: Boolean, - output: Seq[Attribute]): Seq[(String, String, String)] = { - val relation = lookupRelation(table) - relation match { - case r: MetastoreRelation => - val db = table.database.getOrElse(currentDb) - val tableName = formatTableName(table.table) - client.describeTable(db, tableName, partSpec, colPath, isExtended, output) - case o: LogicalPlan => - super.describeTable(table, partSpec, colPath, isExtended, output) - } - } - // ---------------------------------------------------------------- // | Methods and fields for interacting with HiveMetastoreCatalog | // ---------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 3b6e365305d5f..78ba2bfda6128 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -478,90 +478,6 @@ private[hive] class HiveClientImpl( client.getTablesByPattern(dbName, pattern).asScala } - /** - * Describes a Hive table. - * The syntax of using this command in SQL is: - * {{{ - * DESCRIBE [EXTENDED|FORMATTED] [db_name.]table_name [column_name] [PARTITION partition_spec] - * }}} - * @param table The table to be described. - * @param partSpec spec If specified, the specified partition is described. - * @param colPath If specified, only the specified column is described. - * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false. - */ - override def describeTable( - db: String, - table: String, - partSpec: Option[TablePartitionSpec], - colPath: Option[String], - isExtended: Boolean, - output: Seq[Attribute]): Seq[(String, String, String)] = withHiveState { - - // Get partition columns or table columns based on the supplied partition spec. - def getCols(tab: HiveTable, part: Option[HivePartition]): Seq[FieldSchema] = { - if (!partSpec.isEmpty && tab.getTableType() != HiveTableType.VIRTUAL_VIEW) { - part.get.getCols.asScala - } else { - tab.getCols.asScala - } - } - // Formats the column metadata as per output schema. - def formatColumns(cols: Seq[FieldSchema]): Seq[(String, String, String)] = { - cols.map(field => (field.getName, field.getType, field.getComment)) - } - - var results: Seq[(String, String, String)] = Nil - // Get Table - val tab = - Option(client.getTable(db, table, false)). - getOrElse(throw new NoSuchTableException(db, table)) - - // Get Partition info - var part = partSpec.map(p => Option(client.getPartition(tab, p.asJava, false))).getOrElse(None) - if (partSpec.nonEmpty && part.isEmpty) { - throw new AnalysisException( - s"partition to describe '${partSpec.get}' does not exist" + - s" in table '$table' database '$db'") - } - - // Get columns if colPath is specified. 
- val cols = colPath.map { p => - val qualifiedColPath = if (!p.startsWith(table)) s"${table}.${p}" else p - try { - Hive.getFieldsFromDeserializer(qualifiedColPath, tab.getDeserializer(true)).asScala - } catch { - case e: Exception => throw new AnalysisException(e.getMessage) - } - }.getOrElse(getCols(tab, part)) - - - if (colPath.isEmpty) { - // describe all the columns in the table first - results ++= formatColumns(getCols(tab, part) ++ tab.getPartCols().asScala) - - // describe partition columns - val partitionColumns = tab.getPartCols.asScala - if (partitionColumns.nonEmpty) { - results ++= - Seq(("# Partition Information", "", "")) ++ - Seq((s"# ${output(0).name}", output(1).name, output(2).name)) ++ - formatColumns(partitionColumns) - } - - // describe additional table/parition details - if (isExtended) { - if (partSpec.isEmpty) { - results ++= Seq(("Detailed Table Information", tab.getTTable.toString, "")) - } else { - results ++= Seq(("Detailed Partition Information", part.get.getTPartition.toString, "")) - } - } - } else { - results ++= formatColumns(cols) - } - results - } - /** * Runs the specified SQL query using Hive. */ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index 1147c09d4dc00..b843a06de3c67 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton} +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, NoSuchTableException} +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { @@ -63,8 +63,8 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto sql("CREATE VIEW parquet_view1 as select * from parquet_tab4") sql( """ - |create table tab_complex (col1 map , - |col2 struct , col3 map>) + |CREATE TABLE tab_complex (col1 map , + |col2 struct , col3 map>) """.stripMargin) } @@ -159,23 +159,22 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } test("describe table - negative tests") { - val message1 = intercept[AnalysisException] { - sql("DESCRIBE badtable") + val message1 = intercept[NoSuchTableException] { + sql("DESCRIBE bad_table") }.getMessage - assert(message1.contains("Table or View badtable not found")) + assert(message1.contains("Table or View bad_table not found")) val message2 = intercept[AnalysisException] { sql("DESCRIBE parquet_tab2 PARTITION (day=31)") }.getMessage - assert(message2.contains("Non-partitioned column(s) [day] are specified")) + assert(message2.contains("table is not partitioned but partition spec exists: {day=31}")) val message3 = intercept[AnalysisException] { sql("DESCRIBE parquet_tab4 PARTITION (Year=2000, month='10')") }.getMessage - assert(message3.contains("partition to describe 'Map(year -> 2000, month -> 10)'" + - " does not exist in table")) + assert(message3.contains("Partition not found in table parquet_tab4 database default")) val message4 = intercept[AnalysisException] { sql("DESCRIBE 
parquet_tab2 invalidCol") }.getMessage - assert(message4.contains("Error in getting fields from serde.Invalid Field invalidCol")) + assert(message4.contains("Column name/path: invalidCol does not exist")) } test("describe column - nested") {