[BEAM-22] Add PushbackDoFnRunner #258
R: @peihe For an example of use, see here. These PRs are split for easier reviewability; the linked change also requires #249 and a follow-up.
   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
   * for each window the element is in that is ready.
   */
  public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
What is the return value?
Noted.
Please rebase and place new files into
261c68c to 7134090
This SideInputReader allows callers to check whether a side input is available before attempting to read its contents.
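A minimal sketch of the ready-check idea described in this commit message. It does not use the actual Beam types: views and windows are plain strings, and `ReadyCheckingReaderSketch`, `InMemoryReadyCheckingReader`, `isReady`, `get`, and `put` are hypothetical names chosen for illustration, not the signatures in this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: views and windows are plain strings here,
// not Beam's PCollectionView and BoundedWindow.
interface ReadyCheckingReaderSketch {
  /** Returns true if the side input has contents available for the window. */
  boolean isReady(String view, String window);

  /** Returns the side input contents; callers should check isReady first. */
  Object get(String view, String window);
}

// Hypothetical in-memory implementation, for illustration only.
class InMemoryReadyCheckingReader implements ReadyCheckingReaderSketch {
  private final Map<String, Object> contents = new HashMap<>();

  /** Marks the side input as ready by storing its contents for the window. */
  void put(String view, String window, Object value) {
    contents.put(key(view, window), value);
  }

  @Override
  public boolean isReady(String view, String window) {
    return contents.containsKey(key(view, window));
  }

  @Override
  public Object get(String view, String window) {
    if (!isReady(view, window)) {
      throw new IllegalStateException(
          "Side input " + view + " is not ready in window " + window);
    }
    return contents.get(key(view, window));
  }

  private static String key(String view, String window) {
    return view + "@" + window;
  }
}
```

The point of the separate `isReady` call is that a caller can decide to defer work instead of blocking or failing on a read.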
This DoFnRunner wraps a DoFnRunner and provides an additional method to process an element in all the windows where all side inputs are ready, returning any elements that it could not process.
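The behaviour described in this commit message can be sketched roughly as follows. This is a simplified illustration, not the code under review: `PushbackRunnerSketch` and `InWindow` are hypothetical names, windows are plain strings, readiness is a fixed set instead of a side input reader, and the wrapped runner is modelled as a `BiConsumer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical sketch of a runner that processes an element only in the
// windows that are ready and hands back the rest.
class PushbackRunnerSketch<T> {
  /** A value in exactly one window; stand-in for a single-window WindowedValue. */
  static final class InWindow<V> {
    final V value;
    final String window;

    InWindow(V value, String window) {
      this.value = value;
      this.window = window;
    }
  }

  // Assumption: windows in this set have all of their side inputs ready.
  private final Set<String> readyWindows;
  // Stand-in for the wrapped DoFnRunner's processElement.
  private final BiConsumer<T, String> underlying;

  PushbackRunnerSketch(Set<String> readyWindows, BiConsumer<T, String> underlying) {
    this.readyWindows = readyWindows;
    this.underlying = underlying;
  }

  /** Processes the value in every ready window; returns the (value, window) pairs left over. */
  List<InWindow<T>> processElementInReadyWindows(T value, List<String> windows) {
    List<InWindow<T>> pushedBack = new ArrayList<>();
    for (String window : windows) { // plays the role of elem.explodeWindows()
      if (readyWindows.contains(window)) {
        underlying.accept(value, window);
      } else {
        pushedBack.add(new InWindow<>(value, window));
      }
    }
    return pushedBack;
  }
}
```

A caller would typically hold on to the returned pairs and retry them once more side inputs become available.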
7134090 to 8bddf32
Rebased on top of module change; moved ReadyCheckingSideInputReader and PushbackSideInputDoFnRunner to util.
      return Collections.emptyList();
    }
    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
    for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
Is it necessary to explode windows in this class, or can it be separated out?
I suppose the code here won't really change much, since you have to access the contents of the WindowedValue anyhow. But the helper method could be simplified.
Exploding is required so that we return only the (value, window)s that could not be processed, instead of the (value, window*)s that contain a window that could not be processed. This lets us do as much work as possible.
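To make the difference in granularity concrete, here is a small comparison of the two options. The class and method names are hypothetical, and windows are modelled as plain strings for the sake of the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper contrasting the two pushback granularities.
class PushbackGranularity {
  /** Per-window pushback: only the windows that are not ready are returned. */
  static List<String> pushBackPerWindow(List<String> windows, Set<String> ready) {
    List<String> pushedBack = new ArrayList<>();
    for (String window : windows) {
      if (!ready.contains(window)) {
        pushedBack.add(window);
      }
    }
    return pushedBack;
  }

  /** Whole-element pushback: one unready window defers every window of the element. */
  static List<String> pushBackWholeElement(List<String> windows, Set<String> ready) {
    return ready.containsAll(windows) ? new ArrayList<>() : new ArrayList<>(windows);
  }
}
```

For an element in windows w1, w2, w3 where only w2 is not ready, the per-window variant defers just w2, while the whole-element variant defers all three and leaves the work for w1 and w3 undone until later.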
Actually I meant somewhat the opposite. I meant to suggest that you explode values prior to reaching this point. But since the type would still be WindowedValue it wouldn't really buy you much.
Generally it's left up to the DoFnRunner to decide whether to explode windows (so it can avoid redundant calls where possible), or at least that's how the existing FnRunners work.
a0a9136 to 1cb0b10
1cb0b10 to 2f57a21
I wanted to be sure to ping @aljoscha on this PR. Not sure if I did before. I think it may be relevant to developments.
LGTM. Will merge.
Thanks @kennknowles, I'll keep it in mind.
This DoFnRunner wraps an existing DoFnRunner and provides a method,
processElementInReadyWindows, which returns a list of WindowedValues
that could not be processed due to requiring a SideInput that is not
ready.