[BEAM-22] Schedule roots less aggressively #64
The excess scheduling of known-empty bundles can consume excessive resources.
R: @bjchambers
}
fireTimers();
mightNeedMoreWork();
if (!fireTimers()) {
Add a comment here, since it's not immediately clear what "fireTimers" returns. Eg:
if (!fireTimers()) {
// If any timers fired, then we may have more work.
...
}
(Or is it if there are unfired timers? etc.)
Done.
Add documentation around completion and maybe adding more work.
Refactor the "can't make progress" check in mightNeedMoreWork into a helper function.
Rename mightNeedMoreWork -> addWorkIfNecessary.
Comment on why we might decide that work can still be done.
TODO: test many many times
mightNeedMoreWork();
if (!fireTimers()) {
// If any timers have fired, they will add more work; We don't need to add more
addWorkIfNecessary();
Now that I think about this, I'd propose:
boolean timersFired = fireTimers();
addWorkIfNecessary(timersFired);
This replaces the comment on what fireTimers() returns by assigning a value, and it moves all the logic on when to fire timers into addWorkIfNecessary.
Done.
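For readers following the thread, here is a minimal sketch of the shape proposed above. It is not the code from this PR: the class name, the queues, and the "schedule a root only when nothing is pending" rule are assumptions made up for illustration. Only the names fireTimers and addWorkIfNecessary, and the idea that fired timers add their own work, come from the discussion.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for the executor's monitor loop; not the PR's class.
class MonitorSketch {
    final Queue<String> pendingTimers = new ArrayDeque<>();
    final Queue<String> workQueue = new ArrayDeque<>();
    int rootsScheduled = 0;

    // Returns true if at least one timer fired. Each fired timer enqueues work.
    boolean fireTimers() {
        boolean fired = false;
        while (!pendingTimers.isEmpty()) {
            workQueue.add("timer:" + pendingTimers.remove());
            fired = true;
        }
        return fired;
    }

    // All of the "should we schedule more roots?" logic lives here.
    void addWorkIfNecessary(boolean timersFired) {
        if (timersFired) {
            return; // fired timers already added work
        }
        if (workQueue.isEmpty()) { // assumed "can't make progress" check
            workQueue.add("root");
            rootsScheduled++;
        }
    }

    // The call site reads as suggested in the review comment.
    void step() {
        boolean timersFired = fireTimers();
        addWorkIfNecessary(timersFired);
    }
}
```

Naming the boolean at the call site documents what fireTimers() returns without a comment, and keeps the decision in one method.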
Slightly cleaner method calls
add tests for isDone. containsUnboundedPCollection should look at outputs rather than inputs
if (!watermarkManager
.getWatermarks(transform)
.getOutputWatermark()
.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
Due to the risk of an NPE, it's generally better to do:
CONSTANT.equals(expression)
rather than
expression.equals(CONSTANT)
Switched this to isBefore, which is much easier to read in this order.
NPEs should never occur from the watermark manager, as all of the watermarks are initialized at creation time. Switching the order decreases readability and also isn't particularly good protection against NPEs in this context.
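To make the two options in this thread concrete, a small sketch follows. It uses java.time.Instant as a stand-in for the timestamp type (an assumption, so the example compiles without extra dependencies), and MAX_TIMESTAMP is a hypothetical placeholder for BoundedWindow.TIMESTAMP_MAX_VALUE. The method names are also invented for illustration.

```java
import java.time.Instant;

// Hypothetical helper; names are illustrative only.
class WatermarkCheck {
    // Placeholder for BoundedWindow.TIMESTAMP_MAX_VALUE (assumption).
    static final Instant MAX_TIMESTAMP = Instant.MAX;

    // Constant-first equals: a null argument yields false instead of an NPE.
    static boolean equalsMax(Instant outputWatermark) {
        return MAX_TIMESTAMP.equals(outputWatermark);
    }

    // isBefore form: reads in the natural order, but assumes a non-null watermark.
    static boolean hasPendingWork(Instant outputWatermark) {
        return outputWatermark.isBefore(MAX_TIMESTAMP);
    }
}
```

The isBefore form only makes sense if the watermark is never null, which is the guarantee claimed in the reply above.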
Improve isDone implementation
LGTM.
The excess scheduling of known-empty bundles can consume excessive
resources, especially with the default CachedThreadPool executor service.
Removes an unnecessary synchronized block (the map is already
thread-safe)
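
As a rough illustration of why an extra synchronized block is redundant around a thread-safe map, here is a sketch that assumes the map is a ConcurrentHashMap. The commit message does not say which map type is involved, and the class and method names below are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical counter; not the class touched by this PR.
class PendingCounts {
    private final ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<>();

    // merge() is atomic on ConcurrentHashMap, so no synchronized block is needed.
    void record(String key) {
        counts.merge(key, 1, Integer::sum);
    }

    int get(String key) {
        return counts.getOrDefault(key, 0);
    }

    // Updates one key from several threads and returns the final count.
    static int countConcurrently(int threads, int perThread) {
        PendingCounts c = new PendingCounts();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    c.record("root");
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return c.get("root");
    }
}
```

Note that this only holds for single atomic operations on the map; a compound check-then-act sequence spanning several calls would still need its own coordination.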