diff --git a/CHANGES.md b/CHANGES.md index fc32398a7a5a..1b943a99f8a0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -81,6 +81,7 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Fixed EventTimeTimer ordering in Prism ([#32222](https://github.com/apache/beam/issues/32222)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index f2dfa2bb1a28..82eb62b9e207 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -178,10 +178,7 @@ def sickbayTests = [ 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData', 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData', - // Timer race condition/ordering issue in Prism. - 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testTwoTimersSettingEachOtherWithCreateAsInputUnbounded', - - // Missing output due to timer skew. + // Missing output due to processing time timer skew. 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew', // TestStream + BundleFinalization. @@ -241,10 +238,6 @@ def createPrismValidatesRunnerTask = { name, environmentType -> excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' } filter { - // Hangs forever with prism. Put here instead of sickbay to allow sickbay runs to terminate. 
- // https://github.com/apache/beam/issues/32222 - excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerOrderingWithCreate' - for (String test : sickbayTests) { excludeTestsMatching test } diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 3cfde4701a8f..1739efdb742a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1159,7 +1159,7 @@ func (ss *stageState) AddPending(newPending []element) int { } ss.pendingByKeys[string(e.keyBytes)] = dnt } - dnt.elements.Push(e) + heap.Push(&dnt.elements, e) if e.IsTimer() { if lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]; ok { @@ -1576,6 +1576,8 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { // They'll never be read in again. for _, wins := range ss.sideInputs { for win := range wins { + // TODO(#https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness // Clear out anything we've already used. if win.MaxTimestamp() < newOut { delete(wins, win) @@ -1584,7 +1586,8 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { } for _, wins := range ss.state { for win := range wins { - // Clear out anything we've already used. 
+ // TODO(#https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness if win.MaxTimestamp() < newOut { delete(wins, win) } diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 4a737feaf288..1309e7c74abc 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -776,7 +776,8 @@ def process( state.clear() yield buffer else: - timer.set(ts + 1) + # Set the timer to fire within its window. + timer.set(ts + (1 - timestamp.Duration(micros=1000))) @userstate.on_timer(timer_spec) def process_timer(self, state=beam.DoFn.StateParam(state_spec)): @@ -790,8 +791,10 @@ def is_buffered_correctly(actual): # Acutal should be a grouping of the inputs into batches of size # at most buffer_size, but the actual batching is nondeterministic # based on ordering and trigger firing timing. - self.assertEqual(sorted(sum((list(b) for b in actual), [])), elements) - self.assertEqual(max(len(list(buffer)) for buffer in actual), buffer_size) + self.assertEqual( + sorted(sum((list(b) for b in actual), [])), elements, actual) + self.assertEqual( + max(len(list(buffer)) for buffer in actual), buffer_size, actual) if windowed: # Elements were assigned to windows based on their parity. # Assert that each grouping consists of elements belonging to the