Conversation

@laraschmidt laraschmidt commented Sep 20, 2021

A DoFn may emit elements with a timestamp up to DoFn#getAllowedTimestampSkew() before the current element's timestamp. This change implements the same allowance for timers. Now a timer may have an output timestamp up to DoFn#getAllowedTimestampSkew() before the current element's timestamp. Before this change, a timer's output timestamp could not be before the current element's timestamp.

Additional Context: https://lists.apache.org/thread.html/r7554658114ddde86c5d82e1c39fe7e1ef587fe926b8e406d1130d501%40%3Cdev.beam.apache.org%3E

@reuvenlax


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

@lukecwik
Member

Is there an associated JIRA?

@reuvenlax
Contributor

I think you have to update the similar check in FnApiDoFnRunner

@laraschmidt laraschmidt changed the title from "Allow for DoFn#getAllowedTimestampSkew() when checking the output timestamp" to "[BEAM-12931] Allow for DoFn#getAllowedTimestampSkew() when checking the output timestamp" Sep 21, 2021
@laraschmidt
Contributor Author

Added checks in FnApiDoFnRunner, which was not checking for skew on timers and was not checking timestamps at all for normal emits. PTAL and make sure it makes sense; I can probably add some tests in FnApiRunner.

PTAL @reuvenlax @lukecwik Thanks!

@laraschmidt laraschmidt force-pushed the allowskew2 branch 2 times, most recently from abbc86d to a88948b Compare September 24, 2021 21:20
Member

@lukecwik lukecwik left a comment

Won't this allow for infinite skew? If you have a timer at X and a skew of -1, then the first time the timer is processed you can output at time X-1, and when it gets scheduled again you can now output at X-2, since the new timer's timestamp is X-1.

The changes to the FnApiDoFnRunner to check timestamp output validity makes sense.
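
To make the chaining concern above concrete, here is a toy model in plain Java. It is only a sketch under stated assumptions: timestamps are plain epoch-millisecond longs, and the class and method names (`ChainedTimerSkew`, `earliestWithPerFiringCheck`, `earliestWithFixedOrigin`) are hypothetical and not part of Beam. It compares a check applied relative to each firing's timestamp with a check anchored to the first timer's timestamp.

```java
public final class ChainedTimerSkew {
  private ChainedTimerSkew() {}

  /**
   * Earliest output timestamp reachable when the skew check is applied relative to the
   * timestamp of each firing, and every firing re-arms the timer at its own lower bound.
   */
  static long earliestWithPerFiringCheck(long firstTimerMillis, long skewMillis, int firings) {
    long current = firstTimerMillis;
    for (int i = 0; i < firings; i++) {
      // Each firing may move back by one skew interval from the timestamp it fired at.
      current = current - skewMillis;
    }
    return current;
  }

  /** Earliest output timestamp if the skew were instead bounded from the first timer. */
  static long earliestWithFixedOrigin(long firstTimerMillis, long skewMillis) {
    return firstTimerMillis - skewMillis;
  }

  public static void main(String[] args) {
    // Timer at X = 100 with a skew of 1, fired twice.
    System.out.println(earliestWithPerFiringCheck(100L, 1L, 2)); // X-2
    System.out.println(earliestWithFixedOrigin(100L, 1L));       // X-1
  }
}
```

In this model the per-firing check lets the reachable timestamp drift back by one skew interval per firing, which is the unbounded behavior the question describes.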

Member

You can handle the proper bounds via:

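Instant lowerBound;
try {
  // Earliest timestamp allowed for this output.
  lowerBound = elementInputTimestamp.minus(fn.getAllowedTimestampSkew());
} catch (ArithmeticException e) {
  // The subtraction overflowed (very large skew), so fall back to the minimum timestamp.
  lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
}
if (outputTimestamp.isBefore(lowerBound)) {
  ...
}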

Finally, it would make sense to check the upper bound as well, against BoundedWindow.TIMESTAMP_MAX_VALUE.
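
A self-contained sketch of the combined lower and upper bound check follows. It is an illustration under assumptions, not the code in this PR: timestamps are plain epoch-millisecond longs rather than Joda `Instant`s, `Math.subtractExact` stands in for the overflow behavior of `Instant#minus`, and `MIN_TIMESTAMP_MILLIS` / `MAX_TIMESTAMP_MILLIS` are hypothetical stand-ins for `BoundedWindow.TIMESTAMP_MIN_VALUE` / `TIMESTAMP_MAX_VALUE`. Clamping the lower bound to the minimum is an extra step added here for the sketch.

```java
public final class TimestampBounds {
  private TimestampBounds() {}

  // Hypothetical stand-ins for BoundedWindow.TIMESTAMP_MIN_VALUE / TIMESTAMP_MAX_VALUE.
  static final long MIN_TIMESTAMP_MILLIS = Long.MIN_VALUE / 1000;
  static final long MAX_TIMESTAMP_MILLIS = Long.MAX_VALUE / 1000;

  /** Earliest allowed output timestamp for an element, given the allowed skew. */
  static long lowerBound(long elementTimestampMillis, long allowedSkewMillis) {
    try {
      long candidate = Math.subtractExact(elementTimestampMillis, allowedSkewMillis);
      // Assumption for this sketch: never go below the minimum representable timestamp.
      return Math.max(candidate, MIN_TIMESTAMP_MILLIS);
    } catch (ArithmeticException e) {
      // Overflow, for example with an effectively unlimited skew.
      return MIN_TIMESTAMP_MILLIS;
    }
  }

  /** True if the output timestamp is within [lowerBound, MAX_TIMESTAMP_MILLIS]. */
  static boolean isValidOutputTimestamp(
      long outputTimestampMillis, long elementTimestampMillis, long allowedSkewMillis) {
    return outputTimestampMillis >= lowerBound(elementTimestampMillis, allowedSkewMillis)
        && outputTimestampMillis <= MAX_TIMESTAMP_MILLIS;
  }
}
```

For instance, with an element at 1000 ms and a skew of 10 ms, an output at 990 ms passes the check and an output at 989 ms does not.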

Contributor Author

@laraschmidt laraschmidt Sep 30, 2021

Sounds good. Done.

Member

Can you use individual timers instead of TimerMap? TimerMap limits the number of runners this can run on.

Contributor Author

The elements need to share a timer then but I suppose the same logic will hit whenever we set the output timestamp. I removed the output of elements though since this would be harder. Wasn't really needed anyway.

Member

Need to tag with UsesTimersInParDo.class and/or UsesTimerMap.class.

Contributor Author

Done. Assuming just the validates runner tests need this because I don't see it elsewhere in the file. Let me know if not.

@laraschmidt
Contributor Author

> Won't this allow for infinite skew? If you have a timer at X and a skew of -1, then the first time the timer is processed you can output at time X-1, and when it gets scheduled again you can now output at X-2, since the new timer's timestamp is X-1.

So my understanding of the reason for these checks is to stop people from doing the wrong thing without realizing it. We don't even take any different action based on this variable. It seems okay to apply this to each specific output timestamp and let you skew more if you chain timers in this fashion.

On a more practical note, there's reasons why you might want a timer to output an earlier element if you've properly set up watermark holds. There's currently no way to do that so we need some allowance. It would probably be better if we could constrain skew from the first output timestamp but I don't think that's available in the later timers, right?

If you disagree with the approach, I can bring this up on the email thread for others to chime in in case they are not checking here.

@laraschmidt
Contributor Author

PTAL, @lukecwik @reuvenlax

@laraschmidt laraschmidt force-pushed the allowskew2 branch 5 times, most recently from bf7d2c3 to 7cd3818 Compare October 4, 2021 18:20
@laraschmidt laraschmidt requested review from je-ik and lukecwik October 4, 2021 22:39
Contributor

@je-ik je-ik left a comment

+1
Do we have a follow-up for un-deprecating the method?

@lukecwik
Member

lukecwik commented Oct 6, 2021

> > Won't this allow for infinite skew? If you have a timer at X and a skew of -1, then the first time the timer is processed you can output at time X-1, and when it gets scheduled again you can now output at X-2, since the new timer's timestamp is X-1.
>
> So my understanding of the reason for these checks is to stop people from doing the wrong thing without realizing it. We don't even take any different action based on this variable. It seems okay to apply this to each specific output timestamp and let you skew more if you chain timers in this fashion.
>
> On a more practical note, there's reasons why you might want a timer to output an earlier element if you've properly set up watermark holds. There's currently no way to do that so we need some allowance. It would probably be better if we could constrain skew from the first output timestamp but I don't think that's available in the later timers, right?
>
> If you disagree with the approach, I can bring this up on the email thread for others to chime in in case they are not checking here.

I think users will be surprised that their data will be dropped as late once they pass the watermark skew bound if they output past it. The existing logic had guards for this explicitly since it would be surprising for users so I do believe it is important enough to discuss whether there is another approach to solve this or we are ok with this happening.

@laraschmidt
Contributor Author

> > > Won't this allow for infinite skew? If you have a timer at X and a skew of -1, then the first time the timer is processed you can output at time X-1, and when it gets scheduled again you can now output at X-2, since the new timer's timestamp is X-1.
> >
> > So my understanding of the reason for these checks is to stop people from doing the wrong thing without realizing it. We don't even take any different action based on this variable. It seems okay to apply this to each specific output timestamp and let you skew more if you chain timers in this fashion.
> >
> > On a more practical note, there's reasons why you might want a timer to output an earlier element if you've properly set up watermark holds. There's currently no way to do that so we need some allowance. It would probably be better if we could constrain skew from the first output timestamp but I don't think that's available in the later timers, right?
> >
> > If you disagree with the approach, I can bring this up on the email thread for others to chime in in case they are not checking here.
>
> I think users will be surprised that their data will be dropped as late once they pass the watermark skew bound if they output past it. The existing logic had guards for this explicitly since it would be surprising for users so I do believe it is important enough to discuss whether there is another approach to solve this or we are ok with this happening.

We chatted a bit about this offline. There's actually no guarantee that the watermark is held back when using DoFn#getAllowedTimestampSkew. The allowedTimestampSkew just removes the check that we have to avoid accidentally dropping late data. See the javadoc [1] and relevant reply from Jan [2].

[1] https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/DoFn.html#getAllowedTimestampSkew--
[2] https://lists.apache.org/thread.html/r34c70d8a5f213f7bd2f4557019e27b7f07f5120d0a8794512c88568c%40%3Cdev.beam.apache.org%3E

@laraschmidt laraschmidt force-pushed the allowskew2 branch 4 times, most recently from fa5c8c8 to ba23c23 Compare October 19, 2021 20:25
@laraschmidt
Contributor Author

PTAL @lukecwik

Member

@lukecwik lukecwik left a comment

This is looking great, need to fix up the OnWindowExpiration case and add a ValidatesRunner test for it.

@codecov

codecov bot commented Oct 28, 2021

Codecov Report

Merging #15540 (7b0de72) into master (bc82f6f) will increase coverage by 37.42%.
The diff coverage is n/a.

❗ Current head 7b0de72 differs from pull request most recent head c43415e. Consider uploading reports for the commit c43415e to get more accurate results

@@             Coverage Diff             @@
##           master   #15540       +/-   ##
===========================================
+ Coverage   46.10%   83.53%   +37.42%     
===========================================
  Files         196      445      +249     
  Lines       19498    61210    +41712     
===========================================
+ Hits         8990    51131    +42141     
- Misses       9538    10079      +541     
+ Partials      970        0      -970     
Impacted Files Coverage Δ
sdks/go/pkg/beam/io/textio/sdf.go
sdks/go/pkg/beam/testing/teststream/teststream.go
sdks/go/pkg/beam/core/util/reflectx/util.go
...ks/go/pkg/beam/core/runtime/coderx/coderx.shims.go
sdks/go/pkg/beam/core/runtime/exec/emit.go
sdks/go/pkg/beam/core/runtime/genx/genx.go
sdks/go/pkg/beam/validate.go
sdks/go/pkg/beam/core/graph/coder/bytes.go
sdks/go/pkg/beam/transforms/stats/sum_switch.go
sdks/go/pkg/beam/testing/passert/count.go
... and 631 more

@laraschmidt
Contributor Author

@lukecwik PTAL, still need to check that everything runs internally but otherwise should be good.

Member

@lukecwik lukecwik left a comment

Looks great and I really like the simplification in the testing.

@laraschmidt laraschmidt force-pushed the allowskew2 branch 2 times, most recently from bafa6ce to 5a95bf6 Compare November 10, 2021 23:01
@lukecwik
Member

Run Java PreCommit

@laraschmidt laraschmidt force-pushed the allowskew2 branch 3 times, most recently from b373e0a to 3249a0c Compare December 9, 2021 21:39
@laraschmidt laraschmidt force-pushed the allowskew2 branch 3 times, most recently from 8954610 to b3192dc Compare December 15, 2021 20:24

4 participants