diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 2aecd2991364..de997fa3282c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1647,6 +1647,12 @@ keysPerBundle: timerCleared = true continue } + if e.timestamp > watermark { + // We don't trigger a timer when its firing timestamp is over watermark. + // Push the timer back so we won't lose it. + heap.Push(&dnt.elements, e) + break + } holdsInBundle[e.holdTimestamp]++ // Clear the "fired" timer so subsequent matches can be ignored. delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window}) diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index a877a887ac1a..a1fd49f609d9 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -180,9 +180,9 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c progTick := time.NewTicker(baseTick) defer progTick.Stop() var dataFinished, bundleFinished bool - // If we have no data outputs, we still need to have progress & splits + // If we have no data outputs and timers, we still need to have progress & splits // while waiting for bundle completion. - if b.OutputCount == 0 { + if b.OutputCount+len(b.HasTimers) == 0 { dataFinished = true } var resp *fnpb.ProcessBundleResponse