diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 2ddd7bbc5c1f..7ce9403f78d1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1084,6 +1084,14 @@ func (em *ElementManager) refreshWatermarks() set[string] { return refreshed } +func (em *ElementManager) wakeUpAt(t mtime.Time) { + go func(fireAt time.Time) { + time.AfterFunc(time.Until(fireAt), func() { + em.refreshCond.Broadcast() + }) + }(t.ToTime()) +} + type set[K comparable] map[K]struct{} func (s set[K]) present(k K) bool { @@ -1153,6 +1161,8 @@ type stageState struct { inprogressHoldsByBundle map[string]map[mtime.Time]int // bundle to associated holds. processingTimeTimers *timerHandler + + watermarkAdvanced bool // whether the watermark for this stage has advanced } // stageKind handles behavioral differences between ordinary, stateful, and aggregation stage kinds. @@ -1162,9 +1172,10 @@ type stageState struct { type stageKind interface { // addPending handles adding new pending elements to the stage appropriate for the kind. addPending(ss *stageState, em *ElementManager, newPending []element) int - // buildEventTimeBundle handles building bundles for the stage per it's kind. + // buildEventTimeBundle handles building event-time bundles for the stage per it's kind. buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], holdsInBundle map[mtime.Time]int, schedulable bool, pendingAdjustment int) - + // buildProcessingTimeBundle handles building processing-time bundles for the stage per it's kind. + buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], holdsInBundle map[mtime.Time]int, schedulable bool) // updatePane based on the stage state. updatePane(ss *stageState, pane typex.PaneInfo, w typex.Window, keyBytes []byte) typex.PaneInfo } @@ -1302,17 +1313,54 @@ func (*aggregateStageKind) addPending(ss *stageState, em *ElementManager, newPen ready := ss.strat.IsTriggerReady(triggerInput{ newElementCount: 1, endOfWindowReached: endOfWindowReached, + emNow: em.ProcessingTimeNow(), }, &state) if ready { state.Pane = computeNextTriggeredPane(state.Pane, endOfWindowReached) + } else { + if pts := ss.strat.GetAfterProcessingTimeTriggers(); pts != nil { + for _, t := range pts { + ts := (&state).getTriggerState(t) + if ts.extra == nil || t.shouldFire((&state)) { + // Skipping inserting a processing time timer if the firing time + // is not set or it already should fire. + // When the after processing time triggers should fire, there are + // two scenarios: + // (1) the entire trigger of this window is ready to fire. In this + // case, `ready` should be true and we won't reach here. + // (2) we are still waiting for other triggers (subtriggers) to + // fire (e.g. AfterAll). + continue + } + firingTime := ts.extra.(afterProcessingTimeState).firingTime + notYetHolds := map[mtime.Time]int{} + timer := element{ + window: e.window, + timestamp: firingTime, + holdTimestamp: e.window.MaxTimestamp(), + pane: typex.NoFiringPane(), + transform: ss.ID, // Use stage id to fake transform id + family: "AfterProcessingTime", + tag: "", + sequence: 1, + elmBytes: nil, + keyBytes: e.keyBytes, + } + // TODO: how to deal with watermark holds for this implicit processing time timer + // ss.watermarkHolds.Add(timer.holdTimestamp, 1) + ss.processingTimeTimers.Persist(firingTime, timer, notYetHolds) + em.processTimeEvents.Schedule(firingTime, ss.ID) + em.wakeUpAt(firingTime) + } + } } // Store the state as triggers may have changed it. ss.state[LinkID{}][e.window][string(e.keyBytes)] = state // If we're ready, it's time to fire! if ready { - count += ss.buildTriggeredBundle(em, e.keyBytes, e.window) + count += ss.runTriggeredBundle(em, e.keyBytes, e.window) } } return count @@ -1427,16 +1475,11 @@ func computeNextWatermarkPane(pane typex.PaneInfo) typex.PaneInfo { return pane } -// buildTriggeredBundle must be called with the stage.mu lock held. -// When in discarding mode, returns 0. -// When in accumulating mode, returns the number of fired elements to maintain a correct pending count. -func (ss *stageState) buildTriggeredBundle(em *ElementManager, key []byte, win typex.Window) int { +func (ss *stageState) buildTriggeredBundle(key []byte, win typex.Window) ([]element, int) { var toProcess []element dnt := ss.pendingByKeys[string(key)] var notYet []element - rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(), Watermark: ss.input} - // Look at all elements for this key, and only for this window. for dnt.elements.Len() > 0 { e := heap.Pop(&dnt.elements).(element) @@ -1466,22 +1509,43 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key []byte, win t // Ensure the heap invariants are maintained. heap.Init(&dnt.elements) } + return toProcess, accumulationDiff +} +func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, genBundID func() string) (string, bool, int) { + toProcess, accumulationDiff := ss.buildTriggeredBundle(key, win) + + if len(toProcess) == 0 { + return "", false, accumulationDiff + } if ss.inprogressKeys == nil { ss.inprogressKeys = set[string]{} } - ss.makeInProgressBundle( - func() string { return rb.BundleID }, + bundID := ss.makeInProgressBundle( + genBundID, toProcess, ss.input, singleSet(string(key)), nil, ) - ss.bundlesToInject = append(ss.bundlesToInject, rb) - // Bundle is marked in progress here to prevent a race condition. - em.refreshCond.L.Lock() - em.inprogressBundles.insert(rb.BundleID) - em.refreshCond.L.Unlock() + + return bundID, true, accumulationDiff +} + +// runTriggeredBundle must be called with the stage.mu lock held. +// When in discarding mode, returns 0. +// When in accumulating mode, returns the number of fired elements to maintain a correct pending count. +func (ss *stageState) runTriggeredBundle(em *ElementManager, key []byte, win typex.Window) int { + bundID, ok, accumulationDiff := ss.startTriggeredBundle(key, win, func() string { return "agg-" + em.nextBundID() }) + + if ok { + rb := RunBundle{StageID: ss.ID, BundleID: bundID, Watermark: ss.input} + ss.bundlesToInject = append(ss.bundlesToInject, rb) + // Bundle is marked in progress here to prevent a race condition. + em.refreshCond.L.Lock() + em.inprogressBundles.insert(rb.BundleID) + em.refreshCond.L.Unlock() + } return accumulationDiff } @@ -1810,6 +1874,22 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. ss.mu.Lock() defer ss.mu.Unlock() + toProcess, minTs, newKeys, holdsInBundle, stillSchedulable := ss.kind.buildProcessingTimeBundle(ss, em, emNow) + + if len(toProcess) == 0 { + // If we have nothing + return "", false, stillSchedulable + } + bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle) + return bundID, true, stillSchedulable +} + +func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) { + slog.Error("ordinary stages can't have processing time elements") + return nil, mtime.MinTimestamp, nil, nil, false +} + +func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) { // TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime // Special Case for ProcessingTime handling. // Eg. Always queue EventTime elements at minTime. @@ -1817,7 +1897,6 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. // // Potentially puts too much work on the scheduling thread though. - var toProcess []element minTs := mtime.MaxTimestamp holdsInBundle := map[mtime.Time]int{} @@ -1869,17 +1948,87 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. for _, v := range notYet { ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds) em.processTimeEvents.Schedule(v.firing, ss.ID) + em.wakeUpAt(v.firing) } // Add a refresh if there are still processing time events to process. stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0) - if len(toProcess) == 0 { - // If we have nothing - return "", false, stillSchedulable + return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable +} + +func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) { + // var toProcess []element + minTs := mtime.MaxTimestamp + holdsInBundle := map[mtime.Time]int{} + + var notYet []fireElement + + nextTime := ss.processingTimeTimers.Peek() + keyCounts := map[string]int{} + newKeys := set[string]{} + + for nextTime <= emNow { + elems := ss.processingTimeTimers.FireAt(nextTime) + for _, e := range elems { + // Check if we're already executing this timer's key. + if ss.inprogressKeys.present(string(e.keyBytes)) { + notYet = append(notYet, fireElement{firing: nextTime, timer: e}) + continue + } + + // If we are set to have OneKeyPerBundle, and we already have a key for this bundle, we process it later. + if len(keyCounts) > 0 && OneKeyPerBundle { + notYet = append(notYet, fireElement{firing: nextTime, timer: e}) + continue + } + // If we are set to have OneElementPerKey, and we already have an element for this key we set this to process later. + if v := keyCounts[string(e.keyBytes)]; v > 0 && OneElementPerKey { + notYet = append(notYet, fireElement{firing: nextTime, timer: e}) + continue + } + keyCounts[string(e.keyBytes)]++ + newKeys.insert(string(e.keyBytes)) + if e.timestamp < minTs { + minTs = e.timestamp + } + // TODO: how to deal with watermark holds for this implicit processing time timer + // holdsInBundle[e.holdTimestamp]++ + + state := ss.state[LinkID{}][e.window][string(e.keyBytes)] + endOfWindowReached := e.window.MaxTimestamp() < ss.input + ready := ss.strat.IsTriggerReady(triggerInput{ + newElementCount: 0, + endOfWindowReached: endOfWindowReached, + emNow: emNow, + }, &state) + + if ready { + // We're going to process this trigger! + elems, _ := ss.buildTriggeredBundle(e.keyBytes, e.window) + toProcess = append(toProcess, elems...) + } + } + + nextTime = ss.processingTimeTimers.Peek() + if nextTime == mtime.MaxTimestamp { + // Escape the loop if there are no more events. + break + } } - bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle) - return bundID, true, stillSchedulable + + // Reschedule unfired timers. + notYetHolds := map[mtime.Time]int{} + for _, v := range notYet { + ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds) + em.processTimeEvents.Schedule(v.firing, ss.ID) + em.wakeUpAt(v.firing) + } + + // Add a refresh if there are still processing time events to process. + stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0) + + return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable } // makeInProgressBundle is common code to store a set of elements as a bundle in progress. @@ -1996,6 +2145,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { // If bigger, advance the input watermark. if newIn > ss.input { ss.input = newIn + ss.watermarkAdvanced = true } // The output starts with the new input as the basis. newOut := ss.input @@ -2154,7 +2304,7 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T // then we can't yet process this stage. inputW := ss.input _, upstreamW := ss.UpstreamWatermark() - if inputW == upstreamW { + if inputW == upstreamW && !ss.watermarkAdvanced { slog.Debug("bundleReady: unchanged upstream watermark", slog.String("stage", ss.ID), slog.Group("watermark", @@ -2162,6 +2312,8 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T slog.Any("input", inputW))) return mtime.MinTimestamp, false, ptimeEventsReady, injectedReady } + + ss.watermarkAdvanced = false ready := true for _, side := range ss.sides { pID, ok := em.pcolParents[side.Global] diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go index 5446d3edd3c0..55dbe41344a4 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go @@ -77,10 +77,54 @@ func (ws WinStrat) String() string { return fmt.Sprintf("WinStrat[AllowedLateness:%v Trigger:%v]", ws.AllowedLateness, ws.Trigger) } +func getAfterProcessingTimeTriggers(t Trigger) []*TriggerAfterProcessingTime { + if t == nil { + return nil + } + var triggers []*TriggerAfterProcessingTime + switch at := t.(type) { + case *TriggerAfterProcessingTime: + return []*TriggerAfterProcessingTime{at} + case *TriggerAfterAll: + for _, st := range at.SubTriggers { + triggers = append(triggers, getAfterProcessingTimeTriggers(st)...) + } + return triggers + case *TriggerAfterAny: + for _, st := range at.SubTriggers { + triggers = append(triggers, getAfterProcessingTimeTriggers(st)...) + } + return triggers + case *TriggerAfterEach: + for _, st := range at.SubTriggers { + triggers = append(triggers, getAfterProcessingTimeTriggers(st)...) + } + return triggers + case *TriggerAfterEndOfWindow: + triggers = append(triggers, getAfterProcessingTimeTriggers(at.Early)...) + triggers = append(triggers, getAfterProcessingTimeTriggers(at.Late)...) + return triggers + case *TriggerOrFinally: + triggers = append(triggers, getAfterProcessingTimeTriggers(at.Main)...) + triggers = append(triggers, getAfterProcessingTimeTriggers(at.Finally)...) + return triggers + case *TriggerRepeatedly: + return getAfterProcessingTimeTriggers(at.Repeated) + default: + return nil + } +} + +// GetAfterProcessingTimeTriggers returns all AfterProcessingTime triggers within the trigger. +func (ws WinStrat) GetAfterProcessingTimeTriggers() []*TriggerAfterProcessingTime { + return getAfterProcessingTimeTriggers(ws.Trigger) +} + // triggerInput represents a Key + window + stage's trigger conditions. type triggerInput struct { - newElementCount int // The number of new elements since the last check. - endOfWindowReached bool // Whether or not the end of the window has been reached. + newElementCount int // The number of new elements since the last check. + endOfWindowReached bool // Whether or not the end of the window has been reached. + emNow mtime.Time // The current processing time in the runner. } // Trigger represents a trigger for a windowing strategy. A trigger determines when @@ -573,4 +617,84 @@ func (t *TriggerDefault) String() string { return "Default" } -// TODO https://github.com/apache/beam/issues/31438 Handle TriggerAfterProcessingTime +// TimestampTransform is the engine's representation of a processing time transform. +type TimestampTransform struct { + Delay time.Duration + AlignToPeriod time.Duration + AlignToOffset time.Duration +} + +// TriggerAfterProcessingTime fires once after a specified amount of processing time +// has passed since an element was first seen. +// Uses the extra state field to track the firing time for this trigger. +type TriggerAfterProcessingTime struct { + Transforms []TimestampTransform +} + +type afterProcessingTimeState struct { + emNow mtime.Time + firingTime mtime.Time +} + +func (t *TriggerAfterProcessingTime) onElement(input triggerInput, state *StateData) { + ts := state.getTriggerState(t) + if ts.finished { + return + } + + if ts.extra == nil { + ts.extra = afterProcessingTimeState{ + emNow: input.emNow, + firingTime: t.applyTimestampTransforms(input.emNow), + } + } else { + s, _ := ts.extra.(afterProcessingTimeState) + s.emNow = input.emNow + ts.extra = s + } + + state.setTriggerState(t, ts) +} + +func (t *TriggerAfterProcessingTime) applyTimestampTransforms(start mtime.Time) mtime.Time { + ret := start + for _, transform := range t.Transforms { + ret = ret + mtime.Time(transform.Delay/time.Millisecond) + if transform.AlignToPeriod > 0 { + // timestamp - (timestamp % period) + period + // And with an offset, we adjust before and after. + tsMs := ret + periodMs := mtime.Time(transform.AlignToPeriod / time.Millisecond) + offsetMs := mtime.Time(transform.AlignToOffset / time.Millisecond) + + adjustedMs := tsMs - offsetMs + alignedMs := adjustedMs - (adjustedMs % periodMs) + periodMs + offsetMs + ret = alignedMs + } + } + return ret +} + +func (t *TriggerAfterProcessingTime) shouldFire(state *StateData) bool { + ts := state.getTriggerState(t) + if ts.extra == nil { + return false + } + s := ts.extra.(afterProcessingTimeState) + return s.emNow >= s.firingTime +} + +func (t *TriggerAfterProcessingTime) onFire(state *StateData) { + if !t.shouldFire(state) { + return + } + triggerClearAndFinish(t, state) +} + +func (t *TriggerAfterProcessingTime) reset(state *StateData) { + delete(state.Trigger, t) +} + +func (t *TriggerAfterProcessingTime) String() string { + return fmt.Sprintf("AfterProcessingTime[%v]", t.Transforms) +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go index 4934665833ed..208afa82df8e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go @@ -401,6 +401,99 @@ func TestTriggers_isReady(t *testing.T) { {triggerInput{newElementCount: 1, endOfWindowReached: true}, false}, {triggerInput{newElementCount: 1, endOfWindowReached: true}, true}, // Late }, + }, { + name: "afterProcessingTime_Delay_Exact", + trig: &TriggerAfterProcessingTime{ + Transforms: []TimestampTransform{ + {Delay: 3 * time.Second}, + }, + }, + inputs: []io{ + {triggerInput{emNow: 0}, false}, + {triggerInput{emNow: 1000}, false}, + {triggerInput{emNow: 2000}, false}, + {triggerInput{emNow: 3000}, true}, + {triggerInput{emNow: 4000}, false}, + {triggerInput{emNow: 5000}, false}, + {triggerInput{emNow: 6000}, false}, + {triggerInput{emNow: 7000}, false}, + }, + }, { + name: "afterProcessingTime_Delay_Late", + trig: &TriggerAfterProcessingTime{ + Transforms: []TimestampTransform{ + {Delay: 3 * time.Second}, + }, + }, + inputs: []io{ + {triggerInput{emNow: 0}, false}, + {triggerInput{emNow: 1000}, false}, + {triggerInput{emNow: 2000}, false}, + {triggerInput{emNow: 3001}, true}, // a little after the expected firing time + {triggerInput{emNow: 4000}, false}, + }, + }, { + name: "afterProcessingTime_AlignToPeriodOnly", + trig: &TriggerAfterProcessingTime{ + Transforms: []TimestampTransform{ + {AlignToPeriod: 5 * time.Second}, + }, + }, + inputs: []io{ + {triggerInput{emNow: 1500}, false}, + {triggerInput{emNow: 2000}, false}, + {triggerInput{emNow: 4999}, false}, + {triggerInput{emNow: 5000}, true}, // 1.5 is aligned to 5 + {triggerInput{emNow: 5001}, false}, + }, + }, { + name: "afterProcessingTime_AlignToPeriodAndOffset", + trig: &TriggerAfterProcessingTime{ + Transforms: []TimestampTransform{ + {AlignToPeriod: 5 * time.Second, AlignToOffset: 200 * time.Millisecond}, + }, + }, + inputs: []io{ + {triggerInput{emNow: 1500}, false}, + {triggerInput{emNow: 2000}, false}, + {triggerInput{emNow: 5119}, false}, + {triggerInput{emNow: 5200}, true}, // 1.5 is aligned to 5.2 + {triggerInput{emNow: 5201}, false}, + }, + }, { + name: "afterProcessingTime_TwoTransforms", + trig: &TriggerAfterProcessingTime{ + Transforms: []TimestampTransform{ + {AlignToPeriod: 5 * time.Second, AlignToOffset: 200 * time.Millisecond}, + {Delay: 1 * time.Second}, + }, + }, + inputs: []io{ + {triggerInput{emNow: 1500}, false}, + {triggerInput{emNow: 2000}, false}, + {triggerInput{emNow: 5119}, false}, + {triggerInput{emNow: 5200}, false}, + {triggerInput{emNow: 5201}, false}, + {triggerInput{emNow: 6119}, false}, + {triggerInput{emNow: 6200}, true}, // 1.5 is aligned to 6.2 + {triggerInput{emNow: 6201}, false}, + }, + }, { + name: "afterProcessingTime_Repeated", trig: &TriggerRepeatedly{ + &TriggerAfterProcessingTime{ + Transforms: []TimestampTransform{ + {Delay: 3 * time.Second}, + }}}, + inputs: []io{ + {triggerInput{emNow: 0}, false}, + {triggerInput{emNow: 1000}, false}, + {triggerInput{emNow: 2000}, false}, + {triggerInput{emNow: 3000}, true}, // first the first time + {triggerInput{emNow: 4000}, false}, // trigger firing time is set again + {triggerInput{emNow: 5000}, false}, + {triggerInput{emNow: 6000}, false}, + {triggerInput{emNow: 7000}, true}, // trigger firing again + }, }, { name: "default", trig: &TriggerDefault{}, diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 5b277923d290..5822ecb0970f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -457,7 +457,27 @@ func buildTrigger(tpb *pipepb.Trigger) engine.Trigger { } case *pipepb.Trigger_Repeat_: return &engine.TriggerRepeatedly{Repeated: buildTrigger(at.Repeat.GetSubtrigger())} - case *pipepb.Trigger_AfterProcessingTime_, *pipepb.Trigger_AfterSynchronizedProcessingTime_: + case *pipepb.Trigger_AfterProcessingTime_: + var transforms []engine.TimestampTransform + for _, ts := range at.AfterProcessingTime.GetTimestampTransforms() { + var delay, period, offset time.Duration + if d := ts.GetDelay(); d != nil { + delay = time.Duration(d.GetDelayMillis()) * time.Millisecond + } + if align := ts.GetAlignTo(); align != nil { + period = time.Duration(align.GetPeriod()) * time.Millisecond + offset = time.Duration(align.GetOffset()) * time.Millisecond + } + transforms = append(transforms, engine.TimestampTransform{ + Delay: delay, + AlignToPeriod: period, + AlignToOffset: offset, + }) + } + return &engine.TriggerAfterProcessingTime{ + Transforms: transforms, + } + case *pipepb.Trigger_AfterSynchronizedProcessingTime_: panic(fmt.Sprintf("unsupported trigger: %v", prototext.Format(tpb))) default: return &engine.TriggerDefault{} diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index f00838152111..12c3c42c2e92 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -316,7 +316,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ * func hasUnsupportedTriggers(tpb *pipepb.Trigger) bool { unsupported := false switch at := tpb.GetTrigger().(type) { - case *pipepb.Trigger_AfterProcessingTime_, *pipepb.Trigger_AfterSynchronizedProcessingTime_: + case *pipepb.Trigger_AfterSynchronizedProcessingTime_: return true case *pipepb.Trigger_AfterAll_: for _, st := range at.AfterAll.GetSubtriggers() {