[BEAM-22] Return a map of CommittedBundle to Consumers from handleResult #249
R: @kennknowles This ensures that elements are not duplicated when, as part of #220 (and its follow-up), a PTransform fails to process an element because some input is not ready. The transform must schedule that element to be processed again, but must not schedule it for any other consumer.
   */
  private static class ExecutorUpdate {
    private final Optional<? extends CommittedBundle<?>> bundle;
    private final Optional<? extends AppliedPTransform<?, ?, ?>> consumer;
Comment here about the invariant that this is either/or where two things are null together.
Done.
This allows the executor to be ignorant of the mapping from PValue to Consumers, as well as allowing the TransformExecutor to pass bundles that should only be consumed by specific PTransforms. This can occur if a transform is incapable of processing a bundle.
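The shape described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `Bundle` is a hypothetical stand-in for CommittedBundle, plain strings stand in for PCollections and AppliedPTransforms, and the `schedule` helper is invented for the example. Only the names `handleResult` and `valueToConsumers` come from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BundleRoutingSketch {
  /** Hypothetical stand-in for CommittedBundle: a PCollection name plus its elements. */
  static final class Bundle {
    final String pcollection;
    final List<String> elements;

    Bundle(String pcollection, List<String> elements) {
      this.pcollection = pcollection;
      this.elements = elements;
    }
  }

  /**
   * Returns each non-empty committed bundle mapped to the transforms that should consume it.
   * The caller no longer needs to know the PValue-to-consumers mapping.
   */
  static Map<Bundle, Collection<String>> handleResult(
      List<Bundle> committedOutputs, Map<String, Collection<String>> valueToConsumers) {
    Map<Bundle, Collection<String>> outputs = new LinkedHashMap<>();
    for (Bundle committed : committedOutputs) {
      // Empty bundles are filtered out, as in the diff.
      if (!committed.elements.isEmpty()) {
        Collection<String> consumers = valueToConsumers.get(committed.pcollection);
        outputs.put(committed, consumers == null ? Collections.<String>emptyList() : consumers);
      }
    }
    return outputs;
  }

  /** Hypothetical executor step: schedules exactly the consumers named in the map. */
  static List<String> schedule(Map<Bundle, Collection<String>> outputs) {
    List<String> scheduled = new ArrayList<>();
    for (Map.Entry<Bundle, Collection<String>> entry : outputs.entrySet()) {
      for (String consumer : entry.getValue()) {
        scheduled.add(consumer + " <- " + entry.getKey().pcollection);
      }
    }
    return scheduled;
  }

  public static void main(String[] args) {
    Map<String, Collection<String>> valueToConsumers = new HashMap<>();
    valueToConsumers.put("words", Arrays.asList("Count", "Format"));
    Bundle words = new Bundle("words", Arrays.asList("a", "b"));
    System.out.println(schedule(handleResult(Arrays.asList(words), valueToConsumers)));
  }
}
```

Because the executor only iterates over the returned map, a bundle can be paired with a narrower set of consumers without any change on the executor side.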
    // filter them out
    if (!Iterables.isEmpty(committed.getElements())) {
      completed.add(committed);
      outputs.put(committed, valueToConsumers.get(committed.getPCollection()));
Since committed carries with it all the information needed for a lookup in valueToConsumers, perhaps you don't need to convert the iterable to a map so early? Can it be deferred just to the moment you add them as pending?
This changes in the immediate follow-up to this CL, which allows a TransformEvaluator to return some elements as "unprocessed". Those elements are added to the result map here, but should only be consumed by the producing transform.
Ex: 7da8e1a
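A rough sketch of why the map is built at this point, assuming the follow-up behaves as described in the reply above. The follow-up commit is not shown here, so everything in this example is hypothetical: `Bundle` stands in for CommittedBundle, strings stand in for transforms and PCollections, and the `unprocessed` parameter is an invented way of passing the leftover input.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UnprocessedRoutingSketch {
  /** Hypothetical stand-in for CommittedBundle. */
  static final class Bundle {
    final String pcollection;
    final List<String> elements;

    Bundle(String pcollection, List<String> elements) {
      this.pcollection = pcollection;
      this.elements = elements;
    }
  }

  /**
   * Ordinary outputs go to every consumer of their PCollection. Unprocessed input elements
   * go back only to the transform that failed to process them, so other consumers of the
   * same PCollection do not see those elements a second time.
   */
  static Map<Bundle, Collection<String>> handleResult(
      String producingTransform,
      List<Bundle> outputs,
      Bundle unprocessed,
      Map<String, Collection<String>> valueToConsumers) {
    Map<Bundle, Collection<String>> result = new LinkedHashMap<>();
    for (Bundle committed : outputs) {
      if (!committed.elements.isEmpty()) {
        Collection<String> consumers = valueToConsumers.get(committed.pcollection);
        result.put(committed, consumers == null ? Collections.<String>emptyList() : consumers);
      }
    }
    if (unprocessed != null && !unprocessed.elements.isEmpty()) {
      // A lookup in valueToConsumers would wrongly reschedule every consumer here.
      result.put(unprocessed, Collections.singletonList(producingTransform));
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Collection<String>> valueToConsumers = new HashMap<>();
    valueToConsumers.put("in", Arrays.asList("A", "B"));
    valueToConsumers.put("out", Arrays.asList("C"));
    Bundle out = new Bundle("out", Arrays.asList("x"));
    Bundle leftover = new Bundle("in", Arrays.asList("y"));
    Map<Bundle, Collection<String>> routed =
        handleResult("A", Arrays.asList(out), leftover, valueToConsumers);
    System.out.println(routed.get(leftover));
  }
}
```

Deferring the lookup until bundles are added as pending would lose this distinction, since the bundle's PCollection alone cannot say that only the producing transform should receive it.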
Your intention is to resolve conflicts and overlay this on the new
This can be closed with the merge of #260, which replaces it.