Skip to content

Conversation

@shunping
Copy link
Collaborator

@shunping shunping commented Sep 5, 2025

The core of this PR is the implementation of the AfterProcessingTime trigger. Triggered bundles can now be generated in two ways:

  • Upon insertion of new pending elements into an aggregate stage (existing).
  • When the processing-time timer associated with the trigger fires (new).

In both cases, a check is performed to determine if the stage's overall trigger should fire before a bundle is generated.

This PR also resolves two issues that were identified during development:

  • Wake-up mechanism for refreshCond so processing-time events won't be blocked if there is no event-time events.
  • A correction for an over-conservative condition that determines when an event-time bundle is ready.

addressed one of the tasks in #31438


There are also two issues that need to be addressed in follow-up PRs:

  • To fix the watermarkhold TODOs I left
  • Fix the extra ON_TIME window panes for non-afterWaterMark triggers. This is currently one of the discrepancies between Prism and DataflowRunner. We will need to fix that to re-enable more trigger tests for prism.

@shunping shunping force-pushed the prism-after-processing-time-trigger branch from 28cc7fa to a252d92 Compare September 5, 2025 19:06
Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a first pass for the front end, this is a good start.

As discussed offline, the fun part will be making sure that events for triggers/keys etc are put into the Element manager's awareness for ProcessingTime handling, so both the eventual real time clock, and the synthetic TestStream clock will trigger these properly.

Don't forget there are some Java tests that might exercise this trigger to some degree too.


// TriggerAfterProcessingTime fires once after a specified amount of processing time
// has passed since an element was first seen.
// Uses the extra state field to track if the processing time of the first element.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Uses the extra state field to track if the processing time of the first element.
// Uses the extra state field to track the processing time of the first element.

@codecov
Copy link

codecov bot commented Sep 8, 2025

Codecov Report

❌ Patch coverage is 67.21992% with 79 lines in your changes missing coverage. Please review.
✅ Project coverage is 56.76%. Comparing base (e72d9c7) to head (fcac8c8).
⚠️ Report is 64 commits behind head on master.

Files with missing lines Patch % Lines
...am/runners/prism/internal/engine/elementmanager.go 59.25% 50 Missing and 5 partials ⚠️
...pkg/beam/runners/prism/internal/engine/strategy.go 77.64% 17 Missing and 2 partials ⚠️
sdks/go/pkg/beam/runners/prism/internal/execute.go 80.00% 3 Missing and 1 partial ⚠️
...m/runners/prism/internal/jobservices/management.go 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #36069      +/-   ##
============================================
- Coverage     56.77%   56.76%   -0.01%     
  Complexity     3383     3383              
============================================
  Files          1220     1220              
  Lines        184968   185226     +258     
  Branches       3508     3508              
============================================
+ Hits         105015   105148     +133     
- Misses        76625    76742     +117     
- Partials       3328     3336       +8     
Flag Coverage Δ
go 28.29% <67.21%> (+0.07%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@shunping shunping marked this pull request as ready for review September 10, 2025 04:38
@shunping shunping changed the title [WIP][Prism]Support after_processing_time trigger [Prism]Support after_processing_time trigger Sep 10, 2025
@github-actions
Copy link
Contributor

Assigning reviewers:

R: @jrmccluskey for label go.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly requesting improving/updating comments where applicable. Apologies if some things are repetitive.

Comment on lines +1353 to +1354
em.processTimeEvents.Schedule(firingTime, ss.ID)
em.wakeUpAt(firingTime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend just putting the em.processTime.Schedule call inside the wakeUpAt call, and replacing all the existing calls. Perhaps a better name: scheduleProcessingTimeEventForStage

Avoids issues where one is called but the other is not.


On the other hand: If we keep them separate we have a better chance of reducing the number of spare goroutines/timers we're keeping around. Eg. On such a firing, we see when the next processing time event is, and re-schedule it. That feels like a loop we can build and initialize as part of the Bundles set up... I'll give it some more thought.

return toProcess, accumulationDiff
}

func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, genBundID func() string) (string, bool, int) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is mutating the stage state, please also document that we need the stage.mu lock held. Same for buildTriggeredBundle above.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, since this is ultimately only called by runTriggeredBundle, we should just move it all into there instead. I can be convinced otherwise.

Comment on lines +1536 to +1537
// When in discarding mode, returns 0.
// When in accumulating mode, returns the number of fired elements to maintain a correct pending count.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to elaborate here, since I had to re-think this through.

Perhaps...

Suggested change
// When in discarding mode, returns 0.
// When in accumulating mode, returns the number of fired elements to maintain a correct pending count.
// Returns the accumulation diff that the pending work needs to be adjusted by, as completed work is subtracted from the pending count.
// When in discarding mode, returns 0, as the pending work already includes these elements.
// When in accumulating mode, returns the number of fired elements, since those elements remain pending even after this bundle is fired.

toProcess, accumulationDiff := ss.buildTriggeredBundle(key, win)

if len(toProcess) == 0 {
return "", false, accumulationDiff
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should accumulationDiff ever be non-zero when a bundle isn't going to be run?

return toProcess, accumulationDiff
}

func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, genBundID func() string) (string, bool, int) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also document what the returns are please:

...returns the bundleID, whether the bundle is OK, and any necessary adjustments to the pending count due to accumulation.

minTs = e.timestamp
}
// TODO: how to deal with watermark holds for this implicit processing time timer
// holdsInBundle[e.holdTimestamp]++
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct me if I'm wrong, I think this is the only difference preventing us from a single shared method between the general stateful handler and the aggregation handler?

(That is, a single method on ss.state that both kinds call)

inputW := ss.input
_, upstreamW := ss.UpstreamWatermark()
if inputW == upstreamW {
if inputW == upstreamW && !ss.watermarkAdvanced {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to update the comment just above here explaining why this bypass is needed.

return mtime.MinTimestamp, false, ptimeEventsReady, injectedReady
}

ss.watermarkAdvanced = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The part I don't love about this here is that bundleReady wasn't doing any mutations to the state previously and now it's effectively bypassing the previous guardrails.

If we have both a good explanation why we need this behavior, and passing tests, then I'm happy with it. We have a what, but "why" is it correct/necessary?

This applies whether we move this property to be something we're passing into the bundleReady call or if it remains a field or not.

// If bigger, advance the input watermark.
if newIn > ss.input {
ss.input = newIn
ss.watermarkAdvanced = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love this toggle, but please name it "inputWatermarkAdvanced" instead.

It also looks like this is mostly just to mark this specific property to get it to the bundleReady call for that bypass. I'd prefer if we can make it explicitly returned out, and passed into bundleReady instead.

stageState contains persistent state, while the "inputWatermark has advanced" is a transient property between the two calls. It feels brittle that it could be reset or forgotten to be set, compared to implicitly reset when we pass it explicitly.


processingTimeTimers *timerHandler

watermarkAdvanced bool // whether the watermark for this stage has advanced
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented elsewhere, but what this is whether the input watermark has advanced or not since the last bundle. Previously we just needed the output watermark time.

@lostluck
Copy link
Contributor

I'l note that with the 'wakeup' being added, a later PR can also sort out correctly implementing the Streaming Splittable DoFn delay for the ProcessContinuations. IIRC, prism currently just schedules the bundle immeadiately.

Comment on lines +1088 to +1092
go func(fireAt time.Time) {
time.AfterFunc(time.Until(fireAt), func() {
em.refreshCond.Broadcast()
})
}(t.ToTime())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also only be creating this Goroutine when the Real Time Clock is being used, and testStreamHandler is nil. Otherwise behavior will be additionally wonky.

eg.

if em.testStreamHandler != nil && !em.testStreamHandler.completed {

I recommend moving this clock function down there too, as they are related.

@shunping
Copy link
Collaborator Author

As always, thank you so much your the review and detailed feedback! Given there are a few pieces in this PR: some pretty much standard (plumbing the trigger data from proto and defining trigger callbacks), while some can use some more thoughts (watermark advanced flag or the watermark hold) and may be handle in independent PRs. I would suggest splitting this PRs into different tasks, so it will make development, testing, and reviewing easy. WDYT?

@lostluck
Copy link
Contributor

As always, thank you so much your the review and detailed feedback! Given there are a few pieces in this PR: some pretty much standard (plumbing the trigger data from proto and defining trigger callbacks), while some can use some more thoughts (watermark advanced flag or the watermark hold) and may be handle in independent PRs. I would suggest splitting this PRs into different tasks, so it will make development, testing, and reviewing easy. WDYT?

SGTM. A focus and some additional unit tests go a long way. Let me know when you want another look.

@shunping
Copy link
Collaborator Author

Closing this PR as its code change would be split into smaller ones later.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants