From 551faa11546e64b425e3d408988672c453a16e99 Mon Sep 17 00:00:00 2001 From: Chad Dombrova Date: Wed, 17 Jul 2019 00:48:15 -0700 Subject: [PATCH 1/7] Create a more user friendly external transform API Standardize and reduce boilerplate --- .../io/external/generate_sequence.py | 62 ++---- sdks/python/apache_beam/io/external/kafka.py | 168 ++++++---------- .../python/apache_beam/transforms/external.py | 189 +++++++++++++++++- .../apache_beam/transforms/external_test.py | 160 ++++++++++++++- .../transforms/external_test_py3.py | 135 +++++++++++++ 5 files changed, 559 insertions(+), 155 deletions(-) create mode 100644 sdks/python/apache_beam/transforms/external_test_py3.py diff --git a/sdks/python/apache_beam/io/external/generate_sequence.py b/sdks/python/apache_beam/io/external/generate_sequence.py index 0e0b1fd58b70..a17ec7b68228 100644 --- a/sdks/python/apache_beam/io/external/generate_sequence.py +++ b/sdks/python/apache_beam/io/external/generate_sequence.py @@ -17,15 +17,11 @@ from __future__ import absolute_import -from apache_beam import ExternalTransform -from apache_beam import pvalue -from apache_beam.coders import VarIntCoder -from apache_beam.portability.api.external_transforms_pb2 import ConfigValue -from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload -from apache_beam.transforms import ptransform +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder -class GenerateSequence(ptransform.PTransform): +class GenerateSequence(ExternalTransform): """ An external PTransform which provides a bounded or unbounded stream of integers. @@ -49,47 +45,19 @@ class GenerateSequence(ptransform.PTransform): Experimental; no backwards compatibility guarantees. 
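
  A minimal usage sketch, assuming a Java expansion service is already
  running at the default address (localhost:8097) and exposes this
  transform::

    with beam.Pipeline() as pipeline:
      numbers = pipeline | GenerateSequence(start=1, stop=100)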
""" - URN = 'beam:external:java:generate_sequence:v1' def __init__(self, start, stop=None, elements_per_period=None, max_read_time=None, - expansion_service='localhost:8097'): - super(GenerateSequence, self).__init__() - self.start = start - self.stop = stop - self.elements_per_period = elements_per_period - self.max_read_time = max_read_time - self.expansion_service = expansion_service - - def expand(self, pbegin): - if not isinstance(pbegin, pvalue.PBegin): - raise Exception("GenerateSequence must be a root transform") - - coder = VarIntCoder() - coder_urn = ['beam:coder:varint:v1'] - args = { - 'start': - ConfigValue( - coder_urn=coder_urn, - payload=coder.encode(self.start)) - } - if self.stop: - args['stop'] = ConfigValue( - coder_urn=coder_urn, - payload=coder.encode(self.stop)) - if self.elements_per_period: - args['elements_per_period'] = ConfigValue( - coder_urn=coder_urn, - payload=coder.encode(self.elements_per_period)) - if self.max_read_time: - args['max_read_time'] = ConfigValue( - coder_urn=coder_urn, - payload=coder.encode(self.max_read_time)) - - payload = ExternalConfigurationPayload(configuration=args) - return pbegin.apply( - ExternalTransform( - self.URN, - payload.SerializeToString(), - self.expansion_service)) + expansion_service=None): + super(GenerateSequence, self).__init__( + self.URN, + ImplicitSchemaPayloadBuilder( + { + 'start': start, + 'stop': stop, + 'elements_per_period': elements_per_period, + 'max_read_time': max_read_time, + } + ), + expansion_service) diff --git a/sdks/python/apache_beam/io/external/kafka.py b/sdks/python/apache_beam/io/external/kafka.py index ed24c00c61b0..f824515b1aa4 100644 --- a/sdks/python/apache_beam/io/external/kafka.py +++ b/sdks/python/apache_beam/io/external/kafka.py @@ -37,18 +37,25 @@ from __future__ import absolute_import -from apache_beam import ExternalTransform -from apache_beam import pvalue -from apache_beam.coders import BytesCoder -from apache_beam.coders import IterableCoder -from apache_beam.coders import TupleCoder -from apache_beam.coders.coders import LengthPrefixCoder -from apache_beam.portability.api.external_transforms_pb2 import ConfigValue -from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload -from apache_beam.transforms import ptransform - - -class ReadFromKafka(ptransform.PTransform): +import typing + +from past.builtins import unicode + +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder + +ReadFromKafkaSchema = typing.NamedTuple( + 'ReadFromKafkaSchema', + [ + ('consumer_config', typing.List[typing.Tuple[unicode, unicode]]), + ('topics', typing.List[unicode]), + ('key_deserializer', unicode), + ('value_deserializer', unicode), + ] +) + + +class ReadFromKafka(ExternalTransform): """ An external PTransform which reads from Kafka and returns a KV pair for each item in the specified Kafka topics. If no Kafka Deserializer for @@ -64,11 +71,13 @@ class ReadFromKafka(ptransform.PTransform): byte_array_deserializer = 'org.apache.kafka.common.serialization.' \ 'ByteArrayDeserializer' + URN = 'beam:external:java:kafka:read:v1' + def __init__(self, consumer_config, topics, key_deserializer=byte_array_deserializer, value_deserializer=byte_array_deserializer, - expansion_service='localhost:8097'): + expansion_service=None): """ Initializes a read operation from Kafka. @@ -88,38 +97,32 @@ def __init__(self, consumer_config, serialization.ByteArrayDeserializer'. 
:param expansion_service: The address (host:port) of the ExpansionService. """ - super(ReadFromKafka, self).__init__() - self._urn = 'beam:external:java:kafka:read:v1' - self.consumer_config = consumer_config - self.topics = topics - self.key_deserializer = key_deserializer - self.value_deserializer = value_deserializer - self.expansion_service = expansion_service - - def expand(self, pbegin): - if not isinstance(pbegin, pvalue.PBegin): - raise Exception("ReadFromKafka must be a root transform") - - args = { - 'consumer_config': - _encode_map(self.consumer_config), - 'topics': - _encode_list(self.topics), - 'key_deserializer': - _encode_str(self.key_deserializer), - 'value_deserializer': - _encode_str(self.value_deserializer), - } - - payload = ExternalConfigurationPayload(configuration=args) - return pbegin.apply( - ExternalTransform( - self._urn, - payload.SerializeToString(), - self.expansion_service)) - - -class WriteToKafka(ptransform.PTransform): + super(ReadFromKafka, self).__init__( + self.URN, + NamedTupleBasedPayloadBuilder( + ReadFromKafkaSchema( + consumer_config=list(consumer_config.items()), + topics=topics, + key_deserializer=key_deserializer, + value_deserializer=value_deserializer, + ) + ), + expansion_service + ) + + +WriteToKafkaSchema = typing.NamedTuple( + 'WriteToKafkaSchema', + [ + ('producer_config', typing.List[typing.Tuple[unicode, unicode]]), + ('topic', unicode), + ('key_serializer', unicode), + ('value_serializer', unicode), + ] +) + + +class WriteToKafka(ExternalTransform): """ An external PTransform which writes KV data to a specified Kafka topic. If no Kafka Serializer for key/value is provided, then key/value are @@ -132,11 +135,13 @@ class WriteToKafka(ptransform.PTransform): byte_array_serializer = 'org.apache.kafka.common.serialization.' \ 'ByteArraySerializer' + URN = 'beam:external:java:kafka:write:v1' + def __init__(self, producer_config, topic, key_serializer=byte_array_serializer, value_serializer=byte_array_serializer, - expansion_service='localhost:8097'): + expansion_service=None): """ Initializes a write operation to Kafka. @@ -156,62 +161,15 @@ def __init__(self, producer_config, serialization.ByteArraySerializer'. :param expansion_service: The address (host:port) of the ExpansionService. 
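
    A short write sketch, again assuming a reachable expansion service;
    with the default ByteArraySerializer, keys and values must already be
    byte strings, and the broker address below is illustrative::

      (pipeline
       | beam.Map(lambda v: (b'key', v))
       | WriteToKafka(
           producer_config={'bootstrap.servers': 'localhost:9092'},
           topic='my_topic'))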
""" - super(WriteToKafka, self).__init__() - self._urn = 'beam:external:java:kafka:write:v1' - self.producer_config = producer_config - self.topic = topic - self.key_serializer = key_serializer - self.value_serializer = value_serializer - self.expansion_service = expansion_service - - def expand(self, pvalue): - args = { - 'producer_config': - _encode_map(self.producer_config), - 'topic': - _encode_str(self.topic), - 'key_serializer': - _encode_str(self.key_serializer), - 'value_serializer': - _encode_str(self.value_serializer), - } - - payload = ExternalConfigurationPayload(configuration=args) - return pvalue.apply( - ExternalTransform( - self._urn, - payload.SerializeToString(), - self.expansion_service)) - - -def _encode_map(dict_obj): - kv_list = [(key.encode('utf-8'), val.encode('utf-8')) - for key, val in dict_obj.items()] - coder = IterableCoder(TupleCoder( - [LengthPrefixCoder(BytesCoder()), LengthPrefixCoder(BytesCoder())])) - coder_urns = ['beam:coder:iterable:v1', - 'beam:coder:kv:v1', - 'beam:coder:bytes:v1', - 'beam:coder:bytes:v1'] - return ConfigValue( - coder_urn=coder_urns, - payload=coder.encode(kv_list)) - - -def _encode_list(list_obj): - encoded_list = [val.encode('utf-8') for val in list_obj] - coder = IterableCoder(LengthPrefixCoder(BytesCoder())) - coder_urns = ['beam:coder:iterable:v1', - 'beam:coder:bytes:v1'] - return ConfigValue( - coder_urn=coder_urns, - payload=coder.encode(encoded_list)) - - -def _encode_str(str_obj): - encoded_str = str_obj.encode('utf-8') - coder = LengthPrefixCoder(BytesCoder()) - coder_urns = ['beam:coder:bytes:v1'] - return ConfigValue( - coder_urn=coder_urns, - payload=coder.encode(encoded_str)) + super(WriteToKafka, self).__init__( + self.URN, + NamedTupleBasedPayloadBuilder( + WriteToKafkaSchema( + producer_config=list(producer_config.items()), + topic=topic, + key_serializer=key_serializer, + value_serializer=value_serializer, + ) + ), + expansion_service + ) diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index a7e9fb504be9..e0f8d4cc46bb 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -27,11 +27,18 @@ import threading from apache_beam import pvalue +from apache_beam.coders import registry from apache_beam.portability import common_urns from apache_beam.portability.api import beam_expansion_api_pb2 from apache_beam.portability.api import beam_runner_api_pb2 +from apache_beam.portability.api.external_transforms_pb2 import ConfigValue +from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload from apache_beam.runners import pipeline_context from apache_beam.transforms import ptransform +from apache_beam.typehints.native_type_compatibility import convert_to_beam_type +from apache_beam.typehints.trivial_inference import instance_to_type +from apache_beam.typehints.typehints import Union +from apache_beam.typehints.typehints import UnionConstraint # Protect against environments where grpc is not available. 
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports @@ -42,6 +49,170 @@ grpc = None # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports +DEFAULT_EXPANSION_SERVICE = 'localhost:8097' + + +def _is_optional_or_none(typehint): + return (type(None) in typehint.union_types + if isinstance(typehint, UnionConstraint) else typehint is type(None)) + + +def _strip_optional(typehint): + if not _is_optional_or_none(typehint): + return typehint + new_types = typehint.union_types.difference({type(None)}) + if len(new_types) == 1: + return list(new_types)[0] + return Union[new_types] + + +def iter_urns(coder, context=None): + yield coder.to_runner_api_parameter(context)[0] + for child in coder._get_component_coders(): + for urn in iter_urns(child, context): + yield urn + + +class PayloadBuilder(object): + """ + Abstract base class for building payloads to pass to ExternalTransform. + """ + + @classmethod + def _config_value(cls, obj, typehint): + """ + Helper to create a ConfigValue with an encoded value. + """ + coder = registry.get_coder(typehint) + urns = list(iter_urns(coder)) + if 'beam:coder:pickled_python:v1' in urns: + raise RuntimeError("Found non-portable coder for %s" % (typehint,)) + return ConfigValue( + coder_urn=urns, + payload=coder.encode(obj)) + + def build(self): + """ + :return: ExternalConfigurationPayload + """ + raise NotImplementedError + + def payload(self): + """ + The serialized ExternalConfigurationPayload + + :return: bytes + """ + return self.build().SerializeToString() + + +class SchemaBasedPayloadBuilder(PayloadBuilder): + """ + Base class for building payloads based on a schema that provides + type information for each configuration value to encode. + + Note that if the schema defines a type as Optional, the corresponding value + will be omitted from the encoded payload, and thus the native transform + will determine the default. + """ + + def __init__(self, values, schema): + """ + :param values: mapping of config names to values + :param schema: mapping of config names to types + """ + self._values = values + self._schema = schema + + @classmethod + def _encode_config(cls, values, schema): + result = {} + for key, value in values.items(): + + try: + typehint = schema[key] + except KeyError: + raise RuntimeError("No typehint provided for key %r" % key) + + typehint = convert_to_beam_type(typehint) + + if value is None: + if not _is_optional_or_none(typehint): + raise RuntimeError("If value is None, typehint should be " + "optional. Got %r" % typehint) + # make it easy for user to filter None by default + continue + else: + # strip Optional from typehint so that pickled_python coder is not used + # for known types. + typehint = _strip_optional(typehint) + result[key] = cls._config_value(value, typehint) + return result + + def build(self): + """ + :return: ExternalConfigurationPayload + """ + args = self._encode_config(self._values, self._schema) + return ExternalConfigurationPayload(configuration=args) + + +class ImplicitSchemaPayloadBuilder(SchemaBasedPayloadBuilder): + """ + Build a payload that generates a schema from the provided values. + """ + def __init__(self, values): + schema = {key: instance_to_type(value) for key, value in values.items()} + super(ImplicitSchemaPayloadBuilder, self).__init__(values, schema) + + +class NamedTupleBasedPayloadBuilder(SchemaBasedPayloadBuilder): + """ + Build a payload based on a NamedTuple schema. 
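
  A sketch of standalone use; the schema and values here are
  illustrative, not tied to any real transform::

    MySchema = typing.NamedTuple(
        'MySchema',
        [('limit', int), ('topic', unicode)])
    payload_bytes = NamedTupleBasedPayloadBuilder(
        MySchema(limit=10, topic=u'events')).payload()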
+ """ + def __init__(self, tuple_instance): + """ + :param tuple_instance: an instance of a typing.NamedTuple + """ + super(NamedTupleBasedPayloadBuilder, self).__init__( + values=tuple_instance._asdict(), schema=tuple_instance._field_types) + + +class AnnotationBasedPayloadBuilder(SchemaBasedPayloadBuilder): + """ + Build a payload based on an external transform's type annotations. + + Supported in python 3 only. + """ + def __init__(self, transform, **values): + """ + :param transform: a PTransform instance or class. type annotations will + be gathered from its __init__ method + :param values: values to encode + """ + schema = {k: v for k, v in + transform.__init__.__annotations__.items() + if k in values} + super(AnnotationBasedPayloadBuilder, self).__init__(values, schema) + + +class DataclassBasedPayloadBuilder(SchemaBasedPayloadBuilder): + """ + Build a payload based on an external transform that uses dataclasses. + + Supported in python 3 only. + """ + def __init__(self, transform): + """ + :param transform: a dataclass-decorated PTransform instance from which to + gather type annotations and values + """ + import dataclasses + schema = {field.name: field.type for field in + dataclasses.fields(transform)} + super(DataclassBasedPayloadBuilder, self).__init__( + dataclasses.asdict(transform), schema) + class ExternalTransform(ptransform.PTransform): """ @@ -56,15 +227,29 @@ class ExternalTransform(ptransform.PTransform): _EXPANDED_TRANSFORM_UNIQUE_NAME = 'root' _IMPULSE_PREFIX = 'impulse' - def __init__(self, urn, payload, endpoint): + def __init__(self, urn, payload, endpoint=None): + endpoint = endpoint or DEFAULT_EXPANSION_SERVICE if grpc is None and isinstance(endpoint, str): raise NotImplementedError('Grpc required for external transforms.') # TODO: Start an endpoint given an environment? self._urn = urn - self._payload = payload + self._payload = payload.payload() \ + if isinstance(payload, PayloadBuilder) \ + else payload self._endpoint = endpoint self._namespace = self._fresh_namespace() + def __post_init__(self, expansion_service): + """ + This will only be invoked if ExternalTransform is used as a base class + for a class decorated with dataclasses.dataclass + """ + ExternalTransform.__init__( + self, + self.URN, + DataclassBasedPayloadBuilder(self), + expansion_service) + def default_label(self): return '%s(%s)' % (self.__class__.__name__, self._urn) diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py index e480f622e4c2..b0e7334e8a37 100644 --- a/sdks/python/apache_beam/transforms/external_test.py +++ b/sdks/python/apache_beam/transforms/external_test.py @@ -15,7 +15,7 @@ # limitations under the License. 
# -"""Unit tests for the transform.util classes.""" +"""Unit tests for the transform.external classes.""" from __future__ import absolute_import @@ -23,6 +23,7 @@ import os import subprocess import sys +import typing import unittest import grpc @@ -32,12 +33,21 @@ import apache_beam as beam from apache_beam import Pipeline +from apache_beam.coders import FloatCoder +from apache_beam.coders import IterableCoder +from apache_beam.coders import StrUtf8Coder +from apache_beam.coders import TupleCoder +from apache_beam.coders import VarIntCoder from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.portability.api.external_transforms_pb2 import ConfigValue +from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload from apache_beam.runners.portability import expansion_service from apache_beam.runners.portability.expansion_service_test import FibTransform from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder # Protect against environments where apitools library is not available. # pylint: disable=wrong-import-order, wrong-import-position @@ -48,6 +58,154 @@ # pylint: enable=wrong-import-order, wrong-import-position +def get_payload(args): + return ExternalConfigurationPayload(configuration=args) + + +class PayloadBase(object): + values = { + 'integer_example': 1, + 'string_example': u'thing', + 'list_of_strings': [u'foo', u'bar'], + 'optional_kv': (u'key', 1.1), + 'optional_integer': None, + } + + bytes_values = { + 'integer_example': 1, + 'string_example': 'thing', + 'list_of_strings': ['foo', 'bar'], + 'optional_kv': ('key', 1.1), + 'optional_integer': None, + } + + args = { + 'integer_example': ConfigValue( + coder_urn=['beam:coder:varint:v1'], + payload=VarIntCoder().encode(values['integer_example'])), + 'string_example': ConfigValue( + coder_urn=['beam:coder:string_utf8:v1'], + payload=StrUtf8Coder().encode(values['string_example'])), + 'list_of_strings': ConfigValue( + coder_urn=['beam:coder:iterable:v1', + 'beam:coder:string_utf8:v1'], + payload=IterableCoder(StrUtf8Coder()) + .encode(values['list_of_strings'])), + 'optional_kv': ConfigValue( + coder_urn=['beam:coder:kv:v1', + 'beam:coder:string_utf8:v1', + 'beam:coder:double:v1'], + payload=TupleCoder([StrUtf8Coder(), FloatCoder()]) + .get_impl().encode_nested(values['optional_kv'])), + } + + def get_payload_from_typing_hints(self, values): + """Return ExternalConfigurationPayload based on python typing hints""" + raise NotImplementedError + + def get_payload_from_beam_typehints(self, values): + """Return ExternalConfigurationPayload based on beam typehints""" + raise NotImplementedError + + def test_typing_payload_builder(self): + result = self.get_payload_from_typing_hints(self.values) + expected = get_payload(self.args) + self.assertEqual(result, expected) + + def test_typing_payload_builder_with_bytes(self): + """ + string_utf8 coder will be used even if values are not unicode in python 2.x + """ + result = self.get_payload_from_typing_hints(self.bytes_values) + expected = get_payload(self.args) + self.assertEqual(result, expected) + + def test_typehints_payload_builder(self): + result = self.get_payload_from_beam_typehints(self.values) + expected = get_payload(self.args) + self.assertEqual(result, expected) + + def 
test_typehints_payload_builder_with_bytes(self): + """ + string_utf8 coder will be used even if values are not unicode in python 2.x + """ + result = self.get_payload_from_beam_typehints(self.bytes_values) + expected = get_payload(self.args) + self.assertEqual(result, expected) + + def test_optional_error(self): + """ + value can only be None if typehint is Optional + """ + with self.assertRaises(RuntimeError): + self.get_payload_from_typing_hints({k: None for k in self.values}) + + +class ExternalTuplePayloadTest(PayloadBase, unittest.TestCase): + + def get_payload_from_typing_hints(self, values): + TestSchema = typing.NamedTuple( + 'TestSchema', + [ + ('integer_example', int), + ('string_example', unicode), + ('list_of_strings', typing.List[unicode]), + ('optional_kv', typing.Optional[typing.Tuple[unicode, float]]), + ('optional_integer', typing.Optional[int]), + ] + ) + + builder = NamedTupleBasedPayloadBuilder(TestSchema(**values)) + return builder.build() + + def get_payload_from_beam_typehints(self, values): + raise unittest.SkipTest("Beam typehints cannot be used with " + "typing.NamedTuple") + + +class ExternalImplicitPayloadTest(unittest.TestCase): + """ + ImplicitSchemaPayloadBuilder works very differently than the other payload + builders + """ + def test_implicit_payload_builder(self): + builder = ImplicitSchemaPayloadBuilder(PayloadBase.values) + result = builder.build() + expected = get_payload(PayloadBase.args) + self.assertEqual(result, expected) + + def test_implicit_payload_builder_with_bytes(self): + values = PayloadBase.bytes_values + builder = ImplicitSchemaPayloadBuilder(values) + result = builder.build() + if sys.version_info[0] < 3: + # in python 2.x bytes coder will be inferred + args = { + 'integer_example': ConfigValue( + coder_urn=['beam:coder:varint:v1'], + payload=VarIntCoder().encode(values['integer_example'])), + 'string_example': ConfigValue( + coder_urn=['beam:coder:bytes:v1'], + payload=StrUtf8Coder().encode(values['string_example'])), + 'list_of_strings': ConfigValue( + coder_urn=['beam:coder:iterable:v1', + 'beam:coder:bytes:v1'], + payload=IterableCoder(StrUtf8Coder()) + .encode(values['list_of_strings'])), + 'optional_kv': ConfigValue( + coder_urn=['beam:coder:kv:v1', + 'beam:coder:bytes:v1', + 'beam:coder:double:v1'], + payload=TupleCoder([StrUtf8Coder(), FloatCoder()]) + .encode(values['optional_kv'])), + } + expected = get_payload(args) + self.assertEqual(result, expected) + else: + expected = get_payload(PayloadBase.args) + self.assertEqual(result, expected) + + @attr('UsesCrossLanguageTransforms') class ExternalTransformTest(unittest.TestCase): diff --git a/sdks/python/apache_beam/transforms/external_test_py3.py b/sdks/python/apache_beam/transforms/external_test_py3.py new file mode 100644 index 000000000000..26548e5e88e6 --- /dev/null +++ b/sdks/python/apache_beam/transforms/external_test_py3.py @@ -0,0 +1,135 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for the transform.external classes.""" + +from __future__ import absolute_import + +import typing +import unittest + +import apache_beam as beam +from apache_beam import typehints +from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload +from apache_beam.transforms.external import AnnotationBasedPayloadBuilder +from apache_beam.transforms.external import DataclassBasedPayloadBuilder +from apache_beam.transforms.external_test import PayloadBase + +# pylint: disable=wrong-import-order, wrong-import-position +try: + import dataclasses +except ImportError: + dataclasses = None +# pylint: enable=wrong-import-order, wrong-import-position + + +def get_payload(cls): + payload = ExternalConfigurationPayload() + payload.ParseFromString(cls._payload) + return payload + + +@unittest.skipIf(dataclasses is None, 'dataclasses library not available') +class ExternalDataclassesPayloadTest(PayloadBase, unittest.TestCase): + + def get_payload_from_typing_hints(self, values): + + @dataclasses.dataclass + class DataclassTransform(beam.ExternalTransform): + URN = 'beam:external:fakeurn:v1' + + integer_example: int + string_example: str + list_of_strings: typing.List[str] + optional_kv: typing.Optional[typing.Tuple[str, float]] = None + optional_integer: typing.Optional[int] = None + expansion_service: dataclasses.InitVar[typing.Optional[str]] = None + + return get_payload(DataclassTransform(**values)) + + def get_payload_from_beam_typehints(self, values): + + @dataclasses.dataclass + class DataclassTransform(beam.ExternalTransform): + URN = 'beam:external:fakeurn:v1' + + integer_example: int + string_example: str + list_of_strings: typehints.List[str] + optional_kv: typehints.Optional[typehints.KV[str, float]] = None + optional_integer: typehints.Optional[int] = None + expansion_service: dataclasses.InitVar[typehints.Optional[str]] = None + + return get_payload(DataclassTransform(**values)) + + +class ExternalAnnotationPayloadTest(PayloadBase, unittest.TestCase): + + def get_payload_from_typing_hints(self, values): + class AnnotatedTransform(beam.ExternalTransform): + URN = 'beam:external:fakeurn:v1' + + def __init__(self, + integer_example: int, + string_example: str, + list_of_strings: typing.List[str], + optional_kv: typing.Optional[typing.Tuple[str, float]] = None, + optional_integer: typing.Optional[int] = None, + expansion_service=None): + super(AnnotatedTransform, self).__init__( + self.URN, + AnnotationBasedPayloadBuilder( + self, + integer_example=integer_example, + string_example=string_example, + list_of_strings=list_of_strings, + optional_kv=optional_kv, + optional_integer=optional_integer, + ), + expansion_service + ) + + return get_payload(AnnotatedTransform(**values)) + + def get_payload_from_beam_typehints(self, values): + class AnnotatedTransform(beam.ExternalTransform): + URN = 'beam:external:fakeurn:v1' + + def __init__(self, + integer_example: int, + string_example: str, + list_of_strings: typehints.List[str], + optional_kv: typehints.Optional[typehints.KV[str, float]] = None, + optional_integer: typehints.Optional[int] = None, + 
expansion_service=None): + super(AnnotatedTransform, self).__init__( + self.URN, + AnnotationBasedPayloadBuilder( + self, + integer_example=integer_example, + string_example=string_example, + list_of_strings=list_of_strings, + optional_kv=optional_kv, + optional_integer=optional_integer, + ), + expansion_service + ) + + return get_payload(AnnotatedTransform(**values)) + +if __name__ == '__main__': + unittest.main() From 356f470e9f64518f9d0b9fe530a30cb99ec2e93b Mon Sep 17 00:00:00 2001 From: Chad Dombrova Date: Thu, 5 Sep 2019 23:46:14 -0700 Subject: [PATCH 2/7] KafkaIO: use StringUtf8Coder in Java to match python previously was manually handling conversion from byte[] to String --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 68 ++++++++----------- .../sdk/io/kafka/KafkaIOExternalTest.java | 47 ++++++------- 2 files changed, 47 insertions(+), 68 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 373d4b894d33..ca74ecb6c84c 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -65,7 +65,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; @@ -402,26 +401,22 @@ abstract Builder setTimestampPolicyFactory( public PTransform>> buildExternal( External.Configuration config) { ImmutableList.Builder listBuilder = ImmutableList.builder(); - for (byte[] topic : config.topics) { - listBuilder.add(utf8String(topic)); + for (String topic : config.topics) { + listBuilder.add(topic); } setTopics(listBuilder.build()); - String keyDeserializerClassName = utf8String(config.keyDeserializer); - Class keyDeserializer = resolveClass(keyDeserializerClassName); + Class keyDeserializer = resolveClass(config.keyDeserializer); setKeyDeserializer(keyDeserializer); setKeyCoder(resolveCoder(keyDeserializer)); - String valueDeserializerClassName = utf8String(config.valueDeserializer); - Class valueDeserializer = resolveClass(valueDeserializerClassName); + Class valueDeserializer = resolveClass(config.valueDeserializer); setValueDeserializer(valueDeserializer); setValueCoder(resolveCoder(valueDeserializer)); Map consumerConfig = new HashMap<>(); - for (KV kv : config.consumerConfig) { - String key = utf8String(kv.getKey()); - String value = utf8String(kv.getValue()); - consumerConfig.put(key, value); + for (KV kv : config.consumerConfig) { + consumerConfig.put(kv.getKey(), kv.getValue()); } // Key and Value Deserializers always have to be in the config. 
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName()); @@ -480,24 +475,24 @@ public Map> knownBuilders() { public static class Configuration { // All byte arrays are UTF-8 encoded strings - private Iterable> consumerConfig; - private Iterable topics; - private byte[] keyDeserializer; - private byte[] valueDeserializer; + private Iterable> consumerConfig; + private Iterable topics; + private String keyDeserializer; + private String valueDeserializer; - public void setConsumerConfig(Iterable> consumerConfig) { + public void setConsumerConfig(Iterable> consumerConfig) { this.consumerConfig = consumerConfig; } - public void setTopics(Iterable topics) { + public void setTopics(Iterable topics) { this.topics = topics; } - public void setKeyDeserializer(byte[] keyDeserializer) { + public void setKeyDeserializer(String keyDeserializer) { this.keyDeserializer = keyDeserializer; } - public void setValueDeserializer(byte[] valueDeserializer) { + public void setValueDeserializer(String valueDeserializer) { this.valueDeserializer = valueDeserializer; } } @@ -1365,24 +1360,21 @@ abstract static class Builder @Override public PTransform>, PDone> buildExternal( External.Configuration configuration) { - String topic = utf8String(configuration.topic); - setTopic(topic); + setTopic(configuration.topic); Map producerConfig = new HashMap<>(); - for (KV kv : configuration.producerConfig) { - String key = utf8String(kv.getKey()); - String value = utf8String(kv.getValue()); - producerConfig.put(key, value); + for (KV kv : configuration.producerConfig) { + producerConfig.put(kv.getKey(), kv.getValue()); } - Class keySerializer = resolveClass(utf8String(configuration.keySerializer)); - Class valSerializer = resolveClass(utf8String(configuration.valueSerializer)); + Class keySerializer = resolveClass(configuration.keySerializer); + Class valSerializer = resolveClass(configuration.valueSerializer); WriteRecords writeRecords = KafkaIO.writeRecords() .withProducerConfigUpdates(producerConfig) .withKeySerializer(keySerializer) .withValueSerializer(valSerializer) - .withTopic(topic); + .withTopic(configuration.topic); setWriteRecordsTransform(writeRecords); return build(); @@ -1405,24 +1397,24 @@ public Map> knownBuilders() { public static class Configuration { // All byte arrays are UTF-8 encoded strings - private Iterable> producerConfig; - private byte[] topic; - private byte[] keySerializer; - private byte[] valueSerializer; + private Iterable> producerConfig; + private String topic; + private String keySerializer; + private String valueSerializer; - public void setProducerConfig(Iterable> producerConfig) { + public void setProducerConfig(Iterable> producerConfig) { this.producerConfig = producerConfig; } - public void setTopic(byte[] topic) { + public void setTopic(String topic) { this.topic = topic; } - public void setKeySerializer(byte[] keySerializer) { + public void setKeySerializer(String keySerializer) { this.keySerializer = keySerializer; } - public void setValueSerializer(byte[] valueSerializer) { + public void setValueSerializer(String valueSerializer) { this.valueSerializer = valueSerializer; } } @@ -1691,10 +1683,6 @@ static NullableCoder inferCoder( String.format("Could not extract the Kafka Deserializer type from %s", deserializer)); } - private static String utf8String(byte[] bytes) { - return new String(bytes, Charsets.UTF_8); - } - private static Class resolveClass(String className) { try { return Class.forName(className); diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java index 83f2cc7586d5..a7b7f8a505f5 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java @@ -32,17 +32,15 @@ import org.apache.beam.runners.core.construction.ReadTranslation; import org.apache.beam.runners.core.construction.expansion.ExpansionService; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; @@ -76,7 +74,7 @@ public void testConstructKafkaRead() throws Exception { "topics", ExternalTransforms.ConfigValue.newBuilder() .addCoderUrn("beam:coder:iterable:v1") - .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:string_utf8:v1") .setPayload(ByteString.copyFrom(listAsBytes(topics))) .build()) .putConfiguration( @@ -84,20 +82,20 @@ public void testConstructKafkaRead() throws Exception { ExternalTransforms.ConfigValue.newBuilder() .addCoderUrn("beam:coder:iterable:v1") .addCoderUrn("beam:coder:kv:v1") - .addCoderUrn("beam:coder:bytes:v1") - .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:string_utf8:v1") + .addCoderUrn("beam:coder:string_utf8:v1") .setPayload(ByteString.copyFrom(mapAsBytes(consumerConfig))) .build()) .putConfiguration( "key_deserializer", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:string_utf8:v1") .setPayload(ByteString.copyFrom(encodeString(keyDeserializer))) .build()) .putConfiguration( "value_deserializer", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:string_utf8:v1") .setPayload(ByteString.copyFrom(encodeString(valueDeserializer))) .build()) .build(); @@ -161,7 +159,7 @@ public void testConstructKafkaWrite() throws Exception { .putConfiguration( "topic", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:string_utf8:v1") .setPayload(ByteString.copyFrom(encodeString(topic))) .build()) .putConfiguration( @@ -169,20 +167,20 @@ public void testConstructKafkaWrite() throws Exception { ExternalTransforms.ConfigValue.newBuilder() .addCoderUrn("beam:coder:iterable:v1") .addCoderUrn("beam:coder:kv:v1") - .addCoderUrn("beam:coder:bytes:v1") - .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:string_utf8:v1") + .addCoderUrn("beam:coder:string_utf8:v1") .setPayload(ByteString.copyFrom(mapAsBytes(producerConfig))) .build()) 
.putConfiguration( "key_serializer", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:string_utf8:v1") .setPayload(ByteString.copyFrom(encodeString(keySerializer))) .build()) .putConfiguration( "value_serializer", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:string_utf8:v1") .setPayload(ByteString.copyFrom(encodeString(valueSerializer))) .build()) .build(); @@ -248,37 +246,30 @@ public void testConstructKafkaWrite() throws Exception { } private static byte[] listAsBytes(List stringList) throws IOException { - IterableCoder coder = IterableCoder.of(ByteArrayCoder.of()); - List bytesList = - stringList.stream().map(KafkaIOExternalTest::utf8Bytes).collect(Collectors.toList()); + IterableCoder coder = IterableCoder.of(StringUtf8Coder.of()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - coder.encode(bytesList, baos); + coder.encode(stringList, baos); return baos.toByteArray(); } private static byte[] mapAsBytes(Map stringMap) throws IOException { - IterableCoder> coder = - IterableCoder.of(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of())); - List> bytesList = + IterableCoder> coder = + IterableCoder.of(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + List> stringList = stringMap.entrySet().stream() - .map(kv -> KV.of(utf8Bytes(kv.getKey()), utf8Bytes(kv.getValue()))) + .map(kv -> KV.of(kv.getKey(), kv.getValue())) .collect(Collectors.toList()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - coder.encode(bytesList, baos); + coder.encode(stringList, baos); return baos.toByteArray(); } private static byte[] encodeString(String str) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ByteArrayCoder.of().encode(utf8Bytes(str), baos); + StringUtf8Coder.of().encode(str, baos); return baos.toByteArray(); } - private static byte[] utf8Bytes(String str) { - Preconditions.checkNotNull(str, "String must not be null."); - return str.getBytes(Charsets.UTF_8); - } - private static class TestStreamObserver implements StreamObserver { private T result; From ef6a5dc4f899e6ae69b3982d0de69495c3d5c56d Mon Sep 17 00:00:00 2001 From: Chad Dombrova Date: Wed, 11 Sep 2019 23:24:09 -0700 Subject: [PATCH 3/7] Encode and decode using nested context --- .../expansion/ExpansionService.java | 3 ++- sdks/python/apache_beam/transforms/external.py | 2 +- .../apache_beam/transforms/external_test.py | 18 +++++++++++------- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java index 795298a9d775..a45f9e663752 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java @@ -183,7 +183,8 @@ static void populateConfiguration( config.getClass(), setterName, fieldName), e); } - method.invoke(config, coder.decode(entry.getValue().getPayload().newInput())); + method.invoke( + config, coder.decode(entry.getValue().getPayload().newInput(), Coder.Context.NESTED)); } } diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index e0f8d4cc46bb..fd79fcfd60a0 100644 
--- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -89,7 +89,7 @@ def _config_value(cls, obj, typehint): raise RuntimeError("Found non-portable coder for %s" % (typehint,)) return ConfigValue( coder_urn=urns, - payload=coder.encode(obj)) + payload=coder.get_impl().encode_nested(obj)) def build(self): """ diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py index b0e7334e8a37..65764190a733 100644 --- a/sdks/python/apache_beam/transforms/external_test.py +++ b/sdks/python/apache_beam/transforms/external_test.py @@ -82,15 +82,17 @@ class PayloadBase(object): args = { 'integer_example': ConfigValue( coder_urn=['beam:coder:varint:v1'], - payload=VarIntCoder().encode(values['integer_example'])), + payload=VarIntCoder() + .get_impl().encode_nested(values['integer_example'])), 'string_example': ConfigValue( coder_urn=['beam:coder:string_utf8:v1'], - payload=StrUtf8Coder().encode(values['string_example'])), + payload=StrUtf8Coder() + .get_impl().encode_nested(values['string_example'])), 'list_of_strings': ConfigValue( coder_urn=['beam:coder:iterable:v1', 'beam:coder:string_utf8:v1'], payload=IterableCoder(StrUtf8Coder()) - .encode(values['list_of_strings'])), + .get_impl().encode_nested(values['list_of_strings'])), 'optional_kv': ConfigValue( coder_urn=['beam:coder:kv:v1', 'beam:coder:string_utf8:v1', @@ -183,21 +185,23 @@ def test_implicit_payload_builder_with_bytes(self): args = { 'integer_example': ConfigValue( coder_urn=['beam:coder:varint:v1'], - payload=VarIntCoder().encode(values['integer_example'])), + payload=VarIntCoder() + .get_impl().encode_nested(values['integer_example'])), 'string_example': ConfigValue( coder_urn=['beam:coder:bytes:v1'], - payload=StrUtf8Coder().encode(values['string_example'])), + payload=StrUtf8Coder() + .get_impl().encode_nested(values['string_example'])), 'list_of_strings': ConfigValue( coder_urn=['beam:coder:iterable:v1', 'beam:coder:bytes:v1'], payload=IterableCoder(StrUtf8Coder()) - .encode(values['list_of_strings'])), + .get_impl().encode_nested(values['list_of_strings'])), 'optional_kv': ConfigValue( coder_urn=['beam:coder:kv:v1', 'beam:coder:bytes:v1', 'beam:coder:double:v1'], payload=TupleCoder([StrUtf8Coder(), FloatCoder()]) - .encode(values['optional_kv'])), + .get_impl().encode_nested(values['optional_kv'])), } expected = get_payload(args) self.assertEqual(result, expected) From 932a696423e11ff559b80e72381a5131dbfffd72 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 6 Sep 2019 18:04:36 +0200 Subject: [PATCH 4/7] Add better error message for configuration type mismatch --- .../core/construction/expansion/ExpansionService.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java index a45f9e663752..ae1a3d724a20 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java @@ -179,8 +179,11 @@ static void populateConfiguration( } catch (NoSuchMethodException e) { throw new RuntimeException( String.format( - "The configuration class %s is missing a setter %s for %s", - 
config.getClass(), setterName, fieldName), + "The configuration class %s is missing a setter %s for %s with type %s", + config.getClass(), + setterName, + fieldName, + coder.getEncodedTypeDescriptor().getType().getTypeName()), e); } method.invoke( From 0c31f7c8b1c108f7df4cbedfaf9da0ce30092ce0 Mon Sep 17 00:00:00 2001 From: Chad Dombrova Date: Wed, 11 Sep 2019 11:14:50 -0700 Subject: [PATCH 5/7] Allow tests to specify a python 3 minor version, to isolate syntax changes in 3.6 --- sdks/python/scripts/generate_pydoc.sh | 2 +- sdks/python/scripts/run_mini_py3lint.sh | 21 ++++++++++++++++++++- sdks/python/scripts/run_pylint.sh | 2 +- sdks/python/tox.ini | 22 +++++++++++----------- 4 files changed, 33 insertions(+), 14 deletions(-) diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh index d74dc62d3b8b..4a29b7acb96c 100755 --- a/sdks/python/scripts/generate_pydoc.sh +++ b/sdks/python/scripts/generate_pydoc.sh @@ -77,7 +77,7 @@ excluded_patterns=( *_test.py *_test_common.py # TODO(BEAM-7847): Remove this once doc generation can parse Py3 syntax. - *py3.py + *_py3*.py ) python $(type -p sphinx-apidoc) -fMeT -o target/docs/source apache_beam \ diff --git a/sdks/python/scripts/run_mini_py3lint.sh b/sdks/python/scripts/run_mini_py3lint.sh index 27ca3ceff06b..0bd7e0e5440a 100755 --- a/sdks/python/scripts/run_mini_py3lint.sh +++ b/sdks/python/scripts/run_mini_py3lint.sh @@ -38,6 +38,24 @@ set -o pipefail MODULE=apache_beam +PYTHON_MINOR=$(python -c 'import sys; print(sys.version_info[1])') +if [[ "${PYTHON_MINOR}" == 5 ]]; then + EXCLUDED_PY3_FILES=$(find ${MODULE} | grep 'py3[6-9]\.py$') + echo -e "Excluding Py3 files:\n${EXCLUDED_PY3_FILES}" +else + EXCLUDED_PY3_FILES="" +fi + +FILES_TO_IGNORE="" +for file in ${EXCLUDED_PY3_FILES}; do + if test -z "$FILES_TO_IGNORE" + then FILES_TO_IGNORE="$(basename $file)" + else FILES_TO_IGNORE="$FILES_TO_IGNORE, $(basename $file)" + fi +done + +echo -e "Skipping lint for files:\n${FILES_TO_IGNORE}" + usage(){ echo "Usage: $0 [MODULE|--help] # The default MODULE is $MODULE"; } if test $# -gt 0; then @@ -48,4 +66,5 @@ if test $# -gt 0; then fi echo "Running flake8 for module $MODULE:" -flake8 $MODULE --count --select=E9,F821,F822,F823 --show-source --statistics +flake8 $MODULE --count --select=E9,F821,F822,F823 --show-source --statistics \ + --exclude="${FILES_TO_IGNORE}" diff --git a/sdks/python/scripts/run_pylint.sh b/sdks/python/scripts/run_pylint.sh index 8a4b5e1061ae..a9978fe5950a 100755 --- a/sdks/python/scripts/run_pylint.sh +++ b/sdks/python/scripts/run_pylint.sh @@ -62,7 +62,7 @@ apache_beam/portability/api/*pb2*.py PYTHON_MAJOR=$(python -c 'import sys; print(sys.version_info[0])') if [[ "${PYTHON_MAJOR}" == 2 ]]; then - EXCLUDED_PY3_FILES=$(find ${MODULE} | grep 'py3\.py$') + EXCLUDED_PY3_FILES=$(find ${MODULE} | grep 'py3[0-9]*\.py$') echo -e "Excluding Py3 files:\n${EXCLUDED_PY3_FILES}" else EXCLUDED_PY3_FILES="" diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index f792bbfcc614..6fe8ecfbc64e 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -54,28 +54,28 @@ commands_post = [testenv:py27] commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests --ignore-files '.*py3.py$' {posargs} + python setup.py nosetests --ignore-files '.*py3\d?\.py$' {posargs} [testenv:py35] setenv = RUN_SKIPPED_PY3_TESTS=0 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests {posargs} + python setup.py nosetests --ignore-files 
'.*py3[6-9]\.py$' {posargs} [testenv:py36] setenv = RUN_SKIPPED_PY3_TESTS=0 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests {posargs} + python setup.py nosetests --ignore-files '.*py3[7-9]\.py$' {posargs} [testenv:py37] setenv = RUN_SKIPPED_PY3_TESTS=0 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests {posargs} + python setup.py nosetests --ignore-files '.*py3[8-9]\.py$' {posargs} [testenv:py27-cython] # cython tests are only expected to work in linux (2.x and 3.x) @@ -85,7 +85,7 @@ commands = platform = linux2 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests --ignore-files '.*py3.py$' {posargs} + python setup.py nosetests --ignore-files '.*py3\d?\.py$' {posargs} [testenv:py35-cython] # cython tests are only expected to work in linux (2.x and 3.x) @@ -97,7 +97,7 @@ setenv = RUN_SKIPPED_PY3_TESTS=0 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests {posargs} + python setup.py nosetests --ignore-files '.*py3[5-9]\.py$' {posargs} [testenv:py36-cython] # cython tests are only expected to work in linux (2.x and 3.x) @@ -109,7 +109,7 @@ setenv = RUN_SKIPPED_PY3_TESTS=0 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests {posargs} + python setup.py nosetests --ignore-files '.*py3[7-9]\.py$' {posargs} [testenv:py37-cython] # cython tests are only expected to work in linux (2.x and 3.x) @@ -121,13 +121,13 @@ setenv = RUN_SKIPPED_PY3_TESTS=0 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests {posargs} + python setup.py nosetests --ignore-files '.*py3[8-9]\.py$' {posargs} [testenv:py27-gcp] extras = test,gcp commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests --ignore-files '.*py3.py$' {posargs} + python setup.py nosetests --ignore-files '.*py3\d?\.py$' {posargs} # Old and new Datastore client unit tests cannot be run in the same process # due to conflicting protobuf modules. 
# TODO(BEAM-4543): Remove these separate nosetests invocations once the @@ -140,14 +140,14 @@ setenv = RUN_SKIPPED_PY3_TESTS=0 extras = test,gcp commands = - python setup.py nosetests {posargs} + python setup.py nosetests --ignore-files '.*py3[6-9]\.py$' {posargs} [testenv:py37-gcp] setenv = RUN_SKIPPED_PY3_TESTS=0 extras = test,gcp commands = - python setup.py nosetests {posargs} + python setup.py nosetests --ignore-files '.*py3[8-9]\.py$' {posargs} [testenv:py27-lint] deps = From e470c4244e01df0f106ca8954af5783db8cd982f Mon Sep 17 00:00:00 2001 From: Chad Dombrova Date: Wed, 11 Sep 2019 11:16:37 -0700 Subject: [PATCH 6/7] Split dataclass-based external transform tests to run in python 3.7 only --- .../transforms/external_test_py3.py | 42 ----------- .../transforms/external_test_py37.py | 71 +++++++++++++++++++ 2 files changed, 71 insertions(+), 42 deletions(-) create mode 100644 sdks/python/apache_beam/transforms/external_test_py37.py diff --git a/sdks/python/apache_beam/transforms/external_test_py3.py b/sdks/python/apache_beam/transforms/external_test_py3.py index 26548e5e88e6..88fa870f17b5 100644 --- a/sdks/python/apache_beam/transforms/external_test_py3.py +++ b/sdks/python/apache_beam/transforms/external_test_py3.py @@ -26,16 +26,8 @@ from apache_beam import typehints from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload from apache_beam.transforms.external import AnnotationBasedPayloadBuilder -from apache_beam.transforms.external import DataclassBasedPayloadBuilder from apache_beam.transforms.external_test import PayloadBase -# pylint: disable=wrong-import-order, wrong-import-position -try: - import dataclasses -except ImportError: - dataclasses = None -# pylint: enable=wrong-import-order, wrong-import-position - def get_payload(cls): payload = ExternalConfigurationPayload() @@ -43,40 +35,6 @@ def get_payload(cls): return payload -@unittest.skipIf(dataclasses is None, 'dataclasses library not available') -class ExternalDataclassesPayloadTest(PayloadBase, unittest.TestCase): - - def get_payload_from_typing_hints(self, values): - - @dataclasses.dataclass - class DataclassTransform(beam.ExternalTransform): - URN = 'beam:external:fakeurn:v1' - - integer_example: int - string_example: str - list_of_strings: typing.List[str] - optional_kv: typing.Optional[typing.Tuple[str, float]] = None - optional_integer: typing.Optional[int] = None - expansion_service: dataclasses.InitVar[typing.Optional[str]] = None - - return get_payload(DataclassTransform(**values)) - - def get_payload_from_beam_typehints(self, values): - - @dataclasses.dataclass - class DataclassTransform(beam.ExternalTransform): - URN = 'beam:external:fakeurn:v1' - - integer_example: int - string_example: str - list_of_strings: typehints.List[str] - optional_kv: typehints.Optional[typehints.KV[str, float]] = None - optional_integer: typehints.Optional[int] = None - expansion_service: dataclasses.InitVar[typehints.Optional[str]] = None - - return get_payload(DataclassTransform(**values)) - - class ExternalAnnotationPayloadTest(PayloadBase, unittest.TestCase): def get_payload_from_typing_hints(self, values): diff --git a/sdks/python/apache_beam/transforms/external_test_py37.py b/sdks/python/apache_beam/transforms/external_test_py37.py new file mode 100644 index 000000000000..ad1ff72f0cfe --- /dev/null +++ b/sdks/python/apache_beam/transforms/external_test_py37.py @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for the transform.external classes.""" + +from __future__ import absolute_import + +import dataclasses +import typing +import unittest + +import apache_beam as beam +from apache_beam import typehints +from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload +from apache_beam.transforms.external_test import PayloadBase + + +def get_payload(cls): + payload = ExternalConfigurationPayload() + payload.ParseFromString(cls._payload) + return payload + + +class ExternalDataclassesPayloadTest(PayloadBase, unittest.TestCase): + + def get_payload_from_typing_hints(self, values): + + @dataclasses.dataclass + class DataclassTransform(beam.ExternalTransform): + URN = 'beam:external:fakeurn:v1' + + integer_example: int + string_example: str + list_of_strings: typing.List[str] + optional_kv: typing.Optional[typing.Tuple[str, float]] = None + optional_integer: typing.Optional[int] = None + expansion_service: dataclasses.InitVar[typing.Optional[str]] = None + + return get_payload(DataclassTransform(**values)) + + def get_payload_from_beam_typehints(self, values): + + @dataclasses.dataclass + class DataclassTransform(beam.ExternalTransform): + URN = 'beam:external:fakeurn:v1' + + integer_example: int + string_example: str + list_of_strings: typehints.List[str] + optional_kv: typehints.Optional[typehints.KV[str, float]] = None + optional_integer: typehints.Optional[int] = None + expansion_service: dataclasses.InitVar[typehints.Optional[str]] = None + + return get_payload(DataclassTransform(**values)) + +if __name__ == '__main__': + unittest.main() From 0ae5582d7a7df4af60a5cbb74afb7d307e5af0a8 Mon Sep 17 00:00:00 2001 From: Chad Dombrova Date: Wed, 11 Sep 2019 11:08:08 -0700 Subject: [PATCH 7/7] Add dataclasses as a native library for isort --- sdks/python/setup.cfg | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/setup.cfg b/sdks/python/setup.cfg index 69a5187c3875..361fe149a669 100644 --- a/sdks/python/setup.cfg +++ b/sdks/python/setup.cfg @@ -51,3 +51,6 @@ exclude_lines = [coverage:xml] output = target/site/cobertura/coverage.xml + +[isort] +known_standard_library = dataclasses
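
Taken together, these patches let a Python 3.7+ user declare an external
transform as a dataclass and drop all of the manual coder plumbing. A
sketch of that end-user experience follows; the URN, field names, and
expansion service address are illustrative, and a matching Java transform
must be registered with the expansion service for the pipeline to expand:

    import dataclasses
    import typing

    import apache_beam as beam


    @dataclasses.dataclass
    class MyExternalTransform(beam.ExternalTransform):
      URN = 'beam:external:fakeurn:v1'

      limit: int
      topics: typing.List[str]
      expansion_service: dataclasses.InitVar[typing.Optional[str]] = None

    with beam.Pipeline() as pipeline:
      _ = pipeline | MyExternalTransform(
          limit=10,
          topics=['a', 'b'],
          expansion_service='localhost:8097')

Because ExternalTransform.__post_init__ takes over after the generated
__init__ runs, the dataclass fields double as the payload schema: field
annotations supply the coder typehints and field values supply the
encoded configuration.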