@alexvanboxel
Contributor

The ProtoCoder was unable to handle DynamicMessage as it was unable to
get a message specific parser. The Coder is expanded to handle
DynamicMessage as a special case. It stores the complete descriptor set
when (de)serializing.

Design decision: Although DynamicMessage could in theory have a
different schema per message in a single stream, this doesn't make sense.
The common use case for DynamicMessage is when the schema is not
known at compile time but is known at pipeline build time (for example,
when pulled from a schema registry).

With this restriction we only need to store the schema (or descriptor)
once, when the pipeline is serialized and sent to the workers.

@alexvanboxel
Contributor Author

@mxm @reuvenlax @kennknowles This is the first draft of supporting ProtoBuf DynamicMessages. I made a design decision here (see the commit message) and would like your feedback.

ref: https://issues.apache.org/jira/browse/BEAM-5967?filter=-1&jql=project%20%3D%20BEAM%20AND%20assignee%20in%20(currentUser())%20order%20by%20updated%20DESC

@mxm
Contributor

mxm commented May 8, 2019

Thanks @alexvanboxel. I'll defer to Kenn and Reuven for now.

@alexvanboxel alexvanboxel force-pushed the feature/dynamic-message branch from b7929e8 to cb4fac3 Compare May 9, 2019 18:25
@alexvanboxel
Contributor Author

Added extra check to make sure that when a DynamicMessage is used a Descriptor is provided.

@alexvanboxel alexvanboxel force-pushed the feature/dynamic-message branch from cb4fac3 to 0660124 Compare May 11, 2019 15:01
@alexvanboxel
Contributor Author

Rebased against master to keep the PR up to date.

this.protoMessageDescriptor = protoMessageDescriptor;
}

private void writeObject(ObjectOutputStream oos) throws IOException {
Member

Hmm, interesting. It is a problem that ProtoCoder is a CustomCoder serialized via Java serialization rather than a StructuredCoder. Not related to your change, just noting. This essentially makes it incompatible with pipeline update. @reuvenlax

Contributor Author

I think I pulled off the upgradability by using the following (and by making protoMessageClass transient):

oos.defaultWriteObject();
if (DynamicMessage.class.equals(this.protoMessageClass)) {

I don't know if a StructuredCoder would help, because a Descriptor is quite a complex beast (I serialize the complete proto binary on the stream).

Contributor

Oof, that's pretty bad. Maybe we should change that?

An alternative would be to just put the DynamicMessage support in a ProtoSchemaProvider, and encourage people to stop using ProtoCoder altogether.

Contributor Author

@alexvanboxel alexvanboxel May 12, 2019

Well, I don't see another way to do this. I need to get the Descriptor to the workers. The only way I see it working is sending the FileDescriptorSet as a proto stream onto the Java serialized object stream.

Getting everyone on ProtoSchemaProvider will have the exact same problem, I'm afraid, and I even think it will be worse, as there is no way of sharing a payload between the toRow and fromRow functions. I'll know in a few days when I have a working prototype.

Member

Could this be cleaner if you had class DynamicMessageProtoCoder extends ProtoCoder? Then you wouldn't need to branch based on protoMessageClass == DynamicMessage; you would just override the relevant functions.

Contributor

Wondering the same thing. Might be cleaner to have a separate Coder for this case.

Contributor Author

@alexvanboxel alexvanboxel Oct 22, 2019

I split the DynamicProtoCoder from the ProtoCoder
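
To illustrate the suggestion in this thread (override instead of branching on the message class), here is a minimal sketch. It is not the code from the PR: `BaseCoder`, `DynamicCoder`, and `describeParser` are hypothetical names, and strings stand in for real parsers and descriptors.

```java
class CoderSplitSketch {

  /** Hypothetical base coder for generated (compile-time) message classes. */
  static class BaseCoder {
    protected final String messageType;

    BaseCoder(String messageType) {
      this.messageType = messageType;
    }

    /** Hook that a subclass overrides, so the base class never checks for the dynamic case. */
    String describeParser() {
      return "generated parser for " + messageType;
    }
  }

  /** Hypothetical dynamic coder: resolves its parser from a descriptor name instead. */
  static class DynamicCoder extends BaseCoder {
    private final String descriptorName;

    DynamicCoder(String descriptorName) {
      super("DynamicMessage");
      this.descriptorName = descriptorName;
    }

    @Override
    String describeParser() {
      return "descriptor-based parser for " + descriptorName;
    }
  }
}
```

With this shape, the dynamic-only state (here `descriptorName`) lives in the subclass, and callers that hold a `BaseCoder` reference get the right behavior through ordinary dispatch.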

@alexvanboxel alexvanboxel force-pushed the feature/dynamic-message branch from 0660124 to 6ca429b Compare May 12, 2019 08:23
@alexvanboxel
Contributor Author

Force-pushed to resolve the weak type of of(Descriptors.Descriptor); also added a missing comment.

@reuvenlax
Contributor

reuvenlax commented May 12, 2019 via email

@alexvanboxel
Contributor Author

alexvanboxel commented May 13, 2019

@reuvenlax @kennknowles if upgradeability is a problem I think I handled this correctly, please note this extract from the serialization spec:

5.6.2 Compatible Changes
Adding writeObject/readObject methods - If the version reading the stream has these methods then readObject is expected, as usual, to read the required data written to the stream by the default serialization. It should call defaultReadObject first before reading any optional data. The writeObject method is expected as usual to call defaultWriteObject to write the required data and then may write optional data.

ref: https://docs.oracle.com/javase/8/docs/platform/serialization/spec/version.html

This is why:

  1. on line 285 the descriptor is made transient
  2. on line 303 writeObject (and readObject) are added as in the spec (compatible)
  3. on line 304 oos.defaultWriteObject(); is called as in the spec (compatible)
  4. on line 305 writing the Descriptor only kicks in when the class is DynamicMessage. Note that no one can have a pipeline running with DynamicMessage today, because DynamicMessage doesn't have a getDescriptor() without arguments (the previous version of the Coder assumed this).
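
The four points above can be sketched with plain JDK classes. This is a minimal illustration under stated assumptions, not the PR's code: `SketchCoder`, `messageClassName`, and `descriptorBytes` are hypothetical, and a byte array stands in for the serialized descriptor set.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class OptionalDataSketch {

  /** Hypothetical stand-in for a coder; not the real ProtoCoder. */
  static class SketchCoder implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String messageClassName;
    // Transient: skipped by default serialization, written by hand below.
    private transient byte[] descriptorBytes;

    SketchCoder(String messageClassName, byte[] descriptorBytes) {
      this.messageClassName = messageClassName;
      this.descriptorBytes = descriptorBytes;
      if (isDynamic() && descriptorBytes == null) {
        throw new IllegalArgumentException("a dynamic coder needs a descriptor");
      }
    }

    boolean isDynamic() {
      return "DynamicMessage".equals(messageClassName);
    }

    String messageClassName() {
      return messageClassName;
    }

    byte[] descriptorBytes() {
      return descriptorBytes;
    }

    private void writeObject(ObjectOutputStream oos) throws IOException {
      oos.defaultWriteObject(); // required data first
      if (isDynamic()) { // optional data only in the dynamic case
        oos.writeInt(descriptorBytes.length);
        oos.write(descriptorBytes);
      }
    }

    private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
      ois.defaultReadObject(); // restores messageClassName
      if (isDynamic()) {
        byte[] buffer = new byte[ois.readInt()];
        ois.readFully(buffer);
        descriptorBytes = buffer;
      }
    }
  }

  /** Serializes and deserializes a coder in memory. */
  static SketchCoder roundTrip(SketchCoder coder) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
        oos.writeObject(coder);
      }
      try (ObjectInputStream ois =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (SketchCoder) ois.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

In this sketch a non-dynamic coder writes nothing beyond the default data, which is the property the comment relies on for existing pipelines.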

@kennknowles
Member

You should also set serialVersionUID to a fixed value.

@alexvanboxel
Contributor Author

Setting the serialVersionUID will break compatibility. I admit, my implementation is only forward compatible, although backward compatibility is not an issue because I only add bytes to the stream when it's a DynamicMessage.

Anyway, I have the impression this will never get accepted this way. I propose to extend ProtoCoder (calling it DynamicMessageCoder) and do the special logic there, only for DynamicMessage. WDYT?

@kennknowles
Member

If you set serialVersionUID to exactly what it already is, then you keep compatibility nicely, no?

@alexvanboxel alexvanboxel force-pushed the feature/dynamic-message branch from 6ca429b to 0402c5b Compare May 25, 2019 15:08
@alexvanboxel
Contributor Author

@kennknowles I had to take some time for this. I set the serialVersionUID to the value from before my changes, and also added a test to keep it locked. I also backtracked to see what could make the UID change, and to my surprise the only thing that changed it was the addition of the public static of(Descriptor) method.

I also rebased against master.
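
As a rough sketch of what pinning and locking a serialVersionUID can look like with JDK classes only: `ObjectStreamClass.lookup(...).getSerialVersionUID()` reports the UID the serialization runtime will use for a class. When no UID is declared, the runtime computes one from the class shape, which includes non-private methods, so adding a public static factory can change it. `PinnedCoder`, the value `42L`, and `checkLocked` are hypothetical and not taken from the PR.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

class SerialVersionUidSketch {

  /** No explicit UID: the runtime derives one from the class shape. */
  static class ImplicitCoder implements Serializable {
    int value;
  }

  /** Explicit UID: adding methods such as of(...) no longer affects it. */
  static class PinnedCoder implements Serializable {
    private static final long serialVersionUID = 42L; // hypothetical value

    int value;

    public static PinnedCoder of(int value) {
      PinnedCoder coder = new PinnedCoder();
      coder.value = value;
      return coder;
    }
  }

  /** Returns the UID the serialization runtime uses for the given class. */
  static long uidOf(Class<? extends Serializable> cls) {
    return ObjectStreamClass.lookup(cls).getSerialVersionUID();
  }

  /** A lock-style check: fails if someone changes the pinned UID. */
  static void checkLocked() {
    long actual = uidOf(PinnedCoder.class);
    if (actual != 42L) {
      throw new AssertionError("serialVersionUID changed to " + actual);
    }
  }
}
```

To pin an existing class without breaking compatibility, the declared constant would be set to whatever `uidOf` reported for the class before the change.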

@kennknowles kennknowles self-requested a review June 10, 2019 16:15
@mxm
Contributor

mxm commented Jul 4, 2019

@kennknowles @reuvenlax Should this go through another review round?

@alexvanboxel
Contributor Author

alexvanboxel commented Jul 4, 2019 via email

@alexvanboxel alexvanboxel force-pushed the feature/dynamic-message branch from 0402c5b to 02304ca Compare August 29, 2019 22:51
@alexvanboxel
Contributor Author

@reuvenlax @kennknowles, the ProtoDomain code from the Proto Schema support is now backported to this pull request. I've created a pipeline on Dataflow with lots of static Protobufs and performed an upgrade from 2.14.0 to 2.16.0-SNAPSHOT (built locally from this PR).

What is ProtoDomain, you will ask? It's an immutable wrapper around the protobuf descriptor set, which is not serializable. It takes over serialization with custom read/write object methods. It also provides indexes on everything in the set (the indexes are transient). If the ProtoCoder is instantiated with a shared ProtoDomain object, equality of the DynamicMessages is guaranteed.

How is upgradability handled? ProtoCoder has now a fixed serialVersionUID = -5043999806040629525L, this is the UID from before the changes. The reference to ProtoDomain and the messageName are transient and handled by the custom read/write object methods.
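
The wrapper idea described above can be sketched roughly as follows, using only JDK classes. This is not the real ProtoDomain: `DomainSketch` and `FakeDescriptor` are hypothetical, and a plain name string stands in for the serialized descriptor set. The sketch shows a non-serializable payload held in transient fields, written by hand, with the lookup index rebuilt after reading.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DomainSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  /** Stand-in for a descriptor type that is deliberately not Serializable. */
  static final class FakeDescriptor {
    final String fullName;

    FakeDescriptor(String fullName) {
      this.fullName = fullName;
    }
  }

  // Both fields are transient: the payload is written by hand, the index is derived.
  private transient List<FakeDescriptor> descriptors;
  private transient Map<String, FakeDescriptor> byName;

  DomainSketch(List<String> names) {
    init(names);
  }

  private void init(List<String> names) {
    List<FakeDescriptor> list = new ArrayList<>();
    Map<String, FakeDescriptor> index = new HashMap<>();
    for (String name : names) {
      FakeDescriptor descriptor = new FakeDescriptor(name);
      list.add(descriptor);
      index.put(name, descriptor);
    }
    descriptors = Collections.unmodifiableList(list);
    byName = Collections.unmodifiableMap(index);
  }

  /** Looks up a descriptor by name; the same domain always returns the same instance. */
  FakeDescriptor get(String fullName) {
    return byName.get(fullName);
  }

  private void writeObject(ObjectOutputStream oos) throws IOException {
    oos.defaultWriteObject();
    oos.writeInt(descriptors.size());
    for (FakeDescriptor descriptor : descriptors) {
      oos.writeUTF(descriptor.fullName);
    }
  }

  private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
    ois.defaultReadObject();
    int count = ois.readInt();
    List<String> names = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      names.add(ois.readUTF());
    }
    init(names); // rebuild the payload and the transient index
  }

  /** Serializes and deserializes a domain in memory. */
  static DomainSketch roundTrip(DomainSketch domain) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
        oos.writeObject(domain);
      }
      try (ObjectInputStream ois =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (DomainSketch) ois.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Because lookups on one domain instance return the same descriptor object each time, anything that compares descriptors by identity sees them as equal when the domain is shared.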

It would be great if this still made the 2.16.0 release.

@alexvanboxel alexvanboxel force-pushed the feature/dynamic-message branch from 02304ca to 545ba9d Compare September 7, 2019 08:39
@pabloem
Member

pabloem commented Oct 1, 2019

r: @TheNeuralBit

Member

@kennknowles kennknowles left a comment

Sorry for the delay. This seems quite nice and useful. I am not expert in proto descriptor wrangling. I would guess if there is a problem in that, @reuvenlax is more likely to catch it.

Maybe follow up with more extensive testing exercising lots of cases of ProtoDomain?


/**
* Verifies that for the given {@code Coder<T>}, {@code Coder.Context}, and value of type {@code
* T}, encoding followed by decoding yields an equal value of type {@code T}.
Member

This one doesn't test that it is equal, but tests that the matcher succeeds.

Contributor Author

Changed description

return map;
}

private static Descriptors.FileDescriptor convertToFileDescriptorMap(
Member

@Nullable. I'm a bit sad our analyses didn't catch this.

Contributor Author

Added @Nullable (strange that GitHub doesn't mark this file as Outdated).

@alexvanboxel
Contributor Author

alexvanboxel commented Oct 10, 2019 via email

@alexvanboxel
Contributor Author

I'll fix the comments tomorrow (it's evening now).

@pabloem
Member

pabloem commented Oct 15, 2019

Thanks Alex! @reuvenlax @TheNeuralBit to review : )

Member

Would it be too onerous to add a few more tests here? Maybe set some more fields and some nested fields?

Contributor Author

Added another test testDynamicNestedRepeatedMessage that adds another repeated nested message.

@TheNeuralBit
Member

LGTM overall. I'm not an expert in proto descriptor wrangling either though, so some more testing (or confirmation from Reuven) would make me feel better.

On the other hand, #8690 does test this more thoroughly, and it would be nice to get to remove these changes from that diff.

@alexvanboxel alexvanboxel force-pushed the feature/dynamic-message branch from 442eeee to ed1d964 Compare October 18, 2019 05:36
@alexvanboxel
Contributor Author

On the other hand, #8690 does test this more thoroughly, and it would be nice to get to remove these changes from that diff.

Backported the tests from the schema-aware PR. If we get at least this PR into 2.17, the schema-aware one will be somewhat smaller.

@TheNeuralBit
Member

Can this be merged before the 2.17 cut (scheduled for Wednesday, October 23)?

@pabloem
Member

pabloem commented Oct 21, 2019

Can this be merged before the 2.17 cut (scheduled for Wednesday, October 23)?

@reuvenlax

@alexvanboxel alexvanboxel force-pushed the feature/dynamic-message branch 2 times, most recently from 63dca7f to 1117a40 Compare October 22, 2019 06:43
@alexvanboxel alexvanboxel force-pushed the feature/dynamic-message branch from 1117a40 to 64829e1 Compare October 22, 2019 06:50
@reuvenlax
Contributor

lgtm

@pabloem pabloem merged commit f67e3d8 into apache:master Oct 24, 2019
@pabloem
Member

pabloem commented Oct 24, 2019

Thanks everyone! Glad we were able to get this in : )

@mxm
Contributor

mxm commented Oct 25, 2019

Congrats @alexvanboxel! :)

@TheMatrix97

Hi! Awesome work, thanks! When will this feature be available at maven central?

@TheNeuralBit
Member

Hey @TheMatrix97! I think this made it into the 2.17.0 release cut that happened last week. Usually it takes a few weeks to get a release ready after the cut. When it's finished there will be an announcement on the mailing list and the blog.

Alternatively you should be able to get this feature if you use the 2.18.0-SNAPSHOT version.

@mxm
Contributor

mxm commented Oct 29, 2019

This doesn't appear to be in the release-2.17.0 branch. So it will be part of Beam 2.18.0 release. Like Brian mentioned, you could still use the snapshot version. Or beg on the mailing list, that you want this to be in 2.17.0 ;)

@TheMatrix97

@TheNeuralBit Actually I don't see the 2.18-SNAPSHOT version at Maven central repo :( The most recent one is 2.16.

@pabloem
Member

pabloem commented Oct 29, 2019

ah every now and then the snapshot publishing seems to run into trouble. I'll ping the mailing list about it...

@TheMatrix97

@pabloem seems it's not published at maven central, but you can find it in the apache repo https://repository.apache.org/content/groups/snapshots/org/apache/beam/beam-sdks-java-core/2.18.0-SNAPSHOT/

@alexvanboxel alexvanboxel deleted the feature/dynamic-message branch January 2, 2020 07:41