[BEAM-7274] Implement the Protobuf schema provider #8690
8f3473c to
799e5b3
Compare
|
@reuvenlax here is the first draft of Protobuf support. I hope the unit test Things to discuss: Unknown fields. |
|
I'm still reviewing this. A few high-level thoughts off the bat:
|
|
@reuvenlax I'll wait for your complete review, but keep in mind that I wanted a uniform way of handling concrete typed classes and DynamicType (DynamicType does require access via the FieldDescriptor). I had trouble getting it to work the same way as AvroUtils, but maybe that is due to my unfamiliarity with it. (I'm open to trying to convert it, as most of the effort is spent in the tests anyway.) The "for internal use only" note in the Javadoc didn't seem encouraging, though. On LogicalTypes: are you talking about Fixed32/Fixed64/UInt32/UInt64? I simply took the scalar mapping described here: https://developers.google.com/protocol-buffers/docs/proto#scalar If you are talking about Timestamp... well, I'd like to have this configurable. Having it default to DATETIME will help make it available in Beam SQL or in output IOs like BigQuery. |
|
Additional info: I tried to get the logic working using the FieldValueGetter, with partial success; see the experiment: alexvanboxel@d1a8e16 My main issue is with RowWithGetters private |
|
I got it to work with the RowWithGetters, but not without modifying it. See alexvanboxel@1da5715 I keep handling the List/Map/Row with the Getters as I need more context than the other implementations. Personally I find this better and more flexible than doing it in the I looked for reference to the |
...tensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTest.java
...a/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchema.java
| * | ||
| * <p>Follow https://github.com/protocolbuffers/protobuf/issues/6101 for the state of the feature | ||
| * request in ProtoBuf. | ||
| */ |
Could you instead convert the Descriptor to a proto (using the Descriptor.toProto method) and store that? Then I think you won't need this class at all.
So I removed this class as it was not super elegant, but .toProto() is not a solution as it only works one way. A descriptor is always contained within a FileDescriptor.
I've introduced a Domain class that groups descriptors that are related to each other. This is an important component that a ProtoBuf schema registry should provide, so that all Descriptors are cross-wired. See the Javadoc in the ProtoDomain class.
...java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoRow.java
...a/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchema.java
|
ok, I was wondering if I should take action on this. Get it in master and
then rebase the other section.
_/
_/ Alex Van Boxel
…On Thu, Jun 6, 2019 at 7:49 PM reuvenlax ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In
sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoRow.java
<#8690 (comment)>:
> +import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+
+/**
+ * ProtoRow extends the Row and does late materialisation. It holds a reference to the original proto
+ * message and has an overlay on each field of the proto message. It doesn't have its own Coder as
+ * it relies on the SchemaCoder to (de)serialize the message over the wire.
+ *
+ * <p>Each row has a FieldOverlay that handles specific field conversions, as well as special
+ * overlays for Well Known Types, Repeatables, Maps and Nullables.
+ */
Yes. I want to review this first though.
|
|
Yes. I'd prefer to keep Row as a somewhat sealed class and not introduce extra subtypes. However, there were enough other things to review in this PR that I figured I could review those first. |
|
OK, I'll fix the issues you mentioned, although I doubt that I can do the
.toProto as it probably lacks
contextual information to reconstruct the descriptors, as the Descriptor
has a lot of possibilities to
reference other components like Enums and other Messages.
But I'll try...
_/
_/ Alex Van Boxel
|
782b6b9 to
cfadb9b
Compare
Answered (and fixed comments), I also rebased the experiment to remove the ProtoRow as a subtype: alexvanboxel@1629557 |
| public Builder withFieldValueGettersHandleCollections(boolean collectionHandledByGetter) { | ||
| this.collectionHandledByGetter = collectionHandledByGetter; | ||
| return this; | ||
| } |
I'm having trouble understanding what this is for. Can you explain?
OK, maybe the name could be better, but it means that the FieldValueGetters also handle collection-like types: ARRAY, ROW and MAP. If you go to the original implementation ( https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java#L90 ) you see that RowWithGetters does naive handling of ARRAY, ROW and MAP; for protobuf you need more context (the descriptor) to handle them. That's why I need to disable the naive mapping and let the FieldValueGetters handle ARRAY, ROW and MAP.
Feel free to suggest a better name though.
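To make the switch described above concrete, here is a minimal, JDK-only sketch. `ToyRow`, `ToyGetter` and the `String.valueOf` "naive" conversion are hypothetical stand-ins, not Beam's `RowWithGetters` or `FieldValueGetter`; the table of enum names plays the role of the descriptor. The only point is that a getter holding extra context can produce a result the row cannot produce by itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a field getter; not Beam's FieldValueGetter.
interface ToyGetter {
  Object get(Map<String, Object> message);
}

// Hypothetical stand-in for a getter-backed row; not Beam's RowWithGetters.
final class ToyRow {
  private final Map<String, Object> message;
  private final List<ToyGetter> getters;
  private final boolean collectionHandledByGetter;

  ToyRow(Map<String, Object> message, List<ToyGetter> getters, boolean collectionHandledByGetter) {
    this.message = message;
    this.getters = getters;
    this.collectionHandledByGetter = collectionHandledByGetter;
  }

  Object getValue(int index) {
    Object value = getters.get(index).get(message);
    if (collectionHandledByGetter || !(value instanceof List)) {
      return value; // the getter's result is used as-is
    }
    // Context-free handling: the row converts the elements itself.
    // String.valueOf is only a placeholder for such a conversion.
    List<Object> converted = new ArrayList<>();
    for (Object element : (List<?>) value) {
      converted.add(String.valueOf(element));
    }
    return converted;
  }
}

final class ToyRowDemo {
  // Assumed context, standing in for what a descriptor would provide.
  static final String[] COLOR_NAMES = {"RED", "GREEN", "BLUE"};

  // Getter with context: maps stored enum numbers to their names.
  static final ToyGetter COLORS_WITH_CONTEXT = message -> {
    List<Object> names = new ArrayList<>();
    for (Object number : (List<?>) message.get("colors")) {
      names.add(COLOR_NAMES[(Integer) number]);
    }
    return names;
  };

  // Getter without context: hands back the raw stored list.
  static final ToyGetter COLORS_RAW = message -> message.get("colors");

  static Object read(ToyGetter getter, boolean collectionHandledByGetter) {
    Map<String, Object> message = Collections.singletonMap("colors", Arrays.asList(0, 2));
    return new ToyRow(message, Collections.singletonList(getter), collectionHandledByGetter)
        .getValue(0);
  }
}
```

In this toy, `ToyRowDemo.read(ToyRowDemo.COLORS_WITH_CONTEXT, true)` yields the enum names, while `ToyRowDemo.read(ToyRowDemo.COLORS_RAW, false)` yields the numbers rendered as strings, because the row has no way to look the names up.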
@reuvenlax is this the last remaining concern you have for this PR?
#8690 (comment) is a good reference for the motivation of this as well.
I'm not crazy about the way this is implemented since it's adding state to RowWithGetter that will get checked every time a collection field is accessed. I can't think of a better way to do it without some non-trivial refactoring though. Some ideas:
- Rather than storing collectionHandledByGetter in RowWithGetters and changing behavior based on it, have two alternate RowWithGetters implementations, one with the special handling for collections and one without. I think this is still an improvement over the separate ProtoRow class that was rejected since it's not explicitly tied to a particular SchemaProvider; in fact it sounds like it could be re-used for Avro GenericRecord instances.
- Move the special logic for collection getters into those SchemaProvider/FieldValueGetter implementations that need it, and make RowWithGetters always behave as if collectionHandledByGetter is true. I think this could be a cleaner approach? But it's challenging because that special logic stores cached values that wouldn't be appropriate to move into the FieldValueGetter implementations. Maybe a SchemaProvider could have some way to indicate that a certain row is expensive to access and should be cached in RowWithGetters?
I've refactored based on your input: I've created a RowWithGettersCachedCollection that inherits from RowWithGetters. The cached variant is the default.
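A minimal sketch of that split, using only the JDK. The class names `PlainGetterRow` and `CachedCollectionRow` are hypothetical stand-ins for `RowWithGetters` and `RowWithGettersCachedCollection`, and the caching policy shown (remember the first value returned for a collection- or map-typed field) is an assumption made for illustration, not a description of the Beam code.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Base variant: every access goes straight to the field getter.
class PlainGetterRow {
  protected final List<Supplier<Object>> getters;

  PlainGetterRow(List<Supplier<Object>> getters) {
    this.getters = getters;
  }

  Object getValue(int index) {
    return getters.get(index).get();
  }
}

// Caching variant: collection and map values are computed once and reused.
class CachedCollectionRow extends PlainGetterRow {
  private final Map<Integer, Object> cache = new HashMap<>();

  CachedCollectionRow(List<Supplier<Object>> getters) {
    super(getters);
  }

  @Override
  Object getValue(int index) {
    Object cached = cache.get(index);
    if (cached != null) {
      return cached;
    }
    Object value = super.getValue(index);
    if (value instanceof Collection || value instanceof Map) {
      cache.put(index, value); // only collection-like values are cached
    }
    return value;
  }
}
```

With two classes, the choice is made once when the row is constructed, so there is no per-access flag check of the kind raised earlier in this thread.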
Can you help me understand this a bit more? Why does it not work to cache lists for protocol buffers? We saw repeated array conversion to be a big problem (which is why we cache them). I'm wondering if we could instead cache a lazy array like we do with iterables.
I'll take a closer look at this code to figure it out.
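One way to read "cache a lazy array" is a list view that converts an element the first time it is read and remembers the result. The sketch below shows that idea with the JDK only; `LazyConvertedList` is a hypothetical name and this is not how Beam handles iterables.

```java
import java.util.AbstractList;
import java.util.List;
import java.util.function.Function;

// Read-only list view that converts each element at most once, on first access.
final class LazyConvertedList<S, T> extends AbstractList<T> {
  private final List<S> source;
  private final Function<S, T> convert;
  private final Object[] cache;
  private final boolean[] converted;
  int conversions = 0; // exposed only so the laziness can be observed

  LazyConvertedList(List<S> source, Function<S, T> convert) {
    this.source = source;
    this.convert = convert;
    this.cache = new Object[source.size()];
    this.converted = new boolean[source.size()];
  }

  @Override
  public T get(int index) {
    if (!converted[index]) {
      cache[index] = convert.apply(source.get(index));
      converted[index] = true;
      conversions++;
    }
    @SuppressWarnings("unchecked")
    T value = (T) cache[index];
    return value;
  }

  @Override
  public int size() {
    return source.size();
  }
}
```

Constructing the view converts nothing; reading one element converts only that element, and a second read of the same index is served from the cache. The sketch assumes the source list is not resized after the view is created.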
Should we make a ticket out of this, or is this blocking?
...a/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchema.java
...ions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaProvider.java
868ec11 to
37edf7b
Compare
|
@reuvenlax I think I've tackled all the concerns you posted. The build is failing, but this is due to a warning in Jenkins that has nothing to do with this PR (I investigated). The PR is also rebased.
It would be great to have this in the next release. From then on we can do incremental changes like:
I also backported the ProtoDomain to the DynamicMessage PR to align it with this. Sorry it took so long, but I needed to prioritize something different but related to Proto (a proto schema registry). |
37edf7b to
2988800
Compare
|
Run Java PreCommit |
|
Requesting @pabloem as a reviewer. |
TheNeuralBit
left a comment
Thanks @alexvanboxel I'm excited to add one more schema provider to Beam :)
|
|
||
| Message messageCoded = schemaCoderCoded.getFromRowFunction().apply(rowCoded); | ||
| assertEquals(message, messageCoded); | ||
| } |
I think CoderProperties.coderDecodeEncodeEqual actually implements a lot of the logic here. It even serializes/deserializes the coder before encoding and decoding the elements. What do you think about using that here?
There's also CoderProperties.coderSerializable which just clones the coder and checks that they're equal.
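The shape of such a check (serialize and deserialize the coder first, then encode and decode a value with the clone) can be sketched with the JDK alone. Everything below is a hypothetical stand-in: `ToyCoder`, `Utf8ToyCoder` and `ToyCoderChecks` are not Beam classes, and this is not the implementation of `CoderProperties`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a coder; not Beam's Coder API.
interface ToyCoder<T> extends Serializable {
  byte[] encode(T value);

  T decode(byte[] bytes);
}

final class Utf8ToyCoder implements ToyCoder<String> {
  private static final long serialVersionUID = 1L;

  @Override
  public byte[] encode(String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public String decode(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

final class ToyCoderChecks {
  // Clones the coder through Java serialization.
  static <T> ToyCoder<T> cloneBySerialization(ToyCoder<T> coder) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(coder);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        ToyCoder<T> cloned = (ToyCoder<T>) in.readObject();
        return cloned;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("coder could not be cloned by serialization", e);
    }
  }

  // True when the value survives encode + decode through a cloned coder.
  static <T> boolean decodeEncodeEqual(ToyCoder<T> coder, T value) {
    ToyCoder<T> cloned = cloneBySerialization(coder);
    return value.equals(cloned.decode(cloned.encode(value)));
  }
}
```

Cloning the coder before use catches coders that work in-process but lose state when shipped elsewhere, which a plain encode/decode round trip would miss.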
| * external registry that maps those URN's with Descriptors. | ||
| */ | ||
| @Experimental(Experimental.Kind.SCHEMAS) | ||
| public class ProtoSchemaProvider implements SchemaProvider { |
Is the need for withFieldValueGettersHandleCollections(true) the only reason this can't be made to implement GetterBasedSchemaProvider?
No, each data type has its own Provider (e.g. AvroSchemaProvider). This class makes sure Proto can be auto-mapped.
I'm not sure I understand this comment.
Let me elaborate. The ProtoSchemaProvider delegates the to/fromRow functionality to the ProtoSchema class, which is, yes, optimized for dynamic types. The ProtoSchema class manages most of the descriptors; that's why it doesn't use the default GetterBasedSchemaProvider.
If we optimize for generated classes, this could change (but I prefer to do that in a separate PR later on).
| * <li>Protobuf enums are mapped to non-nullable string values. | ||
| * <li>Enum map to their string representation |
nit: Are these two lines saying the same thing?
Removed
| * <li>Protobuf maps are mapped to nullable maps, where empty maps are mapped to the null value. | ||
| * <li>Protobuf repeatables are mapped to nullable arrays, where empty arrays are mapped to the | ||
| * null value. |
Why not just use non-nullable maps and arrays and produce empty ones when the proto ones are empty?
Business-wise this makes more sense, as proto doesn't distinguish between an empty map/array and one that is not set. It's also more performant: if the default were an empty map/array, it would result in a bigger Row. Making this configurable later would be an option. Kept it as is.
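The mapping being discussed can be written down as a pair of conversions. This is a minimal sketch with hypothetical names (`EmptyAsNull`, `toRowValue`, `toProtoValue`), shown for lists only; it is not the code in this PR.

```java
import java.util.Collections;
import java.util.List;

final class EmptyAsNull {
  // Proto side to row side: an empty repeated field becomes null.
  static <T> List<T> toRowValue(List<T> protoRepeated) {
    return protoRepeated.isEmpty() ? null : protoRepeated;
  }

  // Row side to proto side: null becomes an empty repeated field again.
  static <T> List<T> toProtoValue(List<T> rowValue) {
    return rowValue == null ? Collections.<T>emptyList() : rowValue;
  }
}
```

Because the proto side treats "empty" and "not set" as the same thing, converting to a row and back gives an equal list in both cases. The alternative raised in the review, non-nullable fields that hold empty collections, would replace `toRowValue` with the identity function.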
| @Override | ||
| public void set(Message.Builder message, Object value) { | ||
| // builder.setField(fieldDescriptor, | ||
| // convertSetObject(row.getString(fieldDescriptor.getName()))); |
nit: can you remove this?
removed
|
|
||
| @Test | ||
| public void testRowAndBack() { | ||
| setup(); |
I wonder if this could be re-structured a bit so that you could use @Before instead of calling the setup functions yourself. Maybe if you have two different Parameterized test classes that draw from the same data? One for message and one for dynamic message. You can have multiple suites in one outer class if you annotate it with @RunWith(Enclosed.class) (like I did here)
This is definitely a nitpicky thing... totally fine to leave it as is.
938ab37 to
7e9c3b3
Compare
|
This PR is now rebased against master. This removed ProtoDomain (and its tests) from this PR, as it was part of the DynamicMessage PR that is now part of master. I think I've incorporated most of the review comments in this PR as well. |
7e9c3b3 to
9f6d4f2
Compare
The default implementation for Protobuf messages. Supports both generated and dynamic messages. Adds an initial version of option handling, converting options to schema metadata. The implementation is not feature complete, but shows the viability of the feature.
Split RowWithGetters into two implementations: one that caches collections, named RowWithGettersCachedCollection, and one where all the types are handled by the FieldGetters, named RowWithGetters.
9f6d4f2 to
e5d5c77
Compare
|
@TheNeuralBit / @pabloem can you re-review. Thanks. |
|
Another comment: this delegates everything to Proto's reflection API, which can be quite inefficient. Compare with AvroSchema where we delegate straight to generated classes when they exist. Reflection might be necessary for the case of DynamicMessage (though in that case I think we should use RowWithStorage instead of RowWithGetters), but shouldn't be necessary when we have generated classes. |
Totally agree, but I first wanted a solution that works for every use case. Optimization can happen afterward. I still have plans for iterative improvements, certainly related to protobuf options first, and then performance (but I have to write performance tests to measure this).
RowWithStorage could be a solution when the Row is materialized (with the getToRowFunction), but the reflection API is still needed if you use the schema without materializing. For example, if you just use the filter-on-a-column function on a schema-aware collection but keep the original type, you don't want to materialize a big proto just to filter on one column. Again, I'd like to measure this, but we need a baseline. |
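The trade-off can be illustrated by counting field reads in a toy model. The classes below are hypothetical, JDK-only stand-ins: a `Map` plays the role of the proto message, `LazyToyRow` reads through to it on demand, and `MaterializedToyRow` copies every field up front. They are not `RowWithGetters` or `RowWithStorage`, and the counts say nothing about real Beam performance.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Counts how many fields are read from the underlying "message".
final class FieldReadCounter {
  int reads = 0;

  Object read(Map<String, Object> message, String field) {
    reads++;
    return message.get(field);
  }
}

// Lazy row: touches the message only for the field that is asked for.
final class LazyToyRow {
  private final Map<String, Object> message;
  private final FieldReadCounter counter;

  LazyToyRow(Map<String, Object> message, FieldReadCounter counter) {
    this.message = message;
    this.counter = counter;
  }

  Object getValue(String field) {
    return counter.read(message, field);
  }
}

// Materialized row: copies every field when it is created.
final class MaterializedToyRow {
  private final Map<String, Object> values = new LinkedHashMap<>();

  MaterializedToyRow(Map<String, Object> message, FieldReadCounter counter) {
    for (String field : message.keySet()) {
      values.put(field, counter.read(message, field));
    }
  }

  Object getValue(String field) {
    return values.get(field);
  }
}
```

Filtering on one column of a three-field message costs one read through `LazyToyRow` and three through `MaterializedToyRow`; whether that difference matters for real protos is the part that still needs measuring.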
|
BTW I've checked out the PR to play with it a bit more. My worry is that in its current form "optimizing later" will not be so simple, since it doesn't currently fit in with the framework used by AvroSchema, AutoValueSchema, JavaBeanSchema, etc.. If you don't mind, I'll try and play with it a bit to see if I can get it to fit into that framework a bit better. |
I don't mind; the thing that worries me is that it will miss 2.18 as well (as it already missed 2.17). As our business depends on this feature, we are currently building our own version of Beam, and that's a pain. For me the DynamicMessage is very important... I was thinking of going to a completely different code path for the normal use case. |
|
Closing this PR as it's superseded by several other PRs that are already on master: Last remaining PR for full proto support is: #10502 Option/Metadata support has been split off and can be followed:
|
The default implementation for Protobuf messages. Supports both
generated and dynamic messages.
NOTE: This implementation doesn't support Unknown Fields. I'd like to have a discussion with the reviewers on how to do this.