From ecec69b013a40b6f9878d67949be6eb5a493464b Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 21 Jun 2022 19:11:38 +0300
Subject: [PATCH 1/9] Unify v1 and v2 DESCRIBE TABLE

---
 .../datasources/v2/DescribeTableExec.scala    | 10 ++++----
 .../command/DescribeTableSuiteBase.scala      | 19 ++++++++++++++-
 .../command/v1/DescribeTableSuite.scala       | 17 -------------
 .../command/v2/DescribeTableSuite.scala       | 24 ++-----------------
 4 files changed, 24 insertions(+), 46 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index 6cca0a7722228..dc3d6c70b2eb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -31,9 +31,9 @@ case class DescribeTableExec(
   override protected def run(): Seq[InternalRow] = {
     val rows = new ArrayBuffer[InternalRow]()
     addSchema(rows)
-    addPartitioning(rows)
 
     if (isExtended) {
+      addPartitioning(rows)
       addMetadataColumns(rows)
       addTableDetails(rows)
     }
@@ -62,7 +62,7 @@ case class DescribeTableExec(
 
   private def addSchema(rows: ArrayBuffer[InternalRow]): Unit = {
     rows ++= table.schema.map{ column =>
       toCatalystRow(
-        column.name, column.dataType.simpleString, column.getComment().getOrElse(""))
+        column.name, column.dataType.simpleString, column.getComment().orNull)
     }
   }
@@ -81,10 +81,8 @@ case class DescribeTableExec(
 
   private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
     rows += emptyRow()
-    rows += toCatalystRow("# Partitioning", "", "")
-    if (table.partitioning.isEmpty) {
-      rows += toCatalystRow("Not partitioned", "", "")
-    } else {
+    if (!table.partitioning.isEmpty) {
+      rows += toCatalystRow("# Partitioning", "", "")
       rows ++= table.partitioning.zipWithIndex.map {
         case (transform, index) => toCatalystRow(s"Part $index", transform.describe(), "")
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
index 0cf062fb34e60..b255934370b98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType, StructType}
 
 /**
@@ -44,6 +44,23 @@ trait DescribeTableSuiteBase extends QueryTest with DDLCommandTestUtils {
     }
   }
 
+  test("DESCRIBE TABLE with non-'partitioned-by' clause") {
+    withNamespaceAndTable("ns", "table") { tbl =>
+      spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing")
+      val descriptionDf = spark.sql(s"DESCRIBE TABLE $tbl")
+      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
+        Seq(
+          ("col_name", StringType),
+          ("data_type", StringType),
+          ("comment", StringType)))
+      QueryTest.checkAnswer(
+        descriptionDf,
+        Seq(
+          Row("data", "string", null),
+          Row("id", "bigint", null)))
+    }
+  }
+
   test("SPARK-34561: drop/add columns to a dataset of `DESCRIBE TABLE`") {
     withNamespaceAndTable("ns", "table") { tbl =>
       sql(s"CREATE TABLE $tbl (c0 INT) $defaultUsing")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
index 01b7aefdd7864..7eae8d2d248e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
@@ -33,23 +33,6 @@ import org.apache.spark.sql.types.StringType
 trait DescribeTableSuiteBase extends command.DescribeTableSuiteBase
   with command.TestsV1AndV2Commands {
 
-  test("DESCRIBE TABLE with non-'partitioned-by' clause") {
-    withNamespaceAndTable("ns", "table") { tbl =>
-      spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing")
-      val descriptionDf = spark.sql(s"DESCRIBE TABLE $tbl")
-      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
-        Seq(
-          ("col_name", StringType),
-          ("data_type", StringType),
-          ("comment", StringType)))
-      QueryTest.checkAnswer(
-        descriptionDf,
-        Seq(
-          Row("data", "string", null),
-          Row("id", "bigint", null)))
-    }
-  }
-
   test("Describing a partition is not supported") {
     withNamespaceAndTable("ns", "table") { tbl =>
       spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
index ee614b87718c7..234b79b98cb58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
@@ -28,26 +28,6 @@ import org.apache.spark.util.Utils
  */
 class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuiteBase {
 
-  test("DESCRIBE TABLE with non-'partitioned-by' clause") {
-    withNamespaceAndTable("ns", "table") { tbl =>
-      spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing")
-      val descriptionDf = spark.sql(s"DESCRIBE TABLE $tbl")
-      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
-        Seq(
-          ("col_name", StringType),
-          ("data_type", StringType),
-          ("comment", StringType)))
-      QueryTest.checkAnswer(
-        descriptionDf,
-        Seq(
-          Row("data", "string", ""),
-          Row("id", "bigint", ""),
-          Row("", "", ""),
-          Row("# Partitioning", "", ""),
-          Row("Not partitioned", "", "")))
-    }
-  }
-
   test("Describing a partition is not supported") {
     withNamespaceAndTable("ns", "table") { tbl =>
       spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing " +
@@ -74,8 +54,8 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuit
     QueryTest.checkAnswer(
       descriptionDf,
       Seq(
-        Row("id", "bigint", ""),
-        Row("data", "string", ""),
+        Row("id", "bigint", null),
+        Row("data", "string", null),
         Row("", "", ""),
         Row("# Partitioning", "", ""),
         Row("Part 0", "id", ""),
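
A minimal sketch of the behavioral change in this patch (illustrative only; the spark-shell session and table name `t` are assumptions, not part of the patch):

    // Assumed session; `t` is an example table name.
    spark.sql("CREATE TABLE t (id BIGINT, data STRING) USING parquet")
    spark.sql("DESCRIBE TABLE t").collect()
    // Per the test moved into DescribeTableSuiteBase (row order may vary):
    //   Row("data", "string", null)
    //   Row("id", "bigint", null)
    // Previously the v2 output also carried Row("", "", ""),
    // Row("# Partitioning", "", "") and Row("Not partitioned", "", ""),
    // and rendered missing column comments as "" instead of null.
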
From 4ffa242e1f32892f3142a6136b7bedc92900c821 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 21 Jun 2022 21:23:47 +0300
Subject: [PATCH 2/9] Fix test title

---
 .../spark/sql/execution/command/v1/DescribeTableSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
index 7eae8d2d248e8..be1a67b494dec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types.StringType
 trait DescribeTableSuiteBase extends command.DescribeTableSuiteBase
   with command.TestsV1AndV2Commands {
 
-  test("Describing a partition is not supported") {
+  test("Describing of a non-existent partition") {
     withNamespaceAndTable("ns", "table") { tbl =>
       spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing " +
         "PARTITIONED BY (id)")
From c17b346b194fd2273a7a32e832d1ebe36d855ae5 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 21 Jun 2022 22:19:12 +0300
Subject: [PATCH 3/9] Fix test failures of v2 DESCRIBE TABLE

---
 .../sql/execution/datasources/v2/DescribeTableExec.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index dc3d6c70b2eb5..9ae8f74df71b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -31,9 +31,9 @@ case class DescribeTableExec(
   override protected def run(): Seq[InternalRow] = {
     val rows = new ArrayBuffer[InternalRow]()
     addSchema(rows)
+    addPartitioning(rows)
 
     if (isExtended) {
-      addPartitioning(rows)
       addMetadataColumns(rows)
       addTableDetails(rows)
     }
@@ -80,8 +80,8 @@ case class DescribeTableExec(
   }
 
   private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
-    rows += emptyRow()
     if (!table.partitioning.isEmpty) {
+      rows += emptyRow()
      rows += toCatalystRow("# Partitioning", "", "")
       rows ++= table.partitioning.zipWithIndex.map {
         case (transform, index) => toCatalystRow(s"Part $index", transform.describe(), "")
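
With patches 1 and 3 combined, plain DESCRIBE TABLE reports partitioning again, but only for tables that actually are partitioned. A hedged sketch of the v2 output at this point (the catalog and table names are assumptions; the rows are the leading rows of the EXTENDED expectation in the v2 suite):

    // Assumed session against a v2 catalog named `testcat`.
    spark.sql("CREATE TABLE testcat.t (id BIGINT, data STRING) PARTITIONED BY (id)")
    spark.sql("DESCRIBE TABLE testcat.t").collect()
    //   Row("id", "bigint", null)
    //   Row("data", "string", null)
    //   Row("", "", "")
    //   Row("# Partitioning", "", "")
    //   Row("Part 0", "id", "")
    // Plain DESCRIBE stops here; EXTENDED continues with metadata columns
    // and table details.
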
From 2224caad666c059fc36e11a3bc35634dbc25194b Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Wed, 22 Jun 2022 10:34:15 +0300
Subject: [PATCH 4/9] Introduce getProvider()

---
 .../sql/execution/command/DescribeTableSuiteBase.scala      | 2 +-
 .../spark/sql/execution/command/v1/DescribeTableSuite.scala | 6 +++++-
 .../sql/hive/execution/command/DescribeTableSuite.scala     | 2 +-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
index b255934370b98..559247c85e1d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
@@ -44,7 +44,7 @@ trait DescribeTableSuiteBase extends QueryTest with DDLCommandTestUtils {
     }
   }
 
-  test("DESCRIBE TABLE with non-'partitioned-by' clause") {
+  test("DESCRIBE TABLE of a non-partitioned table") {
     withNamespaceAndTable("ns", "table") { tbl =>
       spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing")
       val descriptionDf = spark.sql(s"DESCRIBE TABLE $tbl")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
index be1a67b494dec..9ea0d4bacd297 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command.v1
 
+import java.util.Locale
+
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.execution.command
 import org.apache.spark.sql.types.StringType
@@ -33,6 +35,8 @@ import org.apache.spark.sql.types.StringType
 trait DescribeTableSuiteBase extends command.DescribeTableSuiteBase
   with command.TestsV1AndV2Commands {
 
+  def getProvider(): String = defaultUsing.stripPrefix("USING").trim.toLowerCase(Locale.ROOT)
+
   test("Describing of a non-existent partition") {
     withNamespaceAndTable("ns", "table") { tbl =>
       spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing " +
@@ -79,7 +83,7 @@ class DescribeTableSuite extends DescribeTableSuiteBase with CommandSuiteBase {
         Row("Last Access", "UNKNOWN", ""),
         Row("Created By", "Spark 3.4.0-SNAPSHOT", ""),
         Row("Type", "EXTERNAL", ""),
-        Row("Provider", "parquet", ""),
+        Row("Provider", getProvider(), ""),
         Row("Comment", "this is a test table", ""),
         Row("Table Properties", "[bar=baz]", ""),
         Row("Location", "file:/tmp/testcat/table_name", ""),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DescribeTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DescribeTableSuite.scala
index 455a2c8a307ec..783f12dd81d6a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DescribeTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DescribeTableSuite.scala
@@ -72,7 +72,7 @@ class DescribeTableSuite extends v1.DescribeTableSuiteBase with CommandSuiteBase
         Row("Last Access", "UNKNOWN", ""),
         Row("Created By", "Spark 3.4.0-SNAPSHOT", ""),
         Row("Type", "EXTERNAL", ""),
-        Row("Provider", "hive", ""),
+        Row("Provider", getProvider(), ""),
         Row("Comment", "this is a test table", ""),
         Row("Location", "file:/tmp/testcat/table_name", ""),
         Row("Serde Library", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", ""),
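
getProvider() lets the same EXTENDED expectation serve both the datasource suite (where the Provider row was hard-coded to "parquet") and the Hive suite (where it was "hive") by deriving the name from each suite's defaultUsing clause. A minimal sketch of the derivation; the input strings are illustrative:

    import java.util.Locale

    def provider(defaultUsing: String): String =
      defaultUsing.stripPrefix("USING").trim.toLowerCase(Locale.ROOT)

    provider("USING parquet")  // "parquet"
    provider("USING HIVE")     // "hive"
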
From 227b99ff724ee2da2962e0a9c7696fa74c044920 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Wed, 22 Jun 2022 10:38:22 +0300
Subject: [PATCH 5/9] Add the test "DESCRIBE TABLE of a partitioned table"

---
 .../command/v1/DescribeTableSuite.scala       | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
index 9ea0d4bacd297..33b6fdc17e47a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
@@ -47,6 +47,25 @@ trait DescribeTableSuiteBase extends command.DescribeTableSuiteBase
       assert(e.message === "Partition not found in table 'table' database 'ns':\nid -> 1")
     }
   }
+
+  test("DESCRIBE TABLE of a partitioned table") {
+    withNamespaceAndTable("ns", "table") { tbl =>
+      spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
+      val descriptionDf = spark.sql(s"DESCRIBE TABLE $tbl")
+      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq(
+        ("col_name", StringType),
+        ("data_type", StringType),
+        ("comment", StringType)))
+      QueryTest.checkAnswer(
+        descriptionDf.filter("col_name != 'Created Time'"),
+        Seq(
+          Row("data", "string", null),
+          Row("id", "bigint", null),
+          Row("# Partition Information", "", ""),
+          Row("# col_name", "data_type", "comment"),
+          Row("id", "bigint", null)))
+    }
+  }
 }
 
 /**
From 699e72e1da5eeeb7baa96e5263e36a5aa6ae950b Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Wed, 22 Jun 2022 11:23:25 +0300
Subject: [PATCH 6/9] Support describing of a partitioned table

---
 .../datasources/v2/DescribeTableExec.scala    | 25 +++++++++++++++----
 .../command/DescribeTableSuiteBase.scala      | 19 ++++++++++++++
 .../command/v1/DescribeTableSuite.scala       | 19 --------------
 .../command/v2/DescribeTableSuite.scala       |  6 ++---
 4 files changed, 42 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index 9ae8f74df71b8..ec0694f439847 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, Table}
+import org.apache.spark.sql.connector.expressions.IdentityTransform
 
 case class DescribeTableExec(
     output: Seq[Attribute],
@@ -80,11 +81,25 @@ case class DescribeTableExec(
   }
 
   private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
-    if (!table.partitioning.isEmpty) {
-      rows += emptyRow()
-      rows += toCatalystRow("# Partitioning", "", "")
-      rows ++= table.partitioning.zipWithIndex.map {
-        case (transform, index) => toCatalystRow(s"Part $index", transform.describe(), "")
+    if (table.partitioning.nonEmpty) {
+      val partitionColumnsOnly = table.partitioning.forall(t => t.isInstanceOf[IdentityTransform])
+      if (partitionColumnsOnly) {
+        rows += toCatalystRow("# Partition Information", "", "")
+        rows += toCatalystRow(s"# ${output(0).name}", output(1).name, output(2).name)
+        val nameToField = table.schema.map(f => (f.name, f)).toMap
+        rows ++= table.partitioning
+          .map(_.asInstanceOf[IdentityTransform])
+          .flatMap(_.ref.fieldNames())
+          .map { name =>
+            val field = nameToField(name)
+            toCatalystRow(name, field.dataType.simpleString, field.getComment().orNull)
+          }
+      } else {
+        rows += emptyRow()
+        rows += toCatalystRow("# Partitioning", "", "")
+        rows ++= table.partitioning.zipWithIndex.map {
+          case (transform, index) => toCatalystRow(s"Part $index", transform.describe(), "")
+        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
index 559247c85e1d9..7ecc38848cf2b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
@@ -61,6 +61,25 @@ trait DescribeTableSuiteBase extends QueryTest with DDLCommandTestUtils {
     }
   }
 
+  test("DESCRIBE TABLE of a partitioned table") {
+    withNamespaceAndTable("ns", "table") { tbl =>
+      spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
+      val descriptionDf = spark.sql(s"DESCRIBE TABLE $tbl")
+      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq(
+        ("col_name", StringType),
+        ("data_type", StringType),
+        ("comment", StringType)))
+      QueryTest.checkAnswer(
+        descriptionDf.filter("col_name != 'Created Time'"),
+        Seq(
+          Row("data", "string", null),
+          Row("id", "bigint", null),
+          Row("# Partition Information", "", ""),
+          Row("# col_name", "data_type", "comment"),
+          Row("id", "bigint", null)))
+    }
+  }
+
   test("SPARK-34561: drop/add columns to a dataset of `DESCRIBE TABLE`") {
     withNamespaceAndTable("ns", "table") { tbl =>
       sql(s"CREATE TABLE $tbl (c0 INT) $defaultUsing")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
index 33b6fdc17e47a..9ea0d4bacd297 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
@@ -47,25 +47,6 @@ trait DescribeTableSuiteBase extends command.DescribeTableSuiteBase
       assert(e.message === "Partition not found in table 'table' database 'ns':\nid -> 1")
     }
   }
-
-  test("DESCRIBE TABLE of a partitioned table") {
-    withNamespaceAndTable("ns", "table") { tbl =>
-      spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      val descriptionDf = spark.sql(s"DESCRIBE TABLE $tbl")
-      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq(
-        ("col_name", StringType),
-        ("data_type", StringType),
-        ("comment", StringType)))
-      QueryTest.checkAnswer(
-        descriptionDf.filter("col_name != 'Created Time'"),
-        Seq(
-          Row("data", "string", null),
-          Row("id", "bigint", null),
-          Row("# Partition Information", "", ""),
-          Row("# col_name", "data_type", "comment"),
-          Row("id", "bigint", null)))
-    }
-  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
index 234b79b98cb58..473c921a965dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
@@ -56,9 +56,9 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuit
       Seq(
         Row("id", "bigint", null),
         Row("data", "string", null),
-        Row("", "", ""),
-        Row("# Partitioning", "", ""),
-        Row("Part 0", "id", ""),
+        Row("# Partition Information", "", ""),
+        Row("# col_name", "data_type", "comment"),
+        Row("id", "bigint", null),
         Row("", "", ""),
         Row("# Metadata Columns", "", ""),
         Row("index", "int", "Metadata column used to conflict with a data column"),
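
The new branch distinguishes identity transforms (plain partition columns) from other transforms. A sketch of the two output shapes; the identity rows come from the updated tests, while the bucket transform and its describe() rendering are assumptions:

    // PARTITIONED BY (id) -- all identity transforms, Hive-style block:
    val identityShape = Seq(
      ("# Partition Information", "", ""),
      ("# col_name", "data_type", "comment"),
      ("id", "bigint", null))
    // Assumed non-identity case, e.g. PARTITIONED BY (bucket(4, id)),
    // which keeps the pre-existing generic listing:
    val transformShape = Seq(
      ("", "", ""),
      ("# Partitioning", "", ""),
      ("Part 0", "bucket(4, id)", ""))
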
From c75e0906d6f24d209433967d1774e3fdb99724c6 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Wed, 22 Jun 2022 20:48:09 +0300
Subject: [PATCH 7/9] Fix v2 impl for partitioning by nested columns

---
 .../datasources/v2/DescribeTableExec.scala    | 14 ++++++++------
 .../command/v2/DescribeTableSuite.scala       | 17 +++++++++++++++++
 2 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index ec0694f439847..1702c62c7328f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.util.quoteIfNeeded
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, Table}
 import org.apache.spark.sql.connector.expressions.IdentityTransform
 
@@ -86,13 +87,14 @@ case class DescribeTableExec(
       if (partitionColumnsOnly) {
         rows += toCatalystRow("# Partition Information", "", "")
         rows += toCatalystRow(s"# ${output(0).name}", output(1).name, output(2).name)
-        val nameToField = table.schema.map(f => (f.name, f)).toMap
         rows ++= table.partitioning
-          .map(_.asInstanceOf[IdentityTransform])
-          .flatMap(_.ref.fieldNames())
-          .map { name =>
-            val field = nameToField(name)
-            toCatalystRow(name, field.dataType.simpleString, field.getComment().orNull)
+          .map(_.asInstanceOf[IdentityTransform].ref.fieldNames())
+          .flatMap(table.schema.findNestedField(_))
+          .map { case (path, field) =>
+            toCatalystRow(
+              (path :+ field.name).map(quoteIfNeeded(_)).mkString("."),
+              field.dataType.simpleString,
+              field.getComment().orNull)
           }
       } else {
         rows += emptyRow()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
index 473c921a965dd..7b44fa59e8174 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
@@ -39,6 +39,23 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuit
     }
   }
 
+  test("DESCRIBE TABLE of a partitioned table by a nested column") {
+    withNamespaceAndTable("ns", "table") { tbl =>
+      sql(s"CREATE TABLE $tbl (s struct<id:int, a:bigint>, data string) " +
+        s"$defaultUsing PARTITIONED BY (s.id, s.a)")
+      val descriptionDf = sql(s"DESCRIBE TABLE $tbl")
+      QueryTest.checkAnswer(
+        descriptionDf.filter("col_name != 'Created Time'"),
+        Seq(
+          Row("data", "string", null),
+          Row("s", "struct<id:int,a:bigint>", null),
+          Row("# Partition Information", "", ""),
+          Row("# col_name", "data_type", "comment"),
+          Row("s.id", "int", null),
+          Row("s.a", "bigint", null)))
+    }
+  }
+
   test("DESCRIBE TABLE EXTENDED of a partitioned table") {
     withNamespaceAndTable("ns", "table") { tbl =>
       spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" +
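
findNestedField resolves each dotted partition reference (such as s.id) to its parent path plus leaf field, and quoteIfNeeded backtick-quotes any path component that is not a plain identifier. A small sketch of the rendering step; the column names are illustrative:

    import org.apache.spark.sql.catalyst.util.quoteIfNeeded

    // Joins a resolved field path the way the exec renders partition columns.
    def render(path: Seq[String], leaf: String): String =
      (path :+ leaf).map(quoteIfNeeded).mkString(".")

    render(Seq("s"), "id")   // "s.id"
    render(Seq("a.b"), "c")  // "`a.b`.c" -- the dotted component needs quoting
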
From 39ed298a5e6337a0bb85c336395558d52e471f58 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Wed, 22 Jun 2022 21:24:54 +0300
Subject: [PATCH 8/9] Output Type for v2 tables

---
 .../datasources/v2/DescribeTableExec.scala    | 21 +++++++++++++------
 .../command/v2/DescribeTableSuite.scala       |  3 ++-
 2 files changed, 17 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index 1702c62c7328f..ba5d613c1a752 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -21,9 +21,10 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.util.quoteIfNeeded
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, Table}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.IdentityTransform
 
 case class DescribeTableExec(
@@ -47,11 +48,19 @@ case class DescribeTableExec(
 
     rows += toCatalystRow("# Detailed Table Information", "", "")
     rows += toCatalystRow("Name", table.name(), "")
-    CatalogV2Util.TABLE_RESERVED_PROPERTIES.foreach(propKey => {
-      if (table.properties.containsKey(propKey)) {
-        rows += toCatalystRow(propKey.capitalize, table.properties.get(propKey), "")
-      }
-    })
+    val tableType = if (table.properties().containsKey(TableCatalog.PROP_EXTERNAL)) {
+      CatalogTableType.EXTERNAL.name
+    } else {
+      CatalogTableType.MANAGED.name
+    }
+    rows += toCatalystRow("Type", tableType, "")
+    CatalogV2Util.TABLE_RESERVED_PROPERTIES
+      .filterNot(_ == TableCatalog.PROP_EXTERNAL)
+      .foreach(propKey => {
+        if (table.properties.containsKey(propKey)) {
+          rows += toCatalystRow(propKey.capitalize, table.properties.get(propKey), "")
+        }
+      })
 
     val properties = conf.redactOptions(table.properties.asScala.toMap).toList
       .filter(kv => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(kv._1))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
index 7b44fa59e8174..b09abec6bc339 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
@@ -39,7 +39,7 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuit
     }
   }
 
-  test("DESCRIBE TABLE of a partitioned table by a nested column") {
+  test("DESCRIBE TABLE of a partitioned table by nested columns") {
     withNamespaceAndTable("ns", "table") { tbl =>
       sql(s"CREATE TABLE $tbl (s struct<id:int, a:bigint>, data string) " +
         s"$defaultUsing PARTITIONED BY (s.id, s.a)")
@@ -83,6 +83,7 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuit
         Row("", "", ""),
         Row("# Detailed Table Information", "", ""),
         Row("Name", tbl, ""),
+        Row("Type", "MANAGED", ""),
         Row("Comment", "this is a test table", ""),
         Row("Location", "file:/tmp/testcat/table_name", ""),
         Row("Provider", "_", ""),
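
The Type row is derived from the reserved "external" property (TableCatalog.PROP_EXTERNAL) rather than printed as an ordinary reserved property, mirroring the v1 EXTERNAL/MANAGED distinction. A minimal sketch of the decision; the property map here is illustrative:

    import java.util.{HashMap => JHashMap}
    import org.apache.spark.sql.connector.catalog.TableCatalog

    // Same rule as the hunk above: presence of the "external" key decides the type.
    def tableType(props: java.util.Map[String, String]): String =
      if (props.containsKey(TableCatalog.PROP_EXTERNAL)) "EXTERNAL" else "MANAGED"

    val props = new JHashMap[String, String]()
    tableType(props)                               // "MANAGED"
    props.put(TableCatalog.PROP_EXTERNAL, "true")
    tableType(props)                               // "EXTERNAL"
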
From d95ff9a2db025e37db37dc5a6b1438c7a83f5651 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Thu, 23 Jun 2022 08:57:31 +0300
Subject: [PATCH 9/9] Print assert when a partition column doesn't exist

---
 .../sql/execution/datasources/v2/DescribeTableExec.scala | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index ba5d613c1a752..acb861d7679d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -98,8 +98,13 @@ case class DescribeTableExec(
         rows += toCatalystRow(s"# ${output(0).name}", output(1).name, output(2).name)
         rows ++= table.partitioning
           .map(_.asInstanceOf[IdentityTransform].ref.fieldNames())
-          .flatMap(table.schema.findNestedField(_))
-          .map { case (path, field) =>
+          .map { fieldNames =>
+            val nestedField = table.schema.findNestedField(fieldNames)
+            assert(nestedField.isDefined,
+              s"Not found the partition column ${fieldNames.map(quoteIfNeeded).mkString(".")} " +
+              s"in the table schema ${table.schema().catalogString}.")
+            nestedField.get
+          }.map { case (path, field) =>
             toCatalystRow(
               (path :+ field.name).map(quoteIfNeeded(_)).mkString("."),
               field.dataType.simpleString,
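
A note on the final patch: previously, a partition column missing from the table schema was silently dropped by the flatMap over findNestedField's Option result; the exec now fails fast instead. The assertion message would look roughly like the following (the column name and schema are illustrative):

    Not found the partition column `p` in the table schema struct<id:bigint,data:string>.
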