From f95e888e6e4b3a84e0c7b3ac88b175061eaf0782 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 12 Sep 2025 09:50:12 -0400 Subject: [PATCH] Return inputwatermarkAdvanced signal and use it to determine whether a new bundle is ready. --- .../prism/internal/engine/elementmanager.go | 28 +++++++++++-------- .../prism/internal/engine/teststream.go | 2 +- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 8c8b71ca4146..c6aa43c40101 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -345,7 +345,7 @@ func (em *ElementManager) Impulse(stageID string) { count := consumer.AddPending(em, newPending) em.addPending(count) } - refreshes := stage.updateWatermarks(em) + refreshes, _ := stage.updateWatermarks(em) em.markStagesAsChanged(refreshes) } @@ -428,13 +428,13 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. // We know there is some work we can do that may advance the watermarks, // refresh them, and see which stages have advanced. - advanced := em.refreshWatermarks() + advanced, inputWatermarkAdvanced := em.refreshWatermarks() advanced.merge(changedByProcessingTime) // Check each advanced stage, to see if it's able to execute based on the watermark. for stageID := range advanced { ss := em.stages[stageID] - watermark, ready, ptimeEventsReady, injectedReady := ss.bundleReady(em, emNow) + watermark, ready, ptimeEventsReady, injectedReady := ss.bundleReady(em, emNow, inputWatermarkAdvanced[stageID]) if injectedReady { ss.mu.Lock() injected := ss.bundlesToInject @@ -1081,10 +1081,11 @@ func (em *ElementManager) markChangedAndClearBundle(stageID, bundID string, ptRe // been marked as changed, and returns the set of stages where the // the watermark may have advanced. // Must be called while holding em.refreshCond.L -func (em *ElementManager) refreshWatermarks() set[string] { +func (em *ElementManager) refreshWatermarks() (set[string], map[string]bool) { // Need to have at least one refresh signal. nextUpdates := set[string]{} refreshed := set[string]{} + inputWatermarkAdvanced := map[string]bool{} var i int for stageID := range em.changedStages { // clear out old one. @@ -1092,7 +1093,8 @@ func (em *ElementManager) refreshWatermarks() set[string] { ss := em.stages[stageID] refreshed.insert(stageID) - refreshes := ss.updateWatermarks(em) + refreshes, advanced := ss.updateWatermarks(em) + inputWatermarkAdvanced[stageID] = advanced nextUpdates.merge(refreshes) // cap refreshes incrementally. if i < 10 { @@ -1102,7 +1104,7 @@ func (em *ElementManager) refreshWatermarks() set[string] { } } em.changedStages.merge(nextUpdates) - return refreshed + return refreshed, inputWatermarkAdvanced } type set[K comparable] map[K]struct{} @@ -1998,10 +2000,11 @@ func (ss *stageState) String() string { // Watermark_In' = MAX(Watermark_In, MIN(U(TS_Pending), U(Watermark_InputPCollection))) // Watermark_Out' = MAX(Watermark_Out, MIN(Watermark_In', U(minWatermarkHold))) // Watermark_PCollection = Watermark_Out_ProducingPTransform -func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { +func (ss *stageState) updateWatermarks(em *ElementManager) (set[string], bool) { ss.mu.Lock() defer ss.mu.Unlock() + inputWatermarkAdvanced := false minPending := ss.minPendingTimestampLocked() minWatermarkHold := ss.watermarkHolds.Min() @@ -2017,6 +2020,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { // If bigger, advance the input watermark. if newIn > ss.input { ss.input = newIn + inputWatermarkAdvanced = true } // The output starts with the new input as the basis. newOut := ss.input @@ -2033,7 +2037,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { } // If the newOut is smaller, then don't change downstream watermarks. if newOut <= ss.output { - return nil + return nil, inputWatermarkAdvanced } // If bigger, advance the output watermark @@ -2068,7 +2072,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { } // If there are windows to expire, we don't update the output watermark yet. if preventDownstreamUpdate { - return nil + return nil, inputWatermarkAdvanced } // Update this stage's output watermark, and then propagate that to downstream stages @@ -2086,7 +2090,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { refreshes.insert(sID.Global) } } - return refreshes + return refreshes, inputWatermarkAdvanced } // createOnWindowExpirationBundles injects bundles when windows @@ -2164,7 +2168,7 @@ func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *Ele // bundleReady returns the maximum allowed watermark for this stage, and whether // it's permitted to execute by side inputs. -func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.Time, bool, bool, bool) { +func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time, inputWatermarkAdvanced bool) (mtime.Time, bool, bool, bool) { ss.mu.Lock() defer ss.mu.Unlock() @@ -2175,7 +2179,7 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T // then we can't yet process this stage. inputW := ss.input _, upstreamW := ss.UpstreamWatermark() - if inputW == upstreamW { + if inputW == upstreamW && !inputWatermarkAdvanced { slog.Debug("bundleReady: unchanged upstream watermark", slog.String("stage", ss.ID), slog.Group("watermark", diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go index 0af4e7dc41f0..a2480aec5b57 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go @@ -252,7 +252,7 @@ type tsFinalEvent struct { func (ev tsFinalEvent) Execute(em *ElementManager) { em.testStreamHandler.UpdateHold(em, mtime.MaxTimestamp) ss := em.stages[ev.stageID] - kickSet := ss.updateWatermarks(em) + kickSet, _ := ss.updateWatermarks(em) kickSet.insert(ev.stageID) em.changedStages.merge(kickSet) }