@shunping
Collaborator

@shunping shunping commented Sep 30, 2025

This PR is a follow-up to #36126. The majority of this PR has been reviewed in #36069, and I've added comments to the code to indicate where the feedback has been addressed.

Part of the work to support AfterProcessingTime trigger (#31438)

// This is used for processing time timers to ensure the loop re-evaluates
// stages when a processing time timer is expected to fire.
func (em *ElementManager) wakeUpAt(t mtime.Time) {
	if em.testStreamHandler == nil && em.config.EnableRTC {
Collaborator Author

Addressed https://github.com/apache/beam/pull/36069/files#r2342213066.

> We should also only be creating this Goroutine when the Real Time Clock is being used, and testStreamHandler is nil. Otherwise behavior will be additionally wonky.
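
The guard discussed in this comment can be sketched in isolation. This is a minimal illustration, not Prism's actual code: the `manager` type, its `realTime` and `testStream` fields (standing in for `em.config.EnableRTC` and `em.testStreamHandler`), and the `wakeUpAfter` and `waitForWakeups` helpers are all hypothetical. It only shows the idea of scheduling a timed wake-up when the real-time clock is in use and no test stream controls time.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// manager is a hypothetical stand-in for the element manager; only the
// fields needed for the wake-up logic are modeled.
type manager struct {
	mu          sync.Mutex
	refreshCond *sync.Cond
	wakeups     int
	realTime    bool // stands in for em.config.EnableRTC
	testStream  bool // stands in for em.testStreamHandler != nil
}

func newManager(realTime, testStream bool) *manager {
	m := &manager{realTime: realTime, testStream: testStream}
	m.refreshCond = sync.NewCond(&m.mu)
	return m
}

// wakeUpAfter schedules a broadcast after d, but only when the real-time
// clock is in use and no test stream controls time. It reports whether a
// timer was actually scheduled.
func (m *manager) wakeUpAfter(d time.Duration) bool {
	if m.testStream || !m.realTime {
		return false
	}
	time.AfterFunc(d, func() {
		m.mu.Lock()
		m.wakeups++
		m.mu.Unlock()
		m.refreshCond.Broadcast()
	})
	return true
}

// waitForWakeups blocks until at least n wake-ups have been recorded.
func (m *manager) waitForWakeups(n int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for m.wakeups < n {
		m.refreshCond.Wait()
	}
}

func main() {
	rtc := newManager(true, false)
	fmt.Println("scheduled:", rtc.wakeUpAfter(10*time.Millisecond))
	rtc.waitForWakeups(1)
	fmt.Println("loop re-evaluated after timer")

	ts := newManager(true, true)
	fmt.Println("scheduled with test stream:", ts.wakeUpAfter(10*time.Millisecond))
}
```

Skipping the timer entirely when a test stream is present keeps time under the test's control, which is presumably the "wonky" behavior the feedback wanted to avoid.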

})
}

func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, bool) {
Collaborator Author

Addressed https://github.com/apache/beam/pull/36069/files#r2342012446

> Feel free to drop the named return values, since we aren't using them here...

slog.Debug("started a processing time bundle", "stageID", ss.ID, "bundleID", bundID, "size", len(toProcess), "emNow", emNow)
return bundID, true, stillSchedulable
func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, bool) {
return handleProcessingTimeTimer(ss, em, emNow, func(e element, toProcess []element, holdsInBundle map[mtime.Time]int) {
Collaborator Author

Addressed https://github.com/apache/beam/pull/36069/files#r2342029890

> Correct me if I'm wrong, I think this is the only difference preventing us from a single shared method between the general stateful handler and the aggregation handler?
> (That is, a single method on ss.state that both kinds call)
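
The refactoring idea in this comment, one shared routine with the kind-specific step passed in as a callback, can be sketched as follows. The `timer` type, the `fireHook` signature, and the two hooks are hypothetical simplifications chosen for illustration; they do not mirror the real `handleProcessingTimeTimer` signature or Prism's element types.

```go
package main

import "fmt"

// timer is a hypothetical, simplified processing-time timer.
type timer struct {
	key    string
	fireAt int64 // processing time in arbitrary units
}

// fireHook is the per-stage-kind step: in this sketch it is the only part
// that differs between the stateful and the aggregation handlers.
type fireHook func(t timer, toProcess []string) []string

// collectDue is the shared part. It selects timers due at or before now,
// lets the hook decide what each due timer contributes to the bundle, and
// returns the elements to process plus the timers that remain pending.
func collectDue(timers []timer, now int64, hook fireHook) ([]string, []timer) {
	var toProcess []string
	var remaining []timer
	for _, t := range timers {
		if t.fireAt <= now {
			toProcess = hook(t, toProcess)
		} else {
			remaining = append(remaining, t)
		}
	}
	return toProcess, remaining
}

// statefulHook forwards the timer itself as an element to process.
func statefulHook(t timer, toProcess []string) []string {
	return append(toProcess, "timer:"+t.key)
}

// aggregateHook emits a pane for the timer's key instead.
func aggregateHook(t timer, toProcess []string) []string {
	return append(toProcess, "pane:"+t.key)
}

func main() {
	timers := []timer{{"a", 5}, {"b", 15}}

	due, rest := collectDue(timers, 10, statefulHook)
	fmt.Println(due, rest)

	due, rest = collectDue(timers, 10, aggregateHook)
	fmt.Println(due, rest)
}
```

The return values are unnamed here, in line with the earlier feedback about dropping named returns that are not used.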

@shunping shunping force-pushed the prism-after-processing-time-2 branch from fe01a3d to b3cd953 Compare October 1, 2025 02:22
@shunping shunping marked this pull request as ready for review October 1, 2025 18:50
@shunping shunping requested a review from lostluck October 1, 2025 18:50
@shunping shunping self-assigned this Oct 1, 2025
@github-actions
Contributor

github-actions bot commented Oct 1, 2025

Assigning reviewers:

R: @jrmccluskey for label go.

Contributor

@lostluck lostluck left a comment

Are you leaving non-Go tests to another PR?

Are there any Java conformance tests that start passing now that this is in?

@shunping
Collaborator Author

shunping commented Oct 1, 2025

> Are you leaving non-Go tests to another PR?
>
> Are there any Java conformance tests that start passing now that this is in?

Yes. I tried some of the commented-out Java tests, and it seems that they need the AfterSynchronizedProcessingTime trigger to run.

For the Python ones, we will need to clean up some tests later.

@lostluck
Contributor

lostluck commented Oct 1, 2025

Well, the good news is I'm not sure there's any "work" to be done here for AfterSynchronizedProcessingTime, since it depends on all the workers having caught up to the first element's arrival.

https://beam.apache.org/releases/javadoc/2.68.0/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.html

In principle, though, Prism is always just a single worker (there can be multiple SDK stacks, but they're not workers per se), so the "notion" of processing time across workers is probably a no-op in the current generation of Prism as a single-node local runner.

@shunping
Collaborator Author

shunping commented Oct 1, 2025

> Well, the good news is I'm not sure there's any "work" to be done here for AfterSynchronizedProcessingTime, since it depends on all the workers having caught up to the first element's arrival.
>
> https://beam.apache.org/releases/javadoc/2.68.0/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.html
>
> In principle, though, Prism is always just a single worker (there can be multiple SDK stacks, but they're not workers per se), so the "notion" of processing time across workers is probably a no-op in the current generation of Prism as a single-node local runner.

So in the current setting of Prism, it sounds like AfterSynchronizedProcessingTime behaves like AfterCount(1)? I see it is a subclass of OnceTrigger in the Javadoc, so it fires only once.

@lostluck
Contributor

lostluck commented Oct 2, 2025

I think so yes.

It's a trigger only implemented in Java (and Go, I guess...) IIRC, and it's labeled as an implementation detail too. Unfortunately it's externalized too much, so its internal nature isn't maintained well.

Beam's cross worker communication doesn't report a machine's local wall time, so at present there's no way to do this. To implement it properly requires the runner to manage its own machines/worker side communications better, like how Dataflow has runnerv2 for worker work acquisition.
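
To make the "AfterCount(1) that fires only once" reading above concrete, here is a small sketch of a once-only count trigger. The `onceAfterCount` type and its `onElement` method are hypothetical and are not part of the Beam SDK or Prism. The sketch assumes the single-worker setting described in this thread, where there is no cross-worker processing time to synchronize on.

```go
package main

import "fmt"

// onceAfterCount is a hypothetical trigger that fires a single time, as soon
// as threshold elements have been seen, and never again afterwards.
type onceAfterCount struct {
	threshold int
	seen      int
	fired     bool
}

// onElement records one element and reports whether the trigger fires for it.
func (t *onceAfterCount) onElement() bool {
	if t.fired {
		return false // a once-trigger stays finished after its first firing
	}
	t.seen++
	if t.seen >= t.threshold {
		t.fired = true
		return true
	}
	return false
}

func main() {
	trig := &onceAfterCount{threshold: 1}
	for i := 0; i < 3; i++ {
		fmt.Printf("element %d fires: %v\n", i, trig.onElement())
	}
}
```

With a threshold of 1, only the first element causes a firing, which matches the behavior suggested for a single-node runner.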

@shunping shunping force-pushed the prism-after-processing-time-2 branch from 8c0e5ca to d6ccb90 Compare October 2, 2025 04:55
@shunping
Collaborator Author

shunping commented Oct 2, 2025

Thanks!

The fact that some test pipelines get stuck when the watermark does not advance is truly a problem.

I am looking into another approach to solve that. Basically, we will change bundleReady so that, for ordinary stages, it won't wait for the watermark to advance before processing (I marked the change I made in the code).

PTAL. I am running tests to see if the idea works. If it works, we will no longer need TestStream.advanceWatermark calls in our processing-time trigger tests.

// ss.bundlesToInject = append(ss.bundlesToInject, rb)
// Bundle is marked in progress here to prevent a race condition.
em.refreshCond.L.Lock()
em.injectedBundles = append(em.injectedBundles, rb)
Collaborator Author

@shunping shunping Oct 2, 2025

I use em.injectedBundles here because the stage-wise bundlesToInject is causing the pipeline to get stuck. This needs to be consolidated in a future PR.

Contributor

If using this mechanism also add "em.inprogressBundles.insert(rb.BundleID)" to avoid "losing" the bundle. injectedBundles are considered "inprogress" even when they aren't yet executing. This is to avoid incorrect premature terminations.

Contributor

Very odd that there's a stuckness going on with bundlesToInject. Not sure why that would be offhand.

Collaborator Author

> If using this mechanism also add "em.inprogressBundles.insert(rb.BundleID)" to avoid "losing" the bundle. injectedBundles are considered "inprogress" even when they aren't yet executing. This is to avoid incorrect premature terminations.

Yes, the line you mentioned is already there.
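
The bookkeeping described in this thread can be illustrated with a small stand-alone sketch. The `tracker` type and its methods are hypothetical and much simpler than the real element manager. The point is only that a bundle is added to the in-progress set in the same critical section that queues it, so a termination check never sees a queued bundle as finished work.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker is a hypothetical, simplified view of the bundle bookkeeping.
type tracker struct {
	mu         sync.Mutex
	injected   []string            // bundles queued but not yet executing
	inProgress map[string]struct{} // bundles that still count as unfinished
}

func newTracker() *tracker {
	return &tracker{inProgress: map[string]struct{}{}}
}

// inject queues a bundle and marks it in progress under the same lock, so no
// observer can see the bundle queued but not yet counted.
func (t *tracker) inject(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.injected = append(t.injected, id)
	t.inProgress[id] = struct{}{}
}

// next removes and returns the oldest queued bundle, if there is one.
func (t *tracker) next() (string, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if len(t.injected) == 0 {
		return "", false
	}
	id := t.injected[0]
	t.injected = t.injected[1:]
	return id, true
}

// finish marks a bundle as completed.
func (t *tracker) finish(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.inProgress, id)
}

// done reports whether no bundle is queued or executing.
func (t *tracker) done() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.inProgress) == 0
}

func main() {
	tr := newTracker()
	tr.inject("bundle-1")
	fmt.Println("done after inject:", tr.done())

	id, _ := tr.next()
	fmt.Println("done while executing:", tr.done())

	tr.finish(id)
	fmt.Println("done after finish:", tr.done())
}
```

If `inject` only appended to the queue, `done` could report true between queuing and execution, which is the premature termination the reviewer warns about.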

if isOrdinaryStage && len(ss.sides) == 0 {
	// For ordinary stage with no side inputs, we use whether there are pending elements to determine
	// whether a bundle is ready or not.
	if len(ss.pending) == 0 {
Collaborator Author

Use the watermark to determine whether to create a bundle for aggregation and stateful stages, but not for stateless stages without side inputs.
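
The readiness rule stated in this comment can be sketched as a pure function. The `stage` struct and its fields are hypothetical, and the watermark comparison is a deliberate simplification: the actual `bundleReady` logic in Prism considers more conditions than shown here.

```go
package main

import "fmt"

// stage is a hypothetical, flattened view of the state the decision needs.
type stage struct {
	stateful             bool
	aggregate            bool
	sideInputs           int
	pending              int   // number of pending elements
	inputWatermark       int64 // current input watermark
	lastBundledWatermark int64 // watermark when the last bundle was created
}

// bundleReady reports whether a bundle should be created for the stage.
// Ordinary (stateless, non-aggregating) stages without side inputs only need
// pending elements. All other stages also wait for the watermark to advance
// (simplified assumption for this sketch).
func bundleReady(s stage) bool {
	ordinary := !s.stateful && !s.aggregate
	if ordinary && s.sideInputs == 0 {
		return s.pending > 0
	}
	return s.pending > 0 && s.inputWatermark > s.lastBundledWatermark
}

func main() {
	stalled := stage{pending: 3, inputWatermark: 10, lastBundledWatermark: 10}
	fmt.Println("ordinary, watermark stalled:", bundleReady(stalled))

	stalled.stateful = true
	fmt.Println("stateful, watermark stalled:", bundleReady(stalled))

	stalled.inputWatermark = 11
	fmt.Println("stateful, watermark advanced:", bundleReady(stalled))
}
```

Under this rule an ordinary stage keeps making progress even when the watermark is stalled, which is the situation that left some test pipelines stuck.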

@codecov

codecov bot commented Oct 2, 2025

Codecov Report

❌ Patch coverage is 49.03226% with 79 lines in your changes missing coverage. Please review.
✅ Project coverage is 56.84%. Comparing base (9cd994e) to head (28b2caf).
⚠️ Report is 27 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...am/runners/prism/internal/engine/elementmanager.go | 45.29% | 60 Missing and 4 partials ⚠️ |
| ...pkg/beam/runners/prism/internal/engine/strategy.go | 62.16% | 13 Missing and 1 partial ⚠️ |
| ...m/runners/prism/internal/jobservices/management.go | 0.00% | 1 Missing ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #36333      +/-   ##
============================================
- Coverage     56.84%   56.84%   -0.01%     
  Complexity     3386     3386              
============================================
  Files          1220     1220              
  Lines        185898   186056     +158     
  Branches       3523     3523              
============================================
+ Hits         105672   105757      +85     
- Misses        76885    76942      +57     
- Partials       3341     3357      +16     

| Flag | Coverage Δ |
|---|---|
| go | 28.43% <49.03%> (+0.06%) ⬆️ |

@github-actions github-actions bot added java and removed java labels Oct 2, 2025
@shunping shunping force-pushed the prism-after-processing-time-2 branch from 923dea8 to 28b2caf Compare October 2, 2025 15:40
Contributor

@lostluck lostluck left a comment

LGTM, thanks for all the detailed work!

// TriggerAfterProcessingTimeNotTriggered tests the AfterProcessingTime Trigger. It won't fire because 't' processing time is not reached
// Not yet supported by the flink runner:
// java.lang.UnsupportedOperationException: Advancing Processing time is not supported by the Flink Runner.
func TriggerAfterProcessingTimeNotTriggered(s beam.Scope) {
Contributor

Also add a Test wrapper to

https://github.com/apache/beam/blob/3903623850dcad24e4a02f1eb7f81e13d4ea4d54/sdks/go/test/integration/primitives/windowinto_test.go

That way it can get executed in the other runner suites. Even if it's currently getting filtered out, it's important not to "miss" it. That's also where the "test name" for it gets added so it's visible in the lists at

https://github.com/apache/beam/blob/master/sdks/go/test/integration/integration.go#L166

Collaborator Author

Why do we skip the trigger tests in prism?

Contributor

It was easier/preferred to add them to the prism test suite (in the no longer correctly named unimplemented_test.go) than to unblock this. I think we can probably remove this skip soon, after your work here is done.

Something something re-org.

@shunping shunping merged commit 3534960 into apache:master Oct 2, 2025
28 of 29 checks passed
@damccorm damccorm mentioned this pull request Oct 3, 2025
3 tasks
@damccorm
Contributor

damccorm commented Oct 3, 2025

FYI this PR is causing ML tests to start failing - https://github.com/apache/beam/actions/workflows/beam_PreCommit_Python_ML.yml?query=branch%3Amaster

The test is wrong, and I have #36377 to fix it
