Skip to content

Conversation

@shunping
Copy link
Collaborator

@shunping shunping commented Sep 12, 2025

Construct after-processing-time trigger from proto and define trigger callbacks.

This is part of the original PR #36069, but with more tests and some changes to handle trigger reset.

Part of the work to support AfterProcessingTime trigger (#31438)

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @shunping, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Prism runner's capabilities by introducing support for 'after-processing-time' triggers. This allows pipelines to define trigger conditions based on the actual processing time within the runner, providing more flexible and powerful control over when windows are emitted or computations are performed. The changes involve defining the new trigger type, integrating it with the existing trigger input mechanism, and enabling its construction from protobuf definitions.

Highlights

  • New Trigger Type: Introduced TriggerAfterProcessingTime to support triggers that fire after a specified amount of processing time has passed since an element was first seen. This includes mechanisms for delays and alignment to periods with offsets.
  • Processing Time Tracking: The triggerInput struct now includes emNow to track the current processing time, which is crucial for the new AfterProcessingTime trigger's logic.
  • Timestamp Transformation Logic: Implemented TimestampTransform and its application within TriggerAfterProcessingTime to handle complex processing time calculations, including delays and alignment to specific time periods and offsets.
  • Protobuf Integration: Updated the buildTrigger function to correctly parse and construct TriggerAfterProcessingTime instances from the corresponding protobuf messages, enabling the runner to interpret these new trigger definitions.
  • Comprehensive Testing: Added extensive unit tests for TriggerAfterProcessingTime covering various scenarios such as exact delays, late firings, different alignment configurations, and chained transformations, ensuring robustness and correctness.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions
Copy link
Contributor

Assigning reviewers:

R: @lostluck for label go.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@shunping shunping force-pushed the prism-after-processing-trigger-1 branch from d772ece to 1670089 Compare September 12, 2025 02:27
type afterProcessingTimeState struct {
emNow mtime.Time
firingTime mtime.Time
endOfWindowReached bool
Copy link
Collaborator Author

@shunping shunping Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we can pass triggerInput as an argument in shouldFire() and reset(). Then we don't need to put emNow and endOfWindowReached inside ts.extra.

I am fine with both approaches, but would also want to see if you have any opinion on that. @lostluck

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's handy to have emNow in state, since it helps track the change happening. Feels consistent. That's partly why there isn't the triggerInput on those methods presently: it's harder to reason about the state machine, if the state isn't in the machine.

@shunping shunping changed the title [Prism] Support after-process-time triggers - part 1 [Prism] Support AfterProcessingTime triggers - part 1 Sep 12, 2025
@shunping shunping requested a review from lostluck September 12, 2025 16:57
@lostluck
Copy link
Contributor

Sorry, this fell off my radar. Looking again today.

@shunping shunping marked this pull request as draft September 18, 2025 18:30
@shunping shunping marked this pull request as ready for review September 18, 2025 18:30
@shunping
Copy link
Collaborator Author

gentle ping here @lostluck

Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lostluck
Copy link
Contributor

I don't think I got an email for the emoji response to my prior comment, or the move away from draft... Thank you for your patience.

@shunping
Copy link
Collaborator Author

Because the actual trigger is not supported yet. Need one or two more PRs :)

@shunping shunping merged commit d5059c3 into apache:master Sep 24, 2025
10 checks passed
@shunping shunping deleted the prism-after-processing-trigger-1 branch September 26, 2025 01:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants