@shunping
Collaborator

@shunping shunping commented Oct 3, 2025

A follow-up PR to #36126 and #36333.

This completes one of the remaining tasks related to triggers and panes (in #31438 (comment)):

No support for ProcessingTime and SynchronizedProcessingTime triggers.

@gemini-code-assist
Contributor

Summary of Changes

Hello @shunping, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly advances the Apache Beam Prism runner by introducing foundational support for the AfterSynchronizedProcessingTime trigger. It also enables a range of previously failing or excluded Java processing-time trigger tests, demonstrating increased robustness in handling time-based windowing and triggering mechanisms. These updates are crucial for bringing Prism closer to feature parity with other Beam runners and addressing long-standing issues related to trigger capabilities.

Highlights

  • AfterSynchronizedProcessingTime Trigger Support: The Prism Go runner now includes an implementation for the AfterSynchronizedProcessingTime trigger. While it currently operates as a no-op in the single-node Prism architecture, firing upon data reception, its presence marks a significant step towards broader trigger support.
  • Processing Time Trigger Test Enablement: Several Java processing-time trigger tests, previously marked as sickbayTests or excluded by category, have been re-enabled. This indicates improved stability and functionality of processing-time triggers within the Prism runner, allowing more comprehensive testing.
  • Refined Trigger Handling Logic: The internal logic for identifying and building triggers has been enhanced. The hasUnsupportedTriggers function now correctly recognizes AfterSynchronizedProcessingTime as supported, and the buildTrigger function properly instantiates it, improving the runner's overall trigger management.
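
The behavior described in the highlights above can be illustrated with a minimal Go sketch. This is not the Prism source: the interface, the type names, the string kind constants, and the helpers buildTriggerSketch and hasUnsupportedSketch are all hypothetical and exist only for illustration. It assumes, as the summary states, that on a single node there is nothing to synchronize with, so the trigger is ready as soon as data has been received.

```go
package main

import "fmt"

// trigger is a hypothetical, simplified stand-in for a runner-side trigger.
type trigger interface {
	// shouldFire reports whether a pane may fire given the number of
	// elements received so far.
	shouldFire(elementsSeen int) bool
}

// afterSyncProcessingTime is an illustrative type, not Prism's real one.
// Assumption: a single-node runner has no other workers to synchronize
// with, so the trigger is ready once any data has arrived.
type afterSyncProcessingTime struct{}

func (afterSyncProcessingTime) shouldFire(elementsSeen int) bool {
	return elementsSeen > 0
}

// Hypothetical kind name used only by this sketch.
const kindAfterSyncProcessingTime = "after_synchronized_processing_time"

// buildTriggerSketch maps a kind name to an implementation and returns an
// error for kinds it does not know.
func buildTriggerSketch(kind string) (trigger, error) {
	switch kind {
	case kindAfterSyncProcessingTime:
		return afterSyncProcessingTime{}, nil
	default:
		return nil, fmt.Errorf("unsupported trigger kind %q", kind)
	}
}

// hasUnsupportedSketch reports whether any of the given kinds cannot be built.
func hasUnsupportedSketch(kinds []string) bool {
	for _, k := range kinds {
		if _, err := buildTriggerSketch(k); err != nil {
			return true
		}
	}
	return false
}

func main() {
	tr, err := buildTriggerSketch(kindAfterSyncProcessingTime)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// Before any data arrives the trigger is not ready; afterwards it is.
	fmt.Println(tr.shouldFire(0), tr.shouldFire(1))
}
```

The point of the sketch is only the shape of the change: a kind that used to fall into the "unsupported" branch now has a trivial implementation, so pipelines that use it are no longer rejected up front.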

@shunping shunping self-assigned this Oct 3, 2025
@github-actions
Contributor

github-actions bot commented Oct 3, 2025

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment "assign set of reviewers"

@shunping
Collaborator Author

shunping commented Oct 3, 2025

R: @lostluck

@github-actions
Contributor

github-actions bot commented Oct 3, 2025

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment "assign set of reviewers"

Contributor

@lostluck lostluck left a comment

LGTM.

// Teststream has bad KV encodings due to using an outer context.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode',
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInFixedWindow',
// sdk worker disconnected
Contributor

That means the Java worker side crashed... Historically that's been a coder issue. You can usually find the failure output in the Gradle scan pages (both stack traces and StdErr output are there). Agreed not to worry about it for this PR.

Collaborator Author

Right, the good thing is that we now include 20+ more processing-time related tests.

Collaborator Author

And we finish supporting all triggers that are defined in the proto. Yay!

@lostluck
Contributor

lostluck commented Oct 3, 2025

The test failure is unexpected. Re-running, but it's very concerning if the state handling became flaky because of this... Weird.

    state_test.go:37: Failed to execute job: job job-040 failed:
        bundle inst007 stage-008 failed:process bundle failed for instruction inst007 using plan stage-008 : while executing Process for Plan[stage-008]:
        1: DataSource[S[stage-008_source@localhost:57861], i0] Out:2 Coder:W;cwv_n11<bytes;c0>!GWC 
        2: ParDo[passert.failIfBadEntries] Out:[] Sig: func([]uint8, func(*typex.T) bool, func(*typex.T) bool, func(*typex.T) bool) error, SideInputs: [SideInputAdapter[S[e10@localhost:57861], i1] - Coder W<string;c2>!GWC SideInputAdapter[S[e10@localhost:57861], i2] - Coder W<string;c2>!GWC SideInputAdapter[S[e10@localhost:57861], i3] - Coder W<string;c2>!GWC]
        	caused by:
        source failed processing data
        	caused by:
        DoFn[UID:2, PID:e10, Name: github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert.failIfBadEntries] failed:
        actual PCollection does not match expected values
        =========
        7 correct entries (present in both)
        =========
        1 unexpected entries (present in actual, missing in expected)
        +++
        apple: 0,false
        =========
        1 missing entries (missing in actual, present in expected)
        ---
        apple: 1,true
--- FAIL: TestValueStateClear (12.12s)

@shunping
Collaborator Author

shunping commented Oct 3, 2025

Right. Not sure why it affects state. I'll run the test locally to see whether I can catch a race condition.

@shunping
Collaborator Author

shunping commented Oct 3, 2025

I ran it 10k times and did not see a single failure.

@shunping shunping merged commit 586cb11 into apache:master Oct 3, 2025
22 of 24 checks passed