[7746] Create a more user friendly external transform API #9098
Conversation
robertwb left a comment:
This looks like a nice improvement.
| """ | ||
| return ConfigValue( | ||
| coder_urn=[urn for urn in iter_urns(coder) | ||
| if urn not in FILTERED_CODERS], |
I don't think we can safely filter out certain coders, as that changes the encoding.
Glad you mentioned it :)
I'm confused about why the LengthPrefixCoder urn is not included in the ConfigValue.coder_urn list in the original code. The original list for the Kafka consumer_config param was ['beam:coder:iterable:v1', 'beam:coder:kv:v1', 'beam:coder:bytes:v1', 'beam:coder:bytes:v1']. I was trying to keep the end result the same, and I assumed that there were coders which were "write-only" and thus did not need to be transmitted to Java.
We do not need the length prefix coder here, do we? That's because we get the encoded bytes via Protobuf and do not have to reason about the length.
But we do need the lengths if it is, say, KV&lt;LengthPrefixedX, LengthPrefixedY&gt;. What's more, we're getting the already-encoded value here (which is different from what we'd get if we had tried to filter them out before doing the encoding).
Are you seeing instances of LengthPrefixCoder that need filtering out? If so, this deserves further investigation. Otherwise, let's just drop this.
I removed this filter.
Note that between this change and the StrUtf8Coder change, the list of coder urns produced by ReadFromKafka has changed from this:
coder_urn: "beam:coder:iterable:v1"
coder_urn: "beam:coder:kv:v1"
coder_urn: "beam:coder:bytes:v1"
coder_urn: "beam:coder:bytes:v1"
to this:
coder_urn: "beam:coder:iterable:v1"
coder_urn: "beam:coder:kv:v1"
coder_urn: "beam:coder:length_prefix:v1"
coder_urn: "beam:coder:string_utf8:v1"
coder_urn: "beam:coder:length_prefix:v1"
coder_urn: "beam:coder:string_utf8:v1"
Likewise for WriteToKafka.
@mxm can you please confirm whether this will require adjustments to the Kafka Java code?
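(For reference, here is a sketch of where the new urn list comes from. This is my own reconstruction, not code from the PR: it builds the coder stack Beam would infer for the `Iterable[Tuple[str, str]]` consumer_config typehint once strings are length-prefixed, and flattening it depth-first yields exactly the urns listed above.)

```python
# Reconstruction of the coder stack behind the new urn list:
#   iterable -> kv -> (length_prefix -> string_utf8) x 2
from apache_beam.coders.coders import (
    IterableCoder, LengthPrefixCoder, StrUtf8Coder, TupleCoder)

kv_of_strings = TupleCoder((LengthPrefixCoder(StrUtf8Coder()),
                            LengthPrefixCoder(StrUtf8Coder())))
consumer_config_coder = IterableCoder(kv_of_strings)
print(consumer_config_coder)
```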
We should not worry about LengthPrefixCoder at all.
Sounds good.
Eventually this can all be replaced by schemas anyway, so it's not something to worry too much about here.
Can you point me at a document or issue for this? I'd love to learn more.
I played around with this some more. The current design seems to require the LengthPrefixCoder. If I don't wrap the str/bytes coder with a length prefix, I get this error:
Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException: reached end of stream after reading 38 bytes; 112 bytes expected
@mxm can you confirm that the current design requires the LengthPrefixCoder for strings?
@robertwb I saw the schema / row-coder PR at #9188. Is this the schema support you were referring to? Is anyone assigned to porting external transforms to using schema coders? Luckily I think most of the discussion here about the design of the interface remains valid even after that's complete.
Coders also have the notion of nested/unnested. It happens that LengthPrefix(UnnestedBytes) == NestedBytes. Perhaps this is part of the issue here?
Yes, that's the schema/row stuff. I was just commenting that this particular part is going to change (no, no one's on it yet) so whatever works here is fine (and should simplify the above issues).
@chadrik Sorry for the late reply. Vacations and open-source work do not always go together well :) Actually, for the StringUtf8Coder, the length should be added automatically in the nested context; there is no need to add the length prefix.
> how does "elements_per_period": 20 in our python-based payload end up calling GenerateSequence.Builder.setElementsPerPeriod(20) in Java?

It is a simple mapping scheme, which we also use for the pipeline options. It converts snake_case to camelCase and then looks up the setter in the Java configuration class.
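(To illustrate, a minimal standalone sketch of that mapping scheme; the helper name is invented, and the real lookup happens in the Java expansion service:)

```python
# snake_case payload key -> camelCase setter name on the Java
# configuration class, e.g. elements_per_period -> setElementsPerPeriod.
def setter_for(field_name):
  head, *rest = field_name.split('_')
  camel = head + ''.join(part.capitalize() for part in rest)
  return 'set' + camel[0].upper() + camel[1:]

assert setter_for('elements_per_period') == 'setElementsPerPeriod'
```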
> @mxm can you confirm that the current design requires the LengthPrefixCoder for strings?

I don't think so; even with the byte encoding in master, the length prefix is added automatically by the ByteArrayCoder.
Perhaps paste the full stack trace here if you are still seeing problems.
mxm left a comment:
Looks great @chadrik. Thanks a lot. Some comments inline.
| """ | ||
| return ConfigValue( | ||
| coder_urn=[urn for urn in iter_urns(coder) | ||
| if urn not in FILTERED_CODERS], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not need the length prefix coder here, do we? That's because we get the encoded bytes via Protobuf and do not have to reason about the length.
robertwb left a comment:
The design looks sound to me.
```python
          typehint is None or not isinstance(typehint, typehints.Optional)):
        # make it easy for user to filter None by default
        continue
      result[k] = cls.config_value(v, typehint)
```
Do we want to strip the optional wrapping here (e.g. so an Optional[int] is encoded just as a raw int, or omitted)?
Yes, I think so. I didn't realize that Union/Optional caused coders to fall back to FastPrimitive, which is kind of a bummer.
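(A possible shape for that stripping, sketched against Beam's typehints module; the helper name is invented, and I'm assuming `UnionConstraint`/`union_types` as the internal representation of `Optional[T]`:)

```python
# Unwrap Optional[T] to T so a real coder is chosen for T instead of
# falling back to the pickling FastPrimitivesCoder.
from apache_beam.typehints import typehints

def strip_optional(hint):
  if (isinstance(hint, typehints.UnionConstraint)
      and type(None) in hint.union_types):
    inner = [t for t in hint.union_types if t is not type(None)]
    if len(inner) == 1:
      return inner[0]
  return hint
```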
chamikaramj left a comment:
Thanks.
mxm left a comment:
Thanks @chadrik. For the naming, please see #9098 (comment).
Force-pushed e2af3ba to 7396fdd.
OK, taking a step back and thinking about this some more: there are three essential pieces of data that the user must provide for this transform: the urn, the configuration parameters, and how to encode the configuration parameters. This suggests the methods … For simplicity, as urns are typically static, we could provide a default …
Additionally, … This is related to the further magic/ease-of-use, which is allowing a single `_schema` attribute rather than implementing … Does this sound right?
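(A loose sketch of one possible reading of that proposal; every name here is assumed rather than taken from the PR, since the comment's inline code did not survive:)

```python
# A base class where the urn is usually a static class attribute but
# overridable, and the configuration dict is an overridable hook.
class External(object):
  _urn = None  # subclasses typically set this statically

  def urn(self):
    return self._urn

  def config(self):
    # Default: treat every public instance attribute as a config param.
    return {k: v for k, v in vars(self).items() if not k.startswith('_')}
```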
Spot on, as usual. A few comments below:

Note, we might want …

I'm leaning toward requiring …

I'd like to avoid the sub-class requirement. More info below.
Yes, basically. Let me clarify this a little bit more. A user should be able to write a complete class, including its schema, using type annotations, like this:

```python
class ReadFromKafka(External):
  _urn = 'beam:external:java:kafka:read:v1'

  def __init__(self,
               consumer_config: Iterable[Tuple[str, str]],
               topics: Iterable[str],
               key_deserializer: str,
               value_deserializer: str,
               expansion_service: Optional[str] = None):
    super(ReadFromKafka, self).__init__(expansion_service)
    self.consumer_config = list(consumer_config.items())
    self.topics = topics
    self.key_deserializer = key_deserializer
    self.value_deserializer = value_deserializer
```

And the same thing using dataclasses:

```python
@dataclass
class ReadFromKafka(External):
  _urn = 'beam:external:java:kafka:read:v1'
  consumer_config: Iterable[Tuple[str, str]]
  topics: Iterable[str]
  key_deserializer: str
  value_deserializer: str
  expansion_service: Optional[str] = field(default=None)
```
The thing I wanted to make clear is that I don't think …
One may have transforms with no config params, or whose parameter is a single string. Requiring a schema in this case is extra boilerplate (though arguably good practice, but in the spirit of PEP 484, very optional). Passing a wrong value (e.g. a float when an int is required) would be a runtime type error (as one would expect). A default implementation that looks at the …
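(A guess at what such a default implementation could do, completing the truncated thought above: read the `__init__` type annotations. The function name here is invented:)

```python
# Derive a {field: typehint} schema from a transform's __init__
# annotations, ignoring the return hint and the expansion_service arg.
import typing

def schema_from_init(cls):
  hints = typing.get_type_hints(cls.__init__)
  hints.pop('return', None)
  hints.pop('expansion_service', None)
  return hints
```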
Run Python PreCommit
chamikaramj left a comment:
Thanks. Updated API LGTM.
```python
from apache_beam.transforms.external import ExternalTransform, NamedTupleBasedPayloadBuilder


ReadFromKafkaSchema = typing.NamedTuple(
```
nit: 'ReadFromKafkaPayloadTuple' would be a better name, I think.
btw, I used the name "schema" since ultimately we will switch to using the upcoming schema support when serializing this payload. Not sure if that changes your mind on the naming at all.
```python
)


WriteToKafkaSchema = typing.NamedTuple(
```
Ditto.
```python
    return ExternalConfigurationPayload(configuration=args)


class NamedTupleBasedPayloadBuilder(SchemaBasedPayloadBuilder):
```
Please add unit tests for each of these payload builder types.
```python
  Supported in python 3 only.
  """
  def __init__(self, transform, **values):
```
Please add pydocs and define parameters (here and in other places).
```python
    for k, v in config.items():
      typehint = schema.get(k)
      if v is None and (
          typehint is None or not isinstance(typehint, typehints.Optional)):
```
So it seems like 'None' here means "use the default value in the remote SDK", right? We should clarify that in the documentation.
The behavior is a bit undefined at this point, since we only have GenerateSequence as an example. In that case, the field is null on the Java side as well.

The reason we skip the fields if they're None here is ultimately that we don't have a cross-platform coder for none/null, or a cross-platform way of declaring a type as optional/nullable. The behavior would be better defined if we could serialize None and send that value to Java.
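(Restating the skip-None rule from the quoted snippet with a tiny invented example:)

```python
# A None value survives only if its field is declared Optional; a None
# for a non-Optional field is dropped rather than serialized.
import typing

schema = {'topic': str, 'timeout': typing.Optional[int]}
config = {'topic': 'events', 'timeout': None}

def is_optional(hint):
  return type(None) in getattr(hint, '__args__', ())

kept = {k: v for k, v in config.items()
        if v is not None or is_optional(schema.get(k))}
assert kept == {'topic': 'events', 'timeout': None}
```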
mxm left a comment:
@chadrik I think everything looks good now. Would be great if we could add the final touches and get this in.
@mxm I'll start working on the serialization tests for this today. I haven't tested this recently, but I'm pretty sure that deserialization in Java still does not work because of the LengthPrefix error I described earlier (the Java side seems to expect LengthPrefix, and the python side no longer adds it, since it's difficult to know when to do that automatically). I'll get you the full stack trace soon.
Force-pushed c86f017 to 250a584.
Pushed a lot of updates to this. I fixed up a number of issues and created a new ImplicitSchemaPayloadBuilder. The tests fail because #9344 is required for the implicit schema generation, so it'd be good to get that PR merged soon. @mxm this is ready for you to look at why the string decoding is failing on the java side.
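(For context, a rough guess at what "implicit schema" generation could mean; this is my own reconstruction using `instance_to_type`, Beam's runtime type-inference helper, not the PR's actual code:)

```python
# When the user supplies no NamedTuple schema, infer each field's
# typehint from the runtime value itself.
from apache_beam.typehints.trivial_inference import instance_to_type

def implicit_schema(values):
  return {name: instance_to_type(value) for name, value in values.items()}
```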
Force-pushed 250a584 to dafe890.
This is looking great, and will be a very nice simplification for authoring external transform stubs. I think the ImplicitSchemaPayloadBuilder is pretty safe, because most of the types here will be simple basic ones (I'd say 90%+ just ints, floats, and strings). I reviewed #9344; it just needs some more comments.

Could you clarify what string decoding issues you were seeing that @mxm was going to look into?

I agree with Cham that it'd be good to have tests for these various payload builders. After that, this looks good to go and it'd be great to get it in.
Force-pushed dafe890 to 250a584.
I started to respond there and realized it was more complicated than I thought. I just updated that.
Yes, I have to revisit that test to get the full stack trace, but the gist is that the external transform code in java assumes all strings have been wrapped in LengthPrefixCoder, which is no longer happening in this PR.
Added tests. They will fail until #9344 is merged.
Run Python PreCommit

Run RAT PreCommit

Run Python_PVR_Flink PreCommit
Force-pushed 250a584 to b9de4bc.
Now that all the dependent PRs are in, I can finally demonstrate the coder problem that I was running into. I'll do that tomorrow.
@mxm I could use your help now! With the latest from this branch, I'm using the following to expand an external KafkaIO xform:

```python
import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka


def main(pipeline_options, args):
  pipe = beam.Pipeline(options=pipeline_options)
  (
      pipe
      | 'PubSubInflow' >> ReadFromKafka(
          consumer_config={'foo': 'bar'},
          topics=['this', 'that'])
  )
  result = pipe.run()
  try:
    result.wait_until_finish()
  except KeyboardInterrupt:
    pass
```

And I get this error in the expansion service: …
```java
private Iterable<KV<String, String>> producerConfig;
private String topic;
private String keySerializer;
private String valueSerializer;
```
If you change the types here, this will affect the lookup of the configuration fields in ExpansionService. The lookup is performed based on the return type of the coder. The test still uses the bytes coder, so this will attempt to look up a byte[] field.
This is for the KafkaIOExternal test which I see failing.
Forget about the tests for a moment; I don't think they are relevant to the problem I'm seeing, and I will fix them shortly. Try the example code that I sent. The original Kafka code was manually handling conversion between utf8 strings and bytes in both python and java, instead of using the proper string coders. Now that the coders are based on types/schemas in python, we are forced to use the correct coder (using bytes in the schema would not have been appropriate, because it would not have handled utf8 properly). So fixing this in python forced us to fix it in Java. If you think it would be helpful I can make a separate PR to change KafkaIO to use StringUtf8Coder.
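(A quick plain-Python illustration of why treating utf8 strings as raw bytes goes wrong:)

```python
# For non-ASCII text the utf8 byte length differs from the character
# count, so byte-oriented handling silently corrupts string boundaries.
s = u'café'
assert len(s) == 4                  # characters
assert len(s.encode('utf-8')) == 5  # bytes
```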
```python
            payload=VarIntCoder().encode(values['integer_example'])),
        'string_example': ConfigValue(
            coder_urn=['beam:coder:string_utf8:v1'],
            payload=StrUtf8Coder().encode(values['string_example'])),
```
This is not sufficient, because the Python StrUtf8Coder does not add a length prefix while the Java one does. Maybe that is actually a bug. The old version was using a length prefix coder before the bytes coder.
See `beam/sdks/python/apache_beam/coders/coders.py`, line 326 (at 9678149):

```python
def encode(self, value):
```
The difference between the Python and Java UTF-8 coders was discussed before on the dev list: https://lists.apache.org/thread.html/a3a0d9e7c4196bb6be14ba0bec103209317dcb98e781560eb3ccd48c@%3Cdev.beam.apache.org%3E
Unfortunately I don't believe we resolved this though.
This is the nested vs. unnested issue back to bite us again.

Here we should be using the nested context consistently. For Python, write coder.get_impl().encode_nested(value). It would probably be worth adding an encode_nested method to the coder itself.
@robertwb Can you provide a bit more explanation for the beam-newbs in the audience (me), please? Does this just ensure lengths are prefixed for applicable types?
Basically, every coder actually represents two concrete encodings: a "nested" one for use in an unbounded input stream (e.g. as a stream of many elements, or within a composite element type like list), and an "outer" one for use when the end-of-record is provided explicitly (e.g. when writing to a text file, where the newline delimits strings and prefixing each line with a string length would be bad). (In Java this is reified in the second Context argument of the encode/decode functions. For Python it's only visible in the impl layer.)

For some coders, nested and unnested are identical (e.g. varints or doubles, where the element length is implicit in the encoding). For others, a length prefix is added (e.g. utf8 strings and bytes). For others still, it gets pushed down (e.g. for kv coders, the key is always encoded nested, and the value is encoded in the outer context).

We're trying to move to using the "nested" one everywhere, as this is really confusing (see the linked thread).
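(To make the distinction concrete, a small runnable sketch contrasting the two encodings of `StrUtf8Coder`; `encode_nested` is the impl-layer call mentioned above:)

```python
# The nested encoding carries a varint length prefix; the outer
# encoding is just the raw utf8 bytes.
from apache_beam.coders import StrUtf8Coder

coder = StrUtf8Coder()
outer = coder.encode(u'hello')                     # b'hello'
nested = coder.get_impl().encode_nested(u'hello')  # b'\x05hello'
print(outer, nested)
```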
I think we should be good here and elsewhere using the nested encoding. On the Java side, we need to change the decode call in ExpansionService to use the NESTED parameter. The old code here did not need that, because it simply wrapped everything in an additional LengthPrefixCoder, which encode_nested will essentially do as well. So either reverting back to adding a wrapping LengthPrefixCoder or using encode_nested should work. Note that the latter also requires the additional NESTED parameter on the Java side in ExpansionService.
Run Java PreCommit
Force-pushed c2314c1 to 2f90bc7.
Run Java PreCommit
woohoo!
@robertwb there's a commit in this PR that makes it possible to specify a test's minimum major and minor version for python. I use it in this PR to ensure that the dataclasses tests (which rely on syntax changes introduced in python 3.6, and on the dataclasses module introduced in python 3.7) are not loaded and run by the python 3.5 tests. To do so I just extended the regexes a bit. Let me know if you think this should be in a separate PR.
In this particular case, can you just guard the test from running with a runtime check of sys.version, rather than the boilerplate to exclude tests in the testing scripts? (The py3 stuff was needed for exercising new syntax.) After that, could you squash to more logical commits, and then I think it's ready to merge. Thanks again for doing this!
Thank you for your persistent work on the PR @chadrik. Will be great to merge this! :)
I can't do a runtime check because this is protecting against a syntax change, introduced in python 3.6 (variable annotations, PEP 526), so the py35 tests fail with a syntax error during load. I figured this solution was simpler (though uglier) than trying to lobby the mailing list to drop python 3.5 support :)
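(A self-contained demonstration of chadrik's point, simulating what the py35 loader does:)

```python
# PEP 526 variable annotations are new syntax, so an older parser fails
# while compiling the module, before any sys.version guard can run.
source = "x: int = 1\n"
try:
  compile(source, '<pep526_example>', 'exec')  # SyntaxError on py < 3.6
except SyntaxError as exc:
  print('failed at load time:', exc)
```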
Oh, I missed that new syntax... I would be a fan of dropping 3.5 support; it's security-fix-only, source-release-only by this point, and might be easier to pull now than after we have a bunch of Python 3 users. However, I agree that's bigger than this PR, so it's fine as is.
Standardize and reduce boilerplate
previously was manually handling conversion from byte[] to String
Force-pushed 2f90bc7 to 0ae5582.

Done.
I will revert this, postcommit tests are failing: https://issues.apache.org/jira/browse/BEAM-8229
Filed an issue for having py 3.x tests as part of precommits: https://issues.apache.org/jira/browse/BEAM-8230. This seems like a simple issue; feel free to fix forward if that can happen quicker than a rollback.
There's a misunderstanding here. Pre-commit does run the python 3.x tests, but it runs many of the tests on python 3.5. This PR introduced tests that can only run on python 3.6 or higher, and I had to do some extra work to the beam test framework to make that possible: 0c31f7c. It appears that the same work to properly exclude/include tests at the major + minor version needs to be done for post-commit tests. I didn't do that simply because I didn't know it could be a problem; I assumed everything went through tox.
@chadrik I agree, this seems to be the issue.
Sorry about that. This PR has a lot going on in it. Chad, could you pull out the bulk of this PR (minus the dataclasses and associated test and test framework changes), and we could get that latter part in via a subsequent PR?
As a python developer, I'd like to have a more opinionated API for implementing external transforms so that A) it can guide me toward more standardized solutions and B) it reduces the amount of boilerplate code that I need to write.
Improvements:
- derive the payload schema from the type annotations on the __init__ args of every external transform
- automatically create each ConfigValue with its encoded value
- build the ExternalTransform for the user based on the configuration dictionary that they provide
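(To show the end state this PR aims for, a hedged sketch of authoring a transform stub with the new API; the class names come from this PR's imports, but the exact constructor signature, schema, and urn here are my assumptions:)

```python
import typing

from apache_beam.transforms.external import (
    ExternalTransform, NamedTupleBasedPayloadBuilder)

# The "schema" NamedTuple declares field names and types; coders are
# derived from the annotations instead of being wired up by hand.
ExampleSchema = typing.NamedTuple(
    'ExampleSchema',
    [('topic', str), ('timeout', typing.Optional[int])])


class ExampleTransform(ExternalTransform):
  URN = 'beam:external:example:v1'  # hypothetical urn

  def __init__(self, topic, timeout=None, expansion_service=None):
    super(ExampleTransform, self).__init__(
        self.URN,
        NamedTupleBasedPayloadBuilder(
            ExampleSchema(topic=topic, timeout=timeout)),
        expansion_service)
```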