From bd2271b3ad27756bf0c85552ca7c2292268f42fe Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 3 Dec 2024 09:55:05 -0800 Subject: [PATCH 1/5] [#32222] Actually maintain the heap invariant for timers. --- runners/prism/java/build.gradle | 9 +-------- .../beam/runners/prism/internal/engine/elementmanager.go | 2 +- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index f2dfa2bb1a28..82eb62b9e207 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -178,10 +178,7 @@ def sickbayTests = [ 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData', 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData', - // Timer race condition/ordering issue in Prism. - 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testTwoTimersSettingEachOtherWithCreateAsInputUnbounded', - - // Missing output due to timer skew. + // Missing output due to processing time timer skew. 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew', // TestStream + BundleFinalization. @@ -241,10 +238,6 @@ def createPrismValidatesRunnerTask = { name, environmentType -> excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' } filter { - // Hangs forever with prism. Put here instead of sickbay to allow sickbay runs to terminate. - // https://github.com/apache/beam/issues/32222 - excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerOrderingWithCreate' - for (String test : sickbayTests) { excludeTestsMatching test } diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 3cfde4701a8f..605439f4465a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1159,7 +1159,7 @@ func (ss *stageState) AddPending(newPending []element) int { } ss.pendingByKeys[string(e.keyBytes)] = dnt } - dnt.elements.Push(e) + heap.Push(&dnt.elements, e) if e.IsTimer() { if lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]; ok { From 9bf8fe55c398d1d96612d711412ceeae044680f7 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 3 Dec 2024 10:14:02 -0800 Subject: [PATCH 2/5] Include in CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index fc32398a7a5a..1b943a99f8a0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -81,6 +81,7 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). From 2e799bf5325956246d6ed3ba1f45df1d958472ec Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 3 Dec 2024 16:34:13 -0800 Subject: [PATCH 3/5] Extend GC by one instant. --- .../beam/runners/prism/internal/engine/elementmanager.go | 9 ++++++--- .../runners/portability/fn_api_runner/fn_runner_test.py | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 605439f4465a..2c81ce6a53ab 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1576,16 +1576,19 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { // They'll never be read in again. for _, wins := range ss.sideInputs { for win := range wins { + // TODO(#https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness // Clear out anything we've already used. - if win.MaxTimestamp() < newOut { + if win.MaxTimestamp()+1 < newOut { delete(wins, win) } } } for _, wins := range ss.state { for win := range wins { - // Clear out anything we've already used. - if win.MaxTimestamp() < newOut { + // TODO(#https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness + if win.MaxTimestamp()+1 < newOut { delete(wins, win) } } diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 4a737feaf288..c788caee68d1 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -790,8 +790,8 @@ def is_buffered_correctly(actual): # Acutal should be a grouping of the inputs into batches of size # at most buffer_size, but the actual batching is nondeterministic # based on ordering and trigger firing timing. - self.assertEqual(sorted(sum((list(b) for b in actual), [])), elements) - self.assertEqual(max(len(list(buffer)) for buffer in actual), buffer_size) + self.assertEqual(sorted(sum((list(b) for b in actual), [])), elements, actual) + self.assertEqual(max(len(list(buffer)) for buffer in actual), buffer_size, actual) if windowed: # Elements were assigned to windows based on their parity. # Assert that each grouping consists of elements belonging to the From 7516b981161e080425bac24dedf351a65ee75d8c Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 4 Dec 2024 10:39:58 -0800 Subject: [PATCH 4/5] Fix the python test instead. --- .../pkg/beam/runners/prism/internal/engine/elementmanager.go | 4 ++-- .../runners/portability/fn_api_runner/fn_runner_test.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 2c81ce6a53ab..1739efdb742a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1579,7 +1579,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { // TODO(#https://github.com/apache/beam/issues/31438): // Adjust with AllowedLateness // Clear out anything we've already used. - if win.MaxTimestamp()+1 < newOut { + if win.MaxTimestamp() < newOut { delete(wins, win) } } @@ -1588,7 +1588,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { for win := range wins { // TODO(#https://github.com/apache/beam/issues/31438): // Adjust with AllowedLateness - if win.MaxTimestamp()+1 < newOut { + if win.MaxTimestamp() < newOut { delete(wins, win) } } diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index c788caee68d1..45022ebc781f 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -776,7 +776,8 @@ def process( state.clear() yield buffer else: - timer.set(ts + 1) + # Set the timer to fire within it's window. + timer.set(ts + (1 - timestamp.Duration(micros=1000))) @userstate.on_timer(timer_spec) def process_timer(self, state=beam.DoFn.StateParam(state_spec)): From 41e92e8783a3cc238a44046fd98e4e766c0bef7a Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 4 Dec 2024 10:51:43 -0800 Subject: [PATCH 5/5] pyfmt --- .../runners/portability/fn_api_runner/fn_runner_test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 45022ebc781f..1309e7c74abc 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -791,8 +791,10 @@ def is_buffered_correctly(actual): # Acutal should be a grouping of the inputs into batches of size # at most buffer_size, but the actual batching is nondeterministic # based on ordering and trigger firing timing. - self.assertEqual(sorted(sum((list(b) for b in actual), [])), elements, actual) - self.assertEqual(max(len(list(buffer)) for buffer in actual), buffer_size, actual) + self.assertEqual( + sorted(sum((list(b) for b in actual), [])), elements, actual) + self.assertEqual( + max(len(list(buffer)) for buffer in actual), buffer_size, actual) if windowed: # Elements were assigned to windows based on their parity. # Assert that each grouping consists of elements belonging to the