Skip to content

Conversation

@twosom
Copy link
Contributor

@twosom twosom commented Jun 17, 2025

Please add a meaningful description for your change here

fixes #33633

Problem Statement

The Spark Runner did not support timers due to the following technical challenges:

  1. Timer Triggering Difficulties: UnboundedInOutIterator objects are periodically created and destroyed at each Spark Streaming batch interval, rendering the internal while (true) loop ineffective for timer management.

  2. Sparse Key Issue: As a consequence of the above limitation, sparse keys lose their timer triggering capability once their corresponding UnboundedInOutIterator objects are destroyed, leaving no mechanism to fire pending timers.


This PR resolves the above issues to enable Processing Time Timer functionality.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@github-actions
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@github-actions
Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@twosom twosom force-pushed the spark-runner-processing-timer branch from ac8f7a6 to d23948a Compare June 24, 2025 03:15
@twosom twosom force-pushed the spark-runner-processing-timer branch from 50d800d to 86c67ab Compare June 24, 2025 11:43
@twosom
Copy link
Contributor Author

twosom commented Jun 27, 2025

assign set of reviewers

@github-actions
Copy link
Contributor

Assigning reviewers:

R: @damccorm added as fallback since no labels match configuration

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Copy link
Contributor

@damccorm damccorm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for putting this together

.isBefore(sparkTimerInternals.currentInputWatermarkTime()))
.isBefore(
timer.getDomain().equals(TimeDomain.PROCESSING_TIME)
? sparkTimerInternals.currentProcessingTime()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this safe to do? I'd expect us to try to fire processing time timers (if unfired) before clearing them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@damccorm

You're absolutely right! I've added TimerUtils.triggerExpiredTimers to fire expired processing time timers before clearing them. This ensures we don't lose any timer callbacks that should have been executed.

Please take a look at the implementation.

@twosom
Copy link
Contributor Author

twosom commented Jun 30, 2025

@damccorm Thanks for the feedback!
I'll fix it!

@github-actions
Copy link
Contributor

github-actions bot commented Jul 8, 2025

Reminder, please take a look at this pr: @damccorm

@damccorm
Copy link
Contributor

damccorm commented Jul 9, 2025

waiting on author

@twosom
Copy link
Contributor Author

twosom commented Jul 9, 2025

@damccorm
Sorry for the delay in addressing your feedback. I've now implemented the changes you suggested. Thank you for your patience!

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @damccorm

Copy link
Contributor

@damccorm damccorm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@damccorm damccorm merged commit 6812836 into apache:master Jul 17, 2025
17 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature Request]: Support ProcessingTime Timer in Spark Runner

2 participants