[BEAM-7738] Add external transform support to PubsubIO #9268
Conversation
```java
SimpleFunction<PubsubMessage, T> parseFn =
    (SimpleFunction<PubsubMessage, T>) new IdentityMessageFn();
setParseFn(parseFn);
// FIXME: call setCoder(). need to use PubsubMessage proto to be compatible with python
```
In the case that an external SDK has requested needsAttributes, this transform needs to produce PubsubMessage instances rather than byte arrays. There's a wrinkle here: Python deserializes PubsubMessage using protobufs, whereas Java encodes with PubsubMessageWithAttributesCoder.
I believe we want to use protobufs from Java. Can I get confirmation of that, please?
I serialized the PubsubMessage using protobufs. Since there's no cross-language coder for PubsubMessage, and I assumed it would be overreach to add one, I used the bytes coder and then handled converting to and from protobufs in code that lives close to the transforms.
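For concreteness, here's a minimal sketch of that round trip, assuming the standard com.google.pubsub.v1.PubsubMessage proto and Beam's SDK PubsubMessage class (the helper class and method names are mine, not from this PR):

```java
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;

/** Hypothetical helpers for the proto round trip at the cross-language boundary. */
class PubsubMessageProtoBytes {

  /** Beam PubsubMessage -> proto wire bytes, suitable for the plain bytes coder. */
  static byte[] toProtoBytes(PubsubMessage message) {
    return com.google.pubsub.v1.PubsubMessage.newBuilder()
        .setData(ByteString.copyFrom(message.getPayload()))
        .putAllAttributes(message.getAttributeMap())
        .build()
        .toByteArray();
  }

  /** Proto wire bytes (e.g. produced by the Python SDK) -> Beam PubsubMessage. */
  static PubsubMessage fromProtoBytes(byte[] bytes) throws InvalidProtocolBufferException {
    com.google.pubsub.v1.PubsubMessage proto =
        com.google.pubsub.v1.PubsubMessage.parseFrom(bytes);
    return new PubsubMessage(proto.getData().toByteArray(), proto.getAttributesMap());
  }
}
```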
```python
args['id_label'] = _encode_str(self.id_label)

# FIXME: how do we encode a bool so that Java can decode it?
# args['with_attributes'] = _encode_bool(self.with_attributes)
```
What's the right way to encode a bool so that Java can decode it correctly?
I encoded it as an int and handled the cast in Java.
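For illustration, a sketch of what the Java side of that workaround can look like, assuming the flag arrives as a var-int-encoded field (the helper name is hypothetical, not this PR's actual code):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.VarIntCoder;

class BoolAsIntWorkaround {
  /** Decode the int that the Python SDK wrote and cast it back to a boolean. */
  static boolean decodeBoolField(byte[] encoded) throws IOException {
    return VarIntCoder.of().decode(new ByteArrayInputStream(encoded)) != 0;
  }
}
```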
R: @robertwb
Force-pushed 45f3637 to 8f3ede3.
Thanks @mxm. This is actually not working yet; I'm getting some mysterious errors deep within Flink related to serialization. I'm working on writing tests and gathering as much info as I can so that I can report back here. So far I'm pretty perplexed, but I'm new to Beam, Flink, and Java, so that's not a big surprise! Once I have more info, it would be great to have your input.
Here's some info on where I am with this. I could really use some help to push this over the finish line.

The expansion service runs correctly and sends the expanded transforms back to Python, but the job fails inside Java on Flink because it's trying to use the incorrect serializer. There's a good chance that I'm overlooking something very obvious.

Here's the stack trace:

Here's the serializer that

Here's the value that's being serialized:

Obviously ByteArrayCoder is not the right serializer for PubsubMessage. Note that I get the same error whether

So why is

The graph that's generated after expansion looks right to me. Here's a table of the primitive transforms and their PCollections (note the last row is a Python ParDo).

So as far as the Pipeline definition is concerned, it seems like the output of

Any ideas?

The last thing that's important to note is that I needed
Ok, I now know how the

In

I'm unclear why this coder swap is necessary. The Java part of this pipeline is a

What's the proper solution here? The last Java ParDo is the one that's ensuring we have a byte array for sending to Python, but evidently this needs to be happening in the Read itself?
Force-pushed 2b02422 to 6f41f12.
PubsubIO is officially working on Flink! Some notes:

There's one giant caveat: this actually does not work unless you have this special commit: chadrik@d12b990. It turns out the same is currently true for KafkaIO external transform support, which came as quite a surprise. The Jira issue for this problem is here: https://jira.apache.org/jira/browse/BEAM-7870. I'd love to see some movement on this. Happy to help where I can!
I think this PR is ready to merge, but I could use a little help understanding how to resolve the test failures.

Java PreCommit has no test failures, so I'm not sure why it's marked as failed here. Is it because of the warnings? Jenkins seems to imply that anything over 12 warnings is an error, but I don't see any warnings in files that I changed:

Likewise, Python PreCommit lists no failures in Jenkins, but is marked as failed here. And Portable_Python PreCommit just seems to have timed out.
Run Portable_Python PreCommit
Run Python PreCommit
Hi. Still hoping for a little help bringing this to a close.
Build results have expired: https://builds.apache.org/job/beam_PreCommit_Java_Commit/7551/
Retest this please
It will work as is, but it should be converted to the new API to provide more real-world examples. I'll do that today. If you could help me diagnose the Java test failure, it would be much appreciated!
Will take a look ASAP, on a low time budget at the moment :)
Seems like the Java PreCommit failure is for the new test? Stacktrace is:
Python failure seems to be a docs issue (:sdks:python:test-suites:tox:py2:docs). Could be a flake. Retrying.
Run Python PreCommit
Force-pushed 6f41f12 to fcbffb7.
@chadrik Seems like you fixed the issue. Java tests are passing now: https://builds.apache.org/job/beam_PreCommit_Java_Commit/7889/

The one test failure is unrelated to your changes:
Force-pushed 9f32963 to 53af6bf.
This is now waiting on the BooleanCoder in Python.
Force-pushed 53af6bf to 774373a.
Run Java PreCommit
Any update here?
We are doing some final testing in production today and then I'll hopefully give you the thumbs up.

-chad
Force-pushed 04f32a0 to 63cabe7.
Run Python PreCommit

3 similar comments
@mxm This is tested on-prem and ready to go! (With the caveat that the special commit to add dependencies is still required for end users, as is the case with Kafka: chadrik/beam@d12b990.)
mxm left a comment
Very nice. This is ready from my side, but I would follow up with doc strings in the Python API to list the limitations. This should also be done for ReadFromKafka/WriteToKafka. Perhaps we should even add the additional commits as part of Beam, at least until we have worked around the coder problems?
I think it would be great for this to work out of the box, but I'm not sure what the ramifications of adding those commits would be. They don't just modify the standard coders; they also change the standard Beam Java docker containers by adding additional dependencies. Is it safe to add those even if people aren't using pubsub/kafka? Is the only downside larger container sizes?

@TheNeuralBit How close are we to having schema coders ready to use? IIUC, we should be able to register a converter between Row -> PubsubMessage, right? What's the mechanism for that? It would be great to put that to use here as soon as the schema coders are ready.
@robertwb gave #9188 an LGTM; I just need to get CI passing and I think we can merge it. I'll work on that now.

Are you thinking you'd use beam:coder:row:v1 as the interface for the external transform, and the Java ExternalTransform implementations would handle the conversion of Row to/from PubsubMessage? There's no trivial way to register a converter between Row and PubsubMessage since the latter isn't structured, but of course on the Java side we could have code to serialize the Row to a variety of formats to put in the PubsubMessage payload: Avro, JSON, or the row serialization format itself (although I'm not sure we'd want to encourage using that outside of Beam) would be pretty simple to add. Maybe the format to use could be part of the external transform payload.
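As a sketch of that last option (the row serialization format itself), encoding each Row with Beam's RowCoder and using the resulting bytes as the message payload might look like this (names and schema handling are illustrative, not part of this PR):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

class RowPayloadSketch {
  /** Encode a Row with the row coder so it can serve as a Pub/Sub payload. */
  static byte[] rowToPayload(Row row, Schema schema) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    RowCoder.of(schema).encode(row, out);
    return out.toByteArray();
  }
}
```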
There are two places that I see beam:coder:row:v1 being useful:

Right now I'm mostly thinking about the latter, which is when

Maybe I'm thinking about this wrong, but I think the

```java
public class PubsubMessage {

  private byte[] message;
  private Map<String, String> attributes;
  private String messageId;

  /** Returns the main PubSub message. */
  public byte[] getPayload() {
    return message;
  }

  /** Returns the full map of attributes. This is an unmodifiable map. */
  public Map<String, String> getAttributeMap() {
    return attributes;
  }

  /** Returns the messageId of the message populated by Cloud Pub/Sub. */
  @Nullable
  public String getMessageId() {
    return messageId;
  }
}
```

I'm not a Java expert by any means, but this seems like a type that would work with AutoValue, we just need to rename

What are the requirements for registering a Row converter?
I think the payload is not a concern when it comes to portability of external transforms: it gets encoded/decoded by another transform, not PubsubRead/Write. We can just assume that's a byte array.

My grasp on the Java side is a bit tenuous, so I'd like for @mxm to confirm or deny what I've written here.
Agreed, that's what I'm thinking about too.

Ah ok, fair. I was referring specifically to the structure (or lack thereof) of the byte array payload, but you're right the (Python SDK) user can handle creating a byte array themselves, and the row coder can just encode

Java

There are a variety of ways to do it. If it were a class that we had control over we could use the

@reuvenlax may have a better suggestion.

Python

With my PR I think it could look like:

```python
# this is py3 syntax for clarity, but we'd probably
# need to use the TypedDict('PubsubMessage', ...) version
class PubsubMessage(TypedDict):
    message: ByteString
    attributes: Mapping[unicode, unicode]
    messageId: unicode

coders.registry.register_coder(PubsubMessage, coders.RowCoder)

pcoll
| 'make some messages' >> beam.Map(makeMessage).with_output_types(PubsubMessage)
| 'write to pubsub' >> beam.io.WriteToPubsub(project, topic)  # or something
```
It may reduce our overall technical debt if we just implement the full set of setters and getters on PubsubMessage.
D'oh, my bad, we do have control over PubsubMessage.
@mxm I marked the Kafka and pubsub external transforms as external. I think this is ready to merge.
@TheNeuralBit On the Python side, as with the Java SDK, there is a custom

In your example above, you registered the
It hasn't been designed, but that's definitely something I'd like to support. Would we actually need the |
```python
from apache_beam.transforms.external import ExternalTransform
from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder

ReadFromPubsubSchema = typing.NamedTuple(
```
Doesn't have to be done in this PR, but in the future it will be great if we just move external PubSub and Kafka transforms to io/gcp/pubsub.py and io/kafka.py and configure/branch based on pipeline options.
I don't love that idea. The two sets of xforms do not have the same arguments, in part because of the need for the expansion service endpoint, but also because the Java versions are not guaranteed to be the same as the Python versions (though I would like that to be the case). Even if the expansion service argument could somehow be passed through the options, the two sets of xform classes may have different base classes: ideally the ones in io.external inherit from ExternalTransform (they currently do not, but they should be able to once we resolve the issue we're working on with @TheNeuralBit).
```python
)


class ReadFromPubSub(beam.PTransform):
```
Should we rename these to avoid conflicts with ReadFromPubSub/WriteToPubSub in io/gcp/pubsub.py?
I think it's a feature that they are named the same. I can take a pipeline designed to run on Dataflow and simply change the imports to get it to run on Flink.
We would not need to use the PubsubMessage protobuf type any more (and thus we wouldn't need the conversion methods), but we may still want the Beam PubsubMessage class. The latter is yielded by
@TheNeuralBit Let's move the conversation about row coders over to https://jira.apache.org/jira/browse/BEAM-7870. This PR is ready to merge!
LGTM. Thanks. Looks like Max already approved, so I'll merge.
Whoohoo!
Congrats @chadrik! :) Also thanks for your help @chamikaramj and @TheNeuralBit. |
Add support for using PubsubIO as an external transform, so that it works from Python on portable runners like Flink.