[BEAM-270] Support Timestamps/Windows in Flink Batch #328
Conversation
This adds a `Window.Bound` translator that allows only `GlobalWindows`. It is a temporary measure, but one that brings the Flink batch translator in line with the Beam model: instead of "ignoring" windows, the GBK is a perfectly valid GBK for `GlobalWindows`. Previously, the SDK's runner test suite would fail due to the lack of a translator; now some tests will fail due to missing windowing support, but others have a chance of passing.
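The guard such a translator applies can be sketched roughly as follows. Note that `WindowFn`, `GlobalWindows`, and `translate` here are simplified, hypothetical stand-ins for the actual Beam and Flink types, not the real API:

```java
// Illustrative sketch only: minimal stand-ins for Beam's WindowFn hierarchy.
interface WindowFn {}

final class GlobalWindows implements WindowFn {}

final class FixedWindows implements WindowFn {}

final class WindowBoundTranslatorSketch {
    // Accept the transform only when the pipeline stays in the global window.
    static void translate(WindowFn windowFn) {
        if (!(windowFn instanceof GlobalWindows)) {
            throw new UnsupportedOperationException(
                "Flink batch currently supports only GlobalWindows");
        }
        // A real translator would wire the transform into the Flink plan here.
    }
}
```

Rejecting unsupported windowing up front, rather than silently dropping windows, is what keeps the translated pipeline a valid instance of the Beam model.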
This makes the runner available for selection by integration tests.
Today, Flink batch supports only global windows. We intend the build to accommodate this, eventually via JUnit category filtering. For now, all test classes that use non-global windows are excluded entirely via Maven configuration. In the future, exclusion should happen on a per-test-method basis.
We're now using a `PerKeyCombineFnRunner` for all interaction with the `CombineFn`. This required adding a proper `ProcessContext` in `FlinkReduceFunction` and `FlinkPartialReduceFunction`, along with adding side input support there.
This does not work because Flink batch does not allow sending null elements. This is a deep issue and hard to fix. In an earlier commit I removed the special `TypeSerializer` for `VoidCoder`. Before, we got away with it by always intercepting the `VoidCoder` and wrapping it in a `TypeSerializer` that would emit a `VoidValue` instead of a proper null. If the user fn reads this value, however, the behavior is not consistent with how it should behave.
The single null value is only used as a dummy, thus can also be an integer. This makes it work with runners that don't support sending null values.
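A minimal sketch of the idea, with hypothetical names (`DUMMY`, `isDummy`) that are not from the actual patch: since the dummy element is never inspected by user code, any fixed non-null value can stand in for null.

```java
// Sketch: replace the single null "dummy" element with a fixed integer so
// runners that reject null records can still ship it. Names are illustrative.
final class DummyElement {
    // Any constant works; downstream code treats the element as opaque.
    static final Integer DUMMY = 0;

    static Integer encodeDummy() {
        return DUMMY; // previously this would have been null
    }

    static boolean isDummy(Integer value) {
        return DUMMY.equals(value);
    }
}
```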
With this change we always use `WindowedValue<T>` for the underlying Flink DataSets instead of just `T`. This allows us to support windowing as well. This change also enables a lot of other things:

- Use `WindowedValue` throughout
- Add proper translation for `Window.into()`
- Make side inputs window aware
- Make `GroupByKey` and `Combine` transformations window aware, including support for merging windows. `GroupByKey` is implemented as a `Combine` with a concatenating `CombineFn`, for simplicity

This removes Flink-specific transformations for things that are handled by built-in sources/sinks. Among other things, this:

- Removes special translation for `AvroIO.Read/Write` and `TextIO.Read/Write`
- Removes special support for `Write.Bound`; this was not working properly and is now handled by the Beam machinery that uses `DoFn`s for this
- Removes special translation for binary Co-Group; the code was still in there but was never used

With this change all `RunnableOnService` tests run on Flink batch.
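The "GroupByKey as a Combine with a concatenating CombineFn" idea can be sketched as below. `ConcatCombineFn` is a hypothetical minimal stand-in for Beam's `CombineFn` interface, not the actual class used in the patch:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: a CombineFn whose accumulator is simply the list of inputs seen so
// far, so "combining" per key is equivalent to grouping per key.
final class ConcatCombineFn<T> {
    List<T> createAccumulator() {
        return new ArrayList<>();
    }

    List<T> addInput(List<T> accumulator, T input) {
        accumulator.add(input);
        return accumulator;
    }

    List<T> mergeAccumulators(List<T> a, List<T> b) {
        a.addAll(b);
        return a;
    }

    List<T> extractOutput(List<T> accumulator) {
        return accumulator; // the grouped values for the key
    }
}
```

Expressing GroupByKey this way means the runner only needs one window-aware Combine code path, at the cost of materializing all values for a key in the accumulator.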
Yes, it will work without that. All the other commits that change from
How should we squash those? Or should we include each commit separately?
Great work @aljoscha 👍 I think it is fine to leave the relevant commits. You could squash all test-related commits into one commit. I'd like to take a look once you've opened the new PR.
I'll open a new PR with only the relevant commits in a sec. There is still the question of the failing Flink-specific tests. I can fix them now, but if we remove them anyway because most functionality is now covered by the `RunnableOnService` tests, then I'd rather not. This is the list of tests that we currently have for the batch runner: I would say that we only have to keep these: These would basically test that source/sink integration and reading/writing from/to the outside world are correct. (The
It seems there is still a small problem that leads to one test failing, I have not yet pinpointed it. I'll keep you posted. |
I finally found the bug and created a cleaned-up version in #335
So is this PR defunct?
Yes, sorry, I left it open for the messages and history of commits. |
* Add v1beta3 version of datastore Source/Sink.
* Deprecate old DatastoreIO classes.
* getProject to getDataset.
This change sits on top of #291
With this we can run all of the `RunnableOnService` tests on the Flink batch runner. Almost all of our own Flink-specific IT cases fail now because they verified the older, incomplete support for the Beam model. I did not remove them, since we should discuss whether we want to keep them, or keep only those that test a specific integration with Beam. If we want to keep them, they have to be fixed.

For now, `CombineTest.testSessionsCombineWithContext` fails. I'm inserting a pre-shuffle combine phase, but with merging windows the correct, final window in which an element will reside is not yet known in the pre-shuffle combine. This means that we don't get the correct side input for those values in the `CombineFnWithContext`. To fix the failing test I can get rid of the pre-shuffle combine; this means more network traffic, but we are correct. What are the thoughts on this?

For `GroupByKey`/`Combine.PerKey` I'm doing a shuffle by key and then an in-memory pass over the windows. For non-merging windows this could also be changed to do a shuffle by key-and-window, but then we would have to explode all windows on the send side.
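The "shuffle by key, then an in-memory pass over the windows" step might look roughly like this. `WindowedValue` is simplified here to carry a plain `String` window label instead of Beam's `BoundedWindow`, and all names are illustrative, not from the actual patch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch: after the key shuffle, all elements for one key are bucketed
// by window in memory before the per-window combine runs.
final class PerKeyWindowGrouping {
    static final class WindowedValue<T> {
        final T value;
        final String window;

        WindowedValue(T value, String window) {
            this.value = value;
            this.window = window;
        }
    }

    // Everything for a single key arrives here; group it by window so each
    // window's values can be combined (or, for merging windows, merged first).
    static <T> Map<String, List<T>> groupByWindow(Iterable<WindowedValue<T>> elementsForKey) {
        Map<String, List<T>> byWindow = new HashMap<>();
        for (WindowedValue<T> wv : elementsForKey) {
            byWindow.computeIfAbsent(wv.window, w -> new ArrayList<>()).add(wv.value);
        }
        return byWindow;
    }
}
```

Shuffling by key-and-window instead would make each bucket a separate shuffle group, but every element assigned to multiple windows would have to be duplicated ("exploded") on the send side, which is why the in-memory pass is used for merging windows.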
This is the text from the commit in question:
[BEAM-270] Support Timestamps/Windows in Flink Batch
R: @mxm for Flink review
R: @kennknowles, you are probably interested in how the shuffle/reduce works