docs/sql-migration-guide.md (3 additions, 1 deletion)

@@ -58,7 +58,9 @@ license: |

 - In Spark 3.1, refreshing a table will trigger an uncache operation for all other caches that reference the table, even if the table itself is not cached. In Spark 3.0 the operation will only be triggered if the table itself is cached.

-- In Spark 3.1, creating or altering a view will capture runtime SQL configs and store them as view properties. These configs will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.useCurrentConfigsForView` to `true`.
+- In Spark 3.1, creating or altering a permanent view will capture runtime SQL configs and store them as view properties. These configs will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.useCurrentConfigsForView` to `true`.

+- In Spark 3.1, a temporary view has the same behavior as a permanent view, i.e. it captures and stores runtime SQL configs, the SQL text, and the catalog and namespace. The captured view properties will be applied during the parsing and analysis phases of view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.storeAnalyzedPlanForView` to `true`.

 - Since Spark 3.1, CHAR/CHARACTER and VARCHAR types are supported in the table schema. Table scan/insertion will respect the char/varchar semantic. If char/varchar is used in places other than table schema, an exception will be thrown (CAST is an exception that simply treats char/varchar as string like before). To restore the behavior before Spark 3.1, which treats them as STRING types and ignores a length parameter, e.g. `CHAR(4)`, you can set `spark.sql.legacy.charVarcharAsString` to `true`.
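A minimal sketch of the CHAR/VARCHAR semantics from the note above (illustrative spark-shell snippet, not part of this diff; the table name is hypothetical):

```scala
// Spark 3.1: CHAR(4) in a table schema is enforced — shorter values are padded,
// over-length values are rejected on insert.
spark.sql("CREATE TABLE chars (c CHAR(4)) USING parquet")
spark.sql("INSERT INTO chars VALUES ('ab')")
spark.sql("SELECT length(c) FROM chars").show()  // 4: 'ab' is right-padded with spaces

// To restore the pre-3.1 behavior that treats CHAR(4) as a plain STRING:
spark.conf.set("spark.sql.legacy.charVarcharAsString", "true")
```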

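Similarly, a sketch of the config-capture behavior described in the two view notes above (illustrative; `spark.sql.ansi.enabled` is just one example of a captured SQL config):

```scala
// Spark 3.1: a permanent view stores the runtime SQL configs in effect at creation.
spark.conf.set("spark.sql.ansi.enabled", "true")
spark.sql("CREATE OR REPLACE VIEW strict_v AS SELECT CAST(c AS INT) AS i FROM VALUES ('1') AS t(c)")

// The view is later parsed and analyzed under the captured configs, even though
// the session config has changed in the meantime.
spark.conf.set("spark.sql.ansi.enabled", "false")
spark.sql("SELECT * FROM strict_v")  // still resolved with spark.sql.ansi.enabled=true

// Pre-3.1 behavior (resolve views under the current session configs instead):
spark.conf.set("spark.sql.legacy.useCurrentConfigsForView", "true")
```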
SessionCatalog.scala

@@ -610,8 +610,16 @@ class SessionCatalog(
   /**
    * Return a local temporary view exactly as it was stored.
    */
+  def getRawTempView(name: String): Option[LogicalPlan] = synchronized {
+    tempViews.get(formatTableName(name))
+  }
+
+  /**
+   * Generate a [[View]] operator from the view description if the view stores SQL text;
+   * otherwise, it is the same as `getRawTempView`.
+   */
   def getTempView(name: String): Option[LogicalPlan] = synchronized {
-    tempViews.get(formatTableName(name)).map(getTempViewPlan)
+    getRawTempView(name).map(getTempViewPlan)
   }

   def getTempViewNames(): Seq[String] = synchronized {
@@ -621,8 +629,16 @@
   /**
    * Return a global temporary view exactly as it was stored.
    */
+  def getRawGlobalTempView(name: String): Option[LogicalPlan] = {
+    globalTempViewManager.get(formatTableName(name))
+  }
+
+  /**
+   * Generate a [[View]] operator from the view description if the view stores SQL text;
+   * otherwise, it is the same as `getRawGlobalTempView`.
+   */
   def getGlobalTempView(name: String): Option[LogicalPlan] = {
-    globalTempViewManager.get(formatTableName(name)).map(getTempViewPlan)
+    getRawGlobalTempView(name).map(getTempViewPlan)
   }

   /**
@@ -659,7 +675,7 @@ class SessionCatalog(
   def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
     val table = formatTableName(name.table)
     if (name.database.isEmpty) {
-      getTempView(table).map {
+      tempViews.get(table).map {
         case TemporaryViewRelation(metadata) => metadata
         case plan =>
           CatalogTable(
@@ -669,7 +685,6 @@
             schema = plan.output.toStructType)
       }.getOrElse(getTableMetadata(name))
     } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
-      val a = globalTempViewManager.get(table)
       globalTempViewManager.get(table).map {
         case TemporaryViewRelation(metadata) => metadata
         case plan =>
@@ -810,21 +825,34 @@
       // The relation is a view, so we wrap the relation by:
       // 1. Add a [[View]] operator over the relation to keep track of the view desc;
       // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
-      val child = View.fromCatalogTable(metadata, isTempView = false, parser)
-      SubqueryAlias(multiParts, child)
+      SubqueryAlias(multiParts, fromCatalogTable(metadata, isTempView = false))
     } else {
       SubqueryAlias(multiParts, UnresolvedCatalogRelation(metadata, options))
     }
   }

-  def getTempViewPlan(plan: LogicalPlan): LogicalPlan = {
+  private def getTempViewPlan(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case viewInfo: TemporaryViewRelation =>
-        View.fromCatalogTable(viewInfo.tableMeta, isTempView = true, parser)
+        fromCatalogTable(viewInfo.tableMeta, isTempView = true)
       case v => v
     }
   }
+
+  private def fromCatalogTable(metadata: CatalogTable, isTempView: Boolean): View = {
+    val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
+    val viewConfigs = metadata.viewSQLConfigs
+    val viewPlan =
+      SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs, isTempView = isTempView)) {
+        parser.parsePlan(viewText)
+      }
+    View(
+      desc = metadata,
+      isTempView = isTempView,
+      output = metadata.schema.toAttributes,
+      child = viewPlan)
+  }

   def lookupTempView(table: String): Option[SubqueryAlias] = {
     val formattedTable = formatTableName(table)
     getTempView(formattedTable).map { view =>
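To make the raw-versus-resolved split concrete, a rough sketch of the two lookups (internal, unstable API; assumes a Spark 3.1 session bound to `spark`):

```scala
spark.sql("CREATE TEMPORARY VIEW tv AS SELECT 1 AS id")
val catalog = spark.sessionState.catalog

// Raw form: exactly what was stored at creation time. With this change, a
// SQL-created temp view is stored as a TemporaryViewRelation carrying the view
// metadata (SQL text, captured configs, catalog and namespace).
val raw = catalog.getRawTempView("tv")

// Resolved form: the raw entry mapped through getTempViewPlan, i.e. a View
// operator whose child is re-parsed from the stored SQL text under the
// captured configs (see fromCatalogTable above).
val resolved = catalog.getTempView("tv")
```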
basicLogicalOperators.scala

@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.analysis.{EliminateView, MultiInstanceRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
@@ -485,21 +484,6 @@ object View {
     }
     sqlConf
   }
-
-  def fromCatalogTable(
-      metadata: CatalogTable, isTempView: Boolean, parser: ParserInterface): View = {
-    val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
-    val viewConfigs = metadata.viewSQLConfigs
-    val viewPlan =
-      SQLConf.withExistingConf(effectiveSQLConf(viewConfigs, isTempView = isTempView)) {
-        parser.parsePlan(viewText)
-      }
-    View(
-      desc = metadata,
-      isTempView = isTempView,
-      output = metadata.schema.toAttributes,
-      child = viewPlan)
-  }
 }

 /**
views.scala

@@ -113,12 +113,8 @@ case class CreateViewCommand(
     verifyTemporaryObjectsNotExists(catalog, isTemporary, name, child)

     if (viewType == LocalTempView) {
-      val shouldUncache = replace && catalog.getTempView(name.table).exists {
-        // Uncache View logical plan without checking the same result check, since it's unresolved.
-        case _: View => true
-        case other => !other.sameResult(child)
-      }
-      if (shouldUncache) {
+      if (replace && catalog.getRawTempView(name.table).isDefined &&
+          !catalog.getRawTempView(name.table).get.sameResult(child)) {
         logInfo(s"Try to uncache ${name.quotedString} before replacing.")
         checkCyclicViewReference(analyzedPlan, Seq(name), name)
         CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
@@ -141,12 +137,8 @@
     } else if (viewType == GlobalTempView) {
       val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
       val viewIdent = TableIdentifier(name.table, Option(db))
-      val shouldUncache = replace && catalog.getGlobalTempView(name.table).exists {
-        // Uncache View logical plan without checking the same result check, since it's unresolved.
-        case _: View => true
-        case other => !other.sameResult(child)
-      }
-      if (shouldUncache) {
+      if (replace && catalog.getRawGlobalTempView(name.table).isDefined &&
+          !catalog.getRawGlobalTempView(name.table).get.sameResult(child)) {
         logInfo(s"Try to uncache ${viewIdent.quotedString} before replacing.")
         checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
         CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
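An illustrative walk-through of the rewritten replace branch (hypothetical SQL; it mirrors the permanent-view test added to CachedTableSuite below):

```scala
// Hypothetical session illustrating the branch above.
spark.sql("CREATE TEMPORARY VIEW tv AS SELECT 1 AS id")
spark.sql("CACHE TABLE tv")

// On CREATE OR REPLACE, the command looks up the raw stored plan via
// getRawTempView; if it exists and fails sameResult against the new child plan,
// tv is uncached (after a cyclic-reference check) before the definition is swapped.
spark.sql("CREATE OR REPLACE TEMPORARY VIEW tv AS SELECT 2 AS id")
```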
CachedTableSuite.scala

@@ -1272,4 +1272,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       }
     }
   }
+
+  test("SPARK-33647: cache table support for permanent view") {
+    withView("v1") {
+      spark.catalog.clearCache()
+      sql("create or replace view v1 as select 1")
+      sql("cache table v1")
+      assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isDefined)
+      sql("create or replace view v1 as select 1, 2")
+      assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isEmpty)
+      sql("cache table v1")
+      assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1, 2")).isDefined)
+    }
+  }
 }
SQLViewSuite.scala

@@ -812,20 +812,6 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
     }
   }

-  test("creating local temp view should not affect existing table reference") {
-    withTable("t") {
-      withTempView("t") {
-        withGlobalTempView("v") {
-          val globalTempDB = spark.sharedState.globalTempViewManager.database
-          Seq(2).toDF("c1").write.format("parquet").saveAsTable("t")
-          sql("CREATE GLOBAL TEMPORARY VIEW v AS SELECT * FROM t")
-          sql("CREATE TEMPORARY VIEW t AS SELECT 1")
-          checkAnswer(sql(s"SELECT * FROM ${globalTempDB}.v"), Seq(Row(2)))
-        }
-      }
-    }
-  }

Review comment (Contributor): why do we remove this test?

Reply (linhongliu-db, author, Dec 11, 2020): This is the same as the newly added test case in SQLViewTestSuite. They both test the scenario: a global temp view refers to a table, and creating a temporary view with the same name won't affect the result. But SQLViewTestSuite covers all 3 kinds of views, and also tests the same table name in a different database.

   test("SPARK-33141: view should be parsed and analyzed with configs set when creating") {
     withTable("t") {
       withView("v1", "v2", "v3", "v4", "v5") {
SQLViewTestSuite.scala

@@ -200,6 +200,29 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
     }
   }

+  test("view should use captured catalog and namespace to resolve relation") {
+    withTempDatabase { dbName =>
+      withTable("default.t", s"$dbName.t") {
+        withTempView("t") {
+          // create a table in the default database
+          sql("USE DEFAULT")
+          Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
+          // create a view referring to the table in the default database
+          val viewName = createView("v1", "SELECT * FROM t")
+          // switch to another database and create a table with the same name
+          sql(s"USE $dbName")
+          Seq(4, 5, 6).toDF("c1").write.format("parquet").saveAsTable("t")
+          // create a temporary view with the same name
+          sql("CREATE TEMPORARY VIEW t AS SELECT 1")
+          withView(viewName) {
+            // view v1 should still refer to the table defined in the `default` database
+            checkViewOutput(viewName, Seq(Row(2), Row(3), Row(1)))
+          }
+        }
+      }
+    }
+  }
+
   test("SPARK-33692: view should use captured catalog and namespace to lookup function") {
     val avgFuncClass = "test.org.apache.spark.sql.MyDoubleAvg"
     val sumFuncClass = "test.org.apache.spark.sql.MyDoubleSum"
@@ -231,7 +254,6 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
 class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
   override protected def viewTypeString: String = "TEMPORARY VIEW"
   override protected def formattedViewName(viewName: String): String = viewName
-
 }

 class GlobalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {