@dongjoon-hyun dongjoon-hyun commented Apr 27, 2020

This is a backport of #28326. The authorship is kept. (Credit to @LiangchangZ and @xuanyuanking)

What changes were proposed in this pull request?

Make metadata propagatable between Aliases.

Why are the changes needed?

In Structured Streaming, we added an Alias for TimeWindow by default.

withExpr {
  TimeWindow(timeColumn.expr, windowDuration, slideDuration, startTime)
}.as("window")

In some cases, such as a stream join with a watermark and a window, users need to add an alias for convenience (we also added one in StreamingJoinSuite). The current metadata handling logic for `as` loses the watermark metadata:
def name(alias: String): Column = withExpr {
  normalizedExpr() match {
    case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
    case other => Alias(other, alias)()
  }
}

and finally causes the AnalysisException:

Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition
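
To make the failure mode concrete, here is a minimal, Spark-free sketch of the mechanism. It is a toy model under stated assumptions, not Spark's implementation: `Attr`, `UnresolvedWindow`, `resolve`, `nameEager`, `nameLazy`, and the `watermarkDelayMs` key are hypothetical stand-ins. It assumes the watermark metadata only becomes visible once the window is resolved, which is after the user has already called the aliasing function.

```scala
object AliasMetadataSketch {
  // Toy stand-ins, NOT Spark's classes: metadata is just a string map here.
  type Metadata = Map[String, String]
  val Empty: Metadata = Map.empty

  sealed trait Expr
  // A named leaf that can carry metadata (think: a resolved attribute).
  final case class Attr(name: String, metadata: Metadata = Empty) extends Expr
  // An expression with no metadata of its own (think: a not-yet-resolved time window).
  final case class UnresolvedWindow(timeColumn: String) extends Expr
  // An alias: explicit metadata, when present, overrides whatever the child carries.
  final case class Alias(child: Expr, name: String,
                         explicitMetadata: Option[Metadata] = None) extends Expr

  def metadataOf(e: Expr): Metadata = e match {
    case Attr(_, m)                => m
    case Alias(child, _, explicit) => explicit.getOrElse(metadataOf(child))
    case _: UnresolvedWindow       => Empty
  }

  // Eager variant: snapshot the child's metadata at the moment the alias is created.
  def nameEager(e: Expr, alias: String): Expr = e match {
    case a: Alias => Alias(a, alias, Some(metadataOf(a)))
    case a: Attr  => Alias(a, alias, Some(metadataOf(a)))
    case other    => Alias(other, alias)
  }

  // Lazy variant: leave explicitMetadata unset so metadata is read from the child on demand.
  def nameLazy(e: Expr, alias: String): Expr = Alias(e, alias)

  // Hypothetical "analysis" step: the window becomes an attribute carrying the watermark.
  def resolve(e: Expr, watermark: Metadata): Expr = e match {
    case UnresolvedWindow(_) => Attr("window", watermark)
    case Alias(c, n, m)      => Alias(resolve(c, watermark), n, m)
    case a: Attr             => a
  }

  val watermark: Metadata = Map("watermarkDelayMs" -> "10000") // placeholder key
  val defaultWindow: Expr = Alias(UnresolvedWindow("ts"), "window") // the default alias

  // The user adds a second alias "w" before resolution happens.
  val eager: Metadata    = metadataOf(resolve(nameEager(defaultWindow, "w"), watermark))
  val lazyMeta: Metadata = metadataOf(resolve(nameLazy(defaultWindow, "w"), watermark))
}
```

In this model `eager` ends up empty, because the outer alias froze an empty snapshot before the watermark existed, while `lazyMeta` carries the watermark entry. Whether the actual patch takes exactly the lazy route is not something this sketch claims.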

Does this PR introduce any user-facing change?

Bugfix for an alias on time window with watermark.

How was this patch tested?

New UTs added. One for the functionality and one for explaining the common scenario.

xuanyuanking and others added 2 commits April 27, 2020 16:00
…data lost

Credit to LiangchangZ, this PR reuses the UT as well as the integration test in #24457. Thanks Liangchang for your solid work.

Make metadata propagatable between Aliases.

In Structured Streaming, we added an Alias for TimeWindow by default.
https://github.com/apache/spark/blob/590b9a0132b68d9523e663997def957b2e46dfb1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L3272-L3273
In some cases, such as a stream join with a watermark and a window, users need to add an alias for convenience (we also added one in StreamingJoinSuite). The current metadata handling logic for `as` loses the watermark metadata
https://github.com/apache/spark/blob/590b9a0132b68d9523e663997def957b2e46dfb1/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L1049-L1054
and finally causes the AnalysisException:
```
Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition
```

Bugfix for an alias on time window with watermark.

New UTs added. One for the functionality and one for explaining the common scenario.

Closes #28326 from xuanyuanking/SPARK-27340.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit ba7adc4)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit d272482)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@dongjoon-hyun
Member Author

dongjoon-hyun commented Apr 27, 2020

cc @holdenk since she is the release manager of 2.4.6.
Also, cc @dbtsai

Without this, we cannot run long-running Structured Streaming queries with state, because the watermark is lost.

@holdenk
Contributor

holdenk commented Apr 27, 2020

From a backporting perspective it seems like this is maybe a bug given that name doesn't appear to be copying the metadata even without an explicit metadata set. Do I have the right context @dongjoon-hyun ?

@dongjoon-hyun
Member Author

Yes, correct. The metadata loss occurs in the name function. Without the metadata, some streaming queries fail at the analysis stage, and others keep growing their internal group-by state indefinitely.

@gatorsmile
Member

gatorsmile commented Apr 27, 2020

@dongjoon-hyun @holdenk My understanding is different from what you said above.

It sounds like the metadata is not available for TimeWindow expression when the function name(alias: String) copies the metadata to explicitMetadata. Thus, we do not set it when calling the APIs in Column.

@dongjoon-hyun
Member Author

@gatorsmile . Are you assuming NamedExpression only?

@gatorsmile
Member

Does any Expression other than NamedExpression have a metadata field?

@dongjoon-hyun
Member Author

Got it. You are right.

@HeartSaVioR
Contributor

HeartSaVioR commented Apr 28, 2020

It sounds like the metadata is not available for TimeWindow expression when the function name(alias: String) copies the metadata to explicitMetadata. Thus, we do not set it when calling the APIs in Column.

That's right, but I don't think the fix is justified only because we ran into an actual problem there. The code has simply been wrong, because we set explicit metadata on Alias even though nobody asked to overwrite it. The Javadoc of Alias clearly describes the usage of explicitMetadata:

@param explicitMetadata Explicit metadata associated with this alias that overwrites child's.

(that's why I couldn't agree with the initial proposal from #28326.)
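
The explicitMetadata contract quoted above can be illustrated with a small sketch. This is a toy model, not Spark's `Alias`: the child is reduced to the metadata it carries, and the `comment` key and its values are made up for illustration. The point it tries to show is that passing `Some(...)` always overwrites, even when the value passed is empty.

```scala
object ExplicitMetadataContract {
  type Metadata = Map[String, String]

  // Toy model, NOT Spark's Alias: the child is represented only by its metadata.
  final case class Alias(childMetadata: Metadata, name: String,
                         explicitMetadata: Option[Metadata] = None) {
    // Explicit metadata, when given, overwrites the child's; otherwise it is inherited.
    def metadata: Metadata = explicitMetadata.getOrElse(childMetadata)
  }

  val child: Metadata = Map("comment" -> "event time") // hypothetical child metadata

  // No explicit metadata: the alias inherits from the child.
  val inherited: Metadata = Alias(child, "a").metadata
  // Explicit metadata: the child's metadata is replaced.
  val overridden: Metadata = Alias(child, "a", Some(Map("comment" -> "renamed"))).metadata
  // Explicit but empty metadata: the child's metadata is wiped, not inherited.
  val wiped: Metadata = Alias(child, "a", Some(Map.empty[String, String])).metadata
}
```

Under this reading, setting explicit metadata in a code path where the caller never asked for an overwrite turns "no opinion" into "replace", which is the concern raised in this comment.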

@HeartSaVioR
Contributor

That said, I'd support backporting this to 2.4, as it fixes wrong code. This change might affect a broader audience than the title suggests (so, technically, the PR/commit title doesn't represent the actual change; sorry, I should have caught this earlier), hence the risk might be bigger than we imagine. Still, I can't think of a case that relies on the previous (buggy) behavior to work. If we can think of one, that might be a signal to reconsider.

@dongjoon-hyun dongjoon-hyun deleted the SPARK-27340 branch April 28, 2020 02:00
@SparkQA

SparkQA commented Apr 28, 2020

Test build #121930 has finished for PR 28377 at commit bb09254.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 28, 2020

Test build #121936 has finished for PR 28377 at commit b6e410b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk
Contributor

holdenk commented Apr 30, 2020

Are we still planning on backporting this or not? I see you deleted the branch so I'm assuming this particular attempt at backporting is abandoned but the root issue does seem important.

@dongjoon-hyun
Member Author

We have no viable solution for branch-2.4. You can proceed with the 2.4.6 release without this.

7 participants