Conversation

@nsyca
Contributor

@nsyca nsyca commented Jul 29, 2016

What changes were proposed in this pull request?

This patch fixes incorrect results produced by the rule ResolveSubquery in Catalyst's Analysis phase: it now returns an error message when a LIMIT is found on the path from the parent table to the correlated predicate in the subquery.
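For concreteness, a minimal sketch of the kind of query the rule now flags (hypothetical t1/t2 views, mirroring the examples later in this thread):

sql("select c1 from t1 where exists (select 1 from t2 where t1.c1 = t2.c2 limit 1)")

Here the correlated predicate t1.c1 = t2.c2 sits below the subquery's LIMIT, so pulling it above the LIMIT would change which rows the subquery preserves.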

How was this patch tested?

./dev/run-tests
A new unit test on the problematic pattern.

nsyca added 2 commits July 29, 2016 17:43
…rrect results

## What changes were proposed in this pull request?

This patch fixes the incorrect results in the rule ResolveSubquery in Catalyst's Analysis phase.

## How was this patch tested?
./dev/run-tests
a new unit test on the problematic pattern.
…rrect results
@gatorsmile
Member

cc @hvanhovell

@gatorsmile
Member

@nsyca Could you update the PR description? Thanks!

@gatorsmile
Member

We also need more test cases to prove it works as expected.

@nsyca
Contributor Author

nsyca commented Jul 29, 2016

I included two examples of "good" cases in the JIRA to show that this fix only blocks cases where Spark would produce incorrect results. I need to find a place to host those "good" cases; I don't think AnalysisErrorSuite.scala is the right place.

@gatorsmile
Member

I think the positive cases can move here: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala

@nsyca
Contributor Author

nsyca commented Jul 29, 2016

@gatorsmile: thanks. I will add them to SubquerySuite.

@nsyca
Contributor Author

nsyca commented Jul 29, 2016

Two good cases, which return the same result set, with and without this proposed fix:
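For concreteness, a hypothetical setup for the examples below:

sql("select 1 as c1 union all select 2").createOrReplaceTempView("t1")
sql("select 1 as c2 union all select 2").createOrReplaceTempView("t2")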

sql("select c1 from t1 where exists (select 1 from t2 where t1.c1=t2.c2) and exists (select 1 from t2 LIMIT 1)").show

The above query will return both rows from T1.

sql("select c1 from t1 where exists (select 1 from (select 1 from t2 limit 1) where t1.c1=t2.c2)").show

This one above will return one row, but which row is non-deterministic: it depends on which row of T2 the innermost subquery happens to return first.

@gatorsmile
Member

It sounds great to me!

@hvanhovell
Contributor

hvanhovell commented Jul 31, 2016

Ok, this looks pretty good. One overall comment: we are basically blacklisting operators here (which is fine IMO); should we check if there are any other operators we should care about? If there are, we might need to generalize the blacklisting (instead of working case by case).

@SparkQA

SparkQA commented Jul 31, 2016

Test build #3199 has finished for PR 14411 at commit edca333.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nsyca
Contributor Author

nsyca commented Aug 1, 2016

@hvanhovell,

Thank you for your comment. There are quite a few patterns being blacklisted already, such as correlation under set operators (UNION, EXCEPT, INTERSECT), correlation outside of a WHERE/HAVING context, and correlation in the right table of a LEFT [OUTER] JOIN (or the left table of a RIGHT [OUTER] JOIN). I am working on discovering more issues in this area, but it looks like a bigger project to me. My general idea is that the rewrite of correlated subqueries to joins should not happen in the Analysis phase. We should build a logical plan to represent the subquery and perform the rewrite in the Optimizer phase instead.
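For instance, a sketch of one already-blocked shape -- correlation under a set operator (hypothetical t1/t2 views as in the earlier examples):

sql("""select c1 from t1
       where exists (select 1 from t2 where t1.c1 = t2.c2
                     union all
                     select 1 from t2)""")

The outer reference t1.c1 under the UNION ALL is rejected because the join rewrite is not well defined there.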

I am new to the Spark code and this is my first PR, so I'd like to keep it a small, self-contained project to gain confidence in working with the code.

@gatorsmile
Member

retest this please

@hvanhovell
Contributor

@nsyca We do not rewrite the subquery into a join during analysis. We rewrite subqueries into joins during optimization. We do two things during analysis:

  1. We check if the subquery expression is valid. In order to do this we need to check that the query resolves (given the outer query), and that no outer references are used in (the children of) nodes for which the join behavior is ill-defined (UNION, for instance).
  2. We also rewrite IN/EXISTS/Scalar subquery expressions into a PredicateSubquery. We do this by extracting correlated predicates and rewriting the intermediate tree. One could argue that this could also be done during optimization, but it was needed to get correlated predicates with aggregate functions (referencing the outer query) working (see the example below). For this we needed to push the complete outer condition into the Aggregate below the HAVING clause. Perhaps there is a simpler way of doing this, though.
select b.key, min(b.value)
from src b
group by b.key
having exists ( select a.key
                from src a
                where a.value > 'val_9' and a.value = min(b.value)
                )

I think we should also restrict the use of Sample, which likewise filters non-deterministically and might give us very wrong results.

@SparkQA

SparkQA commented Aug 1, 2016

Test build #3200 has finished for PR 14411 at commit 64184fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nsyca
Contributor Author

nsyca commented Aug 1, 2016

@hvanhovell,

First, my apologies for the delayed replies. I am travelling this week and only getting sporadic connectivity. Thank you for your explanation of the implementation and the reasoning behind it. It is very helpful for a beginner like me.

My bad! What I meant in my previous comment about rewriting subqueries to joins is actually the moving of correlated predicates from their original positions to outside the scope of the subquery, specifically the call to the function pullOutCorrelatedPredicates() -- I hope I got it right this time. I see this as one of the root causes of many problems. Bear with me; I don't have a good solution yet, as I am still getting familiar with the code. Here is an example of the problem, in my opinion: with the rewrite, we cannot distinguish between the EXISTS form and the IN form of the original SQL.

select * from t1 where exists (select 1 from t2 where t1.c1=t2.c2)
-and-
select * from t1 where t1.c1 in (select t2.c2 from t2)

are represented identically after the Analysis phase. This is not an issue because they are semantically equivalent. However, when we add the NOT,

select * from t1 where not exists (select 1 from t2 where t1.c1=t2.c2)
-and-
select * from t1 where t1.c1 not in (select t2.c2 from t2)

are NOT semantically equivalent when T2.C2 can produce NULL values.
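A minimal sketch of the divergence (hypothetical data, with a NULL in T2.C2):

sql("select 1 as c1 union all select 2").createOrReplaceTempView("t1")
sql("select 1 as c2 union all select cast(null as int)").createOrReplaceTempView("t2")

// NOT EXISTS keeps the t1 rows with no matching t2 row, so c1 = 2 survives.
sql("select * from t1 where not exists (select 1 from t2 where t1.c1 = t2.c2)").show

// NOT IN compares c1 against (1, NULL); under three-valued logic c1 <> NULL is
// UNKNOWN, so the predicate is never TRUE and the result is empty.
sql("select * from t1 where t1.c1 not in (select t2.c2 from t2)").show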

Lastly, your comment on the operator SAMPLE seems right. I will give it a shot and add it to this PR.

Thanks again for your patience.

@hvanhovell
Contributor

@nsyca No problem. We actually support NOT IN queries. We set the PredicateSubquery.nullAware flag to true if we encounter an IN subquery expression. NOT IN is planned using a null-aware anti-join in the optimizer.

I have written a blog post/notebook on the current implementation and state of subqueries in Spark 2.0 (including known issues): SQL Subqueries in Apache Spark 2.0

@nsyca
Contributor Author

nsyca commented Aug 2, 2016

@hvanhovell, thanks for sharing the blog. I will read it through. It's nice to see the implementation of NOT IN done this way. I have an idea for doing it differently, but let's move that discussion to another place.

On the SAMPLE issue you raised, I think we should not flag an error. Here is what I tested:

Seq((1,1), (2,2)).toDF("c1","c2").createOrReplaceTempView("t1")
Seq((1,1), (2,2)).toDF("c3","c4").createOrReplaceTempView("t2")

scala> sql("select * from t1 where exists (select 1 from t2 tablesample(10 percent) s where c3=c1)").explain(true)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter exists#29
   :  +- 'SubqueryAlias exists#29
   :     +- 'Project [unresolvedalias(1, None)]
   :        +- 'Filter ('c3 = 'c1)
   :           +- 'Sample 0.0, 0.1, false, 159
   :              +- 'UnresolvedRelation `t2`, s
   +- 'UnresolvedRelation `t1`

From the parser output, the correlated predicate in the Filter operation is applied after the sampling operation. We should be able to treat the semantics of the sampling as a one-time execution whose result is reused for every input row from the outer table. Using the analogy I used for LIMIT in JIRA SPARK-16804, the SAMPLE operation is not on the correlation path, and therefore moving the correlated predicate above the scope of the subquery does not change the semantics of the query.

Your thoughts, please!

@nsyca
Contributor Author

nsyca commented Aug 5, 2016

@hvanhovell,

Have you had a chance to review my last update? Is there anything I should add or change in this PR?

@hvanhovell
Contributor

@nsyca I think we do need to prevent sampling from being used. I have the following example:

range(0, 10).createOrReplaceTempView("tbl_a")
range(0, 10).select($"id", $"id" % 10 as "grp_id").createOrReplaceTempView("tbl_b")
range(0, 10).select($"id", $"id" % 10 as "grp_id").createOrReplaceTempView("tbl_c")

val plan = sql("""
select *
from   tbl_a
where  not exists(
        select 1
        from   tbl_b
               join (select *
                     from   tbl_c
                     where  tbl_c.id = tbl_a.id) tablesample(0.01 percent) c
                on c.grp_id = tbl_b.grp_id)
""")

This results in the following analyzed plan:

Project [id#8L]
+- Filter NOT predicate-subquery#34 [(id#24L = id#8L)]
   :  +- SubqueryAlias predicate-subquery#34 [(id#24L = id#8L)]
   :     +- Project [1 AS 1#42, id#24L]
   :        +- Join Inner, (grp_id#29L = grp_id#19L)
   :           :- SubqueryAlias tbl_b
   :           :  +- Project [id#14L, (id#14L % cast(10 as bigint)) AS grp_id#19L]
   :           :     +- Range (0, 10, splits=8)
   :           +- SubqueryAlias c
   :              +- Sample 0.0, 1.0E-4, false, 968
   :                 +- Project [id#24L, grp_id#29L]
   :                    +- SubqueryAlias tbl_c
   :                       +- Project [id#24L, (id#24L % cast(10 as bigint)) AS grp_id#29L]
   :                          +- Range (0, 10, splits=8)
   +- SubqueryAlias tbl_a
      +- Range (0, 10, splits=8)

Clearly the predicate has been pulled out of a sampled relation. I don't think we want this.

I am looking forward to discuss your NOT IN approach. Could you open a JIRA for that?

@nsyca
Contributor Author

nsyca commented Aug 5, 2016

@hvanhovell ,

Yes, I agree that we need to block this case. I was under the impression that the tablesample clause is supported only when applied to a base table, not a derived table. That's clearly not the case in Spark. I will add code to prevent it.

On the NOT IN topic, let me spend some time arranging my thoughts, then I will open a JIRA.

Thanks.

@nsyca nsyca changed the title [SPARK-16804][SQL] Correlated subqueries containing LIMIT return incorrect results [SPARK-16804][SQL] Correlated subqueries containing non-deterministic operations return incorrect results Aug 5, 2016
@nsyca
Contributor Author

nsyca commented Aug 5, 2016

@hvanhovell,

Code and test case for blocking TABLESAMPLE is in. Could you please review? Thanks.

case e: Expand =>
  failOnOuterReferenceInSubTree(e, "an EXPAND")
  e
case l @ LocalLimit(_, _) =>
Contributor

Style: use `l: LocalLimit` instead of `l @ LocalLimit(_, _)`; it makes it a bit more readable. Same for GlobalLimit and Sample.

@nsyca
Contributor Author

nsyca commented Aug 5, 2016

Done. Thanks for the comment. It looks more compact and does not break if those three operators change their arguments in the future.
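For reference, a sketch of what the revised match arms presumably look like after the style change (the message strings are illustrative, not necessarily the exact ones in the patch):

case l: LocalLimit =>
  failOnOuterReferenceInSubTree(l, "a LIMIT")
  l
case s: Sample =>
  failOnOuterReferenceInSubTree(s, "a TABLESAMPLE")
  s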

@hvanhovell
Contributor

LGTM - pending Jenkins.

@hvanhovell
Contributor

@nsyca I have triggered a manual build. I'll merge as soon as it completes successfully.

@SparkQA

SparkQA commented Aug 7, 2016

Test build #3206 has finished for PR 14411 at commit ac43ab4.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 7, 2016

Test build #3207 has finished for PR 14411 at commit 631d396.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

gatorsmile commented Aug 7, 2016

Before submitting the PR, you can run this command to check the Scala style:

dev/lint-scala

@nsyca
Contributor Author

nsyca commented Aug 8, 2016

Thanks, @gatorsmile. This time I ran dev/lint-scala. I hope this is my last attempt needed to get this work through.

@gatorsmile
Member

retest this please

@SparkQA

SparkQA commented Aug 8, 2016

Test build #3208 has finished for PR 14411 at commit 7eb9b2d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

Merging to master. Thanks for working on this!

Ping me as soon as you open a JIRA for null-aware anti-joins.

@asfgit asfgit closed this in 06f5dc8 Aug 8, 2016
@nsyca
Contributor Author

nsyca commented Aug 8, 2016

@hvanhovell,

Thanks for getting the PR merged and sorry for causing a few hiccups before I got it right. It's my first PR.

I have opened a new JIRA, SPARK-16951, to track the NOT IN issue. Currently it contains little information, but I will start filling it in over the next few days.

Btw, would you mind making me (nsyca) the Assignee of SPARK-16804?

I look forward to collaborating with you on future issues.

asfgit pushed a commit that referenced this pull request Nov 4, 2016
… PRs

## What changes were proposed in this pull request?
This PR backports two subquery-related PRs to branch-2.0:

- #14411
- #15761

## How was this patch tested?
Added tests to `SubquerySuite`.

Author: Nattavut Sutyanyong <nsy.can@gmail.com>
Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #15772 from hvanhovell/SPARK-17337-2.0.