
[Bug]: Issues with Watermark propagation in Spark runner (streaming) #23129

@mosche

Description


What happened?

I ran into this issue while adding support for running the ValidatesRunner (VR) tests on the Spark runner in streaming mode (#22620).

It looks like the runner only supports a single global watermark, which causes problems when a pipeline contains multiple stateful operators.
More generally, watermark propagation appears to be highly nondeterministic and prone to race conditions.
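To make the suspected failure mode concrete, here is a hypothetical and heavily simplified model of the two GroupByKey stages from the logs. It is not Spark runner or Beam code. `Stage`, `advanceTo`, and `END_OF_WINDOW` are invented for illustration, and the model assumes one end-of-window timer per stage and Beam-style expiry (an element is dropped once the input watermark has passed the window end). It contrasts two setups: both stages reading one global watermark, and the downstream stage deriving its input watermark from the upstream stage's output watermark.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of two chained GroupByKey stages in one window; not actual runner code. */
public class WatermarkPropagationSketch {
  // Hypothetical end-of-window timestamp; the end-of-window timer fires once the watermark passes it.
  static final long END_OF_WINDOW = 100L;

  static final class Stage {
    long inputWatermark = Long.MIN_VALUE;
    boolean fired = false;
    final List<String> buffer = new ArrayList<>();
    final List<String> output = new ArrayList<>();
    int expired = 0;

    /** Buffers elements, or drops them as expired once the watermark has passed the window end. */
    void receive(List<String> elements) {
      for (String element : elements) {
        if (inputWatermark > END_OF_WINDOW) {
          expired++;
        } else {
          buffer.add(element);
        }
      }
    }

    /** Advances the input watermark and fires the end-of-window timer if it became eligible. */
    List<String> advanceTo(long watermark) {
      inputWatermark = Math.max(inputWatermark, watermark);
      if (fired || inputWatermark <= END_OF_WINDOW) {
        return List.of();
      }
      fired = true;
      List<String> emitted = new ArrayList<>(buffer);
      output.addAll(emitted);
      buffer.clear();
      return emitted;
    }

    /** Timers fire eagerly in advanceTo, so all data below the input watermark has been emitted. */
    long outputWatermark() {
      return inputWatermark;
    }
  }

  /** Both stages read the same global watermark, and the downstream stage happens to run first. */
  static Stage runWithGlobalWatermark() {
    Stage upstream = new Stage();
    Stage downstream = new Stage();
    upstream.receive(List.of("row"));
    long globalWatermark = Long.MAX_VALUE; // the source reports it is done
    downstream.advanceTo(globalWatermark); // fires on an empty buffer
    downstream.receive(upstream.advanceTo(globalWatermark)); // "row" arrives after expiry
    return downstream;
  }

  /** The downstream input watermark is derived from the upstream output watermark. */
  static Stage runWithPerStageWatermark() {
    Stage upstream = new Stage();
    Stage downstream = new Stage();
    upstream.receive(List.of("row"));
    long sourceWatermark = Long.MAX_VALUE;
    downstream.advanceTo(upstream.outputWatermark()); // upstream has not advanced: no firing
    downstream.receive(upstream.advanceTo(sourceWatermark)); // "row" is buffered downstream
    downstream.advanceTo(upstream.outputWatermark()); // now fires with "row"
    return downstream;
  }

  public static void main(String[] args) {
    Stage global = runWithGlobalWatermark();
    System.out.println("global:    output=" + global.output + ", expired=" + global.expired);
    Stage perStage = runWithPerStageWatermark();
    System.out.println("per-stage: output=" + perStage.output + ", expired=" + perStage.expired);
  }
}
```

Under these assumptions, the global-watermark run ends with `output=[], expired=1`, mirroring the empty PAssert input and the `expired elements: 1` line in the logs, while the per-stage run ends with `output=[row], expired=0`. A real runner would additionally need watermark holds for buffered state; the sketch sidesteps that by firing timers eagerly.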

In the logs below, timers fire in the downstream PAssert$0/GroupGlobally/GroupByKey (at 56,830) right after the global watermark is advanced (at 56,793), but before the corresponding timers fire in the upstream Group.ByFields/ToKvs/GroupByKey (at 57,005). PAssert therefore evaluates an empty result and fails (at 56,850), and when the expected element finally arrives in PAssert$0/GroupGlobally/GroupByKey (at 57,065), it is considered expired:

52,112 [worker] INFO  TestSparkRunner  - About to run test pipeline avroschematest0testavropipelinegroupby-mmack-0909103252-2afdba63

55,816 [0] INFO  StateSpecFunctions  - Source 0_0 read 1 values, watermarks: [low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z]

55,899 [JobGenerator] INFO  GlobalWatermarkHolder  - Queued watermarks for source 0: [low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.000Z]

56,297 [1] INFO  StateSpecFunctions  - Source 0_0 read 0 values, watermarks: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]

56,314 [JobGenerator] INFO  GlobalWatermarkHolder  - Queued watermarks for source 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.500Z]

56,546 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: 1, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=-290308-12-21T19:59:05.225Z]
56,567 [3] INFO  SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to -290308-12-21T19:59:05.225Z
56,570 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements: 0, state size: 2

56,614 [9] TRACE SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: input elements: 1, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=-290308-12-21T19:59:05.225Z]
56,615 [9] INFO  SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to -290308-12-21T19:59:05.225Z
56,615 [9] TRACE SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: output elements: 0, state size: 2

56,770 [15] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: 0, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=-290308-12-21T19:59:05.225Z]
56,771 [15] INFO  SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to -290308-12-21T19:59:05.225Z
56,771 [15] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements: 0, state size: 2

56,780 [8] INFO  StateSpecFunctions  - Source 0_0 read 0 values, watermarks: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]

56,793 [listener] INFO  GlobalWatermarkHolder  - Batch 2022-09-09T10:32:55.000Z completed, new watermarks: {0=[low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.000Z]}

56,793 [JobGenerator] INFO  GlobalWatermarkHolder  - Queued watermarks for source 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.000Z]

56,829 [20] TRACE SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: input elements: 0, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z]
56,829 [20] INFO  SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to 294247-01-10T04:00:54.775Z
56,830 [20] DEBUG SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: eligible timer at 294247-01-09T04:00:54.775Z: TimerData{timerId=0:9223371950454775, timerFamilyId=, namespace=Window(org.apache.beam.sdk.transforms.windowing.GlobalWindow@1f346ad2), timestamp=294247-01-09T04:00:54.775Z, outputTimestamp=294247-01-09T04:00:54.775Z, domain=EVENT_TIME, deleted=false}
56,835 [20] TRACE SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: output elements: 1, state size: 0

56,850 [20] ERROR Executor  - Exception in task 1.0 in stage 14.0 (TID 20)
org.apache.beam.sdk.util.UserCodeException: java.lang.AssertionError: Group.ByFields/ToRow/ParMultiDo(Anonymous).output: 
Expected: iterable with items [<Row: key:Row: string:mystring,,value:[Row: bool_non_nullable:true,int:43,long:44,float:44.1,double:44.2,string:mystring,bytes:[1, 2, 3, 4],fixed:[B@7cb94f7,date:1979-03-14T00:00:00.000Z,timestampMillis:1979-03-14T00:02:03.004Z,testEnum:enum value: 0,row:Row: BOOL_NON_NULLABLE:true,int:42,,array:[Row: BOOL_NON_NULLABLE:true,int:42,, Row: BOOL_NON_NULLABLE:true,int:42,, ],map:{(k1, Row: BOOL_NON_NULLABLE:true,int:42,), (k2, Row: BOOL_NON_NULLABLE:true,int:42,), },, ],>] in any order but: no item matches: <Row: key:Row: string:mystring,,value:[Row: bool_non_nullable:true,int:43,long:44,float:44.1,double:44.2,string:mystring,bytes:[1, 2, 3, 4],fixed:[B@7cb94f7,date:1979-03-14T00:00:00.000Z,timestampMillis:1979-03-14T00:02:03.004Z,testEnum:enum value: 0,row:Row: BOOL_NON_NULLABLE:true,int:42,,array:[Row: BOOL_NON_NULLABLE:true,int:42,, Row: BOOL_NON_NULLABLE:true,int:42,, ],map:{(k1, Row: BOOL_NON_NULLABLE:true,int:42,), (k2, Row: BOOL_NON_NULLABLE:true,int:42,), },, ],> in []
56,879 [task-result] ERROR TaskSetManager  - Task 1 in stage 14.0 failed 1 times; aborting job
56,893 [JobScheduler] ERROR JobScheduler  - Error running job streaming job 1662719575500 ms.0

57,004 [26] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: 0, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z]
57,004 [26] INFO  SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to 294247-01-10T04:00:54.775Z
57,005 [26] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: eligible timer at 294247-01-09T04:00:54.775Z: TimerData{timerId=0:9223371950454775, timerFamilyId=, namespace=Window(org.apache.beam.sdk.transforms.windowing.GlobalWindow@1f346ad2), timestamp=294247-01-09T04:00:54.775Z, outputTimestamp=294247-01-09T04:00:54.775Z, domain=EVENT_TIME, deleted=false}
57,005 [26] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements: 1, state size: 0

57,005 [listener] INFO  GlobalWatermarkHolder  - Current watermarks for sourceId 0: [low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.000Z]
57,005 [listener] INFO  GlobalWatermarkHolder  - Batch 2022-09-09T10:32:55.500Z completed, new watermarks {0=[low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.500Z]}

57,065 [31] TRACE SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: input elements: 1, expired elements: 1 [current=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]
57,065 [31] INFO  SparkGroupAlsoByWindowViaWindowSet  - PAssert$0/GroupGlobally/GroupByKey: advancing watermark from 294247-01-10T04:00:54.775Z to 294247-01-10T04:00:54.775Z

57,174 [listener] INFO  GlobalWatermarkHolder  - Current watermarks for sourceId 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.500Z]
57,176 [listener] INFO  GlobalWatermarkHolder  - Batch 2022-09-09T10:32:56.000Z completed: new watermarks {0=[low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.000Z]}

57,180 [24] INFO  StateSpecFunctions  - Source 0_0 read 0 values, watermarks: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]

57,190 [JobGenerator] INFO  GlobalWatermarkHolder  - Queued watermarks for source 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.500Z]

57,552 [listener] INFO  GlobalWatermarkHolder  - Current watermarks for sourceId 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.000Z]
57,555 [listener] INFO  GlobalWatermarkHolder  - Batch 2022-09-09T10:32:56.500Z completed, new watermarks {0=[low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.500Z]}
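The timestamps in these logs are Beam's sentinel values. The sketch below decodes them with `java.time` rather than Joda-Time. The constants are assumed to match Beam's `BoundedWindow.TIMESTAMP_MIN_VALUE`/`TIMESTAMP_MAX_VALUE` and `GlobalWindow.maxTimestamp()` (one day before the maximum), and `isExpired` is a simplified, hypothetical stand-in for Beam's late-data check with zero allowed lateness. It shows why the eligible timer at 294247-01-09T04:00:54.775Z is the end-of-global-window timer, and why the element arriving at 57,065 is counted as expired once the watermark has reached the maximum timestamp.

```java
import java.time.Duration;
import java.time.Instant;

/** Decodes the sentinel timestamps seen in the logs; a sketch, not Beam code. */
public class GlobalWindowExpiry {
  // Long.MAX_VALUE microseconds truncated to milliseconds (logged as 294247-01-10T04:00:54.775Z).
  static final Instant TIMESTAMP_MAX = Instant.ofEpochMilli(Long.MAX_VALUE / 1000);
  // Long.MIN_VALUE microseconds truncated to milliseconds (logged as -290308-12-21T19:59:05.225Z).
  static final Instant TIMESTAMP_MIN = Instant.ofEpochMilli(Long.MIN_VALUE / 1000);
  // Assumed end of the global window: one day before the maximum (294247-01-09T04:00:54.775Z).
  static final Instant END_OF_GLOBAL_WINDOW = TIMESTAMP_MAX.minus(Duration.ofDays(1));

  /** Simplified expiry rule: the watermark has passed the window end plus allowed lateness. */
  static boolean isExpired(Instant windowEnd, Duration allowedLateness, Instant inputWatermark) {
    return windowEnd.plus(allowedLateness).isBefore(inputWatermark);
  }

  public static void main(String[] args) {
    // Same number as in the logged timerId 0:9223371950454775.
    System.out.println("end of global window (ms): " + END_OF_GLOBAL_WINDOW.toEpochMilli());
    // Watermark still at the minimum: an arriving element is accepted.
    System.out.println("expired at MIN: " + isExpired(END_OF_GLOBAL_WINDOW, Duration.ZERO, TIMESTAMP_MIN));
    // Watermark already at the maximum: an arriving element is dropped as expired.
    System.out.println("expired at MAX: " + isExpired(END_OF_GLOBAL_WINDOW, Duration.ZERO, TIMESTAMP_MAX));
  }
}
```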

Issue Priority

Priority: 2

Issue Component

Component: runner-spark
