
Conversation

@kennknowles
Member

As per the Runner API design, this makes GroupByKey very explicitly a primitive and moves the subsidiary primitives to top-level classes in the util/ (aka miscellaneous) directory; they will eventually move to some appropriate final location for "runner utilities".

@kennknowles
Member Author

R: @amitsela there's some incidental import reordering in the Spark code. I used Eclipse, which re-sorted the imports in a way that departs from the Spark runner's existing ordering. I didn't fix it up, only because checkstyle lets me get by with things the way they are... :-)
R: @mxm the changes to the Flink runner are, I hope, reasonably uninteresting :-)
R: @dhalperi please review from the Dataflow side of things.

If anyone would like me to undo the automatic whitespace smooshing my IDE did, I am happy to.

And, of course, everyone should feel free to comment on the overall change. There should be no observable behavioral change. I did rely somewhat on unit tests to catch anything egregious, so if there is a lack of coverage I could have missed an issue.

@kennknowles kennknowles changed the title from "[BEAM-] Move GroupByKey expansion into DirectPipelineRunner" to "[BEAM-115] Move GroupByKey expansion into DirectPipelineRunner" Mar 25, 2016
@davorbonaci
Member

@lukecwik and @tgroh might be interested too.

  EVALUATORS.put(ParDo.Bound.class, parDo());
  EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
- EVALUATORS.put(GroupByKey.GroupByKeyOnly.class, gbk());
+ EVALUATORS.put(GroupByKeyOnly.class, gbk());
Member

Given that this also removes the default GroupByKey expansion, will this work without adding a runner-specific expansion?

Member Author

Good point. I didn't catch that this runner does use the expansion. I will move some bits around (more things have to be public). I will still use util/ as a temporary holding pen for the pieces for now, I think.

Member Author

Done.
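
As context for the EVALUATORS diff above, here is a toy sketch (assumed names, not the actual DirectPipelineRunner code) of the class-keyed registry pattern it shows: the runner looks up a TransformEvaluator by the concrete PTransform class, which is why the key changes when GroupByKeyOnly moves out of GroupByKey.

import java.util.HashMap;
import java.util.Map;

// Toy evaluator registry keyed by transform class; TransformEvaluator and the
// registration/lookup helpers are simplified stand-ins for illustration only.
final class EvaluatorRegistrySketch {
  interface TransformEvaluator<T> {
    void evaluate(T transform);
  }

  private static final Map<Class<?>, TransformEvaluator<?>> EVALUATORS = new HashMap<>();

  static <T> void register(Class<T> transformClass, TransformEvaluator<T> evaluator) {
    EVALUATORS.put(transformClass, evaluator);
  }

  @SuppressWarnings("unchecked")
  static <T> TransformEvaluator<T> lookup(Class<? extends T> transformClass) {
    // Dispatch is by exact class, so moving a nested class to the top level
    // means re-registering it under its new Class object.
    return (TransformEvaluator<T>) EVALUATORS.get(transformClass);
  }
}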

}
}

private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey.GroupByKeyOnly<K, V>> {
Member Author

I removed this, but I admit to not being sure what the intent is. There is a translator for the whole GroupByKey so this should have been dead code. On the other hand, the translator translates GroupByKey to GroupByKeyOnly so perhaps it would be better to use the expanded form, like Spark and the DirectPipelineRunner.

LMK

Contributor

GroupByKey expands not only into GroupByKeyOnly but also performs the windowing and timestamp assignment. In early Dataflow versions this was different; when the changes came, we introduced an additional translator to skip the window assignment. I would leave it as it is for now and do an immediate follow-up pull request where we get rid of this artifact. IMHO the GroupByKeyOnly translator should stay and GroupByKey should be removed.

Contributor

Ah, had a second look. GroupByKey has been removed. Should be good to merge as-is then.

@kennknowles
Member Author

Please take another look. When merged, the commit title can be changed to reflect the new structure: the expansion is just moved to the side.

@kennknowles kennknowles changed the title from "[BEAM-115] Move GroupByKey expansion into DirectPipelineRunner" to "[BEAM-115] Remove GroupByKey expansion, invoke it on a per-runner basis." Mar 28, 2016
@dhalperi
Contributor

Tests don't pass.

* <p>This implementation of {@link GroupByKey} proceeds by reifying windows and timestamps (making
* them part of the element rather than metadata), performing a {@link GroupByKeyOnly} primitive,
* then using a {@link GroupAlsoByWindow} transform to further group the resulting elements by
* window.
Member

This notably only functions as a composite if the input PCollection is Bounded, due to the choice of implementation for GroupAlsoByWindows. Additionally, it makes assumptions about the form in which it will receive per-key input at the point of GroupAlsoByWindow (namely the entire Iterable<T> for each key), so it is not a general implementation.

Member Author

Noted in javadoc.
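
For readers following the thread, here is a runnable toy model (plain Java, not Beam code) of the expansion the javadoc above describes: windows and timestamps are reified into the element, values are grouped by key only, then each key's values are regrouped by window. Per the review comment above, this form needs the whole Iterable for a key at once, hence the bounded-input caveat.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model of GroupByKey = Reify -> GroupByKeyOnly -> GroupAlsoByWindow.
final class GroupByKeyExpansionSketch {
  // Step 1 (reify): window and timestamp are part of the value, not metadata.
  record WindowedValue<V>(V value, long timestamp, String window) {}

  public static void main(String[] args) {
    List<Map.Entry<String, WindowedValue<Integer>>> input = List.of(
        Map.entry("a", new WindowedValue<>(1, 5L, "w0")),
        Map.entry("a", new WindowedValue<>(2, 15L, "w1")),
        Map.entry("b", new WindowedValue<>(3, 7L, "w0")));

    // Step 2 (GroupByKeyOnly): group by key alone, ignoring windows.
    Map<String, List<WindowedValue<Integer>>> byKey = input.stream()
        .collect(Collectors.groupingBy(Map.Entry::getKey,
            Collectors.mapping(Map.Entry::getValue, Collectors.toList())));

    // Step 3 (GroupAlsoByWindow): within each key, regroup values by window.
    // This consumes the entire per-key iterable in one shot.
    byKey.forEach((key, values) -> {
      Map<String, List<Integer>> byWindow = values.stream()
          .collect(Collectors.groupingBy(WindowedValue::window,
              Collectors.mapping(WindowedValue::value, Collectors.toList())));
      System.out.println(key + " -> " + byWindow);
    });
  }
}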

@kennknowles
Member Author

Fixed up the tests.

@kennknowles
Member Author

@amitsela any comment on expanding GBK in the Spark runner? It should leave the behavior exactly as it was before. This is the intended method of runner-specific replacements until the new pipeline transformation API is ready.
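
As an illustration of that interim mechanism, here is a self-contained toy (assumed names, not Beam's actual runner API) of the pattern: the runner intercepts transforms as they are applied and swaps in its own expansion for the primitives it treats specially.

import java.util.function.Function;

// Toy model of per-runner transform replacement: intercept known primitives
// at apply() time and substitute the runner's expansion; pass others through.
final class RunnerReplacementSketch {
  interface Transform<I, O> extends Function<I, O> {}

  // Stand-in for the GroupByKey primitive.
  static final class GroupByKeyLike implements Transform<String, String> {
    public String apply(String in) {
      return "primitive-gbk(" + in + ")";
    }
  }

  // The runner's hook: replace the primitive with its composite expansion.
  static <I, O> O applyWithReplacement(Transform<I, O> transform, I input) {
    if (transform instanceof GroupByKeyLike) {
      @SuppressWarnings("unchecked")
      O replaced = (O) ("reify -> groupByKeyOnly -> groupAlsoByWindow(" + input + ")");
      return replaced;
    }
    return transform.apply(input);
  }

  public static void main(String[] args) {
    System.out.println(applyWithReplacement(new GroupByKeyLike(), "input"));
  }
}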

@lukecwik
Member

@davorbonaci Will pass on this PR.

* <li>{@code SortValuesByTimestamp}: The values in the iterables
* output by {@link GroupByKeyOnly} are sorted by timestamp.</li>
* <li>{@code GroupAlsoByWindow}: This primitive processes the sorted values. Today it is
* implemented as a {@link ParDo} that calls reserved internal methods.</li>
Contributor

either all link or all code?
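
For completeness, a runnable toy illustration (plain Java, not Beam code) of the SortValuesByTimestamp step named in the javadoc above: within one key's grouped values, sort by the reified timestamp before window-grouping.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy model of SortValuesByTimestamp: order one key's values by timestamp so
// the downstream GroupAlsoByWindow sees them in timestamp order.
final class SortValuesByTimestampSketch {
  record TimestampedValue<V>(V value, long timestamp) {}

  public static void main(String[] args) {
    List<TimestampedValue<String>> valuesForOneKey = new ArrayList<>(List.of(
        new TimestampedValue<>("late", 20L),
        new TimestampedValue<>("early", 3L),
        new TimestampedValue<>("middle", 9L)));

    valuesForOneKey.sort(Comparator.comparingLong(TimestampedValue::timestamp));
    System.out.println(valuesForOneKey);  // prints early, middle, late in timestamp order
  }
}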

@dhalperi
Contributor

dhalperi commented Apr 5, 2016

LGTM

@kennknowles kennknowles closed this Apr 5, 2016
asfgit pushed a commit that referenced this pull request Apr 5, 2016
@kennknowles kennknowles deleted the GBK branch April 19, 2016 17:23