From 2cd7e7013538686240e9526f802dbffd6792637c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 29 May 2025 22:47:34 -0400 Subject: [PATCH 1/2] Fix event timer firing earlier than the time set. --- .../pkg/beam/runners/prism/internal/engine/elementmanager.go | 4 ++++ sdks/go/pkg/beam/runners/prism/internal/stage.go | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 2aecd2991364..cbac3e0caf5f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1648,6 +1648,10 @@ keysPerBundle: continue } holdsInBundle[e.holdTimestamp]++ + if e.timestamp > watermark { + // we don't trigger a timer when its firing timestamp is over watermark + break + } // Clear the "fired" timer so subsequent matches can be ignored. delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window}) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index a877a887ac1a..a1fd49f609d9 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -180,9 +180,9 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c progTick := time.NewTicker(baseTick) defer progTick.Stop() var dataFinished, bundleFinished bool - // If we have no data outputs, we still need to have progress & splits + // If we have no data outputs and timers, we still need to have progress & splits // while waiting for bundle completion. - if b.OutputCount == 0 { + if b.OutputCount+len(b.HasTimers) == 0 { dataFinished = true } var resp *fnpb.ProcessBundleResponse From 124c4425485b4c018b21f8b9e63cbde4b125c2a5 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 30 May 2025 13:00:39 -0400 Subject: [PATCH 2/2] Fix timer loss issue. --- .../beam/runners/prism/internal/engine/elementmanager.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index cbac3e0caf5f..de997fa3282c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1647,11 +1647,13 @@ keysPerBundle: timerCleared = true continue } - holdsInBundle[e.holdTimestamp]++ if e.timestamp > watermark { - // we don't trigger a timer when its firing timestamp is over watermark + // We don't trigger a timer when its firing timestamp is over watermark. + // Push the timer back so we won't lose it. + heap.Push(&dnt.elements, e) break } + holdsInBundle[e.holdTimestamp]++ // Clear the "fired" timer so subsequent matches can be ignored. delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window}) }