From a3cc0f72e755899a66580d7512743777a5f911d8 Mon Sep 17 00:00:00 2001
From: Linhong Liu
Date: Tue, 8 Dec 2020 15:39:04 +0800
Subject: [PATCH 1/6] SPARK-33142 followup

---
 .../sql/catalyst/catalog/SessionCatalog.scala | 44 ++++++++++++++++---
 .../plans/logical/basicLogicalOperators.scala | 15 -------
 .../spark/sql/execution/command/views.scala   | 16 ++-----
 .../apache/spark/sql/CachedTableSuite.scala   | 13 ++++++
 4 files changed, 54 insertions(+), 34 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 7c805bdb4b6f1..2c69dce1debd1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -610,8 +610,16 @@ class SessionCatalog(
   /**
    * Return a local temporary view exactly as it was stored.
    */
+  def getRawTempView(name: String): Option[LogicalPlan] = synchronized {
+    tempViews.get(formatTableName(name))
+  }
+
+  /**
+   * Generate a [[View]] operator from the view description if the view stores SQL text,
+   * otherwise, it is the same as `getRawTempView`
+   */
   def getTempView(name: String): Option[LogicalPlan] = synchronized {
-    tempViews.get(formatTableName(name)).map(getTempViewPlan)
+    getRawTempView(name).map(getTempViewPlan)
   }
 
   def getTempViewNames(): Seq[String] = synchronized {
@@ -621,8 +629,16 @@ class SessionCatalog(
   /**
    * Return a global temporary view exactly as it was stored.
    */
+  def getRawGlobalTempView(name: String): Option[LogicalPlan] = {
+    globalTempViewManager.get(formatTableName(name))
+  }
+
+  /**
+   * Generate a [[View]] operator from the view description if the view stores SQL text,
+   * otherwise, it is the same as `getRawGlobalTempView`
+   */
   def getGlobalTempView(name: String): Option[LogicalPlan] = {
-    globalTempViewManager.get(formatTableName(name)).map(getTempViewPlan)
+    getGlobalTempView(name).map(getTempViewPlan)
   }
 
   /**
@@ -659,7 +675,7 @@ class SessionCatalog(
   def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
     val table = formatTableName(name.table)
     if (name.database.isEmpty) {
-      getTempView(table).map {
+      tempViews.get(table).map {
         case TemporaryViewRelation(metadata) => metadata
         case plan =>
           CatalogTable(
@@ -669,7 +685,6 @@ class SessionCatalog(
             schema = plan.output.toStructType)
       }.getOrElse(getTableMetadata(name))
     } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
-      val a = globalTempViewManager.get(table)
       globalTempViewManager.get(table).map {
         case TemporaryViewRelation(metadata) => metadata
         case plan =>
@@ -810,21 +825,36 @@ class SessionCatalog(
       // The relation is a view, so we wrap the relation by:
       // 1. Add a [[View]] operator over the relation to keep track of the view desc;
       // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
-      val child = View.fromCatalogTable(metadata, isTempView = false, parser)
+      val child = fromCatalogTable(metadata, isTempView = false, parser)
       SubqueryAlias(multiParts, child)
     } else {
       SubqueryAlias(multiParts, UnresolvedCatalogRelation(metadata, options))
     }
   }
 
-  def getTempViewPlan(plan: LogicalPlan): LogicalPlan = {
+  private def getTempViewPlan(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case viewInfo: TemporaryViewRelation =>
-        View.fromCatalogTable(viewInfo.tableMeta, isTempView = true, parser)
+        fromCatalogTable(viewInfo.tableMeta, isTempView = true, parser)
       case v => v
     }
   }
 
+  private def fromCatalogTable(
+      metadata: CatalogTable, isTempView: Boolean, parser: ParserInterface): View = {
+    val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
+    val viewConfigs = metadata.viewSQLConfigs
+    val viewPlan =
+      SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs, isTempView = isTempView)) {
+        parser.parsePlan(viewText)
+      }
+    View(
+      desc = metadata,
+      isTempView = isTempView,
+      output = metadata.schema.toAttributes,
+      child = viewPlan)
+  }
+
   def lookupTempView(table: String): Option[SubqueryAlias] = {
     val formattedTable = formatTableName(table)
     getTempView(formattedTable).map { view =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 0e4bfa4dc34da..8ce11ee7bf13e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -485,21 +485,6 @@ object View {
     }
     sqlConf
   }
-
-  def fromCatalogTable(
-      metadata: CatalogTable, isTempView: Boolean, parser: ParserInterface): View = {
-    val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
-    val viewConfigs = metadata.viewSQLConfigs
-    val viewPlan =
-      SQLConf.withExistingConf(effectiveSQLConf(viewConfigs, isTempView = isTempView)) {
-        parser.parsePlan(viewText)
-      }
-    View(
-      desc = metadata,
-      isTempView = isTempView,
-      output = metadata.schema.toAttributes,
-      child = viewPlan)
-  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 06b1e03adea50..6f32f9d2bfcbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -113,12 +113,8 @@ case class CreateViewCommand(
     verifyTemporaryObjectsNotExists(catalog, isTemporary, name, child)
 
     if (viewType == LocalTempView) {
-      val shouldUncache = replace && catalog.getTempView(name.table).exists {
-        // Uncache View logical plan without checking the same result check, since it's unresolved.
-        case _: View => true
-        case other => !other.sameResult(child)
-      }
-      if (shouldUncache) {
+      if (replace && catalog.getRawTempView(name.table).isDefined &&
+          !catalog.getRawTempView(name.table).get.sameResult(child)) {
         logInfo(s"Try to uncache ${name.quotedString} before replacing.")
         checkCyclicViewReference(analyzedPlan, Seq(name), name)
         CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
@@ -141,12 +137,8 @@ case class CreateViewCommand(
     } else if (viewType == GlobalTempView) {
       val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
       val viewIdent = TableIdentifier(name.table, Option(db))
-      val shouldUncache = replace && catalog.getGlobalTempView(name.table).exists {
-        // Uncache View logical plan without checking the same result check, since it's unresolved.
-        case _: View => true
-        case other => !other.sameResult(child)
-      }
-      if (shouldUncache) {
+      if (replace && catalog.getRawGlobalTempView(name.table).isDefined &&
+          !catalog.getRawGlobalTempView(name.table).get.sameResult(child)) {
         logInfo(s"Try to uncache ${viewIdent.quotedString} before replacing.")
         checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
         CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index a3a6d6721c993..af8d72309bdea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -1272,4 +1272,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       }
     }
   }
+
+  test("SPARK-33647: cache table support for permanent view") {
+    withView("v1") {
+      spark.catalog.clearCache()
+      sql("create or replace view v1 as select 1")
+      sql("cache table v1")
+      assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isDefined)
+      sql("create or replace view v1 as select 1, 2")
+      assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isEmpty)
+      sql("cache table v1")
+      assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1, 2")).isDefined)
+    }
+  }
 }
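
The heart of this first patch is the split between `getRawTempView`/`getRawGlobalTempView`, which return exactly what was stored for a temporary view, and `getTempView`/`getGlobalTempView`, which additionally re-parse stored SQL text into a `View` operator. The sketch below is an editorial illustration of that lookup pattern, not part of the patch set: the types and the `parsePlan` function are simplified stand-ins (a `String` plays the role of Spark's `LogicalPlan`), not Spark's internal API.

// Hypothetical stand-in types for illustration only.
sealed trait TempViewEntry
// Legacy shape: the analyzed plan itself is stored for the temporary view.
final case class StoredPlan(plan: String) extends TempViewEntry
// New shape: the view's SQL text plus the SQL configs captured at creation time.
final case class StoredSqlText(sqlText: String, capturedConfigs: Map[String, String]) extends TempViewEntry

final class TempViewRegistry(parsePlan: String => String) {
  private val views = scala.collection.mutable.Map.empty[String, TempViewEntry]

  private def format(name: String): String = name.toLowerCase

  def create(name: String, entry: TempViewEntry): Unit = views(format(name)) = entry

  // Like getRawTempView: return exactly what was stored, with no re-parsing.
  // The patched CreateViewCommand compares this raw form via sameResult before uncaching.
  def getRaw(name: String): Option[TempViewEntry] = views.get(format(name))

  // Like getTempView: if SQL text was stored, re-parse it into a plan;
  // otherwise hand back the stored plan unchanged.
  def get(name: String): Option[String] = getRaw(name).map {
    case StoredPlan(plan) => plan
    case StoredSqlText(sqlText, _) => parsePlan(sqlText)
  }
}

object TempViewRegistryDemo extends App {
  val registry = new TempViewRegistry(sql => s"ParsedPlan($sql)")
  registry.create("v1", StoredSqlText("SELECT 1", Map("spark.sql.ansi.enabled" -> "true")))
  println(registry.getRaw("v1")) // the stored entry, untouched
  println(registry.get("v1"))    // Some(ParsedPlan(SELECT 1))
}
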
From 6d27c0d32a905a6c13b529ce70ff38f3a03fe99e Mon Sep 17 00:00:00 2001
From: Linhong Liu
Date: Tue, 8 Dec 2020 17:23:36 +0800
Subject: [PATCH 2/6] support function

---
 .../sql/catalyst/catalog/SessionCatalog.scala      |  2 +-
 .../plans/logical/basicLogicalOperators.scala      |  1 -
 .../apache/spark/sql/execution/SQLViewSuite.scala  | 14 --------------
 .../spark/sql/execution/SQLViewTestSuite.scala     |  1 -
 4 files changed, 1 insertion(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 2c69dce1debd1..17b0415cd57e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -638,7 +638,7 @@ class SessionCatalog(
    * otherwise, it is the same as `getRawGlobalTempView`
    */
   def getGlobalTempView(name: String): Option[LogicalPlan] = {
-    getGlobalTempView(name).map(getTempViewPlan)
+    getRawGlobalTempView(name).map(getTempViewPlan)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 8ce11ee7bf13e..91fb77574a0ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.analysis.{EliminateView, MultiInstanceRelat
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 7595ae0ec7a53..50db986490033 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -812,20 +812,6 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("creating local temp view should not affect existing table reference") {
-    withTable("t") {
-      withTempView("t") {
-        withGlobalTempView("v") {
-          val globalTempDB = spark.sharedState.globalTempViewManager.database
-          Seq(2).toDF("c1").write.format("parquet").saveAsTable("t")
-          sql("CREATE GLOBAL TEMPORARY VIEW v AS SELECT * FROM t")
-          sql("CREATE TEMPORARY VIEW t AS SELECT 1")
-          checkAnswer(sql(s"SELECT * FROM ${globalTempDB}.v"), Seq(Row(2)))
-        }
-      }
-    }
-  }
-
   test("SPARK-33141: view should be parsed and analyzed with configs set when creating") {
     withTable("t") {
       withView("v1", "v2", "v3", "v4", "v5") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index 3a7a63ed45ce3..81c60b8e8865c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -231,7 +231,6 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
 class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
   override protected def viewTypeString: String = "TEMPORARY VIEW"
   override protected def formattedViewName(viewName: String): String = viewName
-
 }
 
 class GlobalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {

From d72e0f6b7170a30f30a1d8d5579cc6f43918e9aa Mon Sep 17 00:00:00 2001
From: Linhong Liu
Date: Thu, 10 Dec 2020 11:22:59 +0800
Subject: [PATCH 3/6] add migration guide

---
 docs/sql-migration-guide.md | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 484823b7c07ab..4b6c2266387f5 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -58,7 +58,9 @@ license: |
 
   - In Spark 3.1, refreshing a table will trigger an uncache operation for all other caches that reference the table, even if the table itself is not cached. In Spark 3.0 the operation will only be triggered if the table itself is cached.
 
-  - In Spark 3.1, creating or altering a view will capture runtime SQL configs and store them as view properties. These configs will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.useCurrentConfigsForView` to `true`.
+  - In Spark 3.1, creating or altering a permanent view will capture runtime SQL configs and store them as view properties. These configs will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.useCurrentConfigsForView` to `true`.
+
+  - In Spark 3.1, temporary views will have the same behavior as permanent views, i.e. they capture and store runtime SQL configs, SQL text, catalog and namespace. The captured view properties will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.storeAnalyzedPlanForView` to `true`.
 
   - Since Spark 3.1, CHAR/CHARACTER and VARCHAR types are supported in the table schema. Table scan/insertion will respect the char/varchar semantic. If char/varchar is used in places other than table schema, an exception will be thrown (CAST is an exception that simply treats char/varchar as string like before). To restore the behavior before Spark 3.1, which treats them as STRING types and ignores a length parameter, e.g. `CHAR(4)`, you can set `spark.sql.legacy.charVarcharAsString` to `true`.
 
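
To make the two new migration-guide entries concrete, here is a small illustrative snippet. It is an editorial sketch rather than part of the patch set; it assumes a local Spark 3.1+ session, and `spark.sql.ansi.enabled` is only an arbitrary example of a config that gets captured.

import org.apache.spark.sql.SparkSession

object ViewConfigCaptureDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("view-config-capture-demo")
      .getOrCreate()

    // Per the migration guide entry above: in Spark 3.1 a temporary view stores its
    // SQL text together with the SQL configs captured at creation time, and those
    // captured configs are re-applied whenever the view is parsed and analyzed.
    spark.sql("SET spark.sql.ansi.enabled=true")
    spark.sql("CREATE TEMPORARY VIEW v AS SELECT 1 AS c1")

    // Changing the config afterwards should not change how `v` is resolved,
    // because the value captured at creation time is used during view resolution.
    spark.sql("SET spark.sql.ansi.enabled=false")
    spark.sql("SELECT * FROM v").show()

    // Legacy escape hatch named in the guide: store the analyzed plan for the
    // temporary view instead of its SQL text, as Spark 3.0 and earlier did.
    spark.sql("SET spark.sql.legacy.storeAnalyzedPlanForView=true")
    spark.sql("CREATE OR REPLACE TEMPORARY VIEW v2 AS SELECT 1 AS c1")
    spark.sql("SELECT * FROM v2").show()

    spark.stop()
  }
}
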
From 8dde6292b27bff8ed3ff1e5fedc8b7311e9ec6fb Mon Sep 17 00:00:00 2001
From: Linhong Liu
Date: Thu, 10 Dec 2020 17:21:20 +0800
Subject: [PATCH 4/6] add test

---
 .../spark/sql/execution/SQLViewTestSuite.scala | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index 81c60b8e8865c..e2514a9e3a8c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -200,6 +200,20 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("creating local temp view should not affect existing table reference") {
+    withTable("t") {
+      withTempView("t") {
+        withGlobalTempView("v") {
+          val globalTempDB = spark.sharedState.globalTempViewManager.database
+          Seq(2).toDF("c1").write.format("parquet").saveAsTable("t")
+          sql("CREATE GLOBAL TEMPORARY VIEW v AS SELECT * FROM t")
+          sql("CREATE TEMPORARY VIEW t AS SELECT 1")
+          checkAnswer(sql(s"SELECT * FROM ${globalTempDB}.v"), Seq(Row(2)))
+        }
+      }
+    }
+  }
+
   test("SPARK-33692: view should use captured catalog and namespace to lookup function") {
     val avgFuncClass = "test.org.apache.spark.sql.MyDoubleAvg"
     val sumFuncClass = "test.org.apache.spark.sql.MyDoubleSum"

From 9bcf00be05404109d71413131d466477e48a9258 Mon Sep 17 00:00:00 2001
From: Linhong Liu
Date: Thu, 10 Dec 2020 21:27:55 +0800
Subject: [PATCH 5/6] address comments

---
 .../sql/catalyst/catalog/SessionCatalog.scala |  8 +++----
 .../sql/execution/SQLViewTestSuite.scala      | 23 +++++++++++++++++++
 2 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 17b0415cd57e1..9814f4b3aa75b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -825,8 +825,7 @@ class SessionCatalog(
       // The relation is a view, so we wrap the relation by:
       // 1. Add a [[View]] operator over the relation to keep track of the view desc;
       // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
-      val child = fromCatalogTable(metadata, isTempView = false, parser)
-      SubqueryAlias(multiParts, child)
+      SubqueryAlias(multiParts, fromCatalogTable(metadata, isTempView = false))
     } else {
       SubqueryAlias(multiParts, UnresolvedCatalogRelation(metadata, options))
     }
@@ -835,13 +834,12 @@ class SessionCatalog(
   private def getTempViewPlan(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case viewInfo: TemporaryViewRelation =>
-        fromCatalogTable(viewInfo.tableMeta, isTempView = true, parser)
+        fromCatalogTable(viewInfo.tableMeta, isTempView = true)
       case v => v
     }
   }
 
-  private def fromCatalogTable(
-      metadata: CatalogTable, isTempView: Boolean, parser: ParserInterface): View = {
+  private def fromCatalogTable(metadata: CatalogTable, isTempView: Boolean): View = {
     val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
     val viewConfigs = metadata.viewSQLConfigs
     val viewPlan =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index e2514a9e3a8c3..6c039c654437b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -214,6 +214,29 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("view should use captured catalog and namespace to resolve relation") {
+    withTempDatabase { dbName =>
+      withTable("default.t", s"$dbName.t") {
+        withTempView("t") {
+          // create a table in default database
+          sql("USE DEFAULT")
+          Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
+          // create a view refer the created table in default database
+          val viewName = createView("v1", "SELECT * FROM t")
+          // using another database to create a table with same name
+          sql(s"USE $dbName")
+          Seq(4, 5, 6).toDF("c1").write.format("parquet").saveAsTable("t")
+          // create a temporary view with the same name
+          sql("CREATE TEMPORARY VIEW t AS SELECT 1")
+          withView(viewName) {
+            // view v1 should still refer the table defined in `default` database
+            checkViewOutput(viewName, Seq(Row(2), Row(3), Row(1)))
+          }
+        }
+      }
+    }
+  }
+
   test("SPARK-33692: view should use captured catalog and namespace to lookup function") {
     val avgFuncClass = "test.org.apache.spark.sql.MyDoubleAvg"
     val sumFuncClass = "test.org.apache.spark.sql.MyDoubleSum"

From 1cde94e40490ae021334af885f90811c8265ac98 Mon Sep 17 00:00:00 2001
From: Linhong Liu <67896261+linhongliu-db@users.noreply.github.com>
Date: Fri, 11 Dec 2020 12:49:37 +0800
Subject: [PATCH 6/6] Update SQLViewTestSuite.scala

---
 .../spark/sql/execution/SQLViewTestSuite.scala | 14 --------------
 1 file changed, 14 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index 6c039c654437b..8c3d92358a975 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -200,20 +200,6 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("creating local temp view should not affect existing table reference") {
-    withTable("t") {
-      withTempView("t") {
-        withGlobalTempView("v") {
-          val globalTempDB = spark.sharedState.globalTempViewManager.database
-          Seq(2).toDF("c1").write.format("parquet").saveAsTable("t")
-          sql("CREATE GLOBAL TEMPORARY VIEW v AS SELECT * FROM t")
-          sql("CREATE TEMPORARY VIEW t AS SELECT 1")
-          checkAnswer(sql(s"SELECT * FROM ${globalTempDB}.v"), Seq(Row(2)))
-        }
-      }
-    }
-  }
-
   test("view should use captured catalog and namespace to resolve relation") {
     withTempDatabase { dbName =>
       withTable("default.t", s"$dbName.t") {