From 0ac07322d3e97ce20160f3ff9ea99d27dcbedf9c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 5 Sep 2025 11:49:59 -0400 Subject: [PATCH 01/14] Construct trigger struct based on pipeline proto. --- .../runners/prism/internal/engine/strategy.go | 22 +++++++++++++++++++ .../beam/runners/prism/internal/execute.go | 20 ++++++++++++++++- .../prism/internal/jobservices/management.go | 2 +- 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go index 5446d3edd3c0..79ca70dad31e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go @@ -573,4 +573,26 @@ func (t *TriggerDefault) String() string { return "Default" } +// TriggerAfterProcessingTime fires once after a specified amount of processing time +// has passed since an element was first seen. +type TriggerAfterProcessingTime struct { + Delay time.Duration + AlignToPeriod time.Duration + AlignToOffset time.Duration +} + +func (t *TriggerAfterProcessingTime) onElement(input triggerInput, state *StateData) {} + +func (t *TriggerAfterProcessingTime) shouldFire(state *StateData) bool { + return false +} + +func (t *TriggerAfterProcessingTime) onFire(state *StateData) {} + +func (t *TriggerAfterProcessingTime) reset(state *StateData) {} + +func (t *TriggerAfterProcessingTime) String() string { + return fmt.Sprintf("AfterProcessingTime[Delay: %v, AlignToPeriod: %v, AlignToOffset: %v]", t.Delay, t.AlignToPeriod, t.AlignToOffset) +} + // TODO https://github.com/apache/beam/issues/31438 Handle TriggerAfterProcessingTime diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 5b277923d290..70e002bc0b33 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -457,7 +457,25 @@ func buildTrigger(tpb *pipepb.Trigger) engine.Trigger { } case *pipepb.Trigger_Repeat_: return &engine.TriggerRepeatedly{Repeated: buildTrigger(at.Repeat.GetSubtrigger())} - case *pipepb.Trigger_AfterProcessingTime_, *pipepb.Trigger_AfterSynchronizedProcessingTime_: + case *pipepb.Trigger_AfterProcessingTime_: + var delay, period, offset time.Duration + // TODO: support multiple transforms. + if len(at.AfterProcessingTime.GetTimestampTransforms()) > 0 { + ts := at.AfterProcessingTime.GetTimestampTransforms()[0] + if d := ts.GetDelay(); d != nil { + delay = time.Duration(d.GetDelayMillis()) * time.Millisecond + } + if align := ts.GetAlignTo(); align != nil { + period = time.Duration(align.GetPeriod()) * time.Millisecond + offset = time.Duration(align.GetOffset()) * time.Millisecond + } + } + return &engine.TriggerAfterProcessingTime{ + Delay: delay, + AlignToPeriod: period, + AlignToOffset: offset, + } + case *pipepb.Trigger_AfterSynchronizedProcessingTime_: panic(fmt.Sprintf("unsupported trigger: %v", prototext.Format(tpb))) default: return &engine.TriggerDefault{} diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index f00838152111..12c3c42c2e92 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -316,7 +316,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ * func hasUnsupportedTriggers(tpb *pipepb.Trigger) bool { unsupported := false switch at := tpb.GetTrigger().(type) { - case *pipepb.Trigger_AfterProcessingTime_, *pipepb.Trigger_AfterSynchronizedProcessingTime_: + case *pipepb.Trigger_AfterSynchronizedProcessingTime_: return true case *pipepb.Trigger_AfterAll_: for _, st := range at.AfterAll.GetSubtriggers() { From a252d9243dc7210b2313b672304189038ab6c3b2 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 5 Sep 2025 15:05:00 -0400 Subject: [PATCH 02/14] Fill in callbacks for TriggerAfterProcessingTime --- .../runners/prism/internal/engine/strategy.go | 61 ++++++++++++++++--- .../beam/runners/prism/internal/execute.go | 16 ++--- 2 files changed, 61 insertions(+), 16 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go index 79ca70dad31e..16b11332c657 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go @@ -573,26 +573,69 @@ func (t *TriggerDefault) String() string { return "Default" } -// TriggerAfterProcessingTime fires once after a specified amount of processing time -// has passed since an element was first seen. -type TriggerAfterProcessingTime struct { +// TimestampTransform is the engine's representation of a processing time transform. +type TimestampTransform struct { Delay time.Duration AlignToPeriod time.Duration AlignToOffset time.Duration } -func (t *TriggerAfterProcessingTime) onElement(input triggerInput, state *StateData) {} +// TriggerAfterProcessingTime fires once after a specified amount of processing time +// has passed since an element was first seen. +// Uses the extra state field to track if the processing time of the first element. +type TriggerAfterProcessingTime struct { + Transforms []TimestampTransform +} + +func (t *TriggerAfterProcessingTime) onElement(input triggerInput, state *StateData) { + ts := state.getTriggerState(t) + if ts.finished { + return + } + + if ts.extra == nil { + ts.extra = mtime.Now() + } + + state.setTriggerState(t, ts) +} + +func (t *TriggerAfterProcessingTime) applyTimestampTransforms(start mtime.Time) mtime.Time { + ret := start + for _, transform := range t.Transforms { + ret = ret + mtime.Time(transform.Delay/time.Millisecond) + if transform.AlignToPeriod > 0 { + // Formula from https://cloud.google.com/blog/products/data-analytics/windowing-and-triggering-in-apache-beam + // timestamp - (timestamp % period) + period + // And with an offset, we adjust before and after. + tsMs := ret + periodMs := mtime.Time(transform.AlignToPeriod / time.Millisecond) + offsetMs := mtime.Time(transform.AlignToOffset / time.Millisecond) + + adjustedMs := tsMs - offsetMs + alignedMs := adjustedMs - (adjustedMs % periodMs) + periodMs + offsetMs + ret = alignedMs + } + } + return ret +} func (t *TriggerAfterProcessingTime) shouldFire(state *StateData) bool { - return false + ts := state.getTriggerState(t) + if ts.extra == nil { + return false + } + startTime := ts.extra.(mtime.Time) + firingTime := t.applyTimestampTransforms(startTime) + return mtime.Now() > firingTime } func (t *TriggerAfterProcessingTime) onFire(state *StateData) {} -func (t *TriggerAfterProcessingTime) reset(state *StateData) {} +func (t *TriggerAfterProcessingTime) reset(state *StateData) { + delete(state.Trigger, t) +} func (t *TriggerAfterProcessingTime) String() string { - return fmt.Sprintf("AfterProcessingTime[Delay: %v, AlignToPeriod: %v, AlignToOffset: %v]", t.Delay, t.AlignToPeriod, t.AlignToOffset) + return fmt.Sprintf("AfterProcessingTime[%v]", t.Transforms) } - -// TODO https://github.com/apache/beam/issues/31438 Handle TriggerAfterProcessingTime diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 70e002bc0b33..5822ecb0970f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -458,10 +458,9 @@ func buildTrigger(tpb *pipepb.Trigger) engine.Trigger { case *pipepb.Trigger_Repeat_: return &engine.TriggerRepeatedly{Repeated: buildTrigger(at.Repeat.GetSubtrigger())} case *pipepb.Trigger_AfterProcessingTime_: - var delay, period, offset time.Duration - // TODO: support multiple transforms. - if len(at.AfterProcessingTime.GetTimestampTransforms()) > 0 { - ts := at.AfterProcessingTime.GetTimestampTransforms()[0] + var transforms []engine.TimestampTransform + for _, ts := range at.AfterProcessingTime.GetTimestampTransforms() { + var delay, period, offset time.Duration if d := ts.GetDelay(); d != nil { delay = time.Duration(d.GetDelayMillis()) * time.Millisecond } @@ -469,11 +468,14 @@ func buildTrigger(tpb *pipepb.Trigger) engine.Trigger { period = time.Duration(align.GetPeriod()) * time.Millisecond offset = time.Duration(align.GetOffset()) * time.Millisecond } + transforms = append(transforms, engine.TimestampTransform{ + Delay: delay, + AlignToPeriod: period, + AlignToOffset: offset, + }) } return &engine.TriggerAfterProcessingTime{ - Delay: delay, - AlignToPeriod: period, - AlignToOffset: offset, + Transforms: transforms, } case *pipepb.Trigger_AfterSynchronizedProcessingTime_: panic(fmt.Sprintf("unsupported trigger: %v", prototext.Format(tpb))) From 2b70fc276c8c56fd81e2fa7c3287655e16b68af7 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 6 Sep 2025 05:20:03 -0400 Subject: [PATCH 03/14] Some minor fix. --- sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go index 16b11332c657..7b2823447727 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go @@ -582,7 +582,7 @@ type TimestampTransform struct { // TriggerAfterProcessingTime fires once after a specified amount of processing time // has passed since an element was first seen. -// Uses the extra state field to track if the processing time of the first element. +// Uses the extra state field to track the processing time of the first element. type TriggerAfterProcessingTime struct { Transforms []TimestampTransform } @@ -605,7 +605,6 @@ func (t *TriggerAfterProcessingTime) applyTimestampTransforms(start mtime.Time) for _, transform := range t.Transforms { ret = ret + mtime.Time(transform.Delay/time.Millisecond) if transform.AlignToPeriod > 0 { - // Formula from https://cloud.google.com/blog/products/data-analytics/windowing-and-triggering-in-apache-beam // timestamp - (timestamp % period) + period // And with an offset, we adjust before and after. tsMs := ret @@ -627,7 +626,7 @@ func (t *TriggerAfterProcessingTime) shouldFire(state *StateData) bool { } startTime := ts.extra.(mtime.Time) firingTime := t.applyTimestampTransforms(startTime) - return mtime.Now() > firingTime + return mtime.Now() >= firingTime } func (t *TriggerAfterProcessingTime) onFire(state *StateData) {} From f20876d6de026a52518883edf9d92c8673c12da5 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 6 Sep 2025 05:54:48 -0400 Subject: [PATCH 04/14] Rename buildTriggeredBundle to runTriggeredBundle. Rearrange the code for refactoring. --- .../runners/prism/internal/engine/elementmanager.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 2ddd7bbc5c1f..6c10a011980d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1312,7 +1312,7 @@ func (*aggregateStageKind) addPending(ss *stageState, em *ElementManager, newPen // If we're ready, it's time to fire! if ready { - count += ss.buildTriggeredBundle(em, e.keyBytes, e.window) + count += ss.runTriggeredBundle(em, e.keyBytes, e.window) } } return count @@ -1427,16 +1427,14 @@ func computeNextWatermarkPane(pane typex.PaneInfo) typex.PaneInfo { return pane } -// buildTriggeredBundle must be called with the stage.mu lock held. +// runTriggeredBundle must be called with the stage.mu lock held. // When in discarding mode, returns 0. // When in accumulating mode, returns the number of fired elements to maintain a correct pending count. -func (ss *stageState) buildTriggeredBundle(em *ElementManager, key []byte, win typex.Window) int { +func (ss *stageState) runTriggeredBundle(em *ElementManager, key []byte, win typex.Window) int { var toProcess []element dnt := ss.pendingByKeys[string(key)] var notYet []element - rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(), Watermark: ss.input} - // Look at all elements for this key, and only for this window. for dnt.elements.Len() > 0 { e := heap.Pop(&dnt.elements).(element) @@ -1470,13 +1468,14 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key []byte, win t if ss.inprogressKeys == nil { ss.inprogressKeys = set[string]{} } - ss.makeInProgressBundle( - func() string { return rb.BundleID }, + bundID := ss.makeInProgressBundle( + func() string { return "agg-" + em.nextBundID() }, toProcess, ss.input, singleSet(string(key)), nil, ) + rb := RunBundle{StageID: ss.ID, BundleID: bundID, Watermark: ss.input} ss.bundlesToInject = append(ss.bundlesToInject, rb) // Bundle is marked in progress here to prevent a race condition. em.refreshCond.L.Lock() From 86ea9039ccb13b38d257668e468e74e8244e86a1 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 6 Sep 2025 06:14:08 -0400 Subject: [PATCH 05/14] Split runTriggeredBundle into three parts, following the pattern of EventTimeBundle and ProcessingTimeBundle. --- .../prism/internal/engine/elementmanager.go | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 6c10a011980d..8ca77ac42f65 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1427,10 +1427,7 @@ func computeNextWatermarkPane(pane typex.PaneInfo) typex.PaneInfo { return pane } -// runTriggeredBundle must be called with the stage.mu lock held. -// When in discarding mode, returns 0. -// When in accumulating mode, returns the number of fired elements to maintain a correct pending count. -func (ss *stageState) runTriggeredBundle(em *ElementManager, key []byte, win typex.Window) int { +func (ss *stageState) buildTriggeredBundle(key []byte, win typex.Window) ([]element, int) { var toProcess []element dnt := ss.pendingByKeys[string(key)] var notYet []element @@ -1464,23 +1461,43 @@ func (ss *stageState) runTriggeredBundle(em *ElementManager, key []byte, win typ // Ensure the heap invariants are maintained. heap.Init(&dnt.elements) } + return toProcess, accumulationDiff +} +func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, genBundID func() string) (string, bool, int) { + toProcess, accumulationDiff := ss.buildTriggeredBundle(key, win) + + if len(toProcess) == 0 { + return "", false, accumulationDiff + } if ss.inprogressKeys == nil { ss.inprogressKeys = set[string]{} } bundID := ss.makeInProgressBundle( - func() string { return "agg-" + em.nextBundID() }, + genBundID, toProcess, ss.input, singleSet(string(key)), nil, ) - rb := RunBundle{StageID: ss.ID, BundleID: bundID, Watermark: ss.input} - ss.bundlesToInject = append(ss.bundlesToInject, rb) - // Bundle is marked in progress here to prevent a race condition. - em.refreshCond.L.Lock() - em.inprogressBundles.insert(rb.BundleID) - em.refreshCond.L.Unlock() + + return bundID, true, accumulationDiff +} + +// runTriggeredBundle must be called with the stage.mu lock held. +// When in discarding mode, returns 0. +// When in accumulating mode, returns the number of fired elements to maintain a correct pending count. +func (ss *stageState) runTriggeredBundle(em *ElementManager, key []byte, win typex.Window) int { + bundID, ok, accumulationDiff := ss.startTriggeredBundle(key, win, func() string { return "agg-" + em.nextBundID() }) + + if ok { + rb := RunBundle{StageID: ss.ID, BundleID: bundID, Watermark: ss.input} + ss.bundlesToInject = append(ss.bundlesToInject, rb) + // Bundle is marked in progress here to prevent a race condition. + em.refreshCond.L.Lock() + em.inprogressBundles.insert(rb.BundleID) + em.refreshCond.L.Unlock() + } return accumulationDiff } From f5b0bd1cc66a593ba43df1bc3c90fb679ff458b1 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 6 Sep 2025 23:30:31 -0400 Subject: [PATCH 06/14] Retrieve after processing time triggers and set processing time timers accordingly. --- .../prism/internal/engine/elementmanager.go | 33 ++++++++++++ .../runners/prism/internal/engine/strategy.go | 50 +++++++++++++++++-- 2 files changed, 79 insertions(+), 4 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 8ca77ac42f65..87db9aa75bfb 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1306,6 +1306,39 @@ func (*aggregateStageKind) addPending(ss *stageState, em *ElementManager, newPen if ready { state.Pane = computeNextTriggeredPane(state.Pane, endOfWindowReached) + } else { + if pts := ss.strat.GetAfterProcessingTimeTriggers(); pts != nil { + for _, t := range pts { + ts := (&state).getTriggerState(t) + if ts.extra == nil || t.shouldFire((&state)) { + // Skipping inserting a processing time timer if the firing time + // is not set or it already should fire. + // When the after processing time triggers should fire, there are + // two scenarios: + // (1) the entire trigger of this window is ready to fire. In this + // case, `ready` should be true and we won't reach here. + // (2) we are still waiting for other triggers (subtriggers) to + // fire (e.g. AfterAll). + continue + } + firingTime := ts.extra.(mtime.Time) + notYetHolds := map[mtime.Time]int{} + timer := element{ + window: e.window, + timestamp: firingTime, + holdTimestamp: firingTime, + pane: typex.NoFiringPane(), + transform: ss.ID, // Use stage id to fake transform id + family: "AfterProcessingTime", + tag: "", + sequence: 1, + elmBytes: nil, + keyBytes: e.keyBytes, + } + ss.processingTimeTimers.Persist(firingTime, timer, notYetHolds) + em.processTimeEvents.Schedule(firingTime, ss.ID) + } + } } // Store the state as triggers may have changed it. ss.state[LinkID{}][e.window][string(e.keyBytes)] = state diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go index 7b2823447727..168c6e8fea71 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go @@ -77,6 +77,49 @@ func (ws WinStrat) String() string { return fmt.Sprintf("WinStrat[AllowedLateness:%v Trigger:%v]", ws.AllowedLateness, ws.Trigger) } +func getAfterProcessingTimeTriggers(t Trigger) []*TriggerAfterProcessingTime { + if t == nil { + return nil + } + var triggers []*TriggerAfterProcessingTime + switch at := t.(type) { + case *TriggerAfterProcessingTime: + return []*TriggerAfterProcessingTime{at} + case *TriggerAfterAll: + for _, st := range at.SubTriggers { + triggers = append(triggers, getAfterProcessingTimeTriggers(st)...) + } + return triggers + case *TriggerAfterAny: + for _, st := range at.SubTriggers { + triggers = append(triggers, getAfterProcessingTimeTriggers(st)...) + } + return triggers + case *TriggerAfterEach: + for _, st := range at.SubTriggers { + triggers = append(triggers, getAfterProcessingTimeTriggers(st)...) + } + return triggers + case *TriggerAfterEndOfWindow: + triggers = append(triggers, getAfterProcessingTimeTriggers(at.Early)...) + triggers = append(triggers, getAfterProcessingTimeTriggers(at.Late)...) + return triggers + case *TriggerOrFinally: + triggers = append(triggers, getAfterProcessingTimeTriggers(at.Main)...) + triggers = append(triggers, getAfterProcessingTimeTriggers(at.Finally)...) + return triggers + case *TriggerRepeatedly: + return getAfterProcessingTimeTriggers(at.Repeated) + default: + return nil + } +} + +// GetAfterProcessingTimeTriggers returns all AfterProcessingTime triggers within the trigger. +func (ws WinStrat) GetAfterProcessingTimeTriggers() []*TriggerAfterProcessingTime { + return getAfterProcessingTimeTriggers(ws.Trigger) +} + // triggerInput represents a Key + window + stage's trigger conditions. type triggerInput struct { newElementCount int // The number of new elements since the last check. @@ -582,7 +625,7 @@ type TimestampTransform struct { // TriggerAfterProcessingTime fires once after a specified amount of processing time // has passed since an element was first seen. -// Uses the extra state field to track the processing time of the first element. +// Uses the extra state field to track the firing time for this trigger. type TriggerAfterProcessingTime struct { Transforms []TimestampTransform } @@ -594,7 +637,7 @@ func (t *TriggerAfterProcessingTime) onElement(input triggerInput, state *StateD } if ts.extra == nil { - ts.extra = mtime.Now() + ts.extra = t.applyTimestampTransforms(mtime.Now()) } state.setTriggerState(t, ts) @@ -624,8 +667,7 @@ func (t *TriggerAfterProcessingTime) shouldFire(state *StateData) bool { if ts.extra == nil { return false } - startTime := ts.extra.(mtime.Time) - firingTime := t.applyTimestampTransforms(startTime) + firingTime := ts.extra.(mtime.Time) return mtime.Now() >= firingTime } From 3344b41b153a84ee88c2b8e068e12af0428e2898 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sun, 7 Sep 2025 00:38:57 -0400 Subject: [PATCH 07/14] Add buildProcessingTimeBundle to stageKind. Move part of the startProcessingTimeBundle into the build function for stagefulStageKind. --- .../prism/internal/engine/elementmanager.go | 35 ++++++++++++++----- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 87db9aa75bfb..8fcc6c831df1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1162,9 +1162,10 @@ type stageState struct { type stageKind interface { // addPending handles adding new pending elements to the stage appropriate for the kind. addPending(ss *stageState, em *ElementManager, newPending []element) int - // buildEventTimeBundle handles building bundles for the stage per it's kind. + // buildEventTimeBundle handles building event-time bundles for the stage per it's kind. buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], holdsInBundle map[mtime.Time]int, schedulable bool, pendingAdjustment int) - + // buildProcessingTimeBundle handles building processing-time bundles for the stage per it's kind. + buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], holdsInBundle map[mtime.Time]int, schedulable bool) // updatePane based on the stage state. updatePane(ss *stageState, pane typex.PaneInfo, w typex.Window, keyBytes []byte) typex.PaneInfo } @@ -1859,6 +1860,22 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. ss.mu.Lock() defer ss.mu.Unlock() + toProcess, minTs, newKeys, holdsInBundle, stillSchedulable := ss.kind.buildProcessingTimeBundle(ss, em, emNow) + + if len(toProcess) == 0 { + // If we have nothing + return "", false, stillSchedulable + } + bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle) + return bundID, true, stillSchedulable +} + +func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) { + slog.Error("ordinary stages can't have processing time elements") + return nil, mtime.MinTimestamp, nil, nil, false +} + +func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) { // TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime // Special Case for ProcessingTime handling. // Eg. Always queue EventTime elements at minTime. @@ -1866,7 +1883,7 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. // // Potentially puts too much work on the scheduling thread though. - var toProcess []element + // var toProcess []element minTs := mtime.MaxTimestamp holdsInBundle := map[mtime.Time]int{} @@ -1923,12 +1940,12 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. // Add a refresh if there are still processing time events to process. stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0) - if len(toProcess) == 0 { - // If we have nothing - return "", false, stillSchedulable - } - bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle) - return bundID, true, stillSchedulable + return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable +} + +func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) { + slog.Error("aggregate stages can't have processing time elements") + return nil, mtime.MinTimestamp, nil, nil, false } // makeInProgressBundle is common code to store a set of elements as a bundle in progress. From 51e7b9064e34be2b55475739c26ec6da4bd9b841 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 8 Sep 2025 12:23:52 -0400 Subject: [PATCH 08/14] Attempt to implement buildProcessingTimeBundle for aggregateKind. --- .../prism/internal/engine/elementmanager.go | 70 ++++++++++++++++++- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 8fcc6c831df1..811e54246714 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1944,8 +1944,74 @@ func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementM } func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) { - slog.Error("aggregate stages can't have processing time elements") - return nil, mtime.MinTimestamp, nil, nil, false + // var toProcess []element + minTs := mtime.MaxTimestamp + holdsInBundle := map[mtime.Time]int{} + + var notYet []fireElement + + nextTime := ss.processingTimeTimers.Peek() + keyCounts := map[string]int{} + newKeys := set[string]{} + + for nextTime <= emNow { + elems := ss.processingTimeTimers.FireAt(nextTime) + for _, e := range elems { + // Check if we're already executing this timer's key. + if ss.inprogressKeys.present(string(e.keyBytes)) { + notYet = append(notYet, fireElement{firing: nextTime, timer: e}) + continue + } + + // If we are set to have OneKeyPerBundle, and we already have a key for this bundle, we process it later. + if len(keyCounts) > 0 && OneKeyPerBundle { + notYet = append(notYet, fireElement{firing: nextTime, timer: e}) + continue + } + // If we are set to have OneElementPerKey, and we already have an element for this key we set this to process later. + if v := keyCounts[string(e.keyBytes)]; v > 0 && OneElementPerKey { + notYet = append(notYet, fireElement{firing: nextTime, timer: e}) + continue + } + keyCounts[string(e.keyBytes)]++ + newKeys.insert(string(e.keyBytes)) + if e.timestamp < minTs { + minTs = e.timestamp + } + holdsInBundle[e.holdTimestamp]++ + + state := ss.state[LinkID{}][e.window][string(e.keyBytes)] + endOfWindowReached := e.window.MaxTimestamp() < ss.input + ready := ss.strat.IsTriggerReady(triggerInput{ + newElementCount: 0, + endOfWindowReached: endOfWindowReached, + }, &state) + + if ready { + // We're going to process this trigger! + elems, _ := ss.buildTriggeredBundle(e.keyBytes, e.window) + toProcess = append(toProcess, elems...) + } + } + + nextTime = ss.processingTimeTimers.Peek() + if nextTime == mtime.MaxTimestamp { + // Escape the loop if there are no more events. + break + } + } + + // Reschedule unfired timers. + notYetHolds := map[mtime.Time]int{} + for _, v := range notYet { + ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds) + em.processTimeEvents.Schedule(v.firing, ss.ID) + } + + // Add a refresh if there are still processing time events to process. + stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0) + + return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable } // makeInProgressBundle is common code to store a set of elements as a bundle in progress. From f1514b51680053828f31637c9fcf513325e3d2ee Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 9 Sep 2025 00:29:17 -0400 Subject: [PATCH 09/14] Fix the holdwatermark for the implicit timer for after processing time trigger. However we ignore it atm. Fix trigger invoked more than expected due to state not cleared. --- .../beam/runners/prism/internal/engine/elementmanager.go | 7 +++++-- sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go | 7 ++++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 811e54246714..7b65af11c050 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1327,7 +1327,7 @@ func (*aggregateStageKind) addPending(ss *stageState, em *ElementManager, newPen timer := element{ window: e.window, timestamp: firingTime, - holdTimestamp: firingTime, + holdTimestamp: e.window.MaxTimestamp(), pane: typex.NoFiringPane(), transform: ss.ID, // Use stage id to fake transform id family: "AfterProcessingTime", @@ -1336,6 +1336,8 @@ func (*aggregateStageKind) addPending(ss *stageState, em *ElementManager, newPen elmBytes: nil, keyBytes: e.keyBytes, } + // TODO: how to deal with watermark holds for this implicit processing time timer + // ss.watermarkHolds.Add(timer.holdTimestamp, 1) ss.processingTimeTimers.Persist(firingTime, timer, notYetHolds) em.processTimeEvents.Schedule(firingTime, ss.ID) } @@ -1978,7 +1980,8 @@ func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em *Element if e.timestamp < minTs { minTs = e.timestamp } - holdsInBundle[e.holdTimestamp]++ + // TODO: how to deal with watermark holds for this implicit processing time timer + // holdsInBundle[e.holdTimestamp]++ state := ss.state[LinkID{}][e.window][string(e.keyBytes)] endOfWindowReached := e.window.MaxTimestamp() < ss.input diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go index 168c6e8fea71..e761b01675ac 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go @@ -671,7 +671,12 @@ func (t *TriggerAfterProcessingTime) shouldFire(state *StateData) bool { return mtime.Now() >= firingTime } -func (t *TriggerAfterProcessingTime) onFire(state *StateData) {} +func (t *TriggerAfterProcessingTime) onFire(state *StateData) { + if !t.shouldFire(state) { + return + } + triggerClearAndFinish(t, state) +} func (t *TriggerAfterProcessingTime) reset(state *StateData) { delete(state.Trigger, t) From 3938d6b6bdfbc7584c3801944267a30b0b37d41c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 9 Sep 2025 11:02:53 -0400 Subject: [PATCH 10/14] Plumb runner processing time into trigger callbacks via triggerInput. --- .../prism/internal/engine/elementmanager.go | 4 +++- .../runners/prism/internal/engine/strategy.go | 23 +++++++++++++++---- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 7b65af11c050..32fb2f1b9a7b 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1303,6 +1303,7 @@ func (*aggregateStageKind) addPending(ss *stageState, em *ElementManager, newPen ready := ss.strat.IsTriggerReady(triggerInput{ newElementCount: 1, endOfWindowReached: endOfWindowReached, + emNow: em.ProcessingTimeNow(), }, &state) if ready { @@ -1322,7 +1323,7 @@ func (*aggregateStageKind) addPending(ss *stageState, em *ElementManager, newPen // fire (e.g. AfterAll). continue } - firingTime := ts.extra.(mtime.Time) + firingTime := ts.extra.(afterProcessingTimeState).firingTime notYetHolds := map[mtime.Time]int{} timer := element{ window: e.window, @@ -1988,6 +1989,7 @@ func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em *Element ready := ss.strat.IsTriggerReady(triggerInput{ newElementCount: 0, endOfWindowReached: endOfWindowReached, + emNow: emNow, }, &state) if ready { diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go index e761b01675ac..55dbe41344a4 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go @@ -122,8 +122,9 @@ func (ws WinStrat) GetAfterProcessingTimeTriggers() []*TriggerAfterProcessingTim // triggerInput represents a Key + window + stage's trigger conditions. type triggerInput struct { - newElementCount int // The number of new elements since the last check. - endOfWindowReached bool // Whether or not the end of the window has been reached. + newElementCount int // The number of new elements since the last check. + endOfWindowReached bool // Whether or not the end of the window has been reached. + emNow mtime.Time // The current processing time in the runner. } // Trigger represents a trigger for a windowing strategy. A trigger determines when @@ -630,6 +631,11 @@ type TriggerAfterProcessingTime struct { Transforms []TimestampTransform } +type afterProcessingTimeState struct { + emNow mtime.Time + firingTime mtime.Time +} + func (t *TriggerAfterProcessingTime) onElement(input triggerInput, state *StateData) { ts := state.getTriggerState(t) if ts.finished { @@ -637,7 +643,14 @@ func (t *TriggerAfterProcessingTime) onElement(input triggerInput, state *StateD } if ts.extra == nil { - ts.extra = t.applyTimestampTransforms(mtime.Now()) + ts.extra = afterProcessingTimeState{ + emNow: input.emNow, + firingTime: t.applyTimestampTransforms(input.emNow), + } + } else { + s, _ := ts.extra.(afterProcessingTimeState) + s.emNow = input.emNow + ts.extra = s } state.setTriggerState(t, ts) @@ -667,8 +680,8 @@ func (t *TriggerAfterProcessingTime) shouldFire(state *StateData) bool { if ts.extra == nil { return false } - firingTime := ts.extra.(mtime.Time) - return mtime.Now() >= firingTime + s := ts.extra.(afterProcessingTimeState) + return s.emNow >= s.firingTime } func (t *TriggerAfterProcessingTime) onFire(state *StateData) { From b4cefbca049f067d6ed850572bc6b953e06472d6 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 9 Sep 2025 13:37:25 -0400 Subject: [PATCH 11/14] Add unit tests for the new trigger --- .../prism/internal/engine/strategy_test.go | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go index 4934665833ed..208afa82df8e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go @@ -401,6 +401,99 @@ func TestTriggers_isReady(t *testing.T) { {triggerInput{newElementCount: 1, endOfWindowReached: true}, false}, {triggerInput{newElementCount: 1, endOfWindowReached: true}, true}, // Late }, + }, { + name: "afterProcessingTime_Delay_Exact", + trig: &TriggerAfterProcessingTime{ + Transforms: []TimestampTransform{ + {Delay: 3 * time.Second}, + }, + }, + inputs: []io{ + {triggerInput{emNow: 0}, false}, + {triggerInput{emNow: 1000}, false}, + {triggerInput{emNow: 2000}, false}, + {triggerInput{emNow: 3000}, true}, + {triggerInput{emNow: 4000}, false}, + {triggerInput{emNow: 5000}, false}, + {triggerInput{emNow: 6000}, false}, + {triggerInput{emNow: 7000}, false}, + }, + }, { + name: "afterProcessingTime_Delay_Late", + trig: &TriggerAfterProcessingTime{ + Transforms: []TimestampTransform{ + {Delay: 3 * time.Second}, + }, + }, + inputs: []io{ + {triggerInput{emNow: 0}, false}, + {triggerInput{emNow: 1000}, false}, + {triggerInput{emNow: 2000}, false}, + {triggerInput{emNow: 3001}, true}, // a little after the expected firing time + {triggerInput{emNow: 4000}, false}, + }, + }, { + name: "afterProcessingTime_AlignToPeriodOnly", + trig: &TriggerAfterProcessingTime{ + Transforms: []TimestampTransform{ + {AlignToPeriod: 5 * time.Second}, + }, + }, + inputs: []io{ + {triggerInput{emNow: 1500}, false}, + {triggerInput{emNow: 2000}, false}, + {triggerInput{emNow: 4999}, false}, + {triggerInput{emNow: 5000}, true}, // 1.5 is aligned to 5 + {triggerInput{emNow: 5001}, false}, + }, + }, { + name: "afterProcessingTime_AlignToPeriodAndOffset", + trig: &TriggerAfterProcessingTime{ + Transforms: []TimestampTransform{ + {AlignToPeriod: 5 * time.Second, AlignToOffset: 200 * time.Millisecond}, + }, + }, + inputs: []io{ + {triggerInput{emNow: 1500}, false}, + {triggerInput{emNow: 2000}, false}, + {triggerInput{emNow: 5119}, false}, + {triggerInput{emNow: 5200}, true}, // 1.5 is aligned to 5.2 + {triggerInput{emNow: 5201}, false}, + }, + }, { + name: "afterProcessingTime_TwoTransforms", + trig: &TriggerAfterProcessingTime{ + Transforms: []TimestampTransform{ + {AlignToPeriod: 5 * time.Second, AlignToOffset: 200 * time.Millisecond}, + {Delay: 1 * time.Second}, + }, + }, + inputs: []io{ + {triggerInput{emNow: 1500}, false}, + {triggerInput{emNow: 2000}, false}, + {triggerInput{emNow: 5119}, false}, + {triggerInput{emNow: 5200}, false}, + {triggerInput{emNow: 5201}, false}, + {triggerInput{emNow: 6119}, false}, + {triggerInput{emNow: 6200}, true}, // 1.5 is aligned to 6.2 + {triggerInput{emNow: 6201}, false}, + }, + }, { + name: "afterProcessingTime_Repeated", trig: &TriggerRepeatedly{ + &TriggerAfterProcessingTime{ + Transforms: []TimestampTransform{ + {Delay: 3 * time.Second}, + }}}, + inputs: []io{ + {triggerInput{emNow: 0}, false}, + {triggerInput{emNow: 1000}, false}, + {triggerInput{emNow: 2000}, false}, + {triggerInput{emNow: 3000}, true}, // first the first time + {triggerInput{emNow: 4000}, false}, // trigger firing time is set again + {triggerInput{emNow: 5000}, false}, + {triggerInput{emNow: 6000}, false}, + {triggerInput{emNow: 7000}, true}, // trigger firing again + }, }, { name: "default", trig: &TriggerDefault{}, From 356b582558812bd257b1bbffaee41b2872db21bd Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 9 Sep 2025 15:46:33 -0400 Subject: [PATCH 12/14] Add wake up when a processing timer is scheduled. --- .../runners/prism/internal/engine/elementmanager.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 32fb2f1b9a7b..5a4491745b4d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1084,6 +1084,14 @@ func (em *ElementManager) refreshWatermarks() set[string] { return refreshed } +func (em *ElementManager) wakeUpAt(t mtime.Time) { + go func(fireAt time.Time) { + time.AfterFunc(time.Until(fireAt), func() { + em.refreshCond.Broadcast() + }) + }(t.ToTime()) +} + type set[K comparable] map[K]struct{} func (s set[K]) present(k K) bool { @@ -1341,6 +1349,7 @@ func (*aggregateStageKind) addPending(ss *stageState, em *ElementManager, newPen // ss.watermarkHolds.Add(timer.holdTimestamp, 1) ss.processingTimeTimers.Persist(firingTime, timer, notYetHolds) em.processTimeEvents.Schedule(firingTime, ss.ID) + em.wakeUpAt(firingTime) } } } @@ -1938,6 +1947,7 @@ func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementM for _, v := range notYet { ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds) em.processTimeEvents.Schedule(v.firing, ss.ID) + em.wakeUpAt(v.firing) } // Add a refresh if there are still processing time events to process. @@ -2011,6 +2021,7 @@ func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em *Element for _, v := range notYet { ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds) em.processTimeEvents.Schedule(v.firing, ss.ID) + em.wakeUpAt(v.firing) } // Add a refresh if there are still processing time events to process. From fcac8c8d84b9fe4cd28c4dd23ecb82251b5298b9 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 9 Sep 2025 23:56:53 -0400 Subject: [PATCH 13/14] Fix the over-conservative condition for determining if a event-time bundle is ready. --- .../beam/runners/prism/internal/engine/elementmanager.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 5a4491745b4d..b7c184952c30 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1161,6 +1161,8 @@ type stageState struct { inprogressHoldsByBundle map[string]map[mtime.Time]int // bundle to associated holds. processingTimeTimers *timerHandler + + watermarkAdvanced bool // whether the watermark for this stage has advanced } // stageKind handles behavioral differences between ordinary, stateful, and aggregation stage kinds. @@ -2144,6 +2146,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { // If bigger, advance the input watermark. if newIn > ss.input { ss.input = newIn + ss.watermarkAdvanced = true } // The output starts with the new input as the basis. newOut := ss.input @@ -2302,7 +2305,7 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T // then we can't yet process this stage. inputW := ss.input _, upstreamW := ss.UpstreamWatermark() - if inputW == upstreamW { + if inputW == upstreamW && !ss.watermarkAdvanced { slog.Debug("bundleReady: unchanged upstream watermark", slog.String("stage", ss.ID), slog.Group("watermark", @@ -2310,6 +2313,8 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T slog.Any("input", inputW))) return mtime.MinTimestamp, false, ptimeEventsReady, injectedReady } + + ss.watermarkAdvanced = false ready := true for _, side := range ss.sides { pID, ok := em.pcolParents[side.Global] From 616542cb787b94aeabd30a75f05e6970fb4a7ecd Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 10 Sep 2025 00:30:48 -0400 Subject: [PATCH 14/14] Minor cleanup --- sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index b7c184952c30..7ce9403f78d1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1897,7 +1897,6 @@ func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementM // // Potentially puts too much work on the scheduling thread though. - // var toProcess []element minTs := mtime.MaxTimestamp holdsInBundle := map[mtime.Time]int{}