Skip to content

Conversation

@mosche
Copy link
Member

@mosche mosche commented Aug 8, 2022

Run VR tests for Spark streaming runner rather than custom tests (test are already run as part of the "normal" unit test run).

If forceStreaming is set to true, the TestSparkRunner will replace Read.Bounded with UnboundedReadFromBoundedSource so tests are run in streaming mode.
Additionally this PR adds support for TestStream.

Closes #22472


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

@mosche
Copy link
Member Author

mosche commented Aug 8, 2022

Run Spark ValidatesRunner

@mosche
Copy link
Member Author

mosche commented Aug 8, 2022

Unfortunately I'm stuck with some flaky tests. It looks like watermarks are not advanced in a deterministic way.
Below some logs of org.apache.beam.sdk.schemas.AvroSchemaTest.testAvroPipelineGroupBy (edited for readability).

@aromanenko-dev @echauchot if you have some time I'd be more than grateful for a 2nd pair of 👀 .

Successful run (watermark advanced early enough, so that timer is triggered and the only element is emitted):

14:42:52,938 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:42:52,940 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: non expired input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:42:52,949 [3] TRACE WindowTracing  - ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at GLOBALW_MAX for key:Row2; window:GlobalWindow where inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:42:52,957 [3] TRACE WindowTracing  - WatermarkHold.addHolds: element hold at GLOBALW_MAX is on time for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:42:52,960 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
14:42:52,961 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
14:42:52,962 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are  0
14:42:53,137 [spark-listener-group-appStatus] INFO  GlobalWatermarkHolder  - Put new watermark block: {0=SparkWatermarks{lowWatermark=BOUNDEDW_MIN, highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW}}
14:42:53,146 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
14:42:53,146 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}] [inputWatermark: BOUNDEDW_MAX]
14:42:53,146 [15] DEBUG WindowTracing  - ReduceFnRunner: Received timer key:Row2; window:GlobalWindow; data:TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false} with inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,148 [15] DEBUG WindowTracing  - ReduceFnRunner: Cleaning up for key:Row2; window:GlobalWindow with inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,148 [15] DEBUG WindowTracing  - WatermarkHold.extractAndRelease: for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,149 [15] DEBUG WindowTracing  - WatermarkHold.extractAndRelease.read: clearing for key:Row2; window:GlobalWindow
14:42:53,150 [15] DEBUG WindowTracing  - describePane: ON_TIME pane (prev was null) for key:Row2; windowMaxTimestamp:GLOBALW_MAX; inputWatermark:BOUNDEDW_MAX; outputWatermark:null; isLateForOutput:false
14:42:53,152 [15] TRACE WindowTracing  - ReduceFnRunner.onTrigger: outputWindowedValue key:Row2 value:[Row1] at GLOBALW_MAX
14:42:53,152 [15] DEBUG WindowTracing  - WatermarkHold.clearHolds: For key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,153 [15] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are TimestampedValueInGlobalWindow{value=KV{Row2, [Row1]}, timestamp=GLOBALW_MAX, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} 1

Failed run (watermark is advanced too late, timer doesn't trigger and element is lost):

14:41:51,453 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:41:51,455 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: non expired input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:41:51,463 [3] TRACE WindowTracing  - ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at GLOBALW_MAX for key:Row2; window:GlobalWindow where inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:41:51,471 [3] TRACE WindowTracing  - WatermarkHold.addHolds: element hold at GLOBALW_MAX is on time for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:41:51,474 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
14:41:51,474 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
14:41:51,476 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are  0
14:41:51,658 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
14:41:51,658 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
14:41:51,658 [15] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are  0
14:41:51,662 [spark-listener-group-appStatus] INFO  GlobalWatermarkHolder  - Put new watermark block: {0=SparkWatermarks{lowWatermark=BOUNDEDW_MIN, highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW}}

@github-actions
Copy link
Contributor

github-actions bot commented Aug 8, 2022

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @lukecwik for label java.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@mosche
Copy link
Member Author

mosche commented Aug 9, 2022

Run Spark ValidatesRunner

2 similar comments
@mosche
Copy link
Member Author

mosche commented Aug 9, 2022

Run Spark ValidatesRunner

@mosche
Copy link
Member Author

mosche commented Aug 9, 2022

Run Spark ValidatesRunner

@@ -105,6 +105,9 @@ public void testLateDataAccumulating() {
.advanceWatermarkTo(instant.plus(Duration.standardMinutes(6)))
// These elements are late but within the allowed lateness
.addElements(TimestampedValue.of(4L, instant), TimestampedValue.of(5L, instant))
.advanceWatermarkTo(instant.plus(Duration.standardMinutes(10)))
Copy link
Member Author

@mosche mosche Aug 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kennknowles Maybe you could answer this? I'm wondering if this is an issue of the Spark streaming runner (and how this is handled by other runners) or if it's a lack of my own understanding.

Without advancing the watermark once more the (lower) input watermark remains at 6 mins, but data in [0,5 min) won't be considered late until it passes 10 mins.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just responding to let you know I have been on vacation and I will look at this later today.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot! I'm off as well for a bit, so no rush on this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kennknowles Finally back to this, if you could have a look it would be great:)

@mosche mosche force-pushed the 22472-Spark-TestStream branch from d6cfb9a to e50fec9 Compare August 11, 2022 09:30
@mosche
Copy link
Member Author

mosche commented Aug 11, 2022

Run Spark ValidatesRunner

@mosche
Copy link
Member Author

mosche commented Aug 11, 2022

Run Java PreCommit

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @lukecwik

@github-actions
Copy link
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @kennknowles

@github-actions
Copy link
Contributor

github-actions bot commented Sep 2, 2022

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @lukecwik for label java.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @lukecwik

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is super useful! Thank you!


classpath = configurations.validatesRunner
testClassesDirs += files(project.sourceSets.test.output.classesDirs)
testClassesDirs += files(
project(":sdks:java:core").sourceSets.test.output.classesDirs,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change makes sense. I don't understand how it worked before. Was it not actually running the VR tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in fact VR test were never run/supported for Spark in streaming mode. I just stumbled on this accidentally when I started looking into bugs related to onWindowExpiration :(
Instead there was some custom tests in the module that try to mimic the VR test, but they only cover a very small part (and also run as unit tests).

* Override factory to replace {@link Read.Unbounded} with {@link UnboundedReadFromBoundedSource}
* to force streaming mode.
*/
private static class UnboundedReadFromBoundedSourceOverrideFactory<T>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems useful as a general thing that could be in runners-core-construction FWIW.

Copy link
Member Author

@mosche mosche Sep 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to do that though I'm not entirely sure if the factory is of much value by itself. I also had to fix the outputs in a non trivial way using a visitor after the replacement, see https://github.com/apache/beam/pull/22620/files#diff-d81f49eb0330230bd03ce6cd33b5f70f59c443aac57741e877ececbada32b16bR246-R274. I couldn't find a way to achieve this in mapOutputs of the override factory itself.
Let me know what you think.

@kennknowles
Copy link
Member

Run Spark ValidatesRunner

@mosche
Copy link
Member Author

mosche commented Sep 12, 2022

@kennknowles FYI, a significant number of VR tests are constantly failing here. If I run them independently they usually succeed. It looks like there's some indeterminism around watermark propagation in the runner, see #23129.
Wondering, would you know anyone who's familiar with that code?

@mosche mosche force-pushed the 22472-Spark-TestStream branch from e50fec9 to 9eaf52e Compare September 13, 2022 12:42
@kennknowles
Copy link
Member

I don't know if anyone currently around would be familiar with SparkRunner watermark propagation.

@kennknowles
Copy link
Member

I think it is valuable to get these tests running and disable them. The test and list of disabled tests can be a real representation of the current state. That way things that are green can stay green.

@kennknowles
Copy link
Member

run spark validatesrunner

@mosche
Copy link
Member Author

mosche commented Sep 14, 2022

Run Spark ValidatesRunner

@mosche mosche force-pushed the 22472-Spark-TestStream branch from 79c1953 to b35dbaa Compare September 14, 2022 12:28
@mosche
Copy link
Member Author

mosche commented Sep 14, 2022

Run Spark ValidatesRunner

@mosche
Copy link
Member Author

mosche commented Sep 14, 2022

I took a bit of a turn here after validating my initial approach replacing bounded sources with UnboundedReadFromBoundedSource with VR tests in Flink:

  • Tests that failed likely due to watermark issues with the Spark runner ([Bug]: Issues with Watermark propagation in Spark runner (streaming) #23129, see test results) ran fine with Flink suggesting there really is a major problem (in streaming mode).

  • Nevertheless, it also showed that the approach is somehow flawed. Some bounded test cases simply cannot be forced into a streaming execution, e.g. any GroupByKey will fail on the GlobalWindow if there's no trigger set.

The initial reason for this approach was to prevent the Spark runner from failing when streaming was forced via pipeline options in VR tests for bounded test cases: Spark refuses to start if there's no streaming workload scheduled.
Instead TestSparkRunner now just detects the translation mode and acts accordingly.

Unfortunately, this hides any watermark issues uncovered above as VR tests succeed.

@kennknowles
Copy link
Member

  • Nevertheless, it also showed that the approach is somehow flawed. Some bounded test cases simply cannot be forced into a streaming execution, e.g. any GroupByKey will fail on the GlobalWindow if there's no trigger set.

In the Beam model, this condition is that a GroupByKey of an unbounded PCollection in global window must have a trigger. But you can still have a bounded PCollection in streaming mode.

So the summary is:

  • forcing a run in streaming mode, but leaving bounded PCollections as bounded is OK
  • automatically making all PCollections unbounded is flawed (but still can be useful to find bugs sometimes)

@mosche
Copy link
Member Author

mosche commented Sep 19, 2022

@kennknowles fine to merge this?

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @lukecwik

@github-actions
Copy link
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@aromanenko-dev
Copy link
Contributor

@kennknowles kind ping, are you ok to merge it?

@kennknowles kennknowles merged commit 3c7a4e0 into apache:master Sep 30, 2022
@mosche mosche deleted the 22472-Spark-TestStream branch October 1, 2022 06:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Improvement]: Support TestStream in SparkRunner to validate streaming runner

3 participants