Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions sdks/python/apache_beam/io/gcp/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
import re
from builtins import object

import six
from past.builtins import basestring
from past.builtins import unicode

from apache_beam import coders
from apache_beam.io.iobase import Read
Expand Down Expand Up @@ -57,7 +56,7 @@ class PubsubMessage(object):
This interface is experimental. No backwards compatibility guarantees.

Attributes:
data: (six.binary_type) Message data. May be None.
data: (bytes) Message data. May be None.
attributes: (dict) Key-value map of str to str, containing both user-defined
and service generated attributes (such as id_label and
timestamp_attribute). May be None.
Expand Down Expand Up @@ -150,7 +149,7 @@ def __init__(self, topic=None, subscription=None, id_label=None,
case, deduplication of the stream will be strictly best effort.
with_attributes:
True - output elements will be :class:`~PubsubMessage` objects.
False - output elements will be of type ``six.binary_type`` (message
False - output elements will be of type ``bytes`` (message
data only).
timestamp_attribute: Message value to use as element timestamp. If None,
uses message publishing time as the timestamp.
Expand All @@ -175,7 +174,7 @@ def __init__(self, topic=None, subscription=None, id_label=None,

def expand(self, pvalue):
pcoll = pvalue.pipeline | Read(self._source)
pcoll.element_type = six.binary_type
pcoll.element_type = bytes
if self.with_attributes:
pcoll = pcoll | Map(PubsubMessage._from_proto_str)
pcoll.element_type = PubsubMessage
Expand Down Expand Up @@ -206,7 +205,7 @@ def expand(self, pvalue):
| ReadFromPubSub(self.topic, self.subscription, self.id_label,
with_attributes=False)
| 'DecodeString' >> Map(lambda b: b.decode('utf-8')))
p.element_type = basestring
p.element_type = unicode
return p


Expand Down Expand Up @@ -246,7 +245,7 @@ def __init__(self, topic, with_attributes=False, id_label=None,
topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
with_attributes:
True - input elements will be :class:`~PubsubMessage` objects.
False - input elements will be of type ``six.binary_type`` (message
False - input elements will be of type ``bytes`` (message
data only).
id_label: If set, will set an attribute for each Cloud Pub/Sub message
with the given name and a unique value. This attribute can then be used
Expand Down Expand Up @@ -275,7 +274,7 @@ def expand(self, pcoll):
# Without attributes, message data is written as-is. With attributes,
# message data + attributes are passed as a serialized protobuf string (see
# ``PubsubMessage._to_proto_str`` for exact protobuf message type).
pcoll.element_type = six.binary_type
pcoll.element_type = bytes
return pcoll | Write(self._sink)

def to_runner_api_parameter(self, context):
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def test_expand_with_topic(self):
None, 'a_label', with_attributes=False,
timestamp_attribute=None)
| beam.Map(lambda x: x))
self.assertEqual(str, pcoll.element_type)
self.assertEqual(bytes, pcoll.element_type)

# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(p.options)
Expand All @@ -143,7 +143,7 @@ def test_expand_with_subscription(self):
None, 'projects/fakeprj/subscriptions/a_subscription',
'a_label', with_attributes=False, timestamp_attribute=None)
| beam.Map(lambda x: x))
self.assertEqual(str, pcoll.element_type)
self.assertEqual(bytes, pcoll.element_type)

# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(p.options)
Expand Down
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/runners/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from builtins import zip

from future.utils import raise_
from past.builtins import basestring
from past.builtins import unicode

from apache_beam.internal import util
from apache_beam.options.value_provider import RuntimeValueProvider
Expand Down Expand Up @@ -677,7 +677,7 @@ def process_outputs(self, windowed_input_element, results):
tag = None
if isinstance(result, TaggedOutput):
tag = result.tag
if not isinstance(tag, basestring):
if not isinstance(tag, (str, unicode)):
raise TypeError('In %s, tag %s is not a string' % (self, tag))
result = result.value
if isinstance(result, WindowedValue):
Expand Down Expand Up @@ -724,7 +724,7 @@ def finish_bundle_outputs(self, results):
tag = None
if isinstance(result, TaggedOutput):
tag = result.tag
if not isinstance(tag, basestring):
if not isinstance(tag, (str, unicode)):
raise TypeError('In %s, tag %s is not a string' % (self, tag))
result = result.value

Expand Down
9 changes: 9 additions & 0 deletions sdks/python/apache_beam/testing/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ def __init__(self, timestamped_values):
def __eq__(self, other):
return self.timestamped_values == other.timestamped_values

def __ne__(self, other):
return not self == other

def __hash__(self):
return hash(self.timestamped_values)

Expand All @@ -88,6 +91,9 @@ def __init__(self, new_watermark):
def __eq__(self, other):
return self.new_watermark == other.new_watermark

def __ne__(self, other):
return not self == other

def __hash__(self):
return hash(self.new_watermark)

Expand All @@ -104,6 +110,9 @@ def __init__(self, advance_by):
def __eq__(self, other):
return self.advance_by == other.advance_by

def __ne__(self, other):
return not self == other

def __hash__(self):
return hash(self.advance_by)

Expand Down