198 changes: 175 additions & 23 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -1084,6 +1084,14 @@ func (em *ElementManager) refreshWatermarks() set[string] {
return refreshed
}

func (em *ElementManager) wakeUpAt(t mtime.Time) {
go func(fireAt time.Time) {
time.AfterFunc(time.Until(fireAt), func() {
em.refreshCond.Broadcast()
})
}(t.ToTime())
}
Comment on lines +1088 to +1092
Contributor:
We should also only be creating this Goroutine when the Real Time Clock is being used, and testStreamHandler is nil. Otherwise behavior will be additionally wonky.

eg.

if em.testStreamHandler != nil && !em.testStreamHandler.completed {

I recommend moving this clock function down there too, as they are related.

}
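
For illustration, a guarded version might look like the sketch below. The early return and comments are assumptions beyond the reviewer's snippet; note also that time.AfterFunc already runs its callback on a fresh goroutine, so the extra go func wrapper in the diff isn't needed:

func (em *ElementManager) wakeUpAt(t mtime.Time) {
	// Under a TestStream, the synthetic clock drives processing time, so a
	// real-time timer would fire at meaningless wall-clock moments.
	// Only arm one when the real-time clock is in use.
	if em.testStreamHandler != nil && !em.testStreamHandler.completed {
		return
	}
	// time.AfterFunc already invokes the callback on its own goroutine.
	time.AfterFunc(time.Until(t.ToTime()), func() {
		em.refreshCond.Broadcast()
	})
}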

type set[K comparable] map[K]struct{}

func (s set[K]) present(k K) bool {
@@ -1153,6 +1161,8 @@ type stageState struct {
inprogressHoldsByBundle map[string]map[mtime.Time]int // bundle to associated holds.

processingTimeTimers *timerHandler

watermarkAdvanced bool // whether the watermark for this stage has advanced
Contributor:
Commented elsewhere, but what this tracks is whether the input watermark has advanced or not since the last bundle. Previously we only needed the output watermark time.

}

// stageKind handles behavioral differences between ordinary, stateful, and aggregation stage kinds.
@@ -1162,9 +1172,10 @@ type stageState struct {
type stageKind interface {
// addPending handles adding new pending elements to the stage appropriate for the kind.
addPending(ss *stageState, em *ElementManager, newPending []element) int
// buildEventTimeBundle handles building bundles for the stage per its kind.
// buildEventTimeBundle handles building event-time bundles for the stage per its kind.
buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], holdsInBundle map[mtime.Time]int, schedulable bool, pendingAdjustment int)

// buildProcessingTimeBundle handles building processing-time bundles for the stage per its kind.
buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], holdsInBundle map[mtime.Time]int, schedulable bool)
// updatePane based on the stage state.
updatePane(ss *stageState, pane typex.PaneInfo, w typex.Window, keyBytes []byte) typex.PaneInfo
}
@@ -1302,17 +1313,54 @@ func (*aggregateStageKind) addPending(ss *stageState, em *ElementManager, newPen
ready := ss.strat.IsTriggerReady(triggerInput{
newElementCount: 1,
endOfWindowReached: endOfWindowReached,
emNow: em.ProcessingTimeNow(),
}, &state)

if ready {
state.Pane = computeNextTriggeredPane(state.Pane, endOfWindowReached)
} else {
if pts := ss.strat.GetAfterProcessingTimeTriggers(); pts != nil {
for _, t := range pts {
ts := (&state).getTriggerState(t)
if ts.extra == nil || t.shouldFire((&state)) {
// Skipping inserting a processing time timer if the firing time
// is not set or it already should fire.
// When the after processing time triggers should fire, there are
// two scenarios:
// (1) the entire trigger of this window is ready to fire. In this
// case, `ready` should be true and we won't reach here.
// (2) we are still waiting for other triggers (subtriggers) to
// fire (e.g. AfterAll).
continue
}
firingTime := ts.extra.(afterProcessingTimeState).firingTime
notYetHolds := map[mtime.Time]int{}
timer := element{
window: e.window,
timestamp: firingTime,
holdTimestamp: e.window.MaxTimestamp(),
pane: typex.NoFiringPane(),
transform: ss.ID, // Use stage id to fake transform id
family: "AfterProcessingTime",
tag: "",
sequence: 1,
elmBytes: nil,
keyBytes: e.keyBytes,
}
// TODO: how to deal with watermark holds for this implicit processing time timer
// ss.watermarkHolds.Add(timer.holdTimestamp, 1)
ss.processingTimeTimers.Persist(firingTime, timer, notYetHolds)
em.processTimeEvents.Schedule(firingTime, ss.ID)
em.wakeUpAt(firingTime)
Comment on lines +1353 to +1354
Contributor:
I'd recommend just putting the em.processTimeEvents.Schedule call inside the wakeUpAt call, and replacing all the existing calls. Perhaps a better name: scheduleProcessingTimeEventForStage.

Avoids issues where one is called but the other is not.

On the other hand: if we keep them separate, we have a better chance of reducing the number of spare goroutines/timers we're keeping around. E.g. on such a firing, we see when the next processing time event is, and re-schedule it. That feels like a loop we can build and initialize as part of the bundle setup... I'll give it some more thought.
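
A sketch of the combined helper under the suggested name; the body simply fuses the two calls that currently travel in pairs:

// scheduleProcessingTimeEventForStage registers a processing-time event for
// the stage and arranges a wake-up once wall-clock time reaches fireAt, so
// neither half can be forgotten.
func (em *ElementManager) scheduleProcessingTimeEventForStage(fireAt mtime.Time, stageID string) {
	em.processTimeEvents.Schedule(fireAt, stageID)
	em.wakeUpAt(fireAt)
}

Call sites like the pair above would then collapse to em.scheduleProcessingTimeEventForStage(firingTime, ss.ID).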

}
}
}
// Store the state as triggers may have changed it.
ss.state[LinkID{}][e.window][string(e.keyBytes)] = state

// If we're ready, it's time to fire!
if ready {
count += ss.buildTriggeredBundle(em, e.keyBytes, e.window)
count += ss.runTriggeredBundle(em, e.keyBytes, e.window)
}
}
return count
@@ -1427,16 +1475,11 @@ func computeNextWatermarkPane(pane typex.PaneInfo) typex.PaneInfo {
return pane
}

// buildTriggeredBundle must be called with the stage.mu lock held.
// When in discarding mode, returns 0.
// When in accumulating mode, returns the number of fired elements to maintain a correct pending count.
func (ss *stageState) buildTriggeredBundle(em *ElementManager, key []byte, win typex.Window) int {
func (ss *stageState) buildTriggeredBundle(key []byte, win typex.Window) ([]element, int) {
var toProcess []element
dnt := ss.pendingByKeys[string(key)]
var notYet []element

rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(), Watermark: ss.input}

// Look at all elements for this key, and only for this window.
for dnt.elements.Len() > 0 {
e := heap.Pop(&dnt.elements).(element)
Expand Down Expand Up @@ -1466,22 +1509,43 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key []byte, win t
// Ensure the heap invariants are maintained.
heap.Init(&dnt.elements)
}
return toProcess, accumulationDiff
}

func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, genBundID func() string) (string, bool, int) {
Contributor:
Since this is mutating the stage state, please also document that we need the stage.mu lock held. Same for buildTriggeredBundle above.

Contributor:
Alternatively, since this is ultimately only called by runTriggeredBundle, we should just move it all into there instead. I can be convinced otherwise.

Contributor:
Also document what the returns are please:

...returns the bundleID, whether the bundle is OK, and any necessary adjustments to the pending count due to accumulation.
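
Folded into the declaration, the requested documentation might read as follows (the lock requirement comes from the first comment; the exact wording is an assumption):

// startTriggeredBundle builds a bundle of triggered elements for the given
// key and window and marks it in progress. Must be called with ss.mu held.
// It returns the bundle ID, whether a bundle was actually started, and any
// adjustment to the pending count due to accumulation.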

toProcess, accumulationDiff := ss.buildTriggeredBundle(key, win)

if len(toProcess) == 0 {
return "", false, accumulationDiff
Contributor:
Should accumulationDiff ever be non-zero when a bundle isn't going to be run?

}
if ss.inprogressKeys == nil {
ss.inprogressKeys = set[string]{}
}
ss.makeInProgressBundle(
func() string { return rb.BundleID },
bundID := ss.makeInProgressBundle(
genBundID,
toProcess,
ss.input,
singleSet(string(key)),
nil,
)
ss.bundlesToInject = append(ss.bundlesToInject, rb)
// Bundle is marked in progress here to prevent a race condition.
em.refreshCond.L.Lock()
em.inprogressBundles.insert(rb.BundleID)
em.refreshCond.L.Unlock()

return bundID, true, accumulationDiff
}

// runTriggeredBundle must be called with the stage.mu lock held.
// When in discarding mode, returns 0.
// When in accumulating mode, returns the number of fired elements to maintain a correct pending count.
Comment on lines +1536 to +1537
Contributor:
Might be good to elaborate here, since I had to re-think this through.

Perhaps...

Suggested change
// When in discarding mode, returns 0.
// When in accumulating mode, returns the number of fired elements to maintain a correct pending count.
// Returns the accumulation diff that the pending work needs to be adjusted by, as completed work is subtracted from the pending count.
// When in discarding mode, returns 0, as the pending work already includes these elements.
// When in accumulating mode, returns the number of fired elements, since those elements remain pending even after this bundle is fired.

func (ss *stageState) runTriggeredBundle(em *ElementManager, key []byte, win typex.Window) int {
bundID, ok, accumulationDiff := ss.startTriggeredBundle(key, win, func() string { return "agg-" + em.nextBundID() })

if ok {
rb := RunBundle{StageID: ss.ID, BundleID: bundID, Watermark: ss.input}
ss.bundlesToInject = append(ss.bundlesToInject, rb)
// Bundle is marked in progress here to prevent a race condition.
em.refreshCond.L.Lock()
em.inprogressBundles.insert(rb.BundleID)
em.refreshCond.L.Unlock()
}
return accumulationDiff
}

@@ -1810,14 +1874,29 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
ss.mu.Lock()
defer ss.mu.Unlock()

toProcess, minTs, newKeys, holdsInBundle, stillSchedulable := ss.kind.buildProcessingTimeBundle(ss, em, emNow)

if len(toProcess) == 0 {
// If we have nothing
return "", false, stillSchedulable
}
bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle)
return bundID, true, stillSchedulable
}

func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) {
Contributor:
Feel free to drop the named return values, since we aren't using them here. Same for the others. Named returns are more valuable for documentation purposes when the function is exported, or for tricky but useful defer special cases.

Here we can just have a comment in the implementation anyway, when applicable. And avoid the _ names because we aren't using any of the named returns.

slog.Error("ordinary stages can't have processing time elements")
return nil, mtime.MinTimestamp, nil, nil, false
}
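
Dropping the named returns as suggested, the ordinary-kind stub would read (same behavior, only the signature changes):

func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, bool) {
	// Ordinary stages never schedule processing-time work.
	slog.Error("ordinary stages can't have processing time elements")
	return nil, mtime.MinTimestamp, nil, nil, false
}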

func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) {
// TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime
// Special Case for ProcessingTime handling.
// Eg. Always queue EventTime elements at minTime.
// Iterate all available processingTime events until we can't anymore.
//
// Potentially puts too much work on the scheduling thread though.

var toProcess []element
minTs := mtime.MaxTimestamp
holdsInBundle := map[mtime.Time]int{}

@@ -1869,17 +1948,87 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
for _, v := range notYet {
ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds)
em.processTimeEvents.Schedule(v.firing, ss.ID)
em.wakeUpAt(v.firing)
}

// Add a refresh if there are still processing time events to process.
stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0)

if len(toProcess) == 0 {
// If we have nothing
return "", false, stillSchedulable
return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable
}

func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) {
Contributor:
I think we're safe in commenting here that "aggregateStageKind" builds processing time Bundles only when there are processing time triggers.
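
A possible wording for that note:

// buildProcessingTimeBundle for the aggregation kind only fires
// AfterProcessingTime triggers: they are the only processing-time
// events an aggregation stage schedules.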

// var toProcess []element
Contributor:
rm comment

minTs := mtime.MaxTimestamp
holdsInBundle := map[mtime.Time]int{}

var notYet []fireElement

nextTime := ss.processingTimeTimers.Peek()
keyCounts := map[string]int{}
newKeys := set[string]{}

for nextTime <= emNow {
elems := ss.processingTimeTimers.FireAt(nextTime)
for _, e := range elems {
// Check if we're already executing this timer's key.
if ss.inprogressKeys.present(string(e.keyBytes)) {
notYet = append(notYet, fireElement{firing: nextTime, timer: e})
continue
}

// If we are set to have OneKeyPerBundle, and we already have a key for this bundle, we process it later.
if len(keyCounts) > 0 && OneKeyPerBundle {
notYet = append(notYet, fireElement{firing: nextTime, timer: e})
continue
}
// If we are set to have OneElementPerKey, and we already have an element for this key we set this to process later.
if v := keyCounts[string(e.keyBytes)]; v > 0 && OneElementPerKey {
notYet = append(notYet, fireElement{firing: nextTime, timer: e})
continue
}
keyCounts[string(e.keyBytes)]++
newKeys.insert(string(e.keyBytes))
if e.timestamp < minTs {
minTs = e.timestamp
}
// TODO: how to deal with watermark holds for this implicit processing time timer
// holdsInBundle[e.holdTimestamp]++
Contributor:
Correct me if I'm wrong, I think this is the only difference preventing us from a single shared method between the general stateful handler and the aggregation handler?

(That is, a single method on ss.state that both kinds call)
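
If that holds, the shared loop might be factored roughly as below, with a per-kind callback deciding what each fired timer contributes: the stateful kind would return the timer element itself (and count its hold in the callback), while the aggregation kind would run the trigger-readiness check and buildTriggeredBundle. The name and shape are hypothetical:

// drainProcessingTimeTimers pops timers due at or before emNow, defers any
// that violate the in-progress-key or bundle-size limits to notYet, and asks
// the kind-specific fire callback what each firing adds to the bundle.
func (ss *stageState) drainProcessingTimeTimers(emNow mtime.Time, fire func(e element) []element) (toProcess []element, minTs mtime.Time, newKeys set[string], notYet []fireElement) {
	minTs = mtime.MaxTimestamp
	newKeys = set[string]{}
	keyCounts := map[string]int{}

	nextTime := ss.processingTimeTimers.Peek()
	for nextTime <= emNow {
		for _, e := range ss.processingTimeTimers.FireAt(nextTime) {
			// Defer timers whose key is already executing, or that would
			// exceed the configured bundle limits.
			if ss.inprogressKeys.present(string(e.keyBytes)) ||
				(len(keyCounts) > 0 && OneKeyPerBundle) ||
				(keyCounts[string(e.keyBytes)] > 0 && OneElementPerKey) {
				notYet = append(notYet, fireElement{firing: nextTime, timer: e})
				continue
			}
			keyCounts[string(e.keyBytes)]++
			newKeys.insert(string(e.keyBytes))
			if e.timestamp < minTs {
				minTs = e.timestamp
			}
			toProcess = append(toProcess, fire(e)...)
		}
		nextTime = ss.processingTimeTimers.Peek()
		if nextTime == mtime.MaxTimestamp {
			break // No more scheduled events.
		}
	}
	return toProcess, minTs, newKeys, notYet
}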


state := ss.state[LinkID{}][e.window][string(e.keyBytes)]
endOfWindowReached := e.window.MaxTimestamp() < ss.input
ready := ss.strat.IsTriggerReady(triggerInput{
newElementCount: 0,
endOfWindowReached: endOfWindowReached,
emNow: emNow,
}, &state)

if ready {
// We're going to process this trigger!
elems, _ := ss.buildTriggeredBundle(e.keyBytes, e.window)
toProcess = append(toProcess, elems...)
}
}

nextTime = ss.processingTimeTimers.Peek()
if nextTime == mtime.MaxTimestamp {
// Escape the loop if there are no more events.
break
}
}
bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle)
return bundID, true, stillSchedulable

// Reschedule unfired timers.
notYetHolds := map[mtime.Time]int{}
for _, v := range notYet {
ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds)
em.processTimeEvents.Schedule(v.firing, ss.ID)
em.wakeUpAt(v.firing)
}

// Add a refresh if there are still processing time events to process.
stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0)

return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable
}

// makeInProgressBundle is common code to store a set of elements as a bundle in progress.
@@ -1996,6 +2145,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
// If bigger, advance the input watermark.
if newIn > ss.input {
ss.input = newIn
ss.watermarkAdvanced = true
Contributor:
I don't love this toggle, but please name it "inputWatermarkAdvanced" instead.

It also looks like this is mostly here to carry this one property to the bundleReady call for that bypass. I'd prefer we return it explicitly and pass it into bundleReady instead.

stageState contains persistent state, while "the input watermark has advanced" is a transient property between the two calls. It feels brittle that it could be reset, or forgotten to be set, compared to being implicitly reset when we pass it explicitly.
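
Concretely, the transient fact could travel through return values and parameters instead of a field; a hypothetical shape, with bodies elided to placeholders:

// updateWatermarks additionally reports whether the input watermark moved...
func (ss *stageState) updateWatermarks(em *ElementManager) (refreshes set[string], inputAdvanced bool) {
	// ...existing body, setting inputAdvanced where the field is set today...
	return refreshes, inputAdvanced
}

// ...and bundleReady consumes it as a parameter, so it is implicitly reset
// on every call rather than parked on stageState, with the bypass becoming:
//   if inputW == upstreamW && !inputAdvanced { /* return early */ }
func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time, inputAdvanced bool) (mtime.Time, bool, bool, bool) {
	// ...existing body, with the bypass reading the parameter...
	return mtime.MinTimestamp, false, false, false // placeholder
}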

}
// The output starts with the new input as the basis.
newOut := ss.input
@@ -2154,14 +2304,16 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T
// then we can't yet process this stage.
inputW := ss.input
_, upstreamW := ss.UpstreamWatermark()
if inputW == upstreamW {
if inputW == upstreamW && !ss.watermarkAdvanced {
Contributor:
We need to update the comment just above here explaining why this bypass is needed.

slog.Debug("bundleReady: unchanged upstream watermark",
slog.String("stage", ss.ID),
slog.Group("watermark",
slog.Any("upstream", upstreamW),
slog.Any("input", inputW)))
return mtime.MinTimestamp, false, ptimeEventsReady, injectedReady
}

ss.watermarkAdvanced = false
Contributor:
The part I don't love about this is that bundleReady previously wasn't doing any mutations to the state, and now it's effectively bypassing the previous guardrails.

If we have both a good explanation of why we need this behavior, and passing tests, then I'm happy with it. We have the what, but why is it correct/necessary?

This applies whether we move this property to something we pass into the bundleReady call, or it remains a field.

ready := true
for _, side := range ss.sides {
pID, ok := em.pcolParents[side.Global]