[BEAM-10529] nullable xlang coder #16923
…nullable-Xlang-Coder # Conflicts: # runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java # runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java
…sidentify coders as nullable
R: @chamikaramj
Codecov Report
@@ Coverage Diff @@
## master #16923 +/- ##
==========================================
+ Coverage 73.98% 74.10% +0.12%
==========================================
Files 674 675 +1
Lines 88558 88715 +157
==========================================
+ Hits 65521 65746 +225
+ Misses 21916 21822 -94
- Partials 1121 1147 +26
…g-Coder' into feature/BEAM-10529-nullable-Xlang-Coder
TheNeuralBit
left a comment
Thanks @johnjcasey! I have a few suggestions
| // A Nullable coder encodes nullable values of wrapped coder value that does | ||
| // not tolerate null values. A Nullable coder uses exactly 1 byte per entry | ||
| // to indicate whether the value is null, then adds the encoding of the | ||
| // inner coder for non-null values. |
Could you have this clarify that 1 implies non-null? It would also be nice to update the row coder doc (under "Nullable types in container types (ArrayType, MapType)") to refer to this
+1
You mean 1 byte (\u0000) implies null? I might be misunderstanding something.
| def test_nullable_coder(self): | ||
| expected_coder = coders.NullableCoder(coders.BytesCoder()) | ||
| real_coder = typecoders.registry.get_coder( | ||
| typehints.UnionConstraint([type(None), type(bytes)])) |
nit: could you construct this with Optional instead of using Union directly?
Also I think bytes is already a type:
>>> bytes
<class 'bytes'>
>>> type(bytes)
<class 'type'>
Shouldn't this be Optional[bytes]?
Yep, I'll make the change in the test case
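The point about `type(bytes)` can be checked with a small sketch. It uses the standard library `typing` module as a stand-in, which is an assumption: Beam's own `typehints` module has separate `Optional` and `UnionConstraint` classes, and the variable names here are illustrative only.

```python
from typing import Optional, get_args

# `bytes` is already a class, so `type(bytes)` is the metaclass `type`.
# A union built from it describes "None or a class object", not "None or bytes".
wrong_member = type(bytes)
right_member = bytes

# Optional[bytes] is the union of bytes and NoneType.
optional_members = get_args(Optional[bytes])

print(wrong_member)
print(right_member)
print(optional_members)
```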
| return coders.ListCoder.from_type_hint(typehint, self) | ||
| elif (isinstance(typehint, typehints.UnionConstraint) and | ||
| typehint.contains_type(type(None) and | ||
| len(list(typehint._inner_types())) == 2)): |
Could you extract this logic into an is_optional helper in typehints.py?
It also looks like there may be a paren mistake, the len is inside the contains_type call.
Let's make sure this is covered by a unit test.
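A rough sketch of what such an `is_optional` helper and its unit test could look like. It is written against the standard library `typing` module rather than Beam's `typehints.UnionConstraint`, so the implementation details are assumptions and not the code that was merged. The last lines show why the misplaced parenthesis matters: the whole `and` expression becomes the argument.

```python
from typing import Optional, Union, get_args, get_origin


def is_optional(hint) -> bool:
    """Return True for a union of exactly two members, one of which is NoneType."""
    if get_origin(hint) is not Union:
        return False
    members = get_args(hint)
    return len(members) == 2 and type(None) in members


# With the parenthesis in the wrong place, the argument passed to
# contains_type is `type(None) and len(...) == 2`. NoneType is truthy,
# so the expression evaluates to the boolean on the right-hand side.
members = get_args(Optional[bytes])
argument_as_written = (type(None) and len(members) == 2)
```

Under this sketch a three-member union such as `Union[int, str, None]` is not treated as optional, which matches the "exactly two inner types" check in the diff.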
| return NullableCoder.of(VarIntCoder.of()); | ||
| } else if (returnType.equals(Long.class)) { | ||
| return VarLongCoder.of(); | ||
| return NullableCoder.of(VarLongCoder.of()); |
Is this going to break the Go Kafka connector?
CC: @youngoli
It will. I checked with @lostluck, and Go doesn't have a nullable coder yet, so that will need to be built before this can be merged
:( sorry, Go didn't have a kafkaio when BEAM-10529 was filed.
Perhaps we can introduce a new transform that is only exposed through the Python API?
Another option is to update URN_WITH_METADATA transforms to support nulls:
| public static final String URN_WITH_METADATA = |
Seems like Go SDK uses "URN_WITHOUT_METADATA" transform:
| readURN = "beam:transform:org.apache.beam:kafka_read_without_metadata:v1" |
I think exposing the Go Nullable coder is a good option. That way we have it in the future, and we don't need to introduce workarounds in our code.
Agreed that would be the best solution, but to be clear, are you planning on doing that for the next step on this PR?
Yep. I'm working on that now. I'm unfamiliar with Go, but making good progress so far
Awesome! Thank you for taking this on
Sounds good. Thanks.
chamikaramj
left a comment
Thanks!
| examples: | ||
| "\u0001\u0003\u0061\u0062\u0063" : "abc" | ||
| "\u0001\u000a\u006d\u006f\u0072\u0065\u0020\u0062\u0079\u0074\u0065\u0073" : "more bytes" | ||
| "\u0000" : null |
To clarify, only the last byte sequence here will appear in an encoded element whose value is null?
yep. 0000 means null, 0001 means "the rest of this is non-null"
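The byte layout described here can be sketched in a few lines. This is an illustration only, not Beam's implementation: the function names are hypothetical, and the inner coder is assumed to be a length-prefixed bytes encoding (a varint length followed by the payload), which is what the example vectors in the quoted YAML appear to use.

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        low = n & 0x7F
        n >>= 7
        if n:
            out.append(low | 0x80)  # more bytes follow
        else:
            out.append(low)
            return bytes(out)


def encode_bytes(value: bytes) -> bytes:
    """Assumed inner coder: varint length prefix, then the raw payload."""
    return encode_varint(len(value)) + value


def decode_bytes(data: bytes) -> bytes:
    """Inverse of encode_bytes for a single value."""
    length, shift, pos = 0, 0, 0
    while True:
        byte = data[pos]
        pos += 1
        length |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return data[pos:pos + length]


def encode_nullable(value, encode_inner) -> bytes:
    """One marker byte: 0x00 for null, 0x01 followed by the inner encoding."""
    if value is None:
        return b"\x00"
    return b"\x01" + encode_inner(value)


def decode_nullable(data: bytes, decode_inner):
    """Read the marker byte, then delegate to the inner decoder if non-null."""
    if data[0] == 0:
        return None
    return decode_inner(data[1:])
```

With these definitions a null value occupies a single `0x00` byte, while `b"abc"` becomes the marker `0x01`, the length `0x03`, and then the three payload bytes.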
| elif (isinstance(typehint, typehints.UnionConstraint) and | ||
| typehint.contains_type(type(None) and | ||
| len(list(typehint._inner_types())) == 2)): | ||
| return coders.NullableCoder.from_type_hint(typehint, self) |
Did you mean to use the inner type in this call?
No, I thought it made more sense to use the full typehint, to try and match the existing coder.from_type_hint pattern
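One way to picture this choice: the coder's `from_type_hint` receives the whole optional hint and unwraps the inner type itself, so callers follow the same pattern as for other coders. The classes below are hypothetical stand-ins (a plain dict plays the role of the registry, and `typing` replaces Beam's `typehints`), not the merged code.

```python
from typing import Optional, get_args


class BytesCoderSketch:
    """Placeholder for a real inner coder."""


class NullableCoderSketch:
    def __init__(self, inner):
        self.inner = inner

    @classmethod
    def from_type_hint(cls, hint, registry):
        # Accept the full Optional[...] hint and strip NoneType here,
        # rather than asking the caller to pass the inner type.
        inner_types = [t for t in get_args(hint) if t is not type(None)]
        if len(inner_types) != 1:
            raise ValueError(f"expected Optional[T], got {hint!r}")
        return cls(registry[inner_types[0]])


# Hypothetical registry: maps a Python type to a coder instance.
registry = {bytes: BytesCoderSketch()}
coder = NullableCoderSketch.from_type_hint(Optional[bytes], registry)
```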
| for t in self.union_types: | ||
| yield t | ||
|
|
||
| def contains_type(self, maybe_type): |
Does the order matter (for nullable and other cases)?
Optional doesn't care about order, and contains type doesn't care about order either
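The same order-independence holds for the standard library `typing` module, which is used below as a stand-in for Beam's `typehints` (an assumption; the `contains_type` function here is a hypothetical free-function analogue of the method in the diff).

```python
from typing import Optional, Union, get_args

none_first = Union[None, bytes]
none_last = Union[bytes, None]


def contains_type(hint, maybe_type) -> bool:
    """Membership test over the union's members; position is irrelevant."""
    return maybe_type in get_args(hint)


# Both spellings compare equal to each other and to Optional[bytes].
same_hint = none_first == none_last == Optional[bytes]
```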
…g-Coder' into feature/BEAM-10529-nullable-Xlang-Coder
…nullable-Xlang-Coder
chamikaramj
left a comment
LGTM. Thanks.
Lemme know if this is ready to be merged.
Waiting on @lostluck to review the Go changes, but then I am good to go
lostluck
left a comment
A few quick fixes to the Go changes, but otherwise, I like it. Thank you!
| coders := make([]*Coder, 1) | ||
| coders[0] = component |
| coders := make([]*Coder, 1) | |
| coders[0] = component | |
| coders := []*Coder{ component } |
| return &Coder{ | ||
| Kind: Nullable, | ||
| T: typex.New(typex.NullableType, component.T), | ||
| Components: []*Coder{component}, |
| Components: []*Coder{component}, | |
| Components: coders, |
| component *Coder | ||
| shouldpanic bool | ||
| want *Coder | ||
| }{{ |
+1
In this case, you should split the }} with a new line, and then run gofmt on this file.
I would strongly recommend having your IDE do so automatically on save anyway.
lostluck
left a comment
LGTM for Go. Thanks!
The GoPortable error is xlang related but not due to this PR.
run java precommit
…nullable-Xlang-Coder # Conflicts: # sdks/go/pkg/beam/core/typex/special.go
Run Portable_Python PreCommit
…g-Coder' into feature/BEAM-10529-nullable-Xlang-Coder
Failures are due to https://issues.apache.org/jira/browse/BEAM-14231, and this can be merged
Oh I forgot, GHA checks do not run with changes from HEAD, so it's still running code with this flaky behavior.
@johnjcasey The Actions build breakage was due to bad formatting in core/typex/special.go. I've got a PR out to fix it (linked above) but please don't ignore the build status moving forward!
@jrmccluskey This was my fault, I assumed the Go failure was the one @lostluck was referencing here.
Hi, I got an error on Seems related to this change. Could anyone please have a look, thanks!
It turns out that this was a problem with my protobuf dependency, which prevented sdks/python/gen_protos.py from running to generate new versions of RegularCoders.py. Force-installing protobuf with the C++ implementation resolves this problem.
* [BEAM-10529] add java and generic components of nullable xlang tests
* [BEAM-10529] fix test case
* [BEAM-10529] add coders and typehints to support nullable xlang coders
* [BEAM-10529] update external builder to support nullable coder
* [BEAM-10529] clean up coders.py
* [BEAM-10529] add coder translation test
* [BEAM-10529] add additional check to typecoder to not accidentally misidentify coders as nullable
* [BEAM-10529] add test to retrieve nullable coder from typehint
* [BEAM-10529] run spotless
* [BEAM-10529] add go nullable coder
* [BEAM-10529] cleanup extra println
* [BEAM-10529] improve comments, clean up python
* [BEAM-10529] remove changes to kafkaIO to simplify pr
* [BEAM-10529] add coders to go exec, add asf license text
* [BEAM-10529] clean up error handlign
* [BEAM-10529] update go fromyaml to handle nullable cases
* [BEAM-10529] add unit test, register nullable coder in dataflow.go
* [BEAM-10529] remove mistaken commit
* [BEAM-10529] add argument check to CoderTranslators
* [BEAM-10529] Address python comments & cleanup
* [BEAM-10529] address go comments
* [BEAM-10529] remove extra check that was added in error
* [BEAM-10529] fix typo
* [BEAM-10529] re-order check for nonetype to prevent attribute errors
* [BEAM-10529] change isinstance to ==
Support KafkaIO with null keys and values. Most of this change enables nullable coders for xlang; the rest is minor changes to KafkaIO itself so that it uses them.