From 6cf70fabb42821d32a8135b469ad0c849f62f621 Mon Sep 17 00:00:00 2001 From: Valentyn Tymofieiev Date: Fri, 3 Aug 2018 18:31:01 -0700 Subject: [PATCH 1/4] Use (str, past.builtins.unicode) where six.string_types was used in the past. --- sdks/python/apache_beam/runners/common.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 38ae66649bf1..a714eaba121a 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -31,7 +31,7 @@ from builtins import zip from future.utils import raise_ -from past.builtins import basestring +from past.builtins import unicode from apache_beam.internal import util from apache_beam.options.value_provider import RuntimeValueProvider @@ -677,7 +677,7 @@ def process_outputs(self, windowed_input_element, results): tag = None if isinstance(result, TaggedOutput): tag = result.tag - if not isinstance(tag, basestring): + if not isinstance(tag, (str, unicode)): raise TypeError('In %s, tag %s is not a string' % (self, tag)) result = result.value if isinstance(result, WindowedValue): @@ -724,7 +724,7 @@ def finish_bundle_outputs(self, results): tag = None if isinstance(result, TaggedOutput): tag = result.tag - if not isinstance(tag, basestring): + if not isinstance(tag, (str, unicode)): raise TypeError('In %s, tag %s is not a string' % (self, tag)) result = result.value From 8bdf1a454653ea04027c948f9326e127e2ace3b2 Mon Sep 17 00:00:00 2001 From: Valentyn Tymofieiev Date: Fri, 3 Aug 2018 18:37:39 -0700 Subject: [PATCH 2/4] Use past.builtins.unicode where six.text_type was used in the past. 
--- sdks/python/apache_beam/io/gcp/pubsub.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 3b65fac80e75..9d43846af303 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -28,7 +28,7 @@ from builtins import object import six -from past.builtins import basestring +from past.builtins import unicode from apache_beam import coders from apache_beam.io.iobase import Read @@ -206,7 +206,7 @@ def expand(self, pvalue): | ReadFromPubSub(self.topic, self.subscription, self.id_label, with_attributes=False) | 'DecodeString' >> Map(lambda b: b.decode('utf-8'))) - p.element_type = basestring + p.element_type = unicode return p From 00299448677e2f9d72eae0439aad873c298f34d5 Mon Sep 17 00:00:00 2001 From: Valentyn Tymofieiev Date: Fri, 3 Aug 2018 18:50:04 -0700 Subject: [PATCH 3/4] For Py2/Py3 compatibility, add complementary __ne__ methods which are autogenerated in Py3. 
--- sdks/python/apache_beam/testing/test_stream.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py index caa74ec6c75f..519a1b5ba747 100644 --- a/sdks/python/apache_beam/testing/test_stream.py +++ b/sdks/python/apache_beam/testing/test_stream.py @@ -72,6 +72,9 @@ def __init__(self, timestamped_values): def __eq__(self, other): return self.timestamped_values == other.timestamped_values + def __ne__(self, other): + return not self == other + def __hash__(self): return hash(self.timestamped_values) @@ -88,6 +91,9 @@ def __init__(self, new_watermark): def __eq__(self, other): return self.new_watermark == other.new_watermark + def __ne__(self, other): + return not self == other + def __hash__(self): return hash(self.new_watermark) @@ -104,6 +110,9 @@ def __init__(self, advance_by): def __eq__(self, other): return self.advance_by == other.advance_by + def __ne__(self, other): + return not self == other + def __hash__(self): return hash(self.advance_by) From a0c1fe8fd07f862fd365e0352461aa75450563b5 Mon Sep 17 00:00:00 2001 From: Valentyn Tymofieiev Date: Fri, 3 Aug 2018 18:34:23 -0700 Subject: [PATCH 4/4] Use bytes where six.binary_type was used in the past. --- sdks/python/apache_beam/io/gcp/pubsub.py | 11 +++++------ sdks/python/apache_beam/io/gcp/pubsub_test.py | 4 ++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 9d43846af303..b193c82c3e41 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -27,7 +27,6 @@ import re from builtins import object -import six from past.builtins import unicode from apache_beam import coders @@ -57,7 +56,7 @@ class PubsubMessage(object): This interface is experimental. No backwards compatibility guarantees. Attributes: - data: (six.binary_type) Message data. May be None. 
+ data: (bytes) Message data. May be None. attributes: (dict) Key-value map of str to str, containing both user-defined and service generated attributes (such as id_label and timestamp_attribute). May be None. @@ -150,7 +149,7 @@ def __init__(self, topic=None, subscription=None, id_label=None, case, deduplication of the stream will be strictly best effort. with_attributes: True - output elements will be :class:`~PubsubMessage` objects. - False - output elements will be of type ``six.binary_type`` (message + False - output elements will be of type ``bytes`` (message data only). timestamp_attribute: Message value to use as element timestamp. If None, uses message publishing time as the timestamp. @@ -175,7 +174,7 @@ def __init__(self, topic=None, subscription=None, id_label=None, def expand(self, pvalue): pcoll = pvalue.pipeline | Read(self._source) - pcoll.element_type = six.binary_type + pcoll.element_type = bytes if self.with_attributes: pcoll = pcoll | Map(PubsubMessage._from_proto_str) pcoll.element_type = PubsubMessage @@ -246,7 +245,7 @@ def __init__(self, topic, with_attributes=False, id_label=None, topic: Cloud Pub/Sub topic in the form "/topics//". with_attributes: True - input elements will be :class:`~PubsubMessage` objects. - False - input elements will be of type ``six.binary_type`` (message + False - input elements will be of type ``bytes`` (message data only). id_label: If set, will set an attribute for each Cloud Pub/Sub message with the given name and a unique value. This attribute can then be used @@ -275,7 +274,7 @@ def expand(self, pcoll): # Without attributes, message data is written as-is. With attributes, # message data + attributes are passed as a serialized protobuf string (see # ``PubsubMessage._to_proto_str`` for exact protobuf message type). 
- pcoll.element_type = six.binary_type + pcoll.element_type = bytes return pcoll | Write(self._sink) def to_runner_api_parameter(self, context): diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index c1dd05e874e7..6f9c2736b83d 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -120,7 +120,7 @@ def test_expand_with_topic(self): None, 'a_label', with_attributes=False, timestamp_attribute=None) | beam.Map(lambda x: x)) - self.assertEqual(str, pcoll.element_type) + self.assertEqual(bytes, pcoll.element_type) # Apply the necessary PTransformOverrides. overrides = _get_transform_overrides(p.options) @@ -143,7 +143,7 @@ def test_expand_with_subscription(self): None, 'projects/fakeprj/subscriptions/a_subscription', 'a_label', with_attributes=False, timestamp_attribute=None) | beam.Map(lambda x: x)) - self.assertEqual(str, pcoll.element_type) + self.assertEqual(bytes, pcoll.element_type) # Apply the necessary PTransformOverrides. overrides = _get_transform_overrides(p.options)