From 917ca040002fcb57e1947b2f619ad3e2b2fd32c2 Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Tue, 17 Jan 2017 17:41:25 +0800
Subject: [PATCH 1/5] implement view write path for the new approach.

---
 .../sql/catalyst/catalog/interface.scala      |  19 ---
 .../spark/sql/execution/command/views.scala   | 139 +++++++++++++-----
 .../org/apache/spark/sql/SQLQuerySuite.scala  |  10 +-
 .../sql/hive/execution/SQLViewSuite.scala     |   9 +-
 4 files changed, 118 insertions(+), 59 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2adccdd7bf61d..80d32822f58ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -223,25 +223,6 @@ case class CatalogTable(
     )
   }
 
-  /**
-   * Insert/Update the view query output column names in `properties`.
-   */
-  def withQueryColumnNames(columns: Seq[String]): CatalogTable = {
-    val props = new mutable.HashMap[String, String]
-    if (columns.nonEmpty) {
-      props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
-      columns.zipWithIndex.foreach { case (colName, index) =>
-        props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
-      }
-    }
-
-    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
-    // while `CatalogTable` should be serializable.
-    copy(properties = properties.filterNot { case (key, _) =>
-      key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
-    } ++ props)
-  }
-
   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
       locationUri: Option[String] = storage.locationUri,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 154141bf83c7d..f7663a203ed60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.execution.command
 
-import scala.util.control.NonFatal
+import scala.collection.mutable
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Alias
@@ -64,9 +64,9 @@ object PersistedView extends ViewType
 
 
 /**
- * Create or replace a view with given query plan. This command will convert the query plan to
- * canonicalized SQL string, and store it as view text in metastore, if we need to create a
- * permanent view.
+ * Create or replace a view with a given query plan. This command will generate view-specific
+ * properties (e.g. view default database, view query output column names) and store them as
+ * properties in the metastore, if we need to create a permanent view.
  *
  * @param name the name of this view.
  * @param userSpecifiedColumns the output column names and optional comments specified by users,
  * @param properties the properties of this view.
 * @param originalText the original SQL text of this view, can be None if this view is created via
 *                     Dataset API.
- * @param child the logical plan that represents the view; this is used to generate a canonicalized
- *              version of the SQL that can be saved in the catalog.
+ * @param child the logical plan that represents the view; this is used to generate the logical
+ *              plan for a temporary view and the view schema.
  * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
  *                      already exists, throws analysis exception.
  * @param replace if true, and if the view already exists, updates it; if false, and if the view
@@ -95,6 +95,8 @@ case class CreateViewCommand(
     viewType: ViewType)
   extends RunnableCommand {
 
+  import ViewHelper._
+
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
 
   if (viewType == PersistedView) {
@@ -207,29 +209,26 @@ case class CreateViewCommand(
   }
 
   /**
-   * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
-   * SQL based on the analyzed plan, and also creates the proper schema for the view.
+   * Returns a [[CatalogTable]] that can be used to save in the catalog. Generates view-specific
+   * properties (e.g. view default database, view query output column names), stores them as
+   * properties in the CatalogTable, and also creates the proper schema for the view.
    */
-  private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
-    val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
-
-    // Validate the view SQL - make sure we can parse it and analyze it.
-    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
-    try {
-      sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
-    } catch {
-      case NonFatal(e) =>
-        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
+  private def prepareTable(session: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
+    if (originalText.isEmpty) {
+      throw new AnalysisException(
+        "It is not allowed to create a persisted view from the Dataset API")
     }
+
+    val newProperties = generateViewProperties(properties, session, originalText.get)
+
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
       schema = aliasedPlan.schema,
-      properties = properties,
+      properties = newProperties,
       viewOriginalText = originalText,
-      viewText = Some(viewSQL),
+      viewText = originalText,
       comment = comment
     )
   }
@@ -244,14 +243,16 @@ case class CreateViewCommand(
  * @param name the name of this view.
  * @param originalText the original SQL text of this view. Note that we can only alter a view by
  *                     SQL API, which means we always have originalText.
- * @param query the logical plan that represents the view; this is used to generate a canonicalized
- *              version of the SQL that can be saved in the catalog.
+ * @param query the logical plan that represents the view; this is used to generate the new view
+ *              schema.
  */
 case class AlterViewAsCommand(
     name: TableIdentifier,
     originalText: String,
     query: LogicalPlan) extends RunnableCommand {
 
+  import ViewHelper._
+
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
   override def run(session: SparkSession): Seq[Row] = {
@@ -275,21 +276,93 @@ case class AlterViewAsCommand(
       throw new AnalysisException(s"${viewMeta.identifier} is not a view.")
     }
 
-    val viewSQL: String = new SQLBuilder(analyzedPlan).toSQL
-    // Validate the view SQL - make sure we can parse it and analyze it.
-    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
-    try {
-      session.sql(viewSQL).queryExecution.assertAnalyzed()
-    } catch {
-      case NonFatal(e) =>
-        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
-    }
+    val newProperties = generateViewProperties(viewMeta.properties, session, originalText)
 
     val updatedViewMeta = viewMeta.copy(
       schema = analyzedPlan.schema,
+      properties = newProperties,
       viewOriginalText = Some(originalText),
-      viewText = Some(viewSQL))
+      viewText = Some(originalText))
 
     session.sessionState.catalog.alterTable(updatedViewMeta)
   }
 }
+
+object ViewHelper {
+
+  import CatalogTable._
+
+  /**
+   * Generate the view default database in `properties`.
+   */
+  def generateViewDefaultDatabase(databaseName: String): Map[String, String] = {
+    Map(VIEW_DEFAULT_DATABASE -> databaseName)
+  }
+
+  /**
+   * Generate the view query output column names in `properties`.
+   */
+  def generateQueryColumnNames(columns: Seq[String]): Map[String, String] = {
+    val props = new mutable.HashMap[String, String]
+    if (columns.nonEmpty) {
+      props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
+      columns.zipWithIndex.foreach { case (colName, index) =>
+        props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
+      }
+    }
+    props.toMap
+  }
+
+  /**
+   * Remove the view query output column names in `properties`.
+   */
+  def removeQueryColumnNames(properties: Map[String, String]): Map[String, String] = {
+    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
+    // while `CatalogTable` should be serializable.
+    properties.filterNot { case (key, _) =>
+      key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
+    }
+  }
+
+  /**
+   * Generate the view properties in CatalogTable, including:
+   * 1. view default database that is used to provide the default database name on view resolution.
+   * 2. the output column names of the query that creates a view; this is used to map the output of
+   *    the view child to the view output during view resolution.
+   *
+   * @param properties the `properties` in CatalogTable.
+   * @param session the spark session.
+   * @param viewText the query string used to create the child plan on view resolution.
+   * @return new view properties including view default database and query column names properties.
+   */
+  def generateViewProperties(
+      properties: Map[String, String],
+      session: SparkSession,
+      viewText: String): Map[String, String] = {
+    // Try to analyze the viewText; throw an AnalysisException if the query is invalid.
+    val queryPlan = try {
+      val unresolvedPlan = session.sessionState.sqlParser.parsePlan(viewText)
+      val resolvedPlan = session.sessionState.analyzer.execute(unresolvedPlan)
+      session.sessionState.analyzer.checkAnalysis(resolvedPlan)
+
+      resolvedPlan
+    } catch {
+      case e: AnalysisException =>
+        throw new AnalysisException(s"Failed to analyze the view query SQL: $viewText",
+          cause = Some(e))
+    }
+
+    // Generate the query column names; fail if the query output contains duplicate column
+    // names.
+    val queryOutput = queryPlan.schema.fieldNames
+    assert(queryOutput.toSet.size == queryOutput.size,
+      s"The view output ${queryOutput.mkString("(", ",", ")")} contains duplicate column names.")
+
+    // Generate the view default database name.
+ val viewDefaultDatabase = session.sessionState.catalog.getCurrentDatabase + + removeQueryColumnNames(properties) ++ + generateViewDefaultDatabase(viewDefaultDatabase) ++ + generateQueryColumnNames(queryOutput) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 605dec4a1ef90..10607b8dc2c21 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2501,11 +2501,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("should be able to resolve a persistent view") { - withTable("t1") { + withTable("t1", "t2") { withView("v1") { sql("CREATE TABLE `t1` USING parquet AS SELECT * FROM VALUES(1, 1) AS t1(a, b)") - sql("CREATE VIEW `v1` AS SELECT * FROM t1") - checkAnswer(spark.table("v1"), Row(1, 1)) + sql("CREATE TABLE `t2` USING parquet AS SELECT * FROM VALUES('a', 2, 1.0) AS t2(d, e, f)") + sql("CREATE VIEW `v1`(x, y) AS SELECT * FROM t1") + checkAnswer(spark.table("v1").orderBy("x"), Row(1, 1)) + + sql("ALTER VIEW `v1` AS SELECT * FROM t2") + checkAnswer(spark.table("v1").orderBy("f"), Row("a", 2, 1.0)) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index 9bc078dbb0228..df7e6ffce4c17 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -222,13 +222,14 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } test("correctly parse CREATE VIEW statement") { - sql( - """CREATE VIEW IF NOT EXISTS + withView("testView") { + sql( + """CREATE VIEW IF NOT EXISTS |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla') |TBLPROPERTIES ('a' = 'b') |AS SELECT * FROM jt""".stripMargin) - checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i))) - sql("DROP VIEW testView") + checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i))) + } } test("correctly parse CREATE TEMPORARY VIEW statement") { From 9d582a453df92b2cc18df38acaaab0f621fac9c5 Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Tue, 17 Jan 2017 19:55:03 +0800 Subject: [PATCH 2/5] update failed test cases in HiveDDLSuite. 
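
With the new write path a view carries several generated entries in its
`properties` map, so the HiveDDLSuite assertions can no longer compare the
raw map against the user-supplied properties; the generated keys have to be
filtered out first.

As a rough sketch (the reserved key names come from the constants in
`CatalogTable`; the exact values depend on the session), the properties
generated for `CREATE VIEW v AS SELECT a, b FROM t` run from database
`default` would look like:

    Map(
      "view.default.database" -> "default", // CatalogTable.VIEW_DEFAULT_DATABASE
      "view.query.out.numCols" -> "2",      // CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS
      "view.query.out.col.0" -> "a",        // VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX + index
      "view.query.out.col.1" -> "b")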
---
 .../sql/hive/execution/HiveDDLSuite.scala | 32 +++++++++++--------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e3f1667249684..e9355b8c52382 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -370,28 +370,35 @@ class HiveDDLSuite
     spark.range(10).write.saveAsTable(tabName)
     val viewName = "view1"
     withView(viewName) {
+      def checkProperties(
+          properties: Map[String, String],
+          expected: Map[String, String]): Unit = {
+        assert(properties.filterNot { case (key, value) =>
+          Seq("transient_lastDdlTime", CatalogTable.VIEW_DEFAULT_DATABASE).contains(key) ||
+            key.startsWith(CatalogTable.VIEW_QUERY_OUTPUT_PREFIX)
+        } == expected)
+      }
+
       val catalog = spark.sessionState.catalog
       sql(s"CREATE VIEW $viewName AS SELECT * FROM $tabName")
-      assert(catalog.getTableMetadata(TableIdentifier(viewName))
-        .properties.filter(_._1 != "transient_lastDdlTime") == Map())
+      checkProperties(catalog.getTableMetadata(TableIdentifier(viewName)).properties, Map())
       sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
-      assert(catalog.getTableMetadata(TableIdentifier(viewName))
-        .properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "an"))
+      checkProperties(catalog.getTableMetadata(TableIdentifier(viewName)).properties,
+        Map("p" -> "an"))
 
       // no exception or message will be issued if we set it again
       sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
-      assert(catalog.getTableMetadata(TableIdentifier(viewName))
-        .properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "an"))
+      checkProperties(catalog.getTableMetadata(TableIdentifier(viewName)).properties,
+        Map("p" -> "an"))
 
       // the value will be updated if we set the same key to a different value
       sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'b')")
-      assert(catalog.getTableMetadata(TableIdentifier(viewName))
-        .properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "b"))
+      checkProperties(catalog.getTableMetadata(TableIdentifier(viewName)).properties,
+        Map("p" -> "b"))
 
       sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
-      assert(catalog.getTableMetadata(TableIdentifier(viewName))
-        .properties.filter(_._1 != "transient_lastDdlTime") == Map())
+      checkProperties(catalog.getTableMetadata(TableIdentifier(viewName)).properties, Map())
 
       val message = intercept[AnalysisException] {
         sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
@@ -644,10 +651,7 @@ class HiveDDLSuite
         Seq(
           Row("# View Information", "", ""),
          Row("View Original Text:", "SELECT * FROM tbl", ""),
-          Row("View Expanded Text:",
-            "SELECT `gen_attr_0` AS `a` FROM (SELECT `gen_attr_0` FROM " +
-              "(SELECT `a` AS `gen_attr_0` FROM `default`.`tbl`) AS gen_subquery_0) AS tbl",
-            "")
+          Row("View Expanded Text:", "SELECT * FROM tbl", "")
         )
       ))
   }
From 2d49ef26936448dd70768562c4ef429542f56e4e Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Wed, 18 Jan 2017 15:37:23 +0800
Subject: [PATCH 3/5] code refactor.
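
`generateViewProperties` no longer re-parses and re-analyzes the view text:
both callers already hold an analyzed plan, so the output column names can
be read directly from `analyzedPlan.schema`. A minimal sketch of the new
call shape (`spark` and `plan` are illustrative names, not taken from the
patch):

    val newProps = ViewHelper.generateViewProperties(
      properties = Map.empty[String, String], // existing catalog properties
      session = spark,                        // supplies the current database
      analyzedPlan = plan)                    // supplies the output column names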
---
 .../spark/sql/execution/command/views.scala | 41 +++++++++----------
 1 file changed, 19 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index f7663a203ed60..17137a857293e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -165,7 +165,7 @@ case class CreateViewCommand(
       throw new AnalysisException(s"$name is not a view")
     } else if (replace) {
       // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-      catalog.alterTable(prepareTable(sparkSession, aliasedPlan))
+      catalog.alterTable(prepareTable(sparkSession, aliasedPlan, analyzedPlan))
     } else {
       // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
       // exists.
@@ -175,7 +175,8 @@ case class CreateViewCommand(
     } else {
       // Create the view if it doesn't exist.
-      catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false)
+      catalog.createTable(prepareTable(sparkSession, aliasedPlan, analyzedPlan),
+        ignoreIfExists = false)
     }
     Seq.empty[Row]
   }
@@ -212,14 +213,23 @@ case class CreateViewCommand(
    * Returns a [[CatalogTable]] that can be used to save in the catalog. Generates view-specific
    * properties (e.g. view default database, view query output column names), stores them as
    * properties in the CatalogTable, and also creates the proper schema for the view.
+   *
+   * @param session the spark session.
+   * @param aliasedPlan if `userSpecifiedColumns` is defined, the aliased plan outputs the
+   *                    user-specified columns, else it is the same as the `analyzedPlan`.
+   * @param analyzedPlan the analyzed logical plan that represents the child of a view.
+   * @return a CatalogTable that can be used to save in the catalog.
    */
-  private def prepareTable(session: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
+  private def prepareTable(
+      session: SparkSession,
+      aliasedPlan: LogicalPlan,
+      analyzedPlan: LogicalPlan): CatalogTable = {
     if (originalText.isEmpty) {
       throw new AnalysisException(
         "It is not allowed to create a persisted view from the Dataset API")
     }
 
-    val newProperties = generateViewProperties(properties, session, originalText.get)
+    val newProperties = generateViewProperties(properties, session, analyzedPlan)
 
     CatalogTable(
       identifier = name,
@@ -276,7 +286,7 @@ case class AlterViewAsCommand(
       throw new AnalysisException(s"${viewMeta.identifier} is not a view.")
     }
 
-    val newProperties = generateViewProperties(viewMeta.properties, session, originalText)
+    val newProperties = generateViewProperties(viewMeta.properties, session, analyzedPlan)
 
     val updatedViewMeta = viewMeta.copy(
       schema = analyzedPlan.schema,
@@ -332,30 +342,17 @@ object ViewHelper {
    *
    * @param properties the `properties` in CatalogTable.
    * @param session the spark session.
-   * @param viewText the query string used to create the child plan on view resolution.
+   * @param analyzedPlan the analyzed logical plan that represents the child of a view.
    * @return new view properties including view default database and query column names properties.
    */
   def generateViewProperties(
       properties: Map[String, String],
       session: SparkSession,
-      viewText: String): Map[String, String] = {
-    // Try to analyze the viewText; throw an AnalysisException if the query is invalid.
-    val queryPlan = try {
-      val unresolvedPlan = session.sessionState.sqlParser.parsePlan(viewText)
-      val resolvedPlan = session.sessionState.analyzer.execute(unresolvedPlan)
-      session.sessionState.analyzer.checkAnalysis(resolvedPlan)
-
-      resolvedPlan
-    } catch {
-      case e: AnalysisException =>
-        throw new AnalysisException(s"Failed to analyze the view query SQL: $viewText",
-          cause = Some(e))
-    }
-
+    analyzedPlan: LogicalPlan): Map[String, String] = {
     // Generate the query column names; fail if the query output contains duplicate column
     // names.
-    val queryOutput = queryPlan.schema.fieldNames
-    assert(queryOutput.toSet.size == queryOutput.size,
+    val queryOutput = analyzedPlan.schema.fieldNames
+    assert(queryOutput.distinct.size == queryOutput.size,
       s"The view output ${queryOutput.mkString("(", ",", ")")} contains duplicate column names.")
 
     // Generate the view default database name.
From e2ccdd5689274e44289017822a2bf81de32dbd37 Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Wed, 18 Jan 2017 16:17:40 +0800
Subject: [PATCH 4/5] refactor aliasedPlan.

---
 .../spark/sql/execution/command/views.scala | 54 +++++++++----------
 1 file changed, 27 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 17137a857293e..e9da36e014fa6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -139,22 +139,12 @@ case class CreateViewCommand(
     // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
     verifyTemporaryObjectsNotExists(sparkSession)
 
-    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, None)) => Alias(attr, colName)()
-        case (attr, (colName, Some(colComment))) =>
-          val meta = new MetadataBuilder().putString("comment", colComment).build()
-          Alias(attr, colName)(explicitMetadata = Some(meta))
-      }
-      sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
-    }
-
     val catalog = sparkSession.sessionState.catalog
     if (viewType == LocalTempView) {
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (catalog.tableExists(name)) {
       val tableMetadata = catalog.getTableMetadata(name)
@@ -165,7 +155,7 @@ case class CreateViewCommand(
       throw new AnalysisException(s"$name is not a view")
     } else if (replace) {
       // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-      catalog.alterTable(prepareTable(sparkSession, aliasedPlan, analyzedPlan))
+      catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
     } else {
       // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
       // exists.
@@ -175,7 +165,7 @@ case class CreateViewCommand(
     } else {
       // Create the view if it doesn't exist.
-      catalog.createTable(prepareTable(sparkSession, aliasedPlan, analyzedPlan),
+      catalog.createTable(prepareTable(sparkSession, analyzedPlan),
+        ignoreIfExists = false)
     }
     Seq.empty[Row]
   }
@@ -209,26 +199,36 @@ case class CreateViewCommand(
     }
   }
 
+  /**
+   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user-specified columns,
+   * else return the analyzed plan directly.
+   */
+  private def aliasPlan(session: SparkSession, analyzedPlan: LogicalPlan): LogicalPlan = {
+    if (userSpecifiedColumns.isEmpty) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
+      }
+      session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
+    }
+  }
+
   /**
    * Returns a [[CatalogTable]] that can be used to save in the catalog. Generates view-specific
    * properties (e.g. view default database, view query output column names), stores them as
    * properties in the CatalogTable, and also creates the proper schema for the view.
-   *
-   * @param session the spark session.
-   * @param aliasedPlan if `userSpecifiedColumns` is defined, the aliased plan outputs the
-   *                    user-specified columns, else it is the same as the `analyzedPlan`.
-   * @param analyzedPlan the analyzed logical plan that represents the child of a view.
-   * @return a CatalogTable that can be used to save in the catalog.
    */
-  private def prepareTable(
-      session: SparkSession,
-      aliasedPlan: LogicalPlan,
-      analyzedPlan: LogicalPlan): CatalogTable = {
+  private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
     if (originalText.isEmpty) {
       throw new AnalysisException(
         "It is not allowed to create a persisted view from the Dataset API")
     }
 
+    val aliasedPlan = aliasPlan(session, analyzedPlan)
     val newProperties = generateViewProperties(properties, session, analyzedPlan)
 
     CatalogTable(
@@ -305,14 +305,14 @@ object ViewHelper {
   /**
    * Generate the view default database in `properties`.
   */
-  def generateViewDefaultDatabase(databaseName: String): Map[String, String] = {
+  private def generateViewDefaultDatabase(databaseName: String): Map[String, String] = {
     Map(VIEW_DEFAULT_DATABASE -> databaseName)
   }
 
   /**
    * Generate the view query output column names in `properties`.
   */
-  def generateQueryColumnNames(columns: Seq[String]): Map[String, String] = {
+  private def generateQueryColumnNames(columns: Seq[String]): Map[String, String] = {
     val props = new mutable.HashMap[String, String]
     if (columns.nonEmpty) {
       props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
@@ -326,7 +326,7 @@ object ViewHelper {
   /**
    * Remove the view query output column names in `properties`.
   */
-  def removeQueryColumnNames(properties: Map[String, String]): Map[String, String] = {
+  private def removeQueryColumnNames(properties: Map[String, String]): Map[String, String] = {
     // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
     // while `CatalogTable` should be serializable.
     properties.filterNot { case (key, _) =>
From 7c5b6afe97676c42b62a8d2a92e98d120f81c96b Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Wed, 18 Jan 2017 17:07:12 +0800
Subject: [PATCH 5/5] code refactor.
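
Net effect of the series on the write path, as an illustrative sketch
(identifiers are hypothetical, and `TableIdentifier` is assumed to be in
scope): a persisted view now stores the original SQL text unchanged,
together with the generated properties, instead of a canonicalized
SQLBuilder rewrite.

    sql("CREATE VIEW v AS SELECT * FROM tbl")
    val meta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("v"))
    // Both fields now hold the original SQL text:
    assert(meta.viewText == meta.viewOriginalText)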
---
 .../spark/sql/execution/command/views.scala   |  6 ++----
 .../sql/hive/execution/HiveDDLSuite.scala     | 21 +++++++-------------
 .../sql/hive/execution/SQLViewSuite.scala     |  7 ++++---
 3 files changed, 14 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index e9da36e014fa6..3da4bcfe9363c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -165,8 +165,7 @@ case class CreateViewCommand(
     } else {
       // Create the view if it doesn't exist.
-      catalog.createTable(prepareTable(sparkSession, analyzedPlan),
-        ignoreIfExists = false)
+      catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
     }
     Seq.empty[Row]
   }
@@ -228,14 +227,13 @@ case class CreateViewCommand(
       "It is not allowed to create a persisted view from the Dataset API")
     }
 
-    val aliasedPlan = aliasPlan(session, analyzedPlan)
     val newProperties = generateViewProperties(properties, session, analyzedPlan)
 
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
-      schema = aliasedPlan.schema,
+      schema = aliasPlan(session, analyzedPlan).schema,
       properties = newProperties,
       viewOriginalText = originalText,
       viewText = originalText,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e9355b8c52382..ab8fe3cba3551 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -370,35 +370,30 @@ class HiveDDLSuite
     spark.range(10).write.saveAsTable(tabName)
     val viewName = "view1"
     withView(viewName) {
-      def checkProperties(
-          properties: Map[String, String],
-          expected: Map[String, String]): Unit = {
+      def checkProperties(expected: Map[String, String]): Unit = {
+        val properties = spark.sessionState.catalog.getTableMetadata(TableIdentifier(viewName))
+          .properties
         assert(properties.filterNot { case (key, value) =>
           Seq("transient_lastDdlTime", CatalogTable.VIEW_DEFAULT_DATABASE).contains(key) ||
             key.startsWith(CatalogTable.VIEW_QUERY_OUTPUT_PREFIX)
         } == expected)
       }
-
-      val catalog = spark.sessionState.catalog
       sql(s"CREATE VIEW $viewName AS SELECT * FROM $tabName")
-      checkProperties(catalog.getTableMetadata(TableIdentifier(viewName)).properties, Map())
+      checkProperties(Map())
       sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
-      checkProperties(catalog.getTableMetadata(TableIdentifier(viewName)).properties,
-        Map("p" -> "an"))
+      checkProperties(Map("p" -> "an"))
 
       // no exception or message will be issued if we set it again
       sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
-      checkProperties(catalog.getTableMetadata(TableIdentifier(viewName)).properties,
-        Map("p" -> "an"))
+      checkProperties(Map("p" -> "an"))
 
       // the value will be updated if we set the same key to a different value
      sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'b')")
-      checkProperties(catalog.getTableMetadata(TableIdentifier(viewName)).properties,
-        Map("p" -> "b"))
+      checkProperties(Map("p" -> "b"))
 
       sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
-      checkProperties(catalog.getTableMetadata(TableIdentifier(viewName)).properties, Map())
+      checkProperties(Map())
 
      val message = 
intercept[AnalysisException] { sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index df7e6ffce4c17..2658e2c91f235 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -225,9 +225,10 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { withView("testView") { sql( """CREATE VIEW IF NOT EXISTS - |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla') - |TBLPROPERTIES ('a' = 'b') - |AS SELECT * FROM jt""".stripMargin) + |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla') + |TBLPROPERTIES ('a' = 'b') + |AS SELECT * FROM jt + |""".stripMargin) checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i))) } }