Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (em *ElementManager) Impulse(stageID string) {
count := consumer.AddPending(em, newPending)
em.addPending(count)
}
refreshes := stage.updateWatermarks(em)
refreshes, _ := stage.updateWatermarks(em)
em.markStagesAsChanged(refreshes)
}

Expand Down Expand Up @@ -428,13 +428,13 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.

// We know there is some work we can do that may advance the watermarks,
// refresh them, and see which stages have advanced.
advanced := em.refreshWatermarks()
advanced, inputWatermarkAdvanced := em.refreshWatermarks()
advanced.merge(changedByProcessingTime)

// Check each advanced stage, to see if it's able to execute based on the watermark.
for stageID := range advanced {
ss := em.stages[stageID]
watermark, ready, ptimeEventsReady, injectedReady := ss.bundleReady(em, emNow)
watermark, ready, ptimeEventsReady, injectedReady := ss.bundleReady(em, emNow, inputWatermarkAdvanced[stageID])
if injectedReady {
ss.mu.Lock()
injected := ss.bundlesToInject
Expand Down Expand Up @@ -1081,18 +1081,20 @@ func (em *ElementManager) markChangedAndClearBundle(stageID, bundID string, ptRe
// been marked as changed, and returns the set of stages where the
// watermark may have advanced.
// Must be called while holding em.refreshCond.L
func (em *ElementManager) refreshWatermarks() set[string] {
func (em *ElementManager) refreshWatermarks() (set[string], map[string]bool) {
// Need to have at least one refresh signal.
nextUpdates := set[string]{}
refreshed := set[string]{}
inputWatermarkAdvanced := map[string]bool{}
var i int
for stageID := range em.changedStages {
// clear out old one.
em.changedStages.remove(stageID)
ss := em.stages[stageID]
refreshed.insert(stageID)

refreshes := ss.updateWatermarks(em)
refreshes, advanced := ss.updateWatermarks(em)
inputWatermarkAdvanced[stageID] = advanced
nextUpdates.merge(refreshes)
// cap refreshes incrementally.
if i < 10 {
Expand All @@ -1102,7 +1104,7 @@ func (em *ElementManager) refreshWatermarks() set[string] {
}
}
em.changedStages.merge(nextUpdates)
return refreshed
return refreshed, inputWatermarkAdvanced
}

type set[K comparable] map[K]struct{}
Expand Down Expand Up @@ -1998,10 +2000,11 @@ func (ss *stageState) String() string {
// Watermark_In' = MAX(Watermark_In, MIN(U(TS_Pending), U(Watermark_InputPCollection)))
// Watermark_Out' = MAX(Watermark_Out, MIN(Watermark_In', U(minWatermarkHold)))
// Watermark_PCollection = Watermark_Out_ProducingPTransform
func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
func (ss *stageState) updateWatermarks(em *ElementManager) (set[string], bool) {
ss.mu.Lock()
defer ss.mu.Unlock()

inputWatermarkAdvanced := false
minPending := ss.minPendingTimestampLocked()
minWatermarkHold := ss.watermarkHolds.Min()

Expand All @@ -2017,6 +2020,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
// If bigger, advance the input watermark.
if newIn > ss.input {
ss.input = newIn
inputWatermarkAdvanced = true
}
// The output starts with the new input as the basis.
newOut := ss.input
Expand All @@ -2033,7 +2037,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
}
// If the newOut is not larger than the current output, then don't change downstream watermarks.
if newOut <= ss.output {
return nil
return nil, inputWatermarkAdvanced
}

// If bigger, advance the output watermark
Expand Down Expand Up @@ -2068,7 +2072,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
}
// If there are windows to expire, we don't update the output watermark yet.
if preventDownstreamUpdate {
return nil
return nil, inputWatermarkAdvanced
}

// Update this stage's output watermark, and then propagate that to downstream stages
Expand All @@ -2086,7 +2090,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
refreshes.insert(sID.Global)
}
}
return refreshes
return refreshes, inputWatermarkAdvanced
}

// createOnWindowExpirationBundles injects bundles when windows
Expand Down Expand Up @@ -2164,7 +2168,7 @@ func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *Ele

// bundleReady returns the maximum allowed watermark for this stage, and whether
// it's permitted to execute by side inputs.
func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.Time, bool, bool, bool) {
func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time, inputWatermarkAdvanced bool) (mtime.Time, bool, bool, bool) {
ss.mu.Lock()
defer ss.mu.Unlock()

Expand All @@ -2175,7 +2179,7 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T
// then we can't yet process this stage.
inputW := ss.input
_, upstreamW := ss.UpstreamWatermark()
if inputW == upstreamW {
if inputW == upstreamW && !inputWatermarkAdvanced {
slog.Debug("bundleReady: unchanged upstream watermark",
slog.String("stage", ss.ID),
slog.Group("watermark",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ type tsFinalEvent struct {
func (ev tsFinalEvent) Execute(em *ElementManager) {
em.testStreamHandler.UpdateHold(em, mtime.MaxTimestamp)
ss := em.stages[ev.stageID]
kickSet := ss.updateWatermarks(em)
kickSet, _ := ss.updateWatermarks(em)
kickSet.insert(ev.stageID)
em.changedStages.merge(kickSet)
}
Expand Down
Loading