
Conversation

@imback82 (Contributor) commented Nov 26, 2019

### What changes were proposed in this pull request?

This PR makes `Analyzer.ResolveRelations` responsible for looking up both v1 and v2 tables from the session catalog and creating an appropriate relation.

### Why are the changes needed?

Currently there are two issues:

  1. As described in [SPARK-29966](https://issues.apache.org/jira/browse/SPARK-29966), the logic for resolving a relation can load a table twice, which is a performance regression (e.g., the Hive metastore can be accessed twice).
  2. As described in [SPARK-30001](https://issues.apache.org/jira/browse/SPARK-30001), if a catalog name is specified for v1 tables, the query fails:
scala> sql("create table t using csv as select 1 as i")
res2: org.apache.spark.sql.DataFrame = []

scala> sql("select * from t").show
+---+
|  i|
+---+
|  1|
+---+

scala> sql("select * from spark_catalog.t").show
org.apache.spark.sql.AnalysisException: Table or view not found: spark_catalog.t; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [spark_catalog, t]

### Does this PR introduce any user-facing change?

Yes. Now the catalog name is resolved correctly:

scala> sql("create table t using csv as select 1 as i")
res0: org.apache.spark.sql.DataFrame = []                                       

scala> sql("select * from t").show
+---+
|  i|
+---+
|  1|
+---+


scala> sql("select * from spark_catalog.t").show
+---+
|  i|
+---+
|  1|
+---+

### How was this patch tested?

Added new tests.

@SparkQA commented Nov 26, 2019

Test build #114480 has finished for PR 26684 at commit 01fad27.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 27, 2019

Test build #114482 has finished for PR 26684 at commit ce0e450.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
case i @ InsertIntoStatement(
    u @ UnresolvedRelation(CatalogObjectIdentifier(catalog, ident)), _, _, _, _)
    if i.query.resolved && CatalogV2Util.isSessionCatalog(catalog) =>
  val relation = ResolveTempViews(u) match {
```

Contributor:

Why do we need this? Temp views should always be resolved first. If we reach here, it's not a temp view.

Contributor Author (@imback82), Nov 27, 2019:

u is inside InsertIntoStatement (and not its children), so it is not resolved when we reach here.

Contributor:

good catch! Can we resolve temp views inside InsertIntoStatement in ResolveTempViews as well?

Contributor Author:

If we resolve temp views for InsertIntoStatement.table in ResolveTempViews, we need an additional rule here to match SubqueryAlias. Is that what you were suggesting?

Contributor:

We have EliminateSubqueryAliases here, so SubqueryAlias should be fine?

Contributor Author:

https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala#L87 is one example where temp view resolution is required. Maybe the confusion is that InsertIntoStatement is used for INSERT OVERWRITE, INSERT INTO, etc.?

It seems to me that it should insert into the table default.t1 because it doesn't make sense to insert into the temp view.

I think it should still resolve to the temp view (for consistent lookup behavior) but fail during the analysis check, which is the current behavior.

Contributor:

Okay, how does the analysis check catch the problem? Are we confident that always works?

Contributor Author:

We have the following checks for InsertIntoStatement:

```
object PreWriteCheck extends (LogicalPlan => Unit) {
```
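For context, a simplified sketch of the kind of check this rule performs (a sketch only; the real rule in sql/core also validates HadoopFsRelation targets, partition specs, and self-inserts):

```
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.sources.InsertableRelation

// Simplified sketch: fail analysis when an INSERT targets a plan that does
// not support inserts (e.g. a temp view that resolved to a plain SELECT).
object PreWriteCheckSketch extends (LogicalPlan => Unit) {
  private def failAnalysis(msg: String): Unit = throw new AnalysisException(msg)

  override def apply(plan: LogicalPlan): Unit = plan.foreach {
    case InsertIntoStatement(table, _, _, _, _) =>
      table match {
        case LogicalRelation(_: InsertableRelation, _, _, _) => // insertable relation, OK
        case other => failAnalysis(s"Inserting into ${other.nodeName} is not allowed.")
      }
    case _ => // not an insert, nothing to check
  }
}
```

So a temp view over a plain SELECT still resolves, but the insert is rejected here during analysis.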

Contributor:

Thanks! I think I would rather implement the check a different way:

  1. Don't resolve temp tables in ResolveRelations
  2. In CheckAnalysis (in catalyst), check for InsertInto(Unresolved(...)) and, if the unresolved relation is a temp table, state that it is a temp table and can't be resolved.

It would be good to know what other databases do in this case because my suggestion to not resolve the identifier as a temp table would allow matching a table here if there is one that conflicts. Probably good to consider this case in the broader clean up of temp table handling.
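A rough sketch of step 2 under that suggestion (`isTempView` here is a placeholder predicate for the temp view registry, not an existing API):

```
// Hypothetical CheckAnalysis case: the INSERT target stayed unresolved
// because ResolveRelations skipped temp views, so report it explicitly.
case InsertIntoStatement(u @ UnresolvedRelation(nameParts), _, _, _, _)
    if isTempView(nameParts) => // placeholder: consult the temp view registry
  u.failAnalysis(
    s"${nameParts.mkString(".")} is a temp view and cannot be used as an insert target.")
```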

Contributor Author:

> It would be good to know what other databases do in this case because my suggestion to not resolve the identifier as a temp table would allow matching a table here if there is one that conflicts.

In Postgres, it resolves to the temp view:

```
postgres=# create schema s1;
CREATE SCHEMA
postgres=# SET search_path TO s1;
SET
postgres=# create table s1.t (i int);
CREATE TABLE
postgres=# insert into s1.t values (1);
INSERT 0 1

# resolves to table 't'
postgres=# select * from t;
 i
---
 1
(1 row)

postgres=# create temp view t as select 2 as i;
CREATE VIEW

# resolves to temp view 't'
postgres=# select * from t;
 i
---
 2
(1 row)

# resolves to temp view 't'
postgres=# insert into t values (1);
2019-12-05 21:40:47.229 EST [5451] ERROR:  cannot insert into view "t"
2019-12-05 21:40:47.229 EST [5451] DETAIL:  Views that do not select from a single table or view are not automatically updatable.
```

@SparkQA commented Nov 27, 2019

Test build #114496 has finished for PR 26684 at commit 7560112.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor) commented Nov 27, 2019

This seems like a hard problem. What we need is:

  1. Access the Hive metastore only once when resolving a table.
  2. Allow a catalog name in the table name for v1 tables.

There are two conflicting goals:

  1. We want to make fewer changes to the v1 code path, i.e., still get v1 tables through SessionCatalog.lookupRelation.
  2. We want to know whether a table from the session catalog is v1 or v2, via V2SessionCatalog.loadTable.

To do these 2 things together with one Hive metastore access, we have 3 options:

  1. In ResolveTables, if we see a V1Table, return a v1 relation instead of skipping it. This requires refactoring view resolution so that we don't resolve views and tables recursively in the single rule ResolveRelations.
  2. In ResolveRelations, look up the table using the v2 API V2SessionCatalog.loadTable, so that we know whether it's a v1 or v2 table.
  3. Introduce a cache. This needs to be carefully designed so that the cache only takes effect between ResolveTables and ResolveRelations.

I think option 2 is the easiest to do at the current stage.
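A minimal sketch of option 2 (`getRelation` stands in for the existing v1 lookup; the exact helper shapes are assumptions, not the final code):

```
// One metastore access: load through the v2 session catalog, then branch
// on whether the result is the V1Table wrapper or a genuine v2 table.
CatalogV2Util.loadTable(sessionCatalog, ident).map {
  case v1Table: V1Table =>
    // V1Table wraps a v1 CatalogTable, so the v1 code path builds the relation.
    v1SessionCatalog.getRelation(v1Table.v1Table)
  case table =>
    // Any other Table implementation resolves to a v2 relation.
    DataSourceV2Relation.create(table)
}
```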

cc @rdblue @brkyvz

@SparkQA commented Nov 27, 2019

Test build #114502 has finished for PR 26684 at commit 51e9d14.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
private def lookupRelation(
    catalog: CatalogPlugin,
    ident: Identifier,
    recurse: Boolean): Option[LogicalPlan] = {
```
Contributor:

If ResolveRelations is going to be completely rewritten before 3.0, then we should fix it to separate view resolution from table resolution and to use multiple executions instead of recursion. The only reason why I don't think we should do that is to avoid too many changes to ResolveRelations just before a release.

Contributor Author:

Got it. I will also explore removing the recursion separately.

@rdblue (Contributor) commented Nov 27, 2019

I think that this goes too far by adding more V2 resolution into ResolveRelations and making too many changes to that rule.

It is helpful to consider how resolution works for other nodes to understand the problem with ResolveRelations. Let's use ALTER TABLE ... ADD COLUMN as an example. That is parsed into AlterTableAddColumnsStatement(multipartIdent, ...). Next, if the identifier requires v2 because it uses a non-session catalog, ResolveCatalogs will convert the plan to v2 with an UnresolvedV2Relation. If the identifier uses the session catalog, then ResolveSessionCatalog looks up the table with the V2SessionCatalog. If that returns a V1Table then it will create a v1 plan. If it returns anything else, then it uses a v2 plan. Note that the v2 session catalog is used to test whether the table is v1 or v2, and the TableProvider check should be done there.
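A simplified sketch of that decision point (the buildV1/buildV2 helpers are placeholders for the actual plan construction, not real Spark APIs):

```
// In ResolveSessionCatalog (simplified): one loadTable through the v2
// session catalog decides between the v1 command and the v2 plan.
case s @ AlterTableAddColumnsStatement(SessionCatalogAndIdentifier(catalog, ident), cols) =>
  CatalogV2Util.loadTable(catalog, ident) match {
    case Some(_: V1Table) => buildV1AlterTablePlan(ident, cols)          // placeholder
    case Some(_)          => buildV2AlterTablePlan(catalog, ident, cols) // placeholder
    case None             => s // leave unresolved; CheckAnalysis reports the error
  }
```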

This shows that there are 2 decision points that are missing from ResolveRelations:

  1. If the identifier uses a non-session catalog, v2 should be used (should be ignored by ResolveRelations)
  2. If the V2SessionCatalog returns anything but a V1Table, v2 should be used

In addition, ResolveRelations implements its own recursion, so any table included by a view will be resolved using ResolveRelations without first running ResolveTables.

As I've said earlier, I don't think it is a good idea to make major changes to ResolveRelations, and I think it is worse to mix v1 and v2 table resolution in that rule. But I don't think we need to. We just need to make a couple alterations so that ResolveRelations and ResolveTables operate independently, just like ResolveCatalogs and ResolveSessionCatalog:

  1. ResolveRelations should only resolve session catalog references, and ResolveTables should resolve only non-session catalog references using the same extractors. (I think @cloud-fan also suggested this)
  2. ResolveRelations should use the V2SessionCatalog to look up a relation and should only convert to v1 if it returns V1Table.

If we make those changes, then we no longer need to run ResolveTables in ResolveRelations, which doesn't entirely work anyway because of the custom recursion.

We will also need to follow up with a fix for views. Views that are defined with the session catalog as the current catalog are okay because ResolveRelations will correctly use the session catalog and the view definition's database. However, views that are created in the session catalog using CREATE VIEW session_catalog.db.view_name ... could have a different current catalog and a multi-part namespace. For Spark 3.0, I suggest we disallow view creation when any catalog other than the session catalog is the current catalog, or ensure that all tables are fully-qualified.

@imback82 (Contributor Author):

Thanks @cloud-fan and @rdblue for the detailed explanations and suggestions!
I will try to minimize changes in the v1 code path in the next iteration. The fact that V1Table has the v1 provider should help a bit.

There are a few things we need to follow up on:

  1. "If we make those changes, then we no longer need to run ResolveTables in ResolveRelations, which doesn't entirely work anyway because of the custom recursion.".

I can address this in this PR.

  1. "For Spark 3.0, I suggest we disallow view creation when any catalog other than the session catalog is the current catalog, or ensure that all tables are fully-qualified."

I agree, and I can do a follow up PR to disallow this scenario.

  1. "Whether to return a V1 table is up to the session catalog implementation. That's where the TableProvider check should happen, so it can either return a V1Table for a v1 provider, or a v2 table for a v2 provider."

The current implementation of V2SessionCatalog always returns V1Table. Should we update it to handle v2 providers?

@cloud-fan (Contributor):

> The current implementation of V2SessionCatalog always returns V1Table. Should we update it to handle v2 providers?

This will be fixed by #25651. We can leave it as is for now.

@cloud-fan (Contributor) commented Dec 2, 2019

  1. "ResolveRelations should only resolve session catalog references, and ResolveTables should resolve only non-session catalog references using the same extractors. (I think @cloud-fan also suggested this)"
  2. "ResolveRelations should use the V2SessionCatalog to look up a relation and should only convert to v1 if it returns V1Table."

This is also what I suggested with option 2. Also agree that we should fix view creation later.

@imback82 imback82 changed the title [WIP][SPARK-30001][SQL] ResolveRelations should handle both V1 and V2 tables. [SPARK-30001][SQL] ResolveRelations should handle both V1 and V2 tables. Dec 3, 2019
@SparkQA commented Dec 3, 2019

Test build #114747 has finished for PR 26684 at commit 985e84d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
CatalogV2Util.loadTable(catalog, newIdent) match {
  case Some(v1Table: V1Table) =>
    val tableIdent = TableIdentifier(newIdent.name, newIdent.namespace.headOption)
    if (!isRunningDirectlyOnFiles(tableIdent)) {
```
Contributor:

Do we need this check? If we find a v1 table, we should read this table instead of treating the table name as a path and reading files directly.

Contributor Author:

Good point! For isRunningDirectlyOnFiles to be true, the table must not exist. If CatalogV2Util.loadTable returned a v1 table, the table exists, so this check will always be false.

```diff
 private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
   identifier match {
-    case CatalogObjectIdentifier(catalog, ident) if !CatalogV2Util.isSessionCatalog(catalog) =>
+    case NonSessionCatalogAndIdentifier(catalog, ident) =>
```
Contributor:

Shall we also respect the current namespace here?

Contributor Author:

Yes, but I was planning to do it as a separate PR. Would that be OK?

@SparkQA commented Dec 5, 2019

Test build #114887 has finished for PR 26684 at commit da50735.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


@rdblue (Contributor) commented Dec 5, 2019

+1

I'm okay with resolving temp views in InsertInto for now; we can follow up to fix it. I don't think temp views should be allowed as an insert target, though.

@cloud-fan (Contributor):

@rdblue there is some history here. Spark supports CREATE TEMP VIEW USING, which creates a special temp view that points to a data source table (e.g., a Parquet table or a JDBC table). So INSERT INTO needs to support temp views.

Maybe we can remove CREATE TEMP VIEW USING too, but that needs more discussion.
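For reference, a minimal sketch of that pattern, modeled on the InsertSuite tests (the path and schema here are illustrative):

```
scala> sql("CREATE TEMPORARY VIEW jsonTable (a int, b string) USING json OPTIONS (path '/tmp/json_data')")

scala> sql("INSERT OVERWRITE TABLE jsonTable SELECT 1, 'a'")
```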

@SparkQA commented Dec 6, 2019

Test build #114924 has finished for PR 26684 at commit fad6d34.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

The last commit just renames a method and has passed compilation; the previous commit passed all tests. I'm merging to master, thanks!

@cloud-fan cloud-fan closed this in b86d4bb Dec 6, 2019
@SparkQA commented Dec 6, 2019

Test build #114927 has finished for PR 26684 at commit 23cc9d3.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

attilapiros pushed a commit to attilapiros/spark that referenced this pull request Dec 6, 2019

Closes apache#26684 from imback82/resolve_relation.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

```
CatalogV2Util.loadTable(catalog, newIdent) match {
  case Some(v1Table: V1Table) =>
    val tableIdent = TableIdentifier(newIdent.name, newIdent.namespace.headOption)
```
Contributor:

`tableIdent` is not used at all.
