-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[BEAM-22] Allow InProcess Evaluators to check Side Input completion #220
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This checks to ensure that the PCollectionView in the SideInputWindow for the provided window either has elements available or is empty. Schedule a future to ensure that the SideInputWindows are appropriately filled with an empty iterable after retreiving the element.
ffc47d7 to
ab26b17
Compare
| for (PCollectionView<?> view : readerViews) { | ||
| try { | ||
| BoundedWindow viewWindow = | ||
| view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(elementWindow); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel the conversion from the main input window to the side input window doesn't belong to the side input reader.
allViewsReadyInWindow(mainWindow) is not consistent with get(...) method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isReady(view, sideInputWindow) is symmetric with get(view, sideInputWindow). Changed to that.
| try { | ||
| Future<Iterable<? extends WindowedValue<?>>> viewContents = | ||
| getViewFuture(view, window); | ||
| if (!viewContents.isDone()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return viewContents.isDone();
and remove return true at the very end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| * contents if necessary. | ||
| */ | ||
| private <T> Future<Iterable<? extends WindowedValue<?>>> getViewFuture( | ||
| final PCollectionView<T> view, final BoundedWindow window) throws ExecutionException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you wrap ExecutionException into a RuntimeException here, you can avoid to do it twice outside.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using getUnchecked - our CacheLoader will never throw an exception.
| + "Contained views; %s", | ||
| view, | ||
| readerViews); | ||
| Future<Iterable<? extends WindowedValue<?>>> viewContents = getViewFuture(view, window); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inline?
return getViewFuture(view, window).isDone();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
LGTM |
|
@kennknowles for committer |
| } | ||
|
|
||
| @Test | ||
| public void isReadyForSomeNotReadyViewsFalseUntilElements() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
javadoc this a little bit so one has context when parsing the test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
LGTM, merging. |
Co-authored-by: Tres Seaver <tseaver@palladion.com> Co-authored-by: Christopher Wilcox <crwilcox@google.com>
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
[BEAM-<Jira issue #>] Description of pull requestmvn clean verify. (Even better, enableTravis-CI on your fork and ensure the whole test matrix passes).
<Jira issue #>in the title with the actual Jira issuenumber, if there is one.
Individual Contributor License Agreement.
This checks to ensure that the PCollectionView in the SideInputWindow
for the provided window either has elements available or is empty.
Schedule a future to ensure that the SideInputWindows are appropriately
filled with an empty iterable after retreiving the element.
This is allows the ParDoEvaluator to not attempt to process elements
that cannot currently be completed.